Monday, October 2, 2023

Orchestrate Ray-based machine learning workflows using Amazon SageMaker

Machine learning (ML) is becoming increasingly complex as customers try to solve more and more challenging problems. This complexity often leads to the need for distributed ML, where multiple machines are used to train a single model. Although this enables parallelization of tasks across multiple nodes, leading to accelerated training times, enhanced scalability, and improved performance, there are significant challenges in effectively using distributed hardware. Data scientists have to address challenges like data partitioning, load balancing, fault tolerance, and scalability. ML engineers must handle parallelization, scheduling, faults, and retries manually, which requires complex infrastructure code.

In this post, we discuss the benefits of using Ray and Amazon SageMaker for distributed ML, and provide a step-by-step guide on how to use these frameworks to build and deploy a scalable ML workflow.

Ray, an open-source distributed computing framework, provides a flexible framework for distributed training and serving of ML models. It abstracts away low-level distributed system details through simple, scalable libraries for common ML tasks such as data preprocessing, distributed training, hyperparameter tuning, reinforcement learning, and model serving.

SageMaker is a fully managed service for building, training, and deploying ML models. Ray integrates seamlessly with SageMaker features to build and deploy complex ML workloads that are both efficient and reliable. The combination of Ray and SageMaker provides end-to-end capabilities for scalable ML workflows, with the following highlighted features:

  • Distributed actors and parallelism constructs in Ray simplify developing distributed applications.
  • Ray AI Runtime (AIR) reduces the friction of going from development to production. With Ray and AIR, the same Python code can scale seamlessly from a laptop to a large cluster.
  • The managed infrastructure of SageMaker and features like processing jobs, training jobs, and hyperparameter tuning jobs can use Ray libraries underneath for distributed computing.
  • Amazon SageMaker Experiments allows rapidly iterating on and keeping track of trials.
  • Amazon SageMaker Feature Store provides a scalable repository for storing, retrieving, and sharing ML features for model training.
  • Trained models can be stored, versioned, and tracked in Amazon SageMaker Model Registry for governance and management.
  • Amazon SageMaker Pipelines allows orchestrating the end-to-end ML lifecycle, from data preparation and training to model deployment, as automated workflows.

Solution overview

This post focuses on the benefits of using Ray and SageMaker together. We set up an end-to-end Ray-based ML workflow, orchestrated using SageMaker Pipelines. The workflow includes parallel ingestion of data into the feature store using Ray actors, data preprocessing with Ray Data, training models and hyperparameter tuning at scale using Ray Train and hyperparameter optimization (HPO) tuning jobs, and finally model evaluation and registering the model into a model registry.

For our data, we use a synthetic housing dataset that consists of eight features (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH, and DECK), and our model predicts the PRICE of the house.

Each stage in the ML workflow is broken into discrete steps, each with its own script that takes input and output parameters. In the next section, we highlight key code snippets from each step. The full code can be found in the aws-samples-for-ray GitHub repository.


To use the SageMaker Python SDK and run the code associated with this post, you need the following prerequisites:

Ingest data into SageMaker Feature Store

The first step in the ML workflow is to read the source data file in CSV format from Amazon Simple Storage Service (Amazon S3) and ingest it into SageMaker Feature Store. SageMaker Feature Store is a purpose-built repository that makes it easy for teams to create, share, and manage ML features. It simplifies feature discovery, reuse, and sharing, leading to faster development, increased collaboration within customer teams, and reduced costs.

Ingesting features into the feature store consists of the following steps:

  1. Define a feature group and create the feature group in the feature store.
  2. Prepare the source data for the feature store by adding an event time and record ID for each row of data.
  3. Ingest the prepared data into the feature group by using the Boto3 SDK.
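Step 2 can be pictured with a small standard-library sketch. It assumes each row is a plain dictionary and uses a hypothetical helper, prepare_rows_for_feature_store; the repository's own prepare_df_for_feature_store works on a DataFrame and may differ. Feature Store needs a record identifier and an event time for every record; here the record ID is the row position and the event time is a Unix epoch timestamp in seconds.

```python
import time


def prepare_rows_for_feature_store(rows, event_time=None):
    """Hypothetical helper: return copies of `rows` with a record ID and an event time.

    Each row gets a unique integer `record_id` (its position) and a shared
    `event_time` expressed as Unix epoch seconds.
    """
    if event_time is None:
        event_time = time.time()
    prepared = []
    for i, row in enumerate(rows):
        new_row = dict(row)  # copy so the caller's data is not mutated
        new_row["record_id"] = i
        new_row["event_time"] = float(event_time)
        prepared.append(new_row)
    return prepared


rows = [{"SQUARE_FEET": 2100}, {"SQUARE_FEET": 1500}]
print(prepare_rows_for_feature_store(rows, event_time=1700000000))
```

Copying each row keeps the function free of side effects, so the same source rows can be reused if ingestion has to be retried.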

In this section, we only highlight Step 3, because this is the part that involves parallel processing of the ingestion task using Ray. You can review the full code for this process in the GitHub repo.

The ingest_features method is defined inside a class called Featurestore. Note that the Featurestore class is decorated with @ray.remote. This indicates that an instance of this class is a Ray actor, a stateful and concurrent computational unit within Ray. Actors are a programming model that allows you to create distributed objects that maintain an internal state and can be accessed concurrently by multiple tasks running on different nodes in a Ray cluster. Actors provide a way to manage and encapsulate mutable state, making them valuable for building complex, stateful applications in a distributed setting. You can also specify resource requirements for actors. In this case, each instance of the Featurestore class requires 0.5 CPUs. See the following code:

@ray.remote(num_cpus=0.5)
class Featurestore:
    def ingest_features(self, feature_group_name, df, region):
        """Ingest features to a Feature Store feature group.
        Args:
            feature_group_name (str): Feature group name
            df (DataFrame): Data partition to ingest
            region (str): AWS Region of the feature group
        """
        ...  # full implementation in the GitHub repo

You interact with an actor by calling its methods with .remote(), which returns an object ref immediately instead of the result. In the following code, the desired number of actors is passed in as an input argument to the script. The data is then partitioned based on the number of actors and passed to the remote parallel processes to be ingested into the feature store. You can call ray.get on the object refs to block the execution of the current task until the remote computation is complete and the result is available. When the result is available, ray.get returns it, and the execution of the current task continues.

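import modin.pandas as pd
import numpy as np
import ray

# s3_path, args, and prepare_df_for_feature_store are defined elsewhere in the script
df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split the data into one partition per actor and put each in the Ray object store
partitions = [ray.put(part) for part in np.array_split(data, args.num_actors)]
# Start the actors
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []

# Assign one partition to each actor; .remote() returns an object ref immediately
for actor, partition in zip(actors, partitions):
    results.append(actor.ingest_features.remote(
        args.feature_group_name, partition, args.region
    ))

# Block until all actors have finished ingesting
ray.get(results)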
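The partition-and-gather pattern can be reproduced with the standard library to make the control flow explicit. In this sketch a thread pool stands in for Ray actors: submit() plays the role of .remote() and Future.result() the role of ray.get(). The function ingest_partition is a hypothetical placeholder, and array_split mimics how numpy.array_split sizes its chunks. Unlike Ray actors, these threads share one process on one machine, so the sketch illustrates only the control flow, not distribution.

```python
from concurrent.futures import ThreadPoolExecutor


def array_split(rows, num_parts):
    """Split rows into num_parts nearly equal chunks, sized like numpy.array_split:
    the first len(rows) % num_parts chunks receive one extra row."""
    base, extra = divmod(len(rows), num_parts)
    chunks, start = [], 0
    for i in range(num_parts):
        size = base + (1 if i < extra else 0)
        chunks.append(rows[start:start + size])
        start += size
    return chunks


def ingest_partition(partition):
    """Hypothetical stand-in for Featurestore.ingest_features; returns the row count."""
    return len(partition)


def parallel_ingest(rows, num_workers):
    partitions = array_split(rows, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # submit() returns a future immediately, much like actor.method.remote()
        futures = [pool.submit(ingest_partition, p) for p in partitions]
        # result() blocks until the task finishes, much like ray.get()
        return [f.result() for f in futures]


print(parallel_ingest(list(range(10)), 3))  # [4, 3, 3]
```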


Prepare data for training, validation, and testing

In this step, we use Ray Dataset to efficiently split, transform, and scale our dataset in preparation for machine learning. Ray Dataset provides a standard way to load distributed data into Ray, supporting various storage systems and file formats. It has APIs for common ML data preprocessing operations like parallel transformations, shuffling, grouping, and aggregations. Ray Dataset also handles operations that need stateful setup and GPU acceleration. It integrates smoothly with other data processing libraries like Spark, pandas, and NumPy, as well as ML frameworks like TensorFlow and PyTorch. This allows building end-to-end data pipelines and ML workflows on top of Ray. The goal is to make distributed data processing and ML easier for practitioners and researchers.

Let's look at sections of the scripts that perform this data preprocessing. We start by loading the data from the feature store:

import boto3
import ray
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

def load_dataset(feature_group_name, region):
    """
    Loads the data as a Ray dataset from the offline feature store S3 location
    Args:
        feature_group_name (str): name of the feature group
        region (str): AWS Region of the feature group
    Returns:
        ds (ray.data.Dataset): Ray dataset that contains the requested data from the feature store
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(name=feature_group_name, sagemaker_session=session)

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    # Drop the record ID, event time, and metadata columns added during ingestion,
    # since these are not related to the ML problem at hand
    cols_to_drop = ["record_id", "event_time", "write_time",
                    "api_invocation_time", "is_deleted",
                    "year", "month", "day", "hour"]

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} count is {ds.count()}")
    return ds

We then split and scale the data using the higher-level abstractions available in the ray.data library:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Split dataset into train, validation and test samples
    Args:
        dataset (ray.data.Dataset): input data
        train_size (float): ratio of data to use as training dataset
        val_size (float): ratio of data to use as validation dataset
        test_size (float): ratio of data to use as test dataset
        random_state (int): Pass an int for reproducible output across multiple function calls.
    Returns:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
    """
    # Shuffle this dataset with a fixed random seed.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Split the data into train, validation and test datasets;
    # the test set receives the remainder (approximately test_size)
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    return train_set, val_set, test_set

from ray.data.preprocessors import StandardScaler

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Fit StandardScaler to train_set and apply it to val_set and test_set
    Args:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
        target_col (str): target column
    Returns:
        train_transformed (ray.data.Dataset): train data scaled
        val_transformed (ray.data.Dataset): val data scaled
        test_transformed (ray.data.Dataset): test data scaled
    """
    transform_cols = train_set.columns()
    # Remove the target column from being scaled
    transform_cols.remove(target_col)
    # Set up a standard scaler
    standard_scaler = StandardScaler(transform_cols)
    # Fit the scaler to the training dataset
    print("Fitting scaling to training data and transforming dataset...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # Apply the fitted scaler to the validation and test datasets
    print("Transforming validation and test datasets...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed
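To see what the two functions above do without a Ray cluster, here is a plain-Python sketch of both ideas. The split mirrors the documented behavior of split_proportionately (you pass every proportion except the last, and the final split receives the remainder) but computes boundaries with rounding, which may not match Ray's exact indices. The scaler is fitted on the training rows only, so no statistics from the validation or test data leak into training; it uses the population standard deviation and treats constant columns as having a standard deviation of 1, both choices made for this sketch that Ray's StandardScaler may not share. The row values are toy inputs.

```python
import random
from statistics import fmean, pstdev


def split_proportionately(items, proportions, seed=None):
    """Shuffle with a fixed seed, then cut at cumulative proportions.

    `proportions` covers every split except the last, which receives the
    remainder, so the proportions must sum to less than 1.
    """
    if not 0 < sum(proportions) < 1:
        raise ValueError("proportions must sum to a value between 0 and 1")
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)  # same seed -> same order
    splits, start, cumulative = [], 0, 0.0
    for p in proportions:
        cumulative += p
        end = round(len(shuffled) * cumulative)  # rounding sidesteps float drift
        splits.append(shuffled[start:end])
        start = end
    splits.append(shuffled[start:])  # remainder
    return splits


def fit_standard_scaler(rows, columns):
    """Learn each column's mean and population standard deviation from the training rows."""
    stats = {}
    for col in columns:
        values = [row[col] for row in rows]
        std = pstdev(values)
        stats[col] = (fmean(values), std if std > 0 else 1.0)  # avoid dividing by zero
    return stats


def transform(rows, stats):
    """Apply (x - mean) / std with statistics learned from the training split only."""
    out = []
    for row in rows:
        new_row = dict(row)
        for col, (mean, std) in stats.items():
            new_row[col] = (row[col] - mean) / std
        out.append(new_row)
    return out


train, val, test = split_proportionately(range(10), [0.6, 0.2], seed=42)
print(len(train), len(val), len(test))  # 6 2 2

train_rows = [{"SQUARE_FEET": 1000, "PRICE": 200}, {"SQUARE_FEET": 3000, "PRICE": 500}]
val_rows = [{"SQUARE_FEET": 2000, "PRICE": 350}]
stats = fit_standard_scaler(train_rows, ["SQUARE_FEET"])  # target column PRICE excluded
print(transform(val_rows, stats))  # [{'SQUARE_FEET': 0.0, 'PRICE': 350}]
```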

The processed train, validation, and test datasets are stored in Amazon S3 and passed as input parameters to the subsequent steps.

Perform model training and hyperparameter optimization

With our data preprocessed and ready for modeling, it's time to train some ML models and fine-tune their hyperparameters to maximize predictive performance. We use XGBoost-Ray, a distributed backend for XGBoost built on Ray that enables training XGBoost models on large datasets by using multiple nodes and GPUs. It provides simple drop-in replacements for XGBoost's train and predict APIs while handling the complexities of distributed data management and training under the hood.

To distribute the training over multiple nodes, we use a helper class named RayHelper. As shown in the following code, we read the resource configuration of the training job and choose the first host as the head node:

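class RayHelper():
    def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
        self.ray_port = ray_port
        self.redis_pass = redis_pass
        # Host information for this training job (get_resource_config is defined in the full script)
        self.resource_config = self.get_resource_config()
        # The first host in the training cluster acts as the Ray head node
        self.head_host = self.resource_config["hosts"][0]
        self.n_hosts = len(self.resource_config["hosts"])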
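The get_resource_config method is not shown in the post. One plausible implementation, stated here as an assumption rather than the repository's code, reads the resourceconfig.json file that SageMaker writes into each training container; it lists the current host and all hosts in the job. The helper names below are hypothetical.

```python
import json

RESOURCE_CONFIG_PATH = "/opt/ml/input/config/resourceconfig.json"


def load_resource_config(path=RESOURCE_CONFIG_PATH):
    """Read the resource configuration that SageMaker writes into each training container."""
    with open(path) as f:
        return json.load(f)


def describe_role(resource_config):
    """Return (head_host, is_head, n_hosts), choosing the first host as the head node."""
    hosts = resource_config["hosts"]
    head_host = hosts[0]
    is_head = resource_config["current_host"] == head_host
    return head_host, is_head, len(hosts)


example = {"current_host": "algo-2", "hosts": ["algo-1", "algo-2"]}
print(describe_role(example))  # ('algo-1', False, 2)
```

Because every instance reads the same host list, each one independently reaches the same conclusion about which host is the head node, so no extra coordination is needed to elect it.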

We can use the host information to decide how to initialize Ray on each of the training job instances:

def start_ray(self):
    head_ip = self._get_ip_from_host()
    # If the current host is the host chosen as the head node,
    # run `ray start` with the --head flag, making this the head node
    if self.resource_config["current_host"] == self.head_host:
        output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port',
                                 self.ray_port, '--redis-password', self.redis_pass,
                                 '--include-dashboard', 'false'], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        ray.init(address="auto", include_dashboard=False)
        # Wait until every training instance has joined the Ray cluster
        while len(ray.nodes()) < self.n_hosts:
            time.sleep(5)
        print("All workers present and accounted for")
    else:
        # If the current host is not the head node,
        # run `ray start` with the head node's IP address as the cluster address
        output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}",
                                 '--redis-password', self.redis_pass, "--block"],
                                stdout=subprocess.PIPE)

When a training job starts, a Ray cluster can be initialized by calling the start_ray() method on an instance of RayHelper:

if __name__ == '__main__':
    ray_helper = RayHelper()
    ray_helper.start_ray()
    args = read_parameters()
    sess = sagemaker.Session(boto3.Session(region_name=args.region))

We then use the XGBoost trainer from XGBoost-Ray for training:

import os

from sagemaker.experiments.run import load_run
from xgboost_ray import RayDMatrix, RayParams, train

def train_xgboost(ds_train, ds_val, params, num_workers, target_col="PRICE"):
    """
    Creates an XGBoost trainer, trains it, and returns the trained model.
    Args:
        ds_train (ray.data.Dataset): Training dataset
        ds_val (ray.data.Dataset): Validation dataset
        params (dict): Hyperparameters (eval_metric should include "mae" and "rmse")
        num_workers (int): number of workers to distribute the training across
        target_col (str): target column
    Returns:
        booster (xgboost.Booster): the trained model
    """
    train_set = RayDMatrix(ds_train, target_col)
    val_set = RayDMatrix(ds_val, target_col)
    evals_result = {}
    booster = train(
        params=params,
        dtrain=train_set,
        evals=[(val_set, "validation")],
        evals_result=evals_result,
        ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
    )
    # args and sess are set in the script's __main__ block
    booster.save_model(os.path.join(args.model_dir, 'model.xgb'))
    valMAE = evals_result["validation"]["mae"][-1]
    valRMSE = evals_result["validation"]["rmse"][-1]
    print('[3] #011validation-mae:{}'.format(valMAE))
    print('[4] #011validation-rmse:{}'.format(valRMSE))
    local_testing = False
    try:
        load_run(sagemaker_session=sess)
    except Exception:
        local_testing = True
    if not local_testing: # Track experiment if using SageMaker Training
        with load_run(sagemaker_session=sess) as run:
            run.log_metric('validation-mae', valMAE)
            run.log_metric('validation-rmse', valRMSE)
    return booster

Note that when calling train, we pass RayParams, which takes the number of actors and the number of CPUs per actor. XGBoost-Ray uses this information to distribute the training across all the nodes attached to the Ray cluster.
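The training function reads evals_result["validation"]["mae"][-1], which relies on the shape of the dictionary that XGBoost fills in: evaluation name, then metric name, then one value per boosting round. The "validation" key comes from the name given in evals, and the metric keys exist only if params requests them through eval_metric. The helper below is a hypothetical sketch that makes that dependency explicit; the parameter values and metric numbers are placeholders, not results.

```python
def final_validation_metrics(evals_result, eval_name="validation", metrics=("mae", "rmse")):
    """Return the last-round value of each metric recorded for `eval_name`.

    evals_result maps eval name -> metric name -> list with one value per boosting round.
    """
    history = evals_result[eval_name]
    missing = [m for m in metrics if m not in history]
    if missing:
        raise KeyError(f"metrics not recorded: {missing}; add them to params['eval_metric']")
    return {m: history[m][-1] for m in metrics}


# Illustrative hyperparameters: eval_metric is what makes "mae" and "rmse" appear
params = {"objective": "reg:squarederror", "eval_metric": ["mae", "rmse"]}
# Shape of the dictionary that train() fills in (numbers are placeholders)
evals_result = {"validation": {"mae": [3.0, 2.0, 1.5], "rmse": [4.0, 3.0, 2.5]}}
print(final_validation_metrics(evals_result))  # {'mae': 1.5, 'rmse': 2.5}
```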

We now create an XGBoost estimator object based on the SageMaker Python SDK and use it for the HPO job.

Orchestrate the preceding steps using SageMaker Pipelines

To build an end-to-end, scalable, and reusable ML workflow, we need a CI/CD tool to orchestrate the preceding steps into a pipeline. SageMaker Pipelines has direct integration with SageMaker, the SageMaker Python SDK, and SageMaker Studio. This integration allows you to create ML workflows with an easy-to-use Python SDK, and then visualize and manage your workflow using SageMaker Studio. You can also track the history of your data within the pipeline execution and designate steps for caching.

SageMaker Pipelines creates a directed acyclic graph (DAG) that includes the steps needed to build an ML workflow. Each pipeline is a series of interconnected steps orchestrated by data dependencies between steps, and can be parameterized, allowing you to provide input variables as parameters for each run of the pipeline. SageMaker Pipelines has four types of pipeline parameters: ParameterString, ParameterInteger, ParameterFloat, and ParameterBoolean. In this section, we parameterize some of the input variables and set up the step caching configuration:

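from sagemaker.workflow.parameters import ParameterFloat, ParameterInteger, ParameterString
from sagemaker.workflow.steps import CacheConfig

# Constructor arguments (name, default_value) are elided here; see the GitHub repo
processing_instance_count = ParameterInteger(...)
feature_group_name = ParameterString(...)
bucket_prefix = ParameterString(...)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
train_size = ParameterString(...)
val_size = ParameterString(...)
test_size = ParameterString(...)

# Reuse cached step results for up to 12 hours (ISO 8601 duration)
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")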
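Because train_size, val_size, and test_size are declared as ParameterString, the data preparation script receives them as text and has to convert them before splitting. A small validation helper, hypothetical and not taken from the repository, could look like this; the "0.6"/"0.2"/"0.2" values are illustrative defaults, not the pipeline's actual ones.

```python
import math


def parse_split_sizes(train_size, val_size, test_size):
    """Convert string pipeline parameters (for example "0.6") into floats and validate them."""
    sizes = [float(s) for s in (train_size, val_size, test_size)]
    if any(s <= 0 for s in sizes):
        raise ValueError(f"split sizes must be positive: {sizes}")
    if not math.isclose(sum(sizes), 1.0):
        raise ValueError(f"split sizes must sum to 1, got {sum(sizes)}")
    return tuple(sizes)


print(parse_split_sizes("0.6", "0.2", "0.2"))  # (0.6, 0.2, 0.2)
```

Validating early makes a mistyped parameter fail at the start of the processing job instead of producing an unexpectedly sized split later.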

We define two processing steps: one for SageMaker Feature Store ingestion, the other for data preparation. These should look very similar to the steps described earlier. The only new code is the ProcessingStep after each step's definition, which lets us take the processing job configuration and include it as a pipeline step. We further specify that the data preparation step depends on the SageMaker Feature Store ingestion step. See the following code:

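from sagemaker.workflow.steps import ProcessingStep

# Step names and processing job arguments are elided; see the GitHub repo
feature_store_ingestion_step = ProcessingStep(...)

preprocess_dataset_step = ProcessingStep(...)
# Data preparation runs only after feature store ingestion completes
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])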
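The dependencies declared across the pipeline form a DAG. To visualize the ordering they imply, the sketch below models the steps with Python's standard graphlib module (Python 3.9+). The step names are hypothetical labels, and this only illustrates topological ordering; it is not how SageMaker schedules steps, which it derives from property references and explicit dependencies.

```python
from graphlib import TopologicalSorter

# Hypothetical step names; each step maps to the set of steps it depends on
pipeline_dag = {
    "FeatureStoreIngestion": set(),
    "PreprocessData": {"FeatureStoreIngestion"},
    "HPTuning": {"PreprocessData"},
    "EvaluateModel": {"HPTuning", "PreprocessData"},
    "CheckRMSE": {"EvaluateModel"},
    # Both branches follow the condition; only one runs in a given execution
    "RegisterModel": {"CheckRMSE"},
    "RMSEFail": {"CheckRMSE"},
}

order = list(TopologicalSorter(pipeline_dag).static_order())
print(order)
```

Any valid order starts with ingestion and places every step after all of its dependencies; the relative order of the two branch steps is not fixed.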

Similarly, to build a model training and tuning step, we add a TuningStep definition after the model training step's code, which lets us run SageMaker hyperparameter tuning as a step in the pipeline:

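from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep

tuning_step = TuningStep(
    ...,  # step name and tuner elided; see the GitHub repo
    inputs={
        # s3_data: S3 URIs of the processed datasets from preprocess_dataset_step (elided)
        "train": TrainingInput(s3_data=..., content_type="text/csv"),
        "validation": TrainingInput(s3_data=..., content_type="text/csv"),
    },
)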

After the tuning step, we register the best model in SageMaker Model Registry. To control model quality, we implement a minimum quality gate that compares the best model's objective metric (RMSE) against a threshold defined by the pipeline's input parameter rmse_threshold. To perform this evaluation, we create another processing step that runs an evaluation script. The model evaluation result is stored as a property file. Property files are particularly useful when analyzing the results of a processing step to decide how other steps should run. See the following code:

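from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.properties import PropertyFile

# Specify where we'll store the model evaluation results so that other steps can access them
evaluation_report = PropertyFile(name=..., output_name="evaluation", path=...)

# A ProcessingStep evaluates a model from the HPO step; top_k=0 selects the top performing model.
# Step name, processor, and the model input's destination are elided; see the GitHub repo.
evaluation_step = ProcessingStep(
    ...,
    inputs=[
        ProcessingInput(source=tuning_step.get_top_model_s3_uri(
            top_k=0, s3_bucket=bucket, prefix=s3_prefix), destination=...),
        ProcessingInput(source=..., destination="/opt/ml/processing/test"),
    ],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
    property_files=[evaluation_report],
)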
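The evaluation script itself is not shown. As an assumption about its general shape, it computes RMSE on the test data and writes a JSON report under /opt/ml/processing/evaluation, which the ProcessingOutput uploads and the PropertyFile exposes to later steps. The report layout below is hypothetical, and json_get is a simplified dotted-path lookup standing in for the JsonPath evaluation that SageMaker's JsonGet performs.

```python
import json
import math


def rmse(y_true, y_pred):
    """Root mean squared error."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and the same length")
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def build_report(y_true, y_pred):
    # Hypothetical report layout; the real script would json.dump this into
    # a file under /opt/ml/processing/evaluation
    return {"regression_metrics": {"rmse": {"value": rmse(y_true, y_pred)}}}


def json_get(report, json_path):
    """Follow a dotted path such as 'regression_metrics.rmse.value' (simplified JsonGet)."""
    node = report
    for key in json_path.split("."):
        node = node[key]
    return node


report = build_report([100.0, 200.0], [110.0, 190.0])
print(json.dumps(report))  # {"regression_metrics": {"rmse": {"value": 10.0}}}
print(json_get(report, "regression_metrics.rmse.value"))  # 10.0
```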

We define a ModelStep to register the best model in SageMaker Model Registry as part of our pipeline. In case the best model doesn't pass our predetermined quality check, we additionally specify a FailStep to output an error message:

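from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.model_step import ModelStep
register_step = ModelStep(...)  # step name and model registration arguments elided
metrics_fail_step = FailStep(
    ...,  # step name elided
    error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]))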

Next, we use a ConditionStep to evaluate whether the model registration step or the failure step should be taken next in the pipeline. In our case, the best model is registered if its RMSE is lower than or equal to the threshold.

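# Condition step for evaluating model quality and branching execution
# (left-hand side: the RMSE read from evaluation_report via JsonGet; details elided)
cond_lte = ConditionLessThanOrEqualTo(left=..., right=rmse_threshold)
condition_step = ConditionStep(
    ..., conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step]
)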
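The branching that ConditionStep performs can be mirrored in a few lines of plain Python, which makes the boundary case explicit: because the condition is "less than or equal to", a model whose RMSE exactly equals the threshold is still registered. The function name is hypothetical, and the message only approximates what the FailStep's Join produces; 15000.0 is the rmse_threshold default from the pipeline parameters.

```python
def choose_branch(rmse_value, rmse_threshold):
    """Mirror ConditionLessThanOrEqualTo: register when RMSE <= threshold, else fail."""
    if rmse_value <= rmse_threshold:
        return "register", None
    # Same wording as the FailStep's Join(on=" ", values=[...])
    message = " ".join(["Execution failed due to RMSE >", str(rmse_threshold)])
    return "fail", message


print(choose_branch(12000.0, 15000.0))  # ('register', None)
print(choose_branch(15000.0, 15000.0))  # ('register', None)
print(choose_branch(18000.0, 15000.0))  # ('fail', 'Execution failed due to RMSE > 15000.0')
```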

Finally, we orchestrate all the defined steps into a pipeline:

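from sagemaker.workflow.pipeline import Pipeline
step_list = [feature_store_ingestion_step, preprocess_dataset_step,
             tuning_step, evaluation_step, condition_step]
training_pipeline = Pipeline(
    name=pipeline_name,  # pipeline_name and role_arn are assumed to be defined earlier
    parameters=[processing_instance_count, feature_group_name, bucket_prefix,
                rmse_threshold, train_size, val_size, test_size],
    steps=step_list)
# Note: If an existing pipeline has the same name, it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)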

The preceding pipeline can be visualized and executed directly in SageMaker Studio, or executed by calling execution = training_pipeline.start(). The following figure illustrates the pipeline flow.

Additionally, we can review the lineage of artifacts generated by the pipeline execution:

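import sagemaker
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    # display() is available in Jupyter and SageMaker Studio notebooks
    display(viz.show(pipeline_execution_step=execution_step))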

Deploy the model

After the best model is registered in SageMaker Model Registry through a pipeline run, we deploy the model to a real-time endpoint by using the fully managed model deployment capabilities of SageMaker. SageMaker has other model deployment options to meet the needs of different use cases. For details on choosing the right option for your use case, refer to Deploy models for inference. First, let's get the model registered in SageMaker Model Registry:

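# role_arn and model_package_arn (of the registered model) are assumed to be defined earlier
xgb_regressor_model = ModelPackage(role_arn, model_package_arn=model_package_arn)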

The model's current status is PendingApproval. We need to set its status to Approved before deployment.
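The approval code is not included in this post. A minimal sketch, assuming a Boto3 SageMaker client, calls update_model_package with ModelApprovalStatus="Approved". The helper name is hypothetical, and the commented deployment call uses an illustrative instance type.

```python
def approve_model_package(sm_client, model_package_arn):
    """Set a registered model package's approval status to Approved.

    `sm_client` is expected to be a Boto3 SageMaker client, e.g. boto3.client("sagemaker").
    """
    return sm_client.update_model_package(
        ModelPackageArn=model_package_arn,
        ModelApprovalStatus="Approved",
    )


# Usage (requires AWS credentials):
# import boto3
# approve_model_package(boto3.client("sagemaker"), model_package_arn)
# xgb_regressor_model.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")
```

Passing the client in, rather than creating it inside the function, keeps the helper easy to test with a stub and lets you reuse a client configured for a specific Region.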



Clean up

After you are done experimenting, remember to clean up the resources to avoid unnecessary charges. To clean up, delete the real-time endpoint, model group, pipeline, and feature group by calling the DeleteEndpoint, DeleteModelPackageGroup, DeletePipeline, and DeleteFeatureGroup APIs, respectively, and shut down all SageMaker Studio notebook instances.
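The four API calls map directly onto Boto3 SageMaker client methods. The following sketch wraps them in a hypothetical clean_up helper; all resource names are placeholders for the ones you created. If the model group deletion is rejected because the group still contains model versions, delete those first with delete_model_package. Note that delete_endpoint does not remove the endpoint configuration.

```python
def clean_up(sm_client, endpoint_name, model_package_group_name, pipeline_name, feature_group_name):
    """Delete the resources created in this walkthrough, in the order listed above.

    `sm_client` is expected to be a Boto3 SageMaker client; all names are placeholders.
    """
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)
    sm_client.delete_pipeline(PipelineName=pipeline_name)
    sm_client.delete_feature_group(FeatureGroupName=feature_group_name)


# Usage (requires AWS credentials):
# import boto3
# clean_up(boto3.client("sagemaker"), "my-endpoint", "my-model-group", "my-pipeline", "my-feature-group")
```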


This post provided a step-by-step walkthrough of how to use SageMaker Pipelines to orchestrate Ray-based ML workflows. We also demonstrated the ability of SageMaker Pipelines to integrate with third-party ML tools. Various AWS services support Ray workloads in a scalable and secure fashion to ensure performance excellence and operational efficiency. Now it's your turn to explore these powerful capabilities and start optimizing your machine learning workflows with Amazon SageMaker Pipelines and Ray. Take action today and unlock the full potential of your ML projects!

About the Authors

Raju Rangan is a Senior Solutions Architect at Amazon Web Services (AWS). He works with government-sponsored entities, helping them build AI/ML solutions using AWS. When not tinkering with cloud solutions, you'll catch him hanging out with family or smashing birdies in a lively game of badminton with friends.

Sherry Ding is a senior AI/ML specialist solutions architect at Amazon Web Services (AWS). She has extensive experience in machine learning and holds a PhD in computer science. She mainly works with public sector customers on various AI/ML-related business challenges, helping them accelerate their machine learning journey on the AWS Cloud. When not helping customers, she enjoys outdoor activities.


