1use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21 },
22 time::{Duration, SystemTime},
23};
24
25use chrono::Utc;
26use futures_util::{Stream, StreamExt};
27use nautilus_common::runtime::get_runtime;
28use nautilus_core::{consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
29use nautilus_model::{
30 data::{BarType, Data, OrderBookDeltas_API},
31 identifiers::InstrumentId,
32 instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::websocket::{Consumer, MessageReader, WebSocketClient, WebSocketConfig};
35use reqwest::header::USER_AGENT;
36use tokio::sync::Mutex;
37use tokio_tungstenite::tungstenite::{Error, Message};
38use ustr::Ustr;
39
40use super::{
41 enums::{CoinbaseIntxWsChannel, WsOperation},
42 error::CoinbaseIntxWsError,
43 messages::{CoinbaseIntxSubscription, CoinbaseIntxWsMessage, NautilusWsMessage},
44 parse::{
45 parse_candle_msg, parse_index_price_msg, parse_mark_price_msg,
46 parse_orderbook_snapshot_msg, parse_orderbook_update_msg, parse_quote_msg,
47 },
48};
49use crate::{
50 common::{
51 consts::COINBASE_INTX_WS_URL,
52 credential::{Credential, get_env_var},
53 parse::bar_spec_as_coinbase_channel,
54 },
55 websocket::parse::{parse_instrument_any, parse_trade_msg},
56};
57
58#[derive(Clone)]
60#[cfg_attr(
61 feature = "python",
62 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
63)]
64pub struct CoinbaseIntxWebSocketClient {
65 url: String,
66 credential: Credential,
67 heartbeat: Option<u64>,
68 inner: Option<Arc<WebSocketClient>>,
69 rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
70 signal: Arc<AtomicBool>,
71 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
72 subscriptions: Arc<Mutex<HashMap<CoinbaseIntxWsChannel, Vec<Ustr>>>>,
73}
74
75impl Default for CoinbaseIntxWebSocketClient {
76 fn default() -> Self {
77 Self::new(None, None, None, None, Some(10)).expect("Failed to create client")
78 }
79}
80
81impl CoinbaseIntxWebSocketClient {
82 pub fn new(
84 url: Option<String>,
85 api_key: Option<String>,
86 api_secret: Option<String>,
87 api_passphrase: Option<String>,
88 heartbeat: Option<u64>,
89 ) -> anyhow::Result<Self> {
90 let url = url.unwrap_or(COINBASE_INTX_WS_URL.to_string());
91 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
92 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
93 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
94
95 let credential = Credential::new(api_key, api_secret, api_passphrase);
96 let signal = Arc::new(AtomicBool::new(false));
97 let subscriptions = Arc::new(Mutex::new(HashMap::new()));
98
99 Ok(Self {
100 url,
101 credential,
102 heartbeat,
103 inner: None,
104 rx: None,
105 signal,
106 task_handle: None,
107 subscriptions,
108 })
109 }
110
111 pub fn from_env() -> anyhow::Result<Self> {
114 Self::new(None, None, None, None, None)
115 }
116
117 pub fn url(&self) -> &str {
119 self.url.as_str()
120 }
121
122 pub fn api_key(&self) -> &str {
124 self.credential.api_key.as_str()
125 }
126
127 pub fn is_active(&self) -> bool {
129 match &self.inner {
130 Some(inner) => inner.is_active(),
131 None => false,
132 }
133 }
134
135 pub fn is_closed(&self) -> bool {
137 match &self.inner {
138 Some(inner) => inner.is_closed(),
139 None => true,
140 }
141 }
142
143 pub async fn connect(&mut self, instruments: Vec<InstrumentAny>) -> anyhow::Result<()> {
145 let client = self.clone();
146 let post_reconnect = Arc::new(move || {
147 let client = client.clone();
148 tokio::spawn(async move { client.resubscribe_all().await });
149 });
150
151 let config = WebSocketConfig {
152 url: self.url.clone(),
153 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
154 heartbeat: self.heartbeat,
155 heartbeat_msg: None,
156 handler: Consumer::Python(None),
157 ping_handler: None,
158 reconnect_timeout_ms: Some(5_000),
159 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
164 let (reader, client) =
165 WebSocketClient::connect_stream(config, vec![], None, Some(post_reconnect)).await?;
166
167 self.inner = Some(Arc::new(client));
168
169 let mut instruments_map: HashMap<Ustr, InstrumentAny> = HashMap::new();
170 for inst in instruments {
171 instruments_map.insert(inst.raw_symbol().inner(), inst);
172 }
173
174 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
175 self.rx = Some(Arc::new(rx));
176 let signal = self.signal.clone();
177
178 let stream_handle = get_runtime().spawn(async move {
179 CoinbaseIntxWsMessageHandler::new(instruments_map, reader, signal, tx)
180 .run()
181 .await;
182 });
183
184 self.task_handle = Some(Arc::new(stream_handle));
185
186 Ok(())
187 }
188
189 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
197 let rx = self
198 .rx
199 .take()
200 .expect("Data stream receiver already taken or not connected"); let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
202 async_stream::stream! {
203 while let Some(data) = rx.recv().await {
204 yield data;
205 }
206 }
207 }
208
209 pub async fn close(&mut self) -> Result<(), Error> {
211 tracing::debug!("Closing");
212 self.signal.store(true, Ordering::Relaxed);
213
214 match tokio::time::timeout(Duration::from_secs(5), async {
215 if let Some(inner) = &self.inner {
216 inner.disconnect().await;
217 } else {
218 log::error!("Error on close: not connected");
219 }
220 })
221 .await
222 {
223 Ok(()) => {
224 tracing::debug!("Inner disconnected");
225 }
226 Err(_) => {
227 tracing::error!("Timeout waiting for inner client to disconnect");
228 }
229 }
230
231 log::debug!("Closed");
232
233 Ok(())
234 }
235
236 async fn subscribe(
238 &self,
239 channels: Vec<CoinbaseIntxWsChannel>,
240 product_ids: Vec<Ustr>,
241 ) -> Result<(), CoinbaseIntxWsError> {
242 let mut active_subs = self.subscriptions.lock().await;
244 for channel in &channels {
245 active_subs
246 .entry(*channel)
247 .or_insert_with(Vec::new)
248 .extend(product_ids.clone());
249 }
250 tracing::debug!(
251 "Added active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
252 );
253
254 let time = chrono::DateTime::<Utc>::from(SystemTime::now())
255 .timestamp()
256 .to_string();
257 let signature = self.credential.sign_ws(&time);
258 let message = CoinbaseIntxSubscription {
259 op: WsOperation::Subscribe,
260 product_ids: Some(product_ids),
261 channels,
262 time,
263 key: self.credential.api_key,
264 passphrase: self.credential.api_passphrase,
265 signature,
266 };
267
268 let json_txt = serde_json::to_string(&message)
269 .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
270
271 if let Some(inner) = &self.inner {
272 inner.send_text(json_txt, None).await;
273 } else {
274 return Err(CoinbaseIntxWsError::ClientError(
275 "Cannot send message: not connected".to_string(),
276 ));
277 }
278
279 Ok(())
280 }
281
282 async fn unsubscribe(
284 &self,
285 channels: Vec<CoinbaseIntxWsChannel>,
286 product_ids: Vec<Ustr>,
287 ) -> Result<(), CoinbaseIntxWsError> {
288 let mut active_subs = self.subscriptions.lock().await;
290 for channel in &channels {
291 if let Some(subs) = active_subs.get_mut(channel) {
292 for product_id in &product_ids {
293 subs.retain(|pid| pid != product_id);
294 }
295 if subs.is_empty() {
296 active_subs.remove(channel);
297 }
298 }
299 }
300 tracing::debug!(
301 "Removed active subscription(s): channels={channels:?}, product_ids={product_ids:?}"
302 );
303
304 let time = chrono::DateTime::<Utc>::from(SystemTime::now())
305 .timestamp()
306 .to_string();
307 let signature = self.credential.sign_ws(&time);
308 let message = CoinbaseIntxSubscription {
309 op: WsOperation::Unsubscribe,
310 product_ids: Some(product_ids),
311 channels,
312 time,
313 key: self.credential.api_key,
314 passphrase: self.credential.api_passphrase,
315 signature,
316 };
317
318 let json_txt = serde_json::to_string(&message)
319 .map_err(|e| CoinbaseIntxWsError::JsonError(e.to_string()))?;
320
321 if let Some(inner) = &self.inner {
322 inner.send_text(json_txt, None).await;
323 } else {
324 return Err(CoinbaseIntxWsError::ClientError(
325 "Cannot send message: not connected".to_string(),
326 ));
327 }
328
329 Ok(())
330 }
331
332 async fn resubscribe_all(&self) {
334 let subs = self.subscriptions.lock().await.clone();
335
336 for (channel, product_ids) in subs {
337 if product_ids.is_empty() {
338 continue;
339 }
340
341 tracing::debug!("Resubscribing: channel={channel}, product_ids={product_ids:?}");
342
343 if let Err(e) = self.subscribe(vec![channel], product_ids).await {
344 tracing::error!("Failed to resubscribe to channel {channel}: {e}");
345 }
346 }
347 }
348
349 pub async fn subscribe_instruments(
351 &self,
352 instrument_ids: Vec<InstrumentId>,
353 ) -> Result<(), CoinbaseIntxWsError> {
354 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
355 self.subscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
356 .await
357 }
358
359 pub async fn subscribe_funding(
361 &self,
362 instrument_ids: Vec<InstrumentId>,
363 ) -> Result<(), CoinbaseIntxWsError> {
364 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
365 self.subscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
366 .await
367 }
368
369 pub async fn subscribe_risk(
371 &self,
372 instrument_ids: Vec<InstrumentId>,
373 ) -> Result<(), CoinbaseIntxWsError> {
374 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
375 self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
376 .await
377 }
378
379 pub async fn subscribe_order_book(
381 &self,
382 instrument_ids: Vec<InstrumentId>,
383 ) -> Result<(), CoinbaseIntxWsError> {
384 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
385 self.subscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
386 .await
387 }
388
389 pub async fn subscribe_quotes(
391 &self,
392 instrument_ids: Vec<InstrumentId>,
393 ) -> Result<(), CoinbaseIntxWsError> {
394 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
395 self.subscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
396 .await
397 }
398
399 pub async fn subscribe_trades(
401 &self,
402 instrument_ids: Vec<InstrumentId>,
403 ) -> Result<(), CoinbaseIntxWsError> {
404 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
405 self.subscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
406 .await
407 }
408
409 pub async fn subscribe_mark_prices(
411 &self,
412 instrument_ids: Vec<InstrumentId>,
413 ) -> Result<(), CoinbaseIntxWsError> {
414 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
415 self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
416 .await
417 }
418
419 pub async fn subscribe_index_prices(
421 &self,
422 instrument_ids: Vec<InstrumentId>,
423 ) -> Result<(), CoinbaseIntxWsError> {
424 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
425 self.subscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
426 .await
427 }
428
429 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
431 let channel = bar_spec_as_coinbase_channel(bar_type.spec())
432 .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
433 let product_ids = vec![bar_type.standard().instrument_id().symbol.inner()];
434 self.subscribe(vec![channel], product_ids).await
435 }
436
437 pub async fn unsubscribe_instruments(
439 &self,
440 instrument_ids: Vec<InstrumentId>,
441 ) -> Result<(), CoinbaseIntxWsError> {
442 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
443 self.unsubscribe(vec![CoinbaseIntxWsChannel::Instruments], product_ids)
444 .await
445 }
446
447 pub async fn unsubscribe_risk(
449 &self,
450 instrument_ids: Vec<InstrumentId>,
451 ) -> Result<(), CoinbaseIntxWsError> {
452 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
453 self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
454 .await
455 }
456
457 pub async fn unsubscribe_funding(
459 &self,
460 instrument_ids: Vec<InstrumentId>,
461 ) -> Result<(), CoinbaseIntxWsError> {
462 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
463 self.unsubscribe(vec![CoinbaseIntxWsChannel::Funding], product_ids)
464 .await
465 }
466
467 pub async fn unsubscribe_order_book(
469 &self,
470 instrument_ids: Vec<InstrumentId>,
471 ) -> Result<(), CoinbaseIntxWsError> {
472 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
473 self.unsubscribe(vec![CoinbaseIntxWsChannel::Level2], product_ids)
474 .await
475 }
476
477 pub async fn unsubscribe_quotes(
479 &self,
480 instrument_ids: Vec<InstrumentId>,
481 ) -> Result<(), CoinbaseIntxWsError> {
482 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
483 self.unsubscribe(vec![CoinbaseIntxWsChannel::Level1], product_ids)
484 .await
485 }
486
487 pub async fn unsubscribe_trades(
489 &self,
490 instrument_ids: Vec<InstrumentId>,
491 ) -> Result<(), CoinbaseIntxWsError> {
492 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
493 self.unsubscribe(vec![CoinbaseIntxWsChannel::Match], product_ids)
494 .await
495 }
496
497 pub async fn unsubscribe_mark_prices(
499 &self,
500 instrument_ids: Vec<InstrumentId>,
501 ) -> Result<(), CoinbaseIntxWsError> {
502 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
503 self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
504 .await
505 }
506
507 pub async fn unsubscribe_index_prices(
509 &self,
510 instrument_ids: Vec<InstrumentId>,
511 ) -> Result<(), CoinbaseIntxWsError> {
512 let product_ids = instrument_ids_to_product_ids(&instrument_ids);
513 self.unsubscribe(vec![CoinbaseIntxWsChannel::Risk], product_ids)
514 .await
515 }
516
517 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), CoinbaseIntxWsError> {
519 let channel = bar_spec_as_coinbase_channel(bar_type.spec())
520 .map_err(|e| CoinbaseIntxWsError::ClientError(e.to_string()))?;
521 let product_id = bar_type.standard().instrument_id().symbol.inner();
522 self.unsubscribe(vec![channel], vec![product_id]).await
523 }
524}
525
526fn instrument_ids_to_product_ids(instrument_ids: &[InstrumentId]) -> Vec<Ustr> {
527 instrument_ids.iter().map(|x| x.symbol.inner()).collect()
528}
529
530struct CoinbaseIntxFeedHandler {
532 reader: MessageReader,
533 signal: Arc<AtomicBool>,
534}
535
536impl CoinbaseIntxFeedHandler {
537 pub const fn new(reader: MessageReader, signal: Arc<AtomicBool>) -> Self {
539 Self { reader, signal }
540 }
541
542 async fn next(&mut self) -> Option<CoinbaseIntxWsMessage> {
544 let timeout = Duration::from_millis(10);
546
547 loop {
548 if self.signal.load(Ordering::Relaxed) {
549 tracing::debug!("Stop signal received");
550 break;
551 }
552
553 match tokio::time::timeout(timeout, self.reader.next()).await {
554 Ok(Some(msg)) => match msg {
555 Ok(Message::Pong(_)) => {
556 tracing::trace!("Received pong");
557 }
558 Ok(Message::Ping(_)) => {
559 tracing::trace!("Received pong"); }
561 Ok(Message::Text(text)) => {
562 match serde_json::from_str(&text) {
563 Ok(event) => match &event {
564 CoinbaseIntxWsMessage::Reject(msg) => {
565 tracing::error!("{msg:?}");
566 }
567 CoinbaseIntxWsMessage::Confirmation(msg) => {
568 tracing::debug!("{msg:?}");
569 continue;
570 }
571 CoinbaseIntxWsMessage::Instrument(_) => return Some(event),
572 CoinbaseIntxWsMessage::Funding(_) => return Some(event),
573 CoinbaseIntxWsMessage::Risk(_) => return Some(event),
574 CoinbaseIntxWsMessage::BookSnapshot(_) => return Some(event),
575 CoinbaseIntxWsMessage::BookUpdate(_) => return Some(event),
576 CoinbaseIntxWsMessage::Quote(_) => return Some(event),
577 CoinbaseIntxWsMessage::Trade(_) => return Some(event),
578 CoinbaseIntxWsMessage::CandleSnapshot(_) => return Some(event),
579 CoinbaseIntxWsMessage::CandleUpdate(_) => continue, },
581 Err(e) => {
582 tracing::error!("Failed to parse message: {e}: {text}");
583 break;
584 }
585 }
586 }
587 Ok(Message::Binary(msg)) => {
588 tracing::debug!("Raw binary: {msg:?}");
589 }
590 Ok(Message::Close(_)) => {
591 tracing::debug!("Received close message");
592 return None;
593 }
594 Ok(msg) => {
595 tracing::warn!("Unexpected message: {msg:?}");
596 }
597 Err(e) => {
598 tracing::error!("{e}, stopping client");
599 break; }
601 },
602 Ok(None) => {
603 tracing::info!("WebSocket stream closed");
604 break;
605 }
606 Err(_) => {} }
608 }
609
610 tracing::debug!("Stopped message streaming");
611 None
612 }
613}
614
615struct CoinbaseIntxWsMessageHandler {
617 instruments: HashMap<Ustr, InstrumentAny>,
618 handler: CoinbaseIntxFeedHandler,
619 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
620}
621
622impl CoinbaseIntxWsMessageHandler {
623 pub const fn new(
625 instruments: HashMap<Ustr, InstrumentAny>,
626 reader: MessageReader,
627 signal: Arc<AtomicBool>,
628 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
629 ) -> Self {
630 let handler = CoinbaseIntxFeedHandler::new(reader, signal);
631 Self {
632 instruments,
633 handler,
634 tx,
635 }
636 }
637
638 async fn run(&mut self) {
640 while let Some(data) = self.next().await {
641 if let Err(e) = self.tx.send(data) {
642 tracing::error!("Error sending data: {e}");
643 break; }
645 }
646 }
647
648 async fn next(&mut self) -> Option<NautilusWsMessage> {
650 let clock = get_atomic_clock_realtime();
651
652 while let Some(event) = self.handler.next().await {
653 match event {
654 CoinbaseIntxWsMessage::Instrument(msg) => {
655 if let Some(inst) = parse_instrument_any(&msg, clock.get_time_ns()) {
656 self.instruments
658 .insert(inst.raw_symbol().inner(), inst.clone());
659 return Some(NautilusWsMessage::Instrument(inst));
660 }
661 }
662 CoinbaseIntxWsMessage::Funding(msg) => {
663 tracing::warn!("Received {msg:?}"); }
665 CoinbaseIntxWsMessage::BookSnapshot(msg) => {
666 if let Some(inst) = self.instruments.get(&msg.product_id) {
667 match parse_orderbook_snapshot_msg(
668 &msg,
669 inst.id(),
670 inst.price_precision(),
671 inst.size_precision(),
672 clock.get_time_ns(),
673 ) {
674 Ok(deltas) => {
675 let deltas = OrderBookDeltas_API::new(deltas);
676 let data = Data::Deltas(deltas);
677 return Some(NautilusWsMessage::Data(data));
678 }
679 Err(e) => {
680 tracing::error!("Failed to parse orderbook snapshot: {e}");
681 return None;
682 }
683 }
684 } else {
685 tracing::error!("No instrument found for {}", msg.product_id);
686 return None;
687 }
688 }
689 CoinbaseIntxWsMessage::BookUpdate(msg) => {
690 if let Some(inst) = self.instruments.get(&msg.product_id) {
691 match parse_orderbook_update_msg(
692 &msg,
693 inst.id(),
694 inst.price_precision(),
695 inst.size_precision(),
696 clock.get_time_ns(),
697 ) {
698 Ok(deltas) => {
699 let deltas = OrderBookDeltas_API::new(deltas);
700 let data = Data::Deltas(deltas);
701 return Some(NautilusWsMessage::Data(data));
702 }
703 Err(e) => {
704 tracing::error!("Failed to parse orderbook update: {e}");
705 }
706 }
707 } else {
708 tracing::error!("No instrument found for {}", msg.product_id);
709 }
710 }
711 CoinbaseIntxWsMessage::Quote(msg) => {
712 if let Some(inst) = self.instruments.get(&msg.product_id) {
713 match parse_quote_msg(
714 &msg,
715 inst.id(),
716 inst.price_precision(),
717 inst.size_precision(),
718 clock.get_time_ns(),
719 ) {
720 Ok(quote) => return Some(NautilusWsMessage::Data(Data::Quote(quote))),
721 Err(e) => {
722 tracing::error!("Failed to parse quote: {e}");
723 }
724 }
725 } else {
726 tracing::error!("No instrument found for {}", msg.product_id);
727 }
728 }
729 CoinbaseIntxWsMessage::Trade(msg) => {
730 if let Some(inst) = self.instruments.get(&msg.product_id) {
731 match parse_trade_msg(
732 &msg,
733 inst.id(),
734 inst.price_precision(),
735 inst.size_precision(),
736 clock.get_time_ns(),
737 ) {
738 Ok(trade) => return Some(NautilusWsMessage::Data(Data::Trade(trade))),
739 Err(e) => {
740 tracing::error!("Failed to parse trade: {e}");
741 }
742 }
743 } else {
744 tracing::error!("No instrument found for {}", msg.product_id);
745 }
746 }
747 CoinbaseIntxWsMessage::Risk(msg) => {
748 if let Some(inst) = self.instruments.get(&msg.product_id) {
749 let mark_price = match parse_mark_price_msg(
750 &msg,
751 inst.id(),
752 inst.price_precision(),
753 clock.get_time_ns(),
754 ) {
755 Ok(mark_price) => Some(mark_price),
756 Err(e) => {
757 tracing::error!("Failed to parse mark price: {e}");
758 None
759 }
760 };
761
762 let index_price = match parse_index_price_msg(
763 &msg,
764 inst.id(),
765 inst.price_precision(),
766 clock.get_time_ns(),
767 ) {
768 Ok(index_price) => Some(index_price),
769 Err(e) => {
770 tracing::error!("Failed to parse index price: {e}");
771 None
772 }
773 };
774
775 match (mark_price, index_price) {
776 (Some(mark), Some(index)) => {
777 return Some(NautilusWsMessage::MarkAndIndex((mark, index)));
778 }
779 (Some(mark), None) => return Some(NautilusWsMessage::MarkPrice(mark)),
780 (None, Some(index)) => {
781 return Some(NautilusWsMessage::IndexPrice(index));
782 }
783 (None, None) => continue,
784 };
785 } else {
786 tracing::error!("No instrument found for {}", msg.product_id);
787 }
788 }
789 CoinbaseIntxWsMessage::CandleSnapshot(msg) => {
790 if let Some(inst) = self.instruments.get(&msg.product_id) {
791 match parse_candle_msg(
792 &msg,
793 inst.id(),
794 inst.price_precision(),
795 inst.size_precision(),
796 clock.get_time_ns(),
797 ) {
798 Ok(bar) => return Some(NautilusWsMessage::Data(Data::Bar(bar))),
799 Err(e) => {
800 tracing::error!("Failed to parse candle: {e}");
801 }
802 }
803 } else {
804 tracing::error!("No instrument found for {}", msg.product_id);
805 }
806 }
807 _ => {
808 tracing::warn!("Not implemented: {event:?}");
809 }
810 }
811 }
812 None }
814}