Skip to main content

nautilus_deribit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::{
23    collections::VecDeque,
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU64, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use nautilus_common::cache::fifo::FifoCache;
32use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
33use nautilus_model::{
34    data::{Bar, Data},
35    events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38};
39use nautilus_network::{
40    RECONNECTED,
41    retry::{RetryManager, create_websocket_retry_manager},
42    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
43};
44use tokio_tungstenite::tungstenite::Message;
45use ustr::Ustr;
46
47use super::{
48    enums::{DeribitBookMsgType, DeribitHeartbeatType, DeribitWsChannel},
49    error::DeribitWsError,
50    messages::{
51        DeribitAuthResult, DeribitBookMsg, DeribitCancelAllByInstrumentParams, DeribitCancelParams,
52        DeribitChartMsg, DeribitEditParams, DeribitHeartbeatParams, DeribitInstrumentStateMsg,
53        DeribitJsonRpcRequest, DeribitOrderMsg, DeribitOrderParams, DeribitOrderResponse,
54        DeribitPerpetualMsg, DeribitPortfolioMsg, DeribitQuoteMsg, DeribitSubscribeParams,
55        DeribitTickerMsg, DeribitTradeMsg, DeribitUserTradeMsg, DeribitWsMessage,
56        NautilusWsMessage, parse_raw_message,
57    },
58    parse::{
59        OrderEventType, determine_order_event_type, parse_book_msg, parse_chart_msg,
60        parse_order_accepted, parse_order_canceled, parse_order_expired, parse_order_updated,
61        parse_perpetual_to_funding_rate, parse_quote_msg, parse_ticker_to_index_price,
62        parse_ticker_to_mark_price, parse_trades_data, parse_user_order_msg, parse_user_trade_msg,
63        resolution_to_bar_type,
64    },
65};
66use crate::common::{
67    consts::{DERIBIT_POST_ONLY_ERROR_CODE, DERIBIT_RATE_LIMIT_KEY_ORDER},
68    parse::parse_portfolio_to_account_state,
69};
70
71/// Type of pending request for request ID correlation.
72#[derive(Debug, Clone)]
73pub enum PendingRequestType {
74    /// Authentication request.
75    Authenticate,
76    /// Subscribe request with requested channels.
77    Subscribe { channels: Vec<String> },
78    /// Unsubscribe request with requested channels.
79    Unsubscribe { channels: Vec<String> },
80    /// Set heartbeat request.
81    SetHeartbeat,
82    /// Test/ping request (heartbeat response).
83    Test,
84    /// Buy order request.
85    Buy {
86        client_order_id: ClientOrderId,
87        trader_id: TraderId,
88        strategy_id: StrategyId,
89        instrument_id: InstrumentId,
90    },
91    /// Sell order request.
92    Sell {
93        client_order_id: ClientOrderId,
94        trader_id: TraderId,
95        strategy_id: StrategyId,
96        instrument_id: InstrumentId,
97    },
98    /// Edit order request.
99    Edit {
100        client_order_id: ClientOrderId,
101        trader_id: TraderId,
102        strategy_id: StrategyId,
103        instrument_id: InstrumentId,
104    },
105    /// Cancel order request.
106    Cancel {
107        client_order_id: ClientOrderId,
108        trader_id: TraderId,
109        strategy_id: StrategyId,
110        instrument_id: InstrumentId,
111    },
112    /// Cancel all orders by instrument request.
113    CancelAllByInstrument { instrument_id: InstrumentId },
114    /// Get order state request.
115    GetOrderState {
116        client_order_id: ClientOrderId,
117        trader_id: TraderId,
118        strategy_id: StrategyId,
119        instrument_id: InstrumentId,
120    },
121}
122
123/// Commands sent from the client to the handler.
124#[allow(missing_debug_implementations)]
125pub enum HandlerCommand {
126    /// Set the active WebSocket client.
127    SetClient(WebSocketClient),
128    /// Disconnect the WebSocket.
129    Disconnect,
130    /// Authenticate with credentials.
131    Authenticate {
132        /// Serialized auth params (DeribitAuthParams or DeribitRefreshTokenParams).
133        auth_params: serde_json::Value,
134    },
135    /// Enable heartbeat with interval.
136    SetHeartbeat { interval: u64 },
137    /// Initialize the instrument cache.
138    InitializeInstruments(Vec<InstrumentAny>),
139    /// Update a single instrument in the cache.
140    UpdateInstrument(Box<InstrumentAny>),
141    /// Subscribe to channels.
142    Subscribe { channels: Vec<String> },
143    /// Unsubscribe from channels.
144    Unsubscribe { channels: Vec<String> },
145    /// Submit a buy order.
146    Buy {
147        params: DeribitOrderParams,
148        client_order_id: ClientOrderId,
149        trader_id: TraderId,
150        strategy_id: StrategyId,
151        instrument_id: InstrumentId,
152    },
153    /// Submit a sell order.
154    Sell {
155        params: DeribitOrderParams,
156        client_order_id: ClientOrderId,
157        trader_id: TraderId,
158        strategy_id: StrategyId,
159        instrument_id: InstrumentId,
160    },
161    /// Edit an existing order.
162    Edit {
163        params: DeribitEditParams,
164        client_order_id: ClientOrderId,
165        trader_id: TraderId,
166        strategy_id: StrategyId,
167        instrument_id: InstrumentId,
168    },
169    /// Cancel an existing order.
170    Cancel {
171        params: DeribitCancelParams,
172        client_order_id: ClientOrderId,
173        trader_id: TraderId,
174        strategy_id: StrategyId,
175        instrument_id: InstrumentId,
176    },
177    /// Cancel all orders by instrument.
178    CancelAllByInstrument {
179        params: DeribitCancelAllByInstrumentParams,
180        instrument_id: InstrumentId,
181    },
182    /// Get order state.
183    GetOrderState {
184        order_id: String,
185        client_order_id: ClientOrderId,
186        trader_id: TraderId,
187        strategy_id: StrategyId,
188        instrument_id: InstrumentId,
189    },
190}
191
192/// Context for an order submitted via this handler.
193///
194/// Stores the original trader/strategy/client IDs from the buy/sell command
195/// so they can be used when processing user.orders subscription updates.
196#[derive(Debug, Clone)]
197pub struct OrderContext {
198    pub client_order_id: ClientOrderId,
199    pub trader_id: TraderId,
200    pub strategy_id: StrategyId,
201    pub instrument_id: InstrumentId,
202}
203
204/// Deribit WebSocket feed handler.
205///
206/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
207#[allow(missing_debug_implementations)]
208pub struct DeribitWsFeedHandler {
209    clock: &'static AtomicTime,
210    signal: Arc<AtomicBool>,
211    inner: Option<WebSocketClient>,
212    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
213    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
214    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
215    auth_tracker: AuthTracker,
216    subscriptions_state: SubscriptionState,
217    retry_manager: RetryManager<DeribitWsError>,
218    instruments_cache: AHashMap<Ustr, InstrumentAny>,
219    request_id_counter: AtomicU64,
220    pending_requests: AHashMap<u64, PendingRequestType>,
221    account_id: Option<AccountId>,
222    order_contexts: AHashMap<VenueOrderId, OrderContext>,
223    emitted_accepted: FifoCache<VenueOrderId, 10_000>,
224    terminal_orders: FifoCache<ClientOrderId, 10_000>,
225    pending_bars: AHashMap<String, Bar>,
226    bars_timestamp_on_close: bool,
227    last_account_states: AHashMap<String, AccountState>,
228    book_sequence: AHashMap<Ustr, u64>,
229    pending_outgoing: VecDeque<NautilusWsMessage>,
230}
231
232impl DeribitWsFeedHandler {
233    /// Creates a new feed handler.
234    #[allow(clippy::too_many_arguments)]
235    #[must_use]
236    pub fn new(
237        signal: Arc<AtomicBool>,
238        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241        auth_tracker: AuthTracker,
242        subscriptions_state: SubscriptionState,
243        account_id: Option<AccountId>,
244        bars_timestamp_on_close: bool,
245    ) -> Self {
246        Self {
247            clock: get_atomic_clock_realtime(),
248            signal,
249            inner: None,
250            cmd_rx,
251            raw_rx,
252            out_tx,
253            auth_tracker,
254            subscriptions_state,
255            retry_manager: create_websocket_retry_manager(),
256            instruments_cache: AHashMap::new(),
257            request_id_counter: AtomicU64::new(1),
258            pending_requests: AHashMap::new(),
259            account_id,
260            order_contexts: AHashMap::new(),
261            emitted_accepted: FifoCache::new(),
262            terminal_orders: FifoCache::new(),
263            pending_bars: AHashMap::new(),
264            bars_timestamp_on_close,
265            last_account_states: AHashMap::new(),
266            book_sequence: AHashMap::new(),
267            pending_outgoing: VecDeque::new(),
268        }
269    }
270
271    /// Sets the account ID for order/fill reports.
272    pub fn set_account_id(&mut self, account_id: AccountId) {
273        self.account_id = Some(account_id);
274    }
275
276    /// Returns the account ID.
277    #[must_use]
278    pub fn account_id(&self) -> Option<AccountId> {
279        self.account_id
280    }
281
282    fn clear_state(&mut self) {
283        let pending_count = self.pending_requests.len();
284        let emitted_count = self.emitted_accepted.len();
285        let bars_count = self.pending_bars.len();
286        let account_count = self.last_account_states.len();
287        let book_count = self.book_sequence.len();
288        let outgoing_count = self.pending_outgoing.len();
289
290        self.pending_requests.clear();
291        self.emitted_accepted.clear();
292        self.pending_bars.clear();
293        self.last_account_states.clear();
294        self.book_sequence.clear();
295        self.pending_outgoing.clear();
296
297        log::debug!(
298            "Reset state: pending_requests={pending_count}, emitted_accepted={emitted_count}, \
299            pending_bars={bars_count}, account_states={account_count}, book_sequence={book_count}, \
300            pending_outgoing={outgoing_count}"
301        );
302    }
303
304    /// Generates a unique request ID.
305    fn next_request_id(&self) -> u64 {
306        self.request_id_counter.fetch_add(1, Ordering::Relaxed)
307    }
308
309    /// Returns the current timestamp.
310    fn ts_init(&self) -> UnixNanos {
311        self.clock.get_time_ns()
312    }
313
314    /// Checks if there's a pending buy/sell request for the given client_order_id.
315    ///
316    /// This is used to avoid emitting duplicate OrderAccepted events from the
317    /// user.orders subscription when the response path will also emit an event.
318    fn is_pending_order(&self, client_order_id: &ClientOrderId) -> bool {
319        self.pending_requests.values().any(|req| match req {
320            PendingRequestType::Buy {
321                client_order_id: id,
322                ..
323            }
324            | PendingRequestType::Sell {
325                client_order_id: id,
326                ..
327            } => id == client_order_id,
328            _ => false,
329        })
330    }
331
332    /// Gets the OrderContext from a pending buy/sell request by client_order_id.
333    ///
334    /// Returns None if no pending request found.
335    fn get_pending_order_context(&self, client_order_id: &ClientOrderId) -> Option<OrderContext> {
336        for req in self.pending_requests.values() {
337            match req {
338                PendingRequestType::Buy {
339                    client_order_id: id,
340                    trader_id,
341                    strategy_id,
342                    instrument_id,
343                }
344                | PendingRequestType::Sell {
345                    client_order_id: id,
346                    trader_id,
347                    strategy_id,
348                    instrument_id,
349                } => {
350                    if id == client_order_id {
351                        return Some(OrderContext {
352                            client_order_id: *id,
353                            trader_id: *trader_id,
354                            strategy_id: *strategy_id,
355                            instrument_id: *instrument_id,
356                        });
357                    }
358                }
359                _ => {}
360            }
361        }
362        None
363    }
364
365    /// Sends a message over the WebSocket with retry logic.
366    async fn send_with_retry(
367        &self,
368        payload: String,
369        rate_limit_keys: Option<&[Ustr]>,
370    ) -> Result<(), DeribitWsError> {
371        if let Some(client) = &self.inner {
372            let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
373            self.retry_manager
374                .execute_with_retry(
375                    "websocket_send",
376                    || {
377                        let payload = payload.clone();
378                        let keys = keys_owned.clone();
379                        async move {
380                            client
381                                .send_text(payload, keys.as_deref())
382                                .await
383                                .map_err(|e| DeribitWsError::Send(e.to_string()))
384                        }
385                    },
386                    |e| matches!(e, DeribitWsError::Send(_)),
387                    DeribitWsError::Timeout,
388                )
389                .await
390        } else {
391            Err(DeribitWsError::NotConnected)
392        }
393    }
394
395    /// Handles a subscribe command.
396    ///
397    /// Note: The client has already called `mark_subscribe` before sending this command.
398    async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
399        let request_id = self.next_request_id();
400
401        // Track this request for response correlation
402        self.pending_requests.insert(
403            request_id,
404            PendingRequestType::Subscribe {
405                channels: channels.clone(),
406            },
407        );
408
409        let request = DeribitJsonRpcRequest::new(
410            request_id,
411            "public/subscribe",
412            DeribitSubscribeParams {
413                channels: channels.clone(),
414            },
415        );
416
417        let payload =
418            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
419
420        log::debug!("Subscribing to channels: request_id={request_id}, channels={channels:?}");
421        self.send_with_retry(payload, None).await
422    }
423
424    /// Handles an unsubscribe command.
425    async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
426        let request_id = self.next_request_id();
427
428        // Track this request for response correlation
429        self.pending_requests.insert(
430            request_id,
431            PendingRequestType::Unsubscribe {
432                channels: channels.clone(),
433            },
434        );
435
436        let request = DeribitJsonRpcRequest::new(
437            request_id,
438            "public/unsubscribe",
439            DeribitSubscribeParams {
440                channels: channels.clone(),
441            },
442        );
443
444        let payload =
445            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
446
447        log::debug!("Unsubscribing from channels: request_id={request_id}, channels={channels:?}");
448        self.send_with_retry(payload, None).await
449    }
450
451    /// Handles enabling heartbeat.
452    async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
453        let request_id = self.next_request_id();
454
455        // Track this request for response correlation
456        self.pending_requests
457            .insert(request_id, PendingRequestType::SetHeartbeat);
458
459        let request = DeribitJsonRpcRequest::new(
460            request_id,
461            "public/set_heartbeat",
462            DeribitHeartbeatParams { interval },
463        );
464
465        let payload =
466            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
467
468        log::debug!(
469            "Enabling heartbeat with interval: request_id={request_id}, interval={interval} seconds"
470        );
471        self.send_with_retry(payload, None).await
472    }
473
474    /// Responds to a heartbeat test_request.
475    async fn handle_heartbeat_test_request(&mut self) -> Result<(), DeribitWsError> {
476        let request_id = self.next_request_id();
477
478        // Track this request for response correlation
479        self.pending_requests
480            .insert(request_id, PendingRequestType::Test);
481
482        let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
483
484        let payload =
485            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
486
487        log::trace!("Responding to heartbeat test_request: request_id={request_id}");
488        self.send_with_retry(payload, None).await
489    }
490
491    /// Handles a buy order command.
492    async fn handle_buy(
493        &mut self,
494        params: DeribitOrderParams,
495        client_order_id: ClientOrderId,
496        trader_id: TraderId,
497        strategy_id: StrategyId,
498        instrument_id: InstrumentId,
499    ) -> Result<(), DeribitWsError> {
500        let request_id = self.next_request_id();
501
502        self.pending_requests.insert(
503            request_id,
504            PendingRequestType::Buy {
505                client_order_id,
506                trader_id,
507                strategy_id,
508                instrument_id,
509            },
510        );
511
512        let request = DeribitJsonRpcRequest::new(request_id, "private/buy", params);
513
514        let payload =
515            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
516
517        log::debug!("Sending buy order: request_id={request_id}");
518        self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
519            .await
520    }
521
522    /// Handles a sell order command.
523    async fn handle_sell(
524        &mut self,
525        params: DeribitOrderParams,
526        client_order_id: ClientOrderId,
527        trader_id: TraderId,
528        strategy_id: StrategyId,
529        instrument_id: InstrumentId,
530    ) -> Result<(), DeribitWsError> {
531        let request_id = self.next_request_id();
532
533        self.pending_requests.insert(
534            request_id,
535            PendingRequestType::Sell {
536                client_order_id,
537                trader_id,
538                strategy_id,
539                instrument_id,
540            },
541        );
542
543        let request = DeribitJsonRpcRequest::new(request_id, "private/sell", params);
544
545        let payload =
546            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
547
548        log::debug!("Sending sell order: request_id={request_id}");
549        self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
550            .await
551    }
552
553    /// Handles an edit order command.
554    async fn handle_edit(
555        &mut self,
556        params: DeribitEditParams,
557        client_order_id: ClientOrderId,
558        trader_id: TraderId,
559        strategy_id: StrategyId,
560        instrument_id: InstrumentId,
561    ) -> Result<(), DeribitWsError> {
562        let request_id = self.next_request_id();
563        let order_id = params.order_id.clone();
564
565        self.pending_requests.insert(
566            request_id,
567            PendingRequestType::Edit {
568                client_order_id,
569                trader_id,
570                strategy_id,
571                instrument_id,
572            },
573        );
574
575        let request = DeribitJsonRpcRequest::new(request_id, "private/edit", params);
576
577        let payload =
578            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
579
580        log::debug!("Sending edit order: request_id={request_id}, order_id={order_id}");
581        self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
582            .await
583    }
584
585    /// Handles a cancel order command.
586    async fn handle_cancel(
587        &mut self,
588        params: DeribitCancelParams,
589        client_order_id: ClientOrderId,
590        trader_id: TraderId,
591        strategy_id: StrategyId,
592        instrument_id: InstrumentId,
593    ) -> Result<(), DeribitWsError> {
594        let request_id = self.next_request_id();
595        let order_id = params.order_id.clone();
596
597        self.pending_requests.insert(
598            request_id,
599            PendingRequestType::Cancel {
600                client_order_id,
601                trader_id,
602                strategy_id,
603                instrument_id,
604            },
605        );
606
607        let request = DeribitJsonRpcRequest::new(request_id, "private/cancel", params);
608
609        let payload =
610            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
611
612        log::debug!("Sending cancel order: request_id={request_id}, order_id={order_id}");
613        self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
614            .await
615    }
616
617    /// Handles cancel all orders by instrument command.
618    async fn handle_cancel_all_by_instrument(
619        &mut self,
620        params: DeribitCancelAllByInstrumentParams,
621        instrument_id: InstrumentId,
622    ) -> Result<(), DeribitWsError> {
623        let request_id = self.next_request_id();
624        let instrument_name = params.instrument_name.clone();
625
626        // Track this request for response correlation
627        self.pending_requests.insert(
628            request_id,
629            PendingRequestType::CancelAllByInstrument { instrument_id },
630        );
631
632        let request =
633            DeribitJsonRpcRequest::new(request_id, "private/cancel_all_by_instrument", params);
634
635        let payload =
636            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
637
638        log::debug!(
639            "Sending cancel_all_by_instrument: request_id={request_id}, instrument={instrument_name}"
640        );
641        self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
642            .await
643    }
644
645    /// Handles get order state command.
646    async fn handle_get_order_state(
647        &mut self,
648        order_id: String,
649        client_order_id: ClientOrderId,
650        trader_id: TraderId,
651        strategy_id: StrategyId,
652        instrument_id: InstrumentId,
653    ) -> Result<(), DeribitWsError> {
654        let request_id = self.next_request_id();
655
656        // Track this request for response correlation
657        self.pending_requests.insert(
658            request_id,
659            PendingRequestType::GetOrderState {
660                client_order_id,
661                trader_id,
662                strategy_id,
663                instrument_id,
664            },
665        );
666
667        let params = serde_json::json!({
668            "order_id": order_id
669        });
670
671        let request = DeribitJsonRpcRequest::new(request_id, "private/get_order_state", params);
672
673        let payload =
674            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
675
676        log::debug!("Sending get_order_state: request_id={request_id}, order_id={order_id}");
677        self.send_with_retry(payload, Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()))
678            .await
679    }
680
681    /// Processes a command from the client.
682    async fn process_command(&mut self, cmd: HandlerCommand) {
683        match cmd {
684            HandlerCommand::SetClient(client) => {
685                log::debug!("Setting WebSocket client");
686                self.inner = Some(client);
687            }
688            HandlerCommand::Disconnect => {
689                log::debug!("Disconnecting WebSocket");
690                if let Some(client) = self.inner.take() {
691                    client.disconnect().await;
692                }
693            }
694            HandlerCommand::Authenticate { auth_params } => {
695                let request_id = self.next_request_id();
696                log::debug!("Authenticating: request_id={request_id}");
697
698                // Track this request for response correlation
699                self.pending_requests
700                    .insert(request_id, PendingRequestType::Authenticate);
701
702                let request = DeribitJsonRpcRequest::new(request_id, "public/auth", auth_params);
703                match serde_json::to_string(&request) {
704                    Ok(payload) => {
705                        if let Err(e) = self.send_with_retry(payload, None).await {
706                            log::error!("Authentication send failed: {e}");
707                            self.auth_tracker.fail(format!("Send failed: {e}"));
708                        }
709                    }
710                    Err(e) => {
711                        log::error!("Failed to serialize auth request: {e}");
712                        self.auth_tracker.fail(format!("Serialization failed: {e}"));
713                    }
714                }
715            }
716            HandlerCommand::SetHeartbeat { interval } => {
717                if let Err(e) = self.handle_set_heartbeat(interval).await {
718                    log::error!("Set heartbeat failed: {e}");
719                }
720            }
721            HandlerCommand::InitializeInstruments(instruments) => {
722                log::info!("Handler received {} instruments", instruments.len());
723                self.instruments_cache.clear();
724                for inst in instruments {
725                    self.instruments_cache
726                        .insert(inst.raw_symbol().inner(), inst);
727                }
728            }
729            HandlerCommand::UpdateInstrument(instrument) => {
730                log::trace!("Updating instrument: {}", instrument.raw_symbol());
731                self.instruments_cache
732                    .insert(instrument.raw_symbol().inner(), *instrument);
733            }
734            HandlerCommand::Subscribe { channels } => {
735                if let Err(e) = self.handle_subscribe(channels).await {
736                    log::error!("Subscribe failed: {e}");
737                }
738            }
739            HandlerCommand::Unsubscribe { channels } => {
740                if let Err(e) = self.handle_unsubscribe(channels).await {
741                    log::error!("Unsubscribe failed: {e}");
742                }
743            }
744            HandlerCommand::Buy {
745                params,
746                client_order_id,
747                trader_id,
748                strategy_id,
749                instrument_id,
750            } => {
751                if let Err(e) = self
752                    .handle_buy(
753                        params,
754                        client_order_id,
755                        trader_id,
756                        strategy_id,
757                        instrument_id,
758                    )
759                    .await
760                {
761                    log::error!("Buy order failed: {e}");
762                }
763            }
764            HandlerCommand::Sell {
765                params,
766                client_order_id,
767                trader_id,
768                strategy_id,
769                instrument_id,
770            } => {
771                if let Err(e) = self
772                    .handle_sell(
773                        params,
774                        client_order_id,
775                        trader_id,
776                        strategy_id,
777                        instrument_id,
778                    )
779                    .await
780                {
781                    log::error!("Sell order failed: {e}");
782                }
783            }
784            HandlerCommand::Edit {
785                params,
786                client_order_id,
787                trader_id,
788                strategy_id,
789                instrument_id,
790            } => {
791                if let Err(e) = self
792                    .handle_edit(
793                        params,
794                        client_order_id,
795                        trader_id,
796                        strategy_id,
797                        instrument_id,
798                    )
799                    .await
800                {
801                    log::error!("Edit order failed: {e}");
802                }
803            }
804            HandlerCommand::Cancel {
805                params,
806                client_order_id,
807                trader_id,
808                strategy_id,
809                instrument_id,
810            } => {
811                if let Err(e) = self
812                    .handle_cancel(
813                        params,
814                        client_order_id,
815                        trader_id,
816                        strategy_id,
817                        instrument_id,
818                    )
819                    .await
820                {
821                    log::error!("Cancel order failed: {e}");
822                }
823            }
824            HandlerCommand::CancelAllByInstrument {
825                params,
826                instrument_id,
827            } => {
828                if let Err(e) = self
829                    .handle_cancel_all_by_instrument(params, instrument_id)
830                    .await
831                {
832                    log::error!("Cancel all by instrument failed: {e}");
833                }
834            }
835            HandlerCommand::GetOrderState {
836                order_id,
837                client_order_id,
838                trader_id,
839                strategy_id,
840                instrument_id,
841            } => {
842                if let Err(e) = self
843                    .handle_get_order_state(
844                        order_id,
845                        client_order_id,
846                        trader_id,
847                        strategy_id,
848                        instrument_id,
849                    )
850                    .await
851                {
852                    log::error!("Get order state failed: {e}");
853                }
854            }
855        }
856    }
857
858    /// Processes a raw WebSocket message.
859    async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
860        if text == RECONNECTED {
861            log::info!("Received reconnection signal");
862
863            self.clear_state();
864
865            return Some(NautilusWsMessage::Reconnected);
866        }
867
868        // Parse the JSON-RPC message
869        let ws_msg = match parse_raw_message(text) {
870            Ok(msg) => msg,
871            Err(e) => {
872                log::warn!("Failed to parse message: {e}");
873                return None;
874            }
875        };
876
877        let ts_init = self.ts_init();
878
879        match ws_msg {
880            DeribitWsMessage::Response(response) => {
881                // Look up the request type by ID for explicit correlation
882                if let Some(request_id) = response.id
883                    && let Some(request_type) = self.pending_requests.remove(&request_id)
884                {
885                    match request_type {
886                        PendingRequestType::Authenticate => {
887                            if let Some(error) = &response.error {
888                                log::error!(
889                                    "Authentication failed: code={}, message={}, request_id={}",
890                                    error.code,
891                                    error.message,
892                                    request_id
893                                );
894                                self.auth_tracker.fail(format!(
895                                    "Authentication error code={}: {}",
896                                    error.code, error.message
897                                ));
898                            } else if let Some(result) = &response.result {
899                                match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
900                                    Ok(auth_result) => {
901                                        self.auth_tracker.succeed();
902                                        log::debug!(
903                                            "WebSocket authenticated successfully (request_id={}, scope={}, expires_in={}s)",
904                                            request_id,
905                                            auth_result.scope,
906                                            auth_result.expires_in
907                                        );
908                                        return Some(NautilusWsMessage::Authenticated(Box::new(
909                                            auth_result,
910                                        )));
911                                    }
912                                    Err(e) => {
913                                        log::error!(
914                                            "Failed to parse auth result: request_id={request_id}, error={e}"
915                                        );
916                                        self.auth_tracker
917                                            .fail(format!("Failed to parse auth result: {e}"));
918                                    }
919                                }
920                            }
921                        }
922                        PendingRequestType::Subscribe { channels } => {
923                            if let Some(error) = &response.error {
924                                log::error!(
925                                    "Subscribe failed: code={}, message={}, channels={:?}, request_id={}",
926                                    error.code,
927                                    error.message,
928                                    channels,
929                                    request_id
930                                );
931                                // Mark channels as failed so they can be retried
932                                for ch in &channels {
933                                    self.subscriptions_state.confirm_unsubscribe(ch);
934                                }
935                            } else {
936                                // Confirm each channel in the subscription
937                                for ch in &channels {
938                                    self.subscriptions_state.confirm_subscribe(ch);
939                                    log::debug!("Subscription confirmed: {ch}");
940                                }
941                            }
942                        }
943                        PendingRequestType::Unsubscribe { channels } => {
944                            if let Some(error) = &response.error {
945                                log::error!(
946                                    "Unsubscribe failed: code={}, message={}, channels={:?}, request_id={}",
947                                    error.code,
948                                    error.message,
949                                    channels,
950                                    request_id
951                                );
952                            } else {
953                                // Confirm each channel in the unsubscription
954                                for ch in &channels {
955                                    self.subscriptions_state.confirm_unsubscribe(ch);
956                                    log::debug!("Unsubscription confirmed: {ch}");
957                                }
958                            }
959                        }
960                        PendingRequestType::SetHeartbeat => {
961                            if let Some(error) = &response.error {
962                                log::error!(
963                                    "Set heartbeat failed: code={}, message={}, request_id={}",
964                                    error.code,
965                                    error.message,
966                                    request_id
967                                );
968                            } else {
969                                log::debug!("Heartbeat enabled (request_id={request_id})");
970                            }
971                        }
972                        PendingRequestType::Test => {
973                            if let Some(error) = &response.error {
974                                log::warn!(
975                                    "Heartbeat test failed: code={}, message={}, request_id={}",
976                                    error.code,
977                                    error.message,
978                                    request_id
979                                );
980                            } else {
981                                log::trace!(
982                                    "Heartbeat test acknowledged (request_id={request_id})"
983                                );
984                            }
985                        }
986                        PendingRequestType::Cancel {
987                            client_order_id,
988                            trader_id,
989                            strategy_id,
990                            instrument_id,
991                        } => {
992                            if let Some(result) = &response.result {
993                                match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
994                                    Ok(order_msg) => {
995                                        // Cancel confirmed - don't emit or remove context here.
996                                        // Let user.orders stream handle the cancel event to avoid
997                                        // duplicates. The stream will use the context for correct
998                                        // trader/strategy IDs and then remove it.
999                                        log::debug!(
1000                                            "Cancel confirmed: venue_order_id={}, client_order_id={}, state={} (waiting for user.orders)",
1001                                            order_msg.order_id,
1002                                            client_order_id,
1003                                            order_msg.order_state
1004                                        );
1005                                    }
1006                                    Err(e) => {
1007                                        log::error!(
1008                                            "Failed to parse cancel response: request_id={request_id}, error={e}"
1009                                        );
1010                                    }
1011                                }
1012                            } else if let Some(error) = &response.error {
1013                                log::error!(
1014                                    "Cancel rejected: code={}, message={}, client_order_id={}",
1015                                    error.code,
1016                                    error.message,
1017                                    client_order_id
1018                                );
1019                                return Some(NautilusWsMessage::OrderCancelRejected(
1020                                    OrderCancelRejected::new(
1021                                        trader_id,
1022                                        strategy_id,
1023                                        instrument_id,
1024                                        client_order_id,
1025                                        ustr::ustr(&format!(
1026                                            "code={}: {}",
1027                                            error.code, error.message
1028                                        )),
1029                                        UUID4::new(),
1030                                        ts_init,
1031                                        ts_init,
1032                                        false,
1033                                        None, // venue_order_id not available in error response
1034                                        self.account_id,
1035                                    ),
1036                                ));
1037                            }
1038                        }
1039                        PendingRequestType::CancelAllByInstrument { instrument_id } => {
1040                            if let Some(result) = &response.result {
1041                                match serde_json::from_value::<u64>(result.clone()) {
1042                                    Ok(count) => {
1043                                        log::info!(
1044                                            "Cancelled {count} orders for instrument {instrument_id}"
1045                                        );
1046                                        // Individual order status updates come via user.orders subscription
1047                                    }
1048                                    Err(e) => {
1049                                        log::warn!("Failed to parse cancel_all response: {e}");
1050                                    }
1051                                }
1052                            } else if let Some(error) = &response.error {
1053                                log::error!(
1054                                    "Cancel all by instrument rejected: code={}, message={}, instrument_id={}",
1055                                    error.code,
1056                                    error.message,
1057                                    instrument_id
1058                                );
1059                            }
1060                        }
1061                        PendingRequestType::Buy {
1062                            client_order_id,
1063                            trader_id,
1064                            strategy_id,
1065                            instrument_id,
1066                        }
1067                        | PendingRequestType::Sell {
1068                            client_order_id,
1069                            trader_id,
1070                            strategy_id,
1071                            instrument_id,
1072                        } => {
1073                            if let Some(result) = &response.result {
1074                                match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1075                                {
1076                                    Ok(order_response) => {
1077                                        let venue_order_id_str = &order_response.order.order_id;
1078                                        let venue_order_id =
1079                                            VenueOrderId::new(venue_order_id_str.as_str());
1080                                        let order_state = &order_response.order.order_state;
1081                                        log::debug!(
1082                                            "Order response: venue_order_id={venue_order_id}, client_order_id={client_order_id}, state={order_state}"
1083                                        );
1084
1085                                        self.order_contexts.insert(
1086                                            venue_order_id,
1087                                            OrderContext {
1088                                                client_order_id,
1089                                                trader_id,
1090                                                strategy_id,
1091                                                instrument_id,
1092                                            },
1093                                        );
1094
1095                                        // Skip OrderAccepted if order already reached terminal state
1096                                        if self.terminal_orders.contains(&client_order_id) {
1097                                            log::debug!(
1098                                                "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
1099                                            );
1100                                            self.emitted_accepted.add(venue_order_id);
1101                                        } else if order_state == "filled" {
1102                                            // Order went directly Submitted -> Filled (e.g., market orders)
1103                                            log::debug!(
1104                                                "Skipping OrderAccepted for already filled order: venue_order_id={venue_order_id}, client_order_id={client_order_id}"
1105                                            );
1106                                            self.terminal_orders.add(client_order_id);
1107                                            self.emitted_accepted.add(venue_order_id);
1108                                        } else {
1109                                            let instrument_name_ustr = Ustr::from(
1110                                                order_response.order.instrument_name.as_str(),
1111                                            );
1112                                            if let Some(instrument) =
1113                                                self.instruments_cache.get(&instrument_name_ustr)
1114                                            {
1115                                                if let Some(account_id) = self.account_id {
1116                                                    let event = parse_order_accepted(
1117                                                        &order_response.order,
1118                                                        instrument,
1119                                                        account_id,
1120                                                        trader_id,
1121                                                        strategy_id,
1122                                                        ts_init,
1123                                                    );
1124                                                    // Mark OrderAccepted as emitted to prevent duplicate from subscription
1125                                                    self.emitted_accepted.add(venue_order_id);
1126                                                    return Some(NautilusWsMessage::OrderAccepted(
1127                                                        event,
1128                                                    ));
1129                                                } else {
1130                                                    log::warn!(
1131                                                        "Cannot create OrderAccepted: account_id not set"
1132                                                    );
1133                                                }
1134                                            } else {
1135                                                log::warn!(
1136                                                    "Instrument {instrument_name_ustr} not found in cache for order response"
1137                                                );
1138                                            }
1139                                        }
1140                                    }
1141                                    Err(e) => {
1142                                        log::error!(
1143                                            "Failed to parse order response: request_id={request_id}, error={e}"
1144                                        );
1145                                        return Some(NautilusWsMessage::OrderRejected(
1146                                            OrderRejected::new(
1147                                                trader_id,
1148                                                strategy_id,
1149                                                instrument_id,
1150                                                client_order_id,
1151                                                self.account_id
1152                                                    .unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1153                                                ustr::ustr(&format!(
1154                                                    "Failed to parse response: {e}"
1155                                                )),
1156                                                UUID4::new(),
1157                                                ts_init,
1158                                                ts_init,
1159                                                false,
1160                                                false,
1161                                            ),
1162                                        ));
1163                                    }
1164                                }
1165                            } else if let Some(error) = &response.error {
1166                                let due_post_only = error.code == DERIBIT_POST_ONLY_ERROR_CODE;
1167                                let reason = if let Some(data) = &error.data {
1168                                    format!(
1169                                        "code={}: {} (data: {})",
1170                                        error.code, error.message, data
1171                                    )
1172                                } else {
1173                                    format!("code={}: {}", error.code, error.message)
1174                                };
1175
1176                                log::debug!(
1177                                    "Order rejected: {reason}, client_order_id={client_order_id}"
1178                                );
1179                                return Some(NautilusWsMessage::OrderRejected(OrderRejected::new(
1180                                    trader_id,
1181                                    strategy_id,
1182                                    instrument_id,
1183                                    client_order_id,
1184                                    self.account_id.unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1185                                    ustr::ustr(&reason),
1186                                    UUID4::new(),
1187                                    ts_init,
1188                                    ts_init,
1189                                    false,
1190                                    due_post_only,
1191                                )));
1192                            }
1193                        }
1194                        PendingRequestType::Edit {
1195                            client_order_id,
1196                            trader_id,
1197                            strategy_id,
1198                            instrument_id,
1199                        } => {
1200                            if let Some(result) = &response.result {
1201                                match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1202                                {
1203                                    Ok(order_response) => {
1204                                        let venue_order_id =
1205                                            VenueOrderId::new(&order_response.order.order_id);
1206                                        log::info!(
1207                                            "Order updated: venue_order_id={}, client_order_id={}, state={}",
1208                                            venue_order_id,
1209                                            client_order_id,
1210                                            order_response.order.order_state
1211                                        );
1212
1213                                        self.order_contexts.insert(
1214                                            venue_order_id,
1215                                            OrderContext {
1216                                                client_order_id,
1217                                                trader_id,
1218                                                strategy_id,
1219                                                instrument_id,
1220                                            },
1221                                        );
1222
1223                                        let instrument_name_ustr = Ustr::from(
1224                                            order_response.order.instrument_name.as_str(),
1225                                        );
1226                                        if let Some(instrument) =
1227                                            self.instruments_cache.get(&instrument_name_ustr)
1228                                        {
1229                                            if let Some(account_id) = self.account_id {
1230                                                let event = parse_order_updated(
1231                                                    &order_response.order,
1232                                                    instrument,
1233                                                    account_id,
1234                                                    trader_id,
1235                                                    strategy_id,
1236                                                    ts_init,
1237                                                );
1238                                                return Some(NautilusWsMessage::OrderUpdated(
1239                                                    event,
1240                                                ));
1241                                            } else {
1242                                                log::warn!(
1243                                                    "Cannot create OrderUpdated: account_id not set"
1244                                                );
1245                                            }
1246                                        } else {
1247                                            log::warn!(
1248                                                "Instrument {instrument_name_ustr} not found in cache for edit response"
1249                                            );
1250                                        }
1251                                    }
1252                                    Err(e) => {
1253                                        log::error!(
1254                                            "Failed to parse edit response: request_id={request_id}, error={e}"
1255                                        );
1256                                        return Some(NautilusWsMessage::OrderModifyRejected(
1257                                            OrderModifyRejected::new(
1258                                                trader_id,
1259                                                strategy_id,
1260                                                instrument_id,
1261                                                client_order_id,
1262                                                ustr::ustr(&format!(
1263                                                    "Failed to parse response: {e}"
1264                                                )),
1265                                                UUID4::new(),
1266                                                ts_init,
1267                                                ts_init,
1268                                                false,
1269                                                None, // venue_order_id not available
1270                                                self.account_id,
1271                                            ),
1272                                        ));
1273                                    }
1274                                }
1275                            } else if let Some(error) = &response.error {
1276                                log::error!(
1277                                    "Order modify rejected: code={}, message={}, client_order_id={}",
1278                                    error.code,
1279                                    error.message,
1280                                    client_order_id
1281                                );
1282                                return Some(NautilusWsMessage::OrderModifyRejected(
1283                                    OrderModifyRejected::new(
1284                                        trader_id,
1285                                        strategy_id,
1286                                        instrument_id,
1287                                        client_order_id,
1288                                        ustr::ustr(&format!(
1289                                            "code={}: {}",
1290                                            error.code, error.message
1291                                        )),
1292                                        UUID4::new(),
1293                                        ts_init,
1294                                        ts_init,
1295                                        false,
1296                                        None, // venue_order_id not available
1297                                        self.account_id,
1298                                    ),
1299                                ));
1300                            }
1301                        }
1302                        PendingRequestType::GetOrderState {
1303                            client_order_id,
1304                            trader_id: _,
1305                            strategy_id: _,
1306                            instrument_id: _,
1307                        } => {
1308                            if let Some(result) = &response.result {
1309                                match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
1310                                    Ok(order_msg) => {
1311                                        log::info!(
1312                                            "Order state received: venue_order_id={}, client_order_id={}, state={}",
1313                                            order_msg.order_id,
1314                                            client_order_id,
1315                                            order_msg.order_state
1316                                        );
1317
1318                                        // Convert to OrderStatusReport
1319                                        let instrument_name_ustr = order_msg.instrument_name;
1320                                        if let Some(instrument) =
1321                                            self.instruments_cache.get(&instrument_name_ustr)
1322                                        {
1323                                            if let Some(account_id) = self.account_id {
1324                                                match parse_user_order_msg(
1325                                                    &order_msg, instrument, account_id, ts_init,
1326                                                ) {
1327                                                    Ok(report) => {
1328                                                        return Some(
1329                                                            NautilusWsMessage::OrderStatusReports(
1330                                                                vec![report],
1331                                                            ),
1332                                                        );
1333                                                    }
1334                                                    Err(e) => {
1335                                                        log::warn!(
1336                                                            "Failed to parse get_order_state response to report: {e}"
1337                                                        );
1338                                                    }
1339                                                }
1340                                            } else {
1341                                                log::warn!(
1342                                                    "Cannot create OrderStatusReport: account_id not set"
1343                                                );
1344                                            }
1345                                        } else {
1346                                            log::warn!(
1347                                                "Instrument {instrument_name_ustr} not found in cache for get_order_state response"
1348                                            );
1349                                        }
1350                                    }
1351                                    Err(e) => {
1352                                        log::error!(
1353                                            "Failed to parse get_order_state response: request_id={request_id}, error={e}"
1354                                        );
1355                                    }
1356                                }
1357                            } else if let Some(error) = &response.error {
1358                                log::error!(
1359                                    "Get order state failed: code={}, message={}, client_order_id={}",
1360                                    error.code,
1361                                    error.message,
1362                                    client_order_id
1363                                );
1364                            }
1365                        }
1366                    }
1367                } else if let Some(request_id) = response.id {
1368                    // Response with ID but no matching pending request
1369                    if let Some(error) = &response.error {
1370                        // Log orphaned error response with all available context
1371                        log::error!(
1372                            "Deribit error for unknown request: code={}, message={}, request_id={}, data={:?}",
1373                            error.code,
1374                            error.message,
1375                            request_id,
1376                            error.data
1377                        );
1378                        return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1379                            code: error.code,
1380                            message: error.message.clone(),
1381                        }));
1382                    } else {
1383                        // Success response but no pending request - likely already processed
1384                        log::debug!(
1385                            "Received response for unknown request_id={}, result present: {}",
1386                            request_id,
1387                            response.result.is_some()
1388                        );
1389                    }
1390                } else if let Some(error) = &response.error {
1391                    // Error response with no ID (shouldn't happen in JSON-RPC 2.0, but handle it)
1392                    log::error!(
1393                        "Deribit error with no request_id: code={}, message={}, data={:?}",
1394                        error.code,
1395                        error.message,
1396                        error.data
1397                    );
1398                    return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1399                        code: error.code,
1400                        message: error.message.clone(),
1401                    }));
1402                }
1403                None
1404            }
1405            DeribitWsMessage::Notification(notification) => {
1406                let channel = &notification.params.channel;
1407                let data = &notification.params.data;
1408
1409                // Determine channel type and parse accordingly
1410                if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
1411                    match channel_type {
1412                        DeribitWsChannel::Trades => {
1413                            // Parse trade messages
1414                            match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
1415                                Ok(trades) => {
1416                                    log::debug!("Received {} trades", trades.len());
1417                                    let data_vec =
1418                                        parse_trades_data(trades, &self.instruments_cache, ts_init);
1419                                    if data_vec.is_empty() {
1420                                        log::debug!(
1421                                            "No trades parsed - instrument cache size: {}",
1422                                            self.instruments_cache.len()
1423                                        );
1424                                    } else {
1425                                        log::debug!("Parsed {} trade ticks", data_vec.len());
1426                                        return Some(NautilusWsMessage::Data(data_vec));
1427                                    }
1428                                }
1429                                Err(e) => {
1430                                    log::warn!("Failed to deserialize trades: {e}");
1431                                }
1432                            }
1433                        }
1434                        DeribitWsChannel::Book => {
1435                            // Parse order book messages
1436                            match serde_json::from_value::<DeribitBookMsg>(data.clone()) {
1437                                Ok(book_msg) => {
1438                                    if let Some(instrument) =
1439                                        self.instruments_cache.get(&book_msg.instrument_name)
1440                                    {
1441                                        if book_msg.msg_type == DeribitBookMsgType::Change
1442                                            && let Some(prev_id) = book_msg.prev_change_id
1443                                            && let Some(&last_id) =
1444                                                self.book_sequence.get(&book_msg.instrument_name)
1445                                            && prev_id != last_id
1446                                        {
1447                                            log::warn!(
1448                                                "Book sequence gap for {}: expected prev_change_id={}, was {}",
1449                                                book_msg.instrument_name,
1450                                                last_id,
1451                                                prev_id
1452                                            );
1453                                        }
1454                                        self.book_sequence
1455                                            .insert(book_msg.instrument_name, book_msg.change_id);
1456
1457                                        match parse_book_msg(&book_msg, instrument, ts_init) {
1458                                            Ok(deltas) => {
1459                                                return Some(NautilusWsMessage::Deltas(deltas));
1460                                            }
1461                                            Err(e) => {
1462                                                log::warn!("Failed to parse book message: {e}");
1463                                            }
1464                                        }
1465                                    } else {
1466                                        log::warn!(
1467                                            "Book message received but instrument '{}' not found in cache (cache size: {})",
1468                                            book_msg.instrument_name,
1469                                            self.instruments_cache.len()
1470                                        );
1471                                    }
1472                                }
1473                                Err(e) => {
1474                                    log::warn!(
1475                                        "Failed to deserialize book message: {e}, channel: {channel}"
1476                                    );
1477                                }
1478                            }
1479                        }
1480                        DeribitWsChannel::Ticker => {
1481                            // Parse ticker to emit both MarkPrice and IndexPrice
1482                            // When subscribed to either mark_prices or index_prices, we emit both
1483                            // as traders typically need both for analysis
1484                            if let Ok(ticker_msg) =
1485                                serde_json::from_value::<DeribitTickerMsg>(data.clone())
1486                                && let Some(instrument) =
1487                                    self.instruments_cache.get(&ticker_msg.instrument_name)
1488                            {
1489                                match (
1490                                    parse_ticker_to_mark_price(&ticker_msg, instrument, ts_init),
1491                                    parse_ticker_to_index_price(&ticker_msg, instrument, ts_init),
1492                                ) {
1493                                    (Ok(mark_price), Ok(index_price)) => {
1494                                        return Some(NautilusWsMessage::Data(vec![
1495                                            Data::MarkPriceUpdate(mark_price),
1496                                            Data::IndexPriceUpdate(index_price),
1497                                        ]));
1498                                    }
1499                                    (Err(e), _) | (_, Err(e)) => {
1500                                        log::warn!("Failed to parse ticker prices: {e}");
1501                                    }
1502                                }
1503                            }
1504                        }
1505                        DeribitWsChannel::Perpetual => {
1506                            // Parse perpetual channel for funding rate updates
1507                            // This channel is dedicated to perpetual instruments and provides
1508                            // the interest (funding) rate
1509                            match serde_json::from_value::<DeribitPerpetualMsg>(data.clone()) {
1510                                Ok(perpetual_msg) => {
1511                                    // Extract instrument name from channel: perpetual.{instrument}.{interval}
1512                                    let parts: Vec<&str> = channel.split('.').collect();
1513                                    if parts.len() >= 2 {
1514                                        let instrument_name = Ustr::from(parts[1]);
1515                                        if let Some(instrument) =
1516                                            self.instruments_cache.get(&instrument_name)
1517                                        {
1518                                            let funding_rate = parse_perpetual_to_funding_rate(
1519                                                &perpetual_msg,
1520                                                instrument,
1521                                                ts_init,
1522                                            );
1523                                            return Some(NautilusWsMessage::FundingRates(vec![
1524                                                funding_rate,
1525                                            ]));
1526                                        } else {
1527                                            log::warn!(
1528                                                "Instrument {} not found in cache (cache size: {})",
1529                                                instrument_name,
1530                                                self.instruments_cache.len()
1531                                            );
1532                                        }
1533                                    }
1534                                }
1535                                Err(e) => {
1536                                    log::warn!(
1537                                        "Failed to deserialize perpetual message: {e}, data: {data}"
1538                                    );
1539                                }
1540                            }
1541                        }
1542                        DeribitWsChannel::Quote => {
1543                            // Parse quote messages
1544                            if let Ok(quote_msg) =
1545                                serde_json::from_value::<DeribitQuoteMsg>(data.clone())
1546                                && let Some(instrument) =
1547                                    self.instruments_cache.get(&quote_msg.instrument_name)
1548                            {
1549                                match parse_quote_msg(&quote_msg, instrument, ts_init) {
1550                                    Ok(quote) => {
1551                                        return Some(NautilusWsMessage::Data(vec![Data::Quote(
1552                                            quote,
1553                                        )]));
1554                                    }
1555                                    Err(e) => {
1556                                        log::warn!("Failed to parse quote message: {e}");
1557                                    }
1558                                }
1559                            }
1560                        }
1561                        DeribitWsChannel::InstrumentState => {
1562                            // Parse instrument state lifecycle notifications
1563                            match serde_json::from_value::<DeribitInstrumentStateMsg>(data.clone())
1564                            {
1565                                Ok(state_msg) => {
1566                                    log::info!(
1567                                        "Instrument state change: {} -> {} (timestamp: {})",
1568                                        state_msg.instrument_name,
1569                                        state_msg.state,
1570                                        state_msg.timestamp
1571                                    );
1572                                    // Return raw data for consumers to handle state changes
1573                                    // TODO: Optionally emit instrument updates when instrument transitions to 'started'
1574                                    return Some(NautilusWsMessage::Raw(data.clone()));
1575                                }
1576                                Err(e) => {
1577                                    log::warn!("Failed to parse instrument state message: {e}");
1578                                }
1579                            }
1580                        }
1581                        DeribitWsChannel::ChartTrades => {
1582                            // Parse chart.trades messages into Bar objects using emit-on-next pattern.
1583                            // Deribit sends updates for the current bar as it builds. We only emit
1584                            // a bar when we receive a bar with a different timestamp, confirming
1585                            // the previous bar is closed.
1586                            if let Ok(chart_msg) =
1587                                serde_json::from_value::<DeribitChartMsg>(data.clone())
1588                            {
1589                                // Extract instrument and resolution from channel
1590                                // Channel format: chart.trades.{instrument}.{resolution}
1591                                let parts: Vec<&str> = channel.split('.').collect();
1592                                if parts.len() >= 4 {
1593                                    let instrument_name = Ustr::from(parts[2]);
1594                                    let resolution = parts[3];
1595
1596                                    if let Some(instrument) =
1597                                        self.instruments_cache.get(&instrument_name)
1598                                    {
1599                                        let instrument_id = instrument.id();
1600
1601                                        match resolution_to_bar_type(instrument_id, resolution) {
1602                                            Ok(bar_type) => {
1603                                                let price_precision = instrument.price_precision();
1604                                                let size_precision = instrument.size_precision();
1605
1606                                                match parse_chart_msg(
1607                                                    &chart_msg,
1608                                                    bar_type,
1609                                                    price_precision,
1610                                                    size_precision,
1611                                                    self.bars_timestamp_on_close,
1612                                                    ts_init,
1613                                                ) {
1614                                                    Ok(new_bar) => {
1615                                                        // Check if we have a pending bar for this channel
1616                                                        let channel_key = channel.clone();
1617                                                        if let Some(pending_bar) =
1618                                                            self.pending_bars.get(&channel_key)
1619                                                        {
1620                                                            // If new bar has different timestamp, the pending bar is closed
1621                                                            if new_bar.ts_event
1622                                                                != pending_bar.ts_event
1623                                                            {
1624                                                                let closed_bar = *pending_bar;
1625                                                                self.pending_bars
1626                                                                    .insert(channel_key, new_bar);
1627                                                                log::debug!(
1628                                                                    "Emitting closed bar: {closed_bar:?}"
1629                                                                );
1630                                                                return Some(
1631                                                                    NautilusWsMessage::Data(vec![
1632                                                                        Data::Bar(closed_bar),
1633                                                                    ]),
1634                                                                );
1635                                                            }
1636                                                            // Same timestamp - update pending bar with latest values
1637                                                            self.pending_bars
1638                                                                .insert(channel_key, new_bar);
1639                                                        } else {
1640                                                            // First bar for this channel - store as pending
1641                                                            self.pending_bars
1642                                                                .insert(channel_key, new_bar);
1643                                                        }
1644                                                    }
1645                                                    Err(e) => {
1646                                                        log::warn!(
1647                                                            "Failed to parse chart message to bar: {e}"
1648                                                        );
1649                                                    }
1650                                                }
1651                                            }
1652                                            Err(e) => {
1653                                                log::warn!(
1654                                                    "Failed to create BarType from resolution {resolution}: {e}"
1655                                                );
1656                                            }
1657                                        }
1658                                    } else {
1659                                        log::warn!(
1660                                            "Instrument {instrument_name} not found in cache for chart data"
1661                                        );
1662                                    }
1663                                }
1664                            }
1665                        }
1666                        DeribitWsChannel::UserOrders => {
1667                            // Handle both array and single object responses
1668                            let orders_result =
1669                                serde_json::from_value::<Vec<DeribitOrderMsg>>(data.clone())
1670                                    .or_else(|_| {
1671                                        serde_json::from_value::<DeribitOrderMsg>(data.clone())
1672                                            .map(|order| vec![order])
1673                                    });
1674
1675                            match orders_result {
1676                                Ok(orders) => {
1677                                    log::debug!("Received {} user order updates", orders.len());
1678
1679                                    // Require account_id for parsing
1680                                    let Some(account_id) = self.account_id else {
1681                                        log::warn!("Cannot parse user orders: account_id not set");
1682                                        return Some(NautilusWsMessage::Raw(data.clone()));
1683                                    };
1684
1685                                    let mut outgoing = Vec::new();
1686
1687                                    // Process each order and emit appropriate events
1688                                    for order in &orders {
1689                                        let venue_order_id_str = &order.order_id;
1690                                        let venue_order_id =
1691                                            VenueOrderId::new(venue_order_id_str.as_str());
1692                                        let instrument_name = order.instrument_name;
1693
1694                                        let Some(instrument) =
1695                                            self.instruments_cache.get(&instrument_name)
1696                                        else {
1697                                            log::warn!(
1698                                                "Instrument {instrument_name} not found in cache"
1699                                            );
1700                                            continue;
1701                                        };
1702
1703                                        // Look up OrderContext for this order
1704                                        // First check order_contexts (for orders whose response has been processed)
1705                                        // Then check pending_requests (for orders whose response hasn't arrived yet)
1706                                        // If neither found, this is a true external order
1707                                        let context =
1708                                            self.order_contexts.get(&venue_order_id).cloned();
1709
1710                                        // Extract client_order_id from order label for pending check
1711                                        let label_client_order_id = order
1712                                            .label
1713                                            .as_ref()
1714                                            .filter(|l| !l.is_empty())
1715                                            .map(ClientOrderId::new);
1716
1717                                        // Check for pending request if not in order_contexts
1718                                        let pending_context = if context.is_none() {
1719                                            if let Some(client_id) = &label_client_order_id {
1720                                                self.get_pending_order_context(client_id)
1721                                            } else {
1722                                                None
1723                                            }
1724                                        } else {
1725                                            None
1726                                        };
1727
1728                                        // Check if order has a pending request for context resolution
1729                                        let has_pending_request =
1730                                            if let Some(client_id) = &label_client_order_id {
1731                                                self.is_pending_order(client_id)
1732                                            } else {
1733                                                false
1734                                            };
1735
1736                                        let effective_context = context.or(pending_context);
1737                                        let is_known_order =
1738                                            effective_context.is_some() || has_pending_request;
1739
1740                                        // Determine event type based on order state
1741                                        let event_type = determine_order_event_type(
1742                                            &order.order_state,
1743                                            !is_known_order, // is_new if we don't know about it
1744                                            false,           // not from edit response
1745                                        );
1746
1747                                        let (trader_id, strategy_id, client_order_id) =
1748                                            if let Some(ctx) = effective_context {
1749                                                (
1750                                                    ctx.trader_id,
1751                                                    ctx.strategy_id,
1752                                                    ctx.client_order_id,
1753                                                )
1754                                            } else {
1755                                                // External order - use default values
1756                                                // Note: These won't match any strategy, which is correct
1757                                                (
1758                                                    TraderId::new("EXTERNAL-000"),
1759                                                    StrategyId::new("EXTERNAL"),
1760                                                    ClientOrderId::new(venue_order_id_str),
1761                                                )
1762                                            };
1763
1764                                        match event_type {
1765                                            OrderEventType::Accepted => {
1766                                                // Skip if order already reached terminal state (race condition)
1767                                                if self.terminal_orders.contains(&client_order_id) {
1768                                                    log::debug!(
1769                                                        "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
1770                                                    );
1771                                                    continue;
1772                                                }
1773
1774                                                // Check if we already emitted OrderAccepted for this order
1775                                                // This prevents duplicates from both response and subscription paths
1776                                                if self.emitted_accepted.contains(&venue_order_id) {
1777                                                    log::trace!(
1778                                                        "Skipping duplicate OrderAccepted: venue_order_id={venue_order_id}"
1779                                                    );
1780                                                    continue;
1781                                                }
1782
1783                                                let event = parse_order_accepted(
1784                                                    order,
1785                                                    instrument,
1786                                                    account_id,
1787                                                    trader_id,
1788                                                    strategy_id,
1789                                                    ts_init,
1790                                                );
1791
1792                                                // Mark OrderAccepted as emitted
1793                                                self.emitted_accepted.add(venue_order_id);
1794
1795                                                log::debug!(
1796                                                    "Emitting OrderAccepted: venue_order_id={venue_order_id}, is_known={is_known_order}"
1797                                                );
1798                                                outgoing
1799                                                    .push(NautilusWsMessage::OrderAccepted(event));
1800                                            }
1801                                            OrderEventType::Canceled => {
1802                                                let event = parse_order_canceled(
1803                                                    order,
1804                                                    instrument,
1805                                                    account_id,
1806                                                    trader_id,
1807                                                    strategy_id,
1808                                                    ts_init,
1809                                                );
1810                                                log::debug!(
1811                                                    "Emitting OrderCanceled: venue_order_id={venue_order_id}"
1812                                                );
1813                                                self.terminal_orders.add(client_order_id);
1814                                                self.order_contexts.remove(&venue_order_id);
1815                                                self.emitted_accepted.remove(&venue_order_id);
1816                                                outgoing
1817                                                    .push(NautilusWsMessage::OrderCanceled(event));
1818                                            }
1819                                            OrderEventType::Expired => {
1820                                                let event = parse_order_expired(
1821                                                    order,
1822                                                    instrument,
1823                                                    account_id,
1824                                                    trader_id,
1825                                                    strategy_id,
1826                                                    ts_init,
1827                                                );
1828                                                log::debug!(
1829                                                    "Emitting OrderExpired: venue_order_id={venue_order_id}"
1830                                                );
1831                                                self.terminal_orders.add(client_order_id);
1832                                                self.order_contexts.remove(&venue_order_id);
1833                                                self.emitted_accepted.remove(&venue_order_id);
1834                                                outgoing
1835                                                    .push(NautilusWsMessage::OrderExpired(event));
1836                                            }
1837                                            OrderEventType::Updated => {
1838                                                // Emit OrderStatusReport for updates
1839                                                // This includes quantity/price changes from modify
1840                                                match parse_user_order_msg(
1841                                                    order, instrument, account_id, ts_init,
1842                                                ) {
1843                                                    Ok(report) => {
1844                                                        log::debug!(
1845                                                            "Emitting OrderStatusReport (updated): venue_order_id={venue_order_id}"
1846                                                        );
1847                                                        outgoing.push(
1848                                                            NautilusWsMessage::OrderStatusReports(
1849                                                                vec![report],
1850                                                            ),
1851                                                        );
1852                                                    }
1853                                                    Err(e) => {
1854                                                        log::warn!(
1855                                                            "Failed to parse order update: {e}"
1856                                                        );
1857                                                    }
1858                                                }
1859                                            }
1860                                            OrderEventType::None => {
1861                                                // Fills handled via user.trades, track terminal state
1862                                                // for race condition prevention
1863                                                if matches!(
1864                                                    order.order_state.as_str(),
1865                                                    "filled" | "rejected"
1866                                                ) {
1867                                                    log::debug!(
1868                                                        "Recording terminal order: venue_order_id={venue_order_id}, state={}",
1869                                                        order.order_state
1870                                                    );
1871                                                    self.terminal_orders.add(client_order_id);
1872                                                    self.order_contexts.remove(&venue_order_id);
1873                                                    self.emitted_accepted.remove(&venue_order_id);
1874                                                } else {
1875                                                    log::trace!(
1876                                                        "No event to emit for order {}, state={}",
1877                                                        venue_order_id,
1878                                                        order.order_state
1879                                                    );
1880                                                }
1881                                            }
1882                                        }
1883                                    }
1884
1885                                    if !outgoing.is_empty() {
1886                                        self.pending_outgoing.extend(outgoing);
1887                                    }
1888                                }
1889                                Err(e) => {
1890                                    log::warn!("Failed to deserialize user orders: {e}");
1891                                }
1892                            }
1893                        }
1894                        DeribitWsChannel::UserTrades => {
1895                            // Handle both array and single object responses
1896                            let trades_result =
1897                                serde_json::from_value::<Vec<DeribitUserTradeMsg>>(data.clone())
1898                                    .or_else(|_| {
1899                                        serde_json::from_value::<DeribitUserTradeMsg>(data.clone())
1900                                            .map(|trade| vec![trade])
1901                                    });
1902
1903                            match trades_result {
1904                                Ok(trades) => {
1905                                    log::debug!("Received {} user trade updates", trades.len());
1906
1907                                    let Some(account_id) = self.account_id else {
1908                                        log::warn!("Cannot parse user trades: account_id not set");
1909                                        return Some(NautilusWsMessage::Raw(data.clone()));
1910                                    };
1911
1912                                    let mut reports = Vec::with_capacity(trades.len());
1913                                    for trade in &trades {
1914                                        let instrument_name = trade.instrument_name;
1915                                        if let Some(instrument) =
1916                                            self.instruments_cache.get(&instrument_name)
1917                                        {
1918                                            match parse_user_trade_msg(
1919                                                trade, instrument, account_id, ts_init,
1920                                            ) {
1921                                                Ok(report) => {
1922                                                    log::debug!(
1923                                                        "Parsed fill report: {} @ {}",
1924                                                        report.trade_id,
1925                                                        report.last_px
1926                                                    );
1927                                                    reports.push(report);
1928                                                }
1929                                                Err(e) => {
1930                                                    log::warn!(
1931                                                        "Failed to parse trade {}: {e}",
1932                                                        trade.trade_id
1933                                                    );
1934                                                }
1935                                            }
1936                                        } else {
1937                                            log::warn!(
1938                                                "Instrument {instrument_name} not found in cache"
1939                                            );
1940                                        }
1941                                    }
1942
1943                                    if !reports.is_empty() {
1944                                        return Some(NautilusWsMessage::FillReports(reports));
1945                                    }
1946                                }
1947                                Err(e) => {
1948                                    log::warn!("Failed to deserialize user trades: {e}");
1949                                }
1950                            }
1951                        }
1952                        DeribitWsChannel::UserPortfolio => {
1953                            match serde_json::from_value::<DeribitPortfolioMsg>(data.clone()) {
1954                                Ok(portfolio) => {
1955                                    // Skip zero-balance currencies (common with cross-collateral)
1956                                    // Only check equity and balance - initial_margin can be non-zero
1957                                    // for all currencies when cross-collateral is enabled
1958                                    if portfolio.equity.is_zero() && portfolio.balance.is_zero() {
1959                                        log::trace!(
1960                                            "Skipping zero-balance portfolio for {}",
1961                                            portfolio.currency
1962                                        );
1963                                        return None;
1964                                    }
1965
1966                                    // Require account_id for parsing
1967                                    let Some(account_id) = self.account_id else {
1968                                        log::warn!("Cannot parse portfolio: account_id not set");
1969                                        return None;
1970                                    };
1971
1972                                    match parse_portfolio_to_account_state(
1973                                        &portfolio, account_id, ts_init,
1974                                    ) {
1975                                        Ok(account_state) => {
1976                                            // Check for duplicate per currency
1977                                            let currency_key = portfolio.currency.clone();
1978                                            if let Some(last) =
1979                                                self.last_account_states.get(&currency_key)
1980                                                && account_state.has_same_balances_and_margins(last)
1981                                            {
1982                                                log::trace!(
1983                                                    "Skipping duplicate portfolio update for {}",
1984                                                    portfolio.currency
1985                                                );
1986                                                return None;
1987                                            }
1988
1989                                            self.last_account_states
1990                                                .insert(currency_key, account_state.clone());
1991                                            return Some(NautilusWsMessage::AccountState(
1992                                                account_state,
1993                                            ));
1994                                        }
1995                                        Err(e) => {
1996                                            log::warn!(
1997                                                "Failed to parse portfolio to AccountState: {e}"
1998                                            );
1999                                        }
2000                                    }
2001                                }
2002                                Err(e) => {
2003                                    log::warn!("Failed to deserialize portfolio: {e}");
2004                                }
2005                            }
2006                        }
2007                        _ => {
2008                            // Unhandled channel - return raw
2009                            log::trace!("Unhandled channel: {channel}");
2010                            return Some(NautilusWsMessage::Raw(data.clone()));
2011                        }
2012                    }
2013                } else {
2014                    log::trace!("Unknown channel: {channel}");
2015                    return Some(NautilusWsMessage::Raw(data.clone()));
2016                }
2017                None
2018            }
2019            DeribitWsMessage::Heartbeat(heartbeat) => {
2020                match heartbeat.heartbeat_type {
2021                    DeribitHeartbeatType::TestRequest => {
2022                        log::trace!(
2023                            "Received heartbeat test_request - responding with public/test"
2024                        );
2025                        if let Err(e) = self.handle_heartbeat_test_request().await {
2026                            log::error!("Failed to respond to heartbeat test_request: {e}");
2027
2028                            // Return error to signal connection may be unhealthy
2029                            return Some(NautilusWsMessage::Error(DeribitWsError::Send(format!(
2030                                "Heartbeat response failed: {e}"
2031                            ))));
2032                        }
2033                    }
2034                    DeribitHeartbeatType::Heartbeat => {
2035                        log::trace!("Received heartbeat acknowledgment");
2036                    }
2037                }
2038                None
2039            }
2040            DeribitWsMessage::Error(err) => {
2041                log::error!("Deribit error {}: {}", err.code, err.message);
2042                Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
2043                    code: err.code,
2044                    message: err.message,
2045                }))
2046            }
2047            DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
2048        }
2049    }
2050
2051    /// Main message processing loop.
2052    ///
2053    /// Returns `None` when the handler should stop.
2054    /// Messages that need client-side handling (e.g., Reconnected) are returned.
2055    /// Data messages are sent directly to `out_tx` for the user stream.
2056    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
2057        loop {
2058            if let Some(msg) = self.pending_outgoing.pop_front() {
2059                match msg {
2060                    NautilusWsMessage::Reconnected | NautilusWsMessage::Authenticated(_) => {
2061                        return Some(msg);
2062                    }
2063                    _ => {
2064                        let _ = self.out_tx.send(msg);
2065                        continue;
2066                    }
2067                }
2068            }
2069
2070            tokio::select! {
2071                // Process commands from client
2072                Some(cmd) = self.cmd_rx.recv() => {
2073                    self.process_command(cmd).await;
2074                }
2075                // Process raw WebSocket messages
2076                Some(msg) = self.raw_rx.recv() => {
2077                    match msg {
2078                        Message::Text(text) => {
2079                            if let Some(nautilus_msg) = self.process_raw_message(&text).await {
2080                                // Send data messages to user stream
2081                                match &nautilus_msg {
2082                                    NautilusWsMessage::Data(_)
2083                                    | NautilusWsMessage::Deltas(_)
2084                                    | NautilusWsMessage::Instrument(_)
2085                                    | NautilusWsMessage::Raw(_)
2086                                    | NautilusWsMessage::Error(_) => {
2087                                        let _ = self.out_tx.send(nautilus_msg);
2088                                    }
2089                                    NautilusWsMessage::FundingRates(rates) => {
2090                                        let msg_to_send =
2091                                            NautilusWsMessage::FundingRates(rates.clone());
2092                                        if let Err(e) = self.out_tx.send(msg_to_send) {
2093                                            log::error!("Failed to send funding rates: {e}");
2094                                        }
2095                                    }
2096                                    NautilusWsMessage::OrderStatusReports(_)
2097                                    | NautilusWsMessage::FillReports(_)
2098                                    | NautilusWsMessage::OrderAccepted(_)
2099                                    | NautilusWsMessage::OrderCanceled(_)
2100                                    | NautilusWsMessage::OrderExpired(_)
2101                                    | NautilusWsMessage::OrderUpdated(_)
2102                                    | NautilusWsMessage::OrderRejected(_)
2103                                    | NautilusWsMessage::OrderCancelRejected(_)
2104                                    | NautilusWsMessage::OrderModifyRejected(_)
2105                                    | NautilusWsMessage::AccountState(_) => {
2106                                        let _ = self.out_tx.send(nautilus_msg);
2107                                    }
2108                                    // Return messages that need client-side handling
2109                                    NautilusWsMessage::Reconnected
2110                                    | NautilusWsMessage::Authenticated(_) => {
2111                                        return Some(nautilus_msg);
2112                                    }
2113                                }
2114                            }
2115                        }
2116                        Message::Ping(data) => {
2117                            // Respond to ping with pong
2118                            if let Some(client) = &self.inner {
2119                                let _ = client.send_pong(data.to_vec()).await;
2120                            }
2121                        }
2122                        Message::Close(_) => {
2123                            log::info!("Received close frame");
2124                        }
2125                        _ => {}
2126                    }
2127                }
2128                // Check for stop signal
2129                () = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
2130                    if self.signal.load(Ordering::Relaxed) {
2131                        log::debug!("Stop signal received");
2132                        return None;
2133                    }
2134                }
2135            }
2136        }
2137    }
2138}