1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31 messages::{
32 DataEvent,
33 data::{
34 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
38 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
39 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices,
40 UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43 runner::get_data_event_sender,
44};
45use nautilus_core::{
46 MUTEX_POISONED, UnixNanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
52 enums::BookType,
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60 common::{
61 consts::OKX_VENUE,
62 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
63 },
64 config::OKXDataClientConfig,
65 http::client::OKXHttpClient,
66 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
67};
68
69#[derive(Debug)]
70pub struct OKXDataClient {
71 client_id: ClientId,
72 config: OKXDataClientConfig,
73 http_client: OKXHttpClient,
74 ws_public: Option<OKXWebSocketClient>,
75 ws_business: Option<OKXWebSocketClient>,
76 is_connected: AtomicBool,
77 cancellation_token: CancellationToken,
78 tasks: Vec<JoinHandle<()>>,
79 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
80 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
81 book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
82 clock: &'static AtomicTime,
83 instrument_refresh_active: bool,
84}
85
86impl OKXDataClient {
87 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
93 let clock = get_atomic_clock_realtime();
94 let data_sender = get_data_event_sender();
95
96 let http_client = if config.has_api_credentials() {
97 OKXHttpClient::with_credentials(
98 config.api_key.clone(),
99 config.api_secret.clone(),
100 config.api_passphrase.clone(),
101 config.base_url_http.clone(),
102 config.http_timeout_secs,
103 config.max_retries,
104 config.retry_delay_initial_ms,
105 config.retry_delay_max_ms,
106 config.is_demo,
107 )?
108 } else {
109 OKXHttpClient::new(
110 config.base_url_http.clone(),
111 config.http_timeout_secs,
112 config.max_retries,
113 config.retry_delay_initial_ms,
114 config.retry_delay_max_ms,
115 config.is_demo,
116 )?
117 };
118
119 let ws_public =
120 OKXWebSocketClient::new(Some(config.ws_public_url()), None, None, None, None, None)
121 .context("failed to construct OKX public websocket client")?;
122
123 let ws_business = if config.requires_business_ws() {
124 Some(
125 OKXWebSocketClient::new(
126 Some(config.ws_business_url()),
127 config.api_key.clone(),
128 config.api_secret.clone(),
129 config.api_passphrase.clone(),
130 None,
131 None,
132 )
133 .context("failed to construct OKX business websocket client")?,
134 )
135 } else {
136 None
137 };
138
139 if let Some(vip_level) = config.vip_level {
140 ws_public.set_vip_level(vip_level);
141 if let Some(ref ws) = ws_business {
142 ws.set_vip_level(vip_level);
143 }
144 }
145
146 Ok(Self {
147 client_id,
148 config,
149 http_client,
150 ws_public: Some(ws_public),
151 ws_business,
152 is_connected: AtomicBool::new(false),
153 cancellation_token: CancellationToken::new(),
154 tasks: Vec::new(),
155 data_sender,
156 instruments: Arc::new(RwLock::new(AHashMap::new())),
157 book_channels: Arc::new(RwLock::new(AHashMap::new())),
158 clock,
159 instrument_refresh_active: false,
160 })
161 }
162
163 fn venue(&self) -> Venue {
164 *OKX_VENUE
165 }
166
167 fn vip_level(&self) -> Option<OKXVipLevel> {
168 self.ws_public.as_ref().map(|ws| ws.vip_level())
169 }
170
171 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
172 self.ws_public
173 .as_ref()
174 .context("public websocket client not initialized")
175 }
176
177 fn public_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
178 self.ws_public
179 .as_mut()
180 .context("public websocket client not initialized")
181 }
182
183 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
184 self.ws_business
185 .as_ref()
186 .context("business websocket client not available (credentials required)")
187 }
188
189 fn business_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
190 self.ws_business
191 .as_mut()
192 .context("business websocket client not available (credentials required)")
193 }
194
195 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
196 if let Err(e) = sender.send(DataEvent::Data(data)) {
197 tracing::error!("Failed to emit data event: {e}");
198 }
199 }
200
201 fn spawn_ws<F>(&self, fut: F, context: &'static str)
202 where
203 F: Future<Output = anyhow::Result<()>> + Send + 'static,
204 {
205 tokio::spawn(async move {
206 if let Err(e) = fut.await {
207 tracing::error!("{context}: {e:?}");
208 }
209 });
210 }
211
212 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
213 let instrument_types = if self.config.instrument_types.is_empty() {
214 vec![OKXInstrumentType::Spot]
215 } else {
216 self.config.instrument_types.clone()
217 };
218
219 let mut collected: Vec<InstrumentAny> = Vec::new();
220
221 for inst_type in instrument_types {
222 let mut instruments = self
223 .http_client
224 .request_instruments(inst_type, None)
225 .await
226 .with_context(|| format!("failed to load instruments for {inst_type:?}"))?;
227 instruments.retain(|instrument| self.contract_filter(instrument));
228 tracing::debug!(
229 "loaded {count} instruments for {inst_type:?}",
230 count = instruments.len()
231 );
232 collected.extend(instruments);
233 }
234
235 if collected.is_empty() {
236 tracing::warn!("No OKX instruments were loaded");
237 return Ok(collected);
238 }
239
240 self.http_client.add_instruments(collected.clone());
241
242 if let Some(ws) = self.ws_public.as_mut() {
243 ws.initialize_instruments_cache(collected.clone());
244 }
245 if let Some(ws) = self.ws_business.as_mut() {
246 ws.initialize_instruments_cache(collected.clone());
247 }
248
249 {
250 let mut guard = self
251 .instruments
252 .write()
253 .expect("instrument cache lock poisoned");
254 guard.clear();
255 for instrument in &collected {
256 guard.insert(instrument.id(), instrument.clone());
257 }
258 }
259
260 Ok(collected)
261 }
262
263 fn contract_filter(&self, instrument: &InstrumentAny) -> bool {
264 contract_filter_with_config(&self.config, instrument)
265 }
266
267 fn handle_ws_message(
268 message: NautilusWsMessage,
269 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
270 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
271 ) {
272 match message {
273 NautilusWsMessage::Data(payloads) => {
274 for data in payloads {
275 Self::send_data(data_sender, data);
276 }
277 }
278 NautilusWsMessage::Deltas(deltas) => {
279 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
280 }
281 NautilusWsMessage::FundingRates(updates) => {
282 emit_funding_rates(updates);
283 }
284 NautilusWsMessage::Instrument(instrument) => {
285 upsert_instrument(instruments, *instrument);
286 }
287 NautilusWsMessage::AccountUpdate(_)
288 | NautilusWsMessage::OrderRejected(_)
289 | NautilusWsMessage::OrderCancelRejected(_)
290 | NautilusWsMessage::OrderModifyRejected(_)
291 | NautilusWsMessage::ExecutionReports(_) => {
292 tracing::debug!("Ignoring trading message on data client");
293 }
294 NautilusWsMessage::Error(e) => {
295 tracing::error!("OKX websocket error: {e:?}");
296 }
297 NautilusWsMessage::Raw(value) => {
298 tracing::debug!("Unhandled websocket payload: {value:?}");
299 }
300 NautilusWsMessage::Reconnected => {
301 tracing::info!("Websocket reconnected");
302 }
303 }
304 }
305
306 fn spawn_public_stream(&mut self) -> anyhow::Result<()> {
307 let ws = self.public_ws_mut()?;
308 let stream = ws.stream();
309 self.spawn_stream_task(stream)
310 }
311
312 fn spawn_business_stream(&mut self) -> anyhow::Result<()> {
313 if self.ws_business.is_none() {
314 return Ok(());
315 }
316
317 let ws = self.business_ws_mut()?;
318 let stream = ws.stream();
319 self.spawn_stream_task(stream)
320 }
321
322 fn spawn_stream_task(
323 &mut self,
324 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
325 ) -> anyhow::Result<()> {
326 let data_sender = self.data_sender.clone();
327 let instruments = self.instruments.clone();
328 let cancellation = self.cancellation_token.clone();
329
330 let handle = tokio::spawn(async move {
331 tokio::pin!(stream);
332
333 loop {
334 tokio::select! {
335 maybe_msg = stream.next() => {
336 match maybe_msg {
337 Some(msg) => {
338 Self::handle_ws_message(msg, &data_sender, &instruments);
339 }
340 None => {
341 tracing::debug!("Websocket stream ended");
342 break;
343 }
344 }
345 }
346 _ = cancellation.cancelled() => {
347 tracing::debug!("Websocket stream task cancelled");
348 break;
349 }
350 }
351 }
352 });
353
354 self.tasks.push(handle);
355 Ok(())
356 }
357
358 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
359 let Some(minutes) = self.config.update_instruments_interval_mins else {
360 return Ok(());
361 };
362
363 if minutes == 0 || self.instrument_refresh_active {
364 return Ok(());
365 }
366
367 let interval_secs = minutes.saturating_mul(60);
368 if interval_secs == 0 {
369 return Ok(());
370 }
371
372 let interval = Duration::from_secs(interval_secs);
373 let cancellation = self.cancellation_token.clone();
374 let instruments_cache = Arc::clone(&self.instruments);
375 let mut http_client = self.http_client.clone();
376 let config = self.config.clone();
377 let client_id = self.client_id;
378
379 let handle = tokio::spawn(async move {
380 loop {
381 let sleep = tokio::time::sleep(interval);
382 tokio::pin!(sleep);
383 tokio::select! {
384 _ = cancellation.cancelled() => {
385 tracing::debug!("OKX instrument refresh task cancelled");
386 break;
387 }
388 _ = &mut sleep => {
389 let instrument_types = if config.instrument_types.is_empty() {
390 vec![OKXInstrumentType::Spot]
391 } else {
392 config.instrument_types.clone()
393 };
394
395 let mut collected: Vec<InstrumentAny> = Vec::new();
396
397 for inst_type in instrument_types {
398 match http_client.request_instruments(inst_type, None).await {
399 Ok(mut instruments) => {
400 instruments.retain(|instrument| contract_filter_with_config(&config, instrument));
401 collected.extend(instruments);
402 }
403 Err(e) => {
404 tracing::warn!(client_id=%client_id, instrument_type=?inst_type, error=?e, "Failed to refresh OKX instruments for type");
405 }
406 }
407 }
408
409 if collected.is_empty() {
410 tracing::debug!(client_id=%client_id, "OKX instrument refresh yielded no instruments");
411 continue;
412 }
413
414 http_client.add_instruments(collected.clone());
415
416 {
417 let mut guard = instruments_cache
418 .write()
419 .expect("instrument cache lock poisoned");
420 guard.clear();
421 for instrument in &collected {
422 guard.insert(instrument.id(), instrument.clone());
423 }
424 }
425
426 tracing::debug!(client_id=%client_id, count=collected.len(), "OKX instruments refreshed");
427 }
428 }
429 }
430 });
431
432 self.tasks.push(handle);
433 self.instrument_refresh_active = true;
434 Ok(())
435 }
436}
437
438fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
439 if updates.is_empty() {
440 return;
441 }
442
443 for update in updates {
444 tracing::debug!(
445 "Received funding rate update for {} but forwarding is not yet supported",
446 update.instrument_id
447 );
448 }
449}
450
451fn upsert_instrument(
452 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
453 instrument: InstrumentAny,
454) {
455 let mut guard = cache.write().expect(MUTEX_POISONED);
456 guard.insert(instrument.id(), instrument);
457}
458
459fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
460 value
461 .and_then(|dt| dt.timestamp_nanos_opt())
462 .and_then(|nanos| u64::try_from(nanos).ok())
463 .map(UnixNanos::from)
464}
465
466fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
467 match config.contract_types.as_ref() {
468 None => true,
469 Some(filter) if filter.is_empty() => true,
470 Some(filter) => {
471 let is_inverse = instrument.is_inverse();
472 (is_inverse && filter.contains(&OKXContractType::Inverse))
473 || (!is_inverse && filter.contains(&OKXContractType::Linear))
474 }
475 }
476}
477
478#[async_trait::async_trait]
479impl DataClient for OKXDataClient {
480 fn client_id(&self) -> ClientId {
481 self.client_id
482 }
483
484 fn venue(&self) -> Option<Venue> {
485 Some(self.venue())
486 }
487
488 fn start(&mut self) -> anyhow::Result<()> {
489 tracing::info!(
490 client_id = %self.client_id,
491 vip_level = ?self.vip_level(),
492 instrument_types = ?self.config.instrument_types,
493 is_demo = self.config.is_demo,
494 "Starting OKX data client"
495 );
496 Ok(())
497 }
498
499 fn stop(&mut self) -> anyhow::Result<()> {
500 tracing::info!("Stopping OKX data client {id}", id = self.client_id);
501 self.cancellation_token.cancel();
502 self.is_connected.store(false, Ordering::Relaxed);
503 self.instrument_refresh_active = false;
504 Ok(())
505 }
506
507 fn reset(&mut self) -> anyhow::Result<()> {
508 tracing::debug!("Resetting OKX data client {id}", id = self.client_id);
509 self.is_connected.store(false, Ordering::Relaxed);
510 self.cancellation_token = CancellationToken::new();
511 self.tasks.clear();
512 self.book_channels
513 .write()
514 .expect("book channel cache lock poisoned")
515 .clear();
516 self.instrument_refresh_active = false;
517 Ok(())
518 }
519
520 fn dispose(&mut self) -> anyhow::Result<()> {
521 tracing::debug!("Disposing OKX data client {id}", id = self.client_id);
522 self.stop()
523 }
524
525 async fn connect(&mut self) -> anyhow::Result<()> {
526 if self.is_connected() {
527 return Ok(());
528 }
529
530 self.bootstrap_instruments().await?;
531
532 {
533 let ws_public = self.public_ws_mut()?;
534 ws_public
535 .connect()
536 .await
537 .context("failed to connect OKX public websocket")?;
538 ws_public
539 .wait_until_active(10.0)
540 .await
541 .context("public websocket did not become active")?;
542 }
543
544 let instrument_types = if self.config.instrument_types.is_empty() {
545 vec![OKXInstrumentType::Spot]
546 } else {
547 self.config.instrument_types.clone()
548 };
549
550 let public_clone = self.public_ws()?.clone();
551 self.spawn_ws(
552 async move {
553 for inst_type in instrument_types {
554 public_clone
555 .subscribe_instruments(inst_type)
556 .await
557 .with_context(|| {
558 format!("failed to subscribe to instrument type {inst_type:?}")
559 })?;
560 }
561 Ok(())
562 },
563 "instrument subscription",
564 );
565
566 self.spawn_public_stream()?;
567
568 if self.ws_business.is_some() {
569 {
570 let ws_business = self.business_ws_mut()?;
571 ws_business
572 .connect()
573 .await
574 .context("failed to connect OKX business websocket")?;
575 ws_business
576 .wait_until_active(10.0)
577 .await
578 .context("business websocket did not become active")?;
579 }
580 self.spawn_business_stream()?;
581 }
582
583 self.maybe_spawn_instrument_refresh()?;
584
585 self.is_connected.store(true, Ordering::Relaxed);
586 tracing::info!("OKX data client connected");
587 Ok(())
588 }
589
590 async fn disconnect(&mut self) -> anyhow::Result<()> {
591 if self.is_disconnected() {
592 return Ok(());
593 }
594
595 self.cancellation_token.cancel();
596
597 if let Some(ws) = self.ws_public.as_ref()
598 && let Err(e) = ws.unsubscribe_all().await
599 {
600 tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
601 }
602 if let Some(ws) = self.ws_business.as_ref()
603 && let Err(e) = ws.unsubscribe_all().await
604 {
605 tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
606 }
607
608 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
610
611 if let Some(ws) = self.ws_public.as_mut() {
612 let _ = ws.close().await;
613 }
614 if let Some(ws) = self.ws_business.as_mut() {
615 let _ = ws.close().await;
616 }
617
618 for handle in self.tasks.drain(..) {
619 if let Err(e) = handle.await {
620 tracing::error!("Error joining websocket task: {e}");
621 }
622 }
623
624 self.cancellation_token = CancellationToken::new();
625 self.is_connected.store(false, Ordering::Relaxed);
626 self.book_channels
627 .write()
628 .expect("book channel cache lock poisoned")
629 .clear();
630 self.instrument_refresh_active = false;
631 tracing::info!("OKX data client disconnected");
632 Ok(())
633 }
634
635 fn is_connected(&self) -> bool {
636 self.is_connected.load(Ordering::Relaxed)
637 }
638
639 fn is_disconnected(&self) -> bool {
640 !self.is_connected()
641 }
642
643 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
644 if cmd.book_type != BookType::L2_MBP {
645 anyhow::bail!("OKX only supports L2_MBP order book deltas");
646 }
647
648 let depth = cmd.depth.map_or(0, |d| d.get());
649 if !matches!(depth, 0 | 50 | 400) {
650 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
651 }
652
653 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
654 let channel = match depth {
655 50 => {
656 if vip < OKXVipLevel::Vip4 {
657 anyhow::bail!(
658 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
659 );
660 }
661 OKXBookChannel::Books50L2Tbt
662 }
663 0 | 400 => {
664 if vip >= OKXVipLevel::Vip5 {
665 OKXBookChannel::BookL2Tbt
666 } else {
667 OKXBookChannel::Book
668 }
669 }
670 _ => unreachable!(),
671 };
672
673 let instrument_id = cmd.instrument_id;
674 let ws = self.public_ws()?.clone();
675 let book_channels = Arc::clone(&self.book_channels);
676 self.spawn_ws(
677 async move {
678 match channel {
679 OKXBookChannel::Books50L2Tbt => ws
680 .subscribe_book50_l2_tbt(instrument_id)
681 .await
682 .context("books50-l2-tbt subscription")?,
683 OKXBookChannel::BookL2Tbt => ws
684 .subscribe_book_l2_tbt(instrument_id)
685 .await
686 .context("books-l2-tbt subscription")?,
687 OKXBookChannel::Book => ws
688 .subscribe_books_channel(instrument_id)
689 .await
690 .context("books subscription")?,
691 }
692 book_channels
693 .write()
694 .expect("book channel cache lock poisoned")
695 .insert(instrument_id, channel);
696 Ok(())
697 },
698 "order book delta subscription",
699 );
700
701 Ok(())
702 }
703
704 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
705 if cmd.book_type != BookType::L2_MBP {
706 anyhow::bail!("OKX only supports L2_MBP order book snapshots");
707 }
708 let depth = cmd.depth.map_or(5, |d| d.get());
709 if depth != 5 {
710 anyhow::bail!("OKX only supports depth=5 snapshots");
711 }
712
713 let ws = self.public_ws()?.clone();
714 let instrument_id = cmd.instrument_id;
715 self.spawn_ws(
716 async move {
717 ws.subscribe_book_depth5(instrument_id)
718 .await
719 .context("books5 subscription")
720 },
721 "order book snapshot subscription",
722 );
723 Ok(())
724 }
725
726 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
727 let ws = self.public_ws()?.clone();
728 let instrument_id = cmd.instrument_id;
729 self.spawn_ws(
730 async move {
731 ws.subscribe_quotes(instrument_id)
732 .await
733 .context("quotes subscription")
734 },
735 "quote subscription",
736 );
737 Ok(())
738 }
739
740 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
741 let ws = self.public_ws()?.clone();
742 let instrument_id = cmd.instrument_id;
743 self.spawn_ws(
744 async move {
745 ws.subscribe_trades(instrument_id, false)
746 .await
747 .context("trades subscription")
748 },
749 "trade subscription",
750 );
751 Ok(())
752 }
753
754 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
755 let ws = self.public_ws()?.clone();
756 let instrument_id = cmd.instrument_id;
757 self.spawn_ws(
758 async move {
759 ws.subscribe_mark_prices(instrument_id)
760 .await
761 .context("mark price subscription")
762 },
763 "mark price subscription",
764 );
765 Ok(())
766 }
767
768 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
769 let ws = self.public_ws()?.clone();
770 let instrument_id = cmd.instrument_id;
771 self.spawn_ws(
772 async move {
773 ws.subscribe_index_prices(instrument_id)
774 .await
775 .context("index price subscription")
776 },
777 "index price subscription",
778 );
779 Ok(())
780 }
781
782 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
783 let ws = self.public_ws()?.clone();
784 let instrument_id = cmd.instrument_id;
785 self.spawn_ws(
786 async move {
787 ws.subscribe_funding_rates(instrument_id)
788 .await
789 .context("funding rate subscription")
790 },
791 "funding rate subscription",
792 );
793 Ok(())
794 }
795
796 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
797 let ws = self.business_ws()?.clone();
798 let bar_type = cmd.bar_type;
799 self.spawn_ws(
800 async move {
801 ws.subscribe_bars(bar_type)
802 .await
803 .context("bars subscription")
804 },
805 "bar subscription",
806 );
807 Ok(())
808 }
809
810 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
811 let ws = self.public_ws()?.clone();
812 let instrument_id = cmd.instrument_id;
813 let channel = self
814 .book_channels
815 .write()
816 .expect("book channel cache lock poisoned")
817 .remove(&instrument_id);
818 self.spawn_ws(
819 async move {
820 match channel {
821 Some(OKXBookChannel::Books50L2Tbt) => ws
822 .unsubscribe_book50_l2_tbt(instrument_id)
823 .await
824 .context("books50-l2-tbt unsubscribe")?,
825 Some(OKXBookChannel::BookL2Tbt) => ws
826 .unsubscribe_book_l2_tbt(instrument_id)
827 .await
828 .context("books-l2-tbt unsubscribe")?,
829 Some(OKXBookChannel::Book) => ws
830 .unsubscribe_book(instrument_id)
831 .await
832 .context("book unsubscribe")?,
833 None => {
834 tracing::warn!(
835 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
836 );
837 ws.unsubscribe_book(instrument_id)
838 .await
839 .context("book fallback unsubscribe")?;
840 }
841 }
842 Ok(())
843 },
844 "order book unsubscribe",
845 );
846 Ok(())
847 }
848
849 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
850 let ws = self.public_ws()?.clone();
851 let instrument_id = cmd.instrument_id;
852 self.spawn_ws(
853 async move {
854 ws.unsubscribe_book_depth5(instrument_id)
855 .await
856 .context("book depth5 unsubscribe")
857 },
858 "order book snapshot unsubscribe",
859 );
860 Ok(())
861 }
862
863 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
864 let ws = self.public_ws()?.clone();
865 let instrument_id = cmd.instrument_id;
866 self.spawn_ws(
867 async move {
868 ws.unsubscribe_quotes(instrument_id)
869 .await
870 .context("quotes unsubscribe")
871 },
872 "quote unsubscribe",
873 );
874 Ok(())
875 }
876
877 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
878 let ws = self.public_ws()?.clone();
879 let instrument_id = cmd.instrument_id;
880 self.spawn_ws(
881 async move {
882 ws.unsubscribe_trades(instrument_id, false) .await
884 .context("trades unsubscribe")
885 },
886 "trade unsubscribe",
887 );
888 Ok(())
889 }
890
891 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
892 let ws = self.public_ws()?.clone();
893 let instrument_id = cmd.instrument_id;
894 self.spawn_ws(
895 async move {
896 ws.unsubscribe_mark_prices(instrument_id)
897 .await
898 .context("mark price unsubscribe")
899 },
900 "mark price unsubscribe",
901 );
902 Ok(())
903 }
904
905 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
906 let ws = self.public_ws()?.clone();
907 let instrument_id = cmd.instrument_id;
908 self.spawn_ws(
909 async move {
910 ws.unsubscribe_index_prices(instrument_id)
911 .await
912 .context("index price unsubscribe")
913 },
914 "index price unsubscribe",
915 );
916 Ok(())
917 }
918
919 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
920 let ws = self.public_ws()?.clone();
921 let instrument_id = cmd.instrument_id;
922 self.spawn_ws(
923 async move {
924 ws.unsubscribe_funding_rates(instrument_id)
925 .await
926 .context("funding rate unsubscribe")
927 },
928 "funding rate unsubscribe",
929 );
930 Ok(())
931 }
932
933 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
934 let ws = self.business_ws()?.clone();
935 let bar_type = cmd.bar_type;
936 self.spawn_ws(
937 async move {
938 ws.unsubscribe_bars(bar_type)
939 .await
940 .context("bars unsubscribe")
941 },
942 "bar unsubscribe",
943 );
944 Ok(())
945 }
946
947 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
948 let instruments = {
949 let guard = self
950 .instruments
951 .read()
952 .expect("instrument cache lock poisoned");
953 guard.values().cloned().collect::<Vec<_>>()
954 };
955
956 let response = DataResponse::Instruments(InstrumentsResponse::new(
957 request.request_id,
958 request.client_id.unwrap_or(self.client_id),
959 self.venue(),
960 instruments,
961 datetime_to_unix_nanos(request.start),
962 datetime_to_unix_nanos(request.end),
963 self.clock.get_time_ns(),
964 request.params.clone(),
965 ));
966
967 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
968 tracing::error!("Failed to send instruments response: {e}");
969 }
970
971 Ok(())
972 }
973
974 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
975 let instrument = {
976 let guard = self
977 .instruments
978 .read()
979 .expect("instrument cache lock poisoned");
980 guard
981 .get(&request.instrument_id)
982 .cloned()
983 .context("instrument not found in cache")?
984 };
985
986 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
987 request.request_id,
988 request.client_id.unwrap_or(self.client_id),
989 instrument.id(),
990 instrument,
991 datetime_to_unix_nanos(request.start),
992 datetime_to_unix_nanos(request.end),
993 self.clock.get_time_ns(),
994 request.params.clone(),
995 )));
996
997 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
998 tracing::error!("Failed to send instrument response: {e}");
999 }
1000
1001 Ok(())
1002 }
1003
1004 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1005 let http = self.http_client.clone();
1006 let sender = self.data_sender.clone();
1007 let instrument_id = request.instrument_id;
1008 let start = request.start;
1009 let end = request.end;
1010 let limit = request.limit.map(|n| n.get() as u32);
1011 let request_id = request.request_id;
1012 let client_id = request.client_id.unwrap_or(self.client_id);
1013 let params = request.params.clone();
1014 let clock = self.clock;
1015 let start_nanos = datetime_to_unix_nanos(start);
1016 let end_nanos = datetime_to_unix_nanos(end);
1017
1018 tokio::spawn(async move {
1019 match http
1020 .request_trades(instrument_id, start, end, limit)
1021 .await
1022 .context("failed to request trades from OKX")
1023 {
1024 Ok(trades) => {
1025 let response = DataResponse::Trades(TradesResponse::new(
1026 request_id,
1027 client_id,
1028 instrument_id,
1029 trades,
1030 start_nanos,
1031 end_nanos,
1032 clock.get_time_ns(),
1033 params,
1034 ));
1035 if let Err(e) = sender.send(DataEvent::Response(response)) {
1036 tracing::error!("Failed to send trades response: {e}");
1037 }
1038 }
1039 Err(e) => tracing::error!("Trade request failed: {e:?}"),
1040 }
1041 });
1042
1043 Ok(())
1044 }
1045
1046 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1047 let http = self.http_client.clone();
1048 let sender = self.data_sender.clone();
1049 let bar_type = request.bar_type;
1050 let start = request.start;
1051 let end = request.end;
1052 let limit = request.limit.map(|n| n.get() as u32);
1053 let request_id = request.request_id;
1054 let client_id = request.client_id.unwrap_or(self.client_id);
1055 let params = request.params.clone();
1056 let clock = self.clock;
1057 let start_nanos = datetime_to_unix_nanos(start);
1058 let end_nanos = datetime_to_unix_nanos(end);
1059
1060 tokio::spawn(async move {
1061 match http
1062 .request_bars(bar_type, start, end, limit)
1063 .await
1064 .context("failed to request bars from OKX")
1065 {
1066 Ok(bars) => {
1067 let response = DataResponse::Bars(BarsResponse::new(
1068 request_id,
1069 client_id,
1070 bar_type,
1071 bars,
1072 start_nanos,
1073 end_nanos,
1074 clock.get_time_ns(),
1075 params,
1076 ));
1077 if let Err(e) = sender.send(DataEvent::Response(response)) {
1078 tracing::error!("Failed to send bars response: {e}");
1079 }
1080 }
1081 Err(e) => tracing::error!("Bar request failed: {e:?}"),
1082 }
1083 });
1084
1085 Ok(())
1086 }
1087}