nautilus_deribit/websocket/handler.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::{
23 collections::VecDeque,
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU64, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use nautilus_common::cache::fifo::FifoCache;
32use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
33use nautilus_model::{
34 data::{Bar, Data},
35 events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38};
39use nautilus_network::{
40 RECONNECTED,
41 retry::{RetryManager, create_websocket_retry_manager},
42 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
43};
44use tokio_tungstenite::tungstenite::Message;
45use ustr::Ustr;
46
47use super::{
48 enums::{DeribitBookMsgType, DeribitHeartbeatType, DeribitWsChannel},
49 error::DeribitWsError,
50 messages::{
51 DeribitAuthResult, DeribitBookMsg, DeribitCancelAllByInstrumentParams, DeribitCancelParams,
52 DeribitChartMsg, DeribitEditParams, DeribitHeartbeatParams, DeribitInstrumentStateMsg,
53 DeribitJsonRpcRequest, DeribitOrderMsg, DeribitOrderParams, DeribitOrderResponse,
54 DeribitPerpetualMsg, DeribitPortfolioMsg, DeribitQuoteMsg, DeribitSubscribeParams,
55 DeribitTickerMsg, DeribitTradeMsg, DeribitUserTradeMsg, DeribitWsMessage,
56 NautilusWsMessage, parse_raw_message,
57 },
58 parse::{
59 OrderEventType, determine_order_event_type, parse_book_msg, parse_chart_msg,
60 parse_order_accepted, parse_order_canceled, parse_order_expired, parse_order_updated,
61 parse_perpetual_to_funding_rate, parse_quote_msg, parse_ticker_to_index_price,
62 parse_ticker_to_mark_price, parse_trades_data, parse_user_order_msg, parse_user_trade_msg,
63 resolution_to_bar_type,
64 },
65};
66use crate::common::{
67 consts::{DERIBIT_POST_ONLY_ERROR_CODE, DERIBIT_RATE_LIMIT_KEY_ORDER},
68 parse::parse_portfolio_to_account_state,
69};
70
71/// Type of pending request for request ID correlation.
72#[derive(Debug, Clone)]
73pub enum PendingRequestType {
74 /// Authentication request.
75 Authenticate,
76 /// Subscribe request with requested channels.
77 Subscribe { channels: Vec<String> },
78 /// Unsubscribe request with requested channels.
79 Unsubscribe { channels: Vec<String> },
80 /// Set heartbeat request.
81 SetHeartbeat,
82 /// Test/ping request (heartbeat response).
83 Test,
84 /// Buy order request.
85 Buy {
86 client_order_id: ClientOrderId,
87 trader_id: TraderId,
88 strategy_id: StrategyId,
89 instrument_id: InstrumentId,
90 },
91 /// Sell order request.
92 Sell {
93 client_order_id: ClientOrderId,
94 trader_id: TraderId,
95 strategy_id: StrategyId,
96 instrument_id: InstrumentId,
97 },
98 /// Edit order request.
99 Edit {
100 client_order_id: ClientOrderId,
101 trader_id: TraderId,
102 strategy_id: StrategyId,
103 instrument_id: InstrumentId,
104 },
105 /// Cancel order request.
106 Cancel {
107 client_order_id: ClientOrderId,
108 trader_id: TraderId,
109 strategy_id: StrategyId,
110 instrument_id: InstrumentId,
111 },
112 /// Cancel all orders by instrument request.
113 CancelAllByInstrument { instrument_id: InstrumentId },
114 /// Get order state request.
115 GetOrderState {
116 client_order_id: ClientOrderId,
117 trader_id: TraderId,
118 strategy_id: StrategyId,
119 instrument_id: InstrumentId,
120 },
121}
122
123/// Commands sent from the client to the handler.
124#[allow(missing_debug_implementations)]
125pub enum HandlerCommand {
126 /// Set the active WebSocket client.
127 SetClient(WebSocketClient),
128 /// Disconnect the WebSocket.
129 Disconnect,
130 /// Authenticate with credentials.
131 Authenticate {
132 /// Serialized auth params (DeribitAuthParams or DeribitRefreshTokenParams).
133 auth_params: serde_json::Value,
134 },
135 /// Enable heartbeat with interval.
136 SetHeartbeat { interval: u64 },
137 /// Initialize the instrument cache.
138 InitializeInstruments(Vec<InstrumentAny>),
139 /// Update a single instrument in the cache.
140 UpdateInstrument(Box<InstrumentAny>),
141 /// Subscribe to channels.
142 Subscribe { channels: Vec<String> },
143 /// Unsubscribe from channels.
144 Unsubscribe { channels: Vec<String> },
145 /// Submit a buy order.
146 Buy {
147 params: DeribitOrderParams,
148 client_order_id: ClientOrderId,
149 trader_id: TraderId,
150 strategy_id: StrategyId,
151 instrument_id: InstrumentId,
152 },
153 /// Submit a sell order.
154 Sell {
155 params: DeribitOrderParams,
156 client_order_id: ClientOrderId,
157 trader_id: TraderId,
158 strategy_id: StrategyId,
159 instrument_id: InstrumentId,
160 },
161 /// Edit an existing order.
162 Edit {
163 params: DeribitEditParams,
164 client_order_id: ClientOrderId,
165 trader_id: TraderId,
166 strategy_id: StrategyId,
167 instrument_id: InstrumentId,
168 },
169 /// Cancel an existing order.
170 Cancel {
171 params: DeribitCancelParams,
172 client_order_id: ClientOrderId,
173 trader_id: TraderId,
174 strategy_id: StrategyId,
175 instrument_id: InstrumentId,
176 },
177 /// Cancel all orders by instrument.
178 CancelAllByInstrument {
179 params: DeribitCancelAllByInstrumentParams,
180 instrument_id: InstrumentId,
181 },
182 /// Get order state.
183 GetOrderState {
184 order_id: String,
185 client_order_id: ClientOrderId,
186 trader_id: TraderId,
187 strategy_id: StrategyId,
188 instrument_id: InstrumentId,
189 },
190}
191
192/// Context for an order submitted via this handler.
193///
194/// Stores the original trader/strategy/client IDs from the buy/sell command
195/// so they can be used when processing user.orders subscription updates.
196#[derive(Debug, Clone)]
197pub struct OrderContext {
198 pub client_order_id: ClientOrderId,
199 pub trader_id: TraderId,
200 pub strategy_id: StrategyId,
201 pub instrument_id: InstrumentId,
202}
203
204/// Deribit WebSocket feed handler.
205///
206/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
207#[allow(missing_debug_implementations)]
208pub struct DeribitWsFeedHandler {
209 clock: &'static AtomicTime,
210 signal: Arc<AtomicBool>,
211 inner: Option<WebSocketClient>,
212 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
213 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
214 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
215 auth_tracker: AuthTracker,
216 subscriptions_state: SubscriptionState,
217 retry_manager: RetryManager<DeribitWsError>,
218 instruments_cache: AHashMap<Ustr, InstrumentAny>,
219 request_id_counter: AtomicU64,
220 pending_requests: AHashMap<u64, PendingRequestType>,
221 account_id: Option<AccountId>,
222 order_contexts: AHashMap<VenueOrderId, OrderContext>,
223 emitted_accepted: FifoCache<VenueOrderId, 10_000>,
224 terminal_orders: FifoCache<ClientOrderId, 10_000>,
225 pending_bars: AHashMap<String, Bar>,
226 bars_timestamp_on_close: bool,
227 last_account_states: AHashMap<String, AccountState>,
228 book_sequence: AHashMap<Ustr, u64>,
229 pending_outgoing: VecDeque<NautilusWsMessage>,
230}
231
232impl DeribitWsFeedHandler {
233 /// Creates a new feed handler.
234 #[allow(clippy::too_many_arguments)]
235 #[must_use]
236 pub fn new(
237 signal: Arc<AtomicBool>,
238 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241 auth_tracker: AuthTracker,
242 subscriptions_state: SubscriptionState,
243 account_id: Option<AccountId>,
244 bars_timestamp_on_close: bool,
245 ) -> Self {
246 Self {
247 clock: get_atomic_clock_realtime(),
248 signal,
249 inner: None,
250 cmd_rx,
251 raw_rx,
252 out_tx,
253 auth_tracker,
254 subscriptions_state,
255 retry_manager: create_websocket_retry_manager(),
256 instruments_cache: AHashMap::new(),
257 request_id_counter: AtomicU64::new(1),
258 pending_requests: AHashMap::new(),
259 account_id,
260 order_contexts: AHashMap::new(),
261 emitted_accepted: FifoCache::new(),
262 terminal_orders: FifoCache::new(),
263 pending_bars: AHashMap::new(),
264 bars_timestamp_on_close,
265 last_account_states: AHashMap::new(),
266 book_sequence: AHashMap::new(),
267 pending_outgoing: VecDeque::new(),
268 }
269 }
270
271 /// Sets the account ID for order/fill reports.
272 pub fn set_account_id(&mut self, account_id: AccountId) {
273 self.account_id = Some(account_id);
274 }
275
276 /// Returns the account ID.
277 #[must_use]
278 pub fn account_id(&self) -> Option<AccountId> {
279 self.account_id
280 }
281
282 fn clear_state(&mut self) {
283 let pending_count = self.pending_requests.len();
284 let emitted_count = self.emitted_accepted.len();
285 let bars_count = self.pending_bars.len();
286 let account_count = self.last_account_states.len();
287 let book_count = self.book_sequence.len();
288 let outgoing_count = self.pending_outgoing.len();
289
290 self.pending_requests.clear();
291 self.emitted_accepted.clear();
292 self.pending_bars.clear();
293 self.last_account_states.clear();
294 self.book_sequence.clear();
295 self.pending_outgoing.clear();
296
297 log::debug!(
298 "Reset state: pending_requests={pending_count}, emitted_accepted={emitted_count}, \
299 pending_bars={bars_count}, account_states={account_count}, book_sequence={book_count}, \
300 pending_outgoing={outgoing_count}"
301 );
302 }
303
304 /// Generates a unique request ID.
305 fn next_request_id(&self) -> u64 {
306 self.request_id_counter.fetch_add(1, Ordering::Relaxed)
307 }
308
309 /// Returns the current timestamp.
310 fn ts_init(&self) -> UnixNanos {
311 self.clock.get_time_ns()
312 }
313
314 /// Checks if there's a pending buy/sell request for the given client_order_id.
315 ///
316 /// This is used to avoid emitting duplicate OrderAccepted events from the
317 /// user.orders subscription when the response path will also emit an event.
318 fn is_pending_order(&self, client_order_id: &ClientOrderId) -> bool {
319 self.pending_requests.values().any(|req| match req {
320 PendingRequestType::Buy {
321 client_order_id: id,
322 ..
323 }
324 | PendingRequestType::Sell {
325 client_order_id: id,
326 ..
327 } => id == client_order_id,
328 _ => false,
329 })
330 }
331
332 /// Gets the OrderContext from a pending buy/sell request by client_order_id.
333 ///
334 /// Returns None if no pending request found.
335 fn get_pending_order_context(&self, client_order_id: &ClientOrderId) -> Option<OrderContext> {
336 for req in self.pending_requests.values() {
337 match req {
338 PendingRequestType::Buy {
339 client_order_id: id,
340 trader_id,
341 strategy_id,
342 instrument_id,
343 }
344 | PendingRequestType::Sell {
345 client_order_id: id,
346 trader_id,
347 strategy_id,
348 instrument_id,
349 } => {
350 if id == client_order_id {
351 return Some(OrderContext {
352 client_order_id: *id,
353 trader_id: *trader_id,
354 strategy_id: *strategy_id,
355 instrument_id: *instrument_id,
356 });
357 }
358 }
359 _ => {}
360 }
361 }
362 None
363 }
364
365 /// Sends a message over the WebSocket with retry logic.
366 async fn send_with_retry(
367 &self,
368 payload: String,
369 rate_limit_keys: Option<&[Ustr]>,
370 ) -> Result<(), DeribitWsError> {
371 if let Some(client) = &self.inner {
372 let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
373 self.retry_manager
374 .execute_with_retry(
375 "websocket_send",
376 || {
377 let payload = payload.clone();
378 let keys = keys_owned.clone();
379 async move {
380 client
381 .send_text(payload, keys.as_deref())
382 .await
383 .map_err(|e| DeribitWsError::Send(e.to_string()))
384 }
385 },
386 |e| matches!(e, DeribitWsError::Send(_)),
387 DeribitWsError::Timeout,
388 )
389 .await
390 } else {
391 Err(DeribitWsError::NotConnected)
392 }
393 }
394
395 /// Handles a subscribe command.
396 ///
397 /// Note: The client has already called `mark_subscribe` before sending this command.
398 async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
399 let request_id = self.next_request_id();
400
401 // Track this request for response correlation
402 self.pending_requests.insert(
403 request_id,
404 PendingRequestType::Subscribe {
405 channels: channels.clone(),
406 },
407 );
408
409 let request = DeribitJsonRpcRequest::new(
410 request_id,
411 "public/subscribe",
412 DeribitSubscribeParams {
413 channels: channels.clone(),
414 },
415 );
416
417 let payload =
418 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
419
420 log::debug!("Subscribing to channels: request_id={request_id}, channels={channels:?}");
421 self.send_with_retry(payload, None).await
422 }
423
424 /// Handles an unsubscribe command.
425 async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
426 let request_id = self.next_request_id();
427
428 // Track this request for response correlation
429 self.pending_requests.insert(
430 request_id,
431 PendingRequestType::Unsubscribe {
432 channels: channels.clone(),
433 },
434 );
435
436 let request = DeribitJsonRpcRequest::new(
437 request_id,
438 "public/unsubscribe",
439 DeribitSubscribeParams {
440 channels: channels.clone(),
441 },
442 );
443
444 let payload =
445 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
446
447 log::debug!("Unsubscribing from channels: request_id={request_id}, channels={channels:?}");
448 self.send_with_retry(payload, None).await
449 }
450
451 /// Handles enabling heartbeat.
452 async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
453 let request_id = self.next_request_id();
454
455 // Track this request for response correlation
456 self.pending_requests
457 .insert(request_id, PendingRequestType::SetHeartbeat);
458
459 let request = DeribitJsonRpcRequest::new(
460 request_id,
461 "public/set_heartbeat",
462 DeribitHeartbeatParams { interval },
463 );
464
465 let payload =
466 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
467
468 log::debug!(
469 "Enabling heartbeat with interval: request_id={request_id}, interval={interval} seconds"
470 );
471 self.send_with_retry(payload, None).await
472 }
473
474 /// Responds to a heartbeat test_request.
475 async fn handle_heartbeat_test_request(&mut self) -> Result<(), DeribitWsError> {
476 let request_id = self.next_request_id();
477
478 // Track this request for response correlation
479 self.pending_requests
480 .insert(request_id, PendingRequestType::Test);
481
482 let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
483
484 let payload =
485 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
486
487 log::trace!("Responding to heartbeat test_request: request_id={request_id}");
488 self.send_with_retry(payload, None).await
489 }
490
491 /// Handles a buy order command.
492 async fn handle_buy(
493 &mut self,
494 params: DeribitOrderParams,
495 client_order_id: ClientOrderId,
496 trader_id: TraderId,
497 strategy_id: StrategyId,
498 instrument_id: InstrumentId,
499 ) -> Result<(), DeribitWsError> {
500 let request_id = self.next_request_id();
501
502 self.pending_requests.insert(
503 request_id,
504 PendingRequestType::Buy {
505 client_order_id,
506 trader_id,
507 strategy_id,
508 instrument_id,
509 },
510 );
511
512 let request = DeribitJsonRpcRequest::new(request_id, "private/buy", params);
513
514 let payload =
515 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
516
517 log::debug!("Sending buy order: request_id={request_id}");
518 self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
519 .await
520 }
521
522 /// Handles a sell order command.
523 async fn handle_sell(
524 &mut self,
525 params: DeribitOrderParams,
526 client_order_id: ClientOrderId,
527 trader_id: TraderId,
528 strategy_id: StrategyId,
529 instrument_id: InstrumentId,
530 ) -> Result<(), DeribitWsError> {
531 let request_id = self.next_request_id();
532
533 self.pending_requests.insert(
534 request_id,
535 PendingRequestType::Sell {
536 client_order_id,
537 trader_id,
538 strategy_id,
539 instrument_id,
540 },
541 );
542
543 let request = DeribitJsonRpcRequest::new(request_id, "private/sell", params);
544
545 let payload =
546 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
547
548 log::debug!("Sending sell order: request_id={request_id}");
549 self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
550 .await
551 }
552
553 /// Handles an edit order command.
554 async fn handle_edit(
555 &mut self,
556 params: DeribitEditParams,
557 client_order_id: ClientOrderId,
558 trader_id: TraderId,
559 strategy_id: StrategyId,
560 instrument_id: InstrumentId,
561 ) -> Result<(), DeribitWsError> {
562 let request_id = self.next_request_id();
563 let order_id = params.order_id.clone();
564
565 self.pending_requests.insert(
566 request_id,
567 PendingRequestType::Edit {
568 client_order_id,
569 trader_id,
570 strategy_id,
571 instrument_id,
572 },
573 );
574
575 let request = DeribitJsonRpcRequest::new(request_id, "private/edit", params);
576
577 let payload =
578 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
579
580 log::debug!("Sending edit order: request_id={request_id}, order_id={order_id}");
581 self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
582 .await
583 }
584
585 /// Handles a cancel order command.
586 async fn handle_cancel(
587 &mut self,
588 params: DeribitCancelParams,
589 client_order_id: ClientOrderId,
590 trader_id: TraderId,
591 strategy_id: StrategyId,
592 instrument_id: InstrumentId,
593 ) -> Result<(), DeribitWsError> {
594 let request_id = self.next_request_id();
595 let order_id = params.order_id.clone();
596
597 self.pending_requests.insert(
598 request_id,
599 PendingRequestType::Cancel {
600 client_order_id,
601 trader_id,
602 strategy_id,
603 instrument_id,
604 },
605 );
606
607 let request = DeribitJsonRpcRequest::new(request_id, "private/cancel", params);
608
609 let payload =
610 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
611
612 log::debug!("Sending cancel order: request_id={request_id}, order_id={order_id}");
613 self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
614 .await
615 }
616
617 /// Handles cancel all orders by instrument command.
618 async fn handle_cancel_all_by_instrument(
619 &mut self,
620 params: DeribitCancelAllByInstrumentParams,
621 instrument_id: InstrumentId,
622 ) -> Result<(), DeribitWsError> {
623 let request_id = self.next_request_id();
624 let instrument_name = params.instrument_name.clone();
625
626 // Track this request for response correlation
627 self.pending_requests.insert(
628 request_id,
629 PendingRequestType::CancelAllByInstrument { instrument_id },
630 );
631
632 let request =
633 DeribitJsonRpcRequest::new(request_id, "private/cancel_all_by_instrument", params);
634
635 let payload =
636 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
637
638 log::debug!(
639 "Sending cancel_all_by_instrument: request_id={request_id}, instrument={instrument_name}"
640 );
641 self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
642 .await
643 }
644
645 /// Handles get order state command.
646 async fn handle_get_order_state(
647 &mut self,
648 order_id: String,
649 client_order_id: ClientOrderId,
650 trader_id: TraderId,
651 strategy_id: StrategyId,
652 instrument_id: InstrumentId,
653 ) -> Result<(), DeribitWsError> {
654 let request_id = self.next_request_id();
655
656 // Track this request for response correlation
657 self.pending_requests.insert(
658 request_id,
659 PendingRequestType::GetOrderState {
660 client_order_id,
661 trader_id,
662 strategy_id,
663 instrument_id,
664 },
665 );
666
667 let params = serde_json::json!({
668 "order_id": order_id
669 });
670
671 let request = DeribitJsonRpcRequest::new(request_id, "private/get_order_state", params);
672
673 let payload =
674 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
675
676 log::debug!("Sending get_order_state: request_id={request_id}, order_id={order_id}");
677 self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
678 .await
679 }
680
681 /// Processes a command from the client.
682 async fn process_command(&mut self, cmd: HandlerCommand) {
683 match cmd {
684 HandlerCommand::SetClient(client) => {
685 log::debug!("Setting WebSocket client");
686 self.inner = Some(client);
687 }
688 HandlerCommand::Disconnect => {
689 log::debug!("Disconnecting WebSocket");
690 if let Some(client) = self.inner.take() {
691 client.disconnect().await;
692 }
693 }
694 HandlerCommand::Authenticate { auth_params } => {
695 let request_id = self.next_request_id();
696 log::debug!("Authenticating: request_id={request_id}");
697
698 // Track this request for response correlation
699 self.pending_requests
700 .insert(request_id, PendingRequestType::Authenticate);
701
702 let request = DeribitJsonRpcRequest::new(request_id, "public/auth", auth_params);
703 match serde_json::to_string(&request) {
704 Ok(payload) => {
705 if let Err(e) = self.send_with_retry(payload, None).await {
706 log::error!("Authentication send failed: {e}");
707 self.auth_tracker.fail(format!("Send failed: {e}"));
708 }
709 }
710 Err(e) => {
711 log::error!("Failed to serialize auth request: {e}");
712 self.auth_tracker.fail(format!("Serialization failed: {e}"));
713 }
714 }
715 }
716 HandlerCommand::SetHeartbeat { interval } => {
717 if let Err(e) = self.handle_set_heartbeat(interval).await {
718 log::error!("Set heartbeat failed: {e}");
719 }
720 }
721 HandlerCommand::InitializeInstruments(instruments) => {
722 log::info!("Handler received {} instruments", instruments.len());
723 self.instruments_cache.clear();
724 for inst in instruments {
725 self.instruments_cache
726 .insert(inst.raw_symbol().inner(), inst);
727 }
728 }
729 HandlerCommand::UpdateInstrument(instrument) => {
730 log::trace!("Updating instrument: {}", instrument.raw_symbol());
731 self.instruments_cache
732 .insert(instrument.raw_symbol().inner(), *instrument);
733 }
734 HandlerCommand::Subscribe { channels } => {
735 if let Err(e) = self.handle_subscribe(channels).await {
736 log::error!("Subscribe failed: {e}");
737 }
738 }
739 HandlerCommand::Unsubscribe { channels } => {
740 if let Err(e) = self.handle_unsubscribe(channels).await {
741 log::error!("Unsubscribe failed: {e}");
742 }
743 }
744 HandlerCommand::Buy {
745 params,
746 client_order_id,
747 trader_id,
748 strategy_id,
749 instrument_id,
750 } => {
751 if let Err(e) = self
752 .handle_buy(
753 params,
754 client_order_id,
755 trader_id,
756 strategy_id,
757 instrument_id,
758 )
759 .await
760 {
761 log::error!("Buy order failed: {e}");
762 }
763 }
764 HandlerCommand::Sell {
765 params,
766 client_order_id,
767 trader_id,
768 strategy_id,
769 instrument_id,
770 } => {
771 if let Err(e) = self
772 .handle_sell(
773 params,
774 client_order_id,
775 trader_id,
776 strategy_id,
777 instrument_id,
778 )
779 .await
780 {
781 log::error!("Sell order failed: {e}");
782 }
783 }
784 HandlerCommand::Edit {
785 params,
786 client_order_id,
787 trader_id,
788 strategy_id,
789 instrument_id,
790 } => {
791 if let Err(e) = self
792 .handle_edit(
793 params,
794 client_order_id,
795 trader_id,
796 strategy_id,
797 instrument_id,
798 )
799 .await
800 {
801 log::error!("Edit order failed: {e}");
802 }
803 }
804 HandlerCommand::Cancel {
805 params,
806 client_order_id,
807 trader_id,
808 strategy_id,
809 instrument_id,
810 } => {
811 if let Err(e) = self
812 .handle_cancel(
813 params,
814 client_order_id,
815 trader_id,
816 strategy_id,
817 instrument_id,
818 )
819 .await
820 {
821 log::error!("Cancel order failed: {e}");
822 }
823 }
824 HandlerCommand::CancelAllByInstrument {
825 params,
826 instrument_id,
827 } => {
828 if let Err(e) = self
829 .handle_cancel_all_by_instrument(params, instrument_id)
830 .await
831 {
832 log::error!("Cancel all by instrument failed: {e}");
833 }
834 }
835 HandlerCommand::GetOrderState {
836 order_id,
837 client_order_id,
838 trader_id,
839 strategy_id,
840 instrument_id,
841 } => {
842 if let Err(e) = self
843 .handle_get_order_state(
844 order_id,
845 client_order_id,
846 trader_id,
847 strategy_id,
848 instrument_id,
849 )
850 .await
851 {
852 log::error!("Get order state failed: {e}");
853 }
854 }
855 }
856 }
857
858 /// Processes a raw WebSocket message.
859 async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
860 if text == RECONNECTED {
861 log::info!("Received reconnection signal");
862
863 self.clear_state();
864
865 return Some(NautilusWsMessage::Reconnected);
866 }
867
868 // Parse the JSON-RPC message
869 let ws_msg = match parse_raw_message(text) {
870 Ok(msg) => msg,
871 Err(e) => {
872 log::warn!("Failed to parse message: {e}");
873 return None;
874 }
875 };
876
877 let ts_init = self.ts_init();
878
879 match ws_msg {
880 DeribitWsMessage::Response(response) => {
881 // Look up the request type by ID for explicit correlation
882 if let Some(request_id) = response.id
883 && let Some(request_type) = self.pending_requests.remove(&request_id)
884 {
885 match request_type {
886 PendingRequestType::Authenticate => {
887 if let Some(error) = &response.error {
888 log::error!(
889 "Authentication failed: code={}, message={}, request_id={}",
890 error.code,
891 error.message,
892 request_id
893 );
894 self.auth_tracker.fail(format!(
895 "Authentication error code={}: {}",
896 error.code, error.message
897 ));
898 } else if let Some(result) = &response.result {
899 match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
900 Ok(auth_result) => {
901 self.auth_tracker.succeed();
902 log::debug!(
903 "WebSocket authenticated successfully (request_id={}, scope={}, expires_in={}s)",
904 request_id,
905 auth_result.scope,
906 auth_result.expires_in
907 );
908 return Some(NautilusWsMessage::Authenticated(Box::new(
909 auth_result,
910 )));
911 }
912 Err(e) => {
913 log::error!(
914 "Failed to parse auth result: request_id={request_id}, error={e}"
915 );
916 self.auth_tracker
917 .fail(format!("Failed to parse auth result: {e}"));
918 }
919 }
920 }
921 }
922 PendingRequestType::Subscribe { channels } => {
923 if let Some(error) = &response.error {
924 log::error!(
925 "Subscribe failed: code={}, message={}, channels={:?}, request_id={}",
926 error.code,
927 error.message,
928 channels,
929 request_id
930 );
931 // Mark channels as failed so they can be retried
932 for ch in &channels {
933 self.subscriptions_state.confirm_unsubscribe(ch);
934 }
935 } else {
936 // Confirm each channel in the subscription
937 for ch in &channels {
938 self.subscriptions_state.confirm_subscribe(ch);
939 log::debug!("Subscription confirmed: {ch}");
940 }
941 }
942 }
943 PendingRequestType::Unsubscribe { channels } => {
944 if let Some(error) = &response.error {
945 log::error!(
946 "Unsubscribe failed: code={}, message={}, channels={:?}, request_id={}",
947 error.code,
948 error.message,
949 channels,
950 request_id
951 );
952 } else {
953 // Confirm each channel in the unsubscription
954 for ch in &channels {
955 self.subscriptions_state.confirm_unsubscribe(ch);
956 log::debug!("Unsubscription confirmed: {ch}");
957 }
958 }
959 }
960 PendingRequestType::SetHeartbeat => {
961 if let Some(error) = &response.error {
962 log::error!(
963 "Set heartbeat failed: code={}, message={}, request_id={}",
964 error.code,
965 error.message,
966 request_id
967 );
968 } else {
969 log::debug!("Heartbeat enabled (request_id={request_id})");
970 }
971 }
972 PendingRequestType::Test => {
973 if let Some(error) = &response.error {
974 log::warn!(
975 "Heartbeat test failed: code={}, message={}, request_id={}",
976 error.code,
977 error.message,
978 request_id
979 );
980 } else {
981 log::trace!(
982 "Heartbeat test acknowledged (request_id={request_id})"
983 );
984 }
985 }
986 PendingRequestType::Cancel {
987 client_order_id,
988 trader_id,
989 strategy_id,
990 instrument_id,
991 } => {
992 if let Some(result) = &response.result {
993 match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
994 Ok(order_msg) => {
995 // Cancel confirmed - don't emit or remove context here.
996 // Let user.orders stream handle the cancel event to avoid
997 // duplicates. The stream will use the context for correct
998 // trader/strategy IDs and then remove it.
999 log::debug!(
1000 "Cancel confirmed: venue_order_id={}, client_order_id={}, state={} (waiting for user.orders)",
1001 order_msg.order_id,
1002 client_order_id,
1003 order_msg.order_state
1004 );
1005 }
1006 Err(e) => {
1007 log::error!(
1008 "Failed to parse cancel response: request_id={request_id}, error={e}"
1009 );
1010 }
1011 }
1012 } else if let Some(error) = &response.error {
1013 log::error!(
1014 "Cancel rejected: code={}, message={}, client_order_id={}",
1015 error.code,
1016 error.message,
1017 client_order_id
1018 );
1019 return Some(NautilusWsMessage::OrderCancelRejected(
1020 OrderCancelRejected::new(
1021 trader_id,
1022 strategy_id,
1023 instrument_id,
1024 client_order_id,
1025 ustr::ustr(&format!(
1026 "code={}: {}",
1027 error.code, error.message
1028 )),
1029 UUID4::new(),
1030 ts_init,
1031 ts_init,
1032 false,
1033 None, // venue_order_id not available in error response
1034 self.account_id,
1035 ),
1036 ));
1037 }
1038 }
1039 PendingRequestType::CancelAllByInstrument { instrument_id } => {
1040 if let Some(result) = &response.result {
1041 match serde_json::from_value::<u64>(result.clone()) {
1042 Ok(count) => {
1043 log::info!(
1044 "Cancelled {count} orders for instrument {instrument_id}"
1045 );
1046 // Individual order status updates come via user.orders subscription
1047 }
1048 Err(e) => {
1049 log::warn!("Failed to parse cancel_all response: {e}");
1050 }
1051 }
1052 } else if let Some(error) = &response.error {
1053 log::error!(
1054 "Cancel all by instrument rejected: code={}, message={}, instrument_id={}",
1055 error.code,
1056 error.message,
1057 instrument_id
1058 );
1059 }
1060 }
1061 PendingRequestType::Buy {
1062 client_order_id,
1063 trader_id,
1064 strategy_id,
1065 instrument_id,
1066 }
1067 | PendingRequestType::Sell {
1068 client_order_id,
1069 trader_id,
1070 strategy_id,
1071 instrument_id,
1072 } => {
1073 if let Some(result) = &response.result {
1074 match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1075 {
1076 Ok(order_response) => {
1077 let venue_order_id_str = &order_response.order.order_id;
1078 let venue_order_id =
1079 VenueOrderId::new(venue_order_id_str.as_str());
1080 let order_state = &order_response.order.order_state;
1081 log::debug!(
1082 "Order response: venue_order_id={venue_order_id}, client_order_id={client_order_id}, state={order_state}"
1083 );
1084
1085 self.order_contexts.insert(
1086 venue_order_id,
1087 OrderContext {
1088 client_order_id,
1089 trader_id,
1090 strategy_id,
1091 instrument_id,
1092 },
1093 );
1094
1095 // Skip OrderAccepted if order already reached terminal state
1096 if self.terminal_orders.contains(&client_order_id) {
1097 log::debug!(
1098 "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
1099 );
1100 self.emitted_accepted.add(venue_order_id);
1101 } else if order_state == "filled" {
1102 // Order went directly Submitted -> Filled (e.g., market orders)
1103 log::debug!(
1104 "Skipping OrderAccepted for already filled order: venue_order_id={venue_order_id}, client_order_id={client_order_id}"
1105 );
1106 self.terminal_orders.add(client_order_id);
1107 self.emitted_accepted.add(venue_order_id);
1108 } else {
1109 let instrument_name_ustr = Ustr::from(
1110 order_response.order.instrument_name.as_str(),
1111 );
1112 if let Some(instrument) =
1113 self.instruments_cache.get(&instrument_name_ustr)
1114 {
1115 if let Some(account_id) = self.account_id {
1116 let event = parse_order_accepted(
1117 &order_response.order,
1118 instrument,
1119 account_id,
1120 trader_id,
1121 strategy_id,
1122 ts_init,
1123 );
1124 // Mark OrderAccepted as emitted to prevent duplicate from subscription
1125 self.emitted_accepted.add(venue_order_id);
1126 return Some(NautilusWsMessage::OrderAccepted(
1127 event,
1128 ));
1129 } else {
1130 log::warn!(
1131 "Cannot create OrderAccepted: account_id not set"
1132 );
1133 }
1134 } else {
1135 log::warn!(
1136 "Instrument {instrument_name_ustr} not found in cache for order response"
1137 );
1138 }
1139 }
1140 }
1141 Err(e) => {
1142 log::error!(
1143 "Failed to parse order response: request_id={request_id}, error={e}"
1144 );
1145 return Some(NautilusWsMessage::OrderRejected(
1146 OrderRejected::new(
1147 trader_id,
1148 strategy_id,
1149 instrument_id,
1150 client_order_id,
1151 self.account_id
1152 .unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1153 ustr::ustr(&format!(
1154 "Failed to parse response: {e}"
1155 )),
1156 UUID4::new(),
1157 ts_init,
1158 ts_init,
1159 false,
1160 false,
1161 ),
1162 ));
1163 }
1164 }
1165 } else if let Some(error) = &response.error {
1166 let due_post_only = error.code == DERIBIT_POST_ONLY_ERROR_CODE;
1167 let reason = if let Some(data) = &error.data {
1168 format!(
1169 "code={}: {} (data: {})",
1170 error.code, error.message, data
1171 )
1172 } else {
1173 format!("code={}: {}", error.code, error.message)
1174 };
1175
1176 log::debug!(
1177 "Order rejected: {reason}, client_order_id={client_order_id}"
1178 );
1179 return Some(NautilusWsMessage::OrderRejected(OrderRejected::new(
1180 trader_id,
1181 strategy_id,
1182 instrument_id,
1183 client_order_id,
1184 self.account_id.unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1185 ustr::ustr(&reason),
1186 UUID4::new(),
1187 ts_init,
1188 ts_init,
1189 false,
1190 due_post_only,
1191 )));
1192 }
1193 }
1194 PendingRequestType::Edit {
1195 client_order_id,
1196 trader_id,
1197 strategy_id,
1198 instrument_id,
1199 } => {
1200 if let Some(result) = &response.result {
1201 match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1202 {
1203 Ok(order_response) => {
1204 let venue_order_id =
1205 VenueOrderId::new(&order_response.order.order_id);
1206 log::info!(
1207 "Order updated: venue_order_id={}, client_order_id={}, state={}",
1208 venue_order_id,
1209 client_order_id,
1210 order_response.order.order_state
1211 );
1212
1213 self.order_contexts.insert(
1214 venue_order_id,
1215 OrderContext {
1216 client_order_id,
1217 trader_id,
1218 strategy_id,
1219 instrument_id,
1220 },
1221 );
1222
1223 let instrument_name_ustr = Ustr::from(
1224 order_response.order.instrument_name.as_str(),
1225 );
1226 if let Some(instrument) =
1227 self.instruments_cache.get(&instrument_name_ustr)
1228 {
1229 if let Some(account_id) = self.account_id {
1230 let event = parse_order_updated(
1231 &order_response.order,
1232 instrument,
1233 account_id,
1234 trader_id,
1235 strategy_id,
1236 ts_init,
1237 );
1238 return Some(NautilusWsMessage::OrderUpdated(
1239 event,
1240 ));
1241 } else {
1242 log::warn!(
1243 "Cannot create OrderUpdated: account_id not set"
1244 );
1245 }
1246 } else {
1247 log::warn!(
1248 "Instrument {instrument_name_ustr} not found in cache for edit response"
1249 );
1250 }
1251 }
1252 Err(e) => {
1253 log::error!(
1254 "Failed to parse edit response: request_id={request_id}, error={e}"
1255 );
1256 return Some(NautilusWsMessage::OrderModifyRejected(
1257 OrderModifyRejected::new(
1258 trader_id,
1259 strategy_id,
1260 instrument_id,
1261 client_order_id,
1262 ustr::ustr(&format!(
1263 "Failed to parse response: {e}"
1264 )),
1265 UUID4::new(),
1266 ts_init,
1267 ts_init,
1268 false,
1269 None, // venue_order_id not available
1270 self.account_id,
1271 ),
1272 ));
1273 }
1274 }
1275 } else if let Some(error) = &response.error {
1276 log::error!(
1277 "Order modify rejected: code={}, message={}, client_order_id={}",
1278 error.code,
1279 error.message,
1280 client_order_id
1281 );
1282 return Some(NautilusWsMessage::OrderModifyRejected(
1283 OrderModifyRejected::new(
1284 trader_id,
1285 strategy_id,
1286 instrument_id,
1287 client_order_id,
1288 ustr::ustr(&format!(
1289 "code={}: {}",
1290 error.code, error.message
1291 )),
1292 UUID4::new(),
1293 ts_init,
1294 ts_init,
1295 false,
1296 None, // venue_order_id not available
1297 self.account_id,
1298 ),
1299 ));
1300 }
1301 }
1302 PendingRequestType::GetOrderState {
1303 client_order_id,
1304 trader_id: _,
1305 strategy_id: _,
1306 instrument_id: _,
1307 } => {
1308 if let Some(result) = &response.result {
1309 match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
1310 Ok(order_msg) => {
1311 log::info!(
1312 "Order state received: venue_order_id={}, client_order_id={}, state={}",
1313 order_msg.order_id,
1314 client_order_id,
1315 order_msg.order_state
1316 );
1317
1318 // Convert to OrderStatusReport
1319 let instrument_name_ustr = order_msg.instrument_name;
1320 if let Some(instrument) =
1321 self.instruments_cache.get(&instrument_name_ustr)
1322 {
1323 if let Some(account_id) = self.account_id {
1324 match parse_user_order_msg(
1325 &order_msg, instrument, account_id, ts_init,
1326 ) {
1327 Ok(report) => {
1328 return Some(
1329 NautilusWsMessage::OrderStatusReports(
1330 vec![report],
1331 ),
1332 );
1333 }
1334 Err(e) => {
1335 log::warn!(
1336 "Failed to parse get_order_state response to report: {e}"
1337 );
1338 }
1339 }
1340 } else {
1341 log::warn!(
1342 "Cannot create OrderStatusReport: account_id not set"
1343 );
1344 }
1345 } else {
1346 log::warn!(
1347 "Instrument {instrument_name_ustr} not found in cache for get_order_state response"
1348 );
1349 }
1350 }
1351 Err(e) => {
1352 log::error!(
1353 "Failed to parse get_order_state response: request_id={request_id}, error={e}"
1354 );
1355 }
1356 }
1357 } else if let Some(error) = &response.error {
1358 log::error!(
1359 "Get order state failed: code={}, message={}, client_order_id={}",
1360 error.code,
1361 error.message,
1362 client_order_id
1363 );
1364 }
1365 }
1366 }
1367 } else if let Some(request_id) = response.id {
1368 // Response with ID but no matching pending request
1369 if let Some(error) = &response.error {
1370 // Log orphaned error response with all available context
1371 log::error!(
1372 "Deribit error for unknown request: code={}, message={}, request_id={}, data={:?}",
1373 error.code,
1374 error.message,
1375 request_id,
1376 error.data
1377 );
1378 return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1379 code: error.code,
1380 message: error.message.clone(),
1381 }));
1382 } else {
1383 // Success response but no pending request - likely already processed
1384 log::debug!(
1385 "Received response for unknown request_id={}, result present: {}",
1386 request_id,
1387 response.result.is_some()
1388 );
1389 }
1390 } else if let Some(error) = &response.error {
1391 // Error response with no ID (shouldn't happen in JSON-RPC 2.0, but handle it)
1392 log::error!(
1393 "Deribit error with no request_id: code={}, message={}, data={:?}",
1394 error.code,
1395 error.message,
1396 error.data
1397 );
1398 return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1399 code: error.code,
1400 message: error.message.clone(),
1401 }));
1402 }
1403 None
1404 }
1405 DeribitWsMessage::Notification(notification) => {
1406 let channel = ¬ification.params.channel;
1407 let data = ¬ification.params.data;
1408
1409 // Determine channel type and parse accordingly
1410 if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
1411 match channel_type {
1412 DeribitWsChannel::Trades => {
1413 // Parse trade messages
1414 match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
1415 Ok(trades) => {
1416 log::debug!("Received {} trades", trades.len());
1417 let data_vec =
1418 parse_trades_data(trades, &self.instruments_cache, ts_init);
1419 if data_vec.is_empty() {
1420 log::debug!(
1421 "No trades parsed - instrument cache size: {}",
1422 self.instruments_cache.len()
1423 );
1424 } else {
1425 log::debug!("Parsed {} trade ticks", data_vec.len());
1426 return Some(NautilusWsMessage::Data(data_vec));
1427 }
1428 }
1429 Err(e) => {
1430 log::warn!("Failed to deserialize trades: {e}");
1431 }
1432 }
1433 }
1434 DeribitWsChannel::Book => {
1435 // Parse order book messages
1436 match serde_json::from_value::<DeribitBookMsg>(data.clone()) {
1437 Ok(book_msg) => {
1438 if let Some(instrument) =
1439 self.instruments_cache.get(&book_msg.instrument_name)
1440 {
1441 if book_msg.msg_type == DeribitBookMsgType::Change
1442 && let Some(prev_id) = book_msg.prev_change_id
1443 && let Some(&last_id) =
1444 self.book_sequence.get(&book_msg.instrument_name)
1445 && prev_id != last_id
1446 {
1447 log::warn!(
1448 "Book sequence gap for {}: expected prev_change_id={}, was {}",
1449 book_msg.instrument_name,
1450 last_id,
1451 prev_id
1452 );
1453 }
1454 self.book_sequence
1455 .insert(book_msg.instrument_name, book_msg.change_id);
1456
1457 match parse_book_msg(&book_msg, instrument, ts_init) {
1458 Ok(deltas) => {
1459 return Some(NautilusWsMessage::Deltas(deltas));
1460 }
1461 Err(e) => {
1462 log::warn!("Failed to parse book message: {e}");
1463 }
1464 }
1465 } else {
1466 log::warn!(
1467 "Book message received but instrument '{}' not found in cache (cache size: {})",
1468 book_msg.instrument_name,
1469 self.instruments_cache.len()
1470 );
1471 }
1472 }
1473 Err(e) => {
1474 log::warn!(
1475 "Failed to deserialize book message: {e}, channel: {channel}"
1476 );
1477 }
1478 }
1479 }
1480 DeribitWsChannel::Ticker => {
1481 // Parse ticker to emit both MarkPrice and IndexPrice
1482 // When subscribed to either mark_prices or index_prices, we emit both
1483 // as traders typically need both for analysis
1484 if let Ok(ticker_msg) =
1485 serde_json::from_value::<DeribitTickerMsg>(data.clone())
1486 && let Some(instrument) =
1487 self.instruments_cache.get(&ticker_msg.instrument_name)
1488 {
1489 match (
1490 parse_ticker_to_mark_price(&ticker_msg, instrument, ts_init),
1491 parse_ticker_to_index_price(&ticker_msg, instrument, ts_init),
1492 ) {
1493 (Ok(mark_price), Ok(index_price)) => {
1494 return Some(NautilusWsMessage::Data(vec![
1495 Data::MarkPriceUpdate(mark_price),
1496 Data::IndexPriceUpdate(index_price),
1497 ]));
1498 }
1499 (Err(e), _) | (_, Err(e)) => {
1500 log::warn!("Failed to parse ticker prices: {e}");
1501 }
1502 }
1503 }
1504 }
1505 DeribitWsChannel::Perpetual => {
1506 // Parse perpetual channel for funding rate updates
1507 // This channel is dedicated to perpetual instruments and provides
1508 // the interest (funding) rate
1509 match serde_json::from_value::<DeribitPerpetualMsg>(data.clone()) {
1510 Ok(perpetual_msg) => {
1511 // Extract instrument name from channel: perpetual.{instrument}.{interval}
1512 let parts: Vec<&str> = channel.split('.').collect();
1513 if parts.len() >= 2 {
1514 let instrument_name = Ustr::from(parts[1]);
1515 if let Some(instrument) =
1516 self.instruments_cache.get(&instrument_name)
1517 {
1518 let funding_rate = parse_perpetual_to_funding_rate(
1519 &perpetual_msg,
1520 instrument,
1521 ts_init,
1522 );
1523 return Some(NautilusWsMessage::FundingRates(vec![
1524 funding_rate,
1525 ]));
1526 } else {
1527 log::warn!(
1528 "Instrument {} not found in cache (cache size: {})",
1529 instrument_name,
1530 self.instruments_cache.len()
1531 );
1532 }
1533 }
1534 }
1535 Err(e) => {
1536 log::warn!(
1537 "Failed to deserialize perpetual message: {e}, data: {data}"
1538 );
1539 }
1540 }
1541 }
1542 DeribitWsChannel::Quote => {
1543 // Parse quote messages
1544 if let Ok(quote_msg) =
1545 serde_json::from_value::<DeribitQuoteMsg>(data.clone())
1546 && let Some(instrument) =
1547 self.instruments_cache.get("e_msg.instrument_name)
1548 {
1549 match parse_quote_msg("e_msg, instrument, ts_init) {
1550 Ok(quote) => {
1551 return Some(NautilusWsMessage::Data(vec![Data::Quote(
1552 quote,
1553 )]));
1554 }
1555 Err(e) => {
1556 log::warn!("Failed to parse quote message: {e}");
1557 }
1558 }
1559 }
1560 }
1561 DeribitWsChannel::InstrumentState => {
1562 // Parse instrument state lifecycle notifications
1563 match serde_json::from_value::<DeribitInstrumentStateMsg>(data.clone())
1564 {
1565 Ok(state_msg) => {
1566 log::info!(
1567 "Instrument state change: {} -> {} (timestamp: {})",
1568 state_msg.instrument_name,
1569 state_msg.state,
1570 state_msg.timestamp
1571 );
1572 // Return raw data for consumers to handle state changes
1573 // TODO: Optionally emit instrument updates when instrument transitions to 'started'
1574 return Some(NautilusWsMessage::Raw(data.clone()));
1575 }
1576 Err(e) => {
1577 log::warn!("Failed to parse instrument state message: {e}");
1578 }
1579 }
1580 }
1581 DeribitWsChannel::ChartTrades => {
1582 // Parse chart.trades messages into Bar objects using emit-on-next pattern.
1583 // Deribit sends updates for the current bar as it builds. We only emit
1584 // a bar when we receive a bar with a different timestamp, confirming
1585 // the previous bar is closed.
1586 if let Ok(chart_msg) =
1587 serde_json::from_value::<DeribitChartMsg>(data.clone())
1588 {
1589 // Extract instrument and resolution from channel
1590 // Channel format: chart.trades.{instrument}.{resolution}
1591 let parts: Vec<&str> = channel.split('.').collect();
1592 if parts.len() >= 4 {
1593 let instrument_name = Ustr::from(parts[2]);
1594 let resolution = parts[3];
1595
1596 if let Some(instrument) =
1597 self.instruments_cache.get(&instrument_name)
1598 {
1599 let instrument_id = instrument.id();
1600
1601 match resolution_to_bar_type(instrument_id, resolution) {
1602 Ok(bar_type) => {
1603 let price_precision = instrument.price_precision();
1604 let size_precision = instrument.size_precision();
1605
1606 match parse_chart_msg(
1607 &chart_msg,
1608 bar_type,
1609 price_precision,
1610 size_precision,
1611 self.bars_timestamp_on_close,
1612 ts_init,
1613 ) {
1614 Ok(new_bar) => {
1615 // Check if we have a pending bar for this channel
1616 let channel_key = channel.clone();
1617 if let Some(pending_bar) =
1618 self.pending_bars.get(&channel_key)
1619 {
1620 // If new bar has different timestamp, the pending bar is closed
1621 if new_bar.ts_event
1622 != pending_bar.ts_event
1623 {
1624 let closed_bar = *pending_bar;
1625 self.pending_bars
1626 .insert(channel_key, new_bar);
1627 log::debug!(
1628 "Emitting closed bar: {closed_bar:?}"
1629 );
1630 return Some(
1631 NautilusWsMessage::Data(vec![
1632 Data::Bar(closed_bar),
1633 ]),
1634 );
1635 }
1636 // Same timestamp - update pending bar with latest values
1637 self.pending_bars
1638 .insert(channel_key, new_bar);
1639 } else {
1640 // First bar for this channel - store as pending
1641 self.pending_bars
1642 .insert(channel_key, new_bar);
1643 }
1644 }
1645 Err(e) => {
1646 log::warn!(
1647 "Failed to parse chart message to bar: {e}"
1648 );
1649 }
1650 }
1651 }
1652 Err(e) => {
1653 log::warn!(
1654 "Failed to create BarType from resolution {resolution}: {e}"
1655 );
1656 }
1657 }
1658 } else {
1659 log::warn!(
1660 "Instrument {instrument_name} not found in cache for chart data"
1661 );
1662 }
1663 }
1664 }
1665 }
1666 DeribitWsChannel::UserOrders => {
1667 // Handle both array and single object responses
1668 let orders_result =
1669 serde_json::from_value::<Vec<DeribitOrderMsg>>(data.clone())
1670 .or_else(|_| {
1671 serde_json::from_value::<DeribitOrderMsg>(data.clone())
1672 .map(|order| vec![order])
1673 });
1674
1675 match orders_result {
1676 Ok(orders) => {
1677 log::debug!("Received {} user order updates", orders.len());
1678
1679 // Require account_id for parsing
1680 let Some(account_id) = self.account_id else {
1681 log::warn!("Cannot parse user orders: account_id not set");
1682 return Some(NautilusWsMessage::Raw(data.clone()));
1683 };
1684
1685 let mut outgoing = Vec::new();
1686
1687 // Process each order and emit appropriate events
1688 for order in &orders {
1689 let venue_order_id_str = &order.order_id;
1690 let venue_order_id =
1691 VenueOrderId::new(venue_order_id_str.as_str());
1692 let instrument_name = order.instrument_name;
1693
1694 let Some(instrument) =
1695 self.instruments_cache.get(&instrument_name)
1696 else {
1697 log::warn!(
1698 "Instrument {instrument_name} not found in cache"
1699 );
1700 continue;
1701 };
1702
1703 // Look up OrderContext for this order
1704 // First check order_contexts (for orders whose response has been processed)
1705 // Then check pending_requests (for orders whose response hasn't arrived yet)
1706 // If neither found, this is a true external order
1707 let context =
1708 self.order_contexts.get(&venue_order_id).cloned();
1709
1710 // Extract client_order_id from order label for pending check
1711 let label_client_order_id = order
1712 .label
1713 .as_ref()
1714 .filter(|l| !l.is_empty())
1715 .map(ClientOrderId::new);
1716
1717 // Check for pending request if not in order_contexts
1718 let pending_context = if context.is_none() {
1719 if let Some(client_id) = &label_client_order_id {
1720 self.get_pending_order_context(client_id)
1721 } else {
1722 None
1723 }
1724 } else {
1725 None
1726 };
1727
1728 // Check if order has a pending request for context resolution
1729 let has_pending_request =
1730 if let Some(client_id) = &label_client_order_id {
1731 self.is_pending_order(client_id)
1732 } else {
1733 false
1734 };
1735
1736 let effective_context = context.or(pending_context);
1737 let is_known_order =
1738 effective_context.is_some() || has_pending_request;
1739
1740 // Determine event type based on order state
1741 let event_type = determine_order_event_type(
1742 &order.order_state,
1743 !is_known_order, // is_new if we don't know about it
1744 false, // not from edit response
1745 );
1746
1747 let (trader_id, strategy_id, client_order_id) =
1748 if let Some(ctx) = effective_context {
1749 (
1750 ctx.trader_id,
1751 ctx.strategy_id,
1752 ctx.client_order_id,
1753 )
1754 } else {
1755 // External order - use default values
1756 // Note: These won't match any strategy, which is correct
1757 (
1758 TraderId::new("EXTERNAL-000"),
1759 StrategyId::new("EXTERNAL"),
1760 ClientOrderId::new(venue_order_id_str),
1761 )
1762 };
1763
1764 match event_type {
1765 OrderEventType::Accepted => {
1766 // Skip if order already reached terminal state (race condition)
1767 if self.terminal_orders.contains(&client_order_id) {
1768 log::debug!(
1769 "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
1770 );
1771 continue;
1772 }
1773
1774 // Check if we already emitted OrderAccepted for this order
1775 // This prevents duplicates from both response and subscription paths
1776 if self.emitted_accepted.contains(&venue_order_id) {
1777 log::trace!(
1778 "Skipping duplicate OrderAccepted: venue_order_id={venue_order_id}"
1779 );
1780 continue;
1781 }
1782
1783 let event = parse_order_accepted(
1784 order,
1785 instrument,
1786 account_id,
1787 trader_id,
1788 strategy_id,
1789 ts_init,
1790 );
1791
1792 // Mark OrderAccepted as emitted
1793 self.emitted_accepted.add(venue_order_id);
1794
1795 log::debug!(
1796 "Emitting OrderAccepted: venue_order_id={venue_order_id}, is_known={is_known_order}"
1797 );
1798 outgoing
1799 .push(NautilusWsMessage::OrderAccepted(event));
1800 }
1801 OrderEventType::Canceled => {
1802 let event = parse_order_canceled(
1803 order,
1804 instrument,
1805 account_id,
1806 trader_id,
1807 strategy_id,
1808 ts_init,
1809 );
1810 log::debug!(
1811 "Emitting OrderCanceled: venue_order_id={venue_order_id}"
1812 );
1813 self.terminal_orders.add(client_order_id);
1814 self.order_contexts.remove(&venue_order_id);
1815 self.emitted_accepted.remove(&venue_order_id);
1816 outgoing
1817 .push(NautilusWsMessage::OrderCanceled(event));
1818 }
1819 OrderEventType::Expired => {
1820 let event = parse_order_expired(
1821 order,
1822 instrument,
1823 account_id,
1824 trader_id,
1825 strategy_id,
1826 ts_init,
1827 );
1828 log::debug!(
1829 "Emitting OrderExpired: venue_order_id={venue_order_id}"
1830 );
1831 self.terminal_orders.add(client_order_id);
1832 self.order_contexts.remove(&venue_order_id);
1833 self.emitted_accepted.remove(&venue_order_id);
1834 outgoing
1835 .push(NautilusWsMessage::OrderExpired(event));
1836 }
1837 OrderEventType::Updated => {
1838 // Emit OrderStatusReport for updates
1839 // This includes quantity/price changes from modify
1840 match parse_user_order_msg(
1841 order, instrument, account_id, ts_init,
1842 ) {
1843 Ok(report) => {
1844 log::debug!(
1845 "Emitting OrderStatusReport (updated): venue_order_id={venue_order_id}"
1846 );
1847 outgoing.push(
1848 NautilusWsMessage::OrderStatusReports(
1849 vec![report],
1850 ),
1851 );
1852 }
1853 Err(e) => {
1854 log::warn!(
1855 "Failed to parse order update: {e}"
1856 );
1857 }
1858 }
1859 }
1860 OrderEventType::None => {
1861 // Fills handled via user.trades, track terminal state
1862 // for race condition prevention
1863 if matches!(
1864 order.order_state.as_str(),
1865 "filled" | "rejected"
1866 ) {
1867 log::debug!(
1868 "Recording terminal order: venue_order_id={venue_order_id}, state={}",
1869 order.order_state
1870 );
1871 self.terminal_orders.add(client_order_id);
1872 self.order_contexts.remove(&venue_order_id);
1873 self.emitted_accepted.remove(&venue_order_id);
1874 } else {
1875 log::trace!(
1876 "No event to emit for order {}, state={}",
1877 venue_order_id,
1878 order.order_state
1879 );
1880 }
1881 }
1882 }
1883 }
1884
1885 if !outgoing.is_empty() {
1886 self.pending_outgoing.extend(outgoing);
1887 }
1888 }
1889 Err(e) => {
1890 log::warn!("Failed to deserialize user orders: {e}");
1891 }
1892 }
1893 }
1894 DeribitWsChannel::UserTrades => {
1895 // Handle both array and single object responses
1896 let trades_result =
1897 serde_json::from_value::<Vec<DeribitUserTradeMsg>>(data.clone())
1898 .or_else(|_| {
1899 serde_json::from_value::<DeribitUserTradeMsg>(data.clone())
1900 .map(|trade| vec![trade])
1901 });
1902
1903 match trades_result {
1904 Ok(trades) => {
1905 log::debug!("Received {} user trade updates", trades.len());
1906
1907 let Some(account_id) = self.account_id else {
1908 log::warn!("Cannot parse user trades: account_id not set");
1909 return Some(NautilusWsMessage::Raw(data.clone()));
1910 };
1911
1912 let mut reports = Vec::with_capacity(trades.len());
1913 for trade in &trades {
1914 let instrument_name = trade.instrument_name;
1915 if let Some(instrument) =
1916 self.instruments_cache.get(&instrument_name)
1917 {
1918 match parse_user_trade_msg(
1919 trade, instrument, account_id, ts_init,
1920 ) {
1921 Ok(report) => {
1922 log::debug!(
1923 "Parsed fill report: {} @ {}",
1924 report.trade_id,
1925 report.last_px
1926 );
1927 reports.push(report);
1928 }
1929 Err(e) => {
1930 log::warn!(
1931 "Failed to parse trade {}: {e}",
1932 trade.trade_id
1933 );
1934 }
1935 }
1936 } else {
1937 log::warn!(
1938 "Instrument {instrument_name} not found in cache"
1939 );
1940 }
1941 }
1942
1943 if !reports.is_empty() {
1944 return Some(NautilusWsMessage::FillReports(reports));
1945 }
1946 }
1947 Err(e) => {
1948 log::warn!("Failed to deserialize user trades: {e}");
1949 }
1950 }
1951 }
1952 DeribitWsChannel::UserPortfolio => {
1953 match serde_json::from_value::<DeribitPortfolioMsg>(data.clone()) {
1954 Ok(portfolio) => {
1955 // Skip zero-balance currencies (common with cross-collateral)
1956 // Only check equity and balance - initial_margin can be non-zero
1957 // for all currencies when cross-collateral is enabled
1958 if portfolio.equity.is_zero() && portfolio.balance.is_zero() {
1959 log::trace!(
1960 "Skipping zero-balance portfolio for {}",
1961 portfolio.currency
1962 );
1963 return None;
1964 }
1965
1966 // Require account_id for parsing
1967 let Some(account_id) = self.account_id else {
1968 log::warn!("Cannot parse portfolio: account_id not set");
1969 return None;
1970 };
1971
1972 match parse_portfolio_to_account_state(
1973 &portfolio, account_id, ts_init,
1974 ) {
1975 Ok(account_state) => {
1976 // Check for duplicate per currency
1977 let currency_key = portfolio.currency.clone();
1978 if let Some(last) =
1979 self.last_account_states.get(¤cy_key)
1980 && account_state.has_same_balances_and_margins(last)
1981 {
1982 log::trace!(
1983 "Skipping duplicate portfolio update for {}",
1984 portfolio.currency
1985 );
1986 return None;
1987 }
1988
1989 self.last_account_states
1990 .insert(currency_key, account_state.clone());
1991 return Some(NautilusWsMessage::AccountState(
1992 account_state,
1993 ));
1994 }
1995 Err(e) => {
1996 log::warn!(
1997 "Failed to parse portfolio to AccountState: {e}"
1998 );
1999 }
2000 }
2001 }
2002 Err(e) => {
2003 log::warn!("Failed to deserialize portfolio: {e}");
2004 }
2005 }
2006 }
2007 _ => {
2008 // Unhandled channel - return raw
2009 log::trace!("Unhandled channel: {channel}");
2010 return Some(NautilusWsMessage::Raw(data.clone()));
2011 }
2012 }
2013 } else {
2014 log::trace!("Unknown channel: {channel}");
2015 return Some(NautilusWsMessage::Raw(data.clone()));
2016 }
2017 None
2018 }
2019 DeribitWsMessage::Heartbeat(heartbeat) => {
2020 match heartbeat.heartbeat_type {
2021 DeribitHeartbeatType::TestRequest => {
2022 log::trace!(
2023 "Received heartbeat test_request - responding with public/test"
2024 );
2025 if let Err(e) = self.handle_heartbeat_test_request().await {
2026 log::error!("Failed to respond to heartbeat test_request: {e}");
2027
2028 // Return error to signal connection may be unhealthy
2029 return Some(NautilusWsMessage::Error(DeribitWsError::Send(format!(
2030 "Heartbeat response failed: {e}"
2031 ))));
2032 }
2033 }
2034 DeribitHeartbeatType::Heartbeat => {
2035 log::trace!("Received heartbeat acknowledgment");
2036 }
2037 }
2038 None
2039 }
2040 DeribitWsMessage::Error(err) => {
2041 log::error!("Deribit error {}: {}", err.code, err.message);
2042 Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
2043 code: err.code,
2044 message: err.message,
2045 }))
2046 }
2047 DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
2048 }
2049 }
2050
2051 /// Main message processing loop.
2052 ///
2053 /// Returns `None` when the handler should stop.
2054 /// Messages that need client-side handling (e.g., Reconnected) are returned.
2055 /// Data messages are sent directly to `out_tx` for the user stream.
2056 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
2057 loop {
2058 if let Some(msg) = self.pending_outgoing.pop_front() {
2059 match msg {
2060 NautilusWsMessage::Reconnected | NautilusWsMessage::Authenticated(_) => {
2061 return Some(msg);
2062 }
2063 _ => {
2064 let _ = self.out_tx.send(msg);
2065 continue;
2066 }
2067 }
2068 }
2069
2070 tokio::select! {
2071 // Process commands from client
2072 Some(cmd) = self.cmd_rx.recv() => {
2073 self.process_command(cmd).await;
2074 }
2075 // Process raw WebSocket messages
2076 Some(msg) = self.raw_rx.recv() => {
2077 match msg {
2078 Message::Text(text) => {
2079 if let Some(nautilus_msg) = self.process_raw_message(&text).await {
2080 // Send data messages to user stream
2081 match &nautilus_msg {
2082 NautilusWsMessage::Data(_)
2083 | NautilusWsMessage::Deltas(_)
2084 | NautilusWsMessage::Instrument(_)
2085 | NautilusWsMessage::Raw(_)
2086 | NautilusWsMessage::Error(_) => {
2087 let _ = self.out_tx.send(nautilus_msg);
2088 }
2089 NautilusWsMessage::FundingRates(rates) => {
2090 let msg_to_send =
2091 NautilusWsMessage::FundingRates(rates.clone());
2092 if let Err(e) = self.out_tx.send(msg_to_send) {
2093 log::error!("Failed to send funding rates: {e}");
2094 }
2095 }
2096 NautilusWsMessage::OrderStatusReports(_)
2097 | NautilusWsMessage::FillReports(_)
2098 | NautilusWsMessage::OrderAccepted(_)
2099 | NautilusWsMessage::OrderCanceled(_)
2100 | NautilusWsMessage::OrderExpired(_)
2101 | NautilusWsMessage::OrderUpdated(_)
2102 | NautilusWsMessage::OrderRejected(_)
2103 | NautilusWsMessage::OrderCancelRejected(_)
2104 | NautilusWsMessage::OrderModifyRejected(_)
2105 | NautilusWsMessage::AccountState(_) => {
2106 let _ = self.out_tx.send(nautilus_msg);
2107 }
2108 // Return messages that need client-side handling
2109 NautilusWsMessage::Reconnected
2110 | NautilusWsMessage::Authenticated(_) => {
2111 return Some(nautilus_msg);
2112 }
2113 }
2114 }
2115 }
2116 Message::Ping(data) => {
2117 // Respond to ping with pong
2118 if let Some(client) = &self.inner {
2119 let _ = client.send_pong(data.to_vec()).await;
2120 }
2121 }
2122 Message::Close(_) => {
2123 log::info!("Received close frame");
2124 }
2125 _ => {}
2126 }
2127 }
2128 // Check for stop signal
2129 () = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
2130 if self.signal.load(Ordering::Relaxed) {
2131 log::debug!("Stop signal received");
2132 return None;
2133 }
2134 }
2135 }
2136 }
2137 }
2138}