From a31b3773572762fbe4765443b9ce1243b434d54f Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 11 Mar 2024 12:23:21 -0400 Subject: [PATCH] estuary-cdk: Add `restart_interval` override to allow connectors with known long-running tasks to restart less frequently --- estuary-cdk/estuary_cdk/capture/__init__.py | 8 +++++++- estuary-cdk/estuary_cdk/flow.py | 19 ++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/estuary-cdk/estuary_cdk/capture/__init__.py b/estuary-cdk/estuary_cdk/capture/__init__.py index b0a8476e84..6efd90cf12 100644 --- a/estuary-cdk/estuary_cdk/capture/__init__.py +++ b/estuary-cdk/estuary_cdk/capture/__init__.py @@ -262,7 +262,13 @@ async def stop_on_elapsed_interval(interval: int) -> None: # Gracefully exit after the capture interval has elapsed. # We don't do this within the TaskGroup because we don't # want to block on it. - asyncio.create_task(stop_on_elapsed_interval(open.capture.intervalSeconds)) + asyncio.create_task( + stop_on_elapsed_interval( + int(open.capture.config.restart_interval.total_seconds()) + if open.capture.config.restart_interval + else open.capture.intervalSeconds + ) + ) async with asyncio.TaskGroup() as tg: diff --git a/estuary-cdk/estuary_cdk/flow.py b/estuary-cdk/estuary_cdk/flow.py index c0334e8776..a0c5a181c4 100644 --- a/estuary-cdk/estuary_cdk/flow.py +++ b/estuary-cdk/estuary_cdk/flow.py @@ -1,10 +1,23 @@ import abc from dataclasses import dataclass -from pydantic import BaseModel, NonNegativeInt, PositiveInt -from typing import Any, Literal, TypeVar, Generic, Literal +from datetime import timedelta +from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt +from typing import Any, Literal, Optional, TypeVar, Generic, Literal from .pydantic_polyfill import GenericModel + +class BaseEndpointConfig(abc.ABC, BaseModel, extra="forbid"): + """ + BaseEndpointConfig defines the endpoint config attribute(s) shared by all connectors. + """ + + # If unset, use the default `intervalSeconds` provided as part of the Open request + restart_interval: Optional[timedelta] = Field( + default=None, description="How long before the connector restarts automatically" + ) + + # The type of this invoked connector. ConnectorType = Literal[ "IMAGE", # We're running with the context of a container image. @@ -12,7 +25,7 @@ ] # Generic type of a connector's endpoint configuration. -EndpointConfig = TypeVar("EndpointConfig") +EndpointConfig = TypeVar("EndpointConfig", bound=BaseEndpointConfig) # Generic type of a connector's resource configuration. ResourceConfig = TypeVar("ResourceConfig", bound=BaseModel)