Building a Baseline Model
This section trains a baseline Prophet model. It uses MLflow to keep track of the model's accuracy metrics and to save the model for later use.
from mlflow.models.signature import infer_signature
from fbprophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_squared_error
class FbProphetWrapper(mlflow.pyfunc.PythonModel):
    """Expose a Prophet model through MLflow's ``pyfunc`` model interface.

    The wrapper keeps a reference to the model it is given and forwards
    prediction requests to that model's own ``predict`` method.
    """

    def __init__(self, model):
        """Store ``model``, the object whose ``predict`` will be called."""
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's prediction for ``model_input``.

        ``context`` is accepted to match the pyfunc method signature and is
        not used by this implementation.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
# Train and score an untuned Prophet model inside one MLflow run so that its
# metrics and the model artifact are recorded together.
with mlflow.start_run(run_name='base_prophet_model'):
    model_fbp = Prophet()
    # Exogenous regressors are currently disabled:
    # for feature in exogenous_features:
    #     model_fbp.add_regressor(feature)
    model_fbp.fit(train)

    forecast = model_fbp.predict(test[["ds", "y"]])
    # Keep the point forecast next to the actuals on the shared `test` frame.
    test["Predicted_Prophet"] = forecast.yhat.values

    # Compute MAPE once and reuse the value for both printing and logging.
    MAPE = mean_absolute_percentage_error(test.y, test.Predicted_Prophet)
    print(MAPE)

    # mlflow.log_param('exogenous_features', exogenous_features)
    mlflow.log_metric('RMSE', np.sqrt(mean_squared_error(test.y, test.Predicted_Prophet)))
    mlflow.log_metric('MAPE', MAPE)
    mlflow.log_metric('MAE', mean_absolute_error(test.y, test.Predicted_Prophet))

    wrappedModel = FbProphetWrapper(model_fbp)
    # The wrapper's predict() simply calls model_fbp.predict(), so `forecast`
    # is already the wrapper's output for this input; reuse it instead of
    # running the model a second time just to infer the signature.
    signature = infer_signature(test[["ds", "y"]], forecast)
    mlflow.pyfunc.log_model("prophet_model", python_model=wrappedModel, signature=signature)
Registering the model in the MLflow Model Registry
By registering this model in the Model Registry, we can easily reference the model from anywhere within Databricks.
The following section shows how to do this programmatically, but we can also register a model using the UI.
# Look up the baseline run by its run name and register the model it logged.
baseline_runs = mlflow.search_runs(
    filter_string='tags.mlflow.runName = "base_prophet_model"'
)
run_id = baseline_runs.iloc[0].run_id  # first row returned by the search
model_name = "inc_vol_pred"
baseline_model_uri = f"runs:/{run_id}/prophet_model"
model_version = mlflow.register_model(baseline_model_uri, model_name)
We should now see the inc_vol_pred model in the Models page. To display the Models page, click the Models icon in the left sidebar.
Next, transition this model to the "Production" stage and load it into this notebook from the Model Registry.
from mlflow.tracking import MlflowClient
# Move the freshly registered baseline version into the "Production" stage.
client = MlflowClient()
client.transition_model_version_stage(
    name=model_name, version=model_version.version, stage="Production"
)
The Models page now shows the model version in stage "Production". We can now refer to the model using the path "models:/inc_vol_pred/production".
# Load whichever version of the registered model is in the production stage.
production_uri = f"models:/{model_name}/production"
model = mlflow.pyfunc.load_model(production_uri)

# Sanity check: score the reloaded model on the test set and print its MAPE,
# which is expected to match the MAPE logged to MLflow for this model.
forecast = model.predict(test[["ds", "y"]])
MAPE = mean_absolute_percentage_error(test["y"], forecast["yhat"].values)
print(f'MAPE: {MAPE}')
Experimenting with a hyperparameter-tuned model
The model performed well even without hyperparameter tuning.
The following code runs a hyperparameter sweep with Hyperopt, training one model per parameter configuration. As written, the sweep uses Hyperopt's Trials class; SparkTrials can be substituted to train multiple models in parallel. As before, the code tracks the performance of each parameter configuration with MLflow.
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
from math import exp
import numpy as np
from mlflow.models.signature import infer_signature
from fbprophet import Prophet
class FbProphetWrapper(mlflow.pyfunc.PythonModel):
    """Expose a Prophet model through MLflow's ``pyfunc`` model interface.

    The wrapper keeps a reference to the model it is given and forwards
    prediction requests to that model's own ``predict`` method.
    """

    def __init__(self, model):
        """Store ``model``, the object whose ``predict`` will be called."""
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's prediction for ``model_input``.

        ``context`` is accepted to match the pyfunc method signature and is
        not used by this implementation.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
# Hyperopt search space: one entry per Prophet constructor argument that the
# sweep varies. The dictionary keys double as the Hyperopt labels.
search_space = {
    'changepoint_prior_scale': hp.uniform('changepoint_prior_scale', 0.001, 0.5),
    'seasonality_prior_scale': hp.uniform('seasonality_prior_scale', 0.01, 10),
    'seasonality_mode': hp.choice('seasonality_mode', ['additive', 'multiplicative']),
}
def train_model(params):
    """Fit and score one Prophet configuration inside a nested MLflow run.

    Args:
        params: Mapping with the keys ``changepoint_prior_scale``,
            ``seasonality_prior_scale`` and ``seasonality_mode``. Each value
            is passed to the ``Prophet`` constructor and logged as a run
            parameter.

    Returns:
        A dict with ``status`` set to ``STATUS_OK`` and ``loss`` set to the
        MAPE of the forecast against ``test.y``.

    Side effects:
        Overwrites the ``Predicted_Prophet_ht`` column of the module-level
        ``test`` frame, prints the MAPE, and logs the parameters, the
        RMSE/MAPE/MAE metrics and the wrapped model to MLflow.
    """
    with mlflow.start_run(nested=True):
        model_fbp = Prophet(
            changepoint_prior_scale=params['changepoint_prior_scale'],
            seasonality_prior_scale=params['seasonality_prior_scale'],
            seasonality_mode=params['seasonality_mode'],
        )
        model_fbp.fit(train)

        forecast = model_fbp.predict(test[["ds", "y"]])
        test["Predicted_Prophet_ht"] = forecast.yhat.values

        # Log each hyperparameter under its own name.
        for param_name in (
            'changepoint_prior_scale',
            'seasonality_prior_scale',
            'seasonality_mode',
        ):
            mlflow.log_param(param_name, params[param_name])

        # Compute MAPE once; it is printed, logged and returned as the loss.
        MAPE = mean_absolute_percentage_error(test.y, test.Predicted_Prophet_ht)
        print(MAPE)

        mlflow.log_metric('RMSE', np.sqrt(mean_squared_error(test.y, test.Predicted_Prophet_ht)))
        mlflow.log_metric('MAPE', MAPE)
        mlflow.log_metric('MAE', mean_absolute_error(test.y, test.Predicted_Prophet_ht))

        wrappedModel = FbProphetWrapper(model_fbp)
        # Log the model with a signature that defines the schema of the
        # model's inputs and outputs. The signature is inferred from the same
        # input frame the model is scored with, and from the forecast already
        # computed for it, rather than from `train` plus an extra in-sample
        # predict() call. This keeps it consistent with the baseline run.
        signature = infer_signature(test[["ds", "y"]], forecast)
        mlflow.pyfunc.log_model("prophet_model", python_model=wrappedModel, signature=signature)

    return {'status': STATUS_OK, 'loss': MAPE}
# Alternative trials backend, currently disabled:
# spark_trials = SparkTrials(parallelism=4)
trials = Trials()

# Seed for the search. It has to be handed to fmin() to have any effect.
# NOTE(review): hyperopt >= 0.2.7 expects a numpy Generator here, e.g.
# np.random.default_rng(42), instead of a RandomState. Confirm the installed
# hyperopt version before upgrading.
rstate = np.random.RandomState(42)

# The parent run groups the nested runs that train_model() opens per trial.
with mlflow.start_run(run_name='hyperoptimized_prophet_model'):
    best_params = fmin(
        fn=train_model,
        space=search_space,
        algo=tpe.suggest,
        max_evals=10,
        trials=trials,
        rstate=rstate,
    )
Use MLflow to view the results
Open the Experiment Runs sidebar to see the MLflow runs.
MLflow tracks the parameters and performance metrics of each run.
We used MLflow to log the model produced by each hyperparameter configuration. The following code finds the best performing run and saves the model to the model registry.
# Sort the runs returned by the search by their logged MAPE, lowest first,
# and keep the top row as the best run.
runs_by_mape = mlflow.search_runs(order_by=['metrics.MAPE ASC'])
best_run = runs_by_mape.iloc[0]
print(f'MAPE of Best Run: {best_run["metrics.MAPE"]}')
Updating the production inc_vol_pred model in the MLflow Model Registry
Earlier, we saved the baseline model to the Model Registry under "inc_vol_pred". Now that we have created a more accurate model, update inc_vol_pred.
# Register the best run's logged model as a new version of the existing
# registered model. The two statements were previously fused on one line,
# and the second one read the undefined name `result`.
new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/prophet_model", model_name)
version = new_model_version.version
Click Models in the left sidebar to see that the inc_vol_pred model now has two versions.
The following code promotes the new version to production.
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Apply the stage changes in order: archive the old model version first,
# then promote the new model version to Production.
for target_version, target_stage in (
    (model_version.version, "Archived"),
    (new_model_version.version, "Production"),
):
    client.transition_model_version_stage(
        name=model_name,
        version=target_version,
        stage=target_stage,
    )
Clients that call load_model now receive the new model.
# Reload the model through the registry's production URI and score it on the
# test set.
loaded_model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
# test[["ds", "y"]] is already a DataFrame, so it is passed directly. The
# extra pd.DataFrame(...) wrapper was redundant and required `pd` in scope.
forecast = loaded_model.predict(test[["ds", "y"]])
MAPE = mean_absolute_percentage_error(test.y, forecast.yhat.values)
print(f'MAPE: {MAPE}')