Skip to content

Commit

Permalink
router: shutdown via endpoints, not channels
Browse files Browse the repository at this point in the history
Add a .do_close() virtual method at the Endpoint level.  This is already
implemented by Channel, but let's also add an implementation of it to
Peer which shuts down the Peer in the usual way.

When the bridge gets EOF we can now call this method on all endpoints
instead of creating synthetic close messages for each channel.  We also
stop tracking open channels for knowing when the shutdown is complete,
and track endpoints instead (which is a strictly tighter constraint, as
it includes endpoints with no open channels).

There's the small matter of the increasingly redundant shutdown() method
on routing rules, but we can clean that up soon.
  • Loading branch information
allisonkarlitskaya committed Nov 10, 2023
1 parent b1a49ac commit d1b4f18
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
3 changes: 3 additions & 0 deletions src/cockpit/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ def do_kill(self, host: Optional[str], group: Optional[str]) -> None:
assert self.init_future is None
self.write_control(command='kill', host=host, group=group)

def do_close(self) -> None:
self.close()


class ConfiguredPeer(Peer):
config: BridgeConfig
Expand Down
25 changes: 17 additions & 8 deletions src/cockpit/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ def thaw_endpoint(self):
self.__endpoint_frozen_queue = None

# interface for receiving messages
def do_close(self):
raise NotImplementedError

def do_channel_control(self, channel: str, command: str, message: JsonObject) -> None:
raise NotImplementedError

Expand Down Expand Up @@ -160,17 +163,20 @@ def drop_channel(self, channel: str) -> None:
except KeyError:
logger.error('trying to drop non-existent channel %s from %s', channel, self.open_channels)

# were we waiting to exit?
if not self.open_channels and self._eof and self.transport:
self.transport.close()

def shutdown_endpoint(self, endpoint: Endpoint, **kwargs) -> None:
channels = self.endpoints.pop(endpoint)
logger.debug('shutdown_endpoint(%s, %s) will close %s', endpoint, kwargs, channels)
for channel in channels:
self.write_control(command='close', channel=channel, **kwargs)
self.drop_channel(channel)

# were we waiting to exit?
if self._eof:
logger.debug(' %d endpoints remaining', len(self.endpoints))
if not self.endpoints and self.transport:
logger.debug(' close transport')
self.transport.close()

def do_kill(self, host: Optional[str], group: Optional[str]) -> None:
endpoints = set(self.endpoints)
logger.debug('do_kill(%s, %s). Considering %d endpoints.', host, group, len(endpoints))
Expand Down Expand Up @@ -213,12 +219,15 @@ def channel_data_received(self, channel: str, data: bytes) -> None:
endpoint.do_channel_data(channel, data)

def eof_received(self) -> bool:
self._eof = True
logger.debug('eof_received(%r)', self)

for channel, endpoint in list(self.open_channels.items()):
endpoint.do_channel_control(channel, 'close', {'command': 'close', 'channel': channel})
endpoints = set(self.endpoints)
for endpoint in endpoints:
endpoint.do_close()

return bool(self.open_channels)
self._eof = True
logger.debug(' endpoints remaining: %r', self.endpoints)
return bool(self.endpoints)

def do_closed(self, exc: Optional[Exception]) -> None:
for rule in self.routing_rules:
Expand Down

0 comments on commit d1b4f18

Please sign in to comment.