import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import ray.cloudpickle as pickle
from ray._private.ray_logging.logging_config import LoggingConfig
from ray.util.annotations import PublicAPI
if TYPE_CHECKING:
    from ray.runtime_env import RuntimeEnv
[docs]
@PublicAPI
class JobConfig:
    """A class used to store the configurations of a job.
    Examples:
        .. testcode::
            :hide:
            import ray
            ray.shutdown()
        .. testcode::
            import ray
            from ray.job_config import JobConfig
            ray.init(job_config=JobConfig(default_actor_lifetime="non_detached"))
    Args:
        jvm_options: The jvm options for java workers of the job.
        code_search_path: A list of directories or jar files that
            specify the search path for user code. This will be used as
            `CLASSPATH` in Java and `PYTHONPATH` in Python.
            See :ref:`Ray cross-language programming <cross_language>` for more details.
        runtime_env: A :ref:`runtime environment <runtime-environments>` dictionary.
        metadata: An opaque metadata dictionary.
        ray_namespace: A :ref:`namespace <namespaces-guide>`
            is a logical grouping of jobs and named actors.
        default_actor_lifetime: The default value of actor lifetime,
            can be "detached" or "non_detached".
            See :ref:`actor lifetimes <actor-lifetimes>` for more details.
    """
    def __init__(
        self,
        jvm_options: Optional[List[str]] = None,
        code_search_path: Optional[List[str]] = None,
        runtime_env: Optional[dict] = None,
        _client_job: bool = False,
        metadata: Optional[dict] = None,
        ray_namespace: Optional[str] = None,
        default_actor_lifetime: str = "non_detached",
        _py_driver_sys_path: Optional[List[str]] = None,
    ):
        #: The jvm options for java workers of the job.
        self.jvm_options = jvm_options or []
        #: A list of directories or jar files that
        #: specify the search path for user code.
        self.code_search_path = code_search_path or []
        # It's difficult to find the error that caused by the
        # code_search_path is a string. So we assert here.
        assert isinstance(self.code_search_path, (list, tuple)), (
            f"The type of code search path is incorrect: " f"{type(code_search_path)}"
        )
        self._client_job = _client_job
        #: An opaque metadata dictionary.
        self.metadata = metadata or {}
        #: A namespace is a logical grouping of jobs and named actors.
        self.ray_namespace = ray_namespace
        self.set_runtime_env(runtime_env)
        self.set_default_actor_lifetime(default_actor_lifetime)
        # A list of directories that specify the search path for python workers.
        self._py_driver_sys_path = _py_driver_sys_path or []
        # Python logging configurations that will be passed to Ray tasks/actors.
        self.py_logging_config = None
    def _serialize(self) -> str:
        """Serialize the struct into protobuf string"""
        return self._get_proto_job_config().SerializeToString()
[docs]
    def set_runtime_env(
        self,
        runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]],
        validate: bool = False,
    ) -> None:
        """Modify the runtime_env of the JobConfig.
        We don't validate the runtime_env by default here because it may go
        through some translation before actually being passed to C++ (e.g.,
        working_dir translated from a local directory to a URI).
        Args:
            runtime_env: A :ref:`runtime environment <runtime-environments>` dictionary.
            validate: Whether to validate the runtime env.
        """
        self.runtime_env = runtime_env if runtime_env is not None else {}
        if validate:
            self.runtime_env = self._validate_runtime_env()
        self._cached_pb = None 
[docs]
    def set_py_logging_config(
        self,
        logging_config: Optional[LoggingConfig] = None,
    ):
        """Set the logging configuration for the job.
        The logging configuration will be applied to the root loggers of
        all Ray task and actor processes that belong to this job.
        Args:
            logging_config: The logging configuration to set.
        """
        self.py_logging_config = logging_config 
[docs]
    def set_ray_namespace(self, ray_namespace: str) -> None:
        """Set Ray :ref:`namespace <namespaces-guide>`.
        Args:
            ray_namespace: The namespace to set.
        """
        if ray_namespace != self.ray_namespace:
            self.ray_namespace = ray_namespace
            self._cached_pb = None 
[docs]
    def set_default_actor_lifetime(self, default_actor_lifetime: str) -> None:
        """Set the default actor lifetime, which can be "detached" or "non_detached".
        See :ref:`actor lifetimes <actor-lifetimes>` for more details.
        Args:
            default_actor_lifetime: The default actor lifetime to set.
        """
        import ray.core.generated.common_pb2 as common_pb2
        if default_actor_lifetime == "detached":
            self._default_actor_lifetime = common_pb2.JobConfig.ActorLifetime.DETACHED
        elif default_actor_lifetime == "non_detached":
            self._default_actor_lifetime = (
                common_pb2.JobConfig.ActorLifetime.NON_DETACHED
            )
        else:
            raise ValueError(
                "Default actor lifetime must be one of `detached`, `non_detached`"
            ) 
    def _validate_runtime_env(self):
        # TODO(edoakes): this is really unfortunate, but JobConfig is imported
        # all over the place so this causes circular imports. We should remove
        # this dependency and pass in a validated runtime_env instead.
        from ray.runtime_env import RuntimeEnv
        if isinstance(self.runtime_env, RuntimeEnv):
            return self.runtime_env
        return RuntimeEnv(**self.runtime_env)
    def _get_proto_job_config(self):
        """Return the protobuf structure of JobConfig."""
        # TODO(edoakes): this is really unfortunate, but JobConfig is imported
        # all over the place so this causes circular imports. We should remove
        # this dependency and pass in a validated runtime_env instead.
        import ray.core.generated.common_pb2 as common_pb2
        from ray._private.utils import get_runtime_env_info
        if self._cached_pb is None:
            pb = common_pb2.JobConfig()
            if self.ray_namespace is None:
                pb.ray_namespace = str(uuid.uuid4())
            else:
                pb.ray_namespace = self.ray_namespace
            pb.jvm_options.extend(self.jvm_options)
            pb.code_search_path.extend(self.code_search_path)
            pb.py_driver_sys_path.extend(self._py_driver_sys_path)
            for k, v in self.metadata.items():
                pb.metadata[k] = v
            parsed_env = self._validate_runtime_env()
            pb.runtime_env_info.CopyFrom(
                get_runtime_env_info(
                    parsed_env,
                    is_job_runtime_env=True,
                    serialize=False,
                )
            )
            if self._default_actor_lifetime is not None:
                pb.default_actor_lifetime = self._default_actor_lifetime
            if self.py_logging_config:
                pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config)
            self._cached_pb = pb
        return self._cached_pb
    def _runtime_env_has_working_dir(self):
        return self._validate_runtime_env().has_working_dir()
    def _get_serialized_runtime_env(self) -> str:
        """Return the JSON-serialized parsed runtime env dict"""
        return self._validate_runtime_env().serialize()
    def _get_proto_runtime_env_config(self) -> str:
        """Return the JSON-serialized parsed runtime env info"""
        return self._get_proto_job_config().runtime_env_info.runtime_env_config
[docs]
    @classmethod
    def from_json(cls, job_config_json):
        """Generates a JobConfig object from json.
        Examples:
            .. testcode::
                from ray.job_config import JobConfig
                job_config = JobConfig.from_json(
                    {"runtime_env": {"working_dir": "uri://abc"}})
        Args:
            job_config_json: The job config json dictionary.
        """
        return cls(
            jvm_options=job_config_json.get("jvm_options", None),
            code_search_path=job_config_json.get("code_search_path", None),
            runtime_env=job_config_json.get("runtime_env", None),
            metadata=job_config_json.get("metadata", None),
            ray_namespace=job_config_json.get("ray_namespace", None),
            _client_job=job_config_json.get("client_job", False),
            _py_driver_sys_path=job_config_json.get("py_driver_sys_path", None),
        )