gRPC Streaming Example
Indexer-based orderbook streaming can suffer from the added latency the Indexer introduces, leading to issues such as stale orders or a crossed orderbook. The orderbook available in a full node is more up to date and should be preferred over the Indexer-based solution. This requires a full node with gRPC streaming enabled.
In this example, we'll walk through how to connect and handle the gRPC stream data to enable use cases such as orderbook watching. While full node streaming is available over both gRPC and WebSockets, we'll focus here on gRPC-based streaming due to its higher efficiency.
Install dependencies
gRPC serializes structured data using Protocol Buffers.
For Python, install the package v4-proto,
which already contains the message definitions and generated code used in gRPC.
This is the main dependency used in this guide, allowing us to deserialize the incoming stream messages.
Establish a connection
With a full node with gRPC streaming available, we can now try to establish a connection to it. We'll need to define a gRPC configuration to maintain a healthy connection. Let's also define here the CLOB pair IDs (markets) that we are interested in, as well as the relevant subaccounts.
import asyncio

import grpc
from v4_proto.dydxprotocol.clob.query_pb2 import StreamOrderbookUpdatesRequest
from v4_proto.dydxprotocol.clob.query_pb2_grpc import QueryStub
from v4_proto.dydxprotocol.subaccounts.subaccount_pb2 import SubaccountId
GRPC_OPTIONS = [
    # Send keepalive ping every 30 seconds
    ("grpc.keepalive_time_ms", 30000),
    # Wait 10 seconds for a ping ack before considering the connection dead
    ("grpc.keepalive_timeout_ms", 10000),
    # Allow keepalive pings even when there are no calls
    ("grpc.keepalive_permit_without_calls", True),
    # Minimum allowed time between pings
    ("grpc.http2.min_time_between_pings_ms", 30000),
    # Minimum allowed time between pings with no data
    ("grpc.http2.min_ping_interval_without_data_ms", 30000),
]
endpoint = "your-node-address:9090"
clob_pair_ids = [0, 1]  # BTC-USD and ETH-USD
subaccount_ids = []  # All subaccounts

# Establish async connection
async with grpc.aio.insecure_channel(endpoint, options=GRPC_OPTIONS) as channel:
    tasks = [
        listen_to_grpc_stream(
            channel,
            clob_pair_ids,
            subaccount_ids,
            feed_handler,
        ),
    ]
    await asyncio.gather(*tasks)
Streaming
The streaming function listen_to_grpc_stream()
processes the continuous stream of orderbook updates. Each message contains batched updates that must be processed sequentially to maintain correct state.
async def listen_to_grpc_stream(
    channel: grpc.aio.Channel,
    clob_pair_ids: List[int],
    subaccount_ids: List[str],
    feed_handler: FeedHandler,
):
    """Subscribe to the gRPC stream and handle orderbook updates."""
    stub = QueryStub(channel)

    # Parse subaccount ids (format: owner_address/subaccount_number)
    subaccount_protos = [
        SubaccountId(owner=sa.split('/')[0], number=int(sa.split('/')[1]))
        for sa in subaccount_ids
    ]
    request = StreamOrderbookUpdatesRequest(
        clob_pair_id=clob_pair_ids,
        subaccount_ids=subaccount_protos,
    )

    async for response in stub.StreamOrderbookUpdates(request):
        fill_events = feed_handler.handle(response)
        # Process fills and other updates
        for fill in fill_events:
            print(f"Fill: {fill.quantums} @ {fill.subticks}")
Maintaining Orderbook and Subaccount State
Let's add a FeedHandler component
that maintains local state by processing streaming updates.
It will handle the different message types and ensure state consistency.
class FeedHandler:
    def __init__(self):
        self.books: Dict[int, LimitOrderBook] = {}
        self.subaccounts: Dict[SubaccountId, StreamSubaccount] = {}
        self.has_seen_first_snapshot = False

    def handle(self, message: StreamOrderbookUpdatesResponse) -> List[Fill]:
        """Handle incoming stream messages and update state."""
        collected_fills = []
        for update in message.updates:
            update_type = update.WhichOneof('update_message')
            if update_type == 'orderbook_update':
                self._handle_orderbook_update(update.orderbook_update)
            elif update_type == 'order_fill':
                fills = self._handle_fills(update.order_fill, update.exec_mode)
                collected_fills += fills
            elif update_type == 'subaccount_update':
                self._handle_subaccounts(update.subaccount_update)
        return collected_fills
Snapshots
Snapshots provide the complete current state and serve as the foundation for processing subsequent incremental updates. The client should wait for snapshots before processing any other messages to ensure state consistency.
Discard order messages until you receive a StreamOrderbookUpdate with snapshot set to true. This message contains the full orderbook state for each CLOB pair.
Similarly, discard subaccount messages until you receive a StreamSubaccountUpdate with snapshot set to true. This message contains the full subaccount state for each subscribed subaccount.
def _handle_orderbook_update(self, update: StreamOrderbookUpdate):
    """Handle orderbook snapshots and incremental updates."""
    # Skip messages until the first snapshot is received
    if not self.has_seen_first_snapshot and not update.snapshot:
        return

    # Skip subsequent snapshots
    if update.snapshot and self.has_seen_first_snapshot:
        logging.warning("Skipping subsequent snapshot")
        return

    if update.snapshot:
        # This is the first snapshot of the book state
        self.has_seen_first_snapshot = True

    # Process each update in the batch
    for u in update.updates:
        update_type = u.WhichOneof('update_message')
        if update_type == 'order_place':
            self._handle_order_place(u.order_place)
        elif update_type == 'order_update':
            self._handle_order_update(u.order_update)
        elif update_type == 'order_remove':
            self._handle_order_remove(u.order_remove)
def _handle_subaccounts(self, update: StreamSubaccountUpdate):
    """Handle subaccount snapshots and updates."""
    parsed_subaccount = parse_subaccounts(update)
    subaccount_id = parsed_subaccount.subaccount_id

    if update.snapshot:
        # Skip subsequent snapshots
        if subaccount_id in self.subaccounts:
            logging.warning(f"Saw multiple snapshots for subaccount {subaccount_id}")
            return
        self.subaccounts[subaccount_id] = parsed_subaccount
    else:
        # Skip messages until the first snapshot is received
        if subaccount_id not in self.subaccounts:
            return
        # Update the existing subaccount
        existing_subaccount = self.subaccounts[subaccount_id]
        existing_subaccount.perpetual_positions.update(parsed_subaccount.perpetual_positions)
        existing_subaccount.asset_positions.update(parsed_subaccount.asset_positions)
Orderbook Management
The orderbook is implemented as a Level 3 (L3) order book that maintains individual orders with their full details. This provides maximum granularity for trading applications that need to track specific orders and their execution.
Let's implement an efficient orderbook data structure named LimitOrderBook,
suitable for high-frequency trading.
Each price level maintains its orders in a doubly-linked list for efficient insertion and removal.
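As a minimal sketch of such a structure (the Order fields and class layout below are illustrative, not taken from the protocol definitions), each side keeps a dict of price levels, each level a FIFO doubly-linked list, plus an order-id index for O(1) lookups:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Order:
    order_id: str
    is_bid: bool
    subticks: int  # integer price
    quantums: int  # integer size
    prev: Optional["Order"] = None
    next: Optional["Order"] = None


class PriceLevel:
    """Doubly-linked list of orders at one price, FIFO by insertion."""

    def __init__(self):
        self.head: Optional[Order] = None
        self.tail: Optional[Order] = None

    def append(self, order: Order):
        if self.tail is None:
            self.head = self.tail = order
        else:
            self.tail.next = order
            order.prev = self.tail
            self.tail = order

    def remove(self, order: Order):
        if order.prev:
            order.prev.next = order.next
        else:
            self.head = order.next
        if order.next:
            order.next.prev = order.prev
        else:
            self.tail = order.prev
        order.prev = order.next = None

    def is_empty(self) -> bool:
        return self.head is None


class LimitOrderBook:
    def __init__(self):
        self._orders: Dict[str, Order] = {}     # O(1) lookup by order id
        self._bids: Dict[int, PriceLevel] = {}  # subticks -> price level
        self._asks: Dict[int, PriceLevel] = {}

    def add_order(self, order: Order):
        side = self._bids if order.is_bid else self._asks
        side.setdefault(order.subticks, PriceLevel()).append(order)
        self._orders[order.order_id] = order

    def remove_order(self, order_id: str) -> Optional[Order]:
        order = self._orders.pop(order_id, None)
        if order is None:
            return None
        side = self._bids if order.is_bid else self._asks
        level = side[order.subticks]
        level.remove(order)
        if level.is_empty():
            del side[order.subticks]  # drop empty price levels
        return order

    def best_bid(self) -> Optional[int]:
        return max(self._bids) if self._bids else None

    def best_ask(self) -> Optional[int]:
        return min(self._asks) if self._asks else None
```

For a production book you would typically keep the price levels in a sorted container so that best bid/ask is O(1) rather than a scan; the plain dicts here keep the sketch short.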
Orders progress through a lifecycle of placement, updates, and removal. Your client must handle each stage correctly to maintain accurate book state.
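The three stages can be sketched as follows, using plain-Python arguments fed from the proto messages (the class and argument names are illustrative; note that in the protocol's OrderUpdateV1 the fill amount is a total, not a delta, so updates overwrite rather than accumulate):

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class BookOrder:
    quantums: int         # original integer size
    filled_quantums: int  # total filled so far
    subticks: int         # integer price
    is_bid: bool


class OrderLifecycle:
    """Tracks open orders through place -> update -> remove (illustrative)."""

    def __init__(self):
        self.orders: Dict[str, BookOrder] = {}

    def handle_order_place(self, order_id: str, quantums: int,
                           subticks: int, is_bid: bool):
        # A newly placed order enters the book with zero fill.
        self.orders[order_id] = BookOrder(quantums, 0, subticks, is_bid)

    def handle_order_update(self, order_id: str, total_filled_quantums: int):
        # Updates carry the *total* filled amount: overwrite, don't add.
        order = self.orders.get(order_id)
        if order is not None:
            order.filled_quantums = total_filled_quantums

    def handle_order_remove(self, order_id: str):
        # Removal takes the order out of the book entirely.
        self.orders.pop(order_id, None)

    def remaining(self, order_id: str) -> int:
        order = self.orders[order_id]
        return order.quantums - order.filled_quantums
```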
Handle both optimistic and finalized trades correctly. Optimistic fills update book state immediately, while only finalized fills should be treated as confirmed trades.
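A sketch of that split, using plain dicts and lists as stand-ins for the book and trade log (the exec mode value 7 corresponds to ExecModeFinalize in the dYdX protocol at the time of writing; verify it against your installed proto version):

```python
EXEC_MODE_FINALIZE = 7  # ExecModeFinalize; verify against your proto version


def handle_fills(fills, exec_mode, book_sizes, confirmed_trades):
    """Apply fills to local book state; report only finalized fills as trades.

    `fills` are (order_id, fill_quantums) pairs and `book_sizes` maps
    order_id to remaining quantums (illustrative stand-ins for the proto
    messages and the real book structure).
    """
    for order_id, fill_quantums in fills:
        # Optimistic and finalized fills both move the local book state...
        if order_id in book_sizes:
            book_sizes[order_id] = max(0, book_sizes[order_id] - fill_quantums)
        # ...but only finalized fills count as confirmed trades.
        if exec_mode == EXEC_MODE_FINALIZE:
            confirmed_trades.append((order_id, fill_quantums))
```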
Additional logic
Process informational taker order messages without updating state.
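For example, a dispatch step can log taker order messages and return without touching the book (assuming the oneof field for these messages is named 'taker_order', as in recent v4-proto versions; verify against your installed package):

```python
import logging


def handle_update(update_type: str, update, books) -> None:
    """Dispatch one stream update; taker orders never mutate state."""
    if update_type == 'taker_order':
        # Informational only: useful for analytics, but the matched
        # maker-side changes arrive separately as fills and removals.
        logging.info("Taker order seen: %s", update)
        return
    # ... dispatch orderbook / fill / subaccount updates here ...
```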
Convert protocol integers to decimal values for display and analysis. Fetch the market information from the Indexer; it contains the exponents required for the integer conversion.
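As a sketch, the conversion uses the market's atomicResolution and quantumConversionExponent (available from the Indexer's perpetual markets data) together with the quote currency's atomic resolution of -6 for USDC; the example exponents in the test values below are assumptions, so check them against your market data:

```python
from decimal import Decimal

QUOTE_ATOMIC_RESOLUTION = -6  # USDC


def quantums_to_size(quantums: int, atomic_resolution: int) -> Decimal:
    """Convert an integer size (quantums) into a human-readable amount."""
    return Decimal(quantums) * Decimal(10) ** atomic_resolution


def subticks_to_price(subticks: int, atomic_resolution: int,
                      quantum_conversion_exponent: int) -> Decimal:
    """Convert an integer price (subticks) into a human-readable price."""
    exponent = (QUOTE_ATOMIC_RESOLUTION - atomic_resolution
                + quantum_conversion_exponent)
    return Decimal(subticks) * Decimal(10) ** exponent
```

Using Decimal avoids the rounding drift that floating point would introduce when these values are aggregated.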
Validate orderbook state consistency in order to detect errors, such as a crossed orderbook (a bid priced at or above an ask).
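A minimal version of that check, taking iterables of integer bid and ask price levels as illustrative inputs (adapt the accessors to your book structure):

```python
import logging


def assert_not_crossed(clob_pair_id: int, bid_subticks, ask_subticks) -> bool:
    """Return False and log an error if the best bid meets or exceeds the best ask."""
    best_bid = max(bid_subticks, default=None)
    best_ask = min(ask_subticks, default=None)
    if best_bid is not None and best_ask is not None and best_bid >= best_ask:
        logging.error("Crossed book for clob pair %d: bid %d >= ask %d",
                      clob_pair_id, best_bid, best_ask)
        return False
    return True
```

Running this after each batch of updates (rather than after every individual update, since a batch may be transiently inconsistent mid-application) is a reasonable place to hook it in.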