nautilus_deribit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::sync::{
23    Arc,
24    atomic::{AtomicBool, AtomicU64, Ordering},
25};
26
27use ahash::AHashMap;
28use nautilus_core::{AtomicTime, UnixNanos, time::get_atomic_clock_realtime};
29use nautilus_model::{
30    data::Data,
31    instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34    RECONNECTED,
35    retry::{RetryManager, create_websocket_retry_manager},
36    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
37};
38use tokio_tungstenite::tungstenite::Message;
39use ustr::Ustr;
40
41use super::{
42    enums::{DeribitHeartbeatType, DeribitWsChannel},
43    error::DeribitWsError,
44    messages::{
45        DeribitAuthResult, DeribitBookMsg, DeribitHeartbeatParams, DeribitJsonRpcRequest,
46        DeribitQuoteMsg, DeribitSubscribeParams, DeribitTickerMsg, DeribitTradeMsg,
47        DeribitWsMessage, NautilusWsMessage, parse_raw_message,
48    },
49    parse::{parse_book_msg, parse_quote_msg, parse_ticker_to_quote, parse_trades_data},
50};
51
52/// Commands sent from the client to the handler.
53#[allow(missing_debug_implementations)]
54pub enum HandlerCommand {
55    /// Set the active WebSocket client.
56    SetClient(WebSocketClient),
57    /// Disconnect the WebSocket.
58    Disconnect,
59    /// Authenticate with credentials.
60    Authenticate { payload: String },
61    /// Enable heartbeat with interval.
62    SetHeartbeat { interval: u64 },
63    /// Initialize the instrument cache.
64    InitializeInstruments(Vec<InstrumentAny>),
65    /// Update a single instrument in the cache.
66    UpdateInstrument(Box<InstrumentAny>),
67    /// Subscribe to channels.
68    Subscribe { channels: Vec<String> },
69    /// Unsubscribe from channels.
70    Unsubscribe { channels: Vec<String> },
71}
72
73/// Deribit WebSocket feed handler.
74///
75/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
76#[allow(missing_debug_implementations)]
77#[allow(dead_code)] // Fields reserved for future features
78pub struct DeribitWsFeedHandler {
79    clock: &'static AtomicTime,
80    signal: Arc<AtomicBool>,
81    inner: Option<WebSocketClient>,
82    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
83    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
84    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
85    auth_tracker: AuthTracker,
86    subscriptions_state: SubscriptionState,
87    retry_manager: RetryManager<DeribitWsError>,
88    instruments_cache: AHashMap<Ustr, InstrumentAny>,
89    request_id_counter: AtomicU64,
90}
91
92impl DeribitWsFeedHandler {
93    /// Creates a new feed handler.
94    #[must_use]
95    pub fn new(
96        signal: Arc<AtomicBool>,
97        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
98        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
99        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
100        auth_tracker: AuthTracker,
101        subscriptions_state: SubscriptionState,
102    ) -> Self {
103        Self {
104            clock: get_atomic_clock_realtime(),
105            signal,
106            inner: None,
107            cmd_rx,
108            raw_rx,
109            out_tx,
110            auth_tracker,
111            subscriptions_state,
112            retry_manager: create_websocket_retry_manager(),
113            instruments_cache: AHashMap::new(),
114            request_id_counter: AtomicU64::new(1),
115        }
116    }
117
118    /// Generates a unique request ID.
119    fn next_request_id(&self) -> u64 {
120        self.request_id_counter.fetch_add(1, Ordering::Relaxed)
121    }
122
123    /// Returns the current timestamp.
124    fn ts_init(&self) -> UnixNanos {
125        self.clock.get_time_ns()
126    }
127
128    /// Sends a message over the WebSocket with retry logic.
129    async fn send_with_retry(
130        &self,
131        payload: String,
132        rate_limit_keys: Option<Vec<String>>,
133    ) -> Result<(), DeribitWsError> {
134        if let Some(client) = &self.inner {
135            self.retry_manager
136                .execute_with_retry(
137                    "websocket_send",
138                    || async {
139                        client
140                            .send_text(payload.clone(), rate_limit_keys.clone())
141                            .await
142                            .map_err(|e| DeribitWsError::Send(e.to_string()))
143                    },
144                    |e| matches!(e, DeribitWsError::Send(_)),
145                    DeribitWsError::Timeout,
146                )
147                .await
148        } else {
149            Err(DeribitWsError::NotConnected)
150        }
151    }
152
153    /// Handles a subscribe command.
154    async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
155        let request_id = self.next_request_id();
156
157        // Mark channels as pending
158        for channel in &channels {
159            self.subscriptions_state.mark_subscribe(channel);
160        }
161
162        let request = DeribitJsonRpcRequest::new(
163            request_id,
164            "public/subscribe",
165            DeribitSubscribeParams {
166                channels: channels.clone(),
167            },
168        );
169
170        let payload =
171            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
172
173        tracing::debug!("Subscribing to channels: {:?}", channels);
174        self.send_with_retry(payload, None).await
175    }
176
177    /// Handles an unsubscribe command.
178    async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
179        let request_id = self.next_request_id();
180
181        // Mark channels as pending unsubscribe
182        for channel in &channels {
183            self.subscriptions_state.mark_unsubscribe(channel);
184        }
185
186        let request = DeribitJsonRpcRequest::new(
187            request_id,
188            "public/unsubscribe",
189            DeribitSubscribeParams {
190                channels: channels.clone(),
191            },
192        );
193
194        let payload =
195            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
196
197        tracing::debug!("Unsubscribing from channels: {:?}", channels);
198        self.send_with_retry(payload, None).await
199    }
200
201    /// Handles enabling heartbeat.
202    async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
203        let request_id = self.next_request_id();
204
205        let request = DeribitJsonRpcRequest::new(
206            request_id,
207            "public/set_heartbeat",
208            DeribitHeartbeatParams { interval },
209        );
210
211        let payload =
212            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
213
214        tracing::debug!("Enabling heartbeat with interval: {} seconds", interval);
215        self.send_with_retry(payload, None).await
216    }
217
218    /// Responds to a heartbeat test_request.
219    async fn handle_heartbeat_test_request(&self) -> Result<(), DeribitWsError> {
220        let request_id = self.next_request_id();
221
222        let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
223
224        let payload =
225            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
226
227        tracing::trace!("Responding to heartbeat test_request");
228        self.send_with_retry(payload, None).await
229    }
230
231    /// Processes a command from the client.
232    async fn process_command(&mut self, cmd: HandlerCommand) {
233        match cmd {
234            HandlerCommand::SetClient(client) => {
235                tracing::debug!("Setting WebSocket client");
236                self.inner = Some(client);
237            }
238            HandlerCommand::Disconnect => {
239                tracing::debug!("Disconnecting WebSocket");
240                if let Some(client) = self.inner.take() {
241                    client.disconnect().await;
242                }
243            }
244            HandlerCommand::Authenticate { payload } => {
245                tracing::debug!("Authenticating...");
246                if let Err(e) = self.send_with_retry(payload, None).await {
247                    tracing::error!("Authentication send failed: {e}");
248                }
249            }
250            HandlerCommand::SetHeartbeat { interval } => {
251                if let Err(e) = self.handle_set_heartbeat(interval).await {
252                    tracing::error!("Set heartbeat failed: {e}");
253                }
254            }
255            HandlerCommand::InitializeInstruments(instruments) => {
256                tracing::debug!("Initializing {} instruments", instruments.len());
257                self.instruments_cache.clear();
258                for inst in instruments {
259                    self.instruments_cache
260                        .insert(inst.raw_symbol().inner(), inst);
261                }
262            }
263            HandlerCommand::UpdateInstrument(instrument) => {
264                tracing::trace!("Updating instrument: {}", instrument.raw_symbol());
265                self.instruments_cache
266                    .insert(instrument.raw_symbol().inner(), *instrument);
267            }
268            HandlerCommand::Subscribe { channels } => {
269                if let Err(e) = self.handle_subscribe(channels).await {
270                    tracing::error!("Subscribe failed: {e}");
271                }
272            }
273            HandlerCommand::Unsubscribe { channels } => {
274                if let Err(e) = self.handle_unsubscribe(channels).await {
275                    tracing::error!("Unsubscribe failed: {e}");
276                }
277            }
278        }
279    }
280
281    /// Processes a raw WebSocket message.
282    async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
283        // Check for reconnection signal
284        if text == RECONNECTED {
285            tracing::info!("Received reconnection signal");
286            return Some(NautilusWsMessage::Reconnected);
287        }
288
289        // Parse the JSON-RPC message
290        let ws_msg = match parse_raw_message(text) {
291            Ok(msg) => msg,
292            Err(e) => {
293                tracing::warn!("Failed to parse message: {e}");
294                return None;
295            }
296        };
297
298        let ts_init = self.ts_init();
299
300        match ws_msg {
301            DeribitWsMessage::Response(response) => {
302                // Handle subscription response
303                if let Some(result) = &response.result
304                    && let Some(channels) = result.as_array()
305                {
306                    for channel in channels {
307                        if let Some(ch) = channel.as_str() {
308                            self.subscriptions_state.confirm_subscribe(ch);
309                            tracing::debug!("Subscription confirmed: {ch}");
310                        }
311                    }
312                }
313                // Check for authentication response
314                if let Some(result) = &response.result
315                    && result.get("access_token").is_some()
316                {
317                    // Parse the full auth result
318                    match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
319                        Ok(auth_result) => {
320                            self.auth_tracker.succeed();
321                            tracing::info!(
322                                "Authentication successful, scope: {}, expires_in: {}s",
323                                auth_result.scope,
324                                auth_result.expires_in
325                            );
326                            return Some(NautilusWsMessage::Authenticated(Box::new(auth_result)));
327                        }
328                        Err(e) => {
329                            tracing::error!("Failed to parse auth result: {e}");
330                            self.auth_tracker
331                                .fail(format!("Failed to parse auth result: {e}"));
332                            return None;
333                        }
334                    }
335                }
336                None
337            }
338            DeribitWsMessage::Notification(notification) => {
339                let channel = &notification.params.channel;
340                let data = &notification.params.data;
341
342                // Determine channel type and parse accordingly
343                if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
344                    match channel_type {
345                        DeribitWsChannel::Trades => {
346                            // Parse trade messages
347                            match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
348                                Ok(trades) => {
349                                    tracing::debug!("Received {} trades", trades.len());
350                                    let data_vec =
351                                        parse_trades_data(trades, &self.instruments_cache, ts_init);
352                                    if !data_vec.is_empty() {
353                                        tracing::debug!("Parsed {} trade ticks", data_vec.len());
354                                        return Some(NautilusWsMessage::Data(data_vec));
355                                    } else {
356                                        tracing::debug!(
357                                            "No trades parsed - instrument cache size: {}",
358                                            self.instruments_cache.len()
359                                        );
360                                    }
361                                }
362                                Err(e) => {
363                                    tracing::warn!("Failed to deserialize trades: {e}");
364                                }
365                            }
366                        }
367                        DeribitWsChannel::Book => {
368                            // Parse order book messages
369                            if let Ok(book_msg) =
370                                serde_json::from_value::<DeribitBookMsg>(data.clone())
371                                && let Some(instrument) =
372                                    self.instruments_cache.get(&book_msg.instrument_name)
373                            {
374                                match parse_book_msg(&book_msg, instrument, ts_init) {
375                                    Ok(deltas) => {
376                                        return Some(NautilusWsMessage::Deltas(deltas));
377                                    }
378                                    Err(e) => {
379                                        tracing::warn!("Failed to parse book message: {e}");
380                                    }
381                                }
382                            }
383                        }
384                        DeribitWsChannel::Ticker => {
385                            // Parse ticker to quote
386                            if let Ok(ticker_msg) =
387                                serde_json::from_value::<DeribitTickerMsg>(data.clone())
388                                && let Some(instrument) =
389                                    self.instruments_cache.get(&ticker_msg.instrument_name)
390                            {
391                                match parse_ticker_to_quote(&ticker_msg, instrument, ts_init) {
392                                    Ok(quote) => {
393                                        return Some(NautilusWsMessage::Data(vec![Data::Quote(
394                                            quote,
395                                        )]));
396                                    }
397                                    Err(e) => {
398                                        tracing::warn!("Failed to parse ticker message: {e}");
399                                    }
400                                }
401                            }
402                        }
403                        DeribitWsChannel::Quote => {
404                            // Parse quote messages
405                            if let Ok(quote_msg) =
406                                serde_json::from_value::<DeribitQuoteMsg>(data.clone())
407                                && let Some(instrument) =
408                                    self.instruments_cache.get(&quote_msg.instrument_name)
409                            {
410                                match parse_quote_msg(&quote_msg, instrument, ts_init) {
411                                    Ok(quote) => {
412                                        return Some(NautilusWsMessage::Data(vec![Data::Quote(
413                                            quote,
414                                        )]));
415                                    }
416                                    Err(e) => {
417                                        tracing::warn!("Failed to parse quote message: {e}");
418                                    }
419                                }
420                            }
421                        }
422                        _ => {
423                            // Unhandled channel - return raw
424                            tracing::trace!("Unhandled channel: {channel}");
425                            return Some(NautilusWsMessage::Raw(data.clone()));
426                        }
427                    }
428                } else {
429                    tracing::trace!("Unknown channel: {channel}");
430                    return Some(NautilusWsMessage::Raw(data.clone()));
431                }
432                None
433            }
434            DeribitWsMessage::Heartbeat(heartbeat) => {
435                match heartbeat.heartbeat_type {
436                    DeribitHeartbeatType::TestRequest => {
437                        tracing::trace!(
438                            "Received heartbeat test_request - responding with public/test"
439                        );
440                        if let Err(e) = self.handle_heartbeat_test_request().await {
441                            tracing::error!("Failed to respond to heartbeat test_request: {e}");
442                        }
443                    }
444                    DeribitHeartbeatType::Heartbeat => {
445                        tracing::trace!("Received heartbeat acknowledgment");
446                    }
447                }
448                None
449            }
450            DeribitWsMessage::Error(err) => {
451                tracing::error!("Deribit error {}: {}", err.code, err.message);
452                Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
453                    code: err.code,
454                    message: err.message,
455                }))
456            }
457            DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
458        }
459    }
460
461    /// Main message processing loop.
462    ///
463    /// Returns `None` when the handler should stop.
464    /// Messages that need client-side handling (e.g., Reconnected) are returned.
465    /// Data messages are sent directly to `out_tx` for the user stream.
466    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
467        loop {
468            tokio::select! {
469                // Process commands from client
470                Some(cmd) = self.cmd_rx.recv() => {
471                    self.process_command(cmd).await;
472                }
473                // Process raw WebSocket messages
474                Some(msg) = self.raw_rx.recv() => {
475                    match msg {
476                        Message::Text(text) => {
477                            if let Some(nautilus_msg) = self.process_raw_message(&text).await {
478                                // Send data messages to user stream
479                                match &nautilus_msg {
480                                    NautilusWsMessage::Data(_)
481                                    | NautilusWsMessage::Deltas(_)
482                                    | NautilusWsMessage::Instrument(_)
483                                    | NautilusWsMessage::Raw(_)
484                                    | NautilusWsMessage::Error(_) => {
485                                        let _ = self.out_tx.send(nautilus_msg);
486                                    }
487                                    // Return messages that need client-side handling
488                                    NautilusWsMessage::Reconnected
489                                    | NautilusWsMessage::Authenticated(_) => {
490                                        return Some(nautilus_msg);
491                                    }
492                                }
493                            }
494                        }
495                        Message::Ping(data) => {
496                            // Respond to ping with pong
497                            if let Some(client) = &self.inner {
498                                let _ = client.send_pong(data.to_vec()).await;
499                            }
500                        }
501                        Message::Close(_) => {
502                            tracing::info!("Received close frame");
503                        }
504                        _ => {}
505                    }
506                }
507                // Check for stop signal
508                _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
509                    if self.signal.load(Ordering::Relaxed) {
510                        tracing::debug!("Stop signal received");
511                        return None;
512                    }
513                }
514            }
515        }
516    }
517}