1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28 live::runner::get_data_event_sender,
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
35 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
36 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
37 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
38 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39 },
40 },
41};
42use nautilus_core::{
43 MUTEX_POISONED, UnixNanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_data::client::DataClient;
47use nautilus_model::{
48 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
49 enums::BookType,
50 identifiers::{ClientId, InstrumentId, Venue},
51 instruments::{Instrument, InstrumentAny},
52};
53use tokio::{task::JoinHandle, time::Duration};
54use tokio_util::sync::CancellationToken;
55
56use crate::{
57 common::{
58 consts::OKX_VENUE,
59 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
60 parse::okx_instrument_type_from_symbol,
61 },
62 config::OKXDataClientConfig,
63 http::client::OKXHttpClient,
64 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
65};
66
/// Market data client for the OKX venue.
///
/// Combines an HTTP client (instrument and historical data requests) with up to two
/// websocket clients (streaming subscriptions) and forwards everything it receives as
/// [`DataEvent`]s through `data_sender`.
#[derive(Debug)]
pub struct OKXDataClient {
    /// Identifier reported by this client and used as the default for responses.
    client_id: ClientId,
    /// Configuration the client was built from.
    config: OKXDataClientConfig,
    /// HTTP client used for instrument, trade and bar requests.
    http_client: OKXHttpClient,
    /// Public websocket client; `new` always populates it.
    ws_public: Option<OKXWebSocketClient>,
    /// Business websocket client; only present when the config requires it.
    ws_business: Option<OKXWebSocketClient>,
    /// Connection flag set by `connect` and cleared by `stop`/`reset`/`disconnect`.
    is_connected: AtomicBool,
    /// Token cloned into the websocket stream tasks so they can be told to exit.
    cancellation_token: CancellationToken,
    /// Join handles of the websocket stream tasks spawned in `connect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data, instruments and responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instruments known to the client, keyed by instrument ID.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Order book channel recorded per instrument after a successful delta subscription.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
    /// Clock used to timestamp responses.
    clock: &'static AtomicTime,
}
82
83impl OKXDataClient {
84 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
90 let clock = get_atomic_clock_realtime();
91 let data_sender = get_data_event_sender();
92
93 let http_client = if config.has_api_credentials() {
94 OKXHttpClient::with_credentials(
95 config.api_key.clone(),
96 config.api_secret.clone(),
97 config.api_passphrase.clone(),
98 config.base_url_http.clone(),
99 config.http_timeout_secs,
100 config.max_retries,
101 config.retry_delay_initial_ms,
102 config.retry_delay_max_ms,
103 config.is_demo,
104 config.http_proxy_url.clone(),
105 )?
106 } else {
107 OKXHttpClient::new(
108 config.base_url_http.clone(),
109 config.http_timeout_secs,
110 config.max_retries,
111 config.retry_delay_initial_ms,
112 config.retry_delay_max_ms,
113 config.is_demo,
114 config.http_proxy_url.clone(),
115 )?
116 };
117
118 let ws_public = OKXWebSocketClient::new(
119 Some(config.ws_public_url()),
120 None,
121 None,
122 None,
123 None,
124 Some(20), )
126 .context("failed to construct OKX public websocket client")?;
127
128 let ws_business = if config.requires_business_ws() {
129 Some(
130 OKXWebSocketClient::new(
131 Some(config.ws_business_url()),
132 config.api_key.clone(),
133 config.api_secret.clone(),
134 config.api_passphrase.clone(),
135 None,
136 Some(20), )
138 .context("failed to construct OKX business websocket client")?,
139 )
140 } else {
141 None
142 };
143
144 if let Some(vip_level) = config.vip_level {
145 ws_public.set_vip_level(vip_level);
146 if let Some(ref ws) = ws_business {
147 ws.set_vip_level(vip_level);
148 }
149 }
150
151 Ok(Self {
152 client_id,
153 config,
154 http_client,
155 ws_public: Some(ws_public),
156 ws_business,
157 is_connected: AtomicBool::new(false),
158 cancellation_token: CancellationToken::new(),
159 tasks: Vec::new(),
160 data_sender,
161 instruments: Arc::new(RwLock::new(AHashMap::new())),
162 book_channels: Arc::new(RwLock::new(AHashMap::new())),
163 clock,
164 })
165 }
166
    /// Returns a copy of the `OKX_VENUE` constant.
    fn venue(&self) -> Venue {
        *OKX_VENUE
    }
170
171 fn vip_level(&self) -> Option<OKXVipLevel> {
172 self.ws_public.as_ref().map(|ws| ws.vip_level())
173 }
174
175 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
176 self.ws_public
177 .as_ref()
178 .context("public websocket client not initialized")
179 }
180
181 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
182 self.ws_business
183 .as_ref()
184 .context("business websocket client not available (credentials required)")
185 }
186
187 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
188 if let Err(e) = sender.send(DataEvent::Data(data)) {
189 tracing::error!("Failed to emit data event: {e}");
190 }
191 }
192
193 fn spawn_ws<F>(&self, fut: F, context: &'static str)
194 where
195 F: Future<Output = anyhow::Result<()>> + Send + 'static,
196 {
197 tokio::spawn(async move {
198 if let Err(e) = fut.await {
199 tracing::error!("{context}: {e:?}");
200 }
201 });
202 }
203
204 fn handle_ws_message(
205 message: NautilusWsMessage,
206 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
207 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
208 ) {
209 match message {
210 NautilusWsMessage::Data(payloads) => {
211 for data in payloads {
212 Self::send_data(data_sender, data);
213 }
214 }
215 NautilusWsMessage::Deltas(deltas) => {
216 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
217 }
218 NautilusWsMessage::FundingRates(updates) => {
219 emit_funding_rates(updates);
220 }
221 NautilusWsMessage::Instrument(instrument) => {
222 upsert_instrument(instruments, *instrument);
223 }
224 NautilusWsMessage::AccountUpdate(_)
225 | NautilusWsMessage::PositionUpdate(_)
226 | NautilusWsMessage::OrderAccepted(_)
227 | NautilusWsMessage::OrderCanceled(_)
228 | NautilusWsMessage::OrderExpired(_)
229 | NautilusWsMessage::OrderRejected(_)
230 | NautilusWsMessage::OrderCancelRejected(_)
231 | NautilusWsMessage::OrderModifyRejected(_)
232 | NautilusWsMessage::OrderTriggered(_)
233 | NautilusWsMessage::OrderUpdated(_)
234 | NautilusWsMessage::ExecutionReports(_) => {
235 tracing::debug!("Ignoring trading message on data client");
236 }
237 NautilusWsMessage::Error(e) => {
238 tracing::error!("OKX websocket error: {e:?}");
239 }
240 NautilusWsMessage::Raw(value) => {
241 tracing::debug!("Unhandled websocket payload: {value:?}");
242 }
243 NautilusWsMessage::Reconnected => {
244 tracing::info!("Websocket reconnected");
245 }
246 NautilusWsMessage::Authenticated => {
247 tracing::debug!("Websocket authenticated");
248 }
249 }
250 }
251}
252
253fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
254 if updates.is_empty() {
255 return;
256 }
257
258 for update in updates {
259 tracing::debug!(
260 "Received funding rate update for {} but forwarding is not yet supported",
261 update.instrument_id
262 );
263 }
264}
265
266fn upsert_instrument(
267 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
268 instrument: InstrumentAny,
269) {
270 let mut guard = cache.write().expect(MUTEX_POISONED);
271 guard.insert(instrument.id(), instrument);
272}
273
274fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
275 value
276 .and_then(|dt| dt.timestamp_nanos_opt())
277 .and_then(|nanos| u64::try_from(nanos).ok())
278 .map(UnixNanos::from)
279}
280
281fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
282 contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
283}
284
285fn contract_filter_with_config_types(
286 contract_types: Option<&Vec<OKXContractType>>,
287 instrument: &InstrumentAny,
288) -> bool {
289 match contract_types {
290 None => true,
291 Some(filter) if filter.is_empty() => true,
292 Some(filter) => {
293 let is_inverse = instrument.is_inverse();
294 (is_inverse && filter.contains(&OKXContractType::Inverse))
295 || (!is_inverse && filter.contains(&OKXContractType::Linear))
296 }
297 }
298}
299
300#[async_trait::async_trait(?Send)]
301impl DataClient for OKXDataClient {
    /// Returns the client ID this data client was created with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
305
306 fn venue(&self) -> Option<Venue> {
307 Some(self.venue())
308 }
309
310 fn start(&mut self) -> anyhow::Result<()> {
311 tracing::info!(
312 client_id = %self.client_id,
313 vip_level = ?self.vip_level(),
314 instrument_types = ?self.config.instrument_types,
315 is_demo = self.config.is_demo,
316 http_proxy_url = ?self.config.http_proxy_url,
317 ws_proxy_url = ?self.config.ws_proxy_url,
318 "Started"
319 );
320 Ok(())
321 }
322
    /// Cancels the shared cancellation token and clears the connected flag.
    ///
    /// Websockets are not closed and stream tasks are not joined here; `disconnect`
    /// does that.
    fn stop(&mut self) -> anyhow::Result<()> {
        tracing::info!("Stopping {id}", id = self.client_id);
        // Tells every stream task holding a clone of the token to exit its loop.
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }
329
330 fn reset(&mut self) -> anyhow::Result<()> {
331 tracing::debug!("Resetting {id}", id = self.client_id);
332 self.is_connected.store(false, Ordering::Relaxed);
333 self.cancellation_token = CancellationToken::new();
334 self.tasks.clear();
335 self.book_channels
336 .write()
337 .expect("book channel cache lock poisoned")
338 .clear();
339 Ok(())
340 }
341
    /// Disposes the client by delegating to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disposing {id}", id = self.client_id);
        self.stop()
    }
346
347 async fn connect(&mut self) -> anyhow::Result<()> {
348 if self.is_connected() {
349 return Ok(());
350 }
351
352 let instrument_types = if self.config.instrument_types.is_empty() {
353 vec![OKXInstrumentType::Spot]
354 } else {
355 self.config.instrument_types.clone()
356 };
357
358 let mut all_instruments = Vec::new();
359 for inst_type in &instrument_types {
360 let mut fetched = self
361 .http_client
362 .request_instruments(*inst_type, None)
363 .await
364 .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
365
366 fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
367 self.http_client.cache_instruments(fetched.clone());
368
369 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
370 for instrument in &fetched {
371 guard.insert(instrument.id(), instrument.clone());
372 }
373 drop(guard);
374
375 all_instruments.extend(fetched);
376 }
377
378 for instrument in all_instruments {
379 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
380 tracing::warn!("Failed to send instrument: {e}");
381 }
382 }
383
384 if let Some(ref mut ws) = self.ws_public {
385 let instruments: Vec<_> = self
387 .instruments
388 .read()
389 .expect(MUTEX_POISONED)
390 .values()
391 .cloned()
392 .collect();
393 ws.cache_instruments(instruments);
394
395 ws.connect()
396 .await
397 .context("failed to connect OKX public websocket")?;
398 ws.wait_until_active(10.0)
399 .await
400 .context("public websocket did not become active")?;
401
402 let stream = ws.stream();
403 let sender = self.data_sender.clone();
404 let insts = self.instruments.clone();
405 let cancel = self.cancellation_token.clone();
406 let handle = tokio::spawn(async move {
407 pin_mut!(stream);
408 loop {
409 tokio::select! {
410 Some(message) = stream.next() => {
411 Self::handle_ws_message(message, &sender, &insts);
412 }
413 _ = cancel.cancelled() => {
414 tracing::debug!("Public websocket stream task cancelled");
415 break;
416 }
417 }
418 }
419 });
420 self.tasks.push(handle);
421
422 for inst_type in &instrument_types {
423 ws.subscribe_instruments(*inst_type)
424 .await
425 .with_context(|| {
426 format!("failed to subscribe to instrument type {inst_type:?}")
427 })?;
428 }
429 }
430
431 if let Some(ref mut ws) = self.ws_business {
432 let instruments: Vec<_> = self
434 .instruments
435 .read()
436 .expect(MUTEX_POISONED)
437 .values()
438 .cloned()
439 .collect();
440 ws.cache_instruments(instruments);
441
442 ws.connect()
443 .await
444 .context("failed to connect OKX business websocket")?;
445 ws.wait_until_active(10.0)
446 .await
447 .context("business websocket did not become active")?;
448
449 let stream = ws.stream();
450 let sender = self.data_sender.clone();
451 let insts = self.instruments.clone();
452 let cancel = self.cancellation_token.clone();
453 let handle = tokio::spawn(async move {
454 pin_mut!(stream);
455 loop {
456 tokio::select! {
457 Some(message) = stream.next() => {
458 Self::handle_ws_message(message, &sender, &insts);
459 }
460 _ = cancel.cancelled() => {
461 tracing::debug!("Business websocket stream task cancelled");
462 break;
463 }
464 }
465 }
466 });
467 self.tasks.push(handle);
468 }
469
470 self.is_connected.store(true, Ordering::Release);
471 tracing::info!(client_id = %self.client_id, "Connected");
472 Ok(())
473 }
474
475 async fn disconnect(&mut self) -> anyhow::Result<()> {
476 if self.is_disconnected() {
477 return Ok(());
478 }
479
480 self.cancellation_token.cancel();
481
482 if let Some(ref ws) = self.ws_public
483 && let Err(e) = ws.unsubscribe_all().await
484 {
485 tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
486 }
487 if let Some(ref ws) = self.ws_business
488 && let Err(e) = ws.unsubscribe_all().await
489 {
490 tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
491 }
492
493 tokio::time::sleep(Duration::from_millis(500)).await;
495
496 if let Some(ref mut ws) = self.ws_public {
497 let _ = ws.close().await;
498 }
499 if let Some(ref mut ws) = self.ws_business {
500 let _ = ws.close().await;
501 }
502
503 let handles: Vec<_> = self.tasks.drain(..).collect();
504 for handle in handles {
505 if let Err(e) = handle.await {
506 tracing::error!("Error joining websocket task: {e}");
507 }
508 }
509
510 self.book_channels.write().expect(MUTEX_POISONED).clear();
511 self.is_connected.store(false, Ordering::Release);
512 tracing::info!(client_id = %self.client_id, "Disconnected");
513 Ok(())
514 }
515
    /// Returns the current value of the connected flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
519
    /// Returns the negation of `is_connected`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
523
524 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
525 for inst_type in &self.config.instrument_types {
526 let ws = self.public_ws()?.clone();
527 let inst_type = *inst_type;
528
529 self.spawn_ws(
530 async move {
531 ws.subscribe_instruments(inst_type)
532 .await
533 .context("instruments subscription")?;
534 Ok(())
535 },
536 "subscribe_instruments",
537 );
538 }
539 Ok(())
540 }
541
542 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
543 let instrument_id = cmd.instrument_id;
546 let ws = self.public_ws()?.clone();
547
548 self.spawn_ws(
549 async move {
550 ws.subscribe_instrument(instrument_id)
551 .await
552 .context("instrument type subscription")?;
553 Ok(())
554 },
555 "subscribe_instrument",
556 );
557 Ok(())
558 }
559
560 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
561 if cmd.book_type != BookType::L2_MBP {
562 anyhow::bail!("OKX only supports L2_MBP order book deltas");
563 }
564
565 let depth = cmd.depth.map_or(0, |d| d.get());
566 if !matches!(depth, 0 | 50 | 400) {
567 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
568 }
569
570 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
571 let channel = match depth {
572 50 => {
573 if vip < OKXVipLevel::Vip4 {
574 anyhow::bail!(
575 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
576 );
577 }
578 OKXBookChannel::Books50L2Tbt
579 }
580 0 | 400 => {
581 if vip >= OKXVipLevel::Vip5 {
582 OKXBookChannel::BookL2Tbt
583 } else {
584 OKXBookChannel::Book
585 }
586 }
587 _ => unreachable!(),
588 };
589
590 let instrument_id = cmd.instrument_id;
591 let ws = self.public_ws()?.clone();
592 let book_channels = Arc::clone(&self.book_channels);
593
594 self.spawn_ws(
595 async move {
596 match channel {
597 OKXBookChannel::Books50L2Tbt => ws
598 .subscribe_book50_l2_tbt(instrument_id)
599 .await
600 .context("books50-l2-tbt subscription")?,
601 OKXBookChannel::BookL2Tbt => ws
602 .subscribe_book_l2_tbt(instrument_id)
603 .await
604 .context("books-l2-tbt subscription")?,
605 OKXBookChannel::Book => ws
606 .subscribe_books_channel(instrument_id)
607 .await
608 .context("books subscription")?,
609 }
610 book_channels
611 .write()
612 .expect("book channel cache lock poisoned")
613 .insert(instrument_id, channel);
614 Ok(())
615 },
616 "order book delta subscription",
617 );
618
619 Ok(())
620 }
621
622 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
623 if cmd.book_type != BookType::L2_MBP {
624 anyhow::bail!("OKX only supports L2_MBP order book snapshots");
625 }
626 let depth = cmd.depth.map_or(5, |d| d.get());
627 if depth != 5 {
628 anyhow::bail!("OKX only supports depth=5 snapshots");
629 }
630
631 let ws = self.public_ws()?.clone();
632 let instrument_id = cmd.instrument_id;
633
634 self.spawn_ws(
635 async move {
636 ws.subscribe_book_depth5(instrument_id)
637 .await
638 .context("books5 subscription")
639 },
640 "order book snapshot subscription",
641 );
642 Ok(())
643 }
644
645 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
646 let ws = self.public_ws()?.clone();
647 let instrument_id = cmd.instrument_id;
648
649 self.spawn_ws(
650 async move {
651 ws.subscribe_quotes(instrument_id)
652 .await
653 .context("quotes subscription")
654 },
655 "quote subscription",
656 );
657 Ok(())
658 }
659
660 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
661 let ws = self.public_ws()?.clone();
662 let instrument_id = cmd.instrument_id;
663
664 self.spawn_ws(
665 async move {
666 ws.subscribe_trades(instrument_id, false)
667 .await
668 .context("trades subscription")
669 },
670 "trade subscription",
671 );
672 Ok(())
673 }
674
675 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
676 let ws = self.public_ws()?.clone();
677 let instrument_id = cmd.instrument_id;
678
679 self.spawn_ws(
680 async move {
681 ws.subscribe_mark_prices(instrument_id)
682 .await
683 .context("mark price subscription")
684 },
685 "mark price subscription",
686 );
687 Ok(())
688 }
689
690 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
691 let ws = self.public_ws()?.clone();
692 let instrument_id = cmd.instrument_id;
693
694 self.spawn_ws(
695 async move {
696 ws.subscribe_index_prices(instrument_id)
697 .await
698 .context("index price subscription")
699 },
700 "index price subscription",
701 );
702 Ok(())
703 }
704
705 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
706 let ws = self.public_ws()?.clone();
707 let instrument_id = cmd.instrument_id;
708
709 self.spawn_ws(
710 async move {
711 ws.subscribe_funding_rates(instrument_id)
712 .await
713 .context("funding rate subscription")
714 },
715 "funding rate subscription",
716 );
717 Ok(())
718 }
719
720 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
721 let ws = self.business_ws()?.clone();
722 let bar_type = cmd.bar_type;
723
724 self.spawn_ws(
725 async move {
726 ws.subscribe_bars(bar_type)
727 .await
728 .context("bars subscription")
729 },
730 "bar subscription",
731 );
732 Ok(())
733 }
734
735 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
736 let ws = self.public_ws()?.clone();
737 let instrument_id = cmd.instrument_id;
738 let channel = self
739 .book_channels
740 .write()
741 .expect("book channel cache lock poisoned")
742 .remove(&instrument_id);
743
744 self.spawn_ws(
745 async move {
746 match channel {
747 Some(OKXBookChannel::Books50L2Tbt) => ws
748 .unsubscribe_book50_l2_tbt(instrument_id)
749 .await
750 .context("books50-l2-tbt unsubscribe")?,
751 Some(OKXBookChannel::BookL2Tbt) => ws
752 .unsubscribe_book_l2_tbt(instrument_id)
753 .await
754 .context("books-l2-tbt unsubscribe")?,
755 Some(OKXBookChannel::Book) => ws
756 .unsubscribe_book(instrument_id)
757 .await
758 .context("book unsubscribe")?,
759 None => {
760 tracing::warn!(
761 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
762 );
763 ws.unsubscribe_book(instrument_id)
764 .await
765 .context("book fallback unsubscribe")?;
766 }
767 }
768 Ok(())
769 },
770 "order book unsubscribe",
771 );
772 Ok(())
773 }
774
775 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
776 let ws = self.public_ws()?.clone();
777 let instrument_id = cmd.instrument_id;
778
779 self.spawn_ws(
780 async move {
781 ws.unsubscribe_book_depth5(instrument_id)
782 .await
783 .context("book depth5 unsubscribe")
784 },
785 "order book snapshot unsubscribe",
786 );
787 Ok(())
788 }
789
790 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
791 let ws = self.public_ws()?.clone();
792 let instrument_id = cmd.instrument_id;
793
794 self.spawn_ws(
795 async move {
796 ws.unsubscribe_quotes(instrument_id)
797 .await
798 .context("quotes unsubscribe")
799 },
800 "quote unsubscribe",
801 );
802 Ok(())
803 }
804
805 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
806 let ws = self.public_ws()?.clone();
807 let instrument_id = cmd.instrument_id;
808
809 self.spawn_ws(
810 async move {
811 ws.unsubscribe_trades(instrument_id, false) .await
813 .context("trades unsubscribe")
814 },
815 "trade unsubscribe",
816 );
817 Ok(())
818 }
819
820 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
821 let ws = self.public_ws()?.clone();
822 let instrument_id = cmd.instrument_id;
823
824 self.spawn_ws(
825 async move {
826 ws.unsubscribe_mark_prices(instrument_id)
827 .await
828 .context("mark price unsubscribe")
829 },
830 "mark price unsubscribe",
831 );
832 Ok(())
833 }
834
835 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
836 let ws = self.public_ws()?.clone();
837 let instrument_id = cmd.instrument_id;
838
839 self.spawn_ws(
840 async move {
841 ws.unsubscribe_index_prices(instrument_id)
842 .await
843 .context("index price unsubscribe")
844 },
845 "index price unsubscribe",
846 );
847 Ok(())
848 }
849
850 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
851 let ws = self.public_ws()?.clone();
852 let instrument_id = cmd.instrument_id;
853
854 self.spawn_ws(
855 async move {
856 ws.unsubscribe_funding_rates(instrument_id)
857 .await
858 .context("funding rate unsubscribe")
859 },
860 "funding rate unsubscribe",
861 );
862 Ok(())
863 }
864
865 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
866 let ws = self.business_ws()?.clone();
867 let bar_type = cmd.bar_type;
868
869 self.spawn_ws(
870 async move {
871 ws.unsubscribe_bars(bar_type)
872 .await
873 .context("bars unsubscribe")
874 },
875 "bar unsubscribe",
876 );
877 Ok(())
878 }
879
880 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
881 let http = self.http_client.clone();
882 let sender = self.data_sender.clone();
883 let instruments_cache = self.instruments.clone();
884 let request_id = request.request_id;
885 let client_id = request.client_id.unwrap_or(self.client_id);
886 let venue = self.venue();
887 let start = request.start;
888 let end = request.end;
889 let params = request.params.clone();
890 let clock = self.clock;
891 let start_nanos = datetime_to_unix_nanos(start);
892 let end_nanos = datetime_to_unix_nanos(end);
893 let instrument_types = if self.config.instrument_types.is_empty() {
894 vec![OKXInstrumentType::Spot]
895 } else {
896 self.config.instrument_types.clone()
897 };
898 let contract_types = self.config.contract_types.clone();
899 let instrument_families = self.config.instrument_families.clone();
900
901 tokio::spawn(async move {
902 let mut all_instruments = Vec::new();
903
904 for inst_type in instrument_types {
905 let supports_family = matches!(
906 inst_type,
907 OKXInstrumentType::Futures
908 | OKXInstrumentType::Swap
909 | OKXInstrumentType::Option
910 );
911
912 let families = match (&instrument_families, inst_type, supports_family) {
913 (Some(families), OKXInstrumentType::Option, true) => families.clone(),
914 (Some(families), _, true) => families.clone(),
915 (None, OKXInstrumentType::Option, _) => {
916 tracing::warn!(
917 "Skipping OPTION type: instrument_families required but not configured"
918 );
919 continue;
920 }
921 _ => vec![],
922 };
923
924 if families.is_empty() {
925 match http.request_instruments(inst_type, None).await {
926 Ok(instruments) => {
927 for instrument in instruments {
928 if !contract_filter_with_config_types(
929 contract_types.as_ref(),
930 &instrument,
931 ) {
932 continue;
933 }
934
935 upsert_instrument(&instruments_cache, instrument.clone());
936 all_instruments.push(instrument);
937 }
938 }
939 Err(e) => {
940 tracing::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
941 }
942 }
943 } else {
944 for family in families {
945 match http
946 .request_instruments(inst_type, Some(family.clone()))
947 .await
948 {
949 Ok(instruments) => {
950 for instrument in instruments {
951 if !contract_filter_with_config_types(
952 contract_types.as_ref(),
953 &instrument,
954 ) {
955 continue;
956 }
957
958 upsert_instrument(&instruments_cache, instrument.clone());
959 all_instruments.push(instrument);
960 }
961 }
962 Err(e) => {
963 tracing::error!(
964 "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
965 );
966 }
967 }
968 }
969 }
970 }
971
972 let response = DataResponse::Instruments(InstrumentsResponse::new(
973 request_id,
974 client_id,
975 venue,
976 all_instruments,
977 start_nanos,
978 end_nanos,
979 clock.get_time_ns(),
980 params,
981 ));
982
983 if let Err(e) = sender.send(DataEvent::Response(response)) {
984 tracing::error!("Failed to send instruments response: {e}");
985 }
986 });
987
988 Ok(())
989 }
990
991 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
992 let http = self.http_client.clone();
993 let sender = self.data_sender.clone();
994 let instruments = self.instruments.clone();
995 let instrument_id = request.instrument_id;
996 let request_id = request.request_id;
997 let client_id = request.client_id.unwrap_or(self.client_id);
998 let start = request.start;
999 let end = request.end;
1000 let params = request.params.clone();
1001 let clock = self.clock;
1002 let start_nanos = datetime_to_unix_nanos(start);
1003 let end_nanos = datetime_to_unix_nanos(end);
1004 let instrument_types = if self.config.instrument_types.is_empty() {
1005 vec![OKXInstrumentType::Spot]
1006 } else {
1007 self.config.instrument_types.clone()
1008 };
1009 let contract_types = self.config.contract_types.clone();
1010
1011 tokio::spawn(async move {
1012 match http
1013 .request_instrument(instrument_id)
1014 .await
1015 .context("fetch instrument from API")
1016 {
1017 Ok(instrument) => {
1018 let inst_id = instrument.id();
1019 let symbol = inst_id.symbol.as_str();
1020 let inst_type = okx_instrument_type_from_symbol(symbol);
1021 if !instrument_types.contains(&inst_type) {
1022 tracing::error!(
1023 "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1024 );
1025 return;
1026 }
1027
1028 if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1029 tracing::error!(
1030 "Instrument {instrument_id} filtered out by contract_types config"
1031 );
1032 return;
1033 }
1034
1035 upsert_instrument(&instruments, instrument.clone());
1036
1037 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1038 request_id,
1039 client_id,
1040 instrument.id(),
1041 instrument,
1042 start_nanos,
1043 end_nanos,
1044 clock.get_time_ns(),
1045 params,
1046 )));
1047
1048 if let Err(e) = sender.send(DataEvent::Response(response)) {
1049 tracing::error!("Failed to send instrument response: {e}");
1050 }
1051 }
1052 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
1053 }
1054 });
1055
1056 Ok(())
1057 }
1058
1059 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1060 let http = self.http_client.clone();
1061 let sender = self.data_sender.clone();
1062 let instrument_id = request.instrument_id;
1063 let start = request.start;
1064 let end = request.end;
1065 let limit = request.limit.map(|n| n.get() as u32);
1066 let request_id = request.request_id;
1067 let client_id = request.client_id.unwrap_or(self.client_id);
1068 let params = request.params.clone();
1069 let clock = self.clock;
1070 let start_nanos = datetime_to_unix_nanos(start);
1071 let end_nanos = datetime_to_unix_nanos(end);
1072
1073 tokio::spawn(async move {
1074 match http
1075 .request_trades(instrument_id, start, end, limit)
1076 .await
1077 .context("failed to request trades from OKX")
1078 {
1079 Ok(trades) => {
1080 let response = DataResponse::Trades(TradesResponse::new(
1081 request_id,
1082 client_id,
1083 instrument_id,
1084 trades,
1085 start_nanos,
1086 end_nanos,
1087 clock.get_time_ns(),
1088 params,
1089 ));
1090 if let Err(e) = sender.send(DataEvent::Response(response)) {
1091 tracing::error!("Failed to send trades response: {e}");
1092 }
1093 }
1094 Err(e) => tracing::error!("Trade request failed: {e:?}"),
1095 }
1096 });
1097
1098 Ok(())
1099 }
1100
1101 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1102 let http = self.http_client.clone();
1103 let sender = self.data_sender.clone();
1104 let bar_type = request.bar_type;
1105 let start = request.start;
1106 let end = request.end;
1107 let limit = request.limit.map(|n| n.get() as u32);
1108 let request_id = request.request_id;
1109 let client_id = request.client_id.unwrap_or(self.client_id);
1110 let params = request.params.clone();
1111 let clock = self.clock;
1112 let start_nanos = datetime_to_unix_nanos(start);
1113 let end_nanos = datetime_to_unix_nanos(end);
1114
1115 tokio::spawn(async move {
1116 match http
1117 .request_bars(bar_type, start, end, limit)
1118 .await
1119 .context("failed to request bars from OKX")
1120 {
1121 Ok(bars) => {
1122 let response = DataResponse::Bars(BarsResponse::new(
1123 request_id,
1124 client_id,
1125 bar_type,
1126 bars,
1127 start_nanos,
1128 end_nanos,
1129 clock.get_time_ns(),
1130 params,
1131 ));
1132 if let Err(e) = sender.send(DataEvent::Response(response)) {
1133 tracing::error!("Failed to send bars response: {e}");
1134 }
1135 }
1136 Err(e) => tracing::error!("Bar request failed: {e:?}"),
1137 }
1138 });
1139
1140 Ok(())
1141 }
1142}