nautilus_deribit/websocket/handler.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU64, Ordering},
25};
26
27use ahash::AHashMap;
28use nautilus_core::{AtomicTime, UnixNanos, time::get_atomic_clock_realtime};
29use nautilus_model::{
30 data::Data,
31 instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34 RECONNECTED,
35 retry::{RetryManager, create_websocket_retry_manager},
36 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
37};
38use tokio_tungstenite::tungstenite::Message;
39use ustr::Ustr;
40
41use super::{
42 enums::{DeribitHeartbeatType, DeribitWsChannel},
43 error::DeribitWsError,
44 messages::{
45 DeribitAuthResult, DeribitBookMsg, DeribitChartMsg, DeribitHeartbeatParams,
46 DeribitInstrumentStateMsg, DeribitJsonRpcRequest, DeribitPerpetualMsg, DeribitQuoteMsg,
47 DeribitSubscribeParams, DeribitTickerMsg, DeribitTradeMsg, DeribitWsMessage,
48 NautilusWsMessage, parse_raw_message,
49 },
50 parse::{
51 parse_book_msg, parse_chart_msg, parse_perpetual_to_funding_rate, parse_quote_msg,
52 parse_ticker_to_index_price, parse_ticker_to_mark_price, parse_trades_data,
53 resolution_to_bar_type,
54 },
55};
56
/// Kind of in-flight JSON-RPC request, stored per request ID so the matching response
/// can be interpreted once it arrives.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PendingRequestType {
    /// Authentication request.
    Authenticate,
    /// Subscribe request, carrying the channels that were requested.
    Subscribe { channels: Vec<String> },
    /// Unsubscribe request, carrying the channels that were requested.
    Unsubscribe { channels: Vec<String> },
    /// Set heartbeat request.
    SetHeartbeat,
    /// Test/ping request (sent in reply to a heartbeat `test_request`).
    Test,
}
71
72/// Commands sent from the client to the handler.
73#[allow(missing_debug_implementations)]
74pub enum HandlerCommand {
75 /// Set the active WebSocket client.
76 SetClient(WebSocketClient),
77 /// Disconnect the WebSocket.
78 Disconnect,
79 /// Authenticate with credentials.
80 Authenticate {
81 /// Serialized auth params (DeribitAuthParams or DeribitRefreshTokenParams).
82 auth_params: serde_json::Value,
83 },
84 /// Enable heartbeat with interval.
85 SetHeartbeat { interval: u64 },
86 /// Initialize the instrument cache.
87 InitializeInstruments(Vec<InstrumentAny>),
88 /// Update a single instrument in the cache.
89 UpdateInstrument(Box<InstrumentAny>),
90 /// Subscribe to channels.
91 Subscribe { channels: Vec<String> },
92 /// Unsubscribe from channels.
93 Unsubscribe { channels: Vec<String> },
94}
95
96/// Deribit WebSocket feed handler.
97///
98/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
99#[allow(missing_debug_implementations)]
100#[allow(dead_code)] // Fields reserved for future features
101pub struct DeribitWsFeedHandler {
102 clock: &'static AtomicTime,
103 signal: Arc<AtomicBool>,
104 inner: Option<WebSocketClient>,
105 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
106 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
107 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
108 auth_tracker: AuthTracker,
109 subscriptions_state: SubscriptionState,
110 retry_manager: RetryManager<DeribitWsError>,
111 instruments_cache: AHashMap<Ustr, InstrumentAny>,
112 request_id_counter: AtomicU64,
113 /// Pending requests awaiting response, keyed by request ID.
114 pending_requests: AHashMap<u64, PendingRequestType>,
115}
116
117impl DeribitWsFeedHandler {
118 /// Creates a new feed handler.
119 #[must_use]
120 pub fn new(
121 signal: Arc<AtomicBool>,
122 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
123 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
124 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
125 auth_tracker: AuthTracker,
126 subscriptions_state: SubscriptionState,
127 ) -> Self {
128 Self {
129 clock: get_atomic_clock_realtime(),
130 signal,
131 inner: None,
132 cmd_rx,
133 raw_rx,
134 out_tx,
135 auth_tracker,
136 subscriptions_state,
137 retry_manager: create_websocket_retry_manager(),
138 instruments_cache: AHashMap::new(),
139 request_id_counter: AtomicU64::new(1),
140 pending_requests: AHashMap::new(),
141 }
142 }
143
144 /// Generates a unique request ID.
145 fn next_request_id(&self) -> u64 {
146 self.request_id_counter.fetch_add(1, Ordering::Relaxed)
147 }
148
149 /// Returns the current timestamp.
150 fn ts_init(&self) -> UnixNanos {
151 self.clock.get_time_ns()
152 }
153
154 /// Sends a message over the WebSocket with retry logic.
155 async fn send_with_retry(
156 &self,
157 payload: String,
158 rate_limit_keys: Option<Vec<String>>,
159 ) -> Result<(), DeribitWsError> {
160 if let Some(client) = &self.inner {
161 self.retry_manager
162 .execute_with_retry(
163 "websocket_send",
164 || async {
165 client
166 .send_text(payload.clone(), rate_limit_keys.clone())
167 .await
168 .map_err(|e| DeribitWsError::Send(e.to_string()))
169 },
170 |e| matches!(e, DeribitWsError::Send(_)),
171 DeribitWsError::Timeout,
172 )
173 .await
174 } else {
175 Err(DeribitWsError::NotConnected)
176 }
177 }
178
179 /// Handles a subscribe command.
180 ///
181 /// Note: The client has already called `mark_subscribe` before sending this command.
182 async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
183 let request_id = self.next_request_id();
184
185 // Track this request for response correlation
186 self.pending_requests.insert(
187 request_id,
188 PendingRequestType::Subscribe {
189 channels: channels.clone(),
190 },
191 );
192
193 let request = DeribitJsonRpcRequest::new(
194 request_id,
195 "public/subscribe",
196 DeribitSubscribeParams {
197 channels: channels.clone(),
198 },
199 );
200
201 let payload =
202 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
203
204 log::debug!("Subscribing to channels: request_id={request_id}, channels={channels:?}");
205 self.send_with_retry(payload, None).await
206 }
207
208 /// Handles an unsubscribe command.
209 async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
210 let request_id = self.next_request_id();
211
212 // Track this request for response correlation
213 self.pending_requests.insert(
214 request_id,
215 PendingRequestType::Unsubscribe {
216 channels: channels.clone(),
217 },
218 );
219
220 let request = DeribitJsonRpcRequest::new(
221 request_id,
222 "public/unsubscribe",
223 DeribitSubscribeParams {
224 channels: channels.clone(),
225 },
226 );
227
228 let payload =
229 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
230
231 log::debug!("Unsubscribing from channels: request_id={request_id}, channels={channels:?}");
232 self.send_with_retry(payload, None).await
233 }
234
235 /// Handles enabling heartbeat.
236 async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
237 let request_id = self.next_request_id();
238
239 // Track this request for response correlation
240 self.pending_requests
241 .insert(request_id, PendingRequestType::SetHeartbeat);
242
243 let request = DeribitJsonRpcRequest::new(
244 request_id,
245 "public/set_heartbeat",
246 DeribitHeartbeatParams { interval },
247 );
248
249 let payload =
250 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
251
252 log::debug!(
253 "Enabling heartbeat with interval: request_id={request_id}, interval={interval} seconds"
254 );
255 self.send_with_retry(payload, None).await
256 }
257
258 /// Responds to a heartbeat test_request.
259 async fn handle_heartbeat_test_request(&mut self) -> Result<(), DeribitWsError> {
260 let request_id = self.next_request_id();
261
262 // Track this request for response correlation
263 self.pending_requests
264 .insert(request_id, PendingRequestType::Test);
265
266 let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
267
268 let payload =
269 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()))?;
270
271 log::trace!("Responding to heartbeat test_request: request_id={request_id}");
272 self.send_with_retry(payload, None).await
273 }
274
275 /// Processes a command from the client.
276 async fn process_command(&mut self, cmd: HandlerCommand) {
277 match cmd {
278 HandlerCommand::SetClient(client) => {
279 log::debug!("Setting WebSocket client");
280 self.inner = Some(client);
281 }
282 HandlerCommand::Disconnect => {
283 log::debug!("Disconnecting WebSocket");
284 if let Some(client) = self.inner.take() {
285 client.disconnect().await;
286 }
287 }
288 HandlerCommand::Authenticate { auth_params } => {
289 let request_id = self.next_request_id();
290 log::debug!("Authenticating: request_id={request_id}");
291
292 // Track this request for response correlation
293 self.pending_requests
294 .insert(request_id, PendingRequestType::Authenticate);
295
296 let request = DeribitJsonRpcRequest::new(request_id, "public/auth", auth_params);
297 match serde_json::to_string(&request) {
298 Ok(payload) => {
299 if let Err(e) = self.send_with_retry(payload, None).await {
300 log::error!("Authentication send failed: {e}");
301 self.auth_tracker.fail(format!("Send failed: {e}"));
302 }
303 }
304 Err(e) => {
305 log::error!("Failed to serialize auth request: {e}");
306 self.auth_tracker.fail(format!("Serialization failed: {e}"));
307 }
308 }
309 }
310 HandlerCommand::SetHeartbeat { interval } => {
311 if let Err(e) = self.handle_set_heartbeat(interval).await {
312 log::error!("Set heartbeat failed: {e}");
313 }
314 }
315 HandlerCommand::InitializeInstruments(instruments) => {
316 log::debug!("Initializing {} instruments", instruments.len());
317 self.instruments_cache.clear();
318 for inst in instruments {
319 self.instruments_cache
320 .insert(inst.raw_symbol().inner(), inst);
321 }
322 }
323 HandlerCommand::UpdateInstrument(instrument) => {
324 log::trace!("Updating instrument: {}", instrument.raw_symbol());
325 self.instruments_cache
326 .insert(instrument.raw_symbol().inner(), *instrument);
327 }
328 HandlerCommand::Subscribe { channels } => {
329 if let Err(e) = self.handle_subscribe(channels).await {
330 log::error!("Subscribe failed: {e}");
331 }
332 }
333 HandlerCommand::Unsubscribe { channels } => {
334 if let Err(e) = self.handle_unsubscribe(channels).await {
335 log::error!("Unsubscribe failed: {e}");
336 }
337 }
338 }
339 }
340
341 /// Processes a raw WebSocket message.
342 async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
343 // Check for reconnection signal
344 if text == RECONNECTED {
345 log::info!("Received reconnection signal");
346 return Some(NautilusWsMessage::Reconnected);
347 }
348
349 // Parse the JSON-RPC message
350 let ws_msg = match parse_raw_message(text) {
351 Ok(msg) => msg,
352 Err(e) => {
353 log::warn!("Failed to parse message: {e}");
354 return None;
355 }
356 };
357
358 let ts_init = self.ts_init();
359
360 match ws_msg {
361 DeribitWsMessage::Response(response) => {
362 // Look up the request type by ID for explicit correlation
363 if let Some(request_id) = response.id
364 && let Some(request_type) = self.pending_requests.remove(&request_id)
365 {
366 match request_type {
367 PendingRequestType::Authenticate => {
368 // Parse authentication result
369 if let Some(result) = &response.result {
370 match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
371 Ok(auth_result) => {
372 self.auth_tracker.succeed();
373 log::info!(
374 "WebSocket authenticated successfully (request_id={}, scope={}, expires_in={}s)",
375 request_id,
376 auth_result.scope,
377 auth_result.expires_in
378 );
379 return Some(NautilusWsMessage::Authenticated(Box::new(
380 auth_result,
381 )));
382 }
383 Err(e) => {
384 log::error!(
385 "Failed to parse auth result: request_id={request_id}, error={e}"
386 );
387 self.auth_tracker
388 .fail(format!("Failed to parse auth result: {e}"));
389 }
390 }
391 }
392 }
393 PendingRequestType::Subscribe { channels } => {
394 // Confirm each channel in the subscription
395 for ch in &channels {
396 self.subscriptions_state.confirm_subscribe(ch);
397 log::debug!("Subscription confirmed: {ch}");
398 }
399 }
400 PendingRequestType::Unsubscribe { channels } => {
401 // Confirm each channel in the unsubscription
402 for ch in &channels {
403 self.subscriptions_state.confirm_unsubscribe(ch);
404 log::debug!("Unsubscription confirmed: {ch}");
405 }
406 }
407 PendingRequestType::SetHeartbeat => {
408 log::debug!("Heartbeat enabled (request_id={request_id})");
409 }
410 PendingRequestType::Test => {
411 log::trace!("Heartbeat test acknowledged (request_id={request_id})");
412 }
413 }
414 }
415 None
416 }
417 DeribitWsMessage::Notification(notification) => {
418 let channel = ¬ification.params.channel;
419 let data = ¬ification.params.data;
420
421 // Determine channel type and parse accordingly
422 if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
423 match channel_type {
424 DeribitWsChannel::Trades => {
425 // Parse trade messages
426 match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
427 Ok(trades) => {
428 log::debug!("Received {} trades", trades.len());
429 let data_vec =
430 parse_trades_data(trades, &self.instruments_cache, ts_init);
431 if data_vec.is_empty() {
432 log::debug!(
433 "No trades parsed - instrument cache size: {}",
434 self.instruments_cache.len()
435 );
436 } else {
437 log::debug!("Parsed {} trade ticks", data_vec.len());
438 return Some(NautilusWsMessage::Data(data_vec));
439 }
440 }
441 Err(e) => {
442 log::warn!("Failed to deserialize trades: {e}");
443 }
444 }
445 }
446 DeribitWsChannel::Book => {
447 // Parse order book messages
448 if let Ok(book_msg) =
449 serde_json::from_value::<DeribitBookMsg>(data.clone())
450 && let Some(instrument) =
451 self.instruments_cache.get(&book_msg.instrument_name)
452 {
453 match parse_book_msg(&book_msg, instrument, ts_init) {
454 Ok(deltas) => {
455 return Some(NautilusWsMessage::Deltas(deltas));
456 }
457 Err(e) => {
458 log::warn!("Failed to parse book message: {e}");
459 }
460 }
461 }
462 }
463 DeribitWsChannel::Ticker => {
464 // Parse ticker to emit both MarkPrice and IndexPrice
465 // When subscribed to either mark_prices or index_prices, we emit both
466 // as traders typically need both for analysis
467 if let Ok(ticker_msg) =
468 serde_json::from_value::<DeribitTickerMsg>(data.clone())
469 && let Some(instrument) =
470 self.instruments_cache.get(&ticker_msg.instrument_name)
471 {
472 let mark_price =
473 parse_ticker_to_mark_price(&ticker_msg, instrument, ts_init);
474 let index_price =
475 parse_ticker_to_index_price(&ticker_msg, instrument, ts_init);
476
477 return Some(NautilusWsMessage::Data(vec![
478 Data::MarkPriceUpdate(mark_price),
479 Data::IndexPriceUpdate(index_price),
480 ]));
481 }
482 }
483 DeribitWsChannel::Perpetual => {
484 // Parse perpetual channel for funding rate updates
485 // This channel is dedicated to perpetual instruments and provides
486 // the interest (funding) rate
487 match serde_json::from_value::<DeribitPerpetualMsg>(data.clone()) {
488 Ok(perpetual_msg) => {
489 // Extract instrument name from channel: perpetual.{instrument}.{interval}
490 let parts: Vec<&str> = channel.split('.').collect();
491 if parts.len() >= 2 {
492 let instrument_name = Ustr::from(parts[1]);
493 if let Some(instrument) =
494 self.instruments_cache.get(&instrument_name)
495 {
496 if let Some(funding_rate) =
497 parse_perpetual_to_funding_rate(
498 &perpetual_msg,
499 instrument,
500 ts_init,
501 )
502 {
503 return Some(NautilusWsMessage::FundingRates(
504 vec![funding_rate],
505 ));
506 } else {
507 log::warn!(
508 "Failed to create funding rate from perpetual msg"
509 );
510 }
511 } else {
512 log::warn!(
513 "Instrument {} not found in cache (cache size: {})",
514 instrument_name,
515 self.instruments_cache.len()
516 );
517 }
518 }
519 }
520 Err(e) => {
521 log::warn!(
522 "Failed to deserialize perpetual message: {e}, data: {data}"
523 );
524 }
525 }
526 }
527 DeribitWsChannel::Quote => {
528 // Parse quote messages
529 if let Ok(quote_msg) =
530 serde_json::from_value::<DeribitQuoteMsg>(data.clone())
531 && let Some(instrument) =
532 self.instruments_cache.get("e_msg.instrument_name)
533 {
534 match parse_quote_msg("e_msg, instrument, ts_init) {
535 Ok(quote) => {
536 return Some(NautilusWsMessage::Data(vec![Data::Quote(
537 quote,
538 )]));
539 }
540 Err(e) => {
541 log::warn!("Failed to parse quote message: {e}");
542 }
543 }
544 }
545 }
546 DeribitWsChannel::InstrumentState => {
547 // Parse instrument state lifecycle notifications
548 match serde_json::from_value::<DeribitInstrumentStateMsg>(data.clone())
549 {
550 Ok(state_msg) => {
551 log::info!(
552 "Instrument state change: {} -> {} (timestamp: {})",
553 state_msg.instrument_name,
554 state_msg.state,
555 state_msg.timestamp
556 );
557 // Return raw data for consumers to handle state changes
558 // TODO: Optionally emit instrument updates when instrument transitions to 'started'
559 return Some(NautilusWsMessage::Raw(data.clone()));
560 }
561 Err(e) => {
562 log::warn!("Failed to parse instrument state message: {e}");
563 }
564 }
565 }
566 DeribitWsChannel::ChartTrades => {
567 // Parse chart.trades messages into Bar objects
568 if let Ok(chart_msg) =
569 serde_json::from_value::<DeribitChartMsg>(data.clone())
570 {
571 // Extract instrument and resolution from channel
572 // Channel format: chart.trades.{instrument}.{resolution}
573 let parts: Vec<&str> = channel.split('.').collect();
574 if parts.len() >= 4 {
575 let instrument_name = Ustr::from(parts[2]);
576 let resolution = parts[3];
577
578 if let Some(instrument) =
579 self.instruments_cache.get(&instrument_name)
580 {
581 let instrument_id = instrument.id();
582
583 // Create BarType from resolution and instrument
584 match resolution_to_bar_type(instrument_id, resolution) {
585 Ok(bar_type) => {
586 let price_precision = instrument.price_precision();
587 let size_precision = instrument.size_precision();
588
589 match parse_chart_msg(
590 &chart_msg,
591 bar_type,
592 price_precision,
593 size_precision,
594 ts_init,
595 ) {
596 Ok(bar) => {
597 log::debug!("Parsed bar: {bar:?}");
598 return Some(NautilusWsMessage::Data(
599 vec![Data::Bar(bar)],
600 ));
601 }
602 Err(e) => {
603 log::warn!(
604 "Failed to parse chart message to bar: {e}"
605 );
606 }
607 }
608 }
609 Err(e) => {
610 log::warn!(
611 "Failed to create BarType from resolution {resolution}: {e}"
612 );
613 }
614 }
615 } else {
616 log::warn!(
617 "Instrument {instrument_name} not found in cache for chart data"
618 );
619 }
620 }
621 }
622 }
623 _ => {
624 // Unhandled channel - return raw
625 log::trace!("Unhandled channel: {channel}");
626 return Some(NautilusWsMessage::Raw(data.clone()));
627 }
628 }
629 } else {
630 log::trace!("Unknown channel: {channel}");
631 return Some(NautilusWsMessage::Raw(data.clone()));
632 }
633 None
634 }
635 DeribitWsMessage::Heartbeat(heartbeat) => {
636 match heartbeat.heartbeat_type {
637 DeribitHeartbeatType::TestRequest => {
638 log::trace!(
639 "Received heartbeat test_request - responding with public/test"
640 );
641 if let Err(e) = self.handle_heartbeat_test_request().await {
642 log::error!("Failed to respond to heartbeat test_request: {e}");
643 }
644 }
645 DeribitHeartbeatType::Heartbeat => {
646 log::trace!("Received heartbeat acknowledgment");
647 }
648 }
649 None
650 }
651 DeribitWsMessage::Error(err) => {
652 log::error!("Deribit error {}: {}", err.code, err.message);
653 Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
654 code: err.code,
655 message: err.message,
656 }))
657 }
658 DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
659 }
660 }
661
662 /// Main message processing loop.
663 ///
664 /// Returns `None` when the handler should stop.
665 /// Messages that need client-side handling (e.g., Reconnected) are returned.
666 /// Data messages are sent directly to `out_tx` for the user stream.
667 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
668 loop {
669 tokio::select! {
670 // Process commands from client
671 Some(cmd) = self.cmd_rx.recv() => {
672 self.process_command(cmd).await;
673 }
674 // Process raw WebSocket messages
675 Some(msg) = self.raw_rx.recv() => {
676 match msg {
677 Message::Text(text) => {
678 if let Some(nautilus_msg) = self.process_raw_message(&text).await {
679 // Send data messages to user stream
680 match &nautilus_msg {
681 NautilusWsMessage::Data(_)
682 | NautilusWsMessage::Deltas(_)
683 | NautilusWsMessage::Instrument(_)
684 | NautilusWsMessage::Raw(_)
685 | NautilusWsMessage::Error(_) => {
686 let _ = self.out_tx.send(nautilus_msg);
687 }
688 NautilusWsMessage::FundingRates(rates) => {
689 let msg_to_send =
690 NautilusWsMessage::FundingRates(rates.clone());
691 if let Err(e) = self.out_tx.send(msg_to_send) {
692 log::error!("Failed to send funding rates: {e}");
693 }
694 }
695 // Return messages that need client-side handling
696 NautilusWsMessage::Reconnected
697 | NautilusWsMessage::Authenticated(_) => {
698 return Some(nautilus_msg);
699 }
700 }
701 }
702 }
703 Message::Ping(data) => {
704 // Respond to ping with pong
705 if let Some(client) = &self.inner {
706 let _ = client.send_pong(data.to_vec()).await;
707 }
708 }
709 Message::Close(_) => {
710 log::info!("Received close frame");
711 }
712 _ => {}
713 }
714 }
715 // Check for stop signal
716 () = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
717 if self.signal.load(Ordering::Relaxed) {
718 log::debug!("Stop signal received");
719 return None;
720 }
721 }
722 }
723 }
724 }
725}