Saturday 28 January 2023

End-to-end example of predictive models with MLflow

Building a Baseline Model

This example uses MLflow to track model accuracy and to save the model for later use.


import mlflow
import mlflow.pyfunc
import numpy as np
from mlflow.models.signature import infer_signature
from fbprophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error


class FbProphetWrapper(mlflow.pyfunc.PythonModel):
    """Wraps a fitted Prophet model so it can be logged as an MLflow pyfunc model."""

    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.model.predict(model_input)


with mlflow.start_run(run_name='base_prophet_model'):
    model_fbp = Prophet()
    # for feature in exogenous_features:
    #     model_fbp.add_regressor(feature)
    model_fbp.fit(train)

    forecast = model_fbp.predict(test[["ds", "y"]])
    test["Predicted_Prophet"] = forecast.yhat.values

    MAPE = mean_absolute_percentage_error(test.y, test.Predicted_Prophet)
    print(MAPE)

    # mlflow.log_param('exogenous_features', exogenous_features)
    mlflow.log_metric('RMSE', np.sqrt(mean_squared_error(test.y, test.Predicted_Prophet)))
    mlflow.log_metric('MAPE', MAPE)
    mlflow.log_metric('MAE', mean_absolute_error(test.y, test.Predicted_Prophet))

    wrappedModel = FbProphetWrapper(model_fbp)
    signature = infer_signature(test[["ds", "y"]], wrappedModel.predict(None, test[["ds", "y"]]))
    mlflow.pyfunc.log_model("prophet_model", python_model=wrappedModel, signature=signature)


Registering the model in the MLflow Model Registry

By registering this model in the Model Registry, we can easily reference the model from anywhere within Databricks.

The following section shows how to do this programmatically, but we can also register a model using the UI.


run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "base_prophet_model"').iloc[0].run_id

model_name = "inc_vol_pred"

model_version = mlflow.register_model(f"runs:/{run_id}/prophet_model", model_name)


We should now see the inc_vol_pred model in the Models page. To display the Models page, click the Models icon in the left sidebar.

Next, transition this model to production and load it into this notebook from the Model Registry.


from mlflow.tracking import MlflowClient

client = MlflowClient()
client.transition_model_version_stage(
    name=model_name,
    version=model_version.version,
    stage="Production",
)


The Models page now shows the model version in stage "Production". We can now refer to the model using the path "models:/inc_vol_pred/production".


model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")

# Sanity check: this should match the MAPE logged by MLflow
forecast = model.predict(test[["ds", "y"]])
MAPE = mean_absolute_percentage_error(test.y, forecast.yhat.values)
print(f'MAPE: {MAPE}')


Experimenting with a hyperparameter-optimized model

The model performed well even without hyperparameter tuning.

The following code runs a hyperparameter sweep with Hyperopt to train and compare multiple models. On a Spark cluster, SparkTrials can run the trials in parallel; here a plain Trials object runs them sequentially (see the SparkTrials sketch after the fmin call below). As before, the code tracks the performance of each parameter configuration with MLflow.


import mlflow
import numpy as np
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from mlflow.models.signature import infer_signature
from fbprophet import Prophet


class FbProphetWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.model.predict(model_input)


search_space = {
    'changepoint_prior_scale': hp.uniform('changepoint_prior_scale', 0.001, 0.5),
    'seasonality_prior_scale': hp.uniform('seasonality_prior_scale', 0.01, 10),
    'seasonality_mode': hp.choice('seasonality_mode', ['additive', 'multiplicative']),
}

def train_model(params):
    # Each hyperparameter configuration is tracked as a nested MLflow run
    with mlflow.start_run(nested=True):
        model_fbp = Prophet(
            changepoint_prior_scale=params['changepoint_prior_scale'],
            seasonality_prior_scale=params['seasonality_prior_scale'],
            seasonality_mode=params['seasonality_mode'],
        )
        model_fbp.fit(train)

        forecast = model_fbp.predict(test[["ds", "y"]])
        test["Predicted_Prophet_ht"] = forecast.yhat.values

        mlflow.log_param('changepoint_prior_scale', params['changepoint_prior_scale'])
        mlflow.log_param('seasonality_prior_scale', params['seasonality_prior_scale'])
        mlflow.log_param('seasonality_mode', params['seasonality_mode'])

        MAPE = mean_absolute_percentage_error(test.y, test.Predicted_Prophet_ht)
        print(MAPE)

        mlflow.log_metric('RMSE', np.sqrt(mean_squared_error(test.y, test.Predicted_Prophet_ht)))
        mlflow.log_metric('MAPE', MAPE)
        mlflow.log_metric('MAE', mean_absolute_error(test.y, test.Predicted_Prophet_ht))

        wrappedModel = FbProphetWrapper(model_fbp)
        # Log the model with a signature that defines the schema of the model's
        # inputs and outputs. When the model is deployed, this signature is used
        # to validate inputs.
        signature = infer_signature(test[["ds", "y"]], wrappedModel.predict(None, test[["ds", "y"]]))
        mlflow.pyfunc.log_model("prophet_model", python_model=wrappedModel, signature=signature)

        return {'status': STATUS_OK, 'loss': MAPE}

# To parallelize across a cluster, use SparkTrials instead (see the sketch below):
# spark_trials = SparkTrials(parallelism=4)
trials = Trials()
rstate = np.random.RandomState(42)

with mlflow.start_run(run_name='hyperoptimized_prophet_model'):
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=10,
        trials=trials,
        rstate=rstate  # seed the sweep so results are reproducible
    )
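
For completeness, here is a minimal sketch of the parallel variant mentioned above: attach the notebook to a Spark cluster and hand fmin a SparkTrials object instead of Trials. The parallelism value and run name are illustrative, and train_model plus the train/test DataFrames must be reachable from the Spark workers.

from hyperopt import SparkTrials, fmin, tpe

# SparkTrials distributes each trial to a Spark worker; parallelism caps
# how many trials run concurrently (4 is an arbitrary illustrative value)
spark_trials = SparkTrials(parallelism=4)

with mlflow.start_run(run_name='hyperoptimized_prophet_model_parallel'):
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=10,
        trials=spark_trials
    )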


Use MLflow to view the results

Open the Experiment Runs sidebar to see the MLflow runs.

MLflow tracks the parameters and performance metrics of each run.



We used MLflow to log the model produced by each hyperparameter configuration. The following code finds the best-performing run; the next section registers its model in the Model Registry.


best_run = mlflow.search_runs(order_by=['metrics.MAPE ASC']).iloc[0]
print(f'MAPE of Best Run: {best_run["metrics.MAPE"]}')
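
One caveat when reading best_params from fmin: for hp.choice parameters such as seasonality_mode, fmin returns the index of the chosen option, not the option itself. hyperopt's space_eval helper resolves the search space back into concrete values; a minimal sketch:

from hyperopt import space_eval

# Resolve hp.choice indices back into actual parameter values, e.g.
# {'seasonality_mode': 1, ...} -> {'seasonality_mode': 'multiplicative', ...}
print(space_eval(search_space, best_params))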


Updating the production inc_vol_pred model in the MLflow Model Registry

Earlier, we saved the baseline model to the Model Registry under "inc_vol_pred". Now that we have created a more accurate model, we can update inc_vol_pred.


new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/prophet_model", model_name)


Click Models in the left sidebar to see that the inc_vol_pred model now has two versions.

The following code archives the old version and promotes the new version to production.


# Archive the old model version
from mlflow.tracking import MlflowClient

client = MlflowClient()
client.transition_model_version_stage(
    name=model_name,
    version=model_version.version,
    stage="Archived"
)


# Promote the new model version to Production
client.transition_model_version_stage(
    name=model_name,
    version=new_model_version.version,
    stage="Production"
)


Clients that call load_model now receive the new model.


loaded_model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")

forecast = loaded_model.predict(test[["ds", "y"]])
MAPE = mean_absolute_percentage_error(test.y, forecast.yhat.values)
print(f'MAPE: {MAPE}')

The rise of Entrepreneurial Engineers

 

I often use the sentence: good engineers build things; great engineers build valuable things. But the idea of value is abstract; what is valuable to one person may not be valuable to another. That ambiguity largely disappears in profitable organizations, where everything should ultimately contribute to organizational value while complying with all governance checks. I have seen engineering colleagues wonder whether they should worry about all this. Maybe five years back the answer would have been "not necessarily", but now it is necessary.

The rise of entrepreneurial engineers is similar to what we observed with the rise of the citizen data scientist, but in the opposite direction. The idea is that, although this engineering team still works with the business owners, product owners, and management team, it has skin in the game of what gets built. These engineers understand business priorities and are driven towards solving customer problems and adding business value. They know how to apply their engineering mindset to solve impactful problems using sustainable solutions.

This is something the teams I work with continuously strive towards. Here are some of the steps I take with my teams and can recommend.

Before adopting any feature, we try to ask ourselves:

  1. How will this feature add value?
  2. What will happen if we don't adopt this feature?
  3. How is this problem currently addressed?
  4. Can we quantify the financial impact and return on investment of this feature?

All of this can be documented in artifacts such as a CONOPS document, a value document, and user stories.

With experience, the teams I work with have discovered, learned, and created a process of continuous conversation and collaboration: a cycle of ongoing dialogue with the other stakeholders. Some of the checkpoints we have developed over the years are:

  1. Do an internal tech-team analysis: feasibility, compatibility, and valuation.
  2. Present the understanding, POC, or plan to business leaders, owners, and SMEs.
  3. Give a broader technology-team peer presentation.
  4. Discuss with the vendor to clarify assumptions, if required.
  5. Discuss with enterprise architects how the piece will fit into the overall architecture.
  6. Then start the implementation.

But there may be scenarios where, before you can do any of this, you need an MVP in place to start the discussion at all. This strategy is extremely useful when the overall team is not sure what would be valuable to build. It is only when all the stakeholders see a tangible MVP in front of them that they start to share improvement ideas. In most new product development life cycles, the first few features are developed like this, until the process matures.

In the last decade, with more engineers in board rooms than ever, this new breed of entrepreneurial engineers will be a significant voice in shaping organizational vision and roadmaps. The organizations that thrive and grow will need two things:


  1. Superior product-market fit
  2. A good product design and architecture

 

Organizations should put a lot of focus on these two areas. Once these two are in place, the sales and marketing teams can do their jobs effectively. Organizations and teams should be very aware that all operational and enabling processes should be focused on improving these two areas. Once that solid foundation is in place, customer relationships and delivery should take care of themselves.