From be80d840916b77955cc7eba0ead747e276a99708 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Sat, 9 Nov 2024 20:05:54 +0000
Subject: [PATCH 1/2] update

Signed-off-by: kaihsun
---
 python/ray/_private/node.py      | 12 +++++++++++
 python/ray/tests/test_logging.py | 36 ++++++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py
index 156ecd0be606..98d9f2b89efc 100644
--- a/python/ray/_private/node.py
+++ b/python/ray/_private/node.py
@@ -1078,6 +1078,18 @@ def start_reaper_process(self):
 
     def start_log_monitor(self):
         """Start the log monitor."""
+        filename = ray_constants.LOG_MONITOR_LOG_FILE_NAME
+        file_path = os.path.join(self._logs_dir, filename)
+        # Avoid launching multiple log monitors on a single host.
+        # This can happen if the user starts multiple Ray nodes on the same host.
+        if os.path.isfile(file_path):
+            logger.debug(
+                f"File {file_path} exists, not starting log monitor again. "
+                "This can happen if the user starts multiple Ray nodes on "
+                "the same host."
+            )
+            return
+
         # Only redirect logs to .err. .err file is only useful when the
         # component has an unexpected output to stdout/stderr.
         _, stderr_file = self.get_log_file_handles(
diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py
index 8ab94394ba06..b9d27fe9deff 100644
--- a/python/ray/tests/test_logging.py
+++ b/python/ray/tests/test_logging.py
@@ -11,6 +11,7 @@
 from pathlib import Path
 from typing import Dict
 from unittest.mock import Mock, MagicMock, patch
+from ray.cluster_utils import AutoscalingCluster
 
 import colorama
 import pytest
@@ -1058,6 +1059,41 @@ def test_print_worker_logs_multi_color() -> None:
     )
 
 
+def test_multiple_ray_nodes_on_same_host(shutdown_only):
+    # Test that launching multiple Ray nodes on the same host will
+    # not produce redundant logs. Use `AutoscalingCluster` to launch
+    # the cluster instead of `ray_start_cluster`, as the latter only
+    # starts certain processes for worker nodes. For example, LogMonitor
+    # is not launched.
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 1},
+        worker_node_types={
+            "type-1": {
+                "resources": {"CPU": 1},
+                "node_config": {},
+                "min_workers": 1,
+                "max_workers": 1,
+            },
+        },
+        autoscaler_v2=True,
+    )
+    try:
+        cluster.start()
+        script = """
+import ray
+ray.init()
+@ray.remote
+def f():
+    print("hello world")
+ray.get(f.remote())
+"""
+        stderr = run_string_as_driver(script)
+        assert stderr.count("hello world") == 1
+    finally:
+        ray.shutdown()
+        cluster.shutdown()
+
+
 if __name__ == "__main__":
     import sys
 

From 764a8c2dfa0f438167e3877101f4fcbf8ac418e5 Mon Sep 17 00:00:00 2001
From: kaihsun
Date: Mon, 18 Nov 2024 01:18:45 +0000
Subject: [PATCH 2/2] update

Signed-off-by: kaihsun
---
 python/ray/_private/node.py          | 33 ++++++++++++++++++----------
 python/ray/_private/ray_constants.py |  3 +++
 python/ray/tests/test_logging.py     | 17 +++++++++++---
 3 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py
index 98d9f2b89efc..040f37935ebc 100644
--- a/python/ray/_private/node.py
+++ b/python/ray/_private/node.py
@@ -495,6 +495,15 @@ def _init_temp(self):
         )
         try_to_create_directory(self._runtime_env_dir)
 
+        # Create a file named created_by_head to indicate that this session
+        # directory was created by the head node.
+        if self.head:
+            created_by_head = os.path.join(
+                self._logs_dir, ray_constants.CREATED_BY_HEAD_FILE_NAME
+            )
+            with open(created_by_head, "w") as f:
+                f.write("This session directory was created by the head node.")
+
     def _get_node_labels(self):
         def merge_labels(env_override_labels, params_labels):
             """Merges two dictionaries, picking from the
@@ -1078,17 +1087,19 @@ def start_reaper_process(self):
 
     def start_log_monitor(self):
         """Start the log monitor."""
-        filename = ray_constants.LOG_MONITOR_LOG_FILE_NAME
-        file_path = os.path.join(self._logs_dir, filename)
-        # Avoid launching multiple log monitors on a single host.
-        # This can happen if the user starts multiple Ray nodes on the same host.
-        if os.path.isfile(file_path):
-            logger.debug(
-                f"File {file_path} exists, not starting log monitor again. "
-                "This can happen if the user starts multiple Ray nodes on "
-                "the same host."
-            )
-            return
+        if not self.head:
+            filename = ray_constants.CREATED_BY_HEAD_FILE_NAME
+            file_path = os.path.join(self._logs_dir, filename)
+            # Avoid launching multiple log monitors on a single host.
+            # This can happen if the user starts multiple Ray nodes on the same host.
+            if os.path.isfile(file_path):
+                logger.debug(
+                    f"File {file_path} exists, indicating that the session directory "
+                    "was created by the head node. This worker node is colocated with "
+                    "the head node, so the log monitor should not be launched again to "
+                    "avoid duplicate log entries."
+                )
+                return
 
         # Only redirect logs to .err. .err file is only useful when the
         # component has an unexpected output to stdout/stderr.
diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py
index 707d68ec046a..0f025cca6d1d 100644
--- a/python/ray/_private/ray_constants.py
+++ b/python/ray/_private/ray_constants.py
@@ -262,6 +262,9 @@ def env_set_by_user(key):
 MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_MONITOR}.log"
 LOG_MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_LOG_MONITOR}.log"
 
+# If the file exists, the session directory was created by the head node.
+CREATED_BY_HEAD_FILE_NAME = "created_by_head"
+
 # Enable log deduplication.
 RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", True)
 
diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py
index b9d27fe9deff..8b3f6d818cab 100644
--- a/python/ray/tests/test_logging.py
+++ b/python/ray/tests/test_logging.py
@@ -1059,7 +1059,14 @@ def test_print_worker_logs_multi_color() -> None:
     )
 
 
-def test_multiple_ray_nodes_on_same_host(shutdown_only):
+@pytest.mark.parametrize(
+    "override_env",
+    (
+        {"RAY_LOG_TO_STDERR": "0"},
+        {"RAY_LOG_TO_STDERR": "1"},
+    ),
+)
+def test_multiple_ray_nodes_on_same_host(shutdown_only, override_env):
     # Test that launching multiple Ray nodes on the same host will
     # not produce redundant logs. Use `AutoscalingCluster` to launch
     # the cluster instead of `ray_start_cluster`, as the latter only
@@ -1078,7 +1085,7 @@ def test_multiple_ray_nodes_on_same_host(shutdown_only):
         autoscaler_v2=True,
     )
     try:
-        cluster.start()
+        cluster.start(override_env=override_env)
         script = """
 import ray
 ray.init()
@@ -1088,7 +1095,11 @@ def f():
 ray.get(f.remote())
 """
         stderr = run_string_as_driver(script)
-        assert stderr.count("hello world") == 1
+        assert "RAY_LOG_TO_STDERR" in override_env
+        if override_env["RAY_LOG_TO_STDERR"] == "1":
+            assert stderr.count("hello world") == 0
+        else:
+            assert stderr.count("hello world") == 1
     finally:
         ray.shutdown()
         cluster.shutdown()