diff --git a/src/prefect/flow_engine.py b/src/prefect/flow_engine.py index 26b90b6eaeb2..b0eb0ec91e35 100644 --- a/src/prefect/flow_engine.py +++ b/src/prefect/flow_engine.py @@ -602,6 +602,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): self.client.set_flow_run_name( flow_run_id=self.flow_run.id, name=flow_run_name ) + self.logger.extra["flow_run_name"] = flow_run_name self.logger.debug( f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}" @@ -609,6 +610,8 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): self.flow_run.name = flow_run_name self._flow_run_name_set = True + self._telemetry.update_run_name(name=flow_run_name) + if self.flow_run.parent_task_run_id: _logger = get_run_logger(FlowRunContext.get()) run_type = "subflow" @@ -655,7 +658,6 @@ def initialize_run(self): ) self._telemetry.start_span( - name=self.flow.name, run=self.flow_run, client=self.client, parameters=self.parameters, @@ -1156,6 +1158,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): self.logger = flow_run_logger(flow_run=self.flow_run, flow=self.flow) # update the flow run name if necessary + if not self._flow_run_name_set and self.flow.flow_run_name: flow_run_name = resolve_custom_flow_run_name( flow=self.flow, parameters=self.parameters @@ -1170,6 +1173,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): self.flow_run.name = flow_run_name self._flow_run_name_set = True + self._telemetry.update_run_name(name=flow_run_name) if self.flow_run.parent_task_run_id: _logger = get_run_logger(FlowRunContext.get()) run_type = "subflow" @@ -1222,7 +1226,6 @@ async def initialize_run(self): ) await self._telemetry.async_start_span( - name=self.flow.name, run=self.flow_run, client=self.client, parameters=self.parameters, diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 3ee292e3a9d0..4a385d227615 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -208,6 +208,7 @@ def _set_custom_task_run_name(self): ) self.task_run.name = task_run_name self._task_name_set = True + self._telemetry.update_run_name(name=task_run_name) def _wait_for_dependencies(self): if not self.wait_for: @@ -686,7 +687,6 @@ def initialize_run( self._telemetry.start_span( run=self.task_run, - name=self.task.name, client=self.client, parameters=self.parameters, ) @@ -1215,7 +1215,6 @@ async def initialize_run( await self._telemetry.async_start_span( run=self.task_run, - name=self.task.name, client=self.client, parameters=self.parameters, ) diff --git a/src/prefect/telemetry/run_telemetry.py b/src/prefect/telemetry/run_telemetry.py index fe2a7bfeff94..cb43719841cf 100644 --- a/src/prefect/telemetry/run_telemetry.py +++ b/src/prefect/telemetry/run_telemetry.py @@ -53,10 +53,9 @@ async def async_start_span( self, run: FlowOrTaskRun, client: PrefectClient, - name: Optional[str] = None, parameters: Optional[dict[str, Any]] = None, ): - traceparent, span = self._start_span(run, name, parameters) + traceparent, span = self._start_span(run, parameters) if self._run_type(run) == "flow" and traceparent: # Only explicitly update labels if the run is a flow as task runs @@ -71,10 +70,9 @@ def start_span( self, run: FlowOrTaskRun, client: SyncPrefectClient, - name: Optional[str] = None, parameters: Optional[dict[str, Any]] = None, ): - traceparent, span = self._start_span(run, name, parameters) + traceparent, span = self._start_span(run, parameters) if self._run_type(run) == "flow" and traceparent: # Only explicitly update labels if the run is a flow as task runs @@ -86,7 +84,6 @@ def start_span( def _start_span( self, run: FlowOrTaskRun, - name: Optional[str] = None, parameters: Optional[dict[str, Any]] = None, ) -> tuple[Optional[str], Span]: """ @@ -117,10 +114,10 @@ def _start_span( run_type = self._run_type(run) self.span = self._tracer.start_span( - name=name or run.name, + name=run.name, context=context, attributes={ - "prefect.run.name": name or run.name, + "prefect.run.name": run.name, "prefect.run.type": run_type, "prefect.run.id": str(run.id), "prefect.tags": run.tags, @@ -198,6 +195,14 @@ def update_state(self, new_state: State) -> None: }, ) + def update_run_name(self, name: str) -> None: + """ + Update the name of the run. + """ + if self.span: + self.span.update_name(name=name) + self.span.set_attribute("prefect.run.name", name) + def _parent_run(self) -> Union[FlowOrTaskRun, None]: """ Identify the "parent run" for the current execution context. diff --git a/tests/deployment/test_flow_runs.py b/tests/deployment/test_flow_runs.py index 388e2719d22c..d599e4682d5c 100644 --- a/tests/deployment/test_flow_runs.py +++ b/tests/deployment/test_flow_runs.py @@ -446,7 +446,7 @@ async def test_propagates_otel_trace_to_deployment_flow_run( """Test that OTEL trace context gets propagated from parent flow to deployment flow run""" deployment = test_deployment - @flow(name="child-flow") + @flow(flow_run_name="child-flow") async def child_flow() -> None: pass @@ -457,7 +457,7 @@ async def child_flow() -> None: ) deployment = await prefect_client.read_deployment(deployment_id) - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") async def parent_flow(): return await run_deployment( f"foo/{deployment.name}", diff --git a/tests/telemetry/test_run_telemetry.py b/tests/telemetry/test_run_telemetry.py index 25de56533da4..5c41bc075e5b 100644 --- a/tests/telemetry/test_run_telemetry.py +++ b/tests/telemetry/test_run_telemetry.py @@ -120,11 +120,11 @@ async def test_flow_run_creates_and_stores_traceparent( instrumentation: InstrumentationTester, sync_prefect_client: SyncPrefectClient, ): - @flow(name="the-flow") + @flow(flow_run_name="the-flow") async def async_flow(): pass - @flow(name="the-flow") + @flow(flow_run_name="the-flow") def sync_flow(): pass @@ -149,11 +149,11 @@ async def test_flow_run_instrumentation( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester, ): - @flow(name="instrumented-flow") + @flow(flow_run_name="instrumented-flow") async def async_flow() -> str: return "hello" - @flow(name="instrumented-flow") + @flow(flow_run_name="instrumented-flow") def sync_flow() -> str: return "hello" @@ -277,28 +277,28 @@ async def test_span_links( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester, ): - @task + @task(task_run_name="produces42") def produces42() -> int: return 42 if engine_type == "async": - @task + @task(task_run_name="async_task") async def async_task(x: int, y: int): return x + y - @flow + @flow(flow_run_name="async-flow") async def async_flow(): await async_task(x=produces42.submit(), y=2) await async_flow() else: - @task + @task(task_run_name="sync_task") def sync_task(x: int, y: int): return x + y - @flow + @flow(flow_run_name="sync-flow") def sync_flow(): sync_task(x=produces42.submit(), y=2) @@ -427,27 +427,27 @@ async def test_nested_flow_task_task( instrumentation: InstrumentationTester, sync_prefect_client: SyncPrefectClient, ): - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") async def my_async_flow(): await async_child_task() - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") def my_sync_flow(): sync_child_task() - @task(name="child-task") + @task(task_run_name="child-task") def sync_child_task(): sync_grandchild_task() - @task(name="child-task") + @task(task_run_name="child-task") async def async_child_task(): await async_grandchild_task() - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") def sync_grandchild_task(): pass - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") async def async_grandchild_task(): pass @@ -489,27 +489,27 @@ async def test_nested_flow_task_flow( instrumentation: InstrumentationTester, sync_prefect_client: SyncPrefectClient, ): - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") async def my_async_flow(): await async_child_task() - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") def my_sync_flow(): sync_child_task() - @task(name="child-task") + @task(task_run_name="child-task") def sync_child_task(): sync_grandchild_flow() - @task(name="child-task") + @task(task_run_name="child-task") async def async_child_task(): await async_grandchild_flow() - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") def sync_grandchild_flow(): pass - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") async def async_grandchild_flow(): pass @@ -550,27 +550,27 @@ async def test_nested_task_flow_task( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester, ): - @task(name="parent-task") + @task(task_run_name="parent-task") async def async_parent_task(): await async_child_flow() - @task(name="parent-task") + @task(task_run_name="parent-task") def sync_parent_task(): sync_child_flow() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") def sync_child_flow(): sync_grandchild_task() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") async def async_child_flow(): await async_grandchild_task() - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") def sync_grandchild_task(): pass - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") async def async_grandchild_task(): pass @@ -612,27 +612,27 @@ async def test_nested_task_flow_flow( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester, ): - @task(name="parent-task") + @task(task_run_name="parent-task") async def async_parent_task(): await async_child_flow() - @task(name="parent-task") + @task(task_run_name="parent-task") def sync_parent_task(): sync_child_flow() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") def sync_child_flow(): sync_grandchild_flow() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") async def async_child_flow(): await async_grandchild_flow() - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") def sync_grandchild_flow(): pass - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") async def async_grandchild_flow(): pass @@ -670,27 +670,27 @@ async def test_nested_flow_flow_task( instrumentation: InstrumentationTester, sync_prefect_client: SyncPrefectClient, ): - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") async def async_parent_flow(): await async_child_flow() - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") def sync_parent_flow(): sync_child_flow() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") async def async_child_flow(): await async_grandchild_task() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") def sync_child_flow(): sync_grandchild_task() - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") async def async_grandchild_task(): pass - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") def sync_grandchild_task(): pass @@ -732,27 +732,27 @@ async def test_nested_flow_flow_flow( instrumentation: InstrumentationTester, sync_prefect_client: SyncPrefectClient, ): - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") async def async_parent_flow(): await async_child_flow() - @flow(name="parent-flow") + @flow(flow_run_name="parent-flow") def sync_parent_flow(): sync_child_flow() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") async def async_child_flow(): await async_grandchild_flow() - @flow(name="child-flow") + @flow(flow_run_name="child-flow") def sync_child_flow(): sync_grandchild_flow() - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") async def async_grandchild_flow(): pass - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") def sync_grandchild_flow(): pass @@ -793,27 +793,27 @@ async def test_nested_task_task_task( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester, ): - @task(name="parent-task") + @task(task_run_name="parent-task") async def async_parent_task(): await async_child_task() - @task(name="parent-task") + @task(task_run_name="parent-task") def sync_parent_task(): sync_child_task() - @task(name="child-task") + @task(task_run_name="child-task") async def async_child_task(): await async_grandchild_task() - @task(name="child-task") + @task(task_run_name="child-task") def sync_child_task(): sync_grandchild_task() - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") async def async_grandchild_task(): pass - @task(name="grandchild-task") + @task(task_run_name="grandchild-task") def sync_grandchild_task(): pass @@ -854,27 +854,27 @@ async def test_nested_task_task_flow( engine_type: Literal["async", "sync"], instrumentation: InstrumentationTester, ): - @task(name="parent-task") + @task(task_run_name="parent-task") async def async_parent_task(): await async_child_task() - @task(name="parent-task") + @task(task_run_name="parent-task") def sync_parent_task(): sync_child_task() - @task(name="child-task") + @task(task_run_name="child-task") async def async_child_task(): await async_grandchild_flow() - @task(name="child-task") + @task(task_run_name="child-task") def sync_child_task(): sync_grandchild_flow() - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") async def async_grandchild_flow(): pass - @flow(name="grandchild-flow") + @flow(flow_run_name="grandchild-flow") def sync_grandchild_flow(): pass @@ -909,3 +909,37 @@ def sync_grandchild_flow(): # The grandchild span should have the `child_span` as its parent assert grandchild_span.parent is not None assert grandchild_span.parent.span_id == child_span.context.span_id + + +async def test_span_name_with_string_template( + engine_type: Literal["async", "sync"], + instrumentation: InstrumentationTester, +): + """Test that spans use the formatted name when string templates are used""" + test_value = "template-test" + + @task(task_run_name=f"task-{test_value}") + async def async_task(value: str): + return value + + @task(task_run_name=f"task-{test_value}") + def sync_task(value: str): + return value + + task_fn = async_task if engine_type == "async" else sync_task + task_run_id = uuid4() + + await run_task( + task_fn, + task_run_id=task_run_id, + parameters={"value": test_value}, + engine_type=engine_type, + ) + + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + + expected_name = f"task-{test_value}" + assert span.name == expected_name + assert span.attributes["prefect.run.name"] == expected_name