nautilus_coinbase_intx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{
18        Arc,
19        atomic::{AtomicBool, Ordering},
20    },
21    time::{Duration, SystemTime},
22};
23
24use ahash::{AHashMap, AHashSet};
25use chrono::Utc;
26use dashmap::DashMap;
27use futures_util::{Stream, StreamExt};
28use nautilus_common::{logging::log_task_stopped, runtime::get_runtime};
29use nautilus_core::{
30    consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33    data::{BarType, Data, OrderBookDeltas_API},
34    identifiers::InstrumentId,
35    instruments::{Instrument, InstrumentAny},
36};
37use nautilus_network::websocket::{MessageReader, WebSocketClient, WebSocketConfig};
38use reqwest::header::USER_AGENT;
39use tokio_tungstenite::tungstenite::{Error, Message};
40use ustr::Ustr;
41
42use super::{
43    enums::{CoinbaseIntxWsChannel, WsOperation},
44    error::CoinbaseIntxWsError,
45    messages::{CoinbaseIntxSubscription, CoinbaseIntxWsMessage, NautilusWsMessage},
46    parse::{
47        parse_candle_msg, parse_index_price_msg, parse_mark_price_msg,
48        parse_orderbook_snapshot_msg, parse_orderbook_update_msg, parse_quote_msg,
49    },
50};
51use crate::{
52    common::{
53        consts::COINBASE_INTX_WS_URL, credential::Credential, parse::bar_spec_as_coinbase_channel,
54    },
55    websocket::parse::{parse_instrument_any, parse_trade_msg},
56};
57
58/// Provides a WebSocket client for connecting to [Coinbase International](https://www.coinbase.com/en/international-exchange).
59#[derive(Debug, Clone)]
60#[cfg_attr(
61    feature = "python",
62    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
63)]
64pub struct CoinbaseIntxWebSocketClient {
65    url: String,
66    credential: Credential,
67    heartbeat: Option<u64>,
68    inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
69    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
70    signal: Arc<AtomicBool>,
71    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
72    subscriptions: Arc<DashMap<CoinbaseIntxWsChannel, AHashSet<Ustr>>>,
73    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
74}
75
76impl Default for CoinbaseIntxWebSocketClient {
77    fn default() -> Self {
78        Self::new(None, None, None, None, Some(10)).expect("Failed to create client")
79    }
80}
81
82impl CoinbaseIntxWebSocketClient {
83    /// Creates a new [`CoinbaseIntxWebSocketClient`] instance.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if required environment variables are missing or invalid.
88    pub fn new(
89        url: Option<String>,
90        api_key: Option<String>,
91        api_secret: Option<String>,
92        api_passphrase: Option<String>,
93        heartbeat: Option<u64>,
94    ) -> anyhow::Result<Self> {
95        let url = url.unwrap_or(COINBASE_INTX_WS_URL.to_string());
96        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
97        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
98        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
99
100        let credential = Credential::new(api_key, api_secret, api_passphrase);
101        let signal = Arc::new(AtomicBool::new(false));
102        let subscriptions = Arc::new(DashMap::new());
103        let instruments_cache = Arc::new(AHashMap::new());
104
105        Ok(Self {
106            url,
107            credential,
108            heartbeat,
109            inner: Arc::new(tokio::sync::RwLock::new(None)),
110            rx: None,
111            signal,
112            task_handle: None,
113            subscriptions,
114            instruments_cache,
115        })
116    }
117
118    /// Creates a new authenticated [`CoinbaseIntxWebSocketClient`] using environment variables and
119    /// the default Coinbase International production websocket url.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if required environment variables are missing or invalid.
124    pub fn from_env() -> anyhow::Result<Self> {
125        Self::new(None, None, None, None, None)
126    }
127
128    /// Returns the websocket url being used by the client.
129    #[must_use]
130    pub const fn url(&self) -> &str {
131        self.url.as_str()
132    }
133
134    /// Returns the public API key being used by the client.
135    #[must_use]
136    pub fn api_key(&self) -> &str {
137        self.credential.api_key.as_str()
138    }
139
140    /// Returns a value indicating whether the client is active.
141    #[must_use]
142    pub fn is_active(&self) -> bool {
143        self.inner
144            .try_read()
145            .ok()
146            .and_then(|guard| {
147                guard
148                    .as_ref()
149                    .map(nautilus_network::websocket::WebSocketClient::is_active)
150            })
151            .unwrap_or(false)
152    }
153
154    /// Returns a value indicating whether the client is closed.
155    #[must_use]
156    pub fn is_closed(&self) -> bool {
157        self.inner
158            .try_read()
159            .ok()
160            .and_then(|guard| {
161                guard
162                    .as_ref()
163                    .map(nautilus_network::websocket::WebSocketClient::is_closed)
164            })
165            .unwrap_or(true)
166    }
167
168    /// Initialize the instruments cache with the given `instruments`.
169    pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
170        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
171        for inst in instruments {
172            instruments_cache.insert(inst.symbol().inner(), inst.clone());
173        }
174
175        self.instruments_cache = Arc::new(instruments_cache);
176    }
177
178    /// Get active subscriptions for a specific instrument.
179    #[must_use]
180    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<CoinbaseIntxWsChannel> {
181        let product_id = instrument_id.symbol.inner();
182        let mut channels = Vec::new();
183
184        for entry in self.subscriptions.iter() {
185            let (channel, instruments) = entry.pair();
186            if instruments.contains(&product_id) {
187                channels.push(*channel);
188            }
189        }
190
191        channels
192    }
193
194    /// Connects the client to the server and caches the given instruments.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the WebSocket connection or initial subscription fails.
199    pub async fn connect(&mut self) -> anyhow::Result<()> {
200        let client = self.clone();
201        let post_reconnect = Arc::new(move || {
202            let client = client.clone();
203            tokio::spawn(async move { client.resubscribe_all().await });
204        });
205
206        let config = WebSocketConfig {
207            url: self.url.clone(),
208            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
209            message_handler: None, // Will be handled by the returned reader
210            heartbeat: self.heartbeat,
211            heartbeat_msg: None,
212            ping_handler: None,
213            reconnect_timeout_ms: Some(5_000),
214            reconnect_delay_initial_ms: None, // Use default
215            reconnect_delay_max_ms: None,     // Use default
216            reconnect_backoff_factor: None,   // Use default
217            reconnect_jitter_ms: None,        // Use default
218        };
219        let (reader, client) =
220            WebSocketClient::connect_stream(config, vec![], None, Some(post_reconnect)).await?;
221
222        *self.inner.write().await = Some(client);
223
224        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
225        self.rx = Some(Arc::new(rx));
226        let signal = self.signal.clone();
227
228        // TODO: For now just clone the entire cache out of the arc on connect
229        let instruments_cache = (*self.instruments_cache).clone();
230
231        let stream_handle = get_runtime().spawn(async move {
232            CoinbaseIntxWsMessageHandler::new(reader, signal, tx, instruments_cache)
233                .run()
234                .await;
235        });
236
237        self.task_handle = Some(Arc::new(stream_handle));
238
239        Ok(())
240    }
241
242    /// Wait until the WebSocket connection is active.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the connection times out.
247    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), CoinbaseIntxWsError> {
248        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
249
250        tokio::time::timeout(timeout, async {
251            while !self.is_active() {
252                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
253            }
254        })
255        .await
256        .map_err(|_| {
257            CoinbaseIntxWsError::ClientError(format!(
258                "WebSocket connection timeout after {timeout_secs} seconds"
259            ))
260        })?;
261
262        Ok(())
263    }
264
265    /// Provides the internal data stream as a channel-based stream.
266    ///
267    /// # Panics
268    ///
269    /// This function panics if:
270    /// - The websocket is not connected.
271    /// - If `stream_data` has already been called somewhere else (stream receiver is then taken).
272    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
273        let rx = self
274            .rx
275            .take()
276            .expect("Data stream receiver already taken or not connected"); // Design-time error
277        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
278        async_stream::stream! {
279            while let Some(data) = rx.recv().await {
280                yield data;
281            }
282        }
283    }
284
285    /// Closes the client.
286    ///
287    /// # Errors
288    ///
289    /// Returns an error if the WebSocket fails to close properly.
290    pub async fn close(&mut self) -> Result<(), Error> {
291        tracing::debug!("Closing");
292        self.signal.store(true, Ordering::Relaxed);
293
294        match tokio::time::timeout(Duration::from_secs(5), async {
295            if let Some(inner) = self.inner.read().await.as_ref() {
296                inner.disconnect().await;
297            } else {
298                log::error!("Error on close: not connected");
299            }
300        })
301        .await
302        {
303            Ok(()) => {
304                tracing::debug!("Inner disconnected");
305            }
306            Err(_) => {
307                tracing::error!("Timeout waiting for inner client to disconnect");
308            }
309        }
310
311        log::debug!("Closed");
312
313        Ok(())
314    }
315
316    /// Subscribes to the given channels and product IDs.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the subscription message cannot be sent.
321    async fn subscribe(
322        &self,
323        channels: Vec<CoinbaseIntxWsChannel>,
324        product_ids: Vec<Ustr>,
325    ) -> Result<(), CoinbaseIntxWsError> {
326        // Update active subscriptions
327        for channel in &channels {
328            self.subscriptions
329                .entry(*channel)
330                .or_default()
331                .extend(product_ids.clone());
332        }
333        tracing::debug!(
334            "Added active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
335        );
336
337        let time = chrono::DateTime::<Utc>::from(SystemTime::now())
338            .timestamp()
339            .to_string();
340        let signature = self.credential.sign_ws(&time);
341        let message = CoinbaseIntxSubscription {
342            op: WsOperation::Subscribe,
343            product_ids: Some(product_ids),
344            channels,
345            time,
346            key: self.credential.api_key,
347            passphrase: self.credential.api_passphrase,
348            signature,
349        };
350
351        let json_txt = serde_json::to_string(&message)
352            .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
353
354        if let Some(inner) = self.inner.read().await.as_ref() {
355            if let Err(err) = inner.send_text(json_txt, None).await {
356                tracing::error!("Error sending message: {err:?}");
357            }
358        } else {
359            return Err(CoinbaseIntxWsError::ClientError(
360                "Cannot send message: not connected".to_string(),
361            ));
362        }
363
364        Ok(())
365    }
366
367    /// Unsubscribes from the given channels and product IDs.
368    async fn unsubscribe(
369        &self,
370        channels: Vec<CoinbaseIntxWsChannel>,
371        product_ids: Vec<Ustr>,
372    ) -> Result<(), CoinbaseIntxWsError> {
373        // Update active subscriptions
374        for channel in &channels {
375            if let Some(mut entry) = self.subscriptions.get_mut(channel) {
376                for product_id in &product_ids {
377                    entry.remove(product_id);
378                }
379                if entry.is_empty() {
380                    drop(entry);
381                    self.subscriptions.remove(channel);
382                }
383            }
384        }
385        tracing::debug!(
386            "Removed active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
387        );
388
389        let time = chrono::DateTime::<Utc>::from(SystemTime::now())
390            .timestamp()
391            .to_string();
392        let signature = self.credential.sign_ws(&time);
393        let message = CoinbaseIntxSubscription {
394            op: WsOperation::Unsubscribe,
395            product_ids: Some(product_ids),
396            channels,
397            time,
398            key: self.credential.api_key,
399            passphrase: self.credential.api_passphrase,
400            signature,
401        };
402
403        let json_txt = serde_json::to_string(&message)
404            .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
405
406        if let Some(inner) = self.inner.read().await.as_ref() {
407            if let Err(err) = inner.send_text(json_txt, None).await {
408                tracing::error!("Error sending message: {err:?}");
409            }
410        } else {
411            return Err(CoinbaseIntxWsError::ClientError(
412                "Cannot send message: not connected".to_string(),
413            ));
414        }
415
416        Ok(())
417    }
418
419    /// Resubscribes for all active subscriptions.
420    async fn resubscribe_all(&self) {
421        let mut subs = Vec::new();
422        for entry in self.subscriptions.iter() {
423            let (channel, product_ids) = entry.pair();
424            if !product_ids.is_empty() {
425                subs.push((*channel, product_ids.clone()));
426            }
427        }
428
429        for (channel, product_ids) in subs {
430            tracing::debug!("Resubscribing: channel={channel}, product_ids={product_ids:?}");
431
432            if let Err(e) = self
433                .subscribe(vec![channel], product_ids.into_iter().collect())
434                .await
435            {
436                tracing::error!("Failed to resubscribe to channel {channel}: {e}");
437            }
438        }
439    }
440
441    /// Subscribes to instrument definition updates for the given instrument IDs.
442    /// Subscribes to instrument updates for the specified instruments.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the subscription fails.
447    pub async fn subscribe_instruments(
448        &self,
449        instrument_ids: Vec<InstrumentId>,
450    ) -> Result<(), CoinbaseIntxWsError> {
451        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
452        self.subscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
453            .await
454    }
455
456    /// Subscribes to funding message streams for the given instrument IDs.
457    /// Subscribes to funding rate updates for the specified instruments.
458    ///
459    /// # Errors
460    ///
461    /// Returns an error if the subscription fails.
462    pub async fn subscribe_funding_rates(
463        &self,
464        instrument_ids: Vec<InstrumentId>,
465    ) -> Result<(), CoinbaseIntxWsError> {
466        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
467        self.subscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
468            .await
469    }
470
471    /// Subscribes to risk message streams for the given instrument IDs.
472    /// Subscribes to risk updates for the specified instruments.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error if the subscription fails.
477    pub async fn subscribe_risk(
478        &self,
479        instrument_ids: Vec<InstrumentId>,
480    ) -> Result<(), CoinbaseIntxWsError> {
481        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
482        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
483            .await
484    }
485
486    /// Subscribes to order book (level 2) streams for the given instrument IDs.
487    /// Subscribes to order book snapshots and updates for the specified instruments.
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if the subscription fails.
492    pub async fn subscribe_book(
493        &self,
494        instrument_ids: Vec<InstrumentId>,
495    ) -> Result<(), CoinbaseIntxWsError> {
496        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
497        self.subscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
498            .await
499    }
500
501    /// Subscribes to quote (level 1) streams for the given instrument IDs.
502    /// Subscribes to top-of-book quote updates for the specified instruments.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the subscription fails.
507    pub async fn subscribe_quotes(
508        &self,
509        instrument_ids: Vec<InstrumentId>,
510    ) -> Result<(), CoinbaseIntxWsError> {
511        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
512        self.subscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
513            .await
514    }
515
516    /// Subscribes to trade (match) streams for the given instrument IDs.
517    /// Subscribes to trade updates for the specified instruments.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the subscription fails.
522    pub async fn subscribe_trades(
523        &self,
524        instrument_ids: Vec<InstrumentId>,
525    ) -> Result<(), CoinbaseIntxWsError> {
526        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
527        self.subscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
528            .await
529    }
530
531    /// Subscribes to risk streams (for mark prices) for the given instrument IDs.
532    /// Subscribes to mark price updates for the specified instruments.
533    ///
534    /// # Errors
535    ///
536    /// Returns an error if the subscription fails.
537    pub async fn subscribe_mark_prices(
538        &self,
539        instrument_ids: Vec<InstrumentId>,
540    ) -> Result<(), CoinbaseIntxWsError> {
541        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
542        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
543            .await
544    }
545
546    /// Subscribes to risk streams (for index prices) for the given instrument IDs.
547    /// Subscribes to index price updates for the specified instruments.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the subscription fails.
552    pub async fn subscribe_index_prices(
553        &self,
554        instrument_ids: Vec<InstrumentId>,
555    ) -> Result<(), CoinbaseIntxWsError> {
556        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
557        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
558            .await
559    }
560
561    /// Subscribes to bar (candle) streams for the given instrument IDs.
562    /// Subscribes to candlestick bar updates for the specified bar type.
563    ///
564    /// # Errors
565    ///
566    /// Returns an error if the subscription fails.
567    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
568        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
569            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
570        let product_ids = vec![bar_type.standard().instrument_id().symbol.inner()];
571        self.subscribe(vec![channel], product_ids).await
572    }
573
574    /// Unsubscribes from instrument definition streams for the given instrument IDs.
575    /// Unsubscribes from instrument updates for the specified instruments.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if the unsubscription fails.
580    pub async fn unsubscribe_instruments(
581        &self,
582        instrument_ids: Vec<InstrumentId>,
583    ) -> Result<(), CoinbaseIntxWsError> {
584        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
585        self.unsubscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
586            .await
587    }
588
589    /// Unsubscribes from risk message streams for the given instrument IDs.
590    /// Unsubscribes from risk updates for the specified instruments.
591    ///
592    /// # Errors
593    ///
594    /// Returns an error if the unsubscription fails.
595    pub async fn unsubscribe_risk(
596        &self,
597        instrument_ids: Vec<InstrumentId>,
598    ) -> Result<(), CoinbaseIntxWsError> {
599        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
600        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
601            .await
602    }
603
604    /// Unsubscribes from funding message streams for the given instrument IDs.
605    /// Unsubscribes from funding updates for the specified instruments.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if the unsubscription fails.
610    pub async fn unsubscribe_funding(
611        &self,
612        instrument_ids: Vec<InstrumentId>,
613    ) -> Result<(), CoinbaseIntxWsError> {
614        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
615        self.unsubscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
616            .await
617    }
618
619    /// Unsubscribes from order book (level 2) streams for the given instrument IDs.
620    /// Unsubscribes from order book updates for the specified instruments.
621    ///
622    /// # Errors
623    ///
624    /// Returns an error if the unsubscription fails.
625    pub async fn unsubscribe_book(
626        &self,
627        instrument_ids: Vec<InstrumentId>,
628    ) -> Result<(), CoinbaseIntxWsError> {
629        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
630        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
631            .await
632    }
633
634    /// Unsubscribes from quote (level 1) streams for the given instrument IDs.
635    /// Unsubscribes from quote updates for the specified instruments.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if the unsubscription fails.
640    pub async fn unsubscribe_quotes(
641        &self,
642        instrument_ids: Vec<InstrumentId>,
643    ) -> Result<(), CoinbaseIntxWsError> {
644        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
645        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
646            .await
647    }
648
649    /// Unsubscribes from trade (match) streams for the given instrument IDs.
650    /// Unsubscribes from trade updates for the specified instruments.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the unsubscription fails.
655    pub async fn unsubscribe_trades(
656        &self,
657        instrument_ids: Vec<InstrumentId>,
658    ) -> Result<(), CoinbaseIntxWsError> {
659        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
660        self.unsubscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
661            .await
662    }
663
664    /// Unsubscribes from risk streams (for mark prices) for the given instrument IDs.
665    /// Unsubscribes from mark price updates for the specified instruments.
666    ///
667    /// # Errors
668    ///
669    /// Returns an error if the unsubscription fails.
670    pub async fn unsubscribe_mark_prices(
671        &self,
672        instrument_ids: Vec<InstrumentId>,
673    ) -> Result<(), CoinbaseIntxWsError> {
674        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
675        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
676            .await
677    }
678
679    /// Unsubscribes from risk streams (for index prices) for the given instrument IDs.
680    /// Unsubscribes from index price updates for the specified instruments.
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the unsubscription fails.
685    pub async fn unsubscribe_index_prices(
686        &self,
687        instrument_ids: Vec<InstrumentId>,
688    ) -> Result<(), CoinbaseIntxWsError> {
689        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
690        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
691            .await
692    }
693
694    /// Unsubscribes from bar (candle) streams for the given instrument IDs.
695    /// Unsubscribes from bar updates for the specified bar type.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the unsubscription fails.
700    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
701        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
702            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
703        let product_id = bar_type.standard().instrument_id().symbol.inner();
704        self.unsubscribe(vec![channel], vec![product_id]).await
705    }
706}
707
708fn instrument_ids_to_product_ids(instrument_ids: &[InstrumentId]) -> Vec<Ustr> {
709    instrument_ids.iter().map(|x| x.symbol.inner()).collect()
710}
711
712/// Provides a raw message handler for Coinbase International WebSocket feed.
713struct CoinbaseIntxFeedHandler {
714    reader: MessageReader,
715    signal: Arc<AtomicBool>,
716}
717
718impl CoinbaseIntxFeedHandler {
719    /// Creates a new [`CoinbaseIntxFeedHandler`] instance.
720    pub const fn new(reader: MessageReader, signal: Arc<AtomicBool>) -> Self {
721        Self { reader, signal }
722    }
723
724    /// Gets the next message from the WebSocket message stream.
725    async fn next(&mut self) -> Option<CoinbaseIntxWsMessage> {
726        // Timeout awaiting the next message before checking signal
727        let timeout = Duration::from_millis(10);
728
729        loop {
730            if self.signal.load(Ordering::Relaxed) {
731                tracing::debug!("Stop signal received");
732                break;
733            }
734
735            match tokio::time::timeout(timeout, self.reader.next()).await {
736                Ok(Some(msg)) => match msg {
737                    Ok(Message::Pong(_)) => {
738                        tracing::trace!("Received pong");
739                    }
740                    Ok(Message::Ping(_)) => {
741                        tracing::trace!("Received pong"); // Coinbase send ping frames as pongs
742                    }
743                    Ok(Message::Text(text)) => {
744                        match serde_json::from_str(&text) {
745                            Ok(event) => match &event {
746                                CoinbaseIntxWsMessage::Reject(msg) => {
747                                    tracing::error!("{msg:?}");
748                                }
749                                CoinbaseIntxWsMessage::Confirmation(msg) => {
750                                    tracing::debug!("{msg:?}");
751                                    continue;
752                                }
753                                CoinbaseIntxWsMessage::Instrument(_) => return Some(event),
754                                CoinbaseIntxWsMessage::Funding(_) => return Some(event),
755                                CoinbaseIntxWsMessage::Risk(_) => return Some(event),
756                                CoinbaseIntxWsMessage::BookSnapshot(_) => return Some(event),
757                                CoinbaseIntxWsMessage::BookUpdate(_) => return Some(event),
758                                CoinbaseIntxWsMessage::Quote(_) => return Some(event),
759                                CoinbaseIntxWsMessage::Trade(_) => return Some(event),
760                                CoinbaseIntxWsMessage::CandleSnapshot(_) => return Some(event),
761                                CoinbaseIntxWsMessage::CandleUpdate(_) => continue, // Ignore
762                            },
763                            Err(e) => {
764                                tracing::error!("Failed to parse message: {e}: {text}");
765                                break;
766                            }
767                        }
768                    }
769                    Ok(Message::Binary(msg)) => {
770                        tracing::debug!("Raw binary: {msg:?}");
771                    }
772                    Ok(Message::Close(_)) => {
773                        tracing::debug!("Received close message");
774                        return None;
775                    }
776                    Ok(msg) => {
777                        tracing::warn!("Unexpected message: {msg:?}");
778                    }
779                    Err(e) => {
780                        tracing::error!("{e}, stopping client");
781                        break; // Break as indicates a bug in the code
782                    }
783                },
784                Ok(None) => {
785                    tracing::info!("WebSocket stream closed");
786                    break;
787                }
788                Err(_) => {} // Timeout occurred awaiting a message, continue loop to check signal
789            }
790        }
791
792        log_task_stopped("message-streaming");
793        None
794    }
795}
796
797/// Provides a Nautilus parser for the Coinbase International WebSocket feed.
798struct CoinbaseIntxWsMessageHandler {
799    handler: CoinbaseIntxFeedHandler,
800    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
801    instruments_cache: AHashMap<Ustr, InstrumentAny>,
802}
803
804impl CoinbaseIntxWsMessageHandler {
805    /// Creates a new [`CoinbaseIntxWsMessageHandler`] instance.
806    pub const fn new(
807        reader: MessageReader,
808        signal: Arc<AtomicBool>,
809        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
810        instruments_cache: AHashMap<Ustr, InstrumentAny>,
811    ) -> Self {
812        let handler = CoinbaseIntxFeedHandler::new(reader, signal);
813        Self {
814            handler,
815            tx,
816            instruments_cache,
817        }
818    }
819
820    /// Runs the WebSocket message feed.
821    async fn run(&mut self) {
822        while let Some(data) = self.next().await {
823            if let Err(e) = self.tx.send(data) {
824                tracing::error!("Error sending data: {e}");
825                break; // Stop processing on channel error
826            }
827        }
828    }
829
830    /// Gets the next message from the WebSocket message handler.
831    async fn next(&mut self) -> Option<NautilusWsMessage> {
832        let clock = get_atomic_clock_realtime();
833
834        while let Some(event) = self.handler.next().await {
835            match event {
836                CoinbaseIntxWsMessage::Instrument(msg) => {
837                    if let Some(inst) = parse_instrument_any(&msg, clock.get_time_ns()) {
838                        // Update instruments map
839                        self.instruments_cache
840                            .insert(inst.raw_symbol().inner(), inst.clone());
841                        return Some(NautilusWsMessage::Instrument(inst));
842                    }
843                }
844                CoinbaseIntxWsMessage::Funding(msg) => {
845                    tracing::warn!("Received {msg:?}"); // TODO: Implement
846                }
847                CoinbaseIntxWsMessage::BookSnapshot(msg) => {
848                    if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
849                        match parse_orderbook_snapshot_msg(
850                            &msg,
851                            inst.id(),
852                            inst.price_precision(),
853                            inst.size_precision(),
854                            clock.get_time_ns(),
855                        ) {
856                            Ok(deltas) => {
857                                let deltas = OrderBookDeltas_API::new(deltas);
858                                let data = Data::Deltas(deltas);
859                                return Some(NautilusWsMessage::Data(data));
860                            }
861                            Err(e) => {
862                                tracing::error!("Failed to parse orderbook snapshot: {e}");
863                                return None;
864                            }
865                        }
866                    }
867                    tracing::error!("No instrument found for {}", msg.product_id);
868                    return None;
869                }
870                CoinbaseIntxWsMessage::BookUpdate(msg) => {
871                    if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
872                        match parse_orderbook_update_msg(
873                            &msg,
874                            inst.id(),
875                            inst.price_precision(),
876                            inst.size_precision(),
877                            clock.get_time_ns(),
878                        ) {
879                            Ok(deltas) => {
880                                let deltas = OrderBookDeltas_API::new(deltas);
881                                let data = Data::Deltas(deltas);
882                                return Some(NautilusWsMessage::Data(data));
883                            }
884                            Err(e) => {
885                                tracing::error!("Failed to parse orderbook update: {e}");
886                            }
887                        }
888                    } else {
889                        tracing::error!("No instrument found for {}", msg.product_id);
890                    }
891                }
892                CoinbaseIntxWsMessage::Quote(msg) => {
893                    if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
894                        match parse_quote_msg(
895                            &msg,
896                            inst.id(),
897                            inst.price_precision(),
898                            inst.size_precision(),
899                            clock.get_time_ns(),
900                        ) {
901                            Ok(quote) => return Some(NautilusWsMessage::Data(Data::Quote(quote))),
902                            Err(e) => {
903                                tracing::error!("Failed to parse quote: {e}");
904                            }
905                        }
906                    } else {
907                        tracing::error!("No instrument found for {}", msg.product_id);
908                    }
909                }
910                CoinbaseIntxWsMessage::Trade(msg) => {
911                    if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
912                        match parse_trade_msg(
913                            &msg,
914                            inst.id(),
915                            inst.price_precision(),
916                            inst.size_precision(),
917                            clock.get_time_ns(),
918                        ) {
919                            Ok(trade) => return Some(NautilusWsMessage::Data(Data::Trade(trade))),
920                            Err(e) => {
921                                tracing::error!("Failed to parse trade: {e}");
922                            }
923                        }
924                    } else {
925                        tracing::error!("No instrument found for {}", msg.product_id);
926                    }
927                }
928                CoinbaseIntxWsMessage::Risk(msg) => {
929                    if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
930                        let mark_price = match parse_mark_price_msg(
931                            &msg,
932                            inst.id(),
933                            inst.price_precision(),
934                            clock.get_time_ns(),
935                        ) {
936                            Ok(mark_price) => Some(mark_price),
937                            Err(e) => {
938                                tracing::error!("Failed to parse mark price: {e}");
939                                None
940                            }
941                        };
942
943                        let index_price = match parse_index_price_msg(
944                            &msg,
945                            inst.id(),
946                            inst.price_precision(),
947                            clock.get_time_ns(),
948                        ) {
949                            Ok(index_price) => Some(index_price),
950                            Err(e) => {
951                                tracing::error!("Failed to parse index price: {e}");
952                                None
953                            }
954                        };
955
956                        match (mark_price, index_price) {
957                            (Some(mark), Some(index)) => {
958                                return Some(NautilusWsMessage::MarkAndIndex((mark, index)));
959                            }
960                            (Some(mark), None) => return Some(NautilusWsMessage::MarkPrice(mark)),
961                            (None, Some(index)) => {
962                                return Some(NautilusWsMessage::IndexPrice(index));
963                            }
964                            (None, None) => continue,
965                        };
966                    }
967                    tracing::error!("No instrument found for {}", msg.product_id);
968                }
969                CoinbaseIntxWsMessage::CandleSnapshot(msg) => {
970                    if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
971                        match parse_candle_msg(
972                            &msg,
973                            inst.id(),
974                            inst.price_precision(),
975                            inst.size_precision(),
976                            clock.get_time_ns(),
977                        ) {
978                            Ok(bar) => return Some(NautilusWsMessage::Data(Data::Bar(bar))),
979                            Err(e) => {
980                                tracing::error!("Failed to parse candle: {e}");
981                            }
982                        }
983                    } else {
984                        tracing::error!("No instrument found for {}", msg.product_id);
985                    }
986                }
987                _ => {
988                    tracing::warn!("Not implemented: {event:?}");
989                }
990            }
991        }
992        None // Connection closed
993    }
994}