1use std::{
17 sync::{
18 Arc,
19 atomic::{AtomicBool, Ordering},
20 },
21 time::{Duration, SystemTime},
22};
23
24use ahash::{AHashMap, AHashSet};
25use chrono::Utc;
26use dashmap::DashMap;
27use futures_util::{Stream, StreamExt};
28use nautilus_common::{logging::log_task_stopped, runtime::get_runtime};
29use nautilus_core::{
30 consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33 data::{BarType, Data, OrderBookDeltas_API},
34 identifiers::InstrumentId,
35 instruments::{Instrument, InstrumentAny},
36};
37use nautilus_network::websocket::{MessageReader, WebSocketClient, WebSocketConfig};
38use reqwest::header::USER_AGENT;
39use tokio_tungstenite::tungstenite::{Error, Message};
40use ustr::Ustr;
41
42use super::{
43 enums::{CoinbaseIntxWsChannel, WsOperation},
44 error::CoinbaseIntxWsError,
45 messages::{CoinbaseIntxSubscription, CoinbaseIntxWsMessage, NautilusWsMessage},
46 parse::{
47 parse_candle_msg, parse_index_price_msg, parse_mark_price_msg,
48 parse_orderbook_snapshot_msg, parse_orderbook_update_msg, parse_quote_msg,
49 },
50};
51use crate::{
52 common::{
53 consts::COINBASE_INTX_WS_URL, credential::Credential, parse::bar_spec_as_coinbase_channel,
54 },
55 websocket::parse::{parse_instrument_any, parse_trade_msg},
56};
57
/// WebSocket client for the Coinbase International market data feed.
///
/// The client is `Clone`; clones share the underlying connection, stop signal
/// and subscription state through `Arc`s.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct CoinbaseIntxWebSocketClient {
    /// WebSocket endpoint passed to the connection config in `connect`.
    url: String,
    /// API credentials used to sign subscribe/unsubscribe requests.
    credential: Credential,
    /// Heartbeat setting forwarded to the connection config
    /// (NOTE(review): presumably an interval in seconds - confirm).
    heartbeat: Option<u64>,
    /// Underlying network client; `None` until `connect` succeeds.
    inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
    /// Receiving end of the parsed-message channel; set by `connect` and
    /// taken by `stream`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag polled by the feed handler; set to `true` by `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the message-handling task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Active subscriptions: channel -> set of product IDs.
    subscriptions: Arc<DashMap<CoinbaseIntxWsChannel, AHashSet<Ustr>>>,
    /// Instruments keyed by symbol, copied into the message handler on `connect`.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
}
75
impl Default for CoinbaseIntxWebSocketClient {
    /// Builds a client with the default URL, credentials resolved by
    /// [`CoinbaseIntxWebSocketClient::new`], and a heartbeat of `Some(10)`.
    ///
    /// # Panics
    ///
    /// Panics if `new` returns an error.
    fn default() -> Self {
        Self::new(None, None, None, None, Some(10)).expect("Failed to create client")
    }
}
81
82impl CoinbaseIntxWebSocketClient {
83 pub fn new(
89 url: Option<String>,
90 api_key: Option<String>,
91 api_secret: Option<String>,
92 api_passphrase: Option<String>,
93 heartbeat: Option<u64>,
94 ) -> anyhow::Result<Self> {
95 let url = url.unwrap_or(COINBASE_INTX_WS_URL.to_string());
96 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
97 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
98 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
99
100 let credential = Credential::new(api_key, api_secret, api_passphrase);
101 let signal = Arc::new(AtomicBool::new(false));
102 let subscriptions = Arc::new(DashMap::new());
103 let instruments_cache = Arc::new(AHashMap::new());
104
105 Ok(Self {
106 url,
107 credential,
108 heartbeat,
109 inner: Arc::new(tokio::sync::RwLock::new(None)),
110 rx: None,
111 signal,
112 task_handle: None,
113 subscriptions,
114 instruments_cache,
115 })
116 }
117
    /// Creates a client using the default URL, no heartbeat, and credentials
    /// resolved by [`Self::new`] with every argument set to `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if `new` fails.
    pub fn from_env() -> anyhow::Result<Self> {
        Self::new(None, None, None, None, None)
    }
127
    /// Returns the WebSocket URL this client was configured with.
    #[must_use]
    pub const fn url(&self) -> &str {
        self.url.as_str()
    }
133
    /// Returns the API key held by the client's credential.
    #[must_use]
    pub fn api_key(&self) -> &str {
        self.credential.api_key.as_str()
    }
139
140 #[must_use]
142 pub fn is_active(&self) -> bool {
143 self.inner
144 .try_read()
145 .ok()
146 .and_then(|guard| {
147 guard
148 .as_ref()
149 .map(nautilus_network::websocket::WebSocketClient::is_active)
150 })
151 .unwrap_or(false)
152 }
153
154 #[must_use]
156 pub fn is_closed(&self) -> bool {
157 self.inner
158 .try_read()
159 .ok()
160 .and_then(|guard| {
161 guard
162 .as_ref()
163 .map(nautilus_network::websocket::WebSocketClient::is_closed)
164 })
165 .unwrap_or(true)
166 }
167
168 pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
170 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
171 for inst in instruments {
172 instruments_cache.insert(inst.symbol().inner(), inst.clone());
173 }
174
175 self.instruments_cache = Arc::new(instruments_cache);
176 }
177
178 #[must_use]
180 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<CoinbaseIntxWsChannel> {
181 let product_id = instrument_id.symbol.inner();
182 let mut channels = Vec::new();
183
184 for entry in self.subscriptions.iter() {
185 let (channel, instruments) = entry.pair();
186 if instruments.contains(&product_id) {
187 channels.push(*channel);
188 }
189 }
190
191 channels
192 }
193
194 pub async fn connect(&mut self) -> anyhow::Result<()> {
200 let client = self.clone();
201 let post_reconnect = Arc::new(move || {
202 let client = client.clone();
203 tokio::spawn(async move { client.resubscribe_all().await });
204 });
205
206 let config = WebSocketConfig {
207 url: self.url.clone(),
208 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
209 message_handler: None, heartbeat: self.heartbeat,
211 heartbeat_msg: None,
212 ping_handler: None,
213 reconnect_timeout_ms: Some(5_000),
214 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
219 let (reader, client) =
220 WebSocketClient::connect_stream(config, vec![], None, Some(post_reconnect)).await?;
221
222 *self.inner.write().await = Some(client);
223
224 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
225 self.rx = Some(Arc::new(rx));
226 let signal = self.signal.clone();
227
228 let instruments_cache = (*self.instruments_cache).clone();
230
231 let stream_handle = get_runtime().spawn(async move {
232 CoinbaseIntxWsMessageHandler::new(reader, signal, tx, instruments_cache)
233 .run()
234 .await;
235 });
236
237 self.task_handle = Some(Arc::new(stream_handle));
238
239 Ok(())
240 }
241
242 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), CoinbaseIntxWsError> {
248 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
249
250 tokio::time::timeout(timeout, async {
251 while !self.is_active() {
252 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
253 }
254 })
255 .await
256 .map_err(|_| {
257 CoinbaseIntxWsError::ClientError(format!(
258 "WebSocket connection timeout after {timeout_secs} seconds"
259 ))
260 })?;
261
262 Ok(())
263 }
264
265 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
273 let rx = self
274 .rx
275 .take()
276 .expect("Data stream receiver already taken or not connected"); let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
278 async_stream::stream! {
279 while let Some(data) = rx.recv().await {
280 yield data;
281 }
282 }
283 }
284
285 pub async fn close(&mut self) -> Result<(), Error> {
291 tracing::debug!("Closing");
292 self.signal.store(true, Ordering::Relaxed);
293
294 match tokio::time::timeout(Duration::from_secs(5), async {
295 if let Some(inner) = self.inner.read().await.as_ref() {
296 inner.disconnect().await;
297 } else {
298 log::error!("Error on close: not connected");
299 }
300 })
301 .await
302 {
303 Ok(()) => {
304 tracing::debug!("Inner disconnected");
305 }
306 Err(_) => {
307 tracing::error!("Timeout waiting for inner client to disconnect");
308 }
309 }
310
311 log::debug!("Closed");
312
313 Ok(())
314 }
315
316 async fn subscribe(
322 &self,
323 channels: Vec<CoinbaseIntxWsChannel>,
324 product_ids: Vec<Ustr>,
325 ) -> Result<(), CoinbaseIntxWsError> {
326 for channel in &channels {
328 self.subscriptions
329 .entry(*channel)
330 .or_default()
331 .extend(product_ids.clone());
332 }
333 tracing::debug!(
334 "Added active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
335 );
336
337 let time = chrono::DateTime::<Utc>::from(SystemTime::now())
338 .timestamp()
339 .to_string();
340 let signature = self.credential.sign_ws(&time);
341 let message = CoinbaseIntxSubscription {
342 op: WsOperation::Subscribe,
343 product_ids: Some(product_ids),
344 channels,
345 time,
346 key: self.credential.api_key,
347 passphrase: self.credential.api_passphrase,
348 signature,
349 };
350
351 let json_txt = serde_json::to_string(&message)
352 .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
353
354 if let Some(inner) = self.inner.read().await.as_ref() {
355 if let Err(err) = inner.send_text(json_txt, None).await {
356 tracing::error!("Error sending message: {err:?}");
357 }
358 } else {
359 return Err(CoinbaseIntxWsError::ClientError(
360 "Cannot send message: not connected".to_string(),
361 ));
362 }
363
364 Ok(())
365 }
366
367 async fn unsubscribe(
369 &self,
370 channels: Vec<CoinbaseIntxWsChannel>,
371 product_ids: Vec<Ustr>,
372 ) -> Result<(), CoinbaseIntxWsError> {
373 for channel in &channels {
375 if let Some(mut entry) = self.subscriptions.get_mut(channel) {
376 for product_id in &product_ids {
377 entry.remove(product_id);
378 }
379 if entry.is_empty() {
380 drop(entry);
381 self.subscriptions.remove(channel);
382 }
383 }
384 }
385 tracing::debug!(
386 "Removed active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
387 );
388
389 let time = chrono::DateTime::<Utc>::from(SystemTime::now())
390 .timestamp()
391 .to_string();
392 let signature = self.credential.sign_ws(&time);
393 let message = CoinbaseIntxSubscription {
394 op: WsOperation::Unsubscribe,
395 product_ids: Some(product_ids),
396 channels,
397 time,
398 key: self.credential.api_key,
399 passphrase: self.credential.api_passphrase,
400 signature,
401 };
402
403 let json_txt = serde_json::to_string(&message)
404 .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
405
406 if let Some(inner) = self.inner.read().await.as_ref() {
407 if let Err(err) = inner.send_text(json_txt, None).await {
408 tracing::error!("Error sending message: {err:?}");
409 }
410 } else {
411 return Err(CoinbaseIntxWsError::ClientError(
412 "Cannot send message: not connected".to_string(),
413 ));
414 }
415
416 Ok(())
417 }
418
419 async fn resubscribe_all(&self) {
421 let mut subs = Vec::new();
422 for entry in self.subscriptions.iter() {
423 let (channel, product_ids) = entry.pair();
424 if !product_ids.is_empty() {
425 subs.push((*channel, product_ids.clone()));
426 }
427 }
428
429 for (channel, product_ids) in subs {
430 tracing::debug!("Resubscribing: channel={channel}, product_ids={product_ids:?}");
431
432 if let Err(e) = self
433 .subscribe(vec![channel], product_ids.into_iter().collect())
434 .await
435 {
436 tracing::error!("Failed to resubscribe to channel {channel}: {e}");
437 }
438 }
439 }
440
    /// Subscribes to the `Instruments` channel for the given instruments.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_instruments(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
            .await
    }
455
    /// Subscribes to the `Funding` channel for the given instruments.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_funding_rates(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
            .await
    }
470
    /// Subscribes to the `Risk` channel for the given instruments.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_risk(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
            .await
    }
485
    /// Subscribes to order book data (the `Level2` channel) for the given
    /// instruments.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_book(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
            .await
    }
500
    /// Subscribes to quotes (the `Level1` channel) for the given instruments.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_quotes(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
            .await
    }
515
    /// Subscribes to trades (the `Match` channel) for the given instruments.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_trades(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
            .await
    }
530
    /// Subscribes to mark prices for the given instruments.
    ///
    /// This subscribes to the `Risk` channel, the same channel used by
    /// `subscribe_risk` and `subscribe_index_prices`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_mark_prices(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
            .await
    }
545
    /// Subscribes to index prices for the given instruments.
    ///
    /// This subscribes to the `Risk` channel, the same channel used by
    /// `subscribe_risk` and `subscribe_mark_prices`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request fails (for example when the
    /// client is not connected).
    pub async fn subscribe_index_prices(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
            .await
    }
560
    /// Subscribes to bars for `bar_type`, using the channel that corresponds
    /// to its bar specification.
    ///
    /// # Errors
    ///
    /// Returns `CoinbaseIntxWsError::ClientError` if the bar specification
    /// cannot be mapped to a channel, or an error if the subscribe request
    /// fails.
    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
        let product_ids = vec![bar_type.standard().instrument_id().symbol.inner()];
        self.subscribe(vec![channel], product_ids).await
    }
573
    /// Unsubscribes the given instruments from the `Instruments` channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_instruments(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
            .await
    }
588
    /// Unsubscribes the given instruments from the `Risk` channel.
    ///
    /// The `Risk` channel is also what the mark price and index price
    /// subscriptions use, so this removes those as well.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_risk(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
            .await
    }
603
    /// Unsubscribes the given instruments from the `Funding` channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_funding(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
            .await
    }
618
    /// Unsubscribes the given instruments from order book data (the `Level2`
    /// channel).
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_book(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
            .await
    }
633
    /// Unsubscribes the given instruments from quotes (the `Level1` channel).
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_quotes(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
            .await
    }
648
    /// Unsubscribes the given instruments from trades (the `Match` channel).
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_trades(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
            .await
    }
663
    /// Unsubscribes the given instruments from mark prices.
    ///
    /// This removes the `Risk` channel subscription, which is shared with
    /// `subscribe_risk` and `subscribe_index_prices`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_mark_prices(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
            .await
    }
678
    /// Unsubscribes the given instruments from index prices.
    ///
    /// This removes the `Risk` channel subscription, which is shared with
    /// `subscribe_risk` and `subscribe_mark_prices`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe request fails (for example when the
    /// client is not connected).
    pub async fn unsubscribe_index_prices(
        &self,
        instrument_ids: Vec<InstrumentId>,
    ) -> Result<(), CoinbaseIntxWsError> {
        let product_ids = instrument_ids_to_product_ids(&instrument_ids);
        self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
            .await
    }
693
    /// Unsubscribes from bars for `bar_type`, using the channel that
    /// corresponds to its bar specification.
    ///
    /// # Errors
    ///
    /// Returns `CoinbaseIntxWsError::ClientError` if the bar specification
    /// cannot be mapped to a channel, or an error if the unsubscribe request
    /// fails.
    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
        let channel = bar_spec_as_coinbase_channel(bar_type.spec())
            .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
        let product_id = bar_type.standard().instrument_id().symbol.inner();
        self.unsubscribe(vec![channel], vec![product_id]).await
    }
706}
707
/// Maps instrument IDs to the product IDs used in subscription requests by
/// taking the inner value of each instrument's symbol.
fn instrument_ids_to_product_ids(instrument_ids: &[InstrumentId]) -> Vec<Ustr> {
    instrument_ids.iter().map(|x| x.symbol.inner()).collect()
}
711
/// Reads raw WebSocket frames and decodes them into venue messages.
struct CoinbaseIntxFeedHandler {
    /// Source of incoming WebSocket frames.
    reader: MessageReader,
    /// Stop flag; once `true`, `next` stops reading and returns `None`.
    signal: Arc<AtomicBool>,
}
717
718impl CoinbaseIntxFeedHandler {
    /// Creates a feed handler over `reader` that stops once `signal` is set.
    pub const fn new(reader: MessageReader, signal: Arc<AtomicBool>) -> Self {
        Self { reader, signal }
    }
723
724 async fn next(&mut self) -> Option<CoinbaseIntxWsMessage> {
726 let timeout = Duration::from_millis(10);
728
729 loop {
730 if self.signal.load(Ordering::Relaxed) {
731 tracing::debug!("Stop signal received");
732 break;
733 }
734
735 match tokio::time::timeout(timeout, self.reader.next()).await {
736 Ok(Some(msg)) => match msg {
737 Ok(Message::Pong(_)) => {
738 tracing::trace!("Received pong");
739 }
740 Ok(Message::Ping(_)) => {
741 tracing::trace!("Received pong"); }
743 Ok(Message::Text(text)) => {
744 match serde_json::from_str(&text) {
745 Ok(event) => match &event {
746 CoinbaseIntxWsMessage::Reject(msg) => {
747 tracing::error!("{msg:?}");
748 }
749 CoinbaseIntxWsMessage::Confirmation(msg) => {
750 tracing::debug!("{msg:?}");
751 continue;
752 }
753 CoinbaseIntxWsMessage::Instrument(_) => return Some(event),
754 CoinbaseIntxWsMessage::Funding(_) => return Some(event),
755 CoinbaseIntxWsMessage::Risk(_) => return Some(event),
756 CoinbaseIntxWsMessage::BookSnapshot(_) => return Some(event),
757 CoinbaseIntxWsMessage::BookUpdate(_) => return Some(event),
758 CoinbaseIntxWsMessage::Quote(_) => return Some(event),
759 CoinbaseIntxWsMessage::Trade(_) => return Some(event),
760 CoinbaseIntxWsMessage::CandleSnapshot(_) => return Some(event),
761 CoinbaseIntxWsMessage::CandleUpdate(_) => continue, },
763 Err(e) => {
764 tracing::error!("Failed to parse message: {e}: {text}");
765 break;
766 }
767 }
768 }
769 Ok(Message::Binary(msg)) => {
770 tracing::debug!("Raw binary: {msg:?}");
771 }
772 Ok(Message::Close(_)) => {
773 tracing::debug!("Received close message");
774 return None;
775 }
776 Ok(msg) => {
777 tracing::warn!("Unexpected message: {msg:?}");
778 }
779 Err(e) => {
780 tracing::error!("{e}, stopping client");
781 break; }
783 },
784 Ok(None) => {
785 tracing::info!("WebSocket stream closed");
786 break;
787 }
788 Err(_) => {} }
790 }
791
792 log_task_stopped("message-streaming");
793 None
794 }
795}
796
/// Converts venue messages from the feed handler into `NautilusWsMessage`s
/// and forwards them over a channel.
struct CoinbaseIntxWsMessageHandler {
    /// Source of decoded venue messages.
    handler: CoinbaseIntxFeedHandler,
    /// Channel on which parsed messages are sent to the client's stream.
    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Instruments used to look up IDs and precisions when parsing; updated
    /// when instrument messages arrive.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
}
803
804impl CoinbaseIntxWsMessageHandler {
    /// Creates a message handler that reads from `reader`, stops when
    /// `signal` is set, and sends parsed messages on `tx`.
    pub const fn new(
        reader: MessageReader,
        signal: Arc<AtomicBool>,
        tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
        instruments_cache: AHashMap<Ustr, InstrumentAny>,
    ) -> Self {
        let handler = CoinbaseIntxFeedHandler::new(reader, signal);
        Self {
            handler,
            tx,
            instruments_cache,
        }
    }
819
820 async fn run(&mut self) {
822 while let Some(data) = self.next().await {
823 if let Err(e) = self.tx.send(data) {
824 tracing::error!("Error sending data: {e}");
825 break; }
827 }
828 }
829
830 async fn next(&mut self) -> Option<NautilusWsMessage> {
832 let clock = get_atomic_clock_realtime();
833
834 while let Some(event) = self.handler.next().await {
835 match event {
836 CoinbaseIntxWsMessage::Instrument(msg) => {
837 if let Some(inst) = parse_instrument_any(&msg, clock.get_time_ns()) {
838 self.instruments_cache
840 .insert(inst.raw_symbol().inner(), inst.clone());
841 return Some(NautilusWsMessage::Instrument(inst));
842 }
843 }
844 CoinbaseIntxWsMessage::Funding(msg) => {
845 tracing::warn!("Received {msg:?}"); }
847 CoinbaseIntxWsMessage::BookSnapshot(msg) => {
848 if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
849 match parse_orderbook_snapshot_msg(
850 &msg,
851 inst.id(),
852 inst.price_precision(),
853 inst.size_precision(),
854 clock.get_time_ns(),
855 ) {
856 Ok(deltas) => {
857 let deltas = OrderBookDeltas_API::new(deltas);
858 let data = Data::Deltas(deltas);
859 return Some(NautilusWsMessage::Data(data));
860 }
861 Err(e) => {
862 tracing::error!("Failed to parse orderbook snapshot: {e}");
863 return None;
864 }
865 }
866 }
867 tracing::error!("No instrument found for {}", msg.product_id);
868 return None;
869 }
870 CoinbaseIntxWsMessage::BookUpdate(msg) => {
871 if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
872 match parse_orderbook_update_msg(
873 &msg,
874 inst.id(),
875 inst.price_precision(),
876 inst.size_precision(),
877 clock.get_time_ns(),
878 ) {
879 Ok(deltas) => {
880 let deltas = OrderBookDeltas_API::new(deltas);
881 let data = Data::Deltas(deltas);
882 return Some(NautilusWsMessage::Data(data));
883 }
884 Err(e) => {
885 tracing::error!("Failed to parse orderbook update: {e}");
886 }
887 }
888 } else {
889 tracing::error!("No instrument found for {}", msg.product_id);
890 }
891 }
892 CoinbaseIntxWsMessage::Quote(msg) => {
893 if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
894 match parse_quote_msg(
895 &msg,
896 inst.id(),
897 inst.price_precision(),
898 inst.size_precision(),
899 clock.get_time_ns(),
900 ) {
901 Ok(quote) => return Some(NautilusWsMessage::Data(Data::Quote(quote))),
902 Err(e) => {
903 tracing::error!("Failed to parse quote: {e}");
904 }
905 }
906 } else {
907 tracing::error!("No instrument found for {}", msg.product_id);
908 }
909 }
910 CoinbaseIntxWsMessage::Trade(msg) => {
911 if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
912 match parse_trade_msg(
913 &msg,
914 inst.id(),
915 inst.price_precision(),
916 inst.size_precision(),
917 clock.get_time_ns(),
918 ) {
919 Ok(trade) => return Some(NautilusWsMessage::Data(Data::Trade(trade))),
920 Err(e) => {
921 tracing::error!("Failed to parse trade: {e}");
922 }
923 }
924 } else {
925 tracing::error!("No instrument found for {}", msg.product_id);
926 }
927 }
928 CoinbaseIntxWsMessage::Risk(msg) => {
929 if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
930 let mark_price = match parse_mark_price_msg(
931 &msg,
932 inst.id(),
933 inst.price_precision(),
934 clock.get_time_ns(),
935 ) {
936 Ok(mark_price) => Some(mark_price),
937 Err(e) => {
938 tracing::error!("Failed to parse mark price: {e}");
939 None
940 }
941 };
942
943 let index_price = match parse_index_price_msg(
944 &msg,
945 inst.id(),
946 inst.price_precision(),
947 clock.get_time_ns(),
948 ) {
949 Ok(index_price) => Some(index_price),
950 Err(e) => {
951 tracing::error!("Failed to parse index price: {e}");
952 None
953 }
954 };
955
956 match (mark_price, index_price) {
957 (Some(mark), Some(index)) => {
958 return Some(NautilusWsMessage::MarkAndIndex((mark, index)));
959 }
960 (Some(mark), None) => return Some(NautilusWsMessage::MarkPrice(mark)),
961 (None, Some(index)) => {
962 return Some(NautilusWsMessage::IndexPrice(index));
963 }
964 (None, None) => continue,
965 };
966 }
967 tracing::error!("No instrument found for {}", msg.product_id);
968 }
969 CoinbaseIntxWsMessage::CandleSnapshot(msg) => {
970 if let Some(inst) = self.instruments_cache.get(&msg.product_id) {
971 match parse_candle_msg(
972 &msg,
973 inst.id(),
974 inst.price_precision(),
975 inst.size_precision(),
976 clock.get_time_ns(),
977 ) {
978 Ok(bar) => return Some(NautilusWsMessage::Data(Data::Bar(bar))),
979 Err(e) => {
980 tracing::error!("Failed to parse candle: {e}");
981 }
982 }
983 } else {
984 tracing::error!("No instrument found for {}", msg.product_id);
985 }
986 }
987 _ => {
988 tracing::warn!("Not implemented: {event:?}");
989 }
990 }
991 }
992 None }
994}