gRPC Streaming Example

Indexer-based orderbook streaming adds latency, which can surface problems such as stale orders or a crossed orderbook. A full node exposes a more up-to-date orderbook and should be preferred over the Indexer-based solution. This requires a full node with gRPC streaming enabled.

This example shows how to connect to a full node and handle its gRPC data to enable use cases such as orderbook watching. Full node streaming is available over both gRPC and WebSockets; this guide focuses on gRPC because of its higher efficiency.

Install dependencies

gRPC exchanges structured data serialized with Protocol Buffers. For Python, install the v4-proto package (for example, with pip install v4-proto), which already contains the messages and generated code used by gRPC. It is the main dependency in this guide and lets us deserialize the incoming stream messages.

Establish a connection

Once a full node with gRPC streaming is available, we can establish a connection to it. We'll need a gRPC configuration that keeps the connection healthy. Let's also define the CLOB pair IDs (markets) we are interested in, as well as the relevant subaccounts.

Python
import asyncio

import grpc
from v4_proto.dydxprotocol.clob.query_pb2 import StreamOrderbookUpdatesRequest
from v4_proto.dydxprotocol.clob.query_pb2_grpc import QueryStub
from v4_proto.dydxprotocol.subaccounts.subaccount_pb2 import SubaccountId

GRPC_OPTIONS = [
    # Send a keepalive ping every 3 seconds (3000 ms)
    ("grpc.keepalive_time_ms", 3000),
    # Wait 1 second (1000 ms) for the ping ack before considering the connection dead
    ("grpc.keepalive_timeout_ms", 1000),
    # Allow keepalive pings even when there are no calls
    ("grpc.keepalive_permit_without_calls", True),
    # Minimum allowed time between pings
    ("grpc.http2.min_time_between_pings_ms", 3000),
    # Minimum allowed time between pings with no data
    ("grpc.http2.min_ping_interval_without_data_ms", 3000),
]

endpoint = "your-node-address:9090"
clob_pair_ids = [0, 1]  # BTC-USD, ETH-USD
subaccount_ids = []  # All subaccounts


async def main():
    # FeedHandler and listen_to_grpc_stream() are defined in the sections below
    feed_handler = FeedHandler()

    # Establish async connection
    async with grpc.aio.insecure_channel(endpoint, GRPC_OPTIONS) as channel:
        tasks = [
            listen_to_grpc_stream(
                channel,
                clob_pair_ids,
                subaccount_ids,
                feed_handler,
            ),
        ]
        await asyncio.gather(*tasks)


asyncio.run(main())

Streaming

The streaming function listen_to_grpc_stream() processes the continuous stream of orderbook updates. Each message contains batched updates that must be processed sequentially to maintain correct state.

Python
from typing import List


async def listen_to_grpc_stream(
    channel: grpc.aio.Channel,
    clob_pair_ids: List[int],
    subaccount_ids: List[str],
    feed_handler: FeedHandler,
):
    """Subscribe to gRPC stream and handle orderbook updates."""
    stub = QueryStub(channel)

    # Parse subaccount ids (format: owner_address/subaccount_number)
    subaccount_protos = []
    for sa in subaccount_ids:
        owner, number = sa.split("/")
        subaccount_protos.append(SubaccountId(owner=owner, number=int(number)))
    
    request = StreamOrderbookUpdatesRequest(
        clob_pair_id=clob_pair_ids, 
        subaccount_ids=subaccount_protos
    )
    
    async for response in stub.StreamOrderbookUpdates(request):
        fill_events = feed_handler.handle(response)
        # Process fills and other updates
        for fill in fill_events:
            print(f"Fill: {fill.quantums} @ {fill.subticks}")

Maintaining Orderbook and Subaccount State

Let's add a FeedHandler component that maintains local state by processing streaming updates. It handles the different message types and keeps the state consistent.

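Python
from typing import Dict, List

from v4_proto.dydxprotocol.clob.query_pb2 import StreamOrderbookUpdatesResponse


class FeedHandler: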
    def __init__(self):
        self.books: Dict[int, LimitOrderBook] = {}
        self.subaccounts: Dict[SubaccountId, StreamSubaccount] = {}
        self.has_seen_first_snapshot = False
    
    def handle(self, message: StreamOrderbookUpdatesResponse) -> List[Fill]:
        """Handle incoming stream messages and update state."""
        collected_fills = []
        
        for update in message.updates:
            update_type = update.WhichOneof('update_message')
            
            if update_type == 'orderbook_update':
                self._handle_orderbook_update(update.orderbook_update)
            elif update_type == 'order_fill':
                fills = self._handle_fills(update.order_fill, update.exec_mode)
                collected_fills += fills
            elif update_type == 'subaccount_update':
                self._handle_subaccounts(update.subaccount_update)
                
        return collected_fills

Snapshots

Snapshots provide the complete current state and serve as the foundation for processing subsequent incremental updates. The client should wait for snapshots before processing any other messages to ensure state consistency.

Discard order messages until you receive a StreamOrderbookUpdate with snapshot set to true. This message contains the full orderbook state for each clob pair.

Similarly, discard subaccount messages until you receive a StreamSubaccountUpdate with snapshot set to true. This message contains the full subaccount state for each subscribed subaccount.

Python
import logging

# The two functions below are methods of FeedHandler.
def _handle_orderbook_update(self, update: StreamOrderbookUpdate):
    """Handle orderbook snapshots and incremental updates."""
    # Skip messages until the first snapshot is received
    if not self.has_seen_first_snapshot and not update.snapshot:
        return
    
    # Skip subsequent snapshots
    if update.snapshot and self.has_seen_first_snapshot:
        logging.warning("Skipping subsequent snapshot")
        return
    
    if update.snapshot:
        # First snapshot: incremental updates are applied from here on
        self.has_seen_first_snapshot = True
    
    # Process each update in the batch
    for u in update.updates:
        update_type = u.WhichOneof('update_message')
        
        if update_type == 'order_place':
            self._handle_order_place(u.order_place)
        elif update_type == 'order_update':
            self._handle_order_update(u.order_update)
        elif update_type == 'order_remove':
            self._handle_order_remove(u.order_remove)
 
def _handle_subaccounts(self, update: StreamSubaccountUpdate):
    """Handle subaccount snapshots and updates."""
    parsed_subaccount = parse_subaccounts(update)
    subaccount_id = parsed_subaccount.subaccount_id
    
    if update.snapshot:
        # Skip subsequent snapshots
        if subaccount_id in self.subaccounts:
            logging.warning(f"Saw multiple snapshots for subaccount {subaccount_id}")
            return
        self.subaccounts[subaccount_id] = parsed_subaccount
    else:
        # Skip messages until the first snapshot is received
        if subaccount_id not in self.subaccounts:
            return
        # Update the existing subaccount
        existing_subaccount = self.subaccounts[subaccount_id]
        existing_subaccount.perpetual_positions.update(parsed_subaccount.perpetual_positions)
        existing_subaccount.asset_positions.update(parsed_subaccount.asset_positions)

Orderbook Management

The orderbook is implemented as a Level 3 (L3) order book that maintains individual orders with their full details. This provides maximum granularity for trading applications that need to track specific orders and their execution.

Let's implement an efficient orderbook data structure named LimitOrderBook, suitable for high-frequency trading.

Each price level maintains orders in a doubly-linked list for efficient insertion and removal.
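As a minimal sketch of the per-level structure described above, the following shows one way to keep the orders at a single price in a doubly-linked list. The names OrderNode and PriceLevel are hypothetical and are not taken from the dYdX client; the order_id is a plain string here, whereas the real stream identifies orders with an OrderId message.

```python
from typing import Iterator, Optional


class OrderNode:
    """One resting order, linked to its neighbours at the same price."""

    __slots__ = ("order_id", "subticks", "quantums", "prev", "next")

    def __init__(self, order_id: str, subticks: int, quantums: int) -> None:
        self.order_id = order_id  # hypothetical string id (assumption)
        self.subticks = subticks  # price in protocol integer units
        self.quantums = quantums  # size in protocol integer units
        self.prev: Optional["OrderNode"] = None
        self.next: Optional["OrderNode"] = None


class PriceLevel:
    """FIFO queue of the orders at one price, stored as a doubly-linked list."""

    def __init__(self) -> None:
        self.head: Optional[OrderNode] = None
        self.tail: Optional[OrderNode] = None

    def append(self, node: OrderNode) -> None:
        """Add an order at the back of the queue in O(1)."""
        node.prev, node.next = self.tail, None
        if self.tail is None:
            self.head = node
        else:
            self.tail.next = node
        self.tail = node

    def remove(self, node: OrderNode) -> None:
        """Unlink an order from anywhere in the queue in O(1)."""
        if node.prev is None:
            self.head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.tail = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = None

    def is_empty(self) -> bool:
        return self.head is None

    def __iter__(self) -> Iterator[OrderNode]:
        """Iterate orders in time priority (oldest first)."""
        node = self.head
        while node is not None:
            yield node
            node = node.next
```

A full book would typically pair this with a dictionary from order id to OrderNode, so that a removal message can locate the node directly and unlink it without scanning the level.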

Orders progress through a lifecycle of placement, updates, and removal. Your client must handle each stage correctly to maintain accurate book state.
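The lifecycle can be illustrated with a deliberately simplified tracker. This is a sketch under stated assumptions, not the client's implementation: TrackedOrder and OrderLifecycle are hypothetical names, orders are keyed by a plain string, and an update is assumed to carry the order's total filled amount rather than a delta.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class TrackedOrder:
    order_id: str  # hypothetical string id (assumption)
    is_bid: bool
    subticks: int  # price in protocol integer units
    quantums: int  # original size in protocol integer units
    filled_quantums: int = 0

    @property
    def remaining_quantums(self) -> int:
        return self.quantums - self.filled_quantums


class OrderLifecycle:
    """Tracks orders through placement, updates, and removal."""

    def __init__(self) -> None:
        self.orders: Dict[str, TrackedOrder] = {}

    def place(self, order: TrackedOrder) -> None:
        # A placement for an id that is already tracked overwrites the old entry.
        self.orders[order.order_id] = order

    def update(self, order_id: str, total_filled_quantums: int) -> None:
        # Assumption: the update carries the cumulative filled amount.
        order = self.orders.get(order_id)
        if order is None:
            return  # update for an order we never saw; ignore it
        order.filled_quantums = total_filled_quantums

    def remove(self, order_id: str) -> None:
        # Removing an unknown order is a no-op rather than an error.
        self.orders.pop(order_id, None)
```

Treating unknown ids as no-ops keeps the handler tolerant of messages that refer to orders outside the tracked state; a stricter client could log these cases instead.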

Handle both optimistic and finalized trades correctly. Optimistic fills update book state immediately, while only finalized fills should be treated as confirmed trades.
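One way to separate the two cases is sketched below. The constant EXEC_MODE_FINALIZE = 7 is an assumption about the value the node uses for finalized execution and should be checked against the protocol definitions; ObservedFill and FillTracker are hypothetical names introduced only for this illustration.

```python
from dataclasses import dataclass
from typing import Dict, List

# Assumption: exec_mode 7 marks a finalized (block-committed) fill.
EXEC_MODE_FINALIZE = 7


@dataclass(frozen=True)
class ObservedFill:
    maker_order_id: str  # hypothetical string id (assumption)
    maker_total_filled_quantums: int  # cumulative fill of the maker order
    quantums: int  # size of this fill
    subticks: int  # price of this fill
    exec_mode: int


class FillTracker:
    def __init__(self) -> None:
        # Book-side fill state, updated by every fill (optimistic or finalized).
        self.filled_quantums: Dict[str, int] = {}
        # Only finalized fills are recorded as confirmed trades.
        self.confirmed_trades: List[ObservedFill] = []

    def on_fill(self, fill: ObservedFill) -> bool:
        """Apply a fill; return True if it counts as a confirmed trade."""
        self.filled_quantums[fill.maker_order_id] = fill.maker_total_filled_quantums
        if fill.exec_mode == EXEC_MODE_FINALIZE:
            self.confirmed_trades.append(fill)
            return True
        return False
```

With this split, the local book reacts to optimistic fills right away, while downstream consumers such as trade logs only see fills that were finalized.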

Additional logic

Process informational taker order messages without updating state.

Convert protocol integers to decimal values for display and analysis. Fetch the market information from the Indexer, which contains the data required for the conversion.
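The conversion can be sketched as follows, assuming each market exposes an atomic resolution and a quantum conversion exponent, and that quote quantums use a resolution of -6. The function names and the sample parameter values are illustrative assumptions; real values should come from the Indexer's market data.

```python
from decimal import Decimal

# Assumption: quote quantums use 6 decimal places.
QUOTE_QUANTUMS_ATOMIC_RESOLUTION = -6


def quantums_to_size(quantums: int, atomic_resolution: int) -> Decimal:
    """Convert an integer size in quantums to a human-readable size."""
    return Decimal(quantums).scaleb(atomic_resolution)


def subticks_to_price(
    subticks: int,
    atomic_resolution: int,
    quantum_conversion_exponent: int,
) -> Decimal:
    """Convert an integer price in subticks to a human-readable price."""
    exponent = (
        quantum_conversion_exponent
        - atomic_resolution
        + QUOTE_QUANTUMS_ATOMIC_RESOLUTION
    )
    return Decimal(subticks).scaleb(exponent)


# Illustrative market parameters (not fetched from the Indexer).
atomic_resolution = -10
quantum_conversion_exponent = -9

size = quantums_to_size(10_000_000_000, atomic_resolution)
price = subticks_to_price(
    5_000_000_000, atomic_resolution, quantum_conversion_exponent
)
print(f"size={size.normalize()} price={price.normalize()}")
```

Decimal is used instead of float so that scaling by powers of ten does not introduce rounding error.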

Validate orderbook state consistency to detect errors, such as a crossed orderbook (a bid priced higher than an ask).
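A crossed-book check can be as small as comparing the best bid with the best ask. The sketch below assumes the bid and ask prices are available as iterables of integer subticks; is_crossed is a hypothetical helper, not part of any dYdX library.

```python
from typing import Iterable


def is_crossed(bid_subticks: Iterable[int], ask_subticks: Iterable[int]) -> bool:
    """Return True if the highest bid is priced above the lowest ask."""
    best_bid = max(bid_subticks, default=None)
    best_ask = min(ask_subticks, default=None)
    if best_bid is None or best_ask is None:
        # A one-sided or empty book cannot be crossed.
        return False
    return best_bid > best_ask
```

Running such a check after each batch of updates gives an early signal that the local state has diverged; whether to also flag a locked book (best bid equal to best ask) is a design choice left to the client.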