from typing import Dict, Union, Optional, TYPE_CHECKING
from ray.util.annotations import PublicAPI
if TYPE_CHECKING:
    from ray.util.placement_group import PlacementGroup
# "DEFAULT": The default hybrid scheduling strategy
# based on config scheduler_spread_threshold.
# This disables any potential placement group capture.
# "SPREAD": Spread scheduling on a best effort basis.
[docs]
@PublicAPI
class PlacementGroupSchedulingStrategy:
    """Placement group based scheduling strategy.
    Attributes:
        placement_group: the placement group this actor belongs to,
            or None if it doesn't belong to any group.
        placement_group_bundle_index: the index of the bundle
            if the actor belongs to a placement group, which may be -1 to
            specify any available bundle.
        placement_group_capture_child_tasks: Whether or not children tasks
            of this actor should implicitly use the same placement group
            as its parent. It is False by default.
    """
    def __init__(
        self,
        placement_group: "PlacementGroup",
        placement_group_bundle_index: int = -1,
        placement_group_capture_child_tasks: Optional[bool] = None,
    ):
        self.placement_group = placement_group
        self.placement_group_bundle_index = placement_group_bundle_index
        self.placement_group_capture_child_tasks = placement_group_capture_child_tasks 
[docs]
@PublicAPI
class NodeAffinitySchedulingStrategy:
    """Static scheduling strategy used to run a task or actor on a particular node.
    Attributes:
        node_id: the hex id of the node where the task or actor should run.
        soft: whether the scheduler should run the task or actor somewhere else
            if the target node doesn't exist (e.g. the node dies) or is infeasible
            during scheduling.
            If the node exists and is feasible, the task or actor
            will only be scheduled there.
            This means if the node doesn't have the available resources,
            the task or actor will wait indefinitely until resources become available.
            If the node doesn't exist or is infeasible, the task or actor
            will fail if soft is False
            or be scheduled somewhere else if soft is True.
    """
    def __init__(
        self,
        node_id: str,
        soft: bool,
        _spill_on_unavailable: bool = False,
        _fail_on_unavailable: bool = False,
    ):
        # This will be removed once we standardize on node id being hex string.
        if not isinstance(node_id, str):
            node_id = node_id.hex()
        self.node_id = node_id
        self.soft = soft
        self._spill_on_unavailable = _spill_on_unavailable
        self._fail_on_unavailable = _fail_on_unavailable 
def _validate_label_match_operator_values(values, operator):
    if not values:
        raise ValueError(
            f"The variadic parameter of the {operator} operator"
            f' must be a non-empty tuple: e.g. {operator}("value1", "value2").'
        )
    index = 0
    for value in values:
        if not isinstance(value, str):
            raise ValueError(
                f"Type of value in position {index} for the {operator} operator "
                f'must be str (e.g. {operator}("value1", "value2")) '
                f"but got {str(value)} of type {type(value)}."
            )
        index = index + 1
@PublicAPI(stability="alpha")
class In:
    def __init__(self, *values):
        _validate_label_match_operator_values(values, "In")
        self.values = list(values)
@PublicAPI(stability="alpha")
class NotIn:
    def __init__(self, *values):
        _validate_label_match_operator_values(values, "NotIn")
        self.values = list(values)
@PublicAPI(stability="alpha")
class Exists:
    def __init__(self):
        pass
@PublicAPI(stability="alpha")
class DoesNotExist:
    def __init__(self):
        pass
class _LabelMatchExpression:
    """An expression used to select node by node's labels
    Attributes:
        key: the key of label
        operator: In、NotIn、Exists、DoesNotExist
    """
    def __init__(self, key: str, operator: Union[In, NotIn, Exists, DoesNotExist]):
        self.key = key
        self.operator = operator
LabelMatchExpressionsT = Dict[str, Union[In, NotIn, Exists, DoesNotExist]]
@PublicAPI(stability="alpha")
class NodeLabelSchedulingStrategy:
    """
    Label based node affinity scheduling strategy
    scheduling_strategy=NodeLabelSchedulingStrategy({
        "region": In("us"),
        "gpu_type": Exists(),
    })
    """
    def __init__(
        self, hard: LabelMatchExpressionsT, *, soft: LabelMatchExpressionsT = None
    ):
        self.hard = _convert_map_to_expressions(hard, "hard")
        self.soft = _convert_map_to_expressions(soft, "soft")
        self._check_usage()
    def _check_usage(self):
        if not (self.hard or self.soft):
            raise ValueError(
                "The `hard` and `soft` parameter "
                "of NodeLabelSchedulingStrategy cannot both be empty."
            )
def _convert_map_to_expressions(map_expressions: LabelMatchExpressionsT, param: str):
    expressions = []
    if map_expressions is None:
        return expressions
    if not isinstance(map_expressions, Dict):
        raise ValueError(
            f'The {param} parameter must be a map (e.g. {{"key1": In("value1")}}) '
            f"but got type {type(map_expressions)}."
        )
    for key, value in map_expressions.items():
        if not isinstance(key, str):
            raise ValueError(
                f"The map key of the {param} parameter must "
                f'be of type str (e.g. {{"key1": In("value1")}}) '
                f"but got {str(key)} of type {type(key)}."
            )
        if not isinstance(value, (In, NotIn, Exists, DoesNotExist)):
            raise ValueError(
                f"The map value for key {key} of the {param} parameter "
                f"must be one of the `In`, `NotIn`, `Exists` or `DoesNotExist` "
                f'operator (e.g. {{"key1": In("value1")}}) '
                f"but got {str(value)} of type {type(value)}."
            )
        expressions.append(_LabelMatchExpression(key, value))
    return expressions
SchedulingStrategyT = Union[
    None,
    str,  # Literal["DEFAULT", "SPREAD"]
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
    NodeLabelSchedulingStrategy,
]