1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31 messages::{
32 DataEvent,
33 data::{
34 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
38 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
39 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices,
40 UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43 runner::get_data_event_sender,
44};
45use nautilus_core::{
46 UnixNanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
52 enums::BookType,
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60 common::{
61 consts::OKX_VENUE,
62 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
63 },
64 config::OKXDataClientConfig,
65 http::client::OKXHttpClient,
66 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
67};
68
69#[derive(Debug)]
70pub struct OKXDataClient {
71 client_id: ClientId,
72 config: OKXDataClientConfig,
73 http_client: OKXHttpClient,
74 ws_public: Option<OKXWebSocketClient>,
75 ws_business: Option<OKXWebSocketClient>,
76 is_connected: AtomicBool,
77 cancellation_token: CancellationToken,
78 tasks: Vec<JoinHandle<()>>,
79 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
80 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
81 book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
82 clock: &'static AtomicTime,
83 instrument_refresh_active: bool,
84}
85
86impl OKXDataClient {
87 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
93 let clock = get_atomic_clock_realtime();
94 let data_sender = get_data_event_sender();
95
96 let http_client = if config.has_api_credentials() {
97 OKXHttpClient::with_credentials(
98 config.api_key.clone(),
99 config.api_secret.clone(),
100 config.api_passphrase.clone(),
101 config.base_url_http.clone(),
102 config.http_timeout_secs,
103 None,
104 None,
105 None,
106 config.is_demo,
107 )?
108 } else {
109 OKXHttpClient::new(
110 config.base_url_http.clone(),
111 config.http_timeout_secs,
112 None,
113 None,
114 None,
115 config.is_demo,
116 )?
117 };
118
119 let ws_public =
120 OKXWebSocketClient::new(Some(config.ws_public_url()), None, None, None, None, None)
121 .context("failed to construct OKX public websocket client")?;
122
123 let ws_business = if config.requires_business_ws() {
124 Some(
125 OKXWebSocketClient::new(
126 Some(config.ws_business_url()),
127 config.api_key.clone(),
128 config.api_secret.clone(),
129 config.api_passphrase.clone(),
130 None,
131 None,
132 )
133 .context("failed to construct OKX business websocket client")?,
134 )
135 } else {
136 None
137 };
138
139 Ok(Self {
140 client_id,
141 config,
142 http_client,
143 ws_public: Some(ws_public),
144 ws_business,
145 is_connected: AtomicBool::new(false),
146 cancellation_token: CancellationToken::new(),
147 tasks: Vec::new(),
148 data_sender,
149 instruments: Arc::new(RwLock::new(AHashMap::new())),
150 book_channels: Arc::new(RwLock::new(AHashMap::new())),
151 clock,
152 instrument_refresh_active: false,
153 })
154 }
155
156 fn venue(&self) -> Venue {
157 *OKX_VENUE
158 }
159
160 fn vip_level(&self) -> Option<OKXVipLevel> {
161 self.ws_public.as_ref().map(|ws| ws.vip_level())
162 }
163
164 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
165 self.ws_public
166 .as_ref()
167 .context("public websocket client not initialized")
168 }
169
170 fn public_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
171 self.ws_public
172 .as_mut()
173 .context("public websocket client not initialized")
174 }
175
176 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
177 self.ws_business
178 .as_ref()
179 .context("business websocket client not available (credentials required)")
180 }
181
182 fn business_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
183 self.ws_business
184 .as_mut()
185 .context("business websocket client not available (credentials required)")
186 }
187
188 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
189 if let Err(err) = sender.send(DataEvent::Data(data)) {
190 tracing::error!("Failed to emit data event: {err}");
191 }
192 }
193
194 fn spawn_ws<F>(&self, fut: F, context: &'static str)
195 where
196 F: Future<Output = anyhow::Result<()>> + Send + 'static,
197 {
198 tokio::spawn(async move {
199 if let Err(err) = fut.await {
200 tracing::error!("{context}: {err:?}");
201 }
202 });
203 }
204
205 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
206 let instrument_types = if self.config.instrument_types.is_empty() {
207 vec![OKXInstrumentType::Spot]
208 } else {
209 self.config.instrument_types.clone()
210 };
211
212 let mut collected: Vec<InstrumentAny> = Vec::new();
213
214 for inst_type in instrument_types {
215 let mut instruments = self
216 .http_client
217 .request_instruments(inst_type, None)
218 .await
219 .with_context(|| format!("failed to load instruments for {inst_type:?}"))?;
220 instruments.retain(|instrument| self.contract_filter(instrument));
221 tracing::debug!(
222 "loaded {count} instruments for {inst_type:?}",
223 count = instruments.len()
224 );
225 collected.extend(instruments);
226 }
227
228 if collected.is_empty() {
229 tracing::warn!("No OKX instruments were loaded");
230 return Ok(collected);
231 }
232
233 self.http_client.add_instruments(collected.clone());
234
235 if let Some(ws) = self.ws_public.as_mut() {
236 ws.initialize_instruments_cache(collected.clone());
237 }
238 if let Some(ws) = self.ws_business.as_mut() {
239 ws.initialize_instruments_cache(collected.clone());
240 }
241
242 {
243 let mut guard = self
244 .instruments
245 .write()
246 .expect("instrument cache lock poisoned");
247 guard.clear();
248 for instrument in &collected {
249 guard.insert(instrument.id(), instrument.clone());
250 }
251 }
252
253 Ok(collected)
254 }
255
256 fn contract_filter(&self, instrument: &InstrumentAny) -> bool {
257 contract_filter_with_config(&self.config, instrument)
258 }
259
260 fn handle_ws_message(
261 message: NautilusWsMessage,
262 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
263 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
264 ) {
265 match message {
266 NautilusWsMessage::Data(payloads) => {
267 for data in payloads {
268 Self::send_data(data_sender, data);
269 }
270 }
271 NautilusWsMessage::Deltas(deltas) => {
272 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
273 }
274 NautilusWsMessage::FundingRates(updates) => {
275 emit_funding_rates(updates);
276 }
277 NautilusWsMessage::Instrument(instrument) => {
278 upsert_instrument(instruments, *instrument);
279 }
280 NautilusWsMessage::AccountUpdate(_)
281 | NautilusWsMessage::OrderRejected(_)
282 | NautilusWsMessage::OrderCancelRejected(_)
283 | NautilusWsMessage::OrderModifyRejected(_)
284 | NautilusWsMessage::ExecutionReports(_) => {
285 tracing::debug!("Ignoring trading message on data client");
286 }
287 NautilusWsMessage::Error(err) => {
288 tracing::error!("OKX websocket error: {err:?}");
289 }
290 NautilusWsMessage::Raw(value) => {
291 tracing::debug!("Unhandled websocket payload: {value:?}");
292 }
293 NautilusWsMessage::Reconnected => {
294 tracing::info!("Websocket reconnected");
295 }
296 }
297 }
298
299 fn spawn_public_stream(&mut self) -> anyhow::Result<()> {
300 let ws = self.public_ws_mut()?;
301 let stream = ws.stream();
302 self.spawn_stream_task(stream)
303 }
304
305 fn spawn_business_stream(&mut self) -> anyhow::Result<()> {
306 if self.ws_business.is_none() {
307 return Ok(());
308 }
309
310 let ws = self.business_ws_mut()?;
311 let stream = ws.stream();
312 self.spawn_stream_task(stream)
313 }
314
315 fn spawn_stream_task(
316 &mut self,
317 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
318 ) -> anyhow::Result<()> {
319 let data_sender = self.data_sender.clone();
320 let instruments = self.instruments.clone();
321 let cancellation = self.cancellation_token.clone();
322
323 let handle = tokio::spawn(async move {
324 tokio::pin!(stream);
325
326 loop {
327 tokio::select! {
328 maybe_msg = stream.next() => {
329 match maybe_msg {
330 Some(msg) => {
331 Self::handle_ws_message(msg, &data_sender, &instruments);
332 }
333 None => {
334 tracing::debug!("Websocket stream ended");
335 break;
336 }
337 }
338 }
339 _ = cancellation.cancelled() => {
340 tracing::debug!("Websocket stream task cancelled");
341 break;
342 }
343 }
344 }
345 });
346
347 self.tasks.push(handle);
348 Ok(())
349 }
350
351 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
352 let Some(minutes) = self.config.update_instruments_interval_mins else {
353 return Ok(());
354 };
355
356 if minutes == 0 || self.instrument_refresh_active {
357 return Ok(());
358 }
359
360 let interval_secs = minutes.saturating_mul(60);
361 if interval_secs == 0 {
362 return Ok(());
363 }
364
365 let interval = Duration::from_secs(interval_secs);
366 let cancellation = self.cancellation_token.clone();
367 let instruments_cache = Arc::clone(&self.instruments);
368 let mut http_client = self.http_client.clone();
369 let config = self.config.clone();
370 let client_id = self.client_id;
371
372 let handle = tokio::spawn(async move {
373 loop {
374 let sleep = tokio::time::sleep(interval);
375 tokio::pin!(sleep);
376 tokio::select! {
377 _ = cancellation.cancelled() => {
378 tracing::debug!("OKX instrument refresh task cancelled");
379 break;
380 }
381 _ = &mut sleep => {
382 let instrument_types = if config.instrument_types.is_empty() {
383 vec![OKXInstrumentType::Spot]
384 } else {
385 config.instrument_types.clone()
386 };
387
388 let mut collected: Vec<InstrumentAny> = Vec::new();
389
390 for inst_type in instrument_types {
391 match http_client.request_instruments(inst_type, None).await {
392 Ok(mut instruments) => {
393 instruments.retain(|instrument| contract_filter_with_config(&config, instrument));
394 collected.extend(instruments);
395 }
396 Err(err) => {
397 tracing::warn!(client_id=%client_id, instrument_type=?inst_type, error=?err, "Failed to refresh OKX instruments for type");
398 }
399 }
400 }
401
402 if collected.is_empty() {
403 tracing::debug!(client_id=%client_id, "OKX instrument refresh yielded no instruments");
404 continue;
405 }
406
407 http_client.add_instruments(collected.clone());
408
409 {
410 let mut guard = instruments_cache
411 .write()
412 .expect("instrument cache lock poisoned");
413 guard.clear();
414 for instrument in &collected {
415 guard.insert(instrument.id(), instrument.clone());
416 }
417 }
418
419 tracing::debug!(client_id=%client_id, count=collected.len(), "OKX instruments refreshed");
420 }
421 }
422 }
423 });
424
425 self.tasks.push(handle);
426 self.instrument_refresh_active = true;
427 Ok(())
428 }
429}
430
431fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
432 if updates.is_empty() {
433 return;
434 }
435
436 for update in updates {
437 tracing::debug!(
438 "Received funding rate update for {} but forwarding is not yet supported",
439 update.instrument_id
440 );
441 }
442}
443
444fn upsert_instrument(
445 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
446 instrument: InstrumentAny,
447) {
448 let mut guard = cache.write().expect("instrument cache lock poisoned");
449 guard.insert(instrument.id(), instrument);
450}
451
452fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
453 value
454 .and_then(|dt| dt.timestamp_nanos_opt())
455 .and_then(|nanos| u64::try_from(nanos).ok())
456 .map(UnixNanos::from)
457}
458
459fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
460 match config.contract_types.as_ref() {
461 None => true,
462 Some(filter) if filter.is_empty() => true,
463 Some(filter) => {
464 let is_inverse = instrument.is_inverse();
465 (is_inverse && filter.contains(&OKXContractType::Inverse))
466 || (!is_inverse && filter.contains(&OKXContractType::Linear))
467 }
468 }
469}
470
471#[async_trait::async_trait]
472impl DataClient for OKXDataClient {
473 fn client_id(&self) -> ClientId {
474 self.client_id
475 }
476
477 fn venue(&self) -> Option<Venue> {
478 Some(self.venue())
479 }
480
481 fn start(&mut self) -> anyhow::Result<()> {
482 tracing::info!(
483 client_id = %self.client_id,
484 vip_level = ?self.vip_level(),
485 instrument_types = ?self.config.instrument_types,
486 is_demo = self.config.is_demo,
487 "Starting OKX data client"
488 );
489 Ok(())
490 }
491
492 fn stop(&mut self) -> anyhow::Result<()> {
493 tracing::info!("Stopping OKX data client {id}", id = self.client_id);
494 self.cancellation_token.cancel();
495 self.is_connected.store(false, Ordering::Relaxed);
496 self.instrument_refresh_active = false;
497 Ok(())
498 }
499
500 fn reset(&mut self) -> anyhow::Result<()> {
501 tracing::debug!("Resetting OKX data client {id}", id = self.client_id);
502 self.is_connected.store(false, Ordering::Relaxed);
503 self.cancellation_token = CancellationToken::new();
504 self.tasks.clear();
505 self.book_channels
506 .write()
507 .expect("book channel cache lock poisoned")
508 .clear();
509 self.instrument_refresh_active = false;
510 Ok(())
511 }
512
513 fn dispose(&mut self) -> anyhow::Result<()> {
514 tracing::debug!("Disposing OKX data client {id}", id = self.client_id);
515 self.stop()
516 }
517
518 async fn connect(&mut self) -> anyhow::Result<()> {
519 if self.is_connected() {
520 return Ok(());
521 }
522
523 self.bootstrap_instruments().await?;
524
525 if self.config.has_api_credentials()
527 && let Ok(Some(vip_level)) = self.http_client.request_vip_level().await
528 {
529 if let Some(ws) = self.ws_public.as_mut() {
530 ws.set_vip_level(vip_level);
531 }
532 if let Some(ws) = self.ws_business.as_mut() {
533 ws.set_vip_level(vip_level);
534 }
535 }
536
537 {
538 let ws_public = self.public_ws_mut()?;
539 ws_public
540 .connect()
541 .await
542 .context("failed to connect OKX public websocket")?;
543 ws_public
544 .wait_until_active(10.0)
545 .await
546 .context("public websocket did not become active")?;
547 }
548
549 let instrument_types = if self.config.instrument_types.is_empty() {
550 vec![OKXInstrumentType::Spot]
551 } else {
552 self.config.instrument_types.clone()
553 };
554
555 let public_clone = self.public_ws()?.clone();
556 self.spawn_ws(
557 async move {
558 for inst_type in instrument_types {
559 public_clone
560 .subscribe_instruments(inst_type)
561 .await
562 .with_context(|| {
563 format!("failed to subscribe to instrument type {inst_type:?}")
564 })?;
565 }
566 Ok(())
567 },
568 "instrument subscription",
569 );
570
571 self.spawn_public_stream()?;
572
573 if self.ws_business.is_some() {
574 {
575 let ws_business = self.business_ws_mut()?;
576 ws_business
577 .connect()
578 .await
579 .context("failed to connect OKX business websocket")?;
580 ws_business
581 .wait_until_active(10.0)
582 .await
583 .context("business websocket did not become active")?;
584 }
585 self.spawn_business_stream()?;
586 }
587
588 self.maybe_spawn_instrument_refresh()?;
589
590 self.is_connected.store(true, Ordering::Relaxed);
591 tracing::info!("OKX data client connected");
592 Ok(())
593 }
594
595 async fn disconnect(&mut self) -> anyhow::Result<()> {
596 if self.is_disconnected() {
597 return Ok(());
598 }
599
600 self.cancellation_token.cancel();
601
602 if let Some(ws) = self.ws_public.as_mut() {
603 let _ = ws.close().await;
604 }
605 if let Some(ws) = self.ws_business.as_mut() {
606 let _ = ws.close().await;
607 }
608
609 for handle in self.tasks.drain(..) {
610 if let Err(err) = handle.await {
611 tracing::error!("Error joining websocket task: {err}");
612 }
613 }
614
615 self.cancellation_token = CancellationToken::new();
616 self.is_connected.store(false, Ordering::Relaxed);
617 self.book_channels
618 .write()
619 .expect("book channel cache lock poisoned")
620 .clear();
621 self.instrument_refresh_active = false;
622 tracing::info!("OKX data client disconnected");
623 Ok(())
624 }
625
626 fn is_connected(&self) -> bool {
627 self.is_connected.load(Ordering::Relaxed)
628 }
629
630 fn is_disconnected(&self) -> bool {
631 !self.is_connected()
632 }
633
634 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
635 if cmd.book_type != BookType::L2_MBP {
636 anyhow::bail!("OKX only supports L2_MBP order book deltas");
637 }
638
639 let depth = cmd.depth.map(|d| d.get()).unwrap_or(0);
640 if !matches!(depth, 0 | 50 | 400) {
641 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
642 }
643
644 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
645 let channel = match depth {
646 50 => {
647 if vip < OKXVipLevel::Vip4 {
648 anyhow::bail!(
649 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
650 );
651 }
652 OKXBookChannel::Books50L2Tbt
653 }
654 0 | 400 => {
655 if vip >= OKXVipLevel::Vip5 {
656 OKXBookChannel::BookL2Tbt
657 } else {
658 OKXBookChannel::Book
659 }
660 }
661 _ => unreachable!(),
662 };
663
664 let instrument_id = cmd.instrument_id;
665 let ws = self.public_ws()?.clone();
666 let book_channels = Arc::clone(&self.book_channels);
667 self.spawn_ws(
668 async move {
669 match channel {
670 OKXBookChannel::Books50L2Tbt => ws
671 .subscribe_book50_l2_tbt(instrument_id)
672 .await
673 .context("books50-l2-tbt subscription")?,
674 OKXBookChannel::BookL2Tbt => ws
675 .subscribe_book_l2_tbt(instrument_id)
676 .await
677 .context("books-l2-tbt subscription")?,
678 OKXBookChannel::Book => ws
679 .subscribe_books_channel(instrument_id)
680 .await
681 .context("books subscription")?,
682 }
683 book_channels
684 .write()
685 .expect("book channel cache lock poisoned")
686 .insert(instrument_id, channel);
687 Ok(())
688 },
689 "order book delta subscription",
690 );
691
692 Ok(())
693 }
694
695 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
696 if cmd.book_type != BookType::L2_MBP {
697 anyhow::bail!("OKX only supports L2_MBP order book snapshots");
698 }
699 let depth = cmd.depth.map(|d| d.get()).unwrap_or(5);
700 if depth != 5 {
701 anyhow::bail!("OKX only supports depth=5 snapshots");
702 }
703
704 let ws = self.public_ws()?.clone();
705 let instrument_id = cmd.instrument_id;
706 self.spawn_ws(
707 async move {
708 ws.subscribe_book_depth5(instrument_id)
709 .await
710 .context("books5 subscription")
711 },
712 "order book snapshot subscription",
713 );
714 Ok(())
715 }
716
717 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
718 let ws = self.public_ws()?.clone();
719 let instrument_id = cmd.instrument_id;
720 self.spawn_ws(
721 async move {
722 ws.subscribe_quotes(instrument_id)
723 .await
724 .context("quotes subscription")
725 },
726 "quote subscription",
727 );
728 Ok(())
729 }
730
731 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
732 let ws = self.public_ws()?.clone();
733 let instrument_id = cmd.instrument_id;
734 self.spawn_ws(
735 async move {
736 ws.subscribe_trades(instrument_id, false)
737 .await
738 .context("trades subscription")
739 },
740 "trade subscription",
741 );
742 Ok(())
743 }
744
745 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
746 let ws = self.public_ws()?.clone();
747 let instrument_id = cmd.instrument_id;
748 self.spawn_ws(
749 async move {
750 ws.subscribe_mark_prices(instrument_id)
751 .await
752 .context("mark price subscription")
753 },
754 "mark price subscription",
755 );
756 Ok(())
757 }
758
759 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
760 let ws = self.public_ws()?.clone();
761 let instrument_id = cmd.instrument_id;
762 self.spawn_ws(
763 async move {
764 ws.subscribe_index_prices(instrument_id)
765 .await
766 .context("index price subscription")
767 },
768 "index price subscription",
769 );
770 Ok(())
771 }
772
773 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
774 let ws = self.public_ws()?.clone();
775 let instrument_id = cmd.instrument_id;
776 self.spawn_ws(
777 async move {
778 ws.subscribe_funding_rates(instrument_id)
779 .await
780 .context("funding rate subscription")
781 },
782 "funding rate subscription",
783 );
784 Ok(())
785 }
786
787 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
788 let ws = self.business_ws()?.clone();
789 let bar_type = cmd.bar_type;
790 self.spawn_ws(
791 async move {
792 ws.subscribe_bars(bar_type)
793 .await
794 .context("bars subscription")
795 },
796 "bar subscription",
797 );
798 Ok(())
799 }
800
801 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
802 let ws = self.public_ws()?.clone();
803 let instrument_id = cmd.instrument_id;
804 let channel = self
805 .book_channels
806 .write()
807 .expect("book channel cache lock poisoned")
808 .remove(&instrument_id);
809 self.spawn_ws(
810 async move {
811 match channel {
812 Some(OKXBookChannel::Books50L2Tbt) => ws
813 .unsubscribe_book50_l2_tbt(instrument_id)
814 .await
815 .context("books50-l2-tbt unsubscribe")?,
816 Some(OKXBookChannel::BookL2Tbt) => ws
817 .unsubscribe_book_l2_tbt(instrument_id)
818 .await
819 .context("books-l2-tbt unsubscribe")?,
820 Some(OKXBookChannel::Book) => ws
821 .unsubscribe_book(instrument_id)
822 .await
823 .context("book unsubscribe")?,
824 None => {
825 tracing::warn!(
826 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
827 );
828 ws.unsubscribe_book(instrument_id)
829 .await
830 .context("book fallback unsubscribe")?;
831 }
832 }
833 Ok(())
834 },
835 "order book unsubscribe",
836 );
837 Ok(())
838 }
839
840 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
841 let ws = self.public_ws()?.clone();
842 let instrument_id = cmd.instrument_id;
843 self.spawn_ws(
844 async move {
845 ws.unsubscribe_book_depth5(instrument_id)
846 .await
847 .context("book depth5 unsubscribe")
848 },
849 "order book snapshot unsubscribe",
850 );
851 Ok(())
852 }
853
854 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
855 let ws = self.public_ws()?.clone();
856 let instrument_id = cmd.instrument_id;
857 self.spawn_ws(
858 async move {
859 ws.unsubscribe_quotes(instrument_id)
860 .await
861 .context("quotes unsubscribe")
862 },
863 "quote unsubscribe",
864 );
865 Ok(())
866 }
867
868 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
869 let ws = self.public_ws()?.clone();
870 let instrument_id = cmd.instrument_id;
871 self.spawn_ws(
872 async move {
873 ws.unsubscribe_trades(instrument_id, false) .await
875 .context("trades unsubscribe")
876 },
877 "trade unsubscribe",
878 );
879 Ok(())
880 }
881
882 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
883 let ws = self.public_ws()?.clone();
884 let instrument_id = cmd.instrument_id;
885 self.spawn_ws(
886 async move {
887 ws.unsubscribe_mark_prices(instrument_id)
888 .await
889 .context("mark price unsubscribe")
890 },
891 "mark price unsubscribe",
892 );
893 Ok(())
894 }
895
896 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
897 let ws = self.public_ws()?.clone();
898 let instrument_id = cmd.instrument_id;
899 self.spawn_ws(
900 async move {
901 ws.unsubscribe_index_prices(instrument_id)
902 .await
903 .context("index price unsubscribe")
904 },
905 "index price unsubscribe",
906 );
907 Ok(())
908 }
909
910 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
911 let ws = self.public_ws()?.clone();
912 let instrument_id = cmd.instrument_id;
913 self.spawn_ws(
914 async move {
915 ws.unsubscribe_funding_rates(instrument_id)
916 .await
917 .context("funding rate unsubscribe")
918 },
919 "funding rate unsubscribe",
920 );
921 Ok(())
922 }
923
924 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
925 let ws = self.business_ws()?.clone();
926 let bar_type = cmd.bar_type;
927 self.spawn_ws(
928 async move {
929 ws.unsubscribe_bars(bar_type)
930 .await
931 .context("bars unsubscribe")
932 },
933 "bar unsubscribe",
934 );
935 Ok(())
936 }
937
938 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
939 let instruments = {
940 let guard = self
941 .instruments
942 .read()
943 .expect("instrument cache lock poisoned");
944 guard.values().cloned().collect::<Vec<_>>()
945 };
946
947 let response = DataResponse::Instruments(InstrumentsResponse::new(
948 request.request_id,
949 request.client_id.unwrap_or(self.client_id),
950 self.venue(),
951 instruments,
952 datetime_to_unix_nanos(request.start),
953 datetime_to_unix_nanos(request.end),
954 self.clock.get_time_ns(),
955 request.params.clone(),
956 ));
957
958 if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
959 tracing::error!("Failed to send instruments response: {err}");
960 }
961
962 Ok(())
963 }
964
965 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
966 let instrument = {
967 let guard = self
968 .instruments
969 .read()
970 .expect("instrument cache lock poisoned");
971 guard
972 .get(&request.instrument_id)
973 .cloned()
974 .context("instrument not found in cache")?
975 };
976
977 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
978 request.request_id,
979 request.client_id.unwrap_or(self.client_id),
980 instrument.id(),
981 instrument,
982 datetime_to_unix_nanos(request.start),
983 datetime_to_unix_nanos(request.end),
984 self.clock.get_time_ns(),
985 request.params.clone(),
986 )));
987
988 if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
989 tracing::error!("Failed to send instrument response: {err}");
990 }
991
992 Ok(())
993 }
994
995 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
996 let http = self.http_client.clone();
997 let sender = self.data_sender.clone();
998 let instrument_id = request.instrument_id;
999 let start = request.start;
1000 let end = request.end;
1001 let limit = request.limit.map(|n| n.get() as u32);
1002 let request_id = request.request_id;
1003 let client_id = request.client_id.unwrap_or(self.client_id);
1004 let params = request.params.clone();
1005 let clock = self.clock;
1006 let start_nanos = datetime_to_unix_nanos(start);
1007 let end_nanos = datetime_to_unix_nanos(end);
1008
1009 tokio::spawn(async move {
1010 match http
1011 .request_trades(instrument_id, start, end, limit)
1012 .await
1013 .context("failed to request trades from OKX")
1014 {
1015 Ok(trades) => {
1016 let response = DataResponse::Trades(TradesResponse::new(
1017 request_id,
1018 client_id,
1019 instrument_id,
1020 trades,
1021 start_nanos,
1022 end_nanos,
1023 clock.get_time_ns(),
1024 params,
1025 ));
1026 if let Err(err) = sender.send(DataEvent::Response(response)) {
1027 tracing::error!("Failed to send trades response: {err}");
1028 }
1029 }
1030 Err(err) => tracing::error!("Trade request failed: {err:?}"),
1031 }
1032 });
1033
1034 Ok(())
1035 }
1036
1037 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1038 let http = self.http_client.clone();
1039 let sender = self.data_sender.clone();
1040 let bar_type = request.bar_type;
1041 let start = request.start;
1042 let end = request.end;
1043 let limit = request.limit.map(|n| n.get() as u32);
1044 let request_id = request.request_id;
1045 let client_id = request.client_id.unwrap_or(self.client_id);
1046 let params = request.params.clone();
1047 let clock = self.clock;
1048 let start_nanos = datetime_to_unix_nanos(start);
1049 let end_nanos = datetime_to_unix_nanos(end);
1050
1051 tokio::spawn(async move {
1052 match http
1053 .request_bars(bar_type, start, end, limit)
1054 .await
1055 .context("failed to request bars from OKX")
1056 {
1057 Ok(bars) => {
1058 let response = DataResponse::Bars(BarsResponse::new(
1059 request_id,
1060 client_id,
1061 bar_type,
1062 bars,
1063 start_nanos,
1064 end_nanos,
1065 clock.get_time_ns(),
1066 params,
1067 ));
1068 if let Err(err) = sender.send(DataEvent::Response(response)) {
1069 tracing::error!("Failed to send bars response: {err}");
1070 }
1071 }
1072 Err(err) => tracing::error!("Bar request failed: {err:?}"),
1073 }
1074 });
1075
1076 Ok(())
1077 }
1078}