1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 live::{runner::get_data_event_sender, runtime::get_runtime},
28 messages::{
29 DataEvent,
30 data::{
31 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
32 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
33 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
34 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
35 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
36 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
37 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38 },
39 },
40};
41use nautilus_core::{
42 MUTEX_POISONED,
43 datetime::datetime_to_unix_nanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_data::client::DataClient;
47use nautilus_model::{
48 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
49 enums::BookType,
50 identifiers::{ClientId, InstrumentId, Venue},
51 instruments::{Instrument, InstrumentAny},
52};
53use tokio::{task::JoinHandle, time::Duration};
54use tokio_util::sync::CancellationToken;
55
56use crate::{
57 common::{
58 consts::OKX_VENUE,
59 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
60 parse::okx_instrument_type_from_symbol,
61 },
62 config::OKXDataClientConfig,
63 http::client::OKXHttpClient,
64 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
65};
66
67#[derive(Debug)]
68pub struct OKXDataClient {
69 client_id: ClientId,
70 config: OKXDataClientConfig,
71 http_client: OKXHttpClient,
72 ws_public: Option<OKXWebSocketClient>,
73 ws_business: Option<OKXWebSocketClient>,
74 is_connected: AtomicBool,
75 cancellation_token: CancellationToken,
76 tasks: Vec<JoinHandle<()>>,
77 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
78 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
79 book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
80 clock: &'static AtomicTime,
81}
82
83impl OKXDataClient {
84 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
90 let clock = get_atomic_clock_realtime();
91 let data_sender = get_data_event_sender();
92
93 let http_client = if config.has_api_credentials() {
94 OKXHttpClient::with_credentials(
95 config.api_key.clone(),
96 config.api_secret.clone(),
97 config.api_passphrase.clone(),
98 config.base_url_http.clone(),
99 config.http_timeout_secs,
100 config.max_retries,
101 config.retry_delay_initial_ms,
102 config.retry_delay_max_ms,
103 config.is_demo,
104 config.http_proxy_url.clone(),
105 )?
106 } else {
107 OKXHttpClient::new(
108 config.base_url_http.clone(),
109 config.http_timeout_secs,
110 config.max_retries,
111 config.retry_delay_initial_ms,
112 config.retry_delay_max_ms,
113 config.is_demo,
114 config.http_proxy_url.clone(),
115 )?
116 };
117
118 let ws_public = OKXWebSocketClient::new(
119 Some(config.ws_public_url()),
120 None,
121 None,
122 None,
123 None,
124 Some(20), )
126 .context("failed to construct OKX public websocket client")?;
127
128 let ws_business = if config.requires_business_ws() {
129 Some(
130 OKXWebSocketClient::new(
131 Some(config.ws_business_url()),
132 config.api_key.clone(),
133 config.api_secret.clone(),
134 config.api_passphrase.clone(),
135 None,
136 Some(20), )
138 .context("failed to construct OKX business websocket client")?,
139 )
140 } else {
141 None
142 };
143
144 if let Some(vip_level) = config.vip_level {
145 ws_public.set_vip_level(vip_level);
146 if let Some(ref ws) = ws_business {
147 ws.set_vip_level(vip_level);
148 }
149 }
150
151 Ok(Self {
152 client_id,
153 config,
154 http_client,
155 ws_public: Some(ws_public),
156 ws_business,
157 is_connected: AtomicBool::new(false),
158 cancellation_token: CancellationToken::new(),
159 tasks: Vec::new(),
160 data_sender,
161 instruments: Arc::new(RwLock::new(AHashMap::new())),
162 book_channels: Arc::new(RwLock::new(AHashMap::new())),
163 clock,
164 })
165 }
166
167 fn venue(&self) -> Venue {
168 *OKX_VENUE
169 }
170
171 fn vip_level(&self) -> Option<OKXVipLevel> {
172 self.ws_public.as_ref().map(|ws| ws.vip_level())
173 }
174
175 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
176 self.ws_public
177 .as_ref()
178 .context("public websocket client not initialized")
179 }
180
181 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
182 self.ws_business
183 .as_ref()
184 .context("business websocket client not available (credentials required)")
185 }
186
187 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
188 if let Err(e) = sender.send(DataEvent::Data(data)) {
189 tracing::error!("Failed to emit data event: {e}");
190 }
191 }
192
193 fn spawn_ws<F>(&self, fut: F, context: &'static str)
194 where
195 F: Future<Output = anyhow::Result<()>> + Send + 'static,
196 {
197 get_runtime().spawn(async move {
198 if let Err(e) = fut.await {
199 tracing::error!("{context}: {e:?}");
200 }
201 });
202 }
203
204 fn handle_ws_message(
205 message: NautilusWsMessage,
206 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
207 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
208 ) {
209 match message {
210 NautilusWsMessage::Data(payloads) => {
211 for data in payloads {
212 Self::send_data(data_sender, data);
213 }
214 }
215 NautilusWsMessage::Deltas(deltas) => {
216 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
217 }
218 NautilusWsMessage::FundingRates(updates) => {
219 emit_funding_rates(updates);
220 }
221 NautilusWsMessage::Instrument(instrument) => {
222 upsert_instrument(instruments, *instrument);
223 }
224 NautilusWsMessage::AccountUpdate(_)
225 | NautilusWsMessage::PositionUpdate(_)
226 | NautilusWsMessage::OrderAccepted(_)
227 | NautilusWsMessage::OrderCanceled(_)
228 | NautilusWsMessage::OrderExpired(_)
229 | NautilusWsMessage::OrderRejected(_)
230 | NautilusWsMessage::OrderCancelRejected(_)
231 | NautilusWsMessage::OrderModifyRejected(_)
232 | NautilusWsMessage::OrderTriggered(_)
233 | NautilusWsMessage::OrderUpdated(_)
234 | NautilusWsMessage::ExecutionReports(_) => {
235 tracing::debug!("Ignoring trading message on data client");
236 }
237 NautilusWsMessage::Error(e) => {
238 tracing::error!("OKX websocket error: {e:?}");
239 }
240 NautilusWsMessage::Raw(value) => {
241 tracing::debug!("Unhandled websocket payload: {value:?}");
242 }
243 NautilusWsMessage::Reconnected => {
244 tracing::info!("Websocket reconnected");
245 }
246 NautilusWsMessage::Authenticated => {
247 tracing::debug!("Websocket authenticated");
248 }
249 }
250 }
251}
252
253fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
254 if updates.is_empty() {
255 return;
256 }
257
258 for update in updates {
259 tracing::debug!(
260 "Received funding rate update for {} but forwarding is not yet supported",
261 update.instrument_id
262 );
263 }
264}
265
266fn upsert_instrument(
267 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
268 instrument: InstrumentAny,
269) {
270 let mut guard = cache.write().expect(MUTEX_POISONED);
271 guard.insert(instrument.id(), instrument);
272}
273
274fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
275 contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
276}
277
278fn contract_filter_with_config_types(
279 contract_types: Option<&Vec<OKXContractType>>,
280 instrument: &InstrumentAny,
281) -> bool {
282 match contract_types {
283 None => true,
284 Some(filter) if filter.is_empty() => true,
285 Some(filter) => {
286 let is_inverse = instrument.is_inverse();
287 (is_inverse && filter.contains(&OKXContractType::Inverse))
288 || (!is_inverse && filter.contains(&OKXContractType::Linear))
289 }
290 }
291}
292
293#[async_trait::async_trait(?Send)]
294impl DataClient for OKXDataClient {
295 fn client_id(&self) -> ClientId {
296 self.client_id
297 }
298
299 fn venue(&self) -> Option<Venue> {
300 Some(self.venue())
301 }
302
303 fn start(&mut self) -> anyhow::Result<()> {
304 tracing::info!(
305 client_id = %self.client_id,
306 vip_level = ?self.vip_level(),
307 instrument_types = ?self.config.instrument_types,
308 is_demo = self.config.is_demo,
309 http_proxy_url = ?self.config.http_proxy_url,
310 ws_proxy_url = ?self.config.ws_proxy_url,
311 "Started"
312 );
313 Ok(())
314 }
315
316 fn stop(&mut self) -> anyhow::Result<()> {
317 tracing::info!("Stopping {id}", id = self.client_id);
318 self.cancellation_token.cancel();
319 self.is_connected.store(false, Ordering::Relaxed);
320 Ok(())
321 }
322
323 fn reset(&mut self) -> anyhow::Result<()> {
324 tracing::debug!("Resetting {id}", id = self.client_id);
325 self.is_connected.store(false, Ordering::Relaxed);
326 self.cancellation_token = CancellationToken::new();
327 self.tasks.clear();
328 self.book_channels
329 .write()
330 .expect("book channel cache lock poisoned")
331 .clear();
332 Ok(())
333 }
334
335 fn dispose(&mut self) -> anyhow::Result<()> {
336 tracing::debug!("Disposing {id}", id = self.client_id);
337 self.stop()
338 }
339
340 async fn connect(&mut self) -> anyhow::Result<()> {
341 if self.is_connected() {
342 return Ok(());
343 }
344
345 let instrument_types = if self.config.instrument_types.is_empty() {
346 vec![OKXInstrumentType::Spot]
347 } else {
348 self.config.instrument_types.clone()
349 };
350
351 let mut all_instruments = Vec::new();
352 for inst_type in &instrument_types {
353 let mut fetched = self
354 .http_client
355 .request_instruments(*inst_type, None)
356 .await
357 .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
358
359 fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
360 self.http_client.cache_instruments(fetched.clone());
361
362 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
363 for instrument in &fetched {
364 guard.insert(instrument.id(), instrument.clone());
365 }
366 drop(guard);
367
368 all_instruments.extend(fetched);
369 }
370
371 for instrument in all_instruments {
372 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
373 tracing::warn!("Failed to send instrument: {e}");
374 }
375 }
376
377 if let Some(ref mut ws) = self.ws_public {
378 let instruments: Vec<_> = self
380 .instruments
381 .read()
382 .expect(MUTEX_POISONED)
383 .values()
384 .cloned()
385 .collect();
386 ws.cache_instruments(instruments);
387
388 ws.connect()
389 .await
390 .context("failed to connect OKX public websocket")?;
391 ws.wait_until_active(10.0)
392 .await
393 .context("public websocket did not become active")?;
394
395 let stream = ws.stream();
396 let sender = self.data_sender.clone();
397 let insts = self.instruments.clone();
398 let cancel = self.cancellation_token.clone();
399 let handle = get_runtime().spawn(async move {
400 pin_mut!(stream);
401 loop {
402 tokio::select! {
403 Some(message) = stream.next() => {
404 Self::handle_ws_message(message, &sender, &insts);
405 }
406 _ = cancel.cancelled() => {
407 tracing::debug!("Public websocket stream task cancelled");
408 break;
409 }
410 }
411 }
412 });
413 self.tasks.push(handle);
414
415 for inst_type in &instrument_types {
416 ws.subscribe_instruments(*inst_type)
417 .await
418 .with_context(|| {
419 format!("failed to subscribe to instrument type {inst_type:?}")
420 })?;
421 }
422 }
423
424 if let Some(ref mut ws) = self.ws_business {
425 let instruments: Vec<_> = self
427 .instruments
428 .read()
429 .expect(MUTEX_POISONED)
430 .values()
431 .cloned()
432 .collect();
433 ws.cache_instruments(instruments);
434
435 ws.connect()
436 .await
437 .context("failed to connect OKX business websocket")?;
438 ws.wait_until_active(10.0)
439 .await
440 .context("business websocket did not become active")?;
441
442 let stream = ws.stream();
443 let sender = self.data_sender.clone();
444 let insts = self.instruments.clone();
445 let cancel = self.cancellation_token.clone();
446 let handle = get_runtime().spawn(async move {
447 pin_mut!(stream);
448 loop {
449 tokio::select! {
450 Some(message) = stream.next() => {
451 Self::handle_ws_message(message, &sender, &insts);
452 }
453 _ = cancel.cancelled() => {
454 tracing::debug!("Business websocket stream task cancelled");
455 break;
456 }
457 }
458 }
459 });
460 self.tasks.push(handle);
461 }
462
463 self.is_connected.store(true, Ordering::Release);
464 tracing::info!(client_id = %self.client_id, "Connected");
465 Ok(())
466 }
467
468 async fn disconnect(&mut self) -> anyhow::Result<()> {
469 if self.is_disconnected() {
470 return Ok(());
471 }
472
473 self.cancellation_token.cancel();
474
475 if let Some(ref ws) = self.ws_public
476 && let Err(e) = ws.unsubscribe_all().await
477 {
478 tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
479 }
480 if let Some(ref ws) = self.ws_business
481 && let Err(e) = ws.unsubscribe_all().await
482 {
483 tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
484 }
485
486 tokio::time::sleep(Duration::from_millis(500)).await;
488
489 if let Some(ref mut ws) = self.ws_public {
490 let _ = ws.close().await;
491 }
492 if let Some(ref mut ws) = self.ws_business {
493 let _ = ws.close().await;
494 }
495
496 let handles: Vec<_> = self.tasks.drain(..).collect();
497 for handle in handles {
498 if let Err(e) = handle.await {
499 tracing::error!("Error joining websocket task: {e}");
500 }
501 }
502
503 self.book_channels.write().expect(MUTEX_POISONED).clear();
504 self.is_connected.store(false, Ordering::Release);
505 tracing::info!(client_id = %self.client_id, "Disconnected");
506 Ok(())
507 }
508
509 fn is_connected(&self) -> bool {
510 self.is_connected.load(Ordering::Relaxed)
511 }
512
513 fn is_disconnected(&self) -> bool {
514 !self.is_connected()
515 }
516
517 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
518 for inst_type in &self.config.instrument_types {
519 let ws = self.public_ws()?.clone();
520 let inst_type = *inst_type;
521
522 self.spawn_ws(
523 async move {
524 ws.subscribe_instruments(inst_type)
525 .await
526 .context("instruments subscription")?;
527 Ok(())
528 },
529 "subscribe_instruments",
530 );
531 }
532 Ok(())
533 }
534
535 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
536 let instrument_id = cmd.instrument_id;
539 let ws = self.public_ws()?.clone();
540
541 self.spawn_ws(
542 async move {
543 ws.subscribe_instrument(instrument_id)
544 .await
545 .context("instrument type subscription")?;
546 Ok(())
547 },
548 "subscribe_instrument",
549 );
550 Ok(())
551 }
552
553 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
554 if cmd.book_type != BookType::L2_MBP {
555 anyhow::bail!("OKX only supports L2_MBP order book deltas");
556 }
557
558 let depth = cmd.depth.map_or(0, |d| d.get());
559 if !matches!(depth, 0 | 50 | 400) {
560 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
561 }
562
563 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
564 let channel = match depth {
565 50 => {
566 if vip < OKXVipLevel::Vip4 {
567 anyhow::bail!(
568 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
569 );
570 }
571 OKXBookChannel::Books50L2Tbt
572 }
573 0 | 400 => {
574 if vip >= OKXVipLevel::Vip5 {
575 OKXBookChannel::BookL2Tbt
576 } else {
577 OKXBookChannel::Book
578 }
579 }
580 _ => unreachable!(),
581 };
582
583 let instrument_id = cmd.instrument_id;
584 let ws = self.public_ws()?.clone();
585 let book_channels = Arc::clone(&self.book_channels);
586
587 self.spawn_ws(
588 async move {
589 match channel {
590 OKXBookChannel::Books50L2Tbt => ws
591 .subscribe_book50_l2_tbt(instrument_id)
592 .await
593 .context("books50-l2-tbt subscription")?,
594 OKXBookChannel::BookL2Tbt => ws
595 .subscribe_book_l2_tbt(instrument_id)
596 .await
597 .context("books-l2-tbt subscription")?,
598 OKXBookChannel::Book => ws
599 .subscribe_books_channel(instrument_id)
600 .await
601 .context("books subscription")?,
602 }
603 book_channels
604 .write()
605 .expect("book channel cache lock poisoned")
606 .insert(instrument_id, channel);
607 Ok(())
608 },
609 "order book delta subscription",
610 );
611
612 Ok(())
613 }
614
615 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
616 if cmd.book_type != BookType::L2_MBP {
617 anyhow::bail!("OKX only supports L2_MBP order book snapshots");
618 }
619 let depth = cmd.depth.map_or(5, |d| d.get());
620 if depth != 5 {
621 anyhow::bail!("OKX only supports depth=5 snapshots");
622 }
623
624 let ws = self.public_ws()?.clone();
625 let instrument_id = cmd.instrument_id;
626
627 self.spawn_ws(
628 async move {
629 ws.subscribe_book_depth5(instrument_id)
630 .await
631 .context("books5 subscription")
632 },
633 "order book snapshot subscription",
634 );
635 Ok(())
636 }
637
638 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
639 let ws = self.public_ws()?.clone();
640 let instrument_id = cmd.instrument_id;
641
642 self.spawn_ws(
643 async move {
644 ws.subscribe_quotes(instrument_id)
645 .await
646 .context("quotes subscription")
647 },
648 "quote subscription",
649 );
650 Ok(())
651 }
652
653 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
654 let ws = self.public_ws()?.clone();
655 let instrument_id = cmd.instrument_id;
656
657 self.spawn_ws(
658 async move {
659 ws.subscribe_trades(instrument_id, false)
660 .await
661 .context("trades subscription")
662 },
663 "trade subscription",
664 );
665 Ok(())
666 }
667
668 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
669 let ws = self.public_ws()?.clone();
670 let instrument_id = cmd.instrument_id;
671
672 self.spawn_ws(
673 async move {
674 ws.subscribe_mark_prices(instrument_id)
675 .await
676 .context("mark price subscription")
677 },
678 "mark price subscription",
679 );
680 Ok(())
681 }
682
683 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
684 let ws = self.public_ws()?.clone();
685 let instrument_id = cmd.instrument_id;
686
687 self.spawn_ws(
688 async move {
689 ws.subscribe_index_prices(instrument_id)
690 .await
691 .context("index price subscription")
692 },
693 "index price subscription",
694 );
695 Ok(())
696 }
697
698 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
699 let ws = self.public_ws()?.clone();
700 let instrument_id = cmd.instrument_id;
701
702 self.spawn_ws(
703 async move {
704 ws.subscribe_funding_rates(instrument_id)
705 .await
706 .context("funding rate subscription")
707 },
708 "funding rate subscription",
709 );
710 Ok(())
711 }
712
713 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
714 let ws = self.business_ws()?.clone();
715 let bar_type = cmd.bar_type;
716
717 self.spawn_ws(
718 async move {
719 ws.subscribe_bars(bar_type)
720 .await
721 .context("bars subscription")
722 },
723 "bar subscription",
724 );
725 Ok(())
726 }
727
728 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
729 let ws = self.public_ws()?.clone();
730 let instrument_id = cmd.instrument_id;
731 let channel = self
732 .book_channels
733 .write()
734 .expect("book channel cache lock poisoned")
735 .remove(&instrument_id);
736
737 self.spawn_ws(
738 async move {
739 match channel {
740 Some(OKXBookChannel::Books50L2Tbt) => ws
741 .unsubscribe_book50_l2_tbt(instrument_id)
742 .await
743 .context("books50-l2-tbt unsubscribe")?,
744 Some(OKXBookChannel::BookL2Tbt) => ws
745 .unsubscribe_book_l2_tbt(instrument_id)
746 .await
747 .context("books-l2-tbt unsubscribe")?,
748 Some(OKXBookChannel::Book) => ws
749 .unsubscribe_book(instrument_id)
750 .await
751 .context("book unsubscribe")?,
752 None => {
753 tracing::warn!(
754 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
755 );
756 ws.unsubscribe_book(instrument_id)
757 .await
758 .context("book fallback unsubscribe")?;
759 }
760 }
761 Ok(())
762 },
763 "order book unsubscribe",
764 );
765 Ok(())
766 }
767
768 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
769 let ws = self.public_ws()?.clone();
770 let instrument_id = cmd.instrument_id;
771
772 self.spawn_ws(
773 async move {
774 ws.unsubscribe_book_depth5(instrument_id)
775 .await
776 .context("book depth5 unsubscribe")
777 },
778 "order book snapshot unsubscribe",
779 );
780 Ok(())
781 }
782
783 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
784 let ws = self.public_ws()?.clone();
785 let instrument_id = cmd.instrument_id;
786
787 self.spawn_ws(
788 async move {
789 ws.unsubscribe_quotes(instrument_id)
790 .await
791 .context("quotes unsubscribe")
792 },
793 "quote unsubscribe",
794 );
795 Ok(())
796 }
797
798 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
799 let ws = self.public_ws()?.clone();
800 let instrument_id = cmd.instrument_id;
801
802 self.spawn_ws(
803 async move {
804 ws.unsubscribe_trades(instrument_id, false) .await
806 .context("trades unsubscribe")
807 },
808 "trade unsubscribe",
809 );
810 Ok(())
811 }
812
813 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
814 let ws = self.public_ws()?.clone();
815 let instrument_id = cmd.instrument_id;
816
817 self.spawn_ws(
818 async move {
819 ws.unsubscribe_mark_prices(instrument_id)
820 .await
821 .context("mark price unsubscribe")
822 },
823 "mark price unsubscribe",
824 );
825 Ok(())
826 }
827
828 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
829 let ws = self.public_ws()?.clone();
830 let instrument_id = cmd.instrument_id;
831
832 self.spawn_ws(
833 async move {
834 ws.unsubscribe_index_prices(instrument_id)
835 .await
836 .context("index price unsubscribe")
837 },
838 "index price unsubscribe",
839 );
840 Ok(())
841 }
842
843 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
844 let ws = self.public_ws()?.clone();
845 let instrument_id = cmd.instrument_id;
846
847 self.spawn_ws(
848 async move {
849 ws.unsubscribe_funding_rates(instrument_id)
850 .await
851 .context("funding rate unsubscribe")
852 },
853 "funding rate unsubscribe",
854 );
855 Ok(())
856 }
857
858 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
859 let ws = self.business_ws()?.clone();
860 let bar_type = cmd.bar_type;
861
862 self.spawn_ws(
863 async move {
864 ws.unsubscribe_bars(bar_type)
865 .await
866 .context("bars unsubscribe")
867 },
868 "bar unsubscribe",
869 );
870 Ok(())
871 }
872
873 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
874 let http = self.http_client.clone();
875 let sender = self.data_sender.clone();
876 let instruments_cache = self.instruments.clone();
877 let request_id = request.request_id;
878 let client_id = request.client_id.unwrap_or(self.client_id);
879 let venue = self.venue();
880 let start = request.start;
881 let end = request.end;
882 let params = request.params.clone();
883 let clock = self.clock;
884 let start_nanos = datetime_to_unix_nanos(start);
885 let end_nanos = datetime_to_unix_nanos(end);
886 let instrument_types = if self.config.instrument_types.is_empty() {
887 vec![OKXInstrumentType::Spot]
888 } else {
889 self.config.instrument_types.clone()
890 };
891 let contract_types = self.config.contract_types.clone();
892 let instrument_families = self.config.instrument_families.clone();
893
894 get_runtime().spawn(async move {
895 let mut all_instruments = Vec::new();
896
897 for inst_type in instrument_types {
898 let supports_family = matches!(
899 inst_type,
900 OKXInstrumentType::Futures
901 | OKXInstrumentType::Swap
902 | OKXInstrumentType::Option
903 );
904
905 let families = match (&instrument_families, inst_type, supports_family) {
906 (Some(families), OKXInstrumentType::Option, true) => families.clone(),
907 (Some(families), _, true) => families.clone(),
908 (None, OKXInstrumentType::Option, _) => {
909 tracing::warn!(
910 "Skipping OPTION type: instrument_families required but not configured"
911 );
912 continue;
913 }
914 _ => vec![],
915 };
916
917 if families.is_empty() {
918 match http.request_instruments(inst_type, None).await {
919 Ok(instruments) => {
920 for instrument in instruments {
921 if !contract_filter_with_config_types(
922 contract_types.as_ref(),
923 &instrument,
924 ) {
925 continue;
926 }
927
928 upsert_instrument(&instruments_cache, instrument.clone());
929 all_instruments.push(instrument);
930 }
931 }
932 Err(e) => {
933 tracing::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
934 }
935 }
936 } else {
937 for family in families {
938 match http
939 .request_instruments(inst_type, Some(family.clone()))
940 .await
941 {
942 Ok(instruments) => {
943 for instrument in instruments {
944 if !contract_filter_with_config_types(
945 contract_types.as_ref(),
946 &instrument,
947 ) {
948 continue;
949 }
950
951 upsert_instrument(&instruments_cache, instrument.clone());
952 all_instruments.push(instrument);
953 }
954 }
955 Err(e) => {
956 tracing::error!(
957 "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
958 );
959 }
960 }
961 }
962 }
963 }
964
965 let response = DataResponse::Instruments(InstrumentsResponse::new(
966 request_id,
967 client_id,
968 venue,
969 all_instruments,
970 start_nanos,
971 end_nanos,
972 clock.get_time_ns(),
973 params,
974 ));
975
976 if let Err(e) = sender.send(DataEvent::Response(response)) {
977 tracing::error!("Failed to send instruments response: {e}");
978 }
979 });
980
981 Ok(())
982 }
983
984 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
985 let http = self.http_client.clone();
986 let sender = self.data_sender.clone();
987 let instruments = self.instruments.clone();
988 let instrument_id = request.instrument_id;
989 let request_id = request.request_id;
990 let client_id = request.client_id.unwrap_or(self.client_id);
991 let start = request.start;
992 let end = request.end;
993 let params = request.params.clone();
994 let clock = self.clock;
995 let start_nanos = datetime_to_unix_nanos(start);
996 let end_nanos = datetime_to_unix_nanos(end);
997 let instrument_types = if self.config.instrument_types.is_empty() {
998 vec![OKXInstrumentType::Spot]
999 } else {
1000 self.config.instrument_types.clone()
1001 };
1002 let contract_types = self.config.contract_types.clone();
1003
1004 get_runtime().spawn(async move {
1005 match http
1006 .request_instrument(instrument_id)
1007 .await
1008 .context("fetch instrument from API")
1009 {
1010 Ok(instrument) => {
1011 let inst_id = instrument.id();
1012 let symbol = inst_id.symbol.as_str();
1013 let inst_type = okx_instrument_type_from_symbol(symbol);
1014 if !instrument_types.contains(&inst_type) {
1015 tracing::error!(
1016 "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1017 );
1018 return;
1019 }
1020
1021 if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1022 tracing::error!(
1023 "Instrument {instrument_id} filtered out by contract_types config"
1024 );
1025 return;
1026 }
1027
1028 upsert_instrument(&instruments, instrument.clone());
1029
1030 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1031 request_id,
1032 client_id,
1033 instrument.id(),
1034 instrument,
1035 start_nanos,
1036 end_nanos,
1037 clock.get_time_ns(),
1038 params,
1039 )));
1040
1041 if let Err(e) = sender.send(DataEvent::Response(response)) {
1042 tracing::error!("Failed to send instrument response: {e}");
1043 }
1044 }
1045 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
1046 }
1047 });
1048
1049 Ok(())
1050 }
1051
1052 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1053 let http = self.http_client.clone();
1054 let sender = self.data_sender.clone();
1055 let instrument_id = request.instrument_id;
1056 let start = request.start;
1057 let end = request.end;
1058 let limit = request.limit.map(|n| n.get() as u32);
1059 let request_id = request.request_id;
1060 let client_id = request.client_id.unwrap_or(self.client_id);
1061 let params = request.params.clone();
1062 let clock = self.clock;
1063 let start_nanos = datetime_to_unix_nanos(start);
1064 let end_nanos = datetime_to_unix_nanos(end);
1065
1066 get_runtime().spawn(async move {
1067 match http
1068 .request_trades(instrument_id, start, end, limit)
1069 .await
1070 .context("failed to request trades from OKX")
1071 {
1072 Ok(trades) => {
1073 let response = DataResponse::Trades(TradesResponse::new(
1074 request_id,
1075 client_id,
1076 instrument_id,
1077 trades,
1078 start_nanos,
1079 end_nanos,
1080 clock.get_time_ns(),
1081 params,
1082 ));
1083 if let Err(e) = sender.send(DataEvent::Response(response)) {
1084 tracing::error!("Failed to send trades response: {e}");
1085 }
1086 }
1087 Err(e) => tracing::error!("Trade request failed: {e:?}"),
1088 }
1089 });
1090
1091 Ok(())
1092 }
1093
1094 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1095 let http = self.http_client.clone();
1096 let sender = self.data_sender.clone();
1097 let bar_type = request.bar_type;
1098 let start = request.start;
1099 let end = request.end;
1100 let limit = request.limit.map(|n| n.get() as u32);
1101 let request_id = request.request_id;
1102 let client_id = request.client_id.unwrap_or(self.client_id);
1103 let params = request.params.clone();
1104 let clock = self.clock;
1105 let start_nanos = datetime_to_unix_nanos(start);
1106 let end_nanos = datetime_to_unix_nanos(end);
1107
1108 get_runtime().spawn(async move {
1109 match http
1110 .request_bars(bar_type, start, end, limit)
1111 .await
1112 .context("failed to request bars from OKX")
1113 {
1114 Ok(bars) => {
1115 let response = DataResponse::Bars(BarsResponse::new(
1116 request_id,
1117 client_id,
1118 bar_type,
1119 bars,
1120 start_nanos,
1121 end_nanos,
1122 clock.get_time_ns(),
1123 params,
1124 ));
1125 if let Err(e) = sender.send(DataEvent::Response(response)) {
1126 tracing::error!("Failed to send bars response: {e}");
1127 }
1128 }
1129 Err(e) => tracing::error!("Bar request failed: {e:?}"),
1130 }
1131 });
1132
1133 Ok(())
1134 }
1135}