Connect with us

AI

Handling Missing Data For Advanced Machine Learning

Throughout this article, you will become good at spotting, understanding, and imputing missing data. We demonstrate various imputation techniques on a real-world logistic regression task using Python. Properly handling missing data has an improving effect on inferences and predictions. This is not to be ignored. The first part of this article presents the framework for […]

The post Handling Missing Data For Advanced Machine Learning appeared first on TOPBOTS.

Published

on

Throughout this article, you will become good at spotting, understanding, and imputing missing data. We demonstrate various imputation techniques on a real-world logistic regression task using Python. Properly handling missing data has an improving effect on inferences and predictions. This is not to be ignored.

The first part of this article presents the framework for understanding missing data. Later we demonstrate the most popular strategies in dealing with missingness on a classification task to predict the onset of diabetes.

Missing data is hard to avoid

A considerable part of data science or machine learning job is data cleaning. Often when data is collected, there are some missing values appearing in the dataset.

To understand the reason why data goes missing, let’s simulate a dataset with two predictors x1x2, and a response variable y.

We will virtually make some data missing to illustrate various reasons why many real-world datasets may contain missing values.

missing data

There are 3 major types of missing values to be concerned about.

Missing Completely at Random (MCAR)

MCAR occurs when the probability of missing values in a variable is the same for all samples.

For example, when a survey is conducted, and values were just randomly missed when being entered in the computer or a respondent chose not to respond to a question.

There is no effect of MCAR on inferences made by a model trained on such data.

To illustrate MCAR, we randomly remove half of the values for x1 as follows.

## Missing Completely at Random (MCAR) # randomly mark half of x1 samples as missing MCAR
# independend of any information recorded
idx_mcar= np.random.choice([0, 1], size=(100,)) == 1 plt.scatter(x1,y, label='data')
plt.scatter(x1[idx_mcar],y[idx_mcar], label='missing (MCAR)', color='red')
plt.xlabel('x1')
plt.ylabel('y')
plt.legend()
plt.title('Missing Completely at Random (MCAR)');
missing data

x1 samples (red-marked), which are missing completely at random (MCAR), depend neither on x1 values nor on the values of any other measured variables.

Missing at Random (MAR)

The probability of missing values, at random, in a variable depends only on the available information in other predictors.

For example, when men and women respond to the question “have you ever taken parental leave?”, men would tend to ignore the question at a different rate compared to women.

MARs are handled by using the information in the other predictors to build a model and impute a value for the missing entry.

We simulate MAR by removing x1 values depending on x2 values. When x2 has the value 1, then the corresponding x1 is missing.

## Missing at Random (MAR) # randomly mark half of x1 samples as missing MAR
# depending on value of recorded predictor x2 idx_mar = x2 == 1 fig, ax = plt.subplots(1,2,figsize=(15,5))
ax[0].scatter(x1, y, label='data')
ax[0].scatter(x1[idx_mar], y[idx_mar], label='missing', color='red')
ax[0].set_xlabel('x1')
ax[0].set_ylabel('y')
ax[0].legend()
ax[0].set_title('Missing at Random (MAR)');
ax[1].vlines(x1[x2 == 1], 0, 1, color='black')
ax[1].set_xlabel('x1')
ax[1].set_ylabel('x2')
ax[1].set_title('dependent predictor - measured');
missing data

x1 samples (red-marked), which are missing at random (MAR), are related to the values of another dependent and measured variable (x2) inside the dataset. When x2 has the boolean value 1, then x1 is missing.

Missing Not at Random (MNAR)

The probability of missing values, not at random, depends on information that has not been recorded, and this information also predicts the missing values.

For example, in a survey, cheaters are less likely to respond when asked if they have ever cheated.

MNARs are almost impossible to handle.

Luckily there shouldn’t be any effect of MNAR on inferences made by a model trained on such data.

Assuming that there is a hypothetical variable x3 that was not measured, but determines which x1 values are missing, we can simulate MNAR as follows.

## Missing Not at Random (MNAR) # randomly mark half of x1 samples as missing MNAR
# depending on unrecorded predictor x3
x3 = np.random.uniform(0, 1, 100)
idx_mnar = x3 > .5 fig, ax = plt.subplots(1,2,figsize=(15,5))
ax[0].scatter(x1, y, label='data')
ax[0].scatter(x1[idx_mnar], y[idx_mnar], label='missing', color='red')
ax[0].set_xlabel('x1')
ax[0].set_ylabel('y')
ax[0].legend()
ax[0].set_title('Missing Not at Random (MNAR)');
ax[1].scatter(x1, x3, color='black')
ax[1].axhline(.5, -3, 3)
ax[1].set_xlabel('x1')
ax[1].set_ylabel('x3')
ax[1].set_title('dependent predictor - not measured');
missing data

x1 samples (red-marked), which are missing not at random (MNAR), are related to the values of another dependent variable (x3) outside the dataset. When x3 is greater than 0.5, than x1 value is missing.

Do you like this in-depth educational content on applied machine learning? Subscribe to our Enterprise AI mailing list to be alerted when we release new material.

3 Main Approaches to Compensate for Missing Values

It generally cannot be determined whether data are missing at random, or whether the missingness depends on unobserved predictors or the missing data themselves.

In practice, the missing at random assumption is reasonable.

Several different approaches to imputing missing values are found in the literature:

1. Imputation using zero, mean, median or most frequent value

This works by imputing all missing values with zero, the mean or median for quantitative variables, or the most common value for categorical variables.

Additionally, we can create a new variable that is an indicator of missingness and includes it in the model to predict the response. This is done after plugging in zero, mean, median, or most frequent value in the actual variable.

2. Imputation using a randomly selected value

This works by randomly selecting an observed entry in the variable and use it to impute missing values.

3. Imputation with a model

This works by replacing missing values with predicted values from a model based on the other observed predictors.

The k nearest neighbor algorithm is often used to impute a missing value based on how closely it resembles the points in the training set.

Model-based imputation with uncertainty works by replacing missing values with predicted values plus randomness from a model based on the other observed predictors.

Model-based progressive imputation uses previously imputed missing values to predict other missing values.

Additional methods include Stochastic Regression Imputation, Multiple ImputationsDatawig, Hot-Deck imputation, Extrapolation, Interpolation, Listwise Deletion.

missing data

There is nothing missing here — Photo By Nika Akin on Pixabay

A Practical Guide for Handling Missing Values

Describing the data

In the following sections, we are going to use the Pima Indians onset of diabetes dataset to be found here.

This dataset describes patient medical record data for Pima Indians and whether they had an onset of diabetes within five years.

It is a binary classification problem (onset of diabetes as 1 or not as 0).

The input variables that describe each patient are numerical and have varying scales.

The list below shows the eight attributes plus the target variable for the dataset:

  • The number of times the patient was pregnant.
  • 2-hours plasma glucose concentration per 2 hours in an oral glucose tolerance test.
  • Diastolic blood pressure (mm Hg).
  • Triceps skinfold thickness (mm).
  • 2-hours serum insulin (mu U/ml).
  • Body mass index.
  • Diabetes pedigree function.
  • Age (years).
  • Outcome (1 for early onset of diabetes within five years, 0 for not).
import pandas as pd pima_df = pd.read_csv('pima-indians-diabetes.csv') response = 'Outcome'
predictors = pima_df.columns.difference([response]).values pima_df.describe()

missing data

The data has 764 observations and 9 features. We immediately see strange values for some of the predictors.

Pregnancies range up to 17. The blood pressure, glucose, skin thickness, insulin, and BMI variables include zeros, which are physically implausible.

We will have to handle these somehow.

Properly flagging missing values

Although missing values are usually coded using NaN, Null, or None, it seems that none of the observations are marked in this way.

missing data

As a first step we should properly flag missing values.

In terms of missing data, the variables we need to look at most closely are Glucose, BloodPressure, SkinThickness, Insulin, and BMI, all of which contain 0 among their observations.

A quick search in the literature shows that these features cannot have a physiological value of zero. The most plausible explanation is that missing observations were somehow replaced with zero.

This disguised missing data would mislead our later classification attempts. We will clean the data by marking disguised missing values clearly as NaN.

The response variable, which should be coded as 0 or 1, contained values with \ or } appended to the 0’s and 1s. It looks like an error similar to those introduced when reading from or writing CSVs have found their way into the data. The solution we choose is simply to remove the characters.

Looking at the data types, predictors are stored as float and the response as an object. The float data type makes sense for BMI and DiabetesPedigreeFunction. The remaining features can be stored as integers.

The following function cleans the data and replaces zeros with NaNs for the five columns discussed: GlucoseBloodPressureSkinThicknessInsulin, and BMI.

def clean_data(df_raw, cols_with_zeros=['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI'], response = ['Outcome']): df = df_raw.copy() # replace zero with NaN in features df[cols_with_zeros] = df[cols_with_zeros].replace(0, np.nan) # remove \ and } from response df = df.replace(to_replace=r'\\|\}', value='', regex=True) # change response data type to int df[response] = df[response].astype('int') return df pima_df_cleaned = clean_data(pima_df)
pima_df_cleaned.head()

missing data

We have replaced any zero and indicated missing values with NaN.

Exploring missing values

We can now calculate the proportion of missing values for each feature: 48.56% of Insulin, 29.58% of SkinThickness, 4.58% of BloodPressure, 1.43% of BMI, and 0.65% of Glucose. The remaining features do not have any missing values.

print("Proportion of missing values")
missing_values_count = pima_df_cleaned.isna().sum()*100/pima_df_cleaned.shape[0]
features_with_missing_values = missing_values_count[missing_values_count>0].index.values
missing_values_count

missing data

As a second step, we might need to perform some statistical tests of the hypothesis that the features are Missing at Random (MAR), Missing Completely at Random (MCAR), or Missing not at Random (MNAR).

By looking closer, it seems that missing values for SkinThickness are correlated with missing values for Insulin. When SkinThickness is missing, then Insulin is also missing.

Furthermore, when BloodPressure or BMI is missing, then the probability is higher that Insulin or SkinThickness values will be missing as well.

missing data

As a third step, we explore and choose the most appropriate technique to handle missing values.

Strategies for handling missing data

We move on by providing a Python function where the following data imputation strategies are implemented.

The drop strategy removes all observations where at least one of the features has a missing value (NaN).

The mean strategy replaces any missing value (NaN) by the mean of all values available for that feature.

The median strategy and most-frequent strategy replace missing values by the median or the most frequently appearing values, respectively.

The model-based strategy uses the features without missing values for training kNN regression models.

We choose kNN in order to capture the variability of available data. This would not be the case if we would use a linear regression model that would predict missing values along a regression line.

We distinguish between two modes in this model-based strategy. The following features are used as predictors in the basic mode: Age, DiabetesPedigreeFunction, Outcome, Pregnancies. The fitted model is used to predict the missing values in the remaining features.

In the progressive mode, after we fill in missing values in a given feature, we consider the feature as a predictor for estimating the missing values of the next feature.

The imputed data can be optionally standardized between 0 and 1. This might improve the performance of classification using regularized logistic regression.

Because the features are on a different scale (e.g., Age vs. Insulin) and their values range differs (e.g., Pedigree vs. Insulin), the shrinkage penalty could be wrongly calculated if features are not scaled.

from sklearn.impute import SimpleImputer
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import MinMaxScaler # function for KNN model-based imputation of missing values using features without NaN as predictors
def impute_model_basic(df): cols_nan = df.columns[df.isna().any()].tolist() cols_no_nan = df.columns.difference(cols_nan).values for col in cols_nan: test_data = df[df[col].isna()] train_data = df.dropna() knr = KNeighborsRegressor(n_neighbors=5).fit(train_data[cols_no_nan], train_data[col]) df.loc[df[col].isna(), col] = knr.predict(test_data[cols_no_nan]) return df # function for KNN model-based imputation of missing values using features without NaN as predictors, # including progressively added imputed features
def impute_model_progressive(df): cols_nan = df.columns[df.isna().any()].tolist() cols_no_nan = df.columns.difference(cols_nan).values while len(cols_nan)>0: col = cols_nan[0] test_data = df[df[col].isna()] train_data = df.dropna() knr = KNeighborsRegressor(n_neighbors=5).fit(train_data[cols_no_nan], train_data[col]) df.loc[df[col].isna(), col] = knr.predict(test_data[cols_no_nan]) cols_nan = df.columns[df.isna().any()].tolist() cols_no_nan = df.columns.difference(cols_nan).values return df # function for imputing missing data according to a given impute_strategy:
# drop_rows: drop all rows with one or more missing values
# drop_cols: drop columns with one or more missing values
# model_basic: KNN-model-based imputation with fixed predictors
# model_progressive: KNN-model-based imputation with progressively added predictors
# mean, median, most_frequent: imputation with mean, median or most frequent values
#
# cols_to_standardize: if provided, the specified columns are scaled between 0 and 1, after imputation
def impute_data(df_cleaned, impute_strategy=None, cols_to_standardize=None): df = df_cleaned.copy() if impute_strategy == 'drop_rows': df = df.dropna(axis=0) elif impute_strategy == 'drop_cols': df = df.dropna(axis=1) elif impute_strategy == 'model_basic': df = impute_model_basic(df) elif impute_strategy == 'model_progressive': df = impute_model_progressive(df) else: arr = SimpleImputer(missing_values=np.nan,strategy=impute_strategy).fit( df.values).transform(df.values) df = pd.DataFrame(data=arr, index=df.index.values, columns=df.columns.values) if cols_to_standardize != None: cols_to_standardize = list(set(cols_to_standardize) & set(df.columns.values)) df[cols_to_standardize] = df[cols_to_standardize].astype('float') df[cols_to_standardize] = pd.DataFrame(data=MinMaxScaler().fit( df[cols_to_standardize]).transform(df[cols_to_standardize]), index=df[cols_to_standardize].index.values, columns=df[cols_to_standardize].columns.values) return df
missing data

Onset of diabetes? Photo By Myriam on Pixabay

Logistic regression with missing data

In this section, we fit a logistic regression model on the cleaned data after applying a specific imputation strategy:

  • dropping rows with missing values,
  • dropping columns with missing values,
  • imputing missing values with column mean,
  • imputing missing values with model-based prediction,
  • imputing missing values with progressive model-based prediction.
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegressionCV
from sklearn.metrics import accuracy_score
from timeit import default_timer as timer # function for handling missing values # and fitting logistic regression on clean data
def logistic_regression(data, impute_strategy=None, cols_to_standardize=None, test_size=0.25, random_state=9001): start = timer() # store original columns original_columns = data.columns.difference(['Outcome']) df_imputed = impute_data(data, impute_strategy, cols_to_standardize) train_data, test_data = train_test_split(df_imputed, test_size=test_size, random_state=random_state) # note which predictor columns were dropped or kept kept_columns = df_imputed.columns.difference(['Outcome']) dropped_columns = original_columns.difference(df_imputed.columns) original_columns = original_columns.difference(['Outcome']) # prepare tensors X_train = train_data.drop(columns=['Outcome']) y_train = train_data['Outcome'] X_test = test_data.drop(columns=['Outcome']) y_test = test_data['Outcome'] # model training logistic_model = LogisticRegressionCV(cv=10, penalty='l2', max_iter=1000).fit( X_train, y_train) # model evaluation train_score = accuracy_score(y_train, logistic_model.predict(X_train)) test_score = accuracy_score(y_test, logistic_model.predict(X_test)) duration = timer() - start print("Classification rate on training data: {}".format(train_score)) print("Classification rate on test data: {}".format(test_score)) print("Execution time: {}".format(duration)) return { 'imputation strategy': impute_strategy, 'standardized': cols_to_standardize!=None, 'model': logistic_model, 'train score': train_score, 'test score': test_score, 'execution time (s)': duration } # list to store models' performance lr_results = [] # prepare data
pima_df_cleaned = clean_data(pima_df)
cols_to_standardize=['Age','BMI','BloodPressure','Glucose','Insulin','Pregnancies','SkinThickness','DiabetesPedigreeFunction'] # fit logistic regression for each imputation strategy
# with and without standardizing features
for impute_strategy in ['drop_rows', 'mean', 'model_basic', 'model_progressive']: for cols in [None, cols_to_standardize]: result = logistic_regression(pima_df_cleaned, impute_strategy=impute_strategy, cols_to_standardize=cols) lr_results.append(result) # display logistic regression performance
lr_results_df = pd.DataFrame(lr_results)
lr_results_df.drop(['model'], axis=1).drop_duplicates()

It is worth noticing the significant effect that imputation brings to the values of estimated parameters and their accuracy.

missing data

Effect of dropping strategy on accuracy

By dropping rows or columns with missing values, we lose valuable information that might have a significant impact on the response variable.

The consequence is overfitting in the training dataset and worse prediction performance on the test dataset.

Effect of mean/model imputation on accuracy

The mean imputation of missing data reduces overfitting and improves the prediction on the test dataset.

Classification is the best when a model-based is used when imputing missing data. This is because the original variance of the data is better approached when using k-nearest neighbors as a replacement of missing data.

Effect of imputation on training time

The computational complexity is assessed by measuring the cumulative execution time of imputation, logistic regression model fitting, and prediction.

The execution time for the model-based approach is the highest when predictors are not standardized. Calculating the euclidian distance to nearest neighbors requires more execution time than calculating the mean of data.

When dealing with a very large number of observations, we might prefer mean imputation at the cost of lower classification accuracy.

Effect of imputation on inference

We retrieve the coefficients estimated by our regularized logistic regression models as follows.

# get index of strategies
lr_results_df = pd.DataFrame(lr_results)
strategies = lr_results_df['imputation strategy'] # get a boolean array where True => standardized
standardized = lr_results_df['standardized']
st = lambda s: ' standardized' if s else ''
coefs_ = {}
for key, value in enumerate(strategies): if value == 'drop_cols': # skip pass else: strategy = value + st(standardized[key]) coefs_[strategy] = lr_results_df['model'][key].coef_[0]
coef_df = pd.DataFrame(data=coefs_, index=predictors)
coef_df.T

missing data

The coefficients in the table show how each predictor affects the likelihood of diabetes occurrence.

Positive values indicate factors in favor of the onset of diabetes.

Values around zero suggest that the associated predictors do not contribute much to diabetes onset.

The table shows inferred coefficients after applying an imputation strategy for missing data. Coefficients are also provided as obtained by logistic regression fit on standardized predictors values (after imputation).

  • Overall standardized coefficients are larger.
  • When rows/columns are dropped, the coefficient estimates change drastically, compared to their values when missing data is imputed by mean or model.
  • Coefficients for Insulin do not vary much between imputation methods. Insulin had the highest percentage of missing values between all predictors. This suggests that Insulin might be Missing Completely At Random (MCAR).

The following table compares the effect of mean imputation and model-based imputation on the coefficient magnitude obtained after dropping rows with missing data.

coef_perc_df = coef_df.copy()
cols = coef_df.columns.difference(['drop_rows']).values
for col in cols: coef_perc_df[col] = np.round(100*(coef_df[col]/coef_df['drop_rows']-1))
coef_perc_df[['drop_rows','mean','model_basic','model_progressive']]

missing data

The first column shows the coefficient estimates for the logistic model trained on data where rows with missing values where removed.

The other columns show the percentage change of coefficients values after imputing missing values, compared to the drop strategy.

Age, blood pressure, and Pedigree function have the highest percentage change between dropping rows and mean/model-based imputations.

The dropping strategy is usually not the golden way.

missing data

Coloring By Sophie Madeleine

Conclusion

In this article, we demonstrated how cleaning data and handling missing values would mean much better performance of Machine Learning algorithms.

Differentiating between MCAR, MAR, MNAR types of missing data is essential because they can affect inferences and predictions significantly.

Although there is no perfect way to handle missing data, you should be aware of the different methods available.

Compensating missing data and capitalizing on insights gained from them is one of the time-consuming parts of a Data Scientist’s job.

A model for self-driving cars that has learned from an insufficiently diverse training set is an interesting example. If the car is unsure where there is a pedestrian on the road, we would expect it to let the driver take charge.

This article was originally published in the Medium and re-published to TOPBOTS with permission from the author.

Enjoy this article? Sign up for more updates on applied ML.

We’ll let you know when we release more technical education.

AI

Using Amazon SageMaker inference pipelines with multi-model endpoints

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants […]

Published

on

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants to add personalization to the recommendations by using individual subscriber information, the number of custom models further increases. Hosting each custom model on a distinct compute instance is not only cost prohibitive, but also leads to underutilization of the hosting resources if not all models are frequently used.

Amazon SageMaker is a fully managed service that enables developers and data scientists to quickly and easily build, train, and deploy ML models at any scale. After you train an ML model, you can deploy it on Amazon SageMaker endpoints that are fully managed and can serve inferences in real time with low latency. Amazon SageMaker multi-model endpoints (MMEs) are a cost-effective solution to deploy a large number of ML models or per-user models. You can deploy multiple models on a single multi-model enabled endpoint such that all models share the compute resources and the serving container. You get significant cost savings and also simplify model deployments and updates. For more information about MME, see Save on inference costs by using Amazon SageMaker multi-model endpoints.

The following diagram depicts how MMEs work.

Multiple model artifacts are persisted in an Amazon S3 bucket. When a specific model is invoked, Amazon SageMaker dynamically loads it onto the container hosting the endpoint. If the model is already loaded in the container’s memory, invocation is faster because Amazon SageMaker doesn’t need to download and load it.

Until now, you could use MME with several frameworks, such as TensorFlow, PyTorch, MXNet, SKLearn, and build your own container with a multi-model server. This post introduces the following feature enhancements to MME:

  • MME support for Amazon SageMaker built-in algorithms – MME is now supported natively in the following popular Amazon SageMaker built-in algorithms: XGBoost, linear learner, RCF, and KNN. You can directly use the Amazon SageMaker provided containers while using these algorithms without having to build your own custom container.
  • MME support for Amazon SageMaker inference pipelines – The Amazon SageMaker inference pipeline model consists of a sequence of containers that serve inference requests by combining preprocessing, predictions, and postprocessing data science tasks. An inference pipeline allows you to reuse the same preprocessing code used during model training to process the inference request data used for predictions. You can now deploy an inference pipeline on an MME where one of the containers in the pipeline can dynamically serve requests based on the model being invoked.
  • IAM condition keys for granular access to models – Prior to this enhancement, an AWS Identity and Access Management (IAM) principal with InvokeEndpoint permission on the endpoint resource could invoke all the models hosted on that endpoint. Now, we support granular access to models using IAM condition keys. For example, the following IAM condition restricts the principal’s access to a model persisted in the Amazon Simple Storage Service (Amazon S3) bucket with company_a or common prefixes:
 Condition": { "StringLike": { "sagemaker:TargetModel": ["company_a/*", "common/*"] } }

We also provide a fully functional notebook to demonstrate these enhancements.

Walkthrough overview

To demonstrate these capabilities, the notebook discusses the use case of predicting house prices in multiple cities using linear regression. House prices are predicted based on features like number of bedrooms, number of garages, square footage, and more. Depending on the city, the features affect the house price differently. For example, small changes in the square footage cause a drastic change in house prices in New York City when compared to price changes in Houston.

For accurate house price predictions, we train multiple linear regression models, with a unique location-specific model per city. Each location-specific model is trained on synthetic housing data with randomly generated characteristics. To cost-effectively serve the multiple housing price prediction models, we deploy the models on a single multi-model enabled endpoint, as shown in the following diagram.

The walkthrough includes the following high-level steps:

  1. Examine the synthetic housing data generated.
  2. Preprocess the raw housing data using Scikit-learn.
  3. Train regression models using the built-in Amazon SageMaker linear learner algorithm.
  4. Create an Amazon SageMaker model with multi-model support.
  5. Create an Amazon SageMaker inference pipeline with an Sklearn model and multi-model enabled linear learner model.
  6. Test the inference pipeline by getting predictions from the different linear learner models.
  7. Update the MME with new models.
  8. Monitor the MME with Amazon CloudWatch
  9. Explore fine-grained access to models hosted on the MME using IAM condition keys.

Other steps necessary to import libraries, set up IAM permissions, and use utility functions are defined in the notebook, which this post doesn’t discuss. You can walk through and run the code with the following notebook on the GitHub repo.

Examining the synthetic housing data

The dataset consists of six numerical features that capture the year the house was built, house size in square feet, number of bedrooms, number of bathrooms, lot size, number of garages, and two categorical features: deck and front porch, indicating whether these are present or not.

To see the raw data, enter the following code:

_houses.head()

The following screenshot shows the results.

You can now preprocess the categorical variables (front_porch and deck) using Scikit-learn.

Preprocessing the raw housing data

To preprocess the raw data, you first create an SKLearn estimator and use the sklearn_preprocessor.py script as the entry_point:

#Create the SKLearn estimator with the sklearn_preprocessor.py as the script
from sagemaker.sklearn.estimator import SKLearn
script_path = 'sklearn_preprocessor.py'
sklearn_preprocessor = SKLearn( entry_point=script_path, role=role, train_instance_type="ml.c4.xlarge", sagemaker_session=sagemaker_session_gamma)

You then launch multiple Scikit-learn training jobs to process the raw synthetic data generated for multiple locations. Before running the following code, take the training instance limits in your account and cost into consideration and adjust the PARALLEL_TRAINING_JOBS value accordingly:

preprocessor_transformers = [] for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]): print("preprocessing fit input data at ", index , " for loc ", loc) job_name='scikit-learn-preprocessor-{}'.format(strftime('%Y-%m-%d-%H-%M-%S', gmtime())) sklearn_preprocessor.fit({'train': train_inputs[index]}, job_name=job_name, wait=True) ##Once the preprocessor is fit, use tranformer to preprocess the raw training data and store the transformed data right back into s3. transformer = sklearn_preprocessor.transformer( instance_count=1, instance_type='ml.m4.xlarge', assemble_with='Line', accept='text/csv' ) preprocessor_transformers.append(transformer)

When the preprocessors are properly fitted, preprocess the training data using batch transform to directly preprocess the raw data and store back into Amazon S3:

 preprocessed_train_data_path = [] for index, transformer in enumerate(preprocessor_transformers): transformer.transform(train_inputs[index], content_type='text/csv') print('Launching batch transform job: {}'.format(transformer.latest_transform_job.job_name)) preprocessed_train_data_path.append(transformer.output_path)

Training regression models

In this step, you train multiple models, one for each location.

Start by accessing the built-in linear learner algorithm:

from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'linear-learner')
container

Depending on the Region you’re using, you receive output similar to the following:

	382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:1

Next, define a method to launch a training job for a single location using the Amazon SageMaker Estimator API. In the hyperparameter configuration, you use predictor_type='regressor' to indicate that you’re using the algorithm to train a regression model. See the following code:

def launch_training_job(location, transformer): """Launch a linear learner traing job""" train_inputs = '{}/{}'.format(transformer.output_path, "train.csv") val_inputs = '{}/{}'.format(transformer.output_path, "val.csv") print("train_inputs:", train_inputs) print("val_inputs:", val_inputs) full_output_prefix = '{}/model_artifacts/{}'.format(DATA_PREFIX, location) s3_output_path = 's3://{}/{}'.format(BUCKET, full_output_prefix) print("s3_output_path ", s3_output_path) s3_output_path = 's3://{}/{}/model_artifacts/{}'.format(BUCKET, DATA_PREFIX, location) linear_estimator = sagemaker.estimator.Estimator( container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=s3_output_path, sagemaker_session=sagemaker_session) linear_estimator.set_hyperparameters( feature_dim=10, mini_batch_size=100, predictor_type='regressor', epochs=10, num_models=32, loss='absolute_loss') DISTRIBUTION_MODE = 'FullyReplicated' train_input = sagemaker.s3_input(s3_data=train_inputs, distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1') val_input = sagemaker.s3_input(s3_data=val_inputs, distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1') remote_inputs = {'train': train_input, 'validation': val_input} linear_estimator.fit(remote_inputs, wait=False) return linear_estimator.latest_training_job.name

You can now start multiple model training jobs, one for each location. Make sure to choose the correct value for PARALLEL TRAINING_JOBS, taking your AWS account service limits and cost into consideration. In the notebook, this value is set to 4. See the following code:

training_jobs = []
for transformer, loc in zip(preprocessor_transformers, LOCATIONS[:PARALLEL_TRAINING_JOBS]): job = launch_training_job(loc, transformer) training_jobs.append(job)
print('{} training jobs launched: {}'.format(len(training_jobs), training_jobs))

You receive output similar to the following:

4 training jobs launched: [(<sagemaker.estimator.Estimator object at 0x7fb54784b6d8>, 'linear-learner-2020-06-03-03-51-26-548'), (<sagemaker.estimator.Estimator object at 0x7fb5478b3198>, 'linear-learner-2020-06-03-03-51-26-973'), (<sagemaker.estimator.Estimator object at 0x7fb54780dbe0>, 'linear-learner-2020-06-03-03-51-27-775'), (<sagemaker.estimator.Estimator object at 0x7fb5477664e0>, 'linear-learner-2020-06-03-03-51-31-457')]

Wait until all training jobs are complete before proceeding to the next step.

Creating an Amazon SageMaker model with multi-model support

When the training jobs are complete, you’re ready to create an MME.

First, define a method to copy model artifacts from the training job output to a location in Amazon S3 where the MME dynamically loads individual models:

def deploy_artifacts_to_mme(job_name): print("job_name :", job_name) response = sm_client.describe_training_job(TrainingJobName=job_name) source_s3_key,model_name = parse_model_artifacts(response['ModelArtifacts']['S3ModelArtifacts']) copy_source = {'Bucket': BUCKET, 'Key': source_s3_key} key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, model_name) print('Copying {} modeln from: {}n to: {}...'.format(model_name, source_s3_key, key)) s3_client.copy_object(Bucket=BUCKET, CopySource=copy_source, Key=key)

Copy the model artifacts from all the training jobs to this location:

## Deploy all but the last model trained to MME
for job_name in training_jobs[:-1]: deploy_artifacts_to_mme(job_name)

You receive output similar to the following:

linear-learner-2020-06-03-03-51-26-973
Copying LosAngeles_CA model from: DEMO_MME_LINEAR_LEARNER/model_artifacts/LosAngeles_CA/linear-learner-2020-06-03-03-51-26-973/output/model.tar.gz to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/LosAngeles_CA/LosAngeles_CA.tar.gz...
linear-learner-2020-06-03-03-51-27-775
Copying Chicago_IL model from: DEMO_MME_LINEAR_LEARNER/model_artifacts/Chicago_IL/linear-learner-2020-06-03-03-51-27-775/output/model.tar.gz to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/Chicago_IL/Chicago_IL.tar.gz...
linear-learner-2020-06-03-03-51-31-457

Create the Amazon SageMaker model entity using the MultiDataModel API:

MODEL_NAME = '{}-{}'.format(HOUSING_MODEL_NAME, strftime('%Y-%m-%d-%H-%M-%S', gmtime())) _model_url = 's3://{}/{}/{}/'.format(BUCKET, DATA_PREFIX, MULTI_MODEL_ARTIFACTS) ll_multi_model = MultiDataModel( name=MODEL_NAME, model_data_prefix=_model_url, image=container, role=role, sagemaker_session=sagemaker

Creating an inference pipeline

Set up an inference pipeline with the PipelineModel API. This sets up a list of models in a single endpoint; for this post, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted MME linear learner model. See the following code:

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime()) scikit_learn_inference_model = sklearn_preprocessor.create_model() model_name = '{}-{}'.format('inference-pipeline', timestamp_prefix)
endpoint_name = '{}-{}'.format('inference-pipeline-ep', timestamp_prefix) sm_model = PipelineModel( name=model_name, role=role, sagemaker_session=sagemaker_session, models=[ scikit_learn_inference_model, ll_multi_model]) sm_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

The MME is now ready to take inference requests and respond with predictions. With the MME, the inference request should include the target model to invoke.

Testing the inference pipeline

You can now get predictions from the different linear learner models. Create a RealTimePredictor with the inference pipeline endpoint:

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
predictor = RealTimePredictor( endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_JSON)

Define a method to get predictions from the RealTimePredictor:

def predict_one_house_value(features, model_name, predictor_to_use): print('Using model {} to predict price of this house: {}'.format(model_name, features)) body = ','.join(map(str, features)) + 'n' start_time = time.time() response = predictor_to_use.predict(features, target_model=model_name) response_json = json.loads(response) predicted_value = response_json['predictions'][0]['score'] duration = time.time() - start_time print('${:,.2f}, took {:,d} msn'.format(predicted_value, int(duration * 1000)))

With MME, the models are dynamically loaded into the container’s memory of the instance hosting the endpoint when invoked. Therefore, the model invocation may take longer when it’s invoked for the first time. When the model is already in the instance container’s memory, the subsequent invocations are faster. If an instance memory utilization is high and a new model needs to be loaded, unused models are unloaded. The unloaded models remain in the instance’s storage volume and can be loaded into container’s memory later without being downloaded from the S3 bucket again. If the instance’s storage volume is full, unused models are deleted from storage volume.

Amazon SageMaker fully manages the loading and unloading of the models, without you having to take any specific actions. However, it’s important to understand this behavior because it has implications on the model invocation latency.

Iterate through invocations with random inputs against a random model and show the predictions and the time it takes for the prediction to come back:

for i in range(10): model_name = LOCATIONS[np.random.randint(1, len(LOCATIONS[:PARALLEL_TRAINING_JOBS]))] full_model_name = '{}/{}.tar.gz'.format(model_name,model_name) predict_one_house_value(gen_random_house()[1:], full_model_name,runtime_sm_client)

You receive output similar to the following:

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1993, 2728, 6, 3.0, 0.7, 1, 'y', 'y']
$439,972.62, took 1,166 ms Using model Houston_TX/Houston_TX.tar.gz to predict price of this house: [1989, 1944, 5, 3.0, 1.0, 1, 'n', 'y']
$280,848.00, took 1,086 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1968, 2427, 4, 3.0, 1.0, 2, 'y', 'n']
$266,721.31, took 1,029 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2000, 4024, 2, 1.0, 0.82, 1, 'y', 'y']
$584,069.88, took 53 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1986, 3463, 5, 3.0, 0.9, 1, 'y', 'n']
$496,340.19, took 43 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2002, 3885, 4, 3.0, 1.16, 2, 'n', 'n']
$626,904.12, took 39 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 1531, 6, 3.0, 0.68, 1, 'y', 'n']
$257,696.17, took 36 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 2327, 2, 3.0, 0.59, 3, 'n', 'n']
$337,758.22, took 33 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1995, 2656, 5, 1.0, 1.16, 0, 'y', 'n']
$390,652.59, took 35 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [2000, 4086, 2, 3.0, 1.03, 3, 'n', 'y']
$632,995.44, took 35 ms

The output that shows the predicted house price and the time it took for the prediction.

You should consider two different invocations of the same model. The second time, you don’t need to download from Amazon S3 because they’re already present on the instance. You see the inferences return in less time than before. For this use case, the invocation time for the Chicago_IL/Chicago_IL.tar.gz model reduced from 1,166 milliseconds the first time to 53 milliseconds the second time. Similarly, the invocation time for the LosAngeles_CA /LosAngeles_CA.tar.gz model reduced from 1,029 milliseconds to 43 milliseconds.

Updating an MME with new models

To deploy a new model to an existing MME, copy a new set of model artifacts to the same Amazon S3 location you set up earlier. For example, copy the model for the Houston location with the following code:

## Copy the last model
last_training_job=training_jobs[PARALLEL_TRAINING_JOBS-1]
deploy_artifacts_to_mme(last_training_job)

Now you can make predictions using the last model. See the following code:

model_name = LOCATIONS[PARALLEL_TRAINING_JOBS-1]
full_model_name = '{}/{}.tar.gz'.format(model_name,model_name)
predict_one_house_value(gen_random_house()[:-1], full_model_name,predictor)

Monitoring MMEs with CloudWatch metrics

Amazon SageMaker provides CloudWatch metrics for MMEs so you can determine the endpoint usage and the cache hit rate and optimize your endpoint. To analyze the endpoint and the container behavior, you invoke multiple models in this sequence:

##Create 200 copies of the original model and save with different names.
copy_additional_artifacts_to_mme(200)
##Starting with no models loaded into the container
##Invoke the first 100 models
invoke_multiple_models_mme(0,100)
##Invoke the same 100 models again
invoke_multiple_models_mme(0,100)
##This time invoke all 200 models to observe behavior
invoke_multiple_models_mme(0,200)

The following chart shows the behavior of the CloudWatch metrics LoadedModelCount and MemoryUtilization corresponding to these model invocations.

The LoadedModelCount metric continuously increases as more models are invoked, until it levels off at 121. The MemoryUtilization metric of the container also increased correspondingly to around 79%. This shows that the instance chosen to host the endpoint could only maintain 121 models in memory when 200 model invocations were made.

The following chart adds the ModelCacheHit metric to the previous two.

As the number of models loaded to the container memory increase, the ModelCacheHit metric improves. When the same 100 models are invoked the second time, ModelCacheHit reaches 1. When new models not yet loaded are invoked, ModelCacheHit decreases again.

You can use CloudWatch charts to help make ongoing decisions on the optimal choice of instance type, instance count, and number of models that a given endpoint should host.

Exploring granular access to models hosted on an MME

Because of the role attached to the notebook instance, it can invoke all models hosted on the MME. However, you can restrict this model invocation access to specific models by using IAM condition keys. To explore this, you create a new IAM role and IAM policy with a condition key to restrict access to a single model. You then assume this new role and verify that only a single target model can be invoked.

The role assigned to the Amazon SageMaker notebook instance should allow IAM role and IAM policy creation for the next steps to be successful.

Create an IAM role with the following code:

#Create a new role that can be assumed by this notebook. The roles should allow access to only a single model.
path='/'
role_name="{}{}".format('allow_invoke_ny_model_role', strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
description='Role that allows invoking a single model'
action_string = "sts:AssumeRole"
trust_policy={ "Version": "2012-10-17", "Statement": [ { "Sid": "statement1", "Effect": "Allow", "Principal": { "AWS": role }, "Action": "sts:AssumeRole" } ] } response = iam_client.create_role( Path=path, RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy), Description=description, MaxSessionDuration=3600
) print(response)

Create an IAM policy with a condition key to restrict access to only the NewYork model:

managed_policy = { "Version": "2012-10-17", "Statement": [ { "Sid": "SageMakerAccess", "Action": "sagemaker:InvokeEndpoint", "Effect": "Allow", "Resource":endpoint_resource_arn, "Condition": { "StringLike": { "sagemaker:TargetModel": ["NewYork_NY/*"] } } } ]
}
response = iam_client.create_policy( PolicyName='allow_invoke_ny_model_policy', PolicyDocument=json.dumps(managed_policy)
)

Attach the IAM policy to the IAM role:

iam_client.attach_role_policy( PolicyArn=policy_arn, RoleName=role_name
)

Assume the new role and create a RealTimePredictor object runtime client:

## Invoke with the role that has access to only NY model
sts_connection = boto3.client('sts')
assumed_role_limited_access = sts_connection.assume_role( RoleArn=role_arn, RoleSessionName="MME_Invoke_NY_Model"
)
assumed_role_limited_access['AssumedRoleUser']['Arn'] #Create sagemaker runtime client with assumed role
ACCESS_KEY = assumed_role_limited_access['Credentials']['AccessKeyId']
SECRET_KEY = assumed_role_limited_access['Credentials']['SecretAccessKey']
SESSION_TOKEN = assumed_role_limited_access['Credentials']['SessionToken'] runtime_sm_client_with_assumed_role = boto3.client( service_name='sagemaker-runtime', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, aws_session_token=SESSION_TOKEN,
) #SageMaker session with the assumed role
sagemakerSessionAssumedRole = sagemaker.Session(sagemaker_runtime_client=runtime_sm_client_with_assumed_role)
#Create a RealTimePredictor with the assumed role.
predictorAssumedRole = RealTimePredictor( endpoint=endpoint_name, sagemaker_session=sagemakerSessionAssumedRole, serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_JSON)

Now invoke the NewYork_NY model:

full_model_name = 'NewYork_NY/NewYork_NY.tar.gz'
predict_one_house_value(gen_random_house()[:-1], full_model_name, predictorAssumedRole) 

You receive output similar to the following:

Using model NewYork_NY/NewYork_NY.tar.gz to predict price of this house: [1992, 1659, 2, 2.0, 0.87, 2, 'n', 'y']
$222,008.38, took 154 ms

Next, try to invoke a different model (Chicago_IL/Chicago_IL.tar.gz). This should throw an error because the assumed role isn’t authorized to invoke this model. See the following code:

full_model_name = 'Chicago_IL/Chicago_IL.tar.gz' predict_one_house_value(gen_random_house()[:-1], full_model_name,predictorAssumedRole) 

You receive output similar to the following:

ClientError: An error occurred (AccessDeniedException) when calling the InvokeEndpoint operation: User: arn:aws:sts::xxxxxxxxxxxx:assumed-role/allow_invoke_ny_model_role/MME_Invoke_NY_Model is not authorized to perform: sagemaker:InvokeEndpoint on resource: arn:aws:sagemaker:us-east-1:xxxxxxxxxxxx:endpoint/inference-pipeline-ep-2020-07-01-15-46-51

Conclusion

Amazon SageMaker MMEs are a very powerful tool for teams developing multiple ML models to save significant costs and lower deployment overhead for a large number of ML models. This post discussed the new capabilities of Amazon SageMaker MMEs: native integration with Amazon SageMaker built-in algorithms (such as linear learner and KNN), native integration with inference pipelines, and fine-grained controlled access to the multiple models hosted on a single endpoint using IAM condition keys.

The notebook included with the post provided detailed instructions on training multiple linear learner models for house price predictions for multiple locations, hosting all the models on a single MME, and controlling access to the individual models.When considering multi-model enabled endpoints, you should balance the cost savings and the latency requirements.

Give Amazon SageMaker MMEs a try and leave your feedback in the comments.


About the Author

Sireesha Muppala is a AI/ML Specialist Solutions Architect at AWS, providing guidance to customers on architecting and implementing machine learning solutions at scale. She received her Ph.D. in Computer Science from University of Colorado, Colorado Springs. In her spare time, Sireesha loves to run and hike Colorado trails.

Michael Pham is a Software Development Engineer in the Amazon SageMaker team. His current work focuses on helping developers efficiently host machine learning models. In his spare time he enjoys Olympic weightlifting, reading, and playing chess.

Source: https://aws.amazon.com/blogs/machine-learning/using-amazon-sagemaker-inference-pipelines-with-multi-model-endpoints/

Continue Reading

AI

Using Amazon SageMaker inference pipelines with multi-model endpoints

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants […]

Published

on

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants to add personalization to the recommendations by using individual subscriber information, the number of custom models further increases. Hosting each custom model on a distinct compute instance is not only cost prohibitive, but also leads to underutilization of the hosting resources if not all models are frequently used.

Amazon SageMaker is a fully managed service that enables developers and data scientists to quickly and easily build, train, and deploy ML models at any scale. After you train an ML model, you can deploy it on Amazon SageMaker endpoints that are fully managed and can serve inferences in real time with low latency. Amazon SageMaker multi-model endpoints (MMEs) are a cost-effective solution to deploy a large number of ML models or per-user models. You can deploy multiple models on a single multi-model enabled endpoint such that all models share the compute resources and the serving container. You get significant cost savings and also simplify model deployments and updates. For more information about MME, see Save on inference costs by using Amazon SageMaker multi-model endpoints.

The following diagram depicts how MMEs work.

Multiple model artifacts are persisted in an Amazon S3 bucket. When a specific model is invoked, Amazon SageMaker dynamically loads it onto the container hosting the endpoint. If the model is already loaded in the container’s memory, invocation is faster because Amazon SageMaker doesn’t need to download and load it.

Until now, you could use MME with several frameworks, such as TensorFlow, PyTorch, MXNet, SKLearn, and build your own container with a multi-model server. This post introduces the following feature enhancements to MME:

  • MME support for Amazon SageMaker built-in algorithms – MME is now supported natively in the following popular Amazon SageMaker built-in algorithms: XGBoost, linear learner, RCF, and KNN. You can directly use the Amazon SageMaker provided containers while using these algorithms without having to build your own custom container.
  • MME support for Amazon SageMaker inference pipelines – The Amazon SageMaker inference pipeline model consists of a sequence of containers that serve inference requests by combining preprocessing, predictions, and postprocessing data science tasks. An inference pipeline allows you to reuse the same preprocessing code used during model training to process the inference request data used for predictions. You can now deploy an inference pipeline on an MME where one of the containers in the pipeline can dynamically serve requests based on the model being invoked.
  • IAM condition keys for granular access to models – Prior to this enhancement, an AWS Identity and Access Management (IAM) principal with InvokeEndpoint permission on the endpoint resource could invoke all the models hosted on that endpoint. Now, we support granular access to models using IAM condition keys. For example, the following IAM condition restricts the principal’s access to a model persisted in the Amazon Simple Storage Service (Amazon S3) bucket with company_a or common prefixes:
 Condition": { "StringLike": { "sagemaker:TargetModel": ["company_a/*", "common/*"] } }

We also provide a fully functional notebook to demonstrate these enhancements.

Walkthrough overview

To demonstrate these capabilities, the notebook discusses the use case of predicting house prices in multiple cities using linear regression. House prices are predicted based on features like number of bedrooms, number of garages, square footage, and more. Depending on the city, the features affect the house price differently. For example, small changes in the square footage cause a drastic change in house prices in New York City when compared to price changes in Houston.

For accurate house price predictions, we train multiple linear regression models, with a unique location-specific model per city. Each location-specific model is trained on synthetic housing data with randomly generated characteristics. To cost-effectively serve the multiple housing price prediction models, we deploy the models on a single multi-model enabled endpoint, as shown in the following diagram.

The walkthrough includes the following high-level steps:

  1. Examine the synthetic housing data generated.
  2. Preprocess the raw housing data using Scikit-learn.
  3. Train regression models using the built-in Amazon SageMaker linear learner algorithm.
  4. Create an Amazon SageMaker model with multi-model support.
  5. Create an Amazon SageMaker inference pipeline with an Sklearn model and multi-model enabled linear learner model.
  6. Test the inference pipeline by getting predictions from the different linear learner models.
  7. Update the MME with new models.
  8. Monitor the MME with Amazon CloudWatch
  9. Explore fine-grained access to models hosted on the MME using IAM condition keys.

Other steps necessary to import libraries, set up IAM permissions, and use utility functions are defined in the notebook, which this post doesn’t discuss. You can walk through and run the code with the following notebook on the GitHub repo.

Examining the synthetic housing data

The dataset consists of six numerical features that capture the year the house was built, house size in square feet, number of bedrooms, number of bathrooms, lot size, number of garages, and two categorical features: deck and front porch, indicating whether these are present or not.

To see the raw data, enter the following code:

_houses.head()

The following screenshot shows the results.

You can now preprocess the categorical variables (front_porch and deck) using Scikit-learn.

Preprocessing the raw housing data

To preprocess the raw data, you first create an SKLearn estimator and use the sklearn_preprocessor.py script as the entry_point:

#Create the SKLearn estimator with the sklearn_preprocessor.py as the script
from sagemaker.sklearn.estimator import SKLearn
script_path = 'sklearn_preprocessor.py'
sklearn_preprocessor = SKLearn( entry_point=script_path, role=role, train_instance_type="ml.c4.xlarge", sagemaker_session=sagemaker_session_gamma)

You then launch multiple Scikit-learn training jobs to process the raw synthetic data generated for multiple locations. Before running the following code, take the training instance limits in your account and cost into consideration and adjust the PARALLEL_TRAINING_JOBS value accordingly:

preprocessor_transformers = [] for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]): print("preprocessing fit input data at ", index , " for loc ", loc) job_name='scikit-learn-preprocessor-{}'.format(strftime('%Y-%m-%d-%H-%M-%S', gmtime())) sklearn_preprocessor.fit({'train': train_inputs[index]}, job_name=job_name, wait=True) ##Once the preprocessor is fit, use tranformer to preprocess the raw training data and store the transformed data right back into s3. transformer = sklearn_preprocessor.transformer( instance_count=1, instance_type='ml.m4.xlarge', assemble_with='Line', accept='text/csv' ) preprocessor_transformers.append(transformer)

When the preprocessors are properly fitted, preprocess the training data using batch transform to directly preprocess the raw data and store back into Amazon S3:

 preprocessed_train_data_path = [] for index, transformer in enumerate(preprocessor_transformers): transformer.transform(train_inputs[index], content_type='text/csv') print('Launching batch transform job: {}'.format(transformer.latest_transform_job.job_name)) preprocessed_train_data_path.append(transformer.output_path)

Training regression models

In this step, you train multiple models, one for each location.

Start by accessing the built-in linear learner algorithm:

from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'linear-learner')
container

Depending on the Region you’re using, you receive output similar to the following:

	382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:1

Next, define a method to launch a training job for a single location using the Amazon SageMaker Estimator API. In the hyperparameter configuration, you use predictor_type='regressor' to indicate that you’re using the algorithm to train a regression model. See the following code:

def launch_training_job(location, transformer): """Launch a linear learner traing job""" train_inputs = '{}/{}'.format(transformer.output_path, "train.csv") val_inputs = '{}/{}'.format(transformer.output_path, "val.csv") print("train_inputs:", train_inputs) print("val_inputs:", val_inputs) full_output_prefix = '{}/model_artifacts/{}'.format(DATA_PREFIX, location) s3_output_path = 's3://{}/{}'.format(BUCKET, full_output_prefix) print("s3_output_path ", s3_output_path) s3_output_path = 's3://{}/{}/model_artifacts/{}'.format(BUCKET, DATA_PREFIX, location) linear_estimator = sagemaker.estimator.Estimator( container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=s3_output_path, sagemaker_session=sagemaker_session) linear_estimator.set_hyperparameters( feature_dim=10, mini_batch_size=100, predictor_type='regressor', epochs=10, num_models=32, loss='absolute_loss') DISTRIBUTION_MODE = 'FullyReplicated' train_input = sagemaker.s3_input(s3_data=train_inputs, distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1') val_input = sagemaker.s3_input(s3_data=val_inputs, distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1') remote_inputs = {'train': train_input, 'validation': val_input} linear_estimator.fit(remote_inputs, wait=False) return linear_estimator.latest_training_job.name

You can now start multiple model training jobs, one for each location. Make sure to choose the correct value for PARALLEL TRAINING_JOBS, taking your AWS account service limits and cost into consideration. In the notebook, this value is set to 4. See the following code:

training_jobs = []
for transformer, loc in zip(preprocessor_transformers, LOCATIONS[:PARALLEL_TRAINING_JOBS]): job = launch_training_job(loc, transformer) training_jobs.append(job)
print('{} training jobs launched: {}'.format(len(training_jobs), training_jobs))

You receive output similar to the following:

4 training jobs launched: [(<sagemaker.estimator.Estimator object at 0x7fb54784b6d8>, 'linear-learner-2020-06-03-03-51-26-548'), (<sagemaker.estimator.Estimator object at 0x7fb5478b3198>, 'linear-learner-2020-06-03-03-51-26-973'), (<sagemaker.estimator.Estimator object at 0x7fb54780dbe0>, 'linear-learner-2020-06-03-03-51-27-775'), (<sagemaker.estimator.Estimator object at 0x7fb5477664e0>, 'linear-learner-2020-06-03-03-51-31-457')]

Wait until all training jobs are complete before proceeding to the next step.

Creating an Amazon SageMaker model with multi-model support

When the training jobs are complete, you’re ready to create an MME.

First, define a method to copy model artifacts from the training job output to a location in Amazon S3 where the MME dynamically loads individual models:

def deploy_artifacts_to_mme(job_name): print("job_name :", job_name) response = sm_client.describe_training_job(TrainingJobName=job_name) source_s3_key,model_name = parse_model_artifacts(response['ModelArtifacts']['S3ModelArtifacts']) copy_source = {'Bucket': BUCKET, 'Key': source_s3_key} key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, model_name) print('Copying {} model\n from: {}\n to: {}...'.format(model_name, source_s3_key, key)) s3_client.copy_object(Bucket=BUCKET, CopySource=copy_source, Key=key)

Copy the model artifacts from all the training jobs to this location:

## Deploy all but the last model trained to MME
for job_name in training_jobs[:-1]: deploy_artifacts_to_mme(job_name)

You receive output similar to the following:

linear-learner-2020-06-03-03-51-26-973
Copying LosAngeles_CA model from: DEMO_MME_LINEAR_LEARNER/model_artifacts/LosAngeles_CA/linear-learner-2020-06-03-03-51-26-973/output/model.tar.gz to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/LosAngeles_CA/LosAngeles_CA.tar.gz...
linear-learner-2020-06-03-03-51-27-775
Copying Chicago_IL model from: DEMO_MME_LINEAR_LEARNER/model_artifacts/Chicago_IL/linear-learner-2020-06-03-03-51-27-775/output/model.tar.gz to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/Chicago_IL/Chicago_IL.tar.gz...
linear-learner-2020-06-03-03-51-31-457

Create the Amazon SageMaker model entity using the MultiDataModel API:

MODEL_NAME = '{}-{}'.format(HOUSING_MODEL_NAME, strftime('%Y-%m-%d-%H-%M-%S', gmtime())) _model_url = 's3://{}/{}/{}/'.format(BUCKET, DATA_PREFIX, MULTI_MODEL_ARTIFACTS) ll_multi_model = MultiDataModel( name=MODEL_NAME, model_data_prefix=_model_url, image=container, role=role, sagemaker_session=sagemaker

Creating an inference pipeline

Set up an inference pipeline with the PipelineModel API. This sets up a list of models in a single endpoint; for this post, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted MME linear learner model. See the following code:

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime()) scikit_learn_inference_model = sklearn_preprocessor.create_model() model_name = '{}-{}'.format('inference-pipeline', timestamp_prefix)
endpoint_name = '{}-{}'.format('inference-pipeline-ep', timestamp_prefix) sm_model = PipelineModel( name=model_name, role=role, sagemaker_session=sagemaker_session, models=[ scikit_learn_inference_model, ll_multi_model]) sm_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

The MME is now ready to take inference requests and respond with predictions. With the MME, the inference request should include the target model to invoke.

Testing the inference pipeline

You can now get predictions from the different linear learner models. Create a RealTimePredictor with the inference pipeline endpoint:

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
predictor = RealTimePredictor( endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_JSON)

Define a method to get predictions from the RealTimePredictor:

def predict_one_house_value(features, model_name, predictor_to_use): print('Using model {} to predict price of this house: {}'.format(model_name, features)) body = ','.join(map(str, features)) + '\n' start_time = time.time() response = predictor_to_use.predict(features, target_model=model_name) response_json = json.loads(response) predicted_value = response_json['predictions'][0]['score'] duration = time.time() - start_time print('${:,.2f}, took {:,d} ms\n'.format(predicted_value, int(duration * 1000)))

With MME, the models are dynamically loaded into the container’s memory of the instance hosting the endpoint when invoked. Therefore, the model invocation may take longer when it’s invoked for the first time. When the model is already in the instance container’s memory, the subsequent invocations are faster. If an instance memory utilization is high and a new model needs to be loaded, unused models are unloaded. The unloaded models remain in the instance’s storage volume and can be loaded into container’s memory later without being downloaded from the S3 bucket again. If the instance’s storage volume is full, unused models are deleted from storage volume.

Amazon SageMaker fully manages the loading and unloading of the models, without you having to take any specific actions. However, it’s important to understand this behavior because it has implications on the model invocation latency.

Iterate through invocations with random inputs against a random model and show the predictions and the time it takes for the prediction to come back:

for i in range(10): model_name = LOCATIONS[np.random.randint(1, len(LOCATIONS[:PARALLEL_TRAINING_JOBS]))] full_model_name = '{}/{}.tar.gz'.format(model_name,model_name) predict_one_house_value(gen_random_house()[1:], full_model_name,runtime_sm_client)

You receive output similar to the following:

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1993, 2728, 6, 3.0, 0.7, 1, 'y', 'y']
$439,972.62, took 1,166 ms Using model Houston_TX/Houston_TX.tar.gz to predict price of this house: [1989, 1944, 5, 3.0, 1.0, 1, 'n', 'y']
$280,848.00, took 1,086 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1968, 2427, 4, 3.0, 1.0, 2, 'y', 'n']
$266,721.31, took 1,029 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2000, 4024, 2, 1.0, 0.82, 1, 'y', 'y']
$584,069.88, took 53 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1986, 3463, 5, 3.0, 0.9, 1, 'y', 'n']
$496,340.19, took 43 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2002, 3885, 4, 3.0, 1.16, 2, 'n', 'n']
$626,904.12, took 39 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 1531, 6, 3.0, 0.68, 1, 'y', 'n']
$257,696.17, took 36 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 2327, 2, 3.0, 0.59, 3, 'n', 'n']
$337,758.22, took 33 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1995, 2656, 5, 1.0, 1.16, 0, 'y', 'n']
$390,652.59, took 35 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [2000, 4086, 2, 3.0, 1.03, 3, 'n', 'y']
$632,995.44, took 35 ms

The output that shows the predicted house price and the time it took for the prediction.

You should consider two different invocations of the same model. The second time, you don’t need to download from Amazon S3 because they’re already present on the instance. You see the inferences return in less time than before. For this use case, the invocation time for the Chicago_IL/Chicago_IL.tar.gz model reduced from 1,166 milliseconds the first time to 53 milliseconds the second time. Similarly, the invocation time for the LosAngeles_CA /LosAngeles_CA.tar.gz model reduced from 1,029 milliseconds to 43 milliseconds.

Updating an MME with new models

To deploy a new model to an existing MME, copy a new set of model artifacts to the same Amazon S3 location you set up earlier. For example, copy the model for the Houston location with the following code:

## Copy the last model
last_training_job=training_jobs[PARALLEL_TRAINING_JOBS-1]
deploy_artifacts_to_mme(last_training_job)

Now you can make predictions using the last model. See the following code:

model_name = LOCATIONS[PARALLEL_TRAINING_JOBS-1]
full_model_name = '{}/{}.tar.gz'.format(model_name,model_name)
predict_one_house_value(gen_random_house()[:-1], full_model_name,predictor)

Monitoring MMEs with CloudWatch metrics

Amazon SageMaker provides CloudWatch metrics for MMEs so you can determine the endpoint usage and the cache hit rate and optimize your endpoint. To analyze the endpoint and the container behavior, you invoke multiple models in this sequence:

##Create 200 copies of the original model and save with different names.
copy_additional_artifacts_to_mme(200)
##Starting with no models loaded into the container
##Invoke the first 100 models
invoke_multiple_models_mme(0,100)
##Invoke the same 100 models again
invoke_multiple_models_mme(0,100)
##This time invoke all 200 models to observe behavior
invoke_multiple_models_mme(0,200)

The following chart shows the behavior of the CloudWatch metrics LoadedModelCount and MemoryUtilization corresponding to these model invocations.

The LoadedModelCount metric continuously increases as more models are invoked, until it levels off at 121. The MemoryUtilization metric of the container also increased correspondingly to around 79%. This shows that the instance chosen to host the endpoint could only maintain 121 models in memory when 200 model invocations were made.

The following chart adds the ModelCacheHit metric to the previous two.

As the number of models loaded to the container memory increase, the ModelCacheHit metric improves. When the same 100 models are invoked the second time, ModelCacheHit reaches 1. When new models not yet loaded are invoked, ModelCacheHit decreases again.

You can use CloudWatch charts to help make ongoing decisions on the optimal choice of instance type, instance count, and number of models that a given endpoint should host.

Exploring granular access to models hosted on an MME

Because of the role attached to the notebook instance, it can invoke all models hosted on the MME. However, you can restrict this model invocation access to specific models by using IAM condition keys. To explore this, you create a new IAM role and IAM policy with a condition key to restrict access to a single model. You then assume this new role and verify that only a single target model can be invoked.

The role assigned to the Amazon SageMaker notebook instance should allow IAM role and IAM policy creation for the next steps to be successful.

Create an IAM role with the following code:

#Create a new role that can be assumed by this notebook. The roles should allow access to only a single model.
path='/'
role_name="{}{}".format('allow_invoke_ny_model_role', strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
description='Role that allows invoking a single model'
action_string = "sts:AssumeRole"
trust_policy={ "Version": "2012-10-17", "Statement": [ { "Sid": "statement1", "Effect": "Allow", "Principal": { "AWS": role }, "Action": "sts:AssumeRole" } ] } response = iam_client.create_role( Path=path, RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy), Description=description, MaxSessionDuration=3600
) print(response)

Create an IAM policy with a condition key to restrict access to only the NewYork model:

managed_policy = { "Version": "2012-10-17", "Statement": [ { "Sid": "SageMakerAccess", "Action": "sagemaker:InvokeEndpoint", "Effect": "Allow", "Resource":endpoint_resource_arn, "Condition": { "StringLike": { "sagemaker:TargetModel": ["NewYork_NY/*"] } } } ]
}
response = iam_client.create_policy( PolicyName='allow_invoke_ny_model_policy', PolicyDocument=json.dumps(managed_policy)
)

Attach the IAM policy to the IAM role:

iam_client.attach_role_policy( PolicyArn=policy_arn, RoleName=role_name
)

Assume the new role and create a RealTimePredictor object runtime client:

## Invoke with the role that has access to only NY model
sts_connection = boto3.client('sts')
assumed_role_limited_access = sts_connection.assume_role( RoleArn=role_arn, RoleSessionName="MME_Invoke_NY_Model"
)
assumed_role_limited_access['AssumedRoleUser']['Arn'] #Create sagemaker runtime client with assumed role
ACCESS_KEY = assumed_role_limited_access['Credentials']['AccessKeyId']
SECRET_KEY = assumed_role_limited_access['Credentials']['SecretAccessKey']
SESSION_TOKEN = assumed_role_limited_access['Credentials']['SessionToken'] runtime_sm_client_with_assumed_role = boto3.client( service_name='sagemaker-runtime', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, aws_session_token=SESSION_TOKEN,
) #SageMaker session with the assumed role
sagemakerSessionAssumedRole = sagemaker.Session(sagemaker_runtime_client=runtime_sm_client_with_assumed_role)
#Create a RealTimePredictor with the assumed role.
predictorAssumedRole = RealTimePredictor( endpoint=endpoint_name, sagemaker_session=sagemakerSessionAssumedRole, serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_JSON)

Now invoke the NewYork_NY model:

full_model_name = 'NewYork_NY/NewYork_NY.tar.gz'
predict_one_house_value(gen_random_house()[:-1], full_model_name, predictorAssumedRole) 

You receive output similar to the following:

Using model NewYork_NY/NewYork_NY.tar.gz to predict price of this house: [1992, 1659, 2, 2.0, 0.87, 2, 'n', 'y']
$222,008.38, took 154 ms

Next, try to invoke a different model (Chicago_IL/Chicago_IL.tar.gz). This should throw an error because the assumed role isn’t authorized to invoke this model. See the following code:

full_model_name = 'Chicago_IL/Chicago_IL.tar.gz' predict_one_house_value(gen_random_house()[:-1], full_model_name,predictorAssumedRole) 

You receive output similar to the following:

ClientError: An error occurred (AccessDeniedException) when calling the InvokeEndpoint operation: User: arn:aws:sts::xxxxxxxxxxxx:assumed-role/allow_invoke_ny_model_role/MME_Invoke_NY_Model is not authorized to perform: sagemaker:InvokeEndpoint on resource: arn:aws:sagemaker:us-east-1:xxxxxxxxxxxx:endpoint/inference-pipeline-ep-2020-07-01-15-46-51

Conclusion

Amazon SageMaker MMEs are a very powerful tool for teams developing multiple ML models to save significant costs and lower deployment overhead for a large number of ML models. This post discussed the new capabilities of Amazon SageMaker MMEs: native integration with Amazon SageMaker built-in algorithms (such as linear learner and KNN), native integration with inference pipelines, and fine-grained controlled access to the multiple models hosted on a single endpoint using IAM condition keys.

The notebook included with the post provided detailed instructions on training multiple linear learner models for house price predictions for multiple locations, hosting all the models on a single MME, and controlling access to the individual models.When considering multi-model enabled endpoints, you should balance the cost savings and the latency requirements.

Give Amazon SageMaker MMEs a try and leave your feedback in the comments.


About the Author

Sireesha Muppala is a AI/ML Specialist Solutions Architect at AWS, providing guidance to customers on architecting and implementing machine learning solutions at scale. She received her Ph.D. in Computer Science from University of Colorado, Colorado Springs. In her spare time, Sireesha loves to run and hike Colorado trails.

Michael Pham is a Software Development Engineer in the Amazon SageMaker team. His current work focuses on helping developers efficiently host machine learning models. In his spare time he enjoys Olympic weightlifting, reading, and playing chess.

Source: https://aws.amazon.com/blogs/machine-learning/using-amazon-sagemaker-inference-pipelines-with-multi-model-endpoints/

Continue Reading

AI

Using Amazon SageMaker inference pipelines with multi-model endpoints

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants […]

Published

on

Businesses are increasingly deploying multiple machine learning (ML) models to serve precise and accurate predictions to their consumers. Consider a media company that wants to provide recommendations to its subscribers. The company may want to employ different custom models for recommending different categories of products—such as movies, books, music, and articles. If the company wants to add personalization to the recommendations by using individual subscriber information, the number of custom models further increases. Hosting each custom model on a distinct compute instance is not only cost prohibitive, but also leads to underutilization of the hosting resources if not all models are frequently used.

Amazon SageMaker is a fully managed service that enables developers and data scientists to quickly and easily build, train, and deploy ML models at any scale. After you train an ML model, you can deploy it on Amazon SageMaker endpoints that are fully managed and can serve inferences in real time with low latency. Amazon SageMaker multi-model endpoints (MMEs) are a cost-effective solution to deploy a large number of ML models or per-user models. You can deploy multiple models on a single multi-model enabled endpoint such that all models share the compute resources and the serving container. You get significant cost savings and also simplify model deployments and updates. For more information about MME, see Save on inference costs by using Amazon SageMaker multi-model endpoints.

The following diagram depicts how MMEs work.

Multiple model artifacts are persisted in an Amazon S3 bucket. When a specific model is invoked, Amazon SageMaker dynamically loads it onto the container hosting the endpoint. If the model is already loaded in the container’s memory, invocation is faster because Amazon SageMaker doesn’t need to download and load it.

Until now, you could use MME with several frameworks, such as TensorFlow, PyTorch, MXNet, SKLearn, and build your own container with a multi-model server. This post introduces the following feature enhancements to MME:

  • MME support for Amazon SageMaker built-in algorithms – MME is now supported natively in the following popular Amazon SageMaker built-in algorithms: XGBoost, linear learner, RCF, and KNN. You can directly use the Amazon SageMaker provided containers while using these algorithms without having to build your own custom container.
  • MME support for Amazon SageMaker inference pipelines – The Amazon SageMaker inference pipeline model consists of a sequence of containers that serve inference requests by combining preprocessing, predictions, and postprocessing data science tasks. An inference pipeline allows you to reuse the same preprocessing code used during model training to process the inference request data used for predictions. You can now deploy an inference pipeline on an MME where one of the containers in the pipeline can dynamically serve requests based on the model being invoked.
  • IAM condition keys for granular access to models – Prior to this enhancement, an AWS Identity and Access Management (IAM) principal with InvokeEndpoint permission on the endpoint resource could invoke all the models hosted on that endpoint. Now, we support granular access to models using IAM condition keys. For example, the following IAM condition restricts the principal’s access to a model persisted in the Amazon Simple Storage Service (Amazon S3) bucket with company_a or common prefixes:
 Condition": { "StringLike": { "sagemaker:TargetModel": ["company_a/*", "common/*"] } }

We also provide a fully functional notebook to demonstrate these enhancements.

Walkthrough overview

To demonstrate these capabilities, the notebook discusses the use case of predicting house prices in multiple cities using linear regression. House prices are predicted based on features like number of bedrooms, number of garages, square footage, and more. Depending on the city, the features affect the house price differently. For example, small changes in the square footage cause a drastic change in house prices in New York City when compared to price changes in Houston.

For accurate house price predictions, we train multiple linear regression models, with a unique location-specific model per city. Each location-specific model is trained on synthetic housing data with randomly generated characteristics. To cost-effectively serve the multiple housing price prediction models, we deploy the models on a single multi-model enabled endpoint, as shown in the following diagram.

The walkthrough includes the following high-level steps:

  1. Examine the synthetic housing data generated.
  2. Preprocess the raw housing data using Scikit-learn.
  3. Train regression models using the built-in Amazon SageMaker linear learner algorithm.
  4. Create an Amazon SageMaker model with multi-model support.
  5. Create an Amazon SageMaker inference pipeline with an Sklearn model and multi-model enabled linear learner model.
  6. Test the inference pipeline by getting predictions from the different linear learner models.
  7. Update the MME with new models.
  8. Monitor the MME with Amazon CloudWatch
  9. Explore fine-grained access to models hosted on the MME using IAM condition keys.

Other steps necessary to import libraries, set up IAM permissions, and use utility functions are defined in the notebook, which this post doesn’t discuss. You can walk through and run the code with the following notebook on the GitHub repo.

Examining the synthetic housing data

The dataset consists of six numerical features that capture the year the house was built, house size in square feet, number of bedrooms, number of bathrooms, lot size, number of garages, and two categorical features: deck and front porch, indicating whether these are present or not.

To see the raw data, enter the following code:

_houses.head()

The following screenshot shows the results.

You can now preprocess the categorical variables (front_porch and deck) using Scikit-learn.

Preprocessing the raw housing data

To preprocess the raw data, you first create an SKLearn estimator and use the sklearn_preprocessor.py script as the entry_point:

#Create the SKLearn estimator with the sklearn_preprocessor.py as the script
from sagemaker.sklearn.estimator import SKLearn
script_path = 'sklearn_preprocessor.py'
sklearn_preprocessor = SKLearn( entry_point=script_path, role=role, train_instance_type="ml.c4.xlarge", sagemaker_session=sagemaker_session_gamma)

You then launch multiple Scikit-learn training jobs to process the raw synthetic data generated for multiple locations. Before running the following code, take the training instance limits in your account and cost into consideration and adjust the PARALLEL_TRAINING_JOBS value accordingly:

preprocessor_transformers = [] for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]): print("preprocessing fit input data at ", index , " for loc ", loc) job_name='scikit-learn-preprocessor-{}'.format(strftime('%Y-%m-%d-%H-%M-%S', gmtime())) sklearn_preprocessor.fit({'train': train_inputs[index]}, job_name=job_name, wait=True) ##Once the preprocessor is fit, use tranformer to preprocess the raw training data and store the transformed data right back into s3. transformer = sklearn_preprocessor.transformer( instance_count=1, instance_type='ml.m4.xlarge', assemble_with='Line', accept='text/csv' ) preprocessor_transformers.append(transformer)

When the preprocessors are properly fitted, preprocess the training data using batch transform to directly preprocess the raw data and store back into Amazon S3:

 preprocessed_train_data_path = [] for index, transformer in enumerate(preprocessor_transformers): transformer.transform(train_inputs[index], content_type='text/csv') print('Launching batch transform job: {}'.format(transformer.latest_transform_job.job_name)) preprocessed_train_data_path.append(transformer.output_path)

Training regression models

In this step, you train multiple models, one for each location.

Start by accessing the built-in linear learner algorithm:

from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'linear-learner')
container

Depending on the Region you’re using, you receive output similar to the following:

	382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:1

Next, define a method to launch a training job for a single location using the Amazon SageMaker Estimator API. In the hyperparameter configuration, you use predictor_type='regressor' to indicate that you’re using the algorithm to train a regression model. See the following code:

def launch_training_job(location, transformer): """Launch a linear learner traing job""" train_inputs = '{}/{}'.format(transformer.output_path, "train.csv") val_inputs = '{}/{}'.format(transformer.output_path, "val.csv") print("train_inputs:", train_inputs) print("val_inputs:", val_inputs) full_output_prefix = '{}/model_artifacts/{}'.format(DATA_PREFIX, location) s3_output_path = 's3://{}/{}'.format(BUCKET, full_output_prefix) print("s3_output_path ", s3_output_path) s3_output_path = 's3://{}/{}/model_artifacts/{}'.format(BUCKET, DATA_PREFIX, location) linear_estimator = sagemaker.estimator.Estimator( container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=s3_output_path, sagemaker_session=sagemaker_session) linear_estimator.set_hyperparameters( feature_dim=10, mini_batch_size=100, predictor_type='regressor', epochs=10, num_models=32, loss='absolute_loss') DISTRIBUTION_MODE = 'FullyReplicated' train_input = sagemaker.s3_input(s3_data=train_inputs, distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1') val_input = sagemaker.s3_input(s3_data=val_inputs, distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1') remote_inputs = {'train': train_input, 'validation': val_input} linear_estimator.fit(remote_inputs, wait=False) return linear_estimator.latest_training_job.name

You can now start multiple model training jobs, one for each location. Make sure to choose the correct value for PARALLEL TRAINING_JOBS, taking your AWS account service limits and cost into consideration. In the notebook, this value is set to 4. See the following code:

training_jobs = []
for transformer, loc in zip(preprocessor_transformers, LOCATIONS[:PARALLEL_TRAINING_JOBS]): job = launch_training_job(loc, transformer) training_jobs.append(job)
print('{} training jobs launched: {}'.format(len(training_jobs), training_jobs))

You receive output similar to the following:

4 training jobs launched: [(<sagemaker.estimator.Estimator object at 0x7fb54784b6d8>, 'linear-learner-2020-06-03-03-51-26-548'), (<sagemaker.estimator.Estimator object at 0x7fb5478b3198>, 'linear-learner-2020-06-03-03-51-26-973'), (<sagemaker.estimator.Estimator object at 0x7fb54780dbe0>, 'linear-learner-2020-06-03-03-51-27-775'), (<sagemaker.estimator.Estimator object at 0x7fb5477664e0>, 'linear-learner-2020-06-03-03-51-31-457')]

Wait until all training jobs are complete before proceeding to the next step.

Creating an Amazon SageMaker model with multi-model support

When the training jobs are complete, you’re ready to create an MME.

First, define a method to copy model artifacts from the training job output to a location in Amazon S3 where the MME dynamically loads individual models:

def deploy_artifacts_to_mme(job_name): print("job_name :", job_name) response = sm_client.describe_training_job(TrainingJobName=job_name) source_s3_key,model_name = parse_model_artifacts(response['ModelArtifacts']['S3ModelArtifacts']) copy_source = {'Bucket': BUCKET, 'Key': source_s3_key} key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, model_name) print('Copying {} model\n from: {}\n to: {}...'.format(model_name, source_s3_key, key)) s3_client.copy_object(Bucket=BUCKET, CopySource=copy_source, Key=key)

Copy the model artifacts from all the training jobs to this location:

## Deploy all but the last model trained to MME
for job_name in training_jobs[:-1]: deploy_artifacts_to_mme(job_name)

You receive output similar to the following:

linear-learner-2020-06-03-03-51-26-973
Copying LosAngeles_CA model from: DEMO_MME_LINEAR_LEARNER/model_artifacts/LosAngeles_CA/linear-learner-2020-06-03-03-51-26-973/output/model.tar.gz to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/LosAngeles_CA/LosAngeles_CA.tar.gz...
linear-learner-2020-06-03-03-51-27-775
Copying Chicago_IL model from: DEMO_MME_LINEAR_LEARNER/model_artifacts/Chicago_IL/linear-learner-2020-06-03-03-51-27-775/output/model.tar.gz to: DEMO_MME_LINEAR_LEARNER/multi_model_artifacts/Chicago_IL/Chicago_IL.tar.gz...
linear-learner-2020-06-03-03-51-31-457

Create the Amazon SageMaker model entity using the MultiDataModel API:

MODEL_NAME = '{}-{}'.format(HOUSING_MODEL_NAME, strftime('%Y-%m-%d-%H-%M-%S', gmtime())) _model_url = 's3://{}/{}/{}/'.format(BUCKET, DATA_PREFIX, MULTI_MODEL_ARTIFACTS) ll_multi_model = MultiDataModel( name=MODEL_NAME, model_data_prefix=_model_url, image=container, role=role, sagemaker_session=sagemaker

Creating an inference pipeline

Set up an inference pipeline with the PipelineModel API. This sets up a list of models in a single endpoint; for this post, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted MME linear learner model. See the following code:

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime()) scikit_learn_inference_model = sklearn_preprocessor.create_model() model_name = '{}-{}'.format('inference-pipeline', timestamp_prefix)
endpoint_name = '{}-{}'.format('inference-pipeline-ep', timestamp_prefix) sm_model = PipelineModel( name=model_name, role=role, sagemaker_session=sagemaker_session, models=[ scikit_learn_inference_model, ll_multi_model]) sm_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

The MME is now ready to take inference requests and respond with predictions. With the MME, the inference request should include the target model to invoke.

Testing the inference pipeline

You can now get predictions from the different linear learner models. Create a RealTimePredictor with the inference pipeline endpoint:

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
predictor = RealTimePredictor( endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_JSON)

Define a method to get predictions from the RealTimePredictor:

def predict_one_house_value(features, model_name, predictor_to_use): print('Using model {} to predict price of this house: {}'.format(model_name, features)) body = ','.join(map(str, features)) + '\n' start_time = time.time() response = predictor_to_use.predict(features, target_model=model_name) response_json = json.loads(response) predicted_value = response_json['predictions'][0]['score'] duration = time.time() - start_time print('${:,.2f}, took {:,d} ms\n'.format(predicted_value, int(duration * 1000)))

With MME, the models are dynamically loaded into the container’s memory of the instance hosting the endpoint when invoked. Therefore, the model invocation may take longer when it’s invoked for the first time. When the model is already in the instance container’s memory, the subsequent invocations are faster. If an instance memory utilization is high and a new model needs to be loaded, unused models are unloaded. The unloaded models remain in the instance’s storage volume and can be loaded into container’s memory later without being downloaded from the S3 bucket again. If the instance’s storage volume is full, unused models are deleted from storage volume.

Amazon SageMaker fully manages the loading and unloading of the models, without you having to take any specific actions. However, it’s important to understand this behavior because it has implications on the model invocation latency.

Iterate through invocations with random inputs against a random model and show the predictions and the time it takes for the prediction to come back:

for i in range(10): model_name = LOCATIONS[np.random.randint(1, len(LOCATIONS[:PARALLEL_TRAINING_JOBS]))] full_model_name = '{}/{}.tar.gz'.format(model_name,model_name) predict_one_house_value(gen_random_house()[1:], full_model_name,runtime_sm_client)

You receive output similar to the following:

Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1993, 2728, 6, 3.0, 0.7, 1, 'y', 'y']
$439,972.62, took 1,166 ms Using model Houston_TX/Houston_TX.tar.gz to predict price of this house: [1989, 1944, 5, 3.0, 1.0, 1, 'n', 'y']
$280,848.00, took 1,086 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1968, 2427, 4, 3.0, 1.0, 2, 'y', 'n']
$266,721.31, took 1,029 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2000, 4024, 2, 1.0, 0.82, 1, 'y', 'y']
$584,069.88, took 53 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1986, 3463, 5, 3.0, 0.9, 1, 'y', 'n']
$496,340.19, took 43 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [2002, 3885, 4, 3.0, 1.16, 2, 'n', 'n']
$626,904.12, took 39 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 1531, 6, 3.0, 0.68, 1, 'y', 'n']
$257,696.17, took 36 ms Using model Chicago_IL/Chicago_IL.tar.gz to predict price of this house: [1992, 2327, 2, 3.0, 0.59, 3, 'n', 'n']
$337,758.22, took 33 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [1995, 2656, 5, 1.0, 1.16, 0, 'y', 'n']
$390,652.59, took 35 ms Using model LosAngeles_CA/LosAngeles_CA.tar.gz to predict price of this house: [2000, 4086, 2, 3.0, 1.03, 3, 'n', 'y']
$632,995.44, took 35 ms

The output that shows the predicted house price and the time it took for the prediction.

You should consider two different invocations of the same model. The second time, you don’t need to download from Amazon S3 because they’re already present on the instance. You see the inferences return in less time than before. For this use case, the invocation time for the Chicago_IL/Chicago_IL.tar.gz model reduced from 1,166 milliseconds the first time to 53 milliseconds the second time. Similarly, the invocation time for the LosAngeles_CA /LosAngeles_CA.tar.gz model reduced from 1,029 milliseconds to 43 milliseconds.

Updating an MME with new models

To deploy a new model to an existing MME, copy a new set of model artifacts to the same Amazon S3 location you set up earlier. For example, copy the model for the Houston location with the following code:

## Copy the last model
last_training_job=training_jobs[PARALLEL_TRAINING_JOBS-1]
deploy_artifacts_to_mme(last_training_job)

Now you can make predictions using the last model. See the following code:

model_name = LOCATIONS[PARALLEL_TRAINING_JOBS-1]
full_model_name = '{}/{}.tar.gz'.format(model_name,model_name)
predict_one_house_value(gen_random_house()[:-1], full_model_name,predictor)

Monitoring MMEs with CloudWatch metrics

Amazon SageMaker provides CloudWatch metrics for MMEs so you can determine the endpoint usage and the cache hit rate and optimize your endpoint. To analyze the endpoint and the container behavior, you invoke multiple models in this sequence:

##Create 200 copies of the original model and save with different names.
copy_additional_artifacts_to_mme(200)
##Starting with no models loaded into the container
##Invoke the first 100 models
invoke_multiple_models_mme(0,100)
##Invoke the same 100 models again
invoke_multiple_models_mme(0,100)
##This time invoke all 200 models to observe behavior
invoke_multiple_models_mme(0,200)

The following chart shows the behavior of the CloudWatch metrics LoadedModelCount and MemoryUtilization corresponding to these model invocations.

The LoadedModelCount metric continuously increases as more models are invoked, until it levels off at 121. The MemoryUtilization metric of the container also increased correspondingly to around 79%. This shows that the instance chosen to host the endpoint could only maintain 121 models in memory when 200 model invocations were made.

The following chart adds the ModelCacheHit metric to the previous two.

As the number of models loaded to the container memory increase, the ModelCacheHit metric improves. When the same 100 models are invoked the second time, ModelCacheHit reaches 1. When new models not yet loaded are invoked, ModelCacheHit decreases again.

You can use CloudWatch charts to help make ongoing decisions on the optimal choice of instance type, instance count, and number of models that a given endpoint should host.

Exploring granular access to models hosted on an MME

Because of the role attached to the notebook instance, it can invoke all models hosted on the MME. However, you can restrict this model invocation access to specific models by using IAM condition keys. To explore this, you create a new IAM role and IAM policy with a condition key to restrict access to a single model. You then assume this new role and verify that only a single target model can be invoked.

The role assigned to the Amazon SageMaker notebook instance should allow IAM role and IAM policy creation for the next steps to be successful.

Create an IAM role with the following code:

#Create a new role that can be assumed by this notebook. The roles should allow access to only a single model.
path='/'
role_name="{}{}".format('allow_invoke_ny_model_role', strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
description='Role that allows invoking a single model'
action_string = "sts:AssumeRole"
trust_policy={ "Version": "2012-10-17", "Statement": [ { "Sid": "statement1", "Effect": "Allow", "Principal": { "AWS": role }, "Action": "sts:AssumeRole" } ] } response = iam_client.create_role( Path=path, RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy), Description=description, MaxSessionDuration=3600
) print(response)

Create an IAM policy with a condition key to restrict access to only the NewYork model:

managed_policy = { "Version": "2012-10-17", "Statement": [ { "Sid": "SageMakerAccess", "Action": "sagemaker:InvokeEndpoint", "Effect": "Allow", "Resource":endpoint_resource_arn, "Condition": { "StringLike": { "sagemaker:TargetModel": ["NewYork_NY/*"] } } } ]
}
response = iam_client.create_policy( PolicyName='allow_invoke_ny_model_policy', PolicyDocument=json.dumps(managed_policy)
)

Attach the IAM policy to the IAM role:

iam_client.attach_role_policy( PolicyArn=policy_arn, RoleName=role_name
)

Assume the new role and create a RealTimePredictor object runtime client:

## Invoke with the role that has access to only NY model
sts_connection = boto3.client('sts')
assumed_role_limited_access = sts_connection.assume_role( RoleArn=role_arn, RoleSessionName="MME_Invoke_NY_Model"
)
assumed_role_limited_access['AssumedRoleUser']['Arn'] #Create sagemaker runtime client with assumed role
ACCESS_KEY = assumed_role_limited_access['Credentials']['AccessKeyId']
SECRET_KEY = assumed_role_limited_access['Credentials']['SecretAccessKey']
SESSION_TOKEN = assumed_role_limited_access['Credentials']['SessionToken'] runtime_sm_client_with_assumed_role = boto3.client( service_name='sagemaker-runtime', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, aws_session_token=SESSION_TOKEN,
) #SageMaker session with the assumed role
sagemakerSessionAssumedRole = sagemaker.Session(sagemaker_runtime_client=runtime_sm_client_with_assumed_role)
#Create a RealTimePredictor with the assumed role.
predictorAssumedRole = RealTimePredictor( endpoint=endpoint_name, sagemaker_session=sagemakerSessionAssumedRole, serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_JSON)

Now invoke the NewYork_NY model:

full_model_name = 'NewYork_NY/NewYork_NY.tar.gz'
predict_one_house_value(gen_random_house()[:-1], full_model_name, predictorAssumedRole) 

You receive output similar to the following:

Using model NewYork_NY/NewYork_NY.tar.gz to predict price of this house: [1992, 1659, 2, 2.0, 0.87, 2, 'n', 'y']
$222,008.38, took 154 ms

Next, try to invoke a different model (Chicago_IL/Chicago_IL.tar.gz). This should throw an error because the assumed role isn’t authorized to invoke this model. See the following code:

full_model_name = 'Chicago_IL/Chicago_IL.tar.gz' predict_one_house_value(gen_random_house()[:-1], full_model_name,predictorAssumedRole) 

You receive output similar to the following:

ClientError: An error occurred (AccessDeniedException) when calling the InvokeEndpoint operation: User: arn:aws:sts::xxxxxxxxxxxx:assumed-role/allow_invoke_ny_model_role/MME_Invoke_NY_Model is not authorized to perform: sagemaker:InvokeEndpoint on resource: arn:aws:sagemaker:us-east-1:xxxxxxxxxxxx:endpoint/inference-pipeline-ep-2020-07-01-15-46-51

Conclusion

Amazon SageMaker MMEs are a very powerful tool for teams developing multiple ML models to save significant costs and lower deployment overhead for a large number of ML models. This post discussed the new capabilities of Amazon SageMaker MMEs: native integration with Amazon SageMaker built-in algorithms (such as linear learner and KNN), native integration with inference pipelines, and fine-grained controlled access to the multiple models hosted on a single endpoint using IAM condition keys.

The notebook included with the post provided detailed instructions on training multiple linear learner models for house price predictions for multiple locations, hosting all the models on a single MME, and controlling access to the individual models.When considering multi-model enabled endpoints, you should balance the cost savings and the latency requirements.

Give Amazon SageMaker MMEs a try and leave your feedback in the comments.


About the Author

Sireesha Muppala is a AI/ML Specialist Solutions Architect at AWS, providing guidance to customers on architecting and implementing machine learning solutions at scale. She received her Ph.D. in Computer Science from University of Colorado, Colorado Springs. In her spare time, Sireesha loves to run and hike Colorado trails.

Michael Pham is a Software Development Engineer in the Amazon SageMaker team. His current work focuses on helping developers efficiently host machine learning models. In his spare time he enjoys Olympic weightlifting, reading, and playing chess.

Source: https://aws.amazon.com/blogs/machine-learning/using-amazon-sagemaker-inference-pipelines-with-multi-model-endpoints/

Continue Reading
AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Using Amazon SageMaker inference pipelines with multi-model endpoints

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI6 hours ago

Time series forecasting using unstructured data with Amazon Forecast and the Amazon SageMaker Neural Topic Model

AI8 hours ago

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

AI8 hours ago

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

AI8 hours ago

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

AI8 hours ago

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

AI8 hours ago

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

AI8 hours ago

Performing batch fraud predictions using Amazon Fraud Detector, Amazon S3, and AWS Lambda

Trending