Advanced WebSocket
Advanced Lifecycle Management
The IBKR WebSocket API may behave unexpectedly in the future if previous connections are not closed gracefully. We need to ensure that the IbkrWsClient.shutdown method is always called when the program terminates.
To do so, use the signal module to shut down the client when the program is terminated.
import signal
# assuming we subscribe to Orders channel
ws_client.subscribe(channel='or', data=None, needs_confirmation=False)
# this is a callback used when the program terminates
def stop(_, _1):
    # we unsubscribe from the Orders channel
    ws_client.unsubscribe(channel='or', data=None, needs_confirmation=False)
    # we gracefully close the connection
    ws_client.shutdown()
# register the `stop` callback to be called when the program terminates
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
See examples "ws_02_intermediate" and "ws_03_market_history" which demonstrate advanced lifecycle management.
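Because the same callback is registered for both SIGINT and SIGTERM, it could in principle run more than once. The following is a minimal sketch, not part of the library, of a handler that performs the cleanup at most once. `FakeWsClient` and `make_stop_handler` are hypothetical names introduced here so the sketch runs without a network connection; the stand-in only mirrors the `unsubscribe` and `shutdown` calls shown above.

```python
import signal


class FakeWsClient:
    """Hypothetical stand-in for IbkrWsClient; records calls instead of using a network."""

    def __init__(self):
        self.calls = []

    def unsubscribe(self, channel, data=None, needs_confirmation=False):
        self.calls.append(('unsubscribe', channel))

    def shutdown(self):
        self.calls.append(('shutdown',))


def make_stop_handler(ws_client, channels):
    """Build a signal handler that unsubscribes and shuts down at most once."""
    state = {'stopped': False}

    def stop(signum, frame):
        # Ignore repeated signals once the cleanup has started.
        if state['stopped']:
            return
        state['stopped'] = True
        for channel in channels:
            ws_client.unsubscribe(channel=channel, data=None, needs_confirmation=False)
        ws_client.shutdown()

    return stop


ws_client = FakeWsClient()
stop = make_stop_handler(ws_client, channels=['or'])

# Simulate SIGINT followed by SIGTERM by calling the handler directly.
stop(signal.SIGINT, None)
stop(signal.SIGTERM, None)
```

With a real client, the returned handler would be registered through `signal.signal` for SIGINT and SIGTERM in the same way as the `stop` callback above.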
While most IBKR WebSocket API channels follow the payload structure described in the IbkrWsClient - Subscribing and Unsubscribing section, there are some exceptions that need to be handled on a case-by-case basis.
To facilitate this, the subscribe and unsubscribe methods accept an instance of SubscriptionProcessor as an optional argument.
It is an interface that allows the WsClient to translate our channel and data arguments into a payload string. Recall that these arguments are passed to the subscribe and unsubscribe methods.
from abc import ABC

class SubscriptionProcessor(ABC):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()
IbkrWsClient utilises the IbkrSubscriptionProcessor, which for example adds the s and u prefixes depending on whether we subscribe or unsubscribe.
import json

class IbkrSubscriptionProcessor(SubscriptionProcessor):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        payload = f"s{channel}"
        if data is not None or data == {}:
            payload += f"+{json.dumps(data)}"
        return payload

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        data = {} if data is None else data
        return f'u{channel}+{json.dumps(data)}'
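To see what these methods produce, the sketch below reproduces the two classes from this section so that it runs on its own, then prints a few payloads. The `md+265598` channel and the `fields` data are illustrative values chosen for this sketch and are not taken from this page.

```python
import json
from abc import ABC


class SubscriptionProcessor(ABC):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        raise NotImplementedError()


class IbkrSubscriptionProcessor(SubscriptionProcessor):
    def make_subscribe_payload(self, channel: str, data: dict = None) -> str:
        payload = f"s{channel}"
        if data is not None or data == {}:
            payload += f"+{json.dumps(data)}"
        return payload

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        data = {} if data is None else data
        return f'u{channel}+{json.dumps(data)}'


processor = IbkrSubscriptionProcessor()

# No data: the subscribe payload is just the prefixed channel.
sub_orders = processor.make_subscribe_payload('or')
# With data: the JSON-encoded dict is appended after a plus sign.
sub_market = processor.make_subscribe_payload('md+265598', {'fields': ['31', '84']})
# Unsubscribing without data still appends an empty JSON object.
unsub_orders = processor.make_unsubscribe_payload('or')

print(sub_orders)    # sor
print(sub_market)    # smd+265598+{"fields": ["31", "84"]}
print(unsub_orders)  # uor+{}
```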
We can specify a custom SubscriptionProcessor when we call the subscribe or unsubscribe methods in order to override the default IbkrSubscriptionProcessor. Some channels require this in order to work, and the next section goes over one such use case.
If we look at the Historical Market Data WebSocket documentation, we can see that the payloads required for subscribing and unsubscribing differ substantially.
Subscribing:
smh+conid+{"exchange":"exchange", "period":"period", "bar":"bar", "outsideRth":outsideRth, "source":"source", "format":"format"}
Unsubscribing:
umh+{serverId}
Additionally:
NOTE: Only a max of 5 concurrent historical data request available at a time.
NOTE: Historical data will only respond once, though customers will still need to unsubscribe from the endpoint.
There are several key challenges with this channel:
- The parameters change: Note that the second parameter passed in the payloads changes from conid to serverId. This is unlike the majority of the other endpoints, where unsubscribing either requires the same conid parameter as subscribing, or requires no parameters at all.
- Acquiring serverId: What's more, the serverId parameter expects the ID of the IBKR server that currently handles our subscription on the IBKR side. This information becomes known to us only after subscribing and receiving the first valid messages through the WebSocket channel, as the serverId field is attached to most Historical Market Data messages.
- Connection limits: To make it even more complicated, we may only have up to five simultaneous Historical Market Data WebSocket servers connected to us, and these stay assigned until we explicitly unsubscribe from them. Hence, it is essential to build reliable unsubscribing logic before we even start testing this channel.
The solution will involve:
- Recognising and storing the serverId data.
- Building a custom SubscriptionProcessor that adds the serverId to the payload instead of the conid.
- Adding a function that will loop over the stored serverId data and attempt unsubscribing.
Let's tackle it step by step.
Since this is a known challenge, the IbkrWsClient handles the first step for us automatically.
All Historical Market Data channel messages are parsed for the serverId field, which is stored internally along with the conid that the server is sending data for.
We may access the currently stored serverId/conid pairs for a particular channel by calling the server_ids method, passing the appropriate IbkrWsKey of the channel as the argument.
ws_client.server_ids(IbkrWsKey.MARKET_HISTORY)
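The library's internal bookkeeping is not shown on this page. Purely as an illustration of the idea, the sketch below builds a serverId to conid mapping from incoming messages. The `ServerIdTracker` class is hypothetical, and the message shape (a `serverId` field plus a `topic` of the form `smh+conid`) is an assumption made for this sketch rather than a documented format.

```python
from typing import Dict, Optional


class ServerIdTracker:
    """Hypothetical helper that remembers which server streams which conid."""

    def __init__(self) -> None:
        self._pairs: Dict[str, str] = {}

    def observe(self, message: dict) -> Optional[str]:
        """Record the serverId/conid pair from one message, if it carries both."""
        server_id = message.get('serverId')
        topic = message.get('topic', '')
        # Assumed shape: only messages with a serverId and an 'smh+conid' topic count.
        if server_id is None or not topic.startswith('smh+'):
            return None
        conid = topic[len('smh+'):]
        self._pairs[str(server_id)] = conid
        return str(server_id)

    def server_ids(self) -> Dict[str, str]:
        """Return a copy of the stored serverId -> conid pairs."""
        return dict(self._pairs)


tracker = ServerIdTracker()
tracker.observe({'serverId': '87567', 'topic': 'smh+265598', 'data': []})
tracker.observe({'topic': 'smh+265598'})  # no serverId, so nothing is stored
tracker.observe({'serverId': '87568', 'topic': 'smh+8314'})

pairs = tracker.server_ids()
print(pairs)
```

Keeping the pairs keyed by serverId mirrors how the result is consumed later on this page, where the unsubscribe loop iterates over serverId/conid items.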
Tackling the second step requires us to write a custom SubscriptionProcessor, which inherits the subscription payload logic from the IbkrSubscriptionProcessor but overrides the unsubscription payload logic:
class MhSubscriptionProcessor(IbkrSubscriptionProcessor):
    def make_unsubscribe_payload(self, channel: str, server_id: dict = None) -> str:
        return f'umh+{server_id}'
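To make the need for the override concrete, the sketch below compares the default and the custom unsubscribe payloads for the same serverId. It redefines a minimal copy of the default unsubscribe logic from this page so that it runs on its own; the serverId value `'87567'` is made up for illustration.

```python
import json


class IbkrSubscriptionProcessor:
    """Minimal copy of the default unsubscribe logic shown earlier on this page."""

    def make_unsubscribe_payload(self, channel: str, data: dict = None) -> str:
        data = {} if data is None else data
        return f'u{channel}+{json.dumps(data)}'


class MhSubscriptionProcessor(IbkrSubscriptionProcessor):
    def make_unsubscribe_payload(self, channel: str, server_id: dict = None) -> str:
        return f'umh+{server_id}'


server_id = '87567'  # made-up value for illustration

default_payload = IbkrSubscriptionProcessor().make_unsubscribe_payload('mh', server_id)
custom_payload = MhSubscriptionProcessor().make_unsubscribe_payload('mh', server_id)

print(default_payload)  # umh+"87567"  (JSON-encoded, so the ID is wrapped in quotes)
print(custom_payload)   # umh+87567    (matches the umh+{serverId} form)
```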
Finally, we can write the function that will unsubscribe from the Historical Market Data using our custom MhSubscriptionProcessor:
subscription_processor = MhSubscriptionProcessor()

def unsubscribe_market_history():
    # get serverId/conid pairs
    server_id_conid_pairs = ws_client.server_ids(IbkrWsKey.MARKET_HISTORY)
    # loop over the pairs
    for server_id, conid in server_id_conid_pairs.items():
        # unsubscribe using the custom SubscriptionProcessor;
        # the serverId is passed through the data argument
        confirmed = ws_client.unsubscribe(
            channel='mh',
            data=server_id,
            needs_confirmation=False,
            subscription_processor=subscription_processor
        )
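The loop can be exercised without a live connection. In the sketch below, `FakeWsClient` is a hypothetical stand-in that returns two made-up serverId/conid pairs and records the payload each `unsubscribe` call would send. It assumes the client hands `channel` and `data` to the processor's `make_unsubscribe_payload` method, which this page implies but does not show, and it passes the serverId through the `data` argument, matching the `unsubscribe` calls used earlier on this page.

```python
class MhSubscriptionProcessor:
    """Stand-alone version of the custom processor, reduced to the unsubscribe side."""

    def make_unsubscribe_payload(self, channel: str, server_id: str = None) -> str:
        return f'umh+{server_id}'


class FakeWsClient:
    """Hypothetical stand-in for IbkrWsClient that records payloads instead of sending them."""

    def __init__(self, pairs):
        self._pairs = dict(pairs)
        self.sent = []

    def server_ids(self, key):
        # Return a copy so callers can iterate while the store changes.
        return dict(self._pairs)

    def unsubscribe(self, channel, data=None, needs_confirmation=False, subscription_processor=None):
        # Assumed delegation: the processor turns channel and data into the payload.
        self.sent.append(subscription_processor.make_unsubscribe_payload(channel, data))
        self._pairs.pop(data, None)
        return True


ws_client = FakeWsClient({'87567': '265598', '87568': '8314'})
subscription_processor = MhSubscriptionProcessor()


def unsubscribe_market_history():
    # The string key stands in for IbkrWsKey.MARKET_HISTORY in this sketch.
    for server_id, conid in ws_client.server_ids('MARKET_HISTORY').items():
        ws_client.unsubscribe(
            channel='mh',
            data=server_id,
            needs_confirmation=False,
            subscription_processor=subscription_processor,
        )


unsubscribe_market_history()
print(ws_client.sent)
```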
Following the advice from the Advanced Lifecycle Management section, we should ensure that this unsubscribe_market_history function is called every time our program terminates:
import signal
def stop(_, _1):
    unsubscribe_market_history()
    ws_client.shutdown()
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
Only once this code is in place is it reasonable to start testing the Historical Market Data channel.
See example "ws_03_market_history" which demonstrates using the Historical Market Data channel.