nautilus_okx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31    messages::{
32        DataEvent,
33        data::{
34            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
38            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
39            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices,
40            UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43    runner::get_data_event_sender,
44};
45use nautilus_core::{
46    MUTEX_POISONED, UnixNanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
52    enums::BookType,
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60    common::{
61        consts::OKX_VENUE,
62        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
63    },
64    config::OKXDataClientConfig,
65    http::client::OKXHttpClient,
66    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
67};
68
69#[derive(Debug)]
70pub struct OKXDataClient {
71    client_id: ClientId,
72    config: OKXDataClientConfig,
73    http_client: OKXHttpClient,
74    ws_public: Option<OKXWebSocketClient>,
75    ws_business: Option<OKXWebSocketClient>,
76    is_connected: AtomicBool,
77    cancellation_token: CancellationToken,
78    tasks: Vec<JoinHandle<()>>,
79    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
80    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
81    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
82    clock: &'static AtomicTime,
83    instrument_refresh_active: bool,
84}
85
86impl OKXDataClient {
87    /// Creates a new [`OKXDataClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the client fails to initialize.
92    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
93        let clock = get_atomic_clock_realtime();
94        let data_sender = get_data_event_sender();
95
96        let http_client = if config.has_api_credentials() {
97            OKXHttpClient::with_credentials(
98                config.api_key.clone(),
99                config.api_secret.clone(),
100                config.api_passphrase.clone(),
101                config.base_url_http.clone(),
102                config.http_timeout_secs,
103                config.max_retries,
104                config.retry_delay_initial_ms,
105                config.retry_delay_max_ms,
106                config.is_demo,
107            )?
108        } else {
109            OKXHttpClient::new(
110                config.base_url_http.clone(),
111                config.http_timeout_secs,
112                config.max_retries,
113                config.retry_delay_initial_ms,
114                config.retry_delay_max_ms,
115                config.is_demo,
116            )?
117        };
118
119        let ws_public =
120            OKXWebSocketClient::new(Some(config.ws_public_url()), None, None, None, None, None)
121                .context("failed to construct OKX public websocket client")?;
122
123        let ws_business = if config.requires_business_ws() {
124            Some(
125                OKXWebSocketClient::new(
126                    Some(config.ws_business_url()),
127                    config.api_key.clone(),
128                    config.api_secret.clone(),
129                    config.api_passphrase.clone(),
130                    None,
131                    None,
132                )
133                .context("failed to construct OKX business websocket client")?,
134            )
135        } else {
136            None
137        };
138
139        if let Some(vip_level) = config.vip_level {
140            ws_public.set_vip_level(vip_level);
141            if let Some(ref ws) = ws_business {
142                ws.set_vip_level(vip_level);
143            }
144        }
145
146        Ok(Self {
147            client_id,
148            config,
149            http_client,
150            ws_public: Some(ws_public),
151            ws_business,
152            is_connected: AtomicBool::new(false),
153            cancellation_token: CancellationToken::new(),
154            tasks: Vec::new(),
155            data_sender,
156            instruments: Arc::new(RwLock::new(AHashMap::new())),
157            book_channels: Arc::new(RwLock::new(AHashMap::new())),
158            clock,
159            instrument_refresh_active: false,
160        })
161    }
162
163    fn venue(&self) -> Venue {
164        *OKX_VENUE
165    }
166
167    fn vip_level(&self) -> Option<OKXVipLevel> {
168        self.ws_public.as_ref().map(|ws| ws.vip_level())
169    }
170
171    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
172        self.ws_public
173            .as_ref()
174            .context("public websocket client not initialized")
175    }
176
177    fn public_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
178        self.ws_public
179            .as_mut()
180            .context("public websocket client not initialized")
181    }
182
183    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
184        self.ws_business
185            .as_ref()
186            .context("business websocket client not available (credentials required)")
187    }
188
189    fn business_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
190        self.ws_business
191            .as_mut()
192            .context("business websocket client not available (credentials required)")
193    }
194
195    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
196        if let Err(e) = sender.send(DataEvent::Data(data)) {
197            tracing::error!("Failed to emit data event: {e}");
198        }
199    }
200
201    fn spawn_ws<F>(&self, fut: F, context: &'static str)
202    where
203        F: Future<Output = anyhow::Result<()>> + Send + 'static,
204    {
205        tokio::spawn(async move {
206            if let Err(e) = fut.await {
207                tracing::error!("{context}: {e:?}");
208            }
209        });
210    }
211
212    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
213        let instrument_types = if self.config.instrument_types.is_empty() {
214            vec![OKXInstrumentType::Spot]
215        } else {
216            self.config.instrument_types.clone()
217        };
218
219        let mut collected: Vec<InstrumentAny> = Vec::new();
220
221        for inst_type in instrument_types {
222            let mut instruments = self
223                .http_client
224                .request_instruments(inst_type, None)
225                .await
226                .with_context(|| format!("failed to load instruments for {inst_type:?}"))?;
227            instruments.retain(|instrument| self.contract_filter(instrument));
228            tracing::debug!(
229                "loaded {count} instruments for {inst_type:?}",
230                count = instruments.len()
231            );
232            collected.extend(instruments);
233        }
234
235        if collected.is_empty() {
236            tracing::warn!("No OKX instruments were loaded");
237            return Ok(collected);
238        }
239
240        self.http_client.add_instruments(collected.clone());
241
242        if let Some(ws) = self.ws_public.as_mut() {
243            ws.initialize_instruments_cache(collected.clone());
244        }
245        if let Some(ws) = self.ws_business.as_mut() {
246            ws.initialize_instruments_cache(collected.clone());
247        }
248
249        {
250            let mut guard = self
251                .instruments
252                .write()
253                .expect("instrument cache lock poisoned");
254            guard.clear();
255            for instrument in &collected {
256                guard.insert(instrument.id(), instrument.clone());
257            }
258        }
259
260        Ok(collected)
261    }
262
263    fn contract_filter(&self, instrument: &InstrumentAny) -> bool {
264        contract_filter_with_config(&self.config, instrument)
265    }
266
267    fn handle_ws_message(
268        message: NautilusWsMessage,
269        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
270        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
271    ) {
272        match message {
273            NautilusWsMessage::Data(payloads) => {
274                for data in payloads {
275                    Self::send_data(data_sender, data);
276                }
277            }
278            NautilusWsMessage::Deltas(deltas) => {
279                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
280            }
281            NautilusWsMessage::FundingRates(updates) => {
282                emit_funding_rates(updates);
283            }
284            NautilusWsMessage::Instrument(instrument) => {
285                upsert_instrument(instruments, *instrument);
286            }
287            NautilusWsMessage::AccountUpdate(_)
288            | NautilusWsMessage::OrderRejected(_)
289            | NautilusWsMessage::OrderCancelRejected(_)
290            | NautilusWsMessage::OrderModifyRejected(_)
291            | NautilusWsMessage::ExecutionReports(_) => {
292                tracing::debug!("Ignoring trading message on data client");
293            }
294            NautilusWsMessage::Error(e) => {
295                tracing::error!("OKX websocket error: {e:?}");
296            }
297            NautilusWsMessage::Raw(value) => {
298                tracing::debug!("Unhandled websocket payload: {value:?}");
299            }
300            NautilusWsMessage::Reconnected => {
301                tracing::info!("Websocket reconnected");
302            }
303        }
304    }
305
306    fn spawn_public_stream(&mut self) -> anyhow::Result<()> {
307        let ws = self.public_ws_mut()?;
308        let stream = ws.stream();
309        self.spawn_stream_task(stream)
310    }
311
312    fn spawn_business_stream(&mut self) -> anyhow::Result<()> {
313        if self.ws_business.is_none() {
314            return Ok(());
315        }
316
317        let ws = self.business_ws_mut()?;
318        let stream = ws.stream();
319        self.spawn_stream_task(stream)
320    }
321
322    fn spawn_stream_task(
323        &mut self,
324        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
325    ) -> anyhow::Result<()> {
326        let data_sender = self.data_sender.clone();
327        let instruments = self.instruments.clone();
328        let cancellation = self.cancellation_token.clone();
329
330        let handle = tokio::spawn(async move {
331            tokio::pin!(stream);
332
333            loop {
334                tokio::select! {
335                    maybe_msg = stream.next() => {
336                        match maybe_msg {
337                            Some(msg) => {
338                                Self::handle_ws_message(msg, &data_sender, &instruments);
339                            }
340                            None => {
341                                tracing::debug!("Websocket stream ended");
342                                break;
343                            }
344                        }
345                    }
346                    _ = cancellation.cancelled() => {
347                        tracing::debug!("Websocket stream task cancelled");
348                        break;
349                    }
350                }
351            }
352        });
353
354        self.tasks.push(handle);
355        Ok(())
356    }
357
358    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
359        let Some(minutes) = self.config.update_instruments_interval_mins else {
360            return Ok(());
361        };
362
363        if minutes == 0 || self.instrument_refresh_active {
364            return Ok(());
365        }
366
367        let interval_secs = minutes.saturating_mul(60);
368        if interval_secs == 0 {
369            return Ok(());
370        }
371
372        let interval = Duration::from_secs(interval_secs);
373        let cancellation = self.cancellation_token.clone();
374        let instruments_cache = Arc::clone(&self.instruments);
375        let mut http_client = self.http_client.clone();
376        let config = self.config.clone();
377        let client_id = self.client_id;
378
379        let handle = tokio::spawn(async move {
380            loop {
381                let sleep = tokio::time::sleep(interval);
382                tokio::pin!(sleep);
383                tokio::select! {
384                    _ = cancellation.cancelled() => {
385                        tracing::debug!("OKX instrument refresh task cancelled");
386                        break;
387                    }
388                    _ = &mut sleep => {
389                        let instrument_types = if config.instrument_types.is_empty() {
390                            vec![OKXInstrumentType::Spot]
391                        } else {
392                            config.instrument_types.clone()
393                        };
394
395                        let mut collected: Vec<InstrumentAny> = Vec::new();
396
397                        for inst_type in instrument_types {
398                            match http_client.request_instruments(inst_type, None).await {
399                                Ok(mut instruments) => {
400                                    instruments.retain(|instrument| contract_filter_with_config(&config, instrument));
401                                    collected.extend(instruments);
402                                }
403                                Err(e) => {
404                                    tracing::warn!(client_id=%client_id, instrument_type=?inst_type, error=?e, "Failed to refresh OKX instruments for type");
405                                }
406                            }
407                        }
408
409                        if collected.is_empty() {
410                            tracing::debug!(client_id=%client_id, "OKX instrument refresh yielded no instruments");
411                            continue;
412                        }
413
414                        http_client.add_instruments(collected.clone());
415
416                        {
417                            let mut guard = instruments_cache
418                                .write()
419                                .expect("instrument cache lock poisoned");
420                            guard.clear();
421                            for instrument in &collected {
422                                guard.insert(instrument.id(), instrument.clone());
423                            }
424                        }
425
426                        tracing::debug!(client_id=%client_id, count=collected.len(), "OKX instruments refreshed");
427                    }
428                }
429            }
430        });
431
432        self.tasks.push(handle);
433        self.instrument_refresh_active = true;
434        Ok(())
435    }
436}
437
438fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
439    if updates.is_empty() {
440        return;
441    }
442
443    for update in updates {
444        tracing::debug!(
445            "Received funding rate update for {} but forwarding is not yet supported",
446            update.instrument_id
447        );
448    }
449}
450
451fn upsert_instrument(
452    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
453    instrument: InstrumentAny,
454) {
455    let mut guard = cache.write().expect(MUTEX_POISONED);
456    guard.insert(instrument.id(), instrument);
457}
458
459fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
460    value
461        .and_then(|dt| dt.timestamp_nanos_opt())
462        .and_then(|nanos| u64::try_from(nanos).ok())
463        .map(UnixNanos::from)
464}
465
466fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
467    match config.contract_types.as_ref() {
468        None => true,
469        Some(filter) if filter.is_empty() => true,
470        Some(filter) => {
471            let is_inverse = instrument.is_inverse();
472            (is_inverse && filter.contains(&OKXContractType::Inverse))
473                || (!is_inverse && filter.contains(&OKXContractType::Linear))
474        }
475    }
476}
477
478#[async_trait::async_trait]
479impl DataClient for OKXDataClient {
480    fn client_id(&self) -> ClientId {
481        self.client_id
482    }
483
484    fn venue(&self) -> Option<Venue> {
485        Some(self.venue())
486    }
487
488    fn start(&mut self) -> anyhow::Result<()> {
489        tracing::info!(
490            client_id = %self.client_id,
491            vip_level = ?self.vip_level(),
492            instrument_types = ?self.config.instrument_types,
493            is_demo = self.config.is_demo,
494            "Starting OKX data client"
495        );
496        Ok(())
497    }
498
499    fn stop(&mut self) -> anyhow::Result<()> {
500        tracing::info!("Stopping OKX data client {id}", id = self.client_id);
501        self.cancellation_token.cancel();
502        self.is_connected.store(false, Ordering::Relaxed);
503        self.instrument_refresh_active = false;
504        Ok(())
505    }
506
507    fn reset(&mut self) -> anyhow::Result<()> {
508        tracing::debug!("Resetting OKX data client {id}", id = self.client_id);
509        self.is_connected.store(false, Ordering::Relaxed);
510        self.cancellation_token = CancellationToken::new();
511        self.tasks.clear();
512        self.book_channels
513            .write()
514            .expect("book channel cache lock poisoned")
515            .clear();
516        self.instrument_refresh_active = false;
517        Ok(())
518    }
519
520    fn dispose(&mut self) -> anyhow::Result<()> {
521        tracing::debug!("Disposing OKX data client {id}", id = self.client_id);
522        self.stop()
523    }
524
525    async fn connect(&mut self) -> anyhow::Result<()> {
526        if self.is_connected() {
527            return Ok(());
528        }
529
530        self.bootstrap_instruments().await?;
531
532        {
533            let ws_public = self.public_ws_mut()?;
534            ws_public
535                .connect()
536                .await
537                .context("failed to connect OKX public websocket")?;
538            ws_public
539                .wait_until_active(10.0)
540                .await
541                .context("public websocket did not become active")?;
542        }
543
544        let instrument_types = if self.config.instrument_types.is_empty() {
545            vec![OKXInstrumentType::Spot]
546        } else {
547            self.config.instrument_types.clone()
548        };
549
550        let public_clone = self.public_ws()?.clone();
551        self.spawn_ws(
552            async move {
553                for inst_type in instrument_types {
554                    public_clone
555                        .subscribe_instruments(inst_type)
556                        .await
557                        .with_context(|| {
558                            format!("failed to subscribe to instrument type {inst_type:?}")
559                        })?;
560                }
561                Ok(())
562            },
563            "instrument subscription",
564        );
565
566        self.spawn_public_stream()?;
567
568        if self.ws_business.is_some() {
569            {
570                let ws_business = self.business_ws_mut()?;
571                ws_business
572                    .connect()
573                    .await
574                    .context("failed to connect OKX business websocket")?;
575                ws_business
576                    .wait_until_active(10.0)
577                    .await
578                    .context("business websocket did not become active")?;
579            }
580            self.spawn_business_stream()?;
581        }
582
583        self.maybe_spawn_instrument_refresh()?;
584
585        self.is_connected.store(true, Ordering::Relaxed);
586        tracing::info!("OKX data client connected");
587        Ok(())
588    }
589
590    async fn disconnect(&mut self) -> anyhow::Result<()> {
591        if self.is_disconnected() {
592            return Ok(());
593        }
594
595        self.cancellation_token.cancel();
596
597        if let Some(ws) = self.ws_public.as_ref()
598            && let Err(e) = ws.unsubscribe_all().await
599        {
600            tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
601        }
602        if let Some(ws) = self.ws_business.as_ref()
603            && let Err(e) = ws.unsubscribe_all().await
604        {
605            tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
606        }
607
608        // Brief delay to allow unsubscribe confirmations to be processed
609        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
610
611        if let Some(ws) = self.ws_public.as_mut() {
612            let _ = ws.close().await;
613        }
614        if let Some(ws) = self.ws_business.as_mut() {
615            let _ = ws.close().await;
616        }
617
618        for handle in self.tasks.drain(..) {
619            if let Err(e) = handle.await {
620                tracing::error!("Error joining websocket task: {e}");
621            }
622        }
623
624        self.cancellation_token = CancellationToken::new();
625        self.is_connected.store(false, Ordering::Relaxed);
626        self.book_channels
627            .write()
628            .expect("book channel cache lock poisoned")
629            .clear();
630        self.instrument_refresh_active = false;
631        tracing::info!("OKX data client disconnected");
632        Ok(())
633    }
634
635    fn is_connected(&self) -> bool {
636        self.is_connected.load(Ordering::Relaxed)
637    }
638
639    fn is_disconnected(&self) -> bool {
640        !self.is_connected()
641    }
642
643    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
644        if cmd.book_type != BookType::L2_MBP {
645            anyhow::bail!("OKX only supports L2_MBP order book deltas");
646        }
647
648        let depth = cmd.depth.map_or(0, |d| d.get());
649        if !matches!(depth, 0 | 50 | 400) {
650            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
651        }
652
653        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
654        let channel = match depth {
655            50 => {
656                if vip < OKXVipLevel::Vip4 {
657                    anyhow::bail!(
658                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
659                    );
660                }
661                OKXBookChannel::Books50L2Tbt
662            }
663            0 | 400 => {
664                if vip >= OKXVipLevel::Vip5 {
665                    OKXBookChannel::BookL2Tbt
666                } else {
667                    OKXBookChannel::Book
668                }
669            }
670            _ => unreachable!(),
671        };
672
673        let instrument_id = cmd.instrument_id;
674        let ws = self.public_ws()?.clone();
675        let book_channels = Arc::clone(&self.book_channels);
676        self.spawn_ws(
677            async move {
678                match channel {
679                    OKXBookChannel::Books50L2Tbt => ws
680                        .subscribe_book50_l2_tbt(instrument_id)
681                        .await
682                        .context("books50-l2-tbt subscription")?,
683                    OKXBookChannel::BookL2Tbt => ws
684                        .subscribe_book_l2_tbt(instrument_id)
685                        .await
686                        .context("books-l2-tbt subscription")?,
687                    OKXBookChannel::Book => ws
688                        .subscribe_books_channel(instrument_id)
689                        .await
690                        .context("books subscription")?,
691                }
692                book_channels
693                    .write()
694                    .expect("book channel cache lock poisoned")
695                    .insert(instrument_id, channel);
696                Ok(())
697            },
698            "order book delta subscription",
699        );
700
701        Ok(())
702    }
703
704    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
705        if cmd.book_type != BookType::L2_MBP {
706            anyhow::bail!("OKX only supports L2_MBP order book snapshots");
707        }
708        let depth = cmd.depth.map_or(5, |d| d.get());
709        if depth != 5 {
710            anyhow::bail!("OKX only supports depth=5 snapshots");
711        }
712
713        let ws = self.public_ws()?.clone();
714        let instrument_id = cmd.instrument_id;
715        self.spawn_ws(
716            async move {
717                ws.subscribe_book_depth5(instrument_id)
718                    .await
719                    .context("books5 subscription")
720            },
721            "order book snapshot subscription",
722        );
723        Ok(())
724    }
725
726    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
727        let ws = self.public_ws()?.clone();
728        let instrument_id = cmd.instrument_id;
729        self.spawn_ws(
730            async move {
731                ws.subscribe_quotes(instrument_id)
732                    .await
733                    .context("quotes subscription")
734            },
735            "quote subscription",
736        );
737        Ok(())
738    }
739
740    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
741        let ws = self.public_ws()?.clone();
742        let instrument_id = cmd.instrument_id;
743        self.spawn_ws(
744            async move {
745                ws.subscribe_trades(instrument_id, false)
746                    .await
747                    .context("trades subscription")
748            },
749            "trade subscription",
750        );
751        Ok(())
752    }
753
754    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
755        let ws = self.public_ws()?.clone();
756        let instrument_id = cmd.instrument_id;
757        self.spawn_ws(
758            async move {
759                ws.subscribe_mark_prices(instrument_id)
760                    .await
761                    .context("mark price subscription")
762            },
763            "mark price subscription",
764        );
765        Ok(())
766    }
767
768    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
769        let ws = self.public_ws()?.clone();
770        let instrument_id = cmd.instrument_id;
771        self.spawn_ws(
772            async move {
773                ws.subscribe_index_prices(instrument_id)
774                    .await
775                    .context("index price subscription")
776            },
777            "index price subscription",
778        );
779        Ok(())
780    }
781
782    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
783        let ws = self.public_ws()?.clone();
784        let instrument_id = cmd.instrument_id;
785        self.spawn_ws(
786            async move {
787                ws.subscribe_funding_rates(instrument_id)
788                    .await
789                    .context("funding rate subscription")
790            },
791            "funding rate subscription",
792        );
793        Ok(())
794    }
795
796    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
797        let ws = self.business_ws()?.clone();
798        let bar_type = cmd.bar_type;
799        self.spawn_ws(
800            async move {
801                ws.subscribe_bars(bar_type)
802                    .await
803                    .context("bars subscription")
804            },
805            "bar subscription",
806        );
807        Ok(())
808    }
809
810    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
811        let ws = self.public_ws()?.clone();
812        let instrument_id = cmd.instrument_id;
813        let channel = self
814            .book_channels
815            .write()
816            .expect("book channel cache lock poisoned")
817            .remove(&instrument_id);
818        self.spawn_ws(
819            async move {
820                match channel {
821                    Some(OKXBookChannel::Books50L2Tbt) => ws
822                        .unsubscribe_book50_l2_tbt(instrument_id)
823                        .await
824                        .context("books50-l2-tbt unsubscribe")?,
825                    Some(OKXBookChannel::BookL2Tbt) => ws
826                        .unsubscribe_book_l2_tbt(instrument_id)
827                        .await
828                        .context("books-l2-tbt unsubscribe")?,
829                    Some(OKXBookChannel::Book) => ws
830                        .unsubscribe_book(instrument_id)
831                        .await
832                        .context("book unsubscribe")?,
833                    None => {
834                        tracing::warn!(
835                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
836                        );
837                        ws.unsubscribe_book(instrument_id)
838                            .await
839                            .context("book fallback unsubscribe")?;
840                    }
841                }
842                Ok(())
843            },
844            "order book unsubscribe",
845        );
846        Ok(())
847    }
848
849    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
850        let ws = self.public_ws()?.clone();
851        let instrument_id = cmd.instrument_id;
852        self.spawn_ws(
853            async move {
854                ws.unsubscribe_book_depth5(instrument_id)
855                    .await
856                    .context("book depth5 unsubscribe")
857            },
858            "order book snapshot unsubscribe",
859        );
860        Ok(())
861    }
862
863    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
864        let ws = self.public_ws()?.clone();
865        let instrument_id = cmd.instrument_id;
866        self.spawn_ws(
867            async move {
868                ws.unsubscribe_quotes(instrument_id)
869                    .await
870                    .context("quotes unsubscribe")
871            },
872            "quote unsubscribe",
873        );
874        Ok(())
875    }
876
877    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
878        let ws = self.public_ws()?.clone();
879        let instrument_id = cmd.instrument_id;
880        self.spawn_ws(
881            async move {
882                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
883                    .await
884                    .context("trades unsubscribe")
885            },
886            "trade unsubscribe",
887        );
888        Ok(())
889    }
890
891    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
892        let ws = self.public_ws()?.clone();
893        let instrument_id = cmd.instrument_id;
894        self.spawn_ws(
895            async move {
896                ws.unsubscribe_mark_prices(instrument_id)
897                    .await
898                    .context("mark price unsubscribe")
899            },
900            "mark price unsubscribe",
901        );
902        Ok(())
903    }
904
905    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
906        let ws = self.public_ws()?.clone();
907        let instrument_id = cmd.instrument_id;
908        self.spawn_ws(
909            async move {
910                ws.unsubscribe_index_prices(instrument_id)
911                    .await
912                    .context("index price unsubscribe")
913            },
914            "index price unsubscribe",
915        );
916        Ok(())
917    }
918
919    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
920        let ws = self.public_ws()?.clone();
921        let instrument_id = cmd.instrument_id;
922        self.spawn_ws(
923            async move {
924                ws.unsubscribe_funding_rates(instrument_id)
925                    .await
926                    .context("funding rate unsubscribe")
927            },
928            "funding rate unsubscribe",
929        );
930        Ok(())
931    }
932
933    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
934        let ws = self.business_ws()?.clone();
935        let bar_type = cmd.bar_type;
936        self.spawn_ws(
937            async move {
938                ws.unsubscribe_bars(bar_type)
939                    .await
940                    .context("bars unsubscribe")
941            },
942            "bar unsubscribe",
943        );
944        Ok(())
945    }
946
947    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
948        let instruments = {
949            let guard = self
950                .instruments
951                .read()
952                .expect("instrument cache lock poisoned");
953            guard.values().cloned().collect::<Vec<_>>()
954        };
955
956        let response = DataResponse::Instruments(InstrumentsResponse::new(
957            request.request_id,
958            request.client_id.unwrap_or(self.client_id),
959            self.venue(),
960            instruments,
961            datetime_to_unix_nanos(request.start),
962            datetime_to_unix_nanos(request.end),
963            self.clock.get_time_ns(),
964            request.params.clone(),
965        ));
966
967        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
968            tracing::error!("Failed to send instruments response: {e}");
969        }
970
971        Ok(())
972    }
973
974    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
975        let instrument = {
976            let guard = self
977                .instruments
978                .read()
979                .expect("instrument cache lock poisoned");
980            guard
981                .get(&request.instrument_id)
982                .cloned()
983                .context("instrument not found in cache")?
984        };
985
986        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
987            request.request_id,
988            request.client_id.unwrap_or(self.client_id),
989            instrument.id(),
990            instrument,
991            datetime_to_unix_nanos(request.start),
992            datetime_to_unix_nanos(request.end),
993            self.clock.get_time_ns(),
994            request.params.clone(),
995        )));
996
997        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
998            tracing::error!("Failed to send instrument response: {e}");
999        }
1000
1001        Ok(())
1002    }
1003
1004    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1005        let http = self.http_client.clone();
1006        let sender = self.data_sender.clone();
1007        let instrument_id = request.instrument_id;
1008        let start = request.start;
1009        let end = request.end;
1010        let limit = request.limit.map(|n| n.get() as u32);
1011        let request_id = request.request_id;
1012        let client_id = request.client_id.unwrap_or(self.client_id);
1013        let params = request.params.clone();
1014        let clock = self.clock;
1015        let start_nanos = datetime_to_unix_nanos(start);
1016        let end_nanos = datetime_to_unix_nanos(end);
1017
1018        tokio::spawn(async move {
1019            match http
1020                .request_trades(instrument_id, start, end, limit)
1021                .await
1022                .context("failed to request trades from OKX")
1023            {
1024                Ok(trades) => {
1025                    let response = DataResponse::Trades(TradesResponse::new(
1026                        request_id,
1027                        client_id,
1028                        instrument_id,
1029                        trades,
1030                        start_nanos,
1031                        end_nanos,
1032                        clock.get_time_ns(),
1033                        params,
1034                    ));
1035                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1036                        tracing::error!("Failed to send trades response: {e}");
1037                    }
1038                }
1039                Err(e) => tracing::error!("Trade request failed: {e:?}"),
1040            }
1041        });
1042
1043        Ok(())
1044    }
1045
1046    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1047        let http = self.http_client.clone();
1048        let sender = self.data_sender.clone();
1049        let bar_type = request.bar_type;
1050        let start = request.start;
1051        let end = request.end;
1052        let limit = request.limit.map(|n| n.get() as u32);
1053        let request_id = request.request_id;
1054        let client_id = request.client_id.unwrap_or(self.client_id);
1055        let params = request.params.clone();
1056        let clock = self.clock;
1057        let start_nanos = datetime_to_unix_nanos(start);
1058        let end_nanos = datetime_to_unix_nanos(end);
1059
1060        tokio::spawn(async move {
1061            match http
1062                .request_bars(bar_type, start, end, limit)
1063                .await
1064                .context("failed to request bars from OKX")
1065            {
1066                Ok(bars) => {
1067                    let response = DataResponse::Bars(BarsResponse::new(
1068                        request_id,
1069                        client_id,
1070                        bar_type,
1071                        bars,
1072                        start_nanos,
1073                        end_nanos,
1074                        clock.get_time_ns(),
1075                        params,
1076                    ));
1077                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1078                        tracing::error!("Failed to send bars response: {e}");
1079                    }
1080                }
1081                Err(e) => tracing::error!("Bar request failed: {e:?}"),
1082            }
1083        });
1084
1085        Ok(())
1086    }
1087}