Dataset API
Dataset
- class ray.data.Dataset(plan: ExecutionPlan, logical_plan: LogicalPlan)
 A Dataset is a distributed data collection for data loading and processing.
Datasets are distributed pipelines that produce ObjectRef[Block] outputs, where each block holds data in Arrow format, representing a shard of the overall data collection. The block also determines the unit of parallelism. For more details, see Ray Data Internals.

Datasets can be created in multiple ways: from synthetic data via range_*() APIs, from existing in-memory data via from_*() APIs (this creates a subclass of Dataset called MaterializedDataset), or from external storage systems such as local disk, S3, HDFS, etc. via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage systems via the write_*() APIs.

Examples
import ray

# Create dataset from synthetic data.
ds = ray.data.range(1000)

# Create dataset from in-memory data.
ds = ray.data.from_items(
    [{"col1": i, "col2": i * 2} for i in range(1000)]
)

# Create dataset from external storage system.
ds = ray.data.read_parquet("s3://bucket/path")

# Save dataset back to external storage system.
ds.write_csv("s3://bucket/output")
Dataset has two kinds of operations: transformations, which take in a Dataset and output a new Dataset (e.g. map_batches()); and consumption, which produces values (not a data stream) as output (e.g. iter_batches()).

Dataset transformations are lazy, with execution of the transformations being triggered by downstream consumption.
Dataset supports parallel processing at scale: transformations such as map_batches(), aggregations such as min()/max()/mean(), grouping via groupby(), and shuffling operations such as sort(), random_shuffle(), and repartition().

Examples
>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
>>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})
MapBatches(<lambda>)
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Compute the maximum.
>>> ds.max("id")
999
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
RandomShuffle
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Sort it back in order.
>>> ds.sort("id")
Sort
+- Dataset(num_rows=1000, schema={id: int64})
Both unexecuted and materialized Datasets can be passed between Ray tasks and actors without incurring a copy. Dataset supports conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, Mars), and is also compatible with distributed TensorFlow / PyTorch.
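As an illustrative sketch (not part of the reference itself), the snippet below shows the lazy execution model described above: transformations only run once a consuming call such as take() or materialize() is made.

import ray

# Building the pipeline is cheap: no data is processed yet.
ds = ray.data.range(1000).map_batches(lambda batch: {"id": batch["id"] * 2})

# Consumption triggers execution of the pending transformations.
print(ds.take(5))

# materialize() executes the full plan and pins the result in object store memory.
materialized_ds = ds.materialize()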
Basic Transformations

Dataset.add_column | Add the given column to the dataset.
Dataset.drop_columns | Drop one or more columns from the dataset.
Dataset.filter | Filter out rows that don't satisfy the given predicate.
Dataset.flat_map | Apply the given function to each row and then flatten results.
Dataset.limit | Truncate the dataset to the first limit rows.
Dataset.map | Apply the given function to each row of this dataset.
Dataset.map_batches | Apply the given function to batches of data.
Dataset.random_sample | Returns a new Dataset containing a random fraction of the rows.
Dataset.rename_columns | Rename columns in the dataset.
Dataset.select_columns | Select one or more columns from the dataset.
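The following minimal sketch (illustrative only) chains several of the basic transformations above; the column names are made up for the example.

import ray

ds = ray.data.from_items([{"col1": i, "col2": i * 2} for i in range(100)])

# Each transformation returns a new, lazily evaluated Dataset.
ds = ds.add_column("col3", lambda df: df["col1"] + df["col2"])
ds = ds.filter(lambda row: row["col1"] % 2 == 0)
ds = ds.select_columns(["col1", "col3"])
ds = ds.limit(10)
print(ds.take_all())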
Consuming Data

Dataset.iter_batches | Return an iterable over batches of data.
Dataset.iter_rows | Return an iterable over the rows in this dataset.
Dataset.iter_torch_batches | Return an iterable over batches of data represented as Torch tensors.
Dataset.iterator | Return a DataIterator over this dataset.
Dataset.show | Print up to the given number of rows from the Dataset.
Dataset.take | Return up to limit rows from the Dataset.
Dataset.take_all | Return all of the rows in this Dataset.
Dataset.take_batch | Return up to batch_size rows from the Dataset in a batch.
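A short sketch of typical consumption calls (illustrative; the batch size and row counts are arbitrary):

import ray

ds = ray.data.range(100)

# Stream over batches (dicts of NumPy arrays by default) without materializing everything.
for batch in ds.iter_batches(batch_size=32):
    print(batch["id"].shape)

# Pull a bounded number of rows back to the driver process.
rows = ds.take(5)         # list of row dicts
batch = ds.take_batch(5)  # one batch as a dict of NumPy arrays
ds.show(3)                # print a few rows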
Execution

Dataset.materialize | Execute and materialize this dataset into object store memory.
Grouped and Global aggregations

Dataset.aggregate | Aggregate values using one or more functions.
Dataset.groupby | Group rows of a Dataset according to a column.
Dataset.max | Return the maximum of one or more columns.
Dataset.mean | Compute the mean of one or more columns.
Dataset.min | Return the minimum of one or more columns.
Dataset.std | Compute the standard deviation of one or more columns.
Dataset.sum | Compute the sum of one or more columns.
Dataset.unique | List the unique elements in a given column.
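A minimal sketch of global and grouped aggregations (the column names are illustrative):

import ray

ds = ray.data.from_items([{"group": i % 3, "value": i} for i in range(30)])

# Global aggregations produce scalar results.
print(ds.sum("value"), ds.mean("value"), ds.min("value"), ds.max("value"))

# groupby() returns a grouped view; its aggregations produce a new Dataset.
per_group = ds.groupby("group").sum("value")
print(per_group.take_all())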
I/O and Conversion

Dataset.to_dask | Convert this Dataset into a Dask DataFrame.
Dataset.to_mars | Convert this Dataset into a Mars DataFrame.
Dataset.to_modin | Convert this Dataset into a Modin DataFrame.
Dataset.to_pandas | Convert this Dataset to a single pandas DataFrame.
Dataset.to_spark | Convert this Dataset into a Spark DataFrame.
Dataset.to_tf | Return a TensorFlow Dataset over this Dataset.
Dataset.write_csv | Writes the Dataset to CSV files.
Dataset.write_images | Writes the Dataset to images.
Dataset.write_json | Writes the Dataset to JSON and JSONL files.
Dataset.write_mongo | Writes the Dataset to a MongoDB datasource.
Dataset.write_numpy | Writes a column of the Dataset to .npy files.
Dataset.write_parquet | Writes the Dataset to parquet files.
Dataset.write_tfrecords | Write the Dataset to TFRecord files.
Dataset.write_webdataset | Writes the dataset to WebDataset files.
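An illustrative sketch of the conversion and write APIs; the output path below is a placeholder, not a real location:

import ray

ds = ray.data.range(1000)

# Convert to a single in-memory pandas DataFrame (the data must fit on the driver).
df = ds.to_pandas()

# Write the dataset out as Parquet files (placeholder local path).
ds.write_parquet("/tmp/ray_example_output")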
Inspecting Metadata

Dataset.columns | Returns the columns of this Dataset.
Dataset.count | Count the number of rows in the dataset.
Dataset.input_files | Return the list of input files for the dataset.
Dataset.num_blocks | Return the number of blocks of this Dataset.
Dataset.schema | Return the schema of the dataset.
Dataset.size_bytes | Return the in-memory size of the dataset.
Dataset.stats | Returns a string containing execution timing information.
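A sketch of the metadata inspection calls; the S3 path is the same placeholder used in the examples above:

import ray

ds = ray.data.read_parquet("s3://bucket/path")  # placeholder path

print(ds.schema())       # column names and types
print(ds.columns())      # list of column names
print(ds.count())        # number of rows
print(ds.input_files())  # files backing the dataset
print(ds.stats())        # execution timing information (populated after execution)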
Sorting, Shuffling and Repartitioning

Dataset.random_shuffle | Randomly shuffle the rows of this Dataset.
Dataset.sort | Sort the dataset by the specified key column or key function.
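A brief, illustrative sketch of shuffling and sorting:

import ray

ds = ray.data.range(1000)

shuffled = ds.random_shuffle()                 # global random shuffle of all rows
by_id = shuffled.sort("id")                    # sort by the "id" column
by_id_desc = shuffled.sort("id", descending=True)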
Splitting and Merging datasets

Dataset.split | Materialize and split the dataset into n disjoint pieces.
Dataset.split_at_indices | Materialize and split the dataset at the given indices (like np.split).
Dataset.split_proportionately | Materialize and split the dataset using proportions.
Dataset.streaming_split | Returns n DataIterators that can be used to read disjoint subsets of the dataset in parallel.
Dataset.train_test_split | Materialize and split the dataset into train and test subsets.
Dataset.union | Concatenate Datasets across rows.
Dataset.zip | Zip the columns of this dataset with the columns of another.
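An illustrative sketch of splitting and merging; the split counts and proportions are arbitrary:

import ray

ds = ray.data.range(1000)

pieces = ds.split(4)                           # four disjoint materialized pieces
head, tail = ds.split_at_indices([800])        # split at row 800
train, test = ds.train_test_split(test_size=0.2)

combined = ds.union(ray.data.range(1000))      # concatenate rows
zipped = ds.zip(ray.data.range(1000))          # combine columns row-wise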
Schema

- class ray.data.Schema(base_schema: pyarrow.lib.Schema | PandasBlockSchema, *, data_context: DataContext | None = None)
 Dataset schema.

- base_schema
 The underlying Arrow or Pandas schema.
PublicAPI (beta): This API is in beta and may change before becoming stable.
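For illustration, calling schema() on a Dataset returns this Schema wrapper; the columns below are made up:

import ray

ds = ray.data.from_items([{"col1": 1, "col2": "a"}])
schema = ds.schema()

print(schema)              # Ray Data Schema with column names and types
print(schema.base_schema)  # the underlying Arrow (or pandas) schema object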
Developer API

Dataset.to_pandas_refs | Converts this Dataset into a distributed set of Pandas dataframes.
Dataset.to_numpy_refs | Converts this Dataset into a distributed set of NumPy ndarrays or dictionary of NumPy ndarrays.
Dataset.to_arrow_refs | Convert this Dataset into a distributed set of PyArrow tables.
Dataset.iter_internal_ref_bundles | Get an iterator over RefBundles belonging to this Dataset.
block.Block | alias of Union[pyarrow.Table, pandas.DataFrame]
block.BlockExecStats | Execution stats for this block.
block.BlockMetadata | Metadata about the block.
block.BlockAccessor | Provides accessor methods for a specific block.
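A small, illustrative sketch of developer-facing block access:

import ray

ds = ray.data.range(100)

# Developer APIs expose the distributed blocks as Ray object references.
pandas_refs = ds.to_pandas_refs()      # list of ObjectRef[pandas.DataFrame]
first_block = ray.get(pandas_refs[0])  # fetch one block to the driver
print(first_block.head())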
Deprecated API

Dataset.iter_tf_batches | Return an iterable over batches of data represented as TensorFlow tensors.
Dataset.to_torch | Return a Torch IterableDataset over this Dataset.