1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31 messages::{
32 DataEvent,
33 data::{
34 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
38 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43 runner::get_data_event_sender,
44};
45use nautilus_core::{
46 MUTEX_POISONED, UnixNanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
52 enums::BookType,
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60 common::{
61 consts::OKX_VENUE,
62 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
63 parse::okx_instrument_type_from_symbol,
64 },
65 config::OKXDataClientConfig,
66 http::client::OKXHttpClient,
67 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
68};
69
70#[derive(Debug)]
71pub struct OKXDataClient {
72 client_id: ClientId,
73 config: OKXDataClientConfig,
74 http_client: OKXHttpClient,
75 ws_public: Option<OKXWebSocketClient>,
76 ws_business: Option<OKXWebSocketClient>,
77 is_connected: AtomicBool,
78 cancellation_token: CancellationToken,
79 tasks: Vec<JoinHandle<()>>,
80 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
81 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
82 book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
83 clock: &'static AtomicTime,
84 instrument_refresh_active: bool,
85}
86
87impl OKXDataClient {
88 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
94 let clock = get_atomic_clock_realtime();
95 let data_sender = get_data_event_sender();
96
97 let http_client = if config.has_api_credentials() {
98 OKXHttpClient::with_credentials(
99 config.api_key.clone(),
100 config.api_secret.clone(),
101 config.api_passphrase.clone(),
102 config.base_url_http.clone(),
103 config.http_timeout_secs,
104 config.max_retries,
105 config.retry_delay_initial_ms,
106 config.retry_delay_max_ms,
107 config.is_demo,
108 config.http_proxy_url.clone(),
109 )?
110 } else {
111 OKXHttpClient::new(
112 config.base_url_http.clone(),
113 config.http_timeout_secs,
114 config.max_retries,
115 config.retry_delay_initial_ms,
116 config.retry_delay_max_ms,
117 config.is_demo,
118 config.http_proxy_url.clone(),
119 )?
120 };
121
122 let ws_public = OKXWebSocketClient::new(
123 Some(config.ws_public_url()),
124 None,
125 None,
126 None,
127 None,
128 Some(20), )
130 .context("failed to construct OKX public websocket client")?;
131
132 let ws_business = if config.requires_business_ws() {
133 Some(
134 OKXWebSocketClient::new(
135 Some(config.ws_business_url()),
136 config.api_key.clone(),
137 config.api_secret.clone(),
138 config.api_passphrase.clone(),
139 None,
140 Some(20), )
142 .context("failed to construct OKX business websocket client")?,
143 )
144 } else {
145 None
146 };
147
148 if let Some(vip_level) = config.vip_level {
149 ws_public.set_vip_level(vip_level);
150 if let Some(ref ws) = ws_business {
151 ws.set_vip_level(vip_level);
152 }
153 }
154
155 Ok(Self {
156 client_id,
157 config,
158 http_client,
159 ws_public: Some(ws_public),
160 ws_business,
161 is_connected: AtomicBool::new(false),
162 cancellation_token: CancellationToken::new(),
163 tasks: Vec::new(),
164 data_sender,
165 instruments: Arc::new(RwLock::new(AHashMap::new())),
166 book_channels: Arc::new(RwLock::new(AHashMap::new())),
167 clock,
168 instrument_refresh_active: false,
169 })
170 }
171
172 fn venue(&self) -> Venue {
173 *OKX_VENUE
174 }
175
176 fn vip_level(&self) -> Option<OKXVipLevel> {
177 self.ws_public.as_ref().map(|ws| ws.vip_level())
178 }
179
180 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
181 self.ws_public
182 .as_ref()
183 .context("public websocket client not initialized")
184 }
185
186 fn public_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
187 self.ws_public
188 .as_mut()
189 .context("public websocket client not initialized")
190 }
191
192 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
193 self.ws_business
194 .as_ref()
195 .context("business websocket client not available (credentials required)")
196 }
197
198 fn business_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
199 self.ws_business
200 .as_mut()
201 .context("business websocket client not available (credentials required)")
202 }
203
204 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
205 if let Err(e) = sender.send(DataEvent::Data(data)) {
206 tracing::error!("Failed to emit data event: {e}");
207 }
208 }
209
210 fn spawn_ws<F>(&self, fut: F, context: &'static str)
211 where
212 F: Future<Output = anyhow::Result<()>> + Send + 'static,
213 {
214 tokio::spawn(async move {
215 if let Err(e) = fut.await {
216 tracing::error!("{context}: {e:?}");
217 }
218 });
219 }
220
221 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
222 let instrument_types = if self.config.instrument_types.is_empty() {
223 vec![OKXInstrumentType::Spot]
224 } else {
225 self.config.instrument_types.clone()
226 };
227
228 let mut collected: Vec<InstrumentAny> = Vec::new();
229
230 for inst_type in instrument_types {
231 let mut instruments = self
232 .http_client
233 .request_instruments(inst_type, None)
234 .await
235 .with_context(|| format!("failed to load instruments for {inst_type:?}"))?;
236 instruments.retain(|instrument| self.contract_filter(instrument));
237 tracing::debug!(
238 "loaded {count} instruments for {inst_type:?}",
239 count = instruments.len()
240 );
241 collected.extend(instruments);
242 }
243
244 if collected.is_empty() {
245 tracing::warn!("No OKX instruments were loaded");
246 return Ok(collected);
247 }
248
249 self.http_client.cache_instruments(collected.clone());
250
251 if let Some(ws) = self.ws_public.as_mut() {
252 ws.cache_instruments(collected.clone());
253 }
254
255 if let Some(ws) = self.ws_business.as_mut() {
256 ws.cache_instruments(collected.clone());
257 }
258
259 {
260 let mut guard = self
261 .instruments
262 .write()
263 .expect("instrument cache lock poisoned");
264 guard.clear();
265 for instrument in &collected {
266 guard.insert(instrument.id(), instrument.clone());
267 }
268 }
269
270 Ok(collected)
271 }
272
273 fn contract_filter(&self, instrument: &InstrumentAny) -> bool {
274 contract_filter_with_config(&self.config, instrument)
275 }
276
277 fn handle_ws_message(
278 message: NautilusWsMessage,
279 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
280 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
281 ) {
282 match message {
283 NautilusWsMessage::Data(payloads) => {
284 for data in payloads {
285 Self::send_data(data_sender, data);
286 }
287 }
288 NautilusWsMessage::Deltas(deltas) => {
289 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
290 }
291 NautilusWsMessage::FundingRates(updates) => {
292 emit_funding_rates(updates);
293 }
294 NautilusWsMessage::Instrument(instrument) => {
295 upsert_instrument(instruments, *instrument);
296 }
297 NautilusWsMessage::AccountUpdate(_)
298 | NautilusWsMessage::PositionUpdate(_)
299 | NautilusWsMessage::OrderRejected(_)
300 | NautilusWsMessage::OrderCancelRejected(_)
301 | NautilusWsMessage::OrderModifyRejected(_)
302 | NautilusWsMessage::ExecutionReports(_) => {
303 tracing::debug!("Ignoring trading message on data client");
304 }
305 NautilusWsMessage::Error(e) => {
306 tracing::error!("OKX websocket error: {e:?}");
307 }
308 NautilusWsMessage::Raw(value) => {
309 tracing::debug!("Unhandled websocket payload: {value:?}");
310 }
311 NautilusWsMessage::Reconnected => {
312 tracing::info!("Websocket reconnected");
313 }
314 NautilusWsMessage::Authenticated => {
315 tracing::debug!("Websocket authenticated");
316 }
317 }
318 }
319
320 fn spawn_public_stream(&mut self) -> anyhow::Result<()> {
321 let ws = self.public_ws_mut()?;
322 let stream = ws.stream();
323 self.spawn_stream_task(stream)
324 }
325
326 fn spawn_business_stream(&mut self) -> anyhow::Result<()> {
327 if self.ws_business.is_none() {
328 return Ok(());
329 }
330
331 let ws = self.business_ws_mut()?;
332 let stream = ws.stream();
333 self.spawn_stream_task(stream)
334 }
335
336 fn spawn_stream_task(
337 &mut self,
338 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
339 ) -> anyhow::Result<()> {
340 let data_sender = self.data_sender.clone();
341 let instruments = self.instruments.clone();
342 let cancellation = self.cancellation_token.clone();
343
344 let handle = tokio::spawn(async move {
345 tokio::pin!(stream);
346
347 loop {
348 tokio::select! {
349 maybe_msg = stream.next() => {
350 match maybe_msg {
351 Some(msg) => {
352 Self::handle_ws_message(msg, &data_sender, &instruments);
353 }
354 None => {
355 tracing::debug!("Websocket stream ended");
356 break;
357 }
358 }
359 }
360 _ = cancellation.cancelled() => {
361 tracing::debug!("Websocket stream task cancelled");
362 break;
363 }
364 }
365 }
366 });
367
368 self.tasks.push(handle);
369 Ok(())
370 }
371
372 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
373 let Some(minutes) = self.config.update_instruments_interval_mins else {
374 return Ok(());
375 };
376
377 if minutes == 0 || self.instrument_refresh_active {
378 return Ok(());
379 }
380
381 let interval_secs = minutes.saturating_mul(60);
382 if interval_secs == 0 {
383 return Ok(());
384 }
385
386 let interval = Duration::from_secs(interval_secs);
387 let cancellation = self.cancellation_token.clone();
388 let instruments_cache = Arc::clone(&self.instruments);
389 let http_client = self.http_client.clone();
390 let config = self.config.clone();
391 let client_id = self.client_id;
392
393 let handle = tokio::spawn(async move {
394 loop {
395 let sleep = tokio::time::sleep(interval);
396 tokio::pin!(sleep);
397 tokio::select! {
398 _ = cancellation.cancelled() => {
399 tracing::debug!("OKX instrument refresh task cancelled");
400 break;
401 }
402 _ = &mut sleep => {
403 let instrument_types = if config.instrument_types.is_empty() {
404 vec![OKXInstrumentType::Spot]
405 } else {
406 config.instrument_types.clone()
407 };
408
409 let mut collected: Vec<InstrumentAny> = Vec::new();
410
411 for inst_type in instrument_types {
412 match http_client.request_instruments(inst_type, None).await {
413 Ok(mut instruments) => {
414 instruments.retain(|instrument| contract_filter_with_config(&config, instrument));
415 collected.extend(instruments);
416 }
417 Err(e) => {
418 tracing::warn!(client_id=%client_id, instrument_type=?inst_type, error=?e, "Failed to refresh OKX instruments for type");
419 }
420 }
421 }
422
423 if collected.is_empty() {
424 tracing::debug!(client_id=%client_id, "OKX instrument refresh yielded no instruments");
425 continue;
426 }
427
428 http_client.cache_instruments(collected.clone());
429
430 {
431 let mut guard = instruments_cache
432 .write()
433 .expect("instrument cache lock poisoned");
434 guard.clear();
435 for instrument in &collected {
436 guard.insert(instrument.id(), instrument.clone());
437 }
438 }
439
440 tracing::debug!(client_id=%client_id, count=collected.len(), "OKX instruments refreshed");
441 }
442 }
443 }
444 });
445
446 self.tasks.push(handle);
447 self.instrument_refresh_active = true;
448 Ok(())
449 }
450}
451
452fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
453 if updates.is_empty() {
454 return;
455 }
456
457 for update in updates {
458 tracing::debug!(
459 "Received funding rate update for {} but forwarding is not yet supported",
460 update.instrument_id
461 );
462 }
463}
464
465fn upsert_instrument(
466 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
467 instrument: InstrumentAny,
468) {
469 let mut guard = cache.write().expect(MUTEX_POISONED);
470 guard.insert(instrument.id(), instrument);
471}
472
473fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
474 value
475 .and_then(|dt| dt.timestamp_nanos_opt())
476 .and_then(|nanos| u64::try_from(nanos).ok())
477 .map(UnixNanos::from)
478}
479
480fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
481 contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
482}
483
484fn contract_filter_with_config_types(
485 contract_types: Option<&Vec<OKXContractType>>,
486 instrument: &InstrumentAny,
487) -> bool {
488 match contract_types {
489 None => true,
490 Some(filter) if filter.is_empty() => true,
491 Some(filter) => {
492 let is_inverse = instrument.is_inverse();
493 (is_inverse && filter.contains(&OKXContractType::Inverse))
494 || (!is_inverse && filter.contains(&OKXContractType::Linear))
495 }
496 }
497}
498
499#[async_trait::async_trait]
500impl DataClient for OKXDataClient {
501 fn client_id(&self) -> ClientId {
502 self.client_id
503 }
504
505 fn venue(&self) -> Option<Venue> {
506 Some(self.venue())
507 }
508
509 fn start(&mut self) -> anyhow::Result<()> {
510 tracing::info!(
511 client_id = %self.client_id,
512 vip_level = ?self.vip_level(),
513 instrument_types = ?self.config.instrument_types,
514 is_demo = self.config.is_demo,
515 http_proxy_url = ?self.config.http_proxy_url,
516 ws_proxy_url = ?self.config.ws_proxy_url,
517 "Starting OKX data client"
518 );
519 Ok(())
520 }
521
522 fn stop(&mut self) -> anyhow::Result<()> {
523 tracing::info!("Stopping OKX data client {id}", id = self.client_id);
524 self.cancellation_token.cancel();
525 self.is_connected.store(false, Ordering::Relaxed);
526 self.instrument_refresh_active = false;
527 Ok(())
528 }
529
530 fn reset(&mut self) -> anyhow::Result<()> {
531 tracing::debug!("Resetting OKX data client {id}", id = self.client_id);
532 self.is_connected.store(false, Ordering::Relaxed);
533 self.cancellation_token = CancellationToken::new();
534 self.tasks.clear();
535 self.book_channels
536 .write()
537 .expect("book channel cache lock poisoned")
538 .clear();
539 self.instrument_refresh_active = false;
540 Ok(())
541 }
542
543 fn dispose(&mut self) -> anyhow::Result<()> {
544 tracing::debug!("Disposing OKX data client {id}", id = self.client_id);
545 self.stop()
546 }
547
548 async fn connect(&mut self) -> anyhow::Result<()> {
549 if self.is_connected() {
550 return Ok(());
551 }
552
553 self.bootstrap_instruments().await?;
554
555 {
556 let ws_public = self.public_ws_mut()?;
557 ws_public
558 .connect()
559 .await
560 .context("failed to connect OKX public websocket")?;
561 ws_public
562 .wait_until_active(10.0)
563 .await
564 .context("public websocket did not become active")?;
565 }
566
567 let instrument_types = if self.config.instrument_types.is_empty() {
568 vec![OKXInstrumentType::Spot]
569 } else {
570 self.config.instrument_types.clone()
571 };
572
573 self.spawn_public_stream()?;
574
575 let public_clone = self.public_ws()?.clone();
576
577 self.spawn_ws(
578 async move {
579 for inst_type in instrument_types {
580 public_clone
581 .subscribe_instruments(inst_type)
582 .await
583 .with_context(|| {
584 format!("failed to subscribe to instrument type {inst_type:?}")
585 })?;
586 }
587 Ok(())
588 },
589 "instrument subscription",
590 );
591
592 if self.ws_business.is_some() {
593 {
594 let ws_business = self.business_ws_mut()?;
595 ws_business
596 .connect()
597 .await
598 .context("failed to connect OKX business websocket")?;
599 ws_business
600 .wait_until_active(10.0)
601 .await
602 .context("business websocket did not become active")?;
603 }
604 self.spawn_business_stream()?;
605 }
606
607 self.maybe_spawn_instrument_refresh()?;
608
609 self.is_connected.store(true, Ordering::Relaxed);
610 tracing::info!(client_id = %self.client_id, "Connected");
611 Ok(())
612 }
613
614 async fn disconnect(&mut self) -> anyhow::Result<()> {
615 if self.is_disconnected() {
616 return Ok(());
617 }
618
619 if let Some(ws) = self.ws_public.as_ref()
620 && let Err(e) = ws.unsubscribe_all().await
621 {
622 tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
623 }
624 if let Some(ws) = self.ws_business.as_ref()
625 && let Err(e) = ws.unsubscribe_all().await
626 {
627 tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
628 }
629
630 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
632
633 self.cancellation_token.cancel();
634
635 if let Some(ws) = self.ws_public.as_mut() {
636 let _ = ws.close().await;
637 }
638 if let Some(ws) = self.ws_business.as_mut() {
639 let _ = ws.close().await;
640 }
641
642 for handle in self.tasks.drain(..) {
643 if let Err(e) = handle.await {
644 tracing::error!("Error joining websocket task: {e}");
645 }
646 }
647
648 self.cancellation_token = CancellationToken::new();
649 self.is_connected.store(false, Ordering::Relaxed);
650 self.book_channels
651 .write()
652 .expect("book channel cache lock poisoned")
653 .clear();
654 self.instrument_refresh_active = false;
655
656 tracing::info!(client_id = %self.client_id, "Disconnected");
657 Ok(())
658 }
659
660 fn is_connected(&self) -> bool {
661 self.is_connected.load(Ordering::Relaxed)
662 }
663
664 fn is_disconnected(&self) -> bool {
665 !self.is_connected()
666 }
667
668 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
669 for inst_type in &self.config.instrument_types {
671 let ws = self.public_ws()?.clone();
672 let inst_type = *inst_type;
673
674 self.spawn_ws(
675 async move {
676 ws.subscribe_instruments(inst_type)
677 .await
678 .context("instruments subscription")?;
679 Ok(())
680 },
681 "subscribe_instruments",
682 );
683 }
684 Ok(())
685 }
686
687 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
688 let instrument_id = cmd.instrument_id;
691 let ws = self.public_ws()?.clone();
692
693 self.spawn_ws(
694 async move {
695 ws.subscribe_instrument(instrument_id)
696 .await
697 .context("instrument type subscription")?;
698 Ok(())
699 },
700 "subscribe_instrument",
701 );
702 Ok(())
703 }
704
705 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
706 if cmd.book_type != BookType::L2_MBP {
707 anyhow::bail!("OKX only supports L2_MBP order book deltas");
708 }
709
710 let depth = cmd.depth.map_or(0, |d| d.get());
711 if !matches!(depth, 0 | 50 | 400) {
712 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
713 }
714
715 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
716 let channel = match depth {
717 50 => {
718 if vip < OKXVipLevel::Vip4 {
719 anyhow::bail!(
720 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
721 );
722 }
723 OKXBookChannel::Books50L2Tbt
724 }
725 0 | 400 => {
726 if vip >= OKXVipLevel::Vip5 {
727 OKXBookChannel::BookL2Tbt
728 } else {
729 OKXBookChannel::Book
730 }
731 }
732 _ => unreachable!(),
733 };
734
735 let instrument_id = cmd.instrument_id;
736 let ws = self.public_ws()?.clone();
737 let book_channels = Arc::clone(&self.book_channels);
738
739 self.spawn_ws(
740 async move {
741 match channel {
742 OKXBookChannel::Books50L2Tbt => ws
743 .subscribe_book50_l2_tbt(instrument_id)
744 .await
745 .context("books50-l2-tbt subscription")?,
746 OKXBookChannel::BookL2Tbt => ws
747 .subscribe_book_l2_tbt(instrument_id)
748 .await
749 .context("books-l2-tbt subscription")?,
750 OKXBookChannel::Book => ws
751 .subscribe_books_channel(instrument_id)
752 .await
753 .context("books subscription")?,
754 }
755 book_channels
756 .write()
757 .expect("book channel cache lock poisoned")
758 .insert(instrument_id, channel);
759 Ok(())
760 },
761 "order book delta subscription",
762 );
763
764 Ok(())
765 }
766
767 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
768 if cmd.book_type != BookType::L2_MBP {
769 anyhow::bail!("OKX only supports L2_MBP order book snapshots");
770 }
771 let depth = cmd.depth.map_or(5, |d| d.get());
772 if depth != 5 {
773 anyhow::bail!("OKX only supports depth=5 snapshots");
774 }
775
776 let ws = self.public_ws()?.clone();
777 let instrument_id = cmd.instrument_id;
778
779 self.spawn_ws(
780 async move {
781 ws.subscribe_book_depth5(instrument_id)
782 .await
783 .context("books5 subscription")
784 },
785 "order book snapshot subscription",
786 );
787 Ok(())
788 }
789
790 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
791 let ws = self.public_ws()?.clone();
792 let instrument_id = cmd.instrument_id;
793
794 self.spawn_ws(
795 async move {
796 ws.subscribe_quotes(instrument_id)
797 .await
798 .context("quotes subscription")
799 },
800 "quote subscription",
801 );
802 Ok(())
803 }
804
805 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
806 let ws = self.public_ws()?.clone();
807 let instrument_id = cmd.instrument_id;
808
809 self.spawn_ws(
810 async move {
811 ws.subscribe_trades(instrument_id, false)
812 .await
813 .context("trades subscription")
814 },
815 "trade subscription",
816 );
817 Ok(())
818 }
819
820 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
821 let ws = self.public_ws()?.clone();
822 let instrument_id = cmd.instrument_id;
823
824 self.spawn_ws(
825 async move {
826 ws.subscribe_mark_prices(instrument_id)
827 .await
828 .context("mark price subscription")
829 },
830 "mark price subscription",
831 );
832 Ok(())
833 }
834
835 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
836 let ws = self.public_ws()?.clone();
837 let instrument_id = cmd.instrument_id;
838
839 self.spawn_ws(
840 async move {
841 ws.subscribe_index_prices(instrument_id)
842 .await
843 .context("index price subscription")
844 },
845 "index price subscription",
846 );
847 Ok(())
848 }
849
850 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
851 let ws = self.public_ws()?.clone();
852 let instrument_id = cmd.instrument_id;
853
854 self.spawn_ws(
855 async move {
856 ws.subscribe_funding_rates(instrument_id)
857 .await
858 .context("funding rate subscription")
859 },
860 "funding rate subscription",
861 );
862 Ok(())
863 }
864
865 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
866 let ws = self.business_ws()?.clone();
867 let bar_type = cmd.bar_type;
868
869 self.spawn_ws(
870 async move {
871 ws.subscribe_bars(bar_type)
872 .await
873 .context("bars subscription")
874 },
875 "bar subscription",
876 );
877 Ok(())
878 }
879
880 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
881 let ws = self.public_ws()?.clone();
882 let instrument_id = cmd.instrument_id;
883 let channel = self
884 .book_channels
885 .write()
886 .expect("book channel cache lock poisoned")
887 .remove(&instrument_id);
888
889 self.spawn_ws(
890 async move {
891 match channel {
892 Some(OKXBookChannel::Books50L2Tbt) => ws
893 .unsubscribe_book50_l2_tbt(instrument_id)
894 .await
895 .context("books50-l2-tbt unsubscribe")?,
896 Some(OKXBookChannel::BookL2Tbt) => ws
897 .unsubscribe_book_l2_tbt(instrument_id)
898 .await
899 .context("books-l2-tbt unsubscribe")?,
900 Some(OKXBookChannel::Book) => ws
901 .unsubscribe_book(instrument_id)
902 .await
903 .context("book unsubscribe")?,
904 None => {
905 tracing::warn!(
906 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
907 );
908 ws.unsubscribe_book(instrument_id)
909 .await
910 .context("book fallback unsubscribe")?;
911 }
912 }
913 Ok(())
914 },
915 "order book unsubscribe",
916 );
917 Ok(())
918 }
919
920 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
921 let ws = self.public_ws()?.clone();
922 let instrument_id = cmd.instrument_id;
923
924 self.spawn_ws(
925 async move {
926 ws.unsubscribe_book_depth5(instrument_id)
927 .await
928 .context("book depth5 unsubscribe")
929 },
930 "order book snapshot unsubscribe",
931 );
932 Ok(())
933 }
934
935 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
936 let ws = self.public_ws()?.clone();
937 let instrument_id = cmd.instrument_id;
938
939 self.spawn_ws(
940 async move {
941 ws.unsubscribe_quotes(instrument_id)
942 .await
943 .context("quotes unsubscribe")
944 },
945 "quote unsubscribe",
946 );
947 Ok(())
948 }
949
950 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
951 let ws = self.public_ws()?.clone();
952 let instrument_id = cmd.instrument_id;
953
954 self.spawn_ws(
955 async move {
956 ws.unsubscribe_trades(instrument_id, false) .await
958 .context("trades unsubscribe")
959 },
960 "trade unsubscribe",
961 );
962 Ok(())
963 }
964
965 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
966 let ws = self.public_ws()?.clone();
967 let instrument_id = cmd.instrument_id;
968
969 self.spawn_ws(
970 async move {
971 ws.unsubscribe_mark_prices(instrument_id)
972 .await
973 .context("mark price unsubscribe")
974 },
975 "mark price unsubscribe",
976 );
977 Ok(())
978 }
979
980 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
981 let ws = self.public_ws()?.clone();
982 let instrument_id = cmd.instrument_id;
983
984 self.spawn_ws(
985 async move {
986 ws.unsubscribe_index_prices(instrument_id)
987 .await
988 .context("index price unsubscribe")
989 },
990 "index price unsubscribe",
991 );
992 Ok(())
993 }
994
995 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
996 let ws = self.public_ws()?.clone();
997 let instrument_id = cmd.instrument_id;
998
999 self.spawn_ws(
1000 async move {
1001 ws.unsubscribe_funding_rates(instrument_id)
1002 .await
1003 .context("funding rate unsubscribe")
1004 },
1005 "funding rate unsubscribe",
1006 );
1007 Ok(())
1008 }
1009
1010 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1011 let ws = self.business_ws()?.clone();
1012 let bar_type = cmd.bar_type;
1013
1014 self.spawn_ws(
1015 async move {
1016 ws.unsubscribe_bars(bar_type)
1017 .await
1018 .context("bars unsubscribe")
1019 },
1020 "bar unsubscribe",
1021 );
1022 Ok(())
1023 }
1024
1025 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
1026 let http = self.http_client.clone();
1027 let sender = self.data_sender.clone();
1028 let instruments_cache = self.instruments.clone();
1029 let request_id = request.request_id;
1030 let client_id = request.client_id.unwrap_or(self.client_id);
1031 let venue = self.venue();
1032 let start = request.start;
1033 let end = request.end;
1034 let params = request.params.clone();
1035 let clock = self.clock;
1036 let start_nanos = datetime_to_unix_nanos(start);
1037 let end_nanos = datetime_to_unix_nanos(end);
1038 let instrument_types = if self.config.instrument_types.is_empty() {
1039 vec![OKXInstrumentType::Spot]
1040 } else {
1041 self.config.instrument_types.clone()
1042 };
1043 let contract_types = self.config.contract_types.clone();
1044 let instrument_families = self.config.instrument_families.clone();
1045
1046 tokio::spawn(async move {
1047 let mut all_instruments = Vec::new();
1048
1049 for inst_type in instrument_types {
1050 let supports_family = matches!(
1051 inst_type,
1052 OKXInstrumentType::Futures
1053 | OKXInstrumentType::Swap
1054 | OKXInstrumentType::Option
1055 );
1056
1057 let families = match (&instrument_families, inst_type, supports_family) {
1058 (Some(families), OKXInstrumentType::Option, true) => families.clone(),
1059 (Some(families), _, true) => families.clone(),
1060 (None, OKXInstrumentType::Option, _) => {
1061 tracing::warn!(
1062 "Skipping OPTION type: instrument_families required but not configured"
1063 );
1064 continue;
1065 }
1066 _ => vec![],
1067 };
1068
1069 if families.is_empty() {
1070 match http.request_instruments(inst_type, None).await {
1071 Ok(instruments) => {
1072 for instrument in instruments {
1073 if !contract_filter_with_config_types(
1074 contract_types.as_ref(),
1075 &instrument,
1076 ) {
1077 continue;
1078 }
1079
1080 upsert_instrument(&instruments_cache, instrument.clone());
1081 all_instruments.push(instrument);
1082 }
1083 }
1084 Err(e) => {
1085 tracing::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
1086 }
1087 }
1088 } else {
1089 for family in families {
1090 match http
1091 .request_instruments(inst_type, Some(family.clone()))
1092 .await
1093 {
1094 Ok(instruments) => {
1095 for instrument in instruments {
1096 if !contract_filter_with_config_types(
1097 contract_types.as_ref(),
1098 &instrument,
1099 ) {
1100 continue;
1101 }
1102
1103 upsert_instrument(&instruments_cache, instrument.clone());
1104 all_instruments.push(instrument);
1105 }
1106 }
1107 Err(e) => {
1108 tracing::error!(
1109 "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
1110 );
1111 }
1112 }
1113 }
1114 }
1115 }
1116
1117 let response = DataResponse::Instruments(InstrumentsResponse::new(
1118 request_id,
1119 client_id,
1120 venue,
1121 all_instruments,
1122 start_nanos,
1123 end_nanos,
1124 clock.get_time_ns(),
1125 params,
1126 ));
1127
1128 if let Err(e) = sender.send(DataEvent::Response(response)) {
1129 tracing::error!("Failed to send instruments response: {e}");
1130 }
1131 });
1132
1133 Ok(())
1134 }
1135
1136 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
1137 let http = self.http_client.clone();
1138 let sender = self.data_sender.clone();
1139 let instruments = self.instruments.clone();
1140 let instrument_id = request.instrument_id;
1141 let request_id = request.request_id;
1142 let client_id = request.client_id.unwrap_or(self.client_id);
1143 let start = request.start;
1144 let end = request.end;
1145 let params = request.params.clone();
1146 let clock = self.clock;
1147 let start_nanos = datetime_to_unix_nanos(start);
1148 let end_nanos = datetime_to_unix_nanos(end);
1149 let instrument_types = if self.config.instrument_types.is_empty() {
1150 vec![OKXInstrumentType::Spot]
1151 } else {
1152 self.config.instrument_types.clone()
1153 };
1154 let contract_types = self.config.contract_types.clone();
1155
1156 tokio::spawn(async move {
1157 match http
1158 .request_instrument(instrument_id)
1159 .await
1160 .context("fetch instrument from API")
1161 {
1162 Ok(instrument) => {
1163 let inst_id = instrument.id();
1164 let symbol = inst_id.symbol.as_str();
1165 let inst_type = okx_instrument_type_from_symbol(symbol);
1166 if !instrument_types.contains(&inst_type) {
1167 tracing::error!(
1168 "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1169 );
1170 return;
1171 }
1172
1173 if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1174 tracing::error!(
1175 "Instrument {instrument_id} filtered out by contract_types config"
1176 );
1177 return;
1178 }
1179
1180 upsert_instrument(&instruments, instrument.clone());
1181
1182 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1183 request_id,
1184 client_id,
1185 instrument.id(),
1186 instrument,
1187 start_nanos,
1188 end_nanos,
1189 clock.get_time_ns(),
1190 params,
1191 )));
1192
1193 if let Err(e) = sender.send(DataEvent::Response(response)) {
1194 tracing::error!("Failed to send instrument response: {e}");
1195 }
1196 }
1197 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
1198 }
1199 });
1200
1201 Ok(())
1202 }
1203
1204 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1205 let http = self.http_client.clone();
1206 let sender = self.data_sender.clone();
1207 let instrument_id = request.instrument_id;
1208 let start = request.start;
1209 let end = request.end;
1210 let limit = request.limit.map(|n| n.get() as u32);
1211 let request_id = request.request_id;
1212 let client_id = request.client_id.unwrap_or(self.client_id);
1213 let params = request.params.clone();
1214 let clock = self.clock;
1215 let start_nanos = datetime_to_unix_nanos(start);
1216 let end_nanos = datetime_to_unix_nanos(end);
1217
1218 tokio::spawn(async move {
1219 match http
1220 .request_trades(instrument_id, start, end, limit)
1221 .await
1222 .context("failed to request trades from OKX")
1223 {
1224 Ok(trades) => {
1225 let response = DataResponse::Trades(TradesResponse::new(
1226 request_id,
1227 client_id,
1228 instrument_id,
1229 trades,
1230 start_nanos,
1231 end_nanos,
1232 clock.get_time_ns(),
1233 params,
1234 ));
1235 if let Err(e) = sender.send(DataEvent::Response(response)) {
1236 tracing::error!("Failed to send trades response: {e}");
1237 }
1238 }
1239 Err(e) => tracing::error!("Trade request failed: {e:?}"),
1240 }
1241 });
1242
1243 Ok(())
1244 }
1245
1246 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1247 let http = self.http_client.clone();
1248 let sender = self.data_sender.clone();
1249 let bar_type = request.bar_type;
1250 let start = request.start;
1251 let end = request.end;
1252 let limit = request.limit.map(|n| n.get() as u32);
1253 let request_id = request.request_id;
1254 let client_id = request.client_id.unwrap_or(self.client_id);
1255 let params = request.params.clone();
1256 let clock = self.clock;
1257 let start_nanos = datetime_to_unix_nanos(start);
1258 let end_nanos = datetime_to_unix_nanos(end);
1259
1260 tokio::spawn(async move {
1261 match http
1262 .request_bars(bar_type, start, end, limit)
1263 .await
1264 .context("failed to request bars from OKX")
1265 {
1266 Ok(bars) => {
1267 let response = DataResponse::Bars(BarsResponse::new(
1268 request_id,
1269 client_id,
1270 bar_type,
1271 bars,
1272 start_nanos,
1273 end_nanos,
1274 clock.get_time_ns(),
1275 params,
1276 ));
1277 if let Err(e) = sender.send(DataEvent::Response(response)) {
1278 tracing::error!("Failed to send bars response: {e}");
1279 }
1280 }
1281 Err(e) => tracing::error!("Bar request failed: {e:?}"),
1282 }
1283 });
1284
1285 Ok(())
1286 }
1287}