1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
35 SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
36 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
37 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38 },
39 },
40};
41use nautilus_core::{
42 MUTEX_POISONED,
43 datetime::datetime_to_unix_nanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
48 enums::BookType,
49 identifiers::{ClientId, InstrumentId, Venue},
50 instruments::{Instrument, InstrumentAny},
51};
52use tokio::{task::JoinHandle, time::Duration};
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56 common::{
57 consts::OKX_VENUE,
58 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
59 parse::okx_instrument_type_from_symbol,
60 },
61 config::OKXDataClientConfig,
62 http::client::OKXHttpClient,
63 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
64};
65
66#[derive(Debug)]
67pub struct OKXDataClient {
68 client_id: ClientId,
69 config: OKXDataClientConfig,
70 http_client: OKXHttpClient,
71 ws_public: Option<OKXWebSocketClient>,
72 ws_business: Option<OKXWebSocketClient>,
73 is_connected: AtomicBool,
74 cancellation_token: CancellationToken,
75 tasks: Vec<JoinHandle<()>>,
76 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
77 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
78 book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
79 clock: &'static AtomicTime,
80}
81
82impl OKXDataClient {
83 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
89 let clock = get_atomic_clock_realtime();
90 let data_sender = get_data_event_sender();
91
92 let http_client = if config.has_api_credentials() {
93 OKXHttpClient::with_credentials(
94 config.api_key.clone(),
95 config.api_secret.clone(),
96 config.api_passphrase.clone(),
97 config.base_url_http.clone(),
98 config.http_timeout_secs,
99 config.max_retries,
100 config.retry_delay_initial_ms,
101 config.retry_delay_max_ms,
102 config.is_demo,
103 config.http_proxy_url.clone(),
104 )?
105 } else {
106 OKXHttpClient::new(
107 config.base_url_http.clone(),
108 config.http_timeout_secs,
109 config.max_retries,
110 config.retry_delay_initial_ms,
111 config.retry_delay_max_ms,
112 config.is_demo,
113 config.http_proxy_url.clone(),
114 )?
115 };
116
117 let ws_public = OKXWebSocketClient::new(
118 Some(config.ws_public_url()),
119 None,
120 None,
121 None,
122 None,
123 Some(20), )
125 .context("failed to construct OKX public websocket client")?;
126
127 let ws_business = if config.requires_business_ws() {
128 Some(
129 OKXWebSocketClient::new(
130 Some(config.ws_business_url()),
131 config.api_key.clone(),
132 config.api_secret.clone(),
133 config.api_passphrase.clone(),
134 None,
135 Some(20), )
137 .context("failed to construct OKX business websocket client")?,
138 )
139 } else {
140 None
141 };
142
143 if let Some(vip_level) = config.vip_level {
144 ws_public.set_vip_level(vip_level);
145 if let Some(ref ws) = ws_business {
146 ws.set_vip_level(vip_level);
147 }
148 }
149
150 Ok(Self {
151 client_id,
152 config,
153 http_client,
154 ws_public: Some(ws_public),
155 ws_business,
156 is_connected: AtomicBool::new(false),
157 cancellation_token: CancellationToken::new(),
158 tasks: Vec::new(),
159 data_sender,
160 instruments: Arc::new(RwLock::new(AHashMap::new())),
161 book_channels: Arc::new(RwLock::new(AHashMap::new())),
162 clock,
163 })
164 }
165
166 fn venue(&self) -> Venue {
167 *OKX_VENUE
168 }
169
170 fn vip_level(&self) -> Option<OKXVipLevel> {
171 self.ws_public.as_ref().map(|ws| ws.vip_level())
172 }
173
174 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
175 self.ws_public
176 .as_ref()
177 .context("public websocket client not initialized")
178 }
179
180 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
181 self.ws_business
182 .as_ref()
183 .context("business websocket client not available (credentials required)")
184 }
185
186 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
187 if let Err(e) = sender.send(DataEvent::Data(data)) {
188 log::error!("Failed to emit data event: {e}");
189 }
190 }
191
192 fn spawn_ws<F>(&self, fut: F, context: &'static str)
193 where
194 F: Future<Output = anyhow::Result<()>> + Send + 'static,
195 {
196 get_runtime().spawn(async move {
197 if let Err(e) = fut.await {
198 log::error!("{context}: {e:?}");
199 }
200 });
201 }
202
203 fn handle_ws_message(
204 message: NautilusWsMessage,
205 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
206 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
207 ) {
208 match message {
209 NautilusWsMessage::Data(payloads) => {
210 for data in payloads {
211 Self::send_data(data_sender, data);
212 }
213 }
214 NautilusWsMessage::Deltas(deltas) => {
215 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
216 }
217 NautilusWsMessage::FundingRates(updates) => {
218 emit_funding_rates(updates);
219 }
220 NautilusWsMessage::Instrument(instrument) => {
221 upsert_instrument(instruments, *instrument);
222 }
223 NautilusWsMessage::AccountUpdate(_)
224 | NautilusWsMessage::PositionUpdate(_)
225 | NautilusWsMessage::OrderAccepted(_)
226 | NautilusWsMessage::OrderCanceled(_)
227 | NautilusWsMessage::OrderExpired(_)
228 | NautilusWsMessage::OrderRejected(_)
229 | NautilusWsMessage::OrderCancelRejected(_)
230 | NautilusWsMessage::OrderModifyRejected(_)
231 | NautilusWsMessage::OrderTriggered(_)
232 | NautilusWsMessage::OrderUpdated(_)
233 | NautilusWsMessage::ExecutionReports(_) => {
234 log::debug!("Ignoring trading message on data client");
235 }
236 NautilusWsMessage::Error(e) => {
237 log::error!("OKX websocket error: {e:?}");
238 }
239 NautilusWsMessage::Raw(value) => {
240 log::debug!("Unhandled websocket payload: {value:?}");
241 }
242 NautilusWsMessage::Reconnected => {
243 log::info!("Websocket reconnected");
244 }
245 NautilusWsMessage::Authenticated => {
246 log::debug!("Websocket authenticated");
247 }
248 }
249 }
250}
251
252fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
253 if updates.is_empty() {
254 return;
255 }
256
257 for update in updates {
258 log::debug!(
259 "Received funding rate update for {} but forwarding is not yet supported",
260 update.instrument_id
261 );
262 }
263}
264
265fn upsert_instrument(
266 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
267 instrument: InstrumentAny,
268) {
269 let mut guard = cache.write().expect(MUTEX_POISONED);
270 guard.insert(instrument.id(), instrument);
271}
272
273fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
274 contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
275}
276
277fn contract_filter_with_config_types(
278 contract_types: Option<&Vec<OKXContractType>>,
279 instrument: &InstrumentAny,
280) -> bool {
281 match contract_types {
282 None => true,
283 Some(filter) if filter.is_empty() => true,
284 Some(filter) => {
285 let is_inverse = instrument.is_inverse();
286 (is_inverse && filter.contains(&OKXContractType::Inverse))
287 || (!is_inverse && filter.contains(&OKXContractType::Linear))
288 }
289 }
290}
291
292#[async_trait::async_trait(?Send)]
293impl DataClient for OKXDataClient {
294 fn client_id(&self) -> ClientId {
295 self.client_id
296 }
297
298 fn venue(&self) -> Option<Venue> {
299 Some(self.venue())
300 }
301
302 fn start(&mut self) -> anyhow::Result<()> {
303 log::info!(
304 "Started: client_id={}, vip_level={:?}, instrument_types={:?}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
305 self.client_id,
306 self.vip_level(),
307 self.config.instrument_types,
308 self.config.is_demo,
309 self.config.http_proxy_url,
310 self.config.ws_proxy_url,
311 );
312 Ok(())
313 }
314
315 fn stop(&mut self) -> anyhow::Result<()> {
316 log::info!("Stopping {id}", id = self.client_id);
317 self.cancellation_token.cancel();
318 self.is_connected.store(false, Ordering::Relaxed);
319 Ok(())
320 }
321
322 fn reset(&mut self) -> anyhow::Result<()> {
323 log::debug!("Resetting {id}", id = self.client_id);
324 self.is_connected.store(false, Ordering::Relaxed);
325 self.cancellation_token = CancellationToken::new();
326 self.tasks.clear();
327 self.book_channels
328 .write()
329 .expect("book channel cache lock poisoned")
330 .clear();
331 Ok(())
332 }
333
334 fn dispose(&mut self) -> anyhow::Result<()> {
335 log::debug!("Disposing {id}", id = self.client_id);
336 self.stop()
337 }
338
339 async fn connect(&mut self) -> anyhow::Result<()> {
340 if self.is_connected() {
341 return Ok(());
342 }
343
344 let instrument_types = if self.config.instrument_types.is_empty() {
345 vec![OKXInstrumentType::Spot]
346 } else {
347 self.config.instrument_types.clone()
348 };
349
350 let mut all_instruments = Vec::new();
351 for inst_type in &instrument_types {
352 let mut fetched = self
353 .http_client
354 .request_instruments(*inst_type, None)
355 .await
356 .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
357
358 fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
359 self.http_client.cache_instruments(fetched.clone());
360
361 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
362 for instrument in &fetched {
363 guard.insert(instrument.id(), instrument.clone());
364 }
365 drop(guard);
366
367 all_instruments.extend(fetched);
368 }
369
370 for instrument in all_instruments {
371 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
372 log::warn!("Failed to send instrument: {e}");
373 }
374 }
375
376 if let Some(ref mut ws) = self.ws_public {
377 let instruments: Vec<_> = self
379 .instruments
380 .read()
381 .expect(MUTEX_POISONED)
382 .values()
383 .cloned()
384 .collect();
385 ws.cache_instruments(instruments);
386
387 ws.connect()
388 .await
389 .context("failed to connect OKX public websocket")?;
390 ws.wait_until_active(10.0)
391 .await
392 .context("public websocket did not become active")?;
393
394 let stream = ws.stream();
395 let sender = self.data_sender.clone();
396 let insts = self.instruments.clone();
397 let cancel = self.cancellation_token.clone();
398 let handle = get_runtime().spawn(async move {
399 pin_mut!(stream);
400 loop {
401 tokio::select! {
402 Some(message) = stream.next() => {
403 Self::handle_ws_message(message, &sender, &insts);
404 }
405 () = cancel.cancelled() => {
406 log::debug!("Public websocket stream task cancelled");
407 break;
408 }
409 }
410 }
411 });
412 self.tasks.push(handle);
413
414 for inst_type in &instrument_types {
415 ws.subscribe_instruments(*inst_type)
416 .await
417 .with_context(|| {
418 format!("failed to subscribe to instrument type {inst_type:?}")
419 })?;
420 }
421 }
422
423 if let Some(ref mut ws) = self.ws_business {
424 let instruments: Vec<_> = self
426 .instruments
427 .read()
428 .expect(MUTEX_POISONED)
429 .values()
430 .cloned()
431 .collect();
432 ws.cache_instruments(instruments);
433
434 ws.connect()
435 .await
436 .context("failed to connect OKX business websocket")?;
437 ws.wait_until_active(10.0)
438 .await
439 .context("business websocket did not become active")?;
440
441 let stream = ws.stream();
442 let sender = self.data_sender.clone();
443 let insts = self.instruments.clone();
444 let cancel = self.cancellation_token.clone();
445 let handle = get_runtime().spawn(async move {
446 pin_mut!(stream);
447 loop {
448 tokio::select! {
449 Some(message) = stream.next() => {
450 Self::handle_ws_message(message, &sender, &insts);
451 }
452 () = cancel.cancelled() => {
453 log::debug!("Business websocket stream task cancelled");
454 break;
455 }
456 }
457 }
458 });
459 self.tasks.push(handle);
460 }
461
462 self.is_connected.store(true, Ordering::Release);
463 log::info!("Connected: client_id={}", self.client_id);
464 Ok(())
465 }
466
467 async fn disconnect(&mut self) -> anyhow::Result<()> {
468 if self.is_disconnected() {
469 return Ok(());
470 }
471
472 self.cancellation_token.cancel();
473
474 if let Some(ref ws) = self.ws_public
475 && let Err(e) = ws.unsubscribe_all().await
476 {
477 log::warn!("Failed to unsubscribe all from public websocket: {e:?}");
478 }
479 if let Some(ref ws) = self.ws_business
480 && let Err(e) = ws.unsubscribe_all().await
481 {
482 log::warn!("Failed to unsubscribe all from business websocket: {e:?}");
483 }
484
485 tokio::time::sleep(Duration::from_millis(500)).await;
487
488 if let Some(ref mut ws) = self.ws_public {
489 let _ = ws.close().await;
490 }
491 if let Some(ref mut ws) = self.ws_business {
492 let _ = ws.close().await;
493 }
494
495 let handles: Vec<_> = self.tasks.drain(..).collect();
496 for handle in handles {
497 if let Err(e) = handle.await {
498 log::error!("Error joining websocket task: {e}");
499 }
500 }
501
502 self.book_channels.write().expect(MUTEX_POISONED).clear();
503 self.is_connected.store(false, Ordering::Release);
504 log::info!("Disconnected: client_id={}", self.client_id);
505 Ok(())
506 }
507
508 fn is_connected(&self) -> bool {
509 self.is_connected.load(Ordering::Relaxed)
510 }
511
512 fn is_disconnected(&self) -> bool {
513 !self.is_connected()
514 }
515
516 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
517 for inst_type in &self.config.instrument_types {
518 let ws = self.public_ws()?.clone();
519 let inst_type = *inst_type;
520
521 self.spawn_ws(
522 async move {
523 ws.subscribe_instruments(inst_type)
524 .await
525 .context("instruments subscription")?;
526 Ok(())
527 },
528 "subscribe_instruments",
529 );
530 }
531 Ok(())
532 }
533
534 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
535 let instrument_id = cmd.instrument_id;
538 let ws = self.public_ws()?.clone();
539
540 self.spawn_ws(
541 async move {
542 ws.subscribe_instrument(instrument_id)
543 .await
544 .context("instrument type subscription")?;
545 Ok(())
546 },
547 "subscribe_instrument",
548 );
549 Ok(())
550 }
551
552 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
553 if cmd.book_type != BookType::L2_MBP {
554 anyhow::bail!("OKX only supports L2_MBP order book deltas");
555 }
556
557 let depth = cmd.depth.map_or(0, |d| d.get());
558 if !matches!(depth, 0 | 50 | 400) {
559 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
560 }
561
562 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
563 let channel = match depth {
564 50 => {
565 if vip < OKXVipLevel::Vip4 {
566 anyhow::bail!(
567 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
568 );
569 }
570 OKXBookChannel::Books50L2Tbt
571 }
572 0 | 400 => {
573 if vip >= OKXVipLevel::Vip5 {
574 OKXBookChannel::BookL2Tbt
575 } else {
576 OKXBookChannel::Book
577 }
578 }
579 _ => unreachable!(),
580 };
581
582 let instrument_id = cmd.instrument_id;
583 let ws = self.public_ws()?.clone();
584 let book_channels = Arc::clone(&self.book_channels);
585
586 self.spawn_ws(
587 async move {
588 match channel {
589 OKXBookChannel::Books50L2Tbt => ws
590 .subscribe_book50_l2_tbt(instrument_id)
591 .await
592 .context("books50-l2-tbt subscription")?,
593 OKXBookChannel::BookL2Tbt => ws
594 .subscribe_book_l2_tbt(instrument_id)
595 .await
596 .context("books-l2-tbt subscription")?,
597 OKXBookChannel::Book => ws
598 .subscribe_books_channel(instrument_id)
599 .await
600 .context("books subscription")?,
601 }
602 book_channels
603 .write()
604 .expect("book channel cache lock poisoned")
605 .insert(instrument_id, channel);
606 Ok(())
607 },
608 "order book delta subscription",
609 );
610
611 Ok(())
612 }
613
614 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
615 let ws = self.public_ws()?.clone();
616 let instrument_id = cmd.instrument_id;
617
618 self.spawn_ws(
619 async move {
620 ws.subscribe_quotes(instrument_id)
621 .await
622 .context("quotes subscription")
623 },
624 "quote subscription",
625 );
626 Ok(())
627 }
628
629 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
630 let ws = self.public_ws()?.clone();
631 let instrument_id = cmd.instrument_id;
632
633 self.spawn_ws(
634 async move {
635 ws.subscribe_trades(instrument_id, false)
636 .await
637 .context("trades subscription")
638 },
639 "trade subscription",
640 );
641 Ok(())
642 }
643
644 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
645 let ws = self.public_ws()?.clone();
646 let instrument_id = cmd.instrument_id;
647
648 self.spawn_ws(
649 async move {
650 ws.subscribe_mark_prices(instrument_id)
651 .await
652 .context("mark price subscription")
653 },
654 "mark price subscription",
655 );
656 Ok(())
657 }
658
659 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
660 let ws = self.public_ws()?.clone();
661 let instrument_id = cmd.instrument_id;
662
663 self.spawn_ws(
664 async move {
665 ws.subscribe_index_prices(instrument_id)
666 .await
667 .context("index price subscription")
668 },
669 "index price subscription",
670 );
671 Ok(())
672 }
673
674 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
675 let ws = self.public_ws()?.clone();
676 let instrument_id = cmd.instrument_id;
677
678 self.spawn_ws(
679 async move {
680 ws.subscribe_funding_rates(instrument_id)
681 .await
682 .context("funding rate subscription")
683 },
684 "funding rate subscription",
685 );
686 Ok(())
687 }
688
689 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
690 let ws = self.business_ws()?.clone();
691 let bar_type = cmd.bar_type;
692
693 self.spawn_ws(
694 async move {
695 ws.subscribe_bars(bar_type)
696 .await
697 .context("bars subscription")
698 },
699 "bar subscription",
700 );
701 Ok(())
702 }
703
704 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
705 let ws = self.public_ws()?.clone();
706 let instrument_id = cmd.instrument_id;
707 let channel = self
708 .book_channels
709 .write()
710 .expect("book channel cache lock poisoned")
711 .remove(&instrument_id);
712
713 self.spawn_ws(
714 async move {
715 match channel {
716 Some(OKXBookChannel::Books50L2Tbt) => ws
717 .unsubscribe_book50_l2_tbt(instrument_id)
718 .await
719 .context("books50-l2-tbt unsubscribe")?,
720 Some(OKXBookChannel::BookL2Tbt) => ws
721 .unsubscribe_book_l2_tbt(instrument_id)
722 .await
723 .context("books-l2-tbt unsubscribe")?,
724 Some(OKXBookChannel::Book) => ws
725 .unsubscribe_book(instrument_id)
726 .await
727 .context("book unsubscribe")?,
728 None => {
729 log::warn!(
730 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
731 );
732 ws.unsubscribe_book(instrument_id)
733 .await
734 .context("book fallback unsubscribe")?;
735 }
736 }
737 Ok(())
738 },
739 "order book unsubscribe",
740 );
741 Ok(())
742 }
743
744 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
745 let ws = self.public_ws()?.clone();
746 let instrument_id = cmd.instrument_id;
747
748 self.spawn_ws(
749 async move {
750 ws.unsubscribe_quotes(instrument_id)
751 .await
752 .context("quotes unsubscribe")
753 },
754 "quote unsubscribe",
755 );
756 Ok(())
757 }
758
759 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
760 let ws = self.public_ws()?.clone();
761 let instrument_id = cmd.instrument_id;
762
763 self.spawn_ws(
764 async move {
765 ws.unsubscribe_trades(instrument_id, false) .await
767 .context("trades unsubscribe")
768 },
769 "trade unsubscribe",
770 );
771 Ok(())
772 }
773
774 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
775 let ws = self.public_ws()?.clone();
776 let instrument_id = cmd.instrument_id;
777
778 self.spawn_ws(
779 async move {
780 ws.unsubscribe_mark_prices(instrument_id)
781 .await
782 .context("mark price unsubscribe")
783 },
784 "mark price unsubscribe",
785 );
786 Ok(())
787 }
788
789 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
790 let ws = self.public_ws()?.clone();
791 let instrument_id = cmd.instrument_id;
792
793 self.spawn_ws(
794 async move {
795 ws.unsubscribe_index_prices(instrument_id)
796 .await
797 .context("index price unsubscribe")
798 },
799 "index price unsubscribe",
800 );
801 Ok(())
802 }
803
804 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
805 let ws = self.public_ws()?.clone();
806 let instrument_id = cmd.instrument_id;
807
808 self.spawn_ws(
809 async move {
810 ws.unsubscribe_funding_rates(instrument_id)
811 .await
812 .context("funding rate unsubscribe")
813 },
814 "funding rate unsubscribe",
815 );
816 Ok(())
817 }
818
819 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
820 let ws = self.business_ws()?.clone();
821 let bar_type = cmd.bar_type;
822
823 self.spawn_ws(
824 async move {
825 ws.unsubscribe_bars(bar_type)
826 .await
827 .context("bars unsubscribe")
828 },
829 "bar unsubscribe",
830 );
831 Ok(())
832 }
833
834 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
835 let http = self.http_client.clone();
836 let sender = self.data_sender.clone();
837 let instruments_cache = self.instruments.clone();
838 let request_id = request.request_id;
839 let client_id = request.client_id.unwrap_or(self.client_id);
840 let venue = self.venue();
841 let start = request.start;
842 let end = request.end;
843 let params = request.params.clone();
844 let clock = self.clock;
845 let start_nanos = datetime_to_unix_nanos(start);
846 let end_nanos = datetime_to_unix_nanos(end);
847 let instrument_types = if self.config.instrument_types.is_empty() {
848 vec![OKXInstrumentType::Spot]
849 } else {
850 self.config.instrument_types.clone()
851 };
852 let contract_types = self.config.contract_types.clone();
853 let instrument_families = self.config.instrument_families.clone();
854
855 get_runtime().spawn(async move {
856 let mut all_instruments = Vec::new();
857
858 for inst_type in instrument_types {
859 let supports_family = matches!(
860 inst_type,
861 OKXInstrumentType::Futures
862 | OKXInstrumentType::Swap
863 | OKXInstrumentType::Option
864 );
865
866 let families = match (&instrument_families, inst_type, supports_family) {
867 (Some(families), OKXInstrumentType::Option, true) => families.clone(),
868 (Some(families), _, true) => families.clone(),
869 (None, OKXInstrumentType::Option, _) => {
870 log::warn!(
871 "Skipping OPTION type: instrument_families required but not configured"
872 );
873 continue;
874 }
875 _ => vec![],
876 };
877
878 if families.is_empty() {
879 match http.request_instruments(inst_type, None).await {
880 Ok(instruments) => {
881 for instrument in instruments {
882 if !contract_filter_with_config_types(
883 contract_types.as_ref(),
884 &instrument,
885 ) {
886 continue;
887 }
888
889 upsert_instrument(&instruments_cache, instrument.clone());
890 all_instruments.push(instrument);
891 }
892 }
893 Err(e) => {
894 log::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
895 }
896 }
897 } else {
898 for family in families {
899 match http
900 .request_instruments(inst_type, Some(family.clone()))
901 .await
902 {
903 Ok(instruments) => {
904 for instrument in instruments {
905 if !contract_filter_with_config_types(
906 contract_types.as_ref(),
907 &instrument,
908 ) {
909 continue;
910 }
911
912 upsert_instrument(&instruments_cache, instrument.clone());
913 all_instruments.push(instrument);
914 }
915 }
916 Err(e) => {
917 log::error!(
918 "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
919 );
920 }
921 }
922 }
923 }
924 }
925
926 let response = DataResponse::Instruments(InstrumentsResponse::new(
927 request_id,
928 client_id,
929 venue,
930 all_instruments,
931 start_nanos,
932 end_nanos,
933 clock.get_time_ns(),
934 params,
935 ));
936
937 if let Err(e) = sender.send(DataEvent::Response(response)) {
938 log::error!("Failed to send instruments response: {e}");
939 }
940 });
941
942 Ok(())
943 }
944
945 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
946 let http = self.http_client.clone();
947 let sender = self.data_sender.clone();
948 let instruments = self.instruments.clone();
949 let instrument_id = request.instrument_id;
950 let request_id = request.request_id;
951 let client_id = request.client_id.unwrap_or(self.client_id);
952 let start = request.start;
953 let end = request.end;
954 let params = request.params.clone();
955 let clock = self.clock;
956 let start_nanos = datetime_to_unix_nanos(start);
957 let end_nanos = datetime_to_unix_nanos(end);
958 let instrument_types = if self.config.instrument_types.is_empty() {
959 vec![OKXInstrumentType::Spot]
960 } else {
961 self.config.instrument_types.clone()
962 };
963 let contract_types = self.config.contract_types.clone();
964
965 get_runtime().spawn(async move {
966 match http
967 .request_instrument(instrument_id)
968 .await
969 .context("fetch instrument from API")
970 {
971 Ok(instrument) => {
972 let inst_id = instrument.id();
973 let symbol = inst_id.symbol.as_str();
974 let inst_type = okx_instrument_type_from_symbol(symbol);
975 if !instrument_types.contains(&inst_type) {
976 log::error!(
977 "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
978 );
979 return;
980 }
981
982 if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
983 log::error!(
984 "Instrument {instrument_id} filtered out by contract_types config"
985 );
986 return;
987 }
988
989 upsert_instrument(&instruments, instrument.clone());
990
991 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
992 request_id,
993 client_id,
994 instrument.id(),
995 instrument,
996 start_nanos,
997 end_nanos,
998 clock.get_time_ns(),
999 params,
1000 )));
1001
1002 if let Err(e) = sender.send(DataEvent::Response(response)) {
1003 log::error!("Failed to send instrument response: {e}");
1004 }
1005 }
1006 Err(e) => log::error!("Instrument request failed: {e:?}"),
1007 }
1008 });
1009
1010 Ok(())
1011 }
1012
1013 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1014 let http = self.http_client.clone();
1015 let sender = self.data_sender.clone();
1016 let instrument_id = request.instrument_id;
1017 let start = request.start;
1018 let end = request.end;
1019 let limit = request.limit.map(|n| n.get() as u32);
1020 let request_id = request.request_id;
1021 let client_id = request.client_id.unwrap_or(self.client_id);
1022 let params = request.params.clone();
1023 let clock = self.clock;
1024 let start_nanos = datetime_to_unix_nanos(start);
1025 let end_nanos = datetime_to_unix_nanos(end);
1026
1027 get_runtime().spawn(async move {
1028 match http
1029 .request_trades(instrument_id, start, end, limit)
1030 .await
1031 .context("failed to request trades from OKX")
1032 {
1033 Ok(trades) => {
1034 let response = DataResponse::Trades(TradesResponse::new(
1035 request_id,
1036 client_id,
1037 instrument_id,
1038 trades,
1039 start_nanos,
1040 end_nanos,
1041 clock.get_time_ns(),
1042 params,
1043 ));
1044 if let Err(e) = sender.send(DataEvent::Response(response)) {
1045 log::error!("Failed to send trades response: {e}");
1046 }
1047 }
1048 Err(e) => log::error!("Trade request failed: {e:?}"),
1049 }
1050 });
1051
1052 Ok(())
1053 }
1054
1055 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1056 let http = self.http_client.clone();
1057 let sender = self.data_sender.clone();
1058 let bar_type = request.bar_type;
1059 let start = request.start;
1060 let end = request.end;
1061 let limit = request.limit.map(|n| n.get() as u32);
1062 let request_id = request.request_id;
1063 let client_id = request.client_id.unwrap_or(self.client_id);
1064 let params = request.params.clone();
1065 let clock = self.clock;
1066 let start_nanos = datetime_to_unix_nanos(start);
1067 let end_nanos = datetime_to_unix_nanos(end);
1068
1069 get_runtime().spawn(async move {
1070 match http
1071 .request_bars(bar_type, start, end, limit)
1072 .await
1073 .context("failed to request bars from OKX")
1074 {
1075 Ok(bars) => {
1076 let response = DataResponse::Bars(BarsResponse::new(
1077 request_id,
1078 client_id,
1079 bar_type,
1080 bars,
1081 start_nanos,
1082 end_nanos,
1083 clock.get_time_ns(),
1084 params,
1085 ));
1086 if let Err(e) = sender.send(DataEvent::Response(response)) {
1087 log::error!("Failed to send bars response: {e}");
1088 }
1089 }
1090 Err(e) => log::error!("Bar request failed: {e:?}"),
1091 }
1092 });
1093
1094 Ok(())
1095 }
1096}