nautilus_coinbase_intx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    time::{Duration, SystemTime},
23};
24
25use chrono::Utc;
26use futures_util::{Stream, StreamExt};
27use nautilus_common::runtime::get_runtime;
28use nautilus_core::{consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
29use nautilus_model::{
30    data::{BarType, Data, OrderBookDeltas_API},
31    identifiers::InstrumentId,
32    instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::websocket::{Consumer, MessageReader, WebSocketClient, WebSocketConfig};
35use reqwest::header::USER_AGENT;
36use tokio::sync::Mutex;
37use tokio_tungstenite::tungstenite::{Error, Message};
38use ustr::Ustr;
39
40use super::{
41    enums::{CoinbaseIntxWsChannel, WsOperation},
42    error::CoinbaseIntxWsError,
43    messages::{CoinbaseIntxSubscription, CoinbaseIntxWsMessage, NautilusWsMessage},
44    parse::{
45        parse_candle_msg, parse_index_price_msg, parse_mark_price_msg,
46        parse_orderbook_snapshot_msg, parse_orderbook_update_msg, parse_quote_msg,
47    },
48};
49use crate::{
50    common::{
51        consts::COINBASE_INTX_WS_URL,
52        credential::{Credential, get_env_var},
53        parse::bar_spec_as_coinbase_channel,
54    },
55    websocket::parse::{parse_instrument_any, parse_trade_msg},
56};
57
/// Provides a WebSocket client for connecting to [Coinbase International](https://www.coinbase.com/en/international-exchange).
///
/// The client is `Clone`; the stop signal and the subscription registry are
/// reference-counted and therefore shared between clones, while the optional
/// handles (`inner`, `rx`, `task_handle`) are copied in whatever state they
/// are in at the moment of cloning.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct CoinbaseIntxWebSocketClient {
    /// WebSocket endpoint the client connects to.
    url: String,
    /// API credential used to sign subscribe/unsubscribe requests.
    credential: Credential,
    /// Heartbeat setting forwarded unchanged to the underlying `WebSocketConfig`.
    heartbeat: Option<u64>,
    /// Underlying network client; `None` until `connect` has succeeded.
    inner: Option<Arc<WebSocketClient>>,
    /// Receiving end of the parsed message channel; set by `connect`, taken by `stream`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: set by `close` and polled by the feed handler loop.
    signal: Arc<AtomicBool>,
    /// Handle of the message handling task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Active subscriptions (channel -> product IDs), replayed by `resubscribe_all`.
    subscriptions: Arc<Mutex<HashMap<CoinbaseIntxWsChannel, Vec<Ustr>>>>,
}
74
75impl Default for CoinbaseIntxWebSocketClient {
76    fn default() -> Self {
77        Self::new(None, None, None, None, Some(10)).expect("Failed to create client")
78    }
79}
80
81impl CoinbaseIntxWebSocketClient {
82    /// Creates a new [`CoinbaseIntxWebSocketClient`] instance.
83    pub fn new(
84        url: Option<String>,
85        api_key: Option<String>,
86        api_secret: Option<String>,
87        api_passphrase: Option<String>,
88        heartbeat: Option<u64>,
89    ) -> anyhow::Result<Self> {
90        let url = url.unwrap_or(COINBASE_INTX_WS_URL.to_string());
91        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
92        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
93        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
94
95        let credential = Credential::new(api_key, api_secret, api_passphrase);
96        let signal = Arc::new(AtomicBool::new(false));
97        let subscriptions = Arc::new(Mutex::new(HashMap::new()));
98
99        Ok(Self {
100            url,
101            credential,
102            heartbeat,
103            inner: None,
104            rx: None,
105            signal,
106            task_handle: None,
107            subscriptions,
108        })
109    }
110
111    /// Creates a new authenticated [`CoinbaseIntxWebSocketClient`] using environment variables and
112    /// the default Coinbase International production websocket url.
113    pub fn from_env() -> anyhow::Result<Self> {
114        Self::new(None, None, None, None, None)
115    }
116
117    /// Returns the websocket url being used by the client.
118    pub fn url(&self) -> &str {
119        self.url.as_str()
120    }
121
122    /// Returns the public API key being used by the client.
123    pub fn api_key(&self) -> &str {
124        self.credential.api_key.as_str()
125    }
126
127    /// Returns a value indicating whether the client is active.
128    pub fn is_active(&self) -> bool {
129        match &self.inner {
130            Some(inner) => inner.is_active(),
131            None => false,
132        }
133    }
134
135    /// Returns a value indicating whether the client is closed.
136    pub fn is_closed(&self) -> bool {
137        match &self.inner {
138            Some(inner) => inner.is_closed(),
139            None => true,
140        }
141    }
142
143    /// Connects the client to the server and caches the given instruments.
144    pub async fn connect(&mut self, instruments: Vec<InstrumentAny>) -> anyhow::Result<()> {
145        let client = self.clone();
146        let post_reconnect = Arc::new(move || {
147            let client = client.clone();
148            tokio::spawn(async move { client.resubscribe_all().await });
149        });
150
151        let config = WebSocketConfig {
152            url: self.url.clone(),
153            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
154            heartbeat: self.heartbeat,
155            heartbeat_msg: None,
156            handler: Consumer::Python(None),
157            ping_handler: None,
158            reconnect_timeout_ms: Some(5_000),
159            reconnect_delay_initial_ms: None, // Use default
160            reconnect_delay_max_ms: None,     // Use default
161            reconnect_backoff_factor: None,   // Use default
162            reconnect_jitter_ms: None,        // Use default
163        };
164        let (reader, client) =
165            WebSocketClient::connect_stream(config, vec![], None, Some(post_reconnect)).await?;
166
167        self.inner = Some(Arc::new(client));
168
169        let mut instruments_map: HashMap<Ustr, InstrumentAny> = HashMap::new();
170        for inst in instruments {
171            instruments_map.insert(inst.raw_symbol().inner(), inst);
172        }
173
174        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
175        self.rx = Some(Arc::new(rx));
176        let signal = self.signal.clone();
177
178        let stream_handle = get_runtime().spawn(async move {
179            CoinbaseIntxWsMessageHandler::new(instruments_map, reader, signal, tx)
180                .run()
181                .await;
182        });
183
184        self.task_handle = Some(Arc::new(stream_handle));
185
186        Ok(())
187    }
188
189    /// Provides the internal data stream as a channel-based stream.
190    ///
191    /// # Panics
192    ///
193    /// This function panics:
194    /// - If the websocket is not connected.
195    /// - If `stream_data` has already been called somewhere else (stream receiver is then taken).
196    pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
197        let rx = self
198            .rx
199            .take()
200            .expect("Data stream receiver already taken or not connected"); // Design-time error
201        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
202        async_stream::stream! {
203            while let Some(data) = rx.recv().await {
204                yield data;
205            }
206        }
207    }
208
209    /// Closes the client.
210    pub async fn close(&mut self) -> Result<(), Error> {
211        tracing::debug!("Closing");
212        self.signal.store(true, Ordering::Relaxed);
213
214        match tokio::time::timeout(Duration::from_secs(5), async {
215            if let Some(inner) = &self.inner {
216                inner.disconnect().await;
217            } else {
218                log::error!("Error on close: not connected");
219            }
220        })
221        .await
222        {
223            Ok(()) => {
224                tracing::debug!("Inner disconnected");
225            }
226            Err(_) => {
227                tracing::error!("Timeout waiting for inner client to disconnect");
228            }
229        }
230
231        log::debug!("Closed");
232
233        Ok(())
234    }
235
236    /// Subscribes to the given channels and product IDs.
237    async fn subscribe(
238        &self,
239        channels: Vec<CoinbaseIntxWsChannel>,
240        product_ids: Vec<Ustr>,
241    ) -> Result<(), CoinbaseIntxWsError> {
242        // Update active subscriptions
243        let mut active_subs = self.subscriptions.lock().await;
244        for channel in &channels {
245            active_subs
246                .entry(*channel)
247                .or_insert_with(Vec::new)
248                .extend(product_ids.clone());
249        }
250        tracing::debug!(
251            "Added active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
252        );
253
254        let time = chrono::DateTime::<Utc>::from(SystemTime::now())
255            .timestamp()
256            .to_string();
257        let signature = self.credential.sign_ws(&time);
258        let message = CoinbaseIntxSubscription {
259            op: WsOperation::Subscribe,
260            product_ids: Some(product_ids),
261            channels,
262            time,
263            key: self.credential.api_key,
264            passphrase: self.credential.api_passphrase,
265            signature,
266        };
267
268        let json_txt = serde_json::to_string(&message)
269            .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
270
271        if let Some(inner) = &self.inner {
272            inner.send_text(json_txt, None).await;
273        } else {
274            return Err(CoinbaseIntxWsError::ClientError(
275                "Cannot send message: not connected".to_string(),
276            ));
277        }
278
279        Ok(())
280    }
281
282    /// Unsubscribes from the given channels and product IDs.
283    async fn unsubscribe(
284        &self,
285        channels: Vec<CoinbaseIntxWsChannel>,
286        product_ids: Vec<Ustr>,
287    ) -> Result<(), CoinbaseIntxWsError> {
288        // Update active subscriptions
289        let mut active_subs = self.subscriptions.lock().await;
290        for channel in &channels {
291            if let Some(subs) = active_subs.get_mut(channel) {
292                for product_id in &product_ids {
293                    subs.retain(|pid| pid != product_id);
294                }
295                if subs.is_empty() {
296                    active_subs.remove(channel);
297                }
298            }
299        }
300        tracing::debug!(
301            "Removed active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
302        );
303
304        let time = chrono::DateTime::<Utc>::from(SystemTime::now())
305            .timestamp()
306            .to_string();
307        let signature = self.credential.sign_ws(&time);
308        let message = CoinbaseIntxSubscription {
309            op: WsOperation::Unsubscribe,
310            product_ids: Some(product_ids),
311            channels,
312            time,
313            key: self.credential.api_key,
314            passphrase: self.credential.api_passphrase,
315            signature,
316        };
317
318        let json_txt = serde_json::to_string(&message)
319            .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
320
321        if let Some(inner) = &self.inner {
322            inner.send_text(json_txt, None).await;
323        } else {
324            return Err(CoinbaseIntxWsError::ClientError(
325                "Cannot send message: not connected".to_string(),
326            ));
327        }
328
329        Ok(())
330    }
331
332    /// Resubscribes for all active subscriptions.
333    async fn resubscribe_all(&self) {
334        let subs = self.subscriptions.lock().await.clone();
335
336        for (channel, product_ids) in subs {
337            if product_ids.is_empty() {
338                continue;
339            }
340
341            tracing::debug!("Resubscribing: channel={channel}, product_ids={product_ids:?}");
342
343            if let Err(e) = self.subscribe(vec![channel], product_ids).await {
344                tracing::error!("Failed to resubscribe to channel {channel}: {e}");
345            }
346        }
347    }
348
349    /// Subscribes to instrument definition updates for the given instrument IDs.
350    pub async fn subscribe_instruments(
351        &self,
352        instrument_ids: Vec<InstrumentId>,
353    ) -> Result<(), CoinbaseIntxWsError> {
354        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
355        self.subscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
356            .await
357    }
358
359    /// Subscribes to funding message streams for the given instrument IDs.
360    pub async fn subscribe_funding(
361        &self,
362        instrument_ids: Vec<InstrumentId>,
363    ) -> Result<(), CoinbaseIntxWsError> {
364        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
365        self.subscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
366            .await
367    }
368
369    /// Subscribes to risk message streams for the given instrument IDs.
370    pub async fn subscribe_risk(
371        &self,
372        instrument_ids: Vec<InstrumentId>,
373    ) -> Result<(), CoinbaseIntxWsError> {
374        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
375        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
376            .await
377    }
378
379    /// Subscribes to order book (level 2) streams for the given instrument IDs.
380    pub async fn subscribe_order_book(
381        &self,
382        instrument_ids: Vec<InstrumentId>,
383    ) -> Result<(), CoinbaseIntxWsError> {
384        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
385        self.subscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
386            .await
387    }
388
389    /// Subscribes to quote (level 1) streams for the given instrument IDs.
390    pub async fn subscribe_quotes(
391        &self,
392        instrument_ids: Vec<InstrumentId>,
393    ) -> Result<(), CoinbaseIntxWsError> {
394        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
395        self.subscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
396            .await
397    }
398
399    /// Subscribes to trade (match) streams for the given instrument IDs.
400    pub async fn subscribe_trades(
401        &self,
402        instrument_ids: Vec<InstrumentId>,
403    ) -> Result<(), CoinbaseIntxWsError> {
404        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
405        self.subscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
406            .await
407    }
408
409    /// Subscribes to risk streams (for mark prices) for the given instrument IDs.
410    pub async fn subscribe_mark_prices(
411        &self,
412        instrument_ids: Vec<InstrumentId>,
413    ) -> Result<(), CoinbaseIntxWsError> {
414        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
415        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
416            .await
417    }
418
419    /// Subscribes to risk streams (for index prices) for the given instrument IDs.
420    pub async fn subscribe_index_prices(
421        &self,
422        instrument_ids: Vec<InstrumentId>,
423    ) -> Result<(), CoinbaseIntxWsError> {
424        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
425        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
426            .await
427    }
428
429    /// Subscribes to bar (candle) streams for the given instrument IDs.
430    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
431        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
432            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
433        let product_ids = vec![bar_type.standard().instrument_id().symbol.inner()];
434        self.subscribe(vec![channel], product_ids).await
435    }
436
437    /// Unsubscribes from instrument definition streams for the given instrument IDs.
438    pub async fn unsubscribe_instruments(
439        &self,
440        instrument_ids: Vec<InstrumentId>,
441    ) -> Result<(), CoinbaseIntxWsError> {
442        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
443        self.unsubscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
444            .await
445    }
446
447    /// Unsubscribes from risk message streams for the given instrument IDs.
448    pub async fn unsubscribe_risk(
449        &self,
450        instrument_ids: Vec<InstrumentId>,
451    ) -> Result<(), CoinbaseIntxWsError> {
452        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
453        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
454            .await
455    }
456
457    /// Unsubscribes from funding message streams for the given instrument IDs.
458    pub async fn unsubscribe_funding(
459        &self,
460        instrument_ids: Vec<InstrumentId>,
461    ) -> Result<(), CoinbaseIntxWsError> {
462        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
463        self.unsubscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
464            .await
465    }
466
467    /// Unsubscribes from order book (level 2) streams for the given instrument IDs.
468    pub async fn unsubscribe_order_book(
469        &self,
470        instrument_ids: Vec<InstrumentId>,
471    ) -> Result<(), CoinbaseIntxWsError> {
472        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
473        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
474            .await
475    }
476
477    /// Unsubscribes from quote (level 1) streams for the given instrument IDs.
478    pub async fn unsubscribe_quotes(
479        &self,
480        instrument_ids: Vec<InstrumentId>,
481    ) -> Result<(), CoinbaseIntxWsError> {
482        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
483        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
484            .await
485    }
486
487    /// Unsubscribes from trade (match) streams for the given instrument IDs.
488    pub async fn unsubscribe_trades(
489        &self,
490        instrument_ids: Vec<InstrumentId>,
491    ) -> Result<(), CoinbaseIntxWsError> {
492        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
493        self.unsubscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
494            .await
495    }
496
497    /// Unsubscribes from risk streams (for mark prices) for the given instrument IDs.
498    pub async fn unsubscribe_mark_prices(
499        &self,
500        instrument_ids: Vec<InstrumentId>,
501    ) -> Result<(), CoinbaseIntxWsError> {
502        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
503        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
504            .await
505    }
506
507    /// Unsubscribes from risk streams (for index prices) for the given instrument IDs.
508    pub async fn unsubscribe_index_prices(
509        &self,
510        instrument_ids: Vec<InstrumentId>,
511    ) -> Result<(), CoinbaseIntxWsError> {
512        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
513        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
514            .await
515    }
516
517    /// Unsubscribes from bar (candle) streams for the given instrument IDs.
518    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
519        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
520            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
521        let product_id = bar_type.standard().instrument_id().symbol.inner();
522        self.unsubscribe(vec![channel], vec![product_id]).await
523    }
524}
525
526fn instrument_ids_to_product_ids(instrument_ids: &[InstrumentId]) -> Vec<Ustr> {
527    instrument_ids.iter().map(|x| x.symbol.inner()).collect()
528}
529
/// Provides a raw message handler for Coinbase International WebSocket feed.
///
/// Reads frames from the connection and deserializes text frames into
/// [`CoinbaseIntxWsMessage`] values.
struct CoinbaseIntxFeedHandler {
    /// Source of raw WebSocket frames.
    reader: MessageReader,
    /// Stop flag checked on every loop iteration of `next`.
    signal: Arc<AtomicBool>,
}
535
536impl CoinbaseIntxFeedHandler {
537    /// Creates a new [`CoinbaseIntxFeedHandler`] instance.
538    pub const fn new(reader: MessageReader, signal: Arc<AtomicBool>) -> Self {
539        Self { reader, signal }
540    }
541
542    /// Gets the next message from the WebSocket message stream.
543    async fn next(&mut self) -> Option<CoinbaseIntxWsMessage> {
544        // Timeout awaiting the next message before checking signal
545        let timeout = Duration::from_millis(10);
546
547        loop {
548            if self.signal.load(Ordering::Relaxed) {
549                tracing::debug!("Stop signal received");
550                break;
551            }
552
553            match tokio::time::timeout(timeout, self.reader.next()).await {
554                Ok(Some(msg)) => match msg {
555                    Ok(Message::Pong(_)) => {
556                        tracing::trace!("Received pong");
557                    }
558                    Ok(Message::Ping(_)) => {
559                        tracing::trace!("Received pong"); // Coinbase send ping frames as pongs
560                    }
561                    Ok(Message::Text(text)) => {
562                        match serde_json::from_str(&text) {
563                            Ok(event) => match &event {
564                                CoinbaseIntxWsMessage::Reject(msg) => {
565                                    tracing::error!("{msg:?}");
566                                }
567                                CoinbaseIntxWsMessage::Confirmation(msg) => {
568                                    tracing::debug!("{msg:?}");
569                                    continue;
570                                }
571                                CoinbaseIntxWsMessage::Instrument(_) => return Some(event),
572                                CoinbaseIntxWsMessage::Funding(_) => return Some(event),
573                                CoinbaseIntxWsMessage::Risk(_) => return Some(event),
574                                CoinbaseIntxWsMessage::BookSnapshot(_) => return Some(event),
575                                CoinbaseIntxWsMessage::BookUpdate(_) => return Some(event),
576                                CoinbaseIntxWsMessage::Quote(_) => return Some(event),
577                                CoinbaseIntxWsMessage::Trade(_) => return Some(event),
578                                CoinbaseIntxWsMessage::CandleSnapshot(_) => return Some(event),
579                                CoinbaseIntxWsMessage::CandleUpdate(_) => continue, // Ignore
580                            },
581                            Err(e) => {
582                                tracing::error!("Failed to parse message: {e}: {text}");
583                                break;
584                            }
585                        }
586                    }
587                    Ok(Message::Binary(msg)) => {
588                        tracing::debug!("Raw binary: {msg:?}");
589                    }
590                    Ok(Message::Close(_)) => {
591                        tracing::debug!("Received close message");
592                        return None;
593                    }
594                    Ok(msg) => {
595                        tracing::warn!("Unexpected message: {msg:?}");
596                    }
597                    Err(e) => {
598                        tracing::error!("{e}, stopping client");
599                        break; // Break as indicates a bug in the code
600                    }
601                },
602                Ok(None) => {
603                    tracing::info!("WebSocket stream closed");
604                    break;
605                }
606                Err(_) => {} // Timeout occurred awaiting a message, continue loop to check signal
607            }
608        }
609
610        tracing::debug!("Stopped message streaming");
611        None
612    }
613}
614
/// Provides a Nautilus parser for the Coinbase International WebSocket feed.
///
/// Pulls decoded exchange messages from the feed handler, converts them into
/// [`NautilusWsMessage`] values and forwards them through a channel.
struct CoinbaseIntxWsMessageHandler {
    /// Instrument cache keyed by raw symbol; used to look up IDs and precisions
    /// while parsing, and updated when instrument messages arrive.
    instruments: HashMap<Ustr, InstrumentAny>,
    /// Source of decoded exchange messages.
    handler: CoinbaseIntxFeedHandler,
    /// Sending end of the channel that carries parsed messages to the client.
    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
}
621
622impl CoinbaseIntxWsMessageHandler {
623    /// Creates a new [`CoinbaseIntxWsMessageHandler`] instance.
624    pub const fn new(
625        instruments: HashMap<Ustr, InstrumentAny>,
626        reader: MessageReader,
627        signal: Arc<AtomicBool>,
628        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
629    ) -> Self {
630        let handler = CoinbaseIntxFeedHandler::new(reader, signal);
631        Self {
632            instruments,
633            handler,
634            tx,
635        }
636    }
637
638    /// Runs the WebSocket message feed.
639    async fn run(&mut self) {
640        while let Some(data) = self.next().await {
641            if let Err(e) = self.tx.send(data) {
642                tracing::error!("Error sending data: {e}");
643                break; // Stop processing on channel error
644            }
645        }
646    }
647
648    /// Gets the next message from the WebSocket message handler.
649    async fn next(&mut self) -> Option<NautilusWsMessage> {
650        let clock = get_atomic_clock_realtime();
651
652        while let Some(event) = self.handler.next().await {
653            match event {
654                CoinbaseIntxWsMessage::Instrument(msg) => {
655                    if let Some(inst) = parse_instrument_any(&msg, clock.get_time_ns()) {
656                        // Update instruments map
657                        self.instruments
658                            .insert(inst.raw_symbol().inner(), inst.clone());
659                        return Some(NautilusWsMessage::Instrument(inst));
660                    }
661                }
662                CoinbaseIntxWsMessage::Funding(msg) => {
663                    tracing::warn!("Received {msg:?}"); // TODO: Implement
664                }
665                CoinbaseIntxWsMessage::BookSnapshot(msg) => {
666                    if let Some(inst) = self.instruments.get(&msg.product_id) {
667                        match parse_orderbook_snapshot_msg(
668                            &msg,
669                            inst.id(),
670                            inst.price_precision(),
671                            inst.size_precision(),
672                            clock.get_time_ns(),
673                        ) {
674                            Ok(deltas) => {
675                                let deltas = OrderBookDeltas_API::new(deltas);
676                                let data = Data::Deltas(deltas);
677                                return Some(NautilusWsMessage::Data(data));
678                            }
679                            Err(e) => {
680                                tracing::error!("Failed to parse orderbook snapshot: {e}");
681                                return None;
682                            }
683                        }
684                    } else {
685                        tracing::error!("No instrument found for {}", msg.product_id);
686                        return None;
687                    }
688                }
689                CoinbaseIntxWsMessage::BookUpdate(msg) => {
690                    if let Some(inst) = self.instruments.get(&msg.product_id) {
691                        match parse_orderbook_update_msg(
692                            &msg,
693                            inst.id(),
694                            inst.price_precision(),
695                            inst.size_precision(),
696                            clock.get_time_ns(),
697                        ) {
698                            Ok(deltas) => {
699                                let deltas = OrderBookDeltas_API::new(deltas);
700                                let data = Data::Deltas(deltas);
701                                return Some(NautilusWsMessage::Data(data));
702                            }
703                            Err(e) => {
704                                tracing::error!("Failed to parse orderbook update: {e}");
705                            }
706                        }
707                    } else {
708                        tracing::error!("No instrument found for {}", msg.product_id);
709                    }
710                }
711                CoinbaseIntxWsMessage::Quote(msg) => {
712                    if let Some(inst) = self.instruments.get(&msg.product_id) {
713                        match parse_quote_msg(
714                            &msg,
715                            inst.id(),
716                            inst.price_precision(),
717                            inst.size_precision(),
718                            clock.get_time_ns(),
719                        ) {
720                            Ok(quote) => return Some(NautilusWsMessage::Data(Data::Quote(quote))),
721                            Err(e) => {
722                                tracing::error!("Failed to parse quote: {e}");
723                            }
724                        }
725                    } else {
726                        tracing::error!("No instrument found for {}", msg.product_id);
727                    }
728                }
729                CoinbaseIntxWsMessage::Trade(msg) => {
730                    if let Some(inst) = self.instruments.get(&msg.product_id) {
731                        match parse_trade_msg(
732                            &msg,
733                            inst.id(),
734                            inst.price_precision(),
735                            inst.size_precision(),
736                            clock.get_time_ns(),
737                        ) {
738                            Ok(trade) => return Some(NautilusWsMessage::Data(Data::Trade(trade))),
739                            Err(e) => {
740                                tracing::error!("Failed to parse trade: {e}");
741                            }
742                        }
743                    } else {
744                        tracing::error!("No instrument found for {}", msg.product_id);
745                    }
746                }
747                CoinbaseIntxWsMessage::Risk(msg) => {
748                    if let Some(inst) = self.instruments.get(&msg.product_id) {
749                        let mark_price = match parse_mark_price_msg(
750                            &msg,
751                            inst.id(),
752                            inst.price_precision(),
753                            clock.get_time_ns(),
754                        ) {
755                            Ok(mark_price) => Some(mark_price),
756                            Err(e) => {
757                                tracing::error!("Failed to parse mark price: {e}");
758                                None
759                            }
760                        };
761
762                        let index_price = match parse_index_price_msg(
763                            &msg,
764                            inst.id(),
765                            inst.price_precision(),
766                            clock.get_time_ns(),
767                        ) {
768                            Ok(index_price) => Some(index_price),
769                            Err(e) => {
770                                tracing::error!("Failed to parse index price: {e}");
771                                None
772                            }
773                        };
774
775                        match (mark_price, index_price) {
776                            (Some(mark), Some(index)) => {
777                                return Some(NautilusWsMessage::MarkAndIndex((mark, index)));
778                            }
779                            (Some(mark), None) => return Some(NautilusWsMessage::MarkPrice(mark)),
780                            (None, Some(index)) => {
781                                return Some(NautilusWsMessage::IndexPrice(index));
782                            }
783                            (None, None) => continue,
784                        };
785                    } else {
786                        tracing::error!("No instrument found for {}", msg.product_id);
787                    }
788                }
789                CoinbaseIntxWsMessage::CandleSnapshot(msg) => {
790                    if let Some(inst) = self.instruments.get(&msg.product_id) {
791                        match parse_candle_msg(
792                            &msg,
793                            inst.id(),
794                            inst.price_precision(),
795                            inst.size_precision(),
796                            clock.get_time_ns(),
797                        ) {
798                            Ok(bar) => return Some(NautilusWsMessage::Data(Data::Bar(bar))),
799                            Err(e) => {
800                                tracing::error!("Failed to parse candle: {e}");
801                            }
802                        }
803                    } else {
804                        tracing::error!("No instrument found for {}", msg.product_id);
805                    }
806                }
807                _ => {
808                    tracing::warn!("Not implemented: {event:?}");
809                }
810            }
811        }
812        None // Connection closed
813    }
814}