Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][observability] Logs are duplicated if multiple nodes are running on same machine #48676

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,15 @@ def _init_temp(self):
)
try_to_create_directory(self._runtime_env_dir)

# Create a file named created_by_head to indicate that this session
# directory was created by the head node.
if self.head:
created_by_head = os.path.join(
self._logs_dir, ray_constants.CREATED_BY_HEAD_FILE_NAME
)
with open(created_by_head, "w") as f:
f.write("This session directory was created by the head node.")

def _get_node_labels(self):
def merge_labels(env_override_labels, params_labels):
"""Merges two dictionaries, picking from the
Expand Down Expand Up @@ -1078,6 +1087,20 @@ def start_reaper_process(self):

def start_log_monitor(self):
"""Start the log monitor."""
if not self.head:
filename = ray_constants.CREATED_BY_HEAD_FILE_NAME
file_path = os.path.join(self._logs_dir, filename)
# Avoid launching multiple log monitors on a single host.
# This can happen if the user starts multiple Ray nodes on the same host.
if os.path.isfile(file_path):
logger.debug(
f"File {file_path} exists, indicating that the session directory "
"was created by the head node. This worker node is colocated with "
"the head node, so the log monitor should not be launched again to "
"avoid duplicate log entries."
)
return

# Only redirect logs to .err. .err file is only useful when the
# component has an unexpected output to stdout/stderr.
_, stderr_file = self.get_log_file_handles(
Expand Down
3 changes: 3 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ def env_set_by_user(key):
MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_MONITOR}.log"
LOG_MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_LOG_MONITOR}.log"

# If the file exists, the session directory was created by the head node.
CREATED_BY_HEAD_FILE_NAME = "created_by_head"

# Enable log deduplication.
RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", True)

Expand Down
47 changes: 47 additions & 0 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pathlib import Path
from typing import Dict
from unittest.mock import Mock, MagicMock, patch
from ray.cluster_utils import AutoscalingCluster

import colorama
import pytest
Expand Down Expand Up @@ -1058,6 +1059,52 @@ def test_print_worker_logs_multi_color() -> None:
)


@pytest.mark.parametrize(
"override_env",
(
{"RAY_LOG_TO_STDERR": "0"},
{"RAY_LOG_TO_STDERR": "1"},
),
)
def test_multiple_ray_nodes_on_same_host(shutdown_only, override_env):
# Test that launching multiple Ray nodes on the same host will
# not produce redundant logs. Use `AutoScalingCluster` to launch
# the cluster instead of `ray_start_cluster`, as the latter only
# starts certain processes for worker nodes. For example, LogMonitor
# is not launched.
cluster = AutoscalingCluster(
head_resources={"CPU": 1},
worker_node_types={
"type-1": {
"resources": {"CPU": 1},
"node_config": {},
"min_workers": 1,
"max_workers": 1,
},
},
autoscaler_v2=True,
)
try:
cluster.start(override_env=override_env)
script = """
import ray
ray.init()
@ray.remote
def f():
print("hello world")
ray.get(f.remote())
"""
stderr = run_string_as_driver(script)
assert "RAY_LOG_TO_STDERR" in override_env
if override_env["RAY_LOG_TO_STDERR"] == "1":
assert stderr.count("hello world") == 0
else:
assert stderr.count("hello world") == 1
finally:
ray.shutdown()
cluster.shutdown()


if __name__ == "__main__":
import sys

Expand Down