ray.data.Dataset.streaming_split
- Dataset.streaming_split(n: int, *, equal: bool = False, locality_hints: List[NodeIdStr] | None = None) → List[DataIterator]
Returns n DataIterators that can be used to read disjoint subsets of the dataset in parallel.

This method is the recommended way to consume Datasets for distributed training.

Streaming split works by delegating the execution of this Dataset to a coordinator actor. The coordinator pulls block references from the executed stream and divides those blocks among n output iterators. Iterators pull blocks from the coordinator actor to return to their caller on next.

The returned iterators are also repeatable; each iteration will trigger a new execution of the Dataset. There is an implicit barrier at the start of each iteration, which means that next must be called on all iterators before the iteration starts.

Warning

Because iterators are pulling blocks from the same Dataset execution, if one iterator falls behind, other iterators may be stalled.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.
Examples
import ray

ds = ray.data.range(100)
it1, it2 = ds.streaming_split(2, equal=True)
Consume data from iterators in parallel.
@ray.remote
def consume(it):
    for batch in it.iter_batches():
        pass

ray.get([consume.remote(it1), consume.remote(it2)])
You can loop over the iterators multiple times (multiple epochs).
@ray.remote
def train(it):
    NUM_EPOCHS = 2
    for _ in range(NUM_EPOCHS):
        for batch in it.iter_batches():
            pass

ray.get([train.remote(it1), train.remote(it2)])
The following remote function call blocks waiting for a read on it2 to start.

ray.get(train.remote(it1))
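With equal=True, rows are dropped if needed so that every iterator sees the same number of rows. The following is a minimal sketch that counts rows per split; it assumes the default numpy batch format and the "id" column that ray.data.range produces.

import ray

ds = ray.data.range(5)
left, right = ds.streaming_split(2, equal=True)

@ray.remote
def count_rows(it):
    total = 0
    for batch in it.iter_batches():
        total += len(batch["id"])
    return total

# Launch both consumers before blocking on ray.get() so the implicit
# barrier at the start of the iteration can be passed.
print(ray.get([count_rows.remote(left), count_rows.remote(right)]))
# Expected: [2, 2] -- the fifth row is dropped to keep the splits equal.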
- Parameters:
n – Number of output iterators to return.
equal – If True, each output iterator sees an exactly equal number of rows, dropping data if necessary. If False, some iterators may see slightly more or fewer rows than others, but no data is dropped.
locality_hints – Specify the node ids corresponding to each iterator location. Dataset will try to minimize data movement based on the iterator output locations. This list must have length n. You can get the current node id of a task or actor by calling ray.get_runtime_context().get_node_id(), as in the sketch after this list.
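The following is a minimal sketch of using locality_hints, assuming a hypothetical Consumer actor: each actor reports its node id via ray.get_runtime_context().get_node_id(), and those ids are passed back as hints so that each iterator's blocks are preferentially delivered on its consumer's node.

import ray

@ray.remote
class Consumer:
    def node_id(self):
        return ray.get_runtime_context().get_node_id()

    def consume(self, it):
        for batch in it.iter_batches():
            pass

consumers = [Consumer.remote() for _ in range(2)]
# One hint per iterator; the list must have length n.
hints = ray.get([c.node_id.remote() for c in consumers])

ds = ray.data.range(100)
iterators = ds.streaming_split(2, locality_hints=hints)
# Consume all iterators concurrently to pass the implicit barrier.
ray.get([c.consume.remote(it) for c, it in zip(consumers, iterators)])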
- Returns:
The output iterator splits. These iterators are Ray-serializable and can be freely passed to any Ray task or actor.
See also
Dataset.split()
    Unlike streaming_split(), split() materializes the dataset in memory.
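For contrast, the following is a minimal sketch of split(): it materializes the dataset, so the resulting splits are plain Datasets that can be consumed independently, with no coordinator actor or per-iteration barrier.

import ray

ds = ray.data.range(100)
shard1, shard2 = ds.split(2, equal=True)  # materializes the dataset in memory
print(shard1.count(), shard2.count())  # 50 50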