ray.rllib.utils.metrics.metrics_logger.MetricsLogger.merge_and_log_n_dicts
- MetricsLogger.merge_and_log_n_dicts(stats_dicts: List[Dict[str, Any]], *, key: str | Tuple[str, ...] | None = None, reduce: str | None = 'mean', window: int | float | None = None, ema_coeff: float | None = None, clear_on_reduce: bool = False) -> None
- Merges n dicts, generated by n parallel components, and logs the results.

```python
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.test_utils import check

# Example: n Learners logging loss stats to be merged.
# Note that losses should usually be logged with a window=1 so they don't
# get smeared over time and instead provide an accurate picture of the
# current situation.
main_logger = MetricsLogger()

logger_learner1 = MetricsLogger()
logger_learner1.log_value("loss", 0.1, window=1)
learner1_results = logger_learner1.reduce()

logger_learner2 = MetricsLogger()
logger_learner2.log_value("loss", 0.2, window=1)
learner2_results = logger_learner2.reduce()

# Merge the stats from both Learners.
main_logger.merge_and_log_n_dicts(
    [learner1_results, learner2_results],
    key="learners",
)
check(main_logger.peek(("learners", "loss")), 0.15)

# Example: m EnvRunners logging episode returns to be merged.
main_logger = MetricsLogger()

logger_env_runner1 = MetricsLogger()
logger_env_runner1.log_value("mean_ret", 100.0, window=3)
logger_env_runner1.log_value("mean_ret", 200.0)
logger_env_runner1.log_value("mean_ret", 300.0)
logger_env_runner1.log_value("mean_ret", 400.0)
env_runner1_results = logger_env_runner1.reduce()

logger_env_runner2 = MetricsLogger()
logger_env_runner2.log_value("mean_ret", 150.0, window=3)
logger_env_runner2.log_value("mean_ret", 250.0)
logger_env_runner2.log_value("mean_ret", 350.0)
logger_env_runner2.log_value("mean_ret", 450.0)
env_runner2_results = logger_env_runner2.reduce()

# Merge the stats from both EnvRunners.
main_logger.merge_and_log_n_dicts(
    [env_runner1_results, env_runner2_results],
    key="env_runners",
)
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are:
# env runner 1: [100, 200, 300, 400]
# env runner 2: [150, 250, 350, 450]
# Move backwards from index=-1 (each time, loop through both env runners):
# index=-1 -> [400, 450] -> reduce-mean -> [425] -> repeat 2 times (number
# of env runners) -> [425, 425]
# index=-2 -> [300, 350] -> reduce-mean -> [325] -> repeat 2 times
# -> append -> [425, 425, 325, 325] -> STOP b/c we have reached >= window.
# reverse the list -> [325, 325, 425, 425]
check(
    main_logger.stats["env_runners"]["mean_ret"].values,
    [325, 325, 425, 425],
)
check(main_logger.peek(("env_runners", "mean_ret")), (325 + 425 + 425) / 3)

# Example: Lifetime sum over n parallel components' stats.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=None)
logger1.log_value("some_stat", 25, reduce="sum", window=None)
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=None)
logger2_results = logger2.reduce()

# Merge the stats from both loggers.
main_logger.merge_and_log_n_dicts([logger1_results, logger2_results])
check(main_logger.peek("some_stat"), 150)

# Example: Sum over n parallel components' stats with a window of 3.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=3)
logger1.log_value("some_stat", 25, reduce="sum")
logger1.log_value("some_stat", 10, reduce="sum")
logger1.log_value("some_stat", 5, reduce="sum")
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=3)
logger2.log_value("some_stat", 100, reduce="sum")
logger2_results = logger2.reduce()

# Merge the stats from both loggers.
main_logger.merge_and_log_n_dicts([logger1_results, logger2_results])
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are:
# logger 1: [50, 25, 10, 5]
# logger 2: [75, 100]
# Move backwards from index=-1 (each time, loop through both loggers):
# index=-1 -> [5, 100] -> leave as-is, b/c we are sum'ing -> [5, 100]
# index=-2 -> [10, 75] -> leave as-is -> [5, 100, 10, 75] -> STOP b/c we
# have reached >= window.
# reverse the list -> [75, 10, 100, 5]
check(main_logger.peek("some_stat"), 115)  # last 3 items (window) get sum'd
```

- Parameters:
- stats_dicts – List of n stats dicts to be merged and then logged. 
- key – Optional top-level key under which to log all keys/key sequences found in the n `stats_dicts`.
- reduce – The reduction method to apply, once `self.reduce()` is called. If None, will collect all logged values under `key` in a list (and also return that list upon calling `self.reduce()`).
- window – An optional window size to reduce over. If not None, the reduction operation is only applied to the most recent `window` items, and, after reduction, the internal values list under `key` is shortened to hold at most `window` items (the most recent ones). Must be None if `ema_coeff` is provided. If None (and `ema_coeff` is None), reduction must not be "mean".
- ema_coeff – An optional EMA coefficient to use if `reduce` is "mean" and no `window` is provided. Note that if both `window` and `ema_coeff` are provided, an error is thrown. Also, if `ema_coeff` is provided, `reduce` must be "mean". The reduction formula for EMA is: `EMA(t1) = (1.0 - ema_coeff) * EMA(t0) + ema_coeff * new_value`
- clear_on_reduce – If True, all values under `key` will be emptied after `self.reduce()` is called. Setting this to True is useful for cases in which the internal values list would otherwise grow indefinitely, for example if `reduce` is None and no `window` is provided.
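The backwards, interleaved merge procedure walked through in the EnvRunner example's comments can be sketched in plain Python. This is a minimal illustration of the documented steps, not RLlib's actual implementation; `merge_windowed_mean` is a hypothetical helper and assumes each component's values list holds at least `window` recent items.

```python
def merge_windowed_mean(values_lists, window):
    """Interleave-merge n parallel values lists, newest-first, for reduce="mean".

    Walks backwards from index -1, averages the values at each index across
    all components, repeats that average once per component, and stops as
    soon as at least `window` merged items have been collected.
    """
    n = len(values_lists)
    merged = []
    idx = -1
    while len(merged) < window:
        column = [values[idx] for values in values_lists]
        avg = sum(column) / n
        merged.extend([avg] * n)  # repeat n times (number of components)
        idx -= 1
    merged.reverse()  # restore oldest-first order
    return merged


# Reproduces the EnvRunner example above:
merged = merge_windowed_mean(
    [[100, 200, 300, 400], [150, 250, 350, 450]], window=3
)
print(merged)  # [325.0, 325.0, 425.0, 425.0]
print(sum(merged[-3:]) / 3)  # windowed mean over the last 3 items, as peek() reports
```

Repeating each per-index average n times is what keeps every component's contribution weighted equally in the subsequent windowed reduction.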
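The EMA reduction formula given for `ema_coeff` can be illustrated with a small pure-Python sketch. The helper name `ema_reduce` is hypothetical, and seeding the EMA with the first logged value is an assumption of this sketch, not a documented detail.

```python
def ema_reduce(values, ema_coeff):
    # Seed the EMA with the first value (assumption of this sketch), then
    # fold in each new value via the documented formula:
    # EMA(t1) = (1.0 - ema_coeff) * EMA(t0) + ema_coeff * new_value
    ema = values[0]
    for new_value in values[1:]:
        ema = (1.0 - ema_coeff) * ema + ema_coeff * new_value
    return ema


print(ema_reduce([1.0, 2.0], ema_coeff=0.5))  # 1.5
```

A small `ema_coeff` makes the running mean react slowly to new values, which is why EMA is the default smoothing when no fixed `window` bounds the values list.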