Showing posts with label mlflow. Show all posts
Showing posts with label mlflow. Show all posts

Saturday, 28 January 2023

End-to-end example of Predictive models with MLFlow

 Building a Baseline Model

It uses MLflow to keep track of the model accuracy, and to save the model for later use.


from mlflow.models.signature import infer_signature

from fbprophet import Prophet

from sklearn.metrics import mean_absolute_error, mean_squared_error

class FbProphetWrapper(mlflow.pyfunc.PythonModel):

def __init__(self, model):

self.model = model

def predict(self, context, model_input):

return self.model.predict(model_input)

with mlflow.start_run(run_name='base_prophet_model'):

model_fbp = Prophet()

#for feature in exogenous_features:

# model_fbp.add_regressor(feature)

model_fbp.fit(train)

forecast = model_fbp.predict(test[["ds", "y"]])

test["Predicted_Prophet"] = forecast.yhat.values

MAPE = mean_absolute_percentage_error(test.y, test.Predicted_Prophet)

print(MAPE)

#mlflow.log_param('exogenous_features', exogenous_features)

mlflow.log_metric('RMSE', np.sqrt(mean_squared_error(test.y, test.Predicted_Prophet)))

mlflow.log_metric('MAPE', mean_absolute_percentage_error(test.y, test.Predicted_Prophet))

mlflow.log_metric('MAE', mean_absolute_error(test.y, test.Predicted_Prophet))

wrappedModel = FbProphetWrapper(model_fbp)

signature = infer_signature(test[["ds", "y"]], wrappedModel.predict(None, test[["ds", "y"]]))

mlflow.pyfunc.log_model("prophet_model", python_model=wrappedModel, signature=signature)


Registering the model in the MLflow Model Registry

By registering this model in the Model Registry, we can easily reference the model from anywhere within Databricks.

The following section shows how to do this programmatically, but we can also register a model using the UI


run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "base_prophet_model"').iloc[0].run_id

model_name = "inc_vol_pred"

model_version = mlflow.register_model(f"runs:/{run_id}/prophet_model", model_name)


We should now see the inc_vol_pred model in the Models page. To display the Models page, click the Models icon in the left sidebar.

Next, transition this model to staging and load it into this notebook from the model registry.


from mlflow.tracking import MlflowClient

client = MlflowClient()

client.transition_model_version_stage(

name=model_name,

version=model_version.version,

stage="Production",

)


The Models page now shows the model version in stage "Production”. We can now refer to the model using the path "models:/ inc_vol_pred/production".


model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")

# Sanity-check: This should match the AUC logged by MLflow

forecast = model.predict(test[["ds","y"]])

#print(forecast)

MAPE = mean_absolute_percentage_error(test.y, forecast.yhat.values)

print(f'MAPE: {MAPE}')


Experimenting with a hyper optimized model

The model performed well even without hyperparameter tuning.

The following code runs a parallel hyperparameter sweep to train multiple models in parallel, using Hyperopt and SparkTrials. As before, the code tracks the performance of each parameter configuration with MLflow.


from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK

from hyperopt.pyll import scope

from math import exp

import numpy as np

from mlflow.models.signature import infer_signature

from fbprophet import Prophet

class FbProphetWrapper(mlflow.pyfunc.PythonModel):

def __init__(self, model):

self.model = model

def predict(self, context, model_input):

return self.model.predict(model_input)


search_space = { 'changepoint_prior_scale': hp.uniform('changepoint_prior_scale', 0.001, 0.5), 'seasonality_prior_scale': hp.uniform('seasonality_prior_scale', 0.01, 10), 'seasonality_mode': hp.choice('seasonality_mode', ['additive','multiplicative']) }




def train_model(params):

with mlflow.start_run(nested=True):


model_fbp =Prophet(changepoint_prior_scale = params['changepoint_prior_scale'],

seasonality_prior_scale = params['seasonality_prior_scale'],

seasonality_mode = params['seasonality_mode'])

model_fbp.fit(train)

forecast = model_fbp.predict(test[["ds", "y"]])

test["Predicted_Prophet_ht"] = forecast.yhat.values

#test["Predicted_Prophet"] = forecast.yhat.values

#mlflow.log_param('Parameters', params)

mlflow.log_param('changepoint_prior_scale', params['changepoint_prior_scale'])

mlflow.log_param('seasonality_prior_scale', params['seasonality_prior_scale'])

mlflow.log_param('seasonality_mode', params['seasonality_mode'])

MAPE = mean_absolute_percentage_error(test.y, test.Predicted_Prophet_ht)

print(MAPE)

#mlflow.log_param('exogenous_features', exogenous_features)

mlflow.log_metric('RMSE', np.sqrt(mean_squared_error(test.y, test.Predicted_Prophet_ht)))

mlflow.log_metric('MAPE', mean_absolute_percentage_error(test.y, test.Predicted_Prophet_ht))

mlflow.log_metric('MAE', mean_absolute_error(test.y, test.Predicted_Prophet_ht))

#mlflow.log_metric('MAPE', mean_absolute_percentage_error(test.Freq, test.Predicted_Prophet))

#mlflow.log_metric('MAE', mean_absolute_error(test.Freq, test.Predicted_Prophet))

#model_fbp.plot_components(forecast)

#test[["Freq", "Predicted_ARIMAX", "Predicted_Prophet"]].plot(figsize=(14, 7))

wrappedModel = FbProphetWrapper(model_fbp)

# Log the model with a signature that defines the schema of the model's inputs and outputs.

# When the model is deployed, this signature will be used to validate inputs.

signature = infer_signature(train, model_fbp.predict())

mlflow.pyfunc.log_model("prophet_model", python_model=wrappedModel, signature=signature)

return {'status': STATUS_OK, 'loss': MAPE}

#spark_trials = SparkTrials(parallelism=4)

trials = Trials()

rstate = np.random.RandomState(42)

with mlflow.start_run(run_name='hyperoptimized_prophet_model'):

best_params = fmin(

fn=train_model,

space=search_space,

algo=tpe.suggest,

max_evals=10,

trials=trials

)


Use MLflow to view the results

Open the Experiment Runs sidebar to see the MLflow runs.

MLflow tracks the parameters and performance metrics of each run.



We used MLflow to log the model produced by each hyperparameter configuration. The following code finds the best performing run and saves the model to the model registry.


best_run = mlflow.search_runs(order_by=['metrics.MAPE ASC']).iloc[0]

print(f'MAPE of Best Run: {best_run["metrics.MAPE"]}')

#best_run


Updating the production wine_quality model in the MLflow Model Registry

Earlier, we saved the baseline model to the Model Registry under " inc_vol_pred ". Now that you have a created a more accurate model, update inc_vol_pred.


new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/prophet_model", model_name) version = result.version


Click Models in the left sidebar to see that the inc_vol_pred model now has two versions.

The following code promotes the new version to production.


# Archive the old model version

from mlflow.tracking import MlflowClient

client = MlflowClient()

client.transition_model_version_stage(

name=model_name,

version=model_version.version,

stage="Archived"

)


# Promote the new model version to Production

client.transition_model_version_stage(

name=model_name,

version=new_model_version.version,

stage="Production"

)


Clients that call load_model now receive the new model.


loaded_model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")

forecast = loaded_model.predict(pd.DataFrame(test[["ds","y"]]))

#print(forecast)

MAPE = mean_absolute_percentage_error(test.y, forecast.yhat.values)

print(f'MAPE: {MAPE}')