nautilus_deribit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::sync::{
23    Arc,
24    atomic::{AtomicBool, AtomicU64, Ordering},
25};
26
27use ahash::AHashMap;
28use nautilus_core::{AtomicTime, UnixNanos, time::get_atomic_clock_realtime};
29use nautilus_model::{
30    data::Data,
31    instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34    RECONNECTED,
35    retry::{RetryManager, create_websocket_retry_manager},
36    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
37};
38use tokio_tungstenite::tungstenite::Message;
39use ustr::Ustr;
40
41use super::{
42    enums::{DeribitHeartbeatType, DeribitWsChannel},
43    error::DeribitWsError,
44    messages::{
45        DeribitAuthResult, DeribitBookMsg, DeribitChartMsg, DeribitHeartbeatParams,
46        DeribitInstrumentStateMsg, DeribitJsonRpcRequest, DeribitPerpetualMsg, DeribitQuoteMsg,
47        DeribitSubscribeParams, DeribitTickerMsg, DeribitTradeMsg, DeribitWsMessage,
48        NautilusWsMessage, parse_raw_message,
49    },
50    parse::{
51        parse_book_msg, parse_chart_msg, parse_perpetual_to_funding_rate, parse_quote_msg,
52        parse_ticker_to_index_price, parse_ticker_to_mark_price, parse_trades_data,
53        resolution_to_bar_type,
54    },
55};
56
57/// Type of pending request for request ID correlation.
58#[derive(Debug, Clone)]
59pub enum PendingRequestType {
60    /// Authentication request.
61    Authenticate,
62    /// Subscribe request with requested channels.
63    Subscribe { channels: Vec<String> },
64    /// Unsubscribe request with requested channels.
65    Unsubscribe { channels: Vec<String> },
66    /// Set heartbeat request.
67    SetHeartbeat,
68    /// Test/ping request (heartbeat response).
69    Test,
70}
71
72/// Commands sent from the client to the handler.
73#[allow(missing_debug_implementations)]
74pub enum HandlerCommand {
75    /// Set the active WebSocket client.
76    SetClient(WebSocketClient),
77    /// Disconnect the WebSocket.
78    Disconnect,
79    /// Authenticate with credentials.
80    Authenticate {
81        /// Serialized auth params (DeribitAuthParams or DeribitRefreshTokenParams).
82        auth_params: serde_json::Value,
83    },
84    /// Enable heartbeat with interval.
85    SetHeartbeat { interval: u64 },
86    /// Initialize the instrument cache.
87    InitializeInstruments(Vec<InstrumentAny>),
88    /// Update a single instrument in the cache.
89    UpdateInstrument(Box<InstrumentAny>),
90    /// Subscribe to channels.
91    Subscribe { channels: Vec<String> },
92    /// Unsubscribe from channels.
93    Unsubscribe { channels: Vec<String> },
94}
95
96/// Deribit WebSocket feed handler.
97///
98/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
99#[allow(missing_debug_implementations)]
100#[allow(dead_code)] // Fields reserved for future features
101pub struct DeribitWsFeedHandler {
102    clock: &'static AtomicTime,
103    signal: Arc<AtomicBool>,
104    inner: Option<WebSocketClient>,
105    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
106    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
107    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
108    auth_tracker: AuthTracker,
109    subscriptions_state: SubscriptionState,
110    retry_manager: RetryManager<DeribitWsError>,
111    instruments_cache: AHashMap<Ustr, InstrumentAny>,
112    request_id_counter: AtomicU64,
113    /// Pending requests awaiting response, keyed by request ID.
114    pending_requests: AHashMap<u64, PendingRequestType>,
115}
116
117impl DeribitWsFeedHandler {
118    /// Creates a new feed handler.
119    #[must_use]
120    pub fn new(
121        signal: Arc<AtomicBool>,
122        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
123        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
124        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
125        auth_tracker: AuthTracker,
126        subscriptions_state: SubscriptionState,
127    ) -> Self {
128        Self {
129            clock: get_atomic_clock_realtime(),
130            signal,
131            inner: None,
132            cmd_rx,
133            raw_rx,
134            out_tx,
135            auth_tracker,
136            subscriptions_state,
137            retry_manager: create_websocket_retry_manager(),
138            instruments_cache: AHashMap::new(),
139            request_id_counter: AtomicU64::new(1),
140            pending_requests: AHashMap::new(),
141        }
142    }
143
144    /// Generates a unique request ID.
145    fn next_request_id(&self) -> u64 {
146        self.request_id_counter.fetch_add(1, Ordering::Relaxed)
147    }
148
149    /// Returns the current timestamp.
150    fn ts_init(&self) -> UnixNanos {
151        self.clock.get_time_ns()
152    }
153
154    /// Sends a message over the WebSocket with retry logic.
155    async fn send_with_retry(
156        &self,
157        payload: String,
158        rate_limit_keys: Option<Vec<String>>,
159    ) -> Result<(), DeribitWsError> {
160        if let Some(client) = &self.inner {
161            self.retry_manager
162                .execute_with_retry(
163                    "websocket_send",
164                    || async {
165                        client
166                            .send_text(payload.clone(), rate_limit_keys.clone())
167                            .await
168                            .map_err(|e| DeribitWsError::Send(e.to_string()))
169                    },
170                    |e| matches!(e, DeribitWsError::Send(_)),
171                    DeribitWsError::Timeout,
172                )
173                .await
174        } else {
175            Err(DeribitWsError::NotConnected)
176        }
177    }
178
179    /// Handles a subscribe command.
180    ///
181    /// Note: The client has already called `mark_subscribe` before sending this command.
182    async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
183        let request_id = self.next_request_id();
184
185        // Track this request for response correlation
186        self.pending_requests.insert(
187            request_id,
188            PendingRequestType::Subscribe {
189                channels: channels.clone(),
190            },
191        );
192
193        let request = DeribitJsonRpcRequest::new(
194            request_id,
195            "public/subscribe",
196            DeribitSubscribeParams {
197                channels: channels.clone(),
198            },
199        );
200
201        let payload =
202            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
203
204        log::debug!("Subscribing to channels: request_id={request_id}, channels={channels:?}");
205        self.send_with_retry(payload, None).await
206    }
207
208    /// Handles an unsubscribe command.
209    async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
210        let request_id = self.next_request_id();
211
212        // Track this request for response correlation
213        self.pending_requests.insert(
214            request_id,
215            PendingRequestType::Unsubscribe {
216                channels: channels.clone(),
217            },
218        );
219
220        let request = DeribitJsonRpcRequest::new(
221            request_id,
222            "public/unsubscribe",
223            DeribitSubscribeParams {
224                channels: channels.clone(),
225            },
226        );
227
228        let payload =
229            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
230
231        log::debug!("Unsubscribing from channels: request_id={request_id}, channels={channels:?}");
232        self.send_with_retry(payload, None).await
233    }
234
235    /// Handles enabling heartbeat.
236    async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
237        let request_id = self.next_request_id();
238
239        // Track this request for response correlation
240        self.pending_requests
241            .insert(request_id, PendingRequestType::SetHeartbeat);
242
243        let request = DeribitJsonRpcRequest::new(
244            request_id,
245            "public/set_heartbeat",
246            DeribitHeartbeatParams { interval },
247        );
248
249        let payload =
250            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
251
252        log::debug!(
253            "Enabling heartbeat with interval: request_id={request_id}, interval={interval} seconds"
254        );
255        self.send_with_retry(payload, None).await
256    }
257
258    /// Responds to a heartbeat test_request.
259    async fn handle_heartbeat_test_request(&mut self) -> Result<(), DeribitWsError> {
260        let request_id = self.next_request_id();
261
262        // Track this request for response correlation
263        self.pending_requests
264            .insert(request_id, PendingRequestType::Test);
265
266        let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
267
268        let payload =
269            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
270
271        log::trace!("Responding to heartbeat test_request: request_id={request_id}");
272        self.send_with_retry(payload, None).await
273    }
274
275    /// Processes a command from the client.
276    async fn process_command(&mut self, cmd: HandlerCommand) {
277        match cmd {
278            HandlerCommand::SetClient(client) => {
279                log::debug!("Setting WebSocket client");
280                self.inner = Some(client);
281            }
282            HandlerCommand::Disconnect => {
283                log::debug!("Disconnecting WebSocket");
284                if let Some(client) = self.inner.take() {
285                    client.disconnect().await;
286                }
287            }
288            HandlerCommand::Authenticate { auth_params } => {
289                let request_id = self.next_request_id();
290                log::debug!("Authenticating: request_id={request_id}");
291
292                // Track this request for response correlation
293                self.pending_requests
294                    .insert(request_id, PendingRequestType::Authenticate);
295
296                let request = DeribitJsonRpcRequest::new(request_id, "public/auth", auth_params);
297                match serde_json::to_string(&request) {
298                    Ok(payload) => {
299                        if let Err(e) = self.send_with_retry(payload, None).await {
300                            log::error!("Authentication send failed: {e}");
301                            self.auth_tracker.fail(format!("Send failed: {e}"));
302                        }
303                    }
304                    Err(e) => {
305                        log::error!("Failed to serialize auth request: {e}");
306                        self.auth_tracker.fail(format!("Serialization failed: {e}"));
307                    }
308                }
309            }
310            HandlerCommand::SetHeartbeat { interval } => {
311                if let Err(e) = self.handle_set_heartbeat(interval).await {
312                    log::error!("Set heartbeat failed: {e}");
313                }
314            }
315            HandlerCommand::InitializeInstruments(instruments) => {
316                log::debug!("Initializing {} instruments", instruments.len());
317                self.instruments_cache.clear();
318                for inst in instruments {
319                    self.instruments_cache
320                        .insert(inst.raw_symbol().inner(), inst);
321                }
322            }
323            HandlerCommand::UpdateInstrument(instrument) => {
324                log::trace!("Updating instrument: {}", instrument.raw_symbol());
325                self.instruments_cache
326                    .insert(instrument.raw_symbol().inner(), *instrument);
327            }
328            HandlerCommand::Subscribe { channels } => {
329                if let Err(e) = self.handle_subscribe(channels).await {
330                    log::error!("Subscribe failed: {e}");
331                }
332            }
333            HandlerCommand::Unsubscribe { channels } => {
334                if let Err(e) = self.handle_unsubscribe(channels).await {
335                    log::error!("Unsubscribe failed: {e}");
336                }
337            }
338        }
339    }
340
341    /// Processes a raw WebSocket message.
342    async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
343        // Check for reconnection signal
344        if text == RECONNECTED {
345            log::info!("Received reconnection signal");
346            return Some(NautilusWsMessage::Reconnected);
347        }
348
349        // Parse the JSON-RPC message
350        let ws_msg = match parse_raw_message(text) {
351            Ok(msg) => msg,
352            Err(e) => {
353                log::warn!("Failed to parse message: {e}");
354                return None;
355            }
356        };
357
358        let ts_init = self.ts_init();
359
360        match ws_msg {
361            DeribitWsMessage::Response(response) => {
362                // Look up the request type by ID for explicit correlation
363                if let Some(request_id) = response.id
364                    && let Some(request_type) = self.pending_requests.remove(&request_id)
365                {
366                    match request_type {
367                        PendingRequestType::Authenticate => {
368                            // Parse authentication result
369                            if let Some(result) = &response.result {
370                                match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
371                                    Ok(auth_result) => {
372                                        self.auth_tracker.succeed();
373                                        log::info!(
374                                            "WebSocket authenticated successfully (request_id={}, scope={}, expires_in={}s)",
375                                            request_id,
376                                            auth_result.scope,
377                                            auth_result.expires_in
378                                        );
379                                        return Some(NautilusWsMessage::Authenticated(Box::new(
380                                            auth_result,
381                                        )));
382                                    }
383                                    Err(e) => {
384                                        log::error!(
385                                            "Failed to parse auth result: request_id={request_id}, error={e}"
386                                        );
387                                        self.auth_tracker
388                                            .fail(format!("Failed to parse auth result: {e}"));
389                                    }
390                                }
391                            }
392                        }
393                        PendingRequestType::Subscribe { channels } => {
394                            // Confirm each channel in the subscription
395                            for ch in &channels {
396                                self.subscriptions_state.confirm_subscribe(ch);
397                                log::debug!("Subscription confirmed: {ch}");
398                            }
399                        }
400                        PendingRequestType::Unsubscribe { channels } => {
401                            // Confirm each channel in the unsubscription
402                            for ch in &channels {
403                                self.subscriptions_state.confirm_unsubscribe(ch);
404                                log::debug!("Unsubscription confirmed: {ch}");
405                            }
406                        }
407                        PendingRequestType::SetHeartbeat => {
408                            log::debug!("Heartbeat enabled (request_id={request_id})");
409                        }
410                        PendingRequestType::Test => {
411                            log::trace!("Heartbeat test acknowledged (request_id={request_id})");
412                        }
413                    }
414                }
415                None
416            }
417            DeribitWsMessage::Notification(notification) => {
418                let channel = &notification.params.channel;
419                let data = &notification.params.data;
420
421                // Determine channel type and parse accordingly
422                if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
423                    match channel_type {
424                        DeribitWsChannel::Trades => {
425                            // Parse trade messages
426                            match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
427                                Ok(trades) => {
428                                    log::debug!("Received {} trades", trades.len());
429                                    let data_vec =
430                                        parse_trades_data(trades, &self.instruments_cache, ts_init);
431                                    if data_vec.is_empty() {
432                                        log::debug!(
433                                            "No trades parsed - instrument cache size: {}",
434                                            self.instruments_cache.len()
435                                        );
436                                    } else {
437                                        log::debug!("Parsed {} trade ticks", data_vec.len());
438                                        return Some(NautilusWsMessage::Data(data_vec));
439                                    }
440                                }
441                                Err(e) => {
442                                    log::warn!("Failed to deserialize trades: {e}");
443                                }
444                            }
445                        }
446                        DeribitWsChannel::Book => {
447                            // Parse order book messages
448                            if let Ok(book_msg) =
449                                serde_json::from_value::<DeribitBookMsg>(data.clone())
450                                && let Some(instrument) =
451                                    self.instruments_cache.get(&book_msg.instrument_name)
452                            {
453                                match parse_book_msg(&book_msg, instrument, ts_init) {
454                                    Ok(deltas) => {
455                                        return Some(NautilusWsMessage::Deltas(deltas));
456                                    }
457                                    Err(e) => {
458                                        log::warn!("Failed to parse book message: {e}");
459                                    }
460                                }
461                            }
462                        }
463                        DeribitWsChannel::Ticker => {
464                            // Parse ticker to emit both MarkPrice and IndexPrice
465                            // When subscribed to either mark_prices or index_prices, we emit both
466                            // as traders typically need both for analysis
467                            if let Ok(ticker_msg) =
468                                serde_json::from_value::<DeribitTickerMsg>(data.clone())
469                                && let Some(instrument) =
470                                    self.instruments_cache.get(&ticker_msg.instrument_name)
471                            {
472                                let mark_price =
473                                    parse_ticker_to_mark_price(&ticker_msg, instrument, ts_init);
474                                let index_price =
475                                    parse_ticker_to_index_price(&ticker_msg, instrument, ts_init);
476
477                                return Some(NautilusWsMessage::Data(vec![
478                                    Data::MarkPriceUpdate(mark_price),
479                                    Data::IndexPriceUpdate(index_price),
480                                ]));
481                            }
482                        }
483                        DeribitWsChannel::Perpetual => {
484                            // Parse perpetual channel for funding rate updates
485                            // This channel is dedicated to perpetual instruments and provides
486                            // the interest (funding) rate
487                            match serde_json::from_value::<DeribitPerpetualMsg>(data.clone()) {
488                                Ok(perpetual_msg) => {
489                                    // Extract instrument name from channel: perpetual.{instrument}.{interval}
490                                    let parts: Vec<&str> = channel.split('.').collect();
491                                    if parts.len() >= 2 {
492                                        let instrument_name = Ustr::from(parts[1]);
493                                        if let Some(instrument) =
494                                            self.instruments_cache.get(&instrument_name)
495                                        {
496                                            if let Some(funding_rate) =
497                                                parse_perpetual_to_funding_rate(
498                                                    &perpetual_msg,
499                                                    instrument,
500                                                    ts_init,
501                                                )
502                                            {
503                                                return Some(NautilusWsMessage::FundingRates(
504                                                    vec![funding_rate],
505                                                ));
506                                            } else {
507                                                log::warn!(
508                                                    "Failed to create funding rate from perpetual msg"
509                                                );
510                                            }
511                                        } else {
512                                            log::warn!(
513                                                "Instrument {} not found in cache (cache size: {})",
514                                                instrument_name,
515                                                self.instruments_cache.len()
516                                            );
517                                        }
518                                    }
519                                }
520                                Err(e) => {
521                                    log::warn!(
522                                        "Failed to deserialize perpetual message: {e}, data: {data}"
523                                    );
524                                }
525                            }
526                        }
527                        DeribitWsChannel::Quote => {
528                            // Parse quote messages
529                            if let Ok(quote_msg) =
530                                serde_json::from_value::<DeribitQuoteMsg>(data.clone())
531                                && let Some(instrument) =
532                                    self.instruments_cache.get(&quote_msg.instrument_name)
533                            {
534                                match parse_quote_msg(&quote_msg, instrument, ts_init) {
535                                    Ok(quote) => {
536                                        return Some(NautilusWsMessage::Data(vec![Data::Quote(
537                                            quote,
538                                        )]));
539                                    }
540                                    Err(e) => {
541                                        log::warn!("Failed to parse quote message: {e}");
542                                    }
543                                }
544                            }
545                        }
546                        DeribitWsChannel::InstrumentState => {
547                            // Parse instrument state lifecycle notifications
548                            match serde_json::from_value::<DeribitInstrumentStateMsg>(data.clone())
549                            {
550                                Ok(state_msg) => {
551                                    log::info!(
552                                        "Instrument state change: {} -> {} (timestamp: {})",
553                                        state_msg.instrument_name,
554                                        state_msg.state,
555                                        state_msg.timestamp
556                                    );
557                                    // Return raw data for consumers to handle state changes
558                                    // TODO: Optionally emit instrument updates when instrument transitions to 'started'
559                                    return Some(NautilusWsMessage::Raw(data.clone()));
560                                }
561                                Err(e) => {
562                                    log::warn!("Failed to parse instrument state message: {e}");
563                                }
564                            }
565                        }
566                        DeribitWsChannel::ChartTrades => {
567                            // Parse chart.trades messages into Bar objects
568                            if let Ok(chart_msg) =
569                                serde_json::from_value::<DeribitChartMsg>(data.clone())
570                            {
571                                // Extract instrument and resolution from channel
572                                // Channel format: chart.trades.{instrument}.{resolution}
573                                let parts: Vec<&str> = channel.split('.').collect();
574                                if parts.len() >= 4 {
575                                    let instrument_name = Ustr::from(parts[2]);
576                                    let resolution = parts[3];
577
578                                    if let Some(instrument) =
579                                        self.instruments_cache.get(&instrument_name)
580                                    {
581                                        let instrument_id = instrument.id();
582
583                                        // Create BarType from resolution and instrument
584                                        match resolution_to_bar_type(instrument_id, resolution) {
585                                            Ok(bar_type) => {
586                                                let price_precision = instrument.price_precision();
587                                                let size_precision = instrument.size_precision();
588
589                                                match parse_chart_msg(
590                                                    &chart_msg,
591                                                    bar_type,
592                                                    price_precision,
593                                                    size_precision,
594                                                    ts_init,
595                                                ) {
596                                                    Ok(bar) => {
597                                                        log::debug!("Parsed bar: {bar:?}");
598                                                        return Some(NautilusWsMessage::Data(
599                                                            vec![Data::Bar(bar)],
600                                                        ));
601                                                    }
602                                                    Err(e) => {
603                                                        log::warn!(
604                                                            "Failed to parse chart message to bar: {e}"
605                                                        );
606                                                    }
607                                                }
608                                            }
609                                            Err(e) => {
610                                                log::warn!(
611                                                    "Failed to create BarType from resolution {resolution}: {e}"
612                                                );
613                                            }
614                                        }
615                                    } else {
616                                        log::warn!(
617                                            "Instrument {instrument_name} not found in cache for chart data"
618                                        );
619                                    }
620                                }
621                            }
622                        }
623                        _ => {
624                            // Unhandled channel - return raw
625                            log::trace!("Unhandled channel: {channel}");
626                            return Some(NautilusWsMessage::Raw(data.clone()));
627                        }
628                    }
629                } else {
630                    log::trace!("Unknown channel: {channel}");
631                    return Some(NautilusWsMessage::Raw(data.clone()));
632                }
633                None
634            }
635            DeribitWsMessage::Heartbeat(heartbeat) => {
636                match heartbeat.heartbeat_type {
637                    DeribitHeartbeatType::TestRequest => {
638                        log::trace!(
639                            "Received heartbeat test_request - responding with public/test"
640                        );
641                        if let Err(e) = self.handle_heartbeat_test_request().await {
642                            log::error!("Failed to respond to heartbeat test_request: {e}");
643                        }
644                    }
645                    DeribitHeartbeatType::Heartbeat => {
646                        log::trace!("Received heartbeat acknowledgment");
647                    }
648                }
649                None
650            }
651            DeribitWsMessage::Error(err) => {
652                log::error!("Deribit error {}: {}", err.code, err.message);
653                Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
654                    code: err.code,
655                    message: err.message,
656                }))
657            }
658            DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
659        }
660    }
661
662    /// Main message processing loop.
663    ///
664    /// Returns `None` when the handler should stop.
665    /// Messages that need client-side handling (e.g., Reconnected) are returned.
666    /// Data messages are sent directly to `out_tx` for the user stream.
667    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
668        loop {
669            tokio::select! {
670                // Process commands from client
671                Some(cmd) = self.cmd_rx.recv() => {
672                    self.process_command(cmd).await;
673                }
674                // Process raw WebSocket messages
675                Some(msg) = self.raw_rx.recv() => {
676                    match msg {
677                        Message::Text(text) => {
678                            if let Some(nautilus_msg) = self.process_raw_message(&text).await {
679                                // Send data messages to user stream
680                                match &nautilus_msg {
681                                    NautilusWsMessage::Data(_)
682                                    | NautilusWsMessage::Deltas(_)
683                                    | NautilusWsMessage::Instrument(_)
684                                    | NautilusWsMessage::Raw(_)
685                                    | NautilusWsMessage::Error(_) => {
686                                        let _ = self.out_tx.send(nautilus_msg);
687                                    }
688                                    NautilusWsMessage::FundingRates(rates) => {
689                                        let msg_to_send =
690                                            NautilusWsMessage::FundingRates(rates.clone());
691                                        if let Err(e) = self.out_tx.send(msg_to_send) {
692                                            log::error!("Failed to send funding rates: {e}");
693                                        }
694                                    }
695                                    // Return messages that need client-side handling
696                                    NautilusWsMessage::Reconnected
697                                    | NautilusWsMessage::Authenticated(_) => {
698                                        return Some(nautilus_msg);
699                                    }
700                                }
701                            }
702                        }
703                        Message::Ping(data) => {
704                            // Respond to ping with pong
705                            if let Some(client) = &self.inner {
706                                let _ = client.send_pong(data.to_vec()).await;
707                            }
708                        }
709                        Message::Close(_) => {
710                            log::info!("Received close frame");
711                        }
712                        _ => {}
713                    }
714                }
715                // Check for stop signal
716                () = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
717                    if self.signal.load(Ordering::Relaxed) {
718                        log::debug!("Stop signal received");
719                        return None;
720                    }
721                }
722            }
723        }
724    }
725}