Examples#

This page walks through examples of using Ray virtual clusters:

RayCluster Deployment#

This section demonstrates how to deploy a RayCluster to Kubernetes using kubectl. The provided example (ray-cluster.sample.yaml) creates a RayCluster and then retrieves its name, pods, and services for cluster management.

Create a RayCluster YAML file named ray-cluster.sample.yaml:

# This example config does not specify resource requests or limits.
# For examples with more realistic resource configuration, see
# ray-cluster.complete.large.yaml and
# ray-cluster.autoscaler.large.yaml.
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-sample
spec:
  rayVersion: '2.9.0' # should match the Ray version in the image of the containers
  # Ray head pod template
  headGroupSpec:
    serviceType: ClusterIP # optional
    rayStartParams:
      dashboard-host: '0.0.0.0'
      block: 'true'
    #pod template
    template:
      metadata:
        labels:
          test: a
      spec:
        containers:
        - name: ray-head
          image: ray-image:antray-open
          resources:
            limits:
              cpu: 1
              memory: 2Gi
              ephemeral-storage: 2Gi
            requests:
              cpu: 500m
              memory: 2Gi
              ephemeral-storage: 2Gi
          env:
          - name: RAY_NODE_TYPE_NAME
            value: group1
          ports:
          - containerPort: 6379
            name: gcs-server
          - containerPort: 8265 # Ray dashboard
            name: dashboard
          - containerPort: 10001
            name: client
  workerGroupSpecs:
    # the number of pod replicas in this worker group
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      # logical group name; here small-group-0, but any descriptive name works
      groupName: small-group-0

      rayStartParams:
        block: 'true'
      #pod template
      template:
        metadata:
          labels:
            test: a
        spec:
          containers:
            - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
              image: ray-image:antray-open
              env:
              - name: RAY_NODE_TYPE_NAME
                value: group0
              resources:
                limits:
                  cpu: 1
                  memory: 1Gi
                  ephemeral-storage: 1Gi
                requests:
                  cpu: 500m
                  memory: 1Gi
                  ephemeral-storage: 1Gi
    # the number of pod replicas in this worker group
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      # logical group name; here small-group-1, but any descriptive name works
      groupName: small-group-1
      rayStartParams:
        block: 'true'
      #pod template
      template:
        metadata:
          labels:
            test: a
        spec:
          containers:
            - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
              image: ray-image:antray-open
              env:
              - name: RAY_NODE_TYPE_NAME
                value: group1
              resources:
                limits:
                  cpu: 1
                  memory: 1Gi
                  ephemeral-storage: 1Gi
                requests:
                  cpu: 500m
                  memory: 1Gi
                  ephemeral-storage: 1Gi
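
A quick optional sanity check before applying the manifest is to parse it and list the worker group names. This is only a sketch; it assumes the pyyaml package is installed and uses an abbreviated copy of the manifest above:

```python
import yaml

# Abbreviated copy of ray-cluster.sample.yaml, keeping only the
# fields needed to list the worker groups.
manifest = """
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-sample
spec:
  workerGroupSpecs:
  - groupName: small-group-0
    replicas: 1
  - groupName: small-group-1
    replicas: 1
"""

spec = yaml.safe_load(manifest)
groups = [g["groupName"] for g in spec["spec"]["workerGroupSpecs"]]
print(groups)  # ['small-group-0', 'small-group-1']
```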

Create RayCluster

kubectl create -f ray-cluster.sample.yaml

Get RayCluster

$ kubectl get rayclusters.ray.io --sort-by='{.metadata.creationTimestamp}'
NAME                    DESIRED WORKERS   AVAILABLE WORKERS   STATUS   AGE
raycluster-sample       1                                     ready    3d14h

Get RayCluster Pods

$ kubectl get po -owide --sort-by='{.metadata.creationTimestamp}' -l ray.io/cluster=raycluster-sample
NAME                                           READY   STATUS    RESTARTS   AGE     IP              NODE       NOMINATED NODE   READINESS GATES
raycluster-sample-head-z779h                   1/1     Running   0          3d14h   100.88.92.34    yg61001t   <none>           1/1
raycluster-sample-worker-small-group-0-lf9gg   1/1     Running   1          3d14h   100.88.92.192   yg61001t   <none>           1/1
raycluster-sample-worker-small-group-1-lf9gg   1/1     Running   1          3d14h   100.88.92.191   yg61001t   <none>           1/1

Simple Job#

This section demonstrates how to submit Ray jobs to both Divisible and Indivisible Virtual Clusters using the Ray CLI. The provided example showcases a Python script (test.py) that interacts with different components within a Ray cluster, retrieves node IDs, and utilizes placement groups for resource management.

import ray
import sys

# Initialize Ray and connect to the existing cluster
ray.init(address='auto')

@ray.remote
class Actor:
    def __init__(self):
        pass

    def run(self):
        # Retrieve and return the node ID where this actor is running
        return ray.get_runtime_context().get_node_id()

@ray.remote
def hello():
    # Retrieve and return the node ID where this task is running
    return ray.get_runtime_context().get_node_id()

# Create a placement group with 1 CPU
pg = ray.util.placement_group(
    bundles=[{"CPU": 1}], name="pg_name"
)
ray.get(pg.ready())

# Execute a remote task to get the node ID
node_id_task = ray.get(hello.remote())
print("node_id:task: ", node_id_task)

# Create a detached actor and get its node ID
actor = Actor.options(name="test_actor", namespace="test", lifetime="detached").remote()
node_id_actor = ray.get(actor.run.remote())
print("node_id:actor: ", node_id_actor)

# Get the node ID associated with the placement group
placement_group_table = ray.util.placement_group_table(pg)
node_id_pg = placement_group_table["bundles_to_node_id"][0]
print("node_id:placement_group: ", node_id_pg)

Submitting to a Divisible Cluster#

Submitting a job to a Divisible Cluster involves specifying the --virtual-cluster-id and defining the replica sets.

Command:

ray job submit --working-dir . --virtual-cluster-id kPrimaryClusterID --replica-sets '{"group0": 1}' -- python test.py

Command Breakdown:

  • --working-dir .: Sets the current directory as the working directory for the job.

  • --virtual-cluster-id kPrimaryClusterID: Specifies the Divisible Cluster with ID kPrimaryClusterID to which the job is submitted.

  • --replica-sets '{"group0": 1}': Defines the replica set configuration, requesting 1 replica in group0.

  • -- python test.py: Indicates the Python script to execute.
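
The --replica-sets value is plain JSON mapping a group name to the number of replicas requested from it; "group0" here appears to correspond to the RAY_NODE_TYPE_NAME value in the RayCluster manifest (an assumption based on the sample above). A minimal sketch of how such a spec is built and round-tripped:

```python
import json

# Replica-set spec as passed on the command line:
# a JSON object mapping group name -> requested replica count.
replica_sets = {"group0": 1}
spec = json.dumps(replica_sets)
print(spec)  # {"group0": 1}

# Round-trip check: the submission endpoint sees the same mapping.
assert json.loads(spec) == replica_sets
```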

Submitting to an Indivisible Cluster#

Submitting a job to an Indivisible Cluster involves specifying only the --virtual-cluster-id.

Command:

ray job submit --working-dir . --virtual-cluster-id indivisibleLogicalID -- python test.py

Command Breakdown:

  • --working-dir .: Sets the current directory as the working directory for the job.

  • --virtual-cluster-id indivisibleLogicalID: Specifies the Indivisible Cluster named indivisibleLogicalID to which the job is submitted.

  • -- python test.py: Indicates the Python script to execute.

RayData Job#

Let's now submit a RayData job. The process is the same as submitting a simple job: the execution parallelism is deduced from the virtual cluster's resources, and the operator executors (i.e., Ray actors) are restricted to nodes inside the virtual cluster.

A batch inference job#

Since we only want to demonstrate the process, we simplify the job by mocking an inference model that returns True when there are two or more passengers and False otherwise.

# Solution 1: Batch Inference with a Self-Maintained Pool of Actors
print("Batch inference with a self-maintained pool of actors.")
import pandas as pd
import pyarrow.parquet as pq
import ray

def load_trained_model():
    # A fake model that predicts whether tips were given based on
    # the number of passengers in the taxi cab.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        # Give a tip if 2 or more passengers.
        predict = batch["passenger_count"] >= 2
        return pd.DataFrame({"score": predict})
    return model

model = load_trained_model()
model_ref = ray.put(model)
input_splits = [f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
                f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
                for i in range(12) ]

ds = ray.data.read_parquet(input_splits)

class CallableCls:
    def __init__(self, model):
        self.model = ray.get(model)

    def __call__(self, batch):
        result = self.model(batch)
        return result

results = ds.map_batches(
    CallableCls,
    num_gpus=0,
    batch_size=1024,
    batch_format="numpy",
    compute=ray.data.ActorPoolStrategy(min_size=1, max_size=5),
    fn_constructor_args=(model_ref,))

print(results.take(5))

Name the script NYC_taxi_predict.py and put it at /path/to/project/NYC_taxi_predict.py.
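
Before submitting, the mock model can be sanity-checked locally with plain pandas, no Ray cluster required. The batch values below are illustrative:

```python
import pandas as pd

def load_trained_model():
    # Same fake model as in the script above:
    # predict a tip when there are two or more passengers.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        predict = batch["passenger_count"] >= 2
        return pd.DataFrame({"score": predict})
    return model

model = load_trained_model()
batch = pd.DataFrame({"passenger_count": [1, 2, 4]})
print(model(batch)["score"].tolist())  # [False, True, True]
```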

Submitting to a Divisible Cluster#

Submitting the job is exactly the same as before:

Command:

ray job submit --working-dir /path/to/project/ --virtual-cluster-id kPrimaryClusterID --replica-sets '{"group0": 1}' -- python NYC_taxi_predict.py

Command Breakdown:

  • --working-dir /path/to/project/: Sets /path/to/project/ as the working directory for the job.

  • --virtual-cluster-id kPrimaryClusterID: Specifies the Divisible Cluster with ID kPrimaryClusterID to which the job is submitted.

  • --replica-sets '{"group0": 1}': Defines the replica set configuration, requesting 1 replica in group0.

  • -- python NYC_taxi_predict.py: Indicates the Python script to execute.

Submitting to an Indivisible Cluster#

Submitting a job to an Indivisible Cluster involves specifying only the --virtual-cluster-id.

Command:

ray job submit --working-dir /path/to/project/ --virtual-cluster-id indivisibleLogicalID -- python NYC_taxi_predict.py

Command Breakdown:

  • --working-dir /path/to/project/: Sets /path/to/project/ as the working directory for the job.

  • --virtual-cluster-id indivisibleLogicalID: Specifies the Indivisible Cluster named indivisibleLogicalID to which the job is submitted.

  • -- python NYC_taxi_predict.py: Indicates the Python script to execute.