nautilus_deribit/websocket/
handler.rs1use std::sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU64, Ordering},
25};
26
27use ahash::AHashMap;
28use nautilus_core::{AtomicTime, UnixNanos, time::get_atomic_clock_realtime};
29use nautilus_model::{
30 data::Data,
31 instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34 RECONNECTED,
35 retry::{RetryManager, create_websocket_retry_manager},
36 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
37};
38use tokio_tungstenite::tungstenite::Message;
39use ustr::Ustr;
40
41use super::{
42 enums::{DeribitHeartbeatType, DeribitWsChannel},
43 error::DeribitWsError,
44 messages::{
45 DeribitAuthResult, DeribitBookMsg, DeribitHeartbeatParams, DeribitJsonRpcRequest,
46 DeribitQuoteMsg, DeribitSubscribeParams, DeribitTickerMsg, DeribitTradeMsg,
47 DeribitWsMessage, NautilusWsMessage, parse_raw_message,
48 },
49 parse::{parse_book_msg, parse_quote_msg, parse_ticker_to_quote, parse_trades_data},
50};
51
52#[allow(missing_debug_implementations)]
54pub enum HandlerCommand {
55 SetClient(WebSocketClient),
57 Disconnect,
59 Authenticate { payload: String },
61 SetHeartbeat { interval: u64 },
63 InitializeInstruments(Vec<InstrumentAny>),
65 UpdateInstrument(Box<InstrumentAny>),
67 Subscribe { channels: Vec<String> },
69 Unsubscribe { channels: Vec<String> },
71}
72
73#[allow(missing_debug_implementations)]
77#[allow(dead_code)] pub struct DeribitWsFeedHandler {
79 clock: &'static AtomicTime,
80 signal: Arc<AtomicBool>,
81 inner: Option<WebSocketClient>,
82 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
83 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
84 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
85 auth_tracker: AuthTracker,
86 subscriptions_state: SubscriptionState,
87 retry_manager: RetryManager<DeribitWsError>,
88 instruments_cache: AHashMap<Ustr, InstrumentAny>,
89 request_id_counter: AtomicU64,
90}
91
92impl DeribitWsFeedHandler {
93 #[must_use]
95 pub fn new(
96 signal: Arc<AtomicBool>,
97 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
98 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
99 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
100 auth_tracker: AuthTracker,
101 subscriptions_state: SubscriptionState,
102 ) -> Self {
103 Self {
104 clock: get_atomic_clock_realtime(),
105 signal,
106 inner: None,
107 cmd_rx,
108 raw_rx,
109 out_tx,
110 auth_tracker,
111 subscriptions_state,
112 retry_manager: create_websocket_retry_manager(),
113 instruments_cache: AHashMap::new(),
114 request_id_counter: AtomicU64::new(1),
115 }
116 }
117
118 fn next_request_id(&self) -> u64 {
120 self.request_id_counter.fetch_add(1, Ordering::Relaxed)
121 }
122
123 fn ts_init(&self) -> UnixNanos {
125 self.clock.get_time_ns()
126 }
127
128 async fn send_with_retry(
130 &self,
131 payload: String,
132 rate_limit_keys: Option<Vec<String>>,
133 ) -> Result<(), DeribitWsError> {
134 if let Some(client) = &self.inner {
135 self.retry_manager
136 .execute_with_retry(
137 "websocket_send",
138 || async {
139 client
140 .send_text(payload.clone(), rate_limit_keys.clone())
141 .await
142 .map_err(|e| DeribitWsError::Send(e.to_string()))
143 },
144 |e| matches!(e, DeribitWsError::Send(_)),
145 DeribitWsError::Timeout,
146 )
147 .await
148 } else {
149 Err(DeribitWsError::NotConnected)
150 }
151 }
152
153 async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
155 let request_id = self.next_request_id();
156
157 for channel in &channels {
159 self.subscriptions_state.mark_subscribe(channel);
160 }
161
162 let request = DeribitJsonRpcRequest::new(
163 request_id,
164 "public/subscribe",
165 DeribitSubscribeParams {
166 channels: channels.clone(),
167 },
168 );
169
170 let payload =
171 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
172
173 tracing::debug!("Subscribing to channels: {:?}", channels);
174 self.send_with_retry(payload, None).await
175 }
176
177 async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
179 let request_id = self.next_request_id();
180
181 for channel in &channels {
183 self.subscriptions_state.mark_unsubscribe(channel);
184 }
185
186 let request = DeribitJsonRpcRequest::new(
187 request_id,
188 "public/unsubscribe",
189 DeribitSubscribeParams {
190 channels: channels.clone(),
191 },
192 );
193
194 let payload =
195 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
196
197 tracing::debug!("Unsubscribing from channels: {:?}", channels);
198 self.send_with_retry(payload, None).await
199 }
200
201 async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
203 let request_id = self.next_request_id();
204
205 let request = DeribitJsonRpcRequest::new(
206 request_id,
207 "public/set_heartbeat",
208 DeribitHeartbeatParams { interval },
209 );
210
211 let payload =
212 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
213
214 tracing::debug!("Enabling heartbeat with interval: {} seconds", interval);
215 self.send_with_retry(payload, None).await
216 }
217
218 async fn handle_heartbeat_test_request(&self) -> Result<(), DeribitWsError> {
220 let request_id = self.next_request_id();
221
222 let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
223
224 let payload =
225 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
226
227 tracing::trace!("Responding to heartbeat test_request");
228 self.send_with_retry(payload, None).await
229 }
230
231 async fn process_command(&mut self, cmd: HandlerCommand) {
233 match cmd {
234 HandlerCommand::SetClient(client) => {
235 tracing::debug!("Setting WebSocket client");
236 self.inner = Some(client);
237 }
238 HandlerCommand::Disconnect => {
239 tracing::debug!("Disconnecting WebSocket");
240 if let Some(client) = self.inner.take() {
241 client.disconnect().await;
242 }
243 }
244 HandlerCommand::Authenticate { payload } => {
245 tracing::debug!("Authenticating...");
246 if let Err(e) = self.send_with_retry(payload, None).await {
247 tracing::error!("Authentication send failed: {e}");
248 }
249 }
250 HandlerCommand::SetHeartbeat { interval } => {
251 if let Err(e) = self.handle_set_heartbeat(interval).await {
252 tracing::error!("Set heartbeat failed: {e}");
253 }
254 }
255 HandlerCommand::InitializeInstruments(instruments) => {
256 tracing::debug!("Initializing {} instruments", instruments.len());
257 self.instruments_cache.clear();
258 for inst in instruments {
259 self.instruments_cache
260 .insert(inst.raw_symbol().inner(), inst);
261 }
262 }
263 HandlerCommand::UpdateInstrument(instrument) => {
264 tracing::trace!("Updating instrument: {}", instrument.raw_symbol());
265 self.instruments_cache
266 .insert(instrument.raw_symbol().inner(), *instrument);
267 }
268 HandlerCommand::Subscribe { channels } => {
269 if let Err(e) = self.handle_subscribe(channels).await {
270 tracing::error!("Subscribe failed: {e}");
271 }
272 }
273 HandlerCommand::Unsubscribe { channels } => {
274 if let Err(e) = self.handle_unsubscribe(channels).await {
275 tracing::error!("Unsubscribe failed: {e}");
276 }
277 }
278 }
279 }
280
281 async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
283 if text == RECONNECTED {
285 tracing::info!("Received reconnection signal");
286 return Some(NautilusWsMessage::Reconnected);
287 }
288
289 let ws_msg = match parse_raw_message(text) {
291 Ok(msg) => msg,
292 Err(e) => {
293 tracing::warn!("Failed to parse message: {e}");
294 return None;
295 }
296 };
297
298 let ts_init = self.ts_init();
299
300 match ws_msg {
301 DeribitWsMessage::Response(response) => {
302 if let Some(result) = &response.result
304 && let Some(channels) = result.as_array()
305 {
306 for channel in channels {
307 if let Some(ch) = channel.as_str() {
308 self.subscriptions_state.confirm_subscribe(ch);
309 tracing::debug!("Subscription confirmed: {ch}");
310 }
311 }
312 }
313 if let Some(result) = &response.result
315 && result.get("access_token").is_some()
316 {
317 match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
319 Ok(auth_result) => {
320 self.auth_tracker.succeed();
321 tracing::info!(
322 "Authentication successful, scope: {}, expires_in: {}s",
323 auth_result.scope,
324 auth_result.expires_in
325 );
326 return Some(NautilusWsMessage::Authenticated(Box::new(auth_result)));
327 }
328 Err(e) => {
329 tracing::error!("Failed to parse auth result: {e}");
330 self.auth_tracker
331 .fail(format!("Failed to parse auth result: {e}"));
332 return None;
333 }
334 }
335 }
336 None
337 }
338 DeribitWsMessage::Notification(notification) => {
339 let channel = ¬ification.params.channel;
340 let data = ¬ification.params.data;
341
342 if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
344 match channel_type {
345 DeribitWsChannel::Trades => {
346 match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
348 Ok(trades) => {
349 tracing::debug!("Received {} trades", trades.len());
350 let data_vec =
351 parse_trades_data(trades, &self.instruments_cache, ts_init);
352 if !data_vec.is_empty() {
353 tracing::debug!("Parsed {} trade ticks", data_vec.len());
354 return Some(NautilusWsMessage::Data(data_vec));
355 } else {
356 tracing::debug!(
357 "No trades parsed - instrument cache size: {}",
358 self.instruments_cache.len()
359 );
360 }
361 }
362 Err(e) => {
363 tracing::warn!("Failed to deserialize trades: {e}");
364 }
365 }
366 }
367 DeribitWsChannel::Book => {
368 if let Ok(book_msg) =
370 serde_json::from_value::<DeribitBookMsg>(data.clone())
371 && let Some(instrument) =
372 self.instruments_cache.get(&book_msg.instrument_name)
373 {
374 match parse_book_msg(&book_msg, instrument, ts_init) {
375 Ok(deltas) => {
376 return Some(NautilusWsMessage::Deltas(deltas));
377 }
378 Err(e) => {
379 tracing::warn!("Failed to parse book message: {e}");
380 }
381 }
382 }
383 }
384 DeribitWsChannel::Ticker => {
385 if let Ok(ticker_msg) =
387 serde_json::from_value::<DeribitTickerMsg>(data.clone())
388 && let Some(instrument) =
389 self.instruments_cache.get(&ticker_msg.instrument_name)
390 {
391 match parse_ticker_to_quote(&ticker_msg, instrument, ts_init) {
392 Ok(quote) => {
393 return Some(NautilusWsMessage::Data(vec![Data::Quote(
394 quote,
395 )]));
396 }
397 Err(e) => {
398 tracing::warn!("Failed to parse ticker message: {e}");
399 }
400 }
401 }
402 }
403 DeribitWsChannel::Quote => {
404 if let Ok(quote_msg) =
406 serde_json::from_value::<DeribitQuoteMsg>(data.clone())
407 && let Some(instrument) =
408 self.instruments_cache.get("e_msg.instrument_name)
409 {
410 match parse_quote_msg("e_msg, instrument, ts_init) {
411 Ok(quote) => {
412 return Some(NautilusWsMessage::Data(vec![Data::Quote(
413 quote,
414 )]));
415 }
416 Err(e) => {
417 tracing::warn!("Failed to parse quote message: {e}");
418 }
419 }
420 }
421 }
422 _ => {
423 tracing::trace!("Unhandled channel: {channel}");
425 return Some(NautilusWsMessage::Raw(data.clone()));
426 }
427 }
428 } else {
429 tracing::trace!("Unknown channel: {channel}");
430 return Some(NautilusWsMessage::Raw(data.clone()));
431 }
432 None
433 }
434 DeribitWsMessage::Heartbeat(heartbeat) => {
435 match heartbeat.heartbeat_type {
436 DeribitHeartbeatType::TestRequest => {
437 tracing::trace!(
438 "Received heartbeat test_request - responding with public/test"
439 );
440 if let Err(e) = self.handle_heartbeat_test_request().await {
441 tracing::error!("Failed to respond to heartbeat test_request: {e}");
442 }
443 }
444 DeribitHeartbeatType::Heartbeat => {
445 tracing::trace!("Received heartbeat acknowledgment");
446 }
447 }
448 None
449 }
450 DeribitWsMessage::Error(err) => {
451 tracing::error!("Deribit error {}: {}", err.code, err.message);
452 Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
453 code: err.code,
454 message: err.message,
455 }))
456 }
457 DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
458 }
459 }
460
461 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
467 loop {
468 tokio::select! {
469 Some(cmd) = self.cmd_rx.recv() => {
471 self.process_command(cmd).await;
472 }
473 Some(msg) = self.raw_rx.recv() => {
475 match msg {
476 Message::Text(text) => {
477 if let Some(nautilus_msg) = self.process_raw_message(&text).await {
478 match &nautilus_msg {
480 NautilusWsMessage::Data(_)
481 | NautilusWsMessage::Deltas(_)
482 | NautilusWsMessage::Instrument(_)
483 | NautilusWsMessage::Raw(_)
484 | NautilusWsMessage::Error(_) => {
485 let _ = self.out_tx.send(nautilus_msg);
486 }
487 NautilusWsMessage::Reconnected
489 | NautilusWsMessage::Authenticated(_) => {
490 return Some(nautilus_msg);
491 }
492 }
493 }
494 }
495 Message::Ping(data) => {
496 if let Some(client) = &self.inner {
498 let _ = client.send_pong(data.to_vec()).await;
499 }
500 }
501 Message::Close(_) => {
502 tracing::info!("Received close frame");
503 }
504 _ => {}
505 }
506 }
507 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
509 if self.signal.load(Ordering::Relaxed) {
510 tracing::debug!("Stop signal received");
511 return None;
512 }
513 }
514 }
515 }
516 }
517}