nautilus_okx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31    messages::{
32        DataEvent,
33        data::{
34            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
38            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
39            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices,
40            UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43    runner::get_data_event_sender,
44};
45use nautilus_core::{
46    UnixNanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
52    enums::BookType,
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60    common::{
61        consts::OKX_VENUE,
62        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
63    },
64    config::OKXDataClientConfig,
65    http::client::OKXHttpClient,
66    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
67};
68
/// Live market data client for the OKX venue.
///
/// Owns the HTTP client, the public websocket client and the optional business websocket
/// client, together with the shared caches and the bookkeeping for spawned background tasks.
#[derive(Debug)]
pub struct OKXDataClient {
    /// Identifier returned by `DataClient::client_id`.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: OKXDataClientConfig,
    /// HTTP client used for instrument requests and the VIP level query.
    http_client: OKXHttpClient,
    /// Public websocket client (always set by `new`).
    ws_public: Option<OKXWebSocketClient>,
    /// Business websocket client; only constructed when `config.requires_business_ws()` is true.
    ws_business: Option<OKXWebSocketClient>,
    /// Set at the end of `connect`; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token cloned into background tasks so they can be told to stop.
    cancellation_token: CancellationToken,
    /// Join handles of the stream and instrument refresh tasks; drained in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with background tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Book channel chosen per instrument by `subscribe_book_deltas`, consulted on unsubscribe.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
    /// Realtime atomic clock obtained at construction.
    clock: &'static AtomicTime,
    /// Guards against spawning more than one instrument refresh task.
    instrument_refresh_active: bool,
}
85
86impl OKXDataClient {
87    /// Creates a new [`OKXDataClient`] instance.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the client fails to initialize.
92    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
93        let clock = get_atomic_clock_realtime();
94        let data_sender = get_data_event_sender();
95
96        let http_client = if config.has_api_credentials() {
97            OKXHttpClient::with_credentials(
98                config.api_key.clone(),
99                config.api_secret.clone(),
100                config.api_passphrase.clone(),
101                config.base_url_http.clone(),
102                config.http_timeout_secs,
103                None,
104                None,
105                None,
106                config.is_demo,
107            )?
108        } else {
109            OKXHttpClient::new(
110                config.base_url_http.clone(),
111                config.http_timeout_secs,
112                None,
113                None,
114                None,
115                config.is_demo,
116            )?
117        };
118
119        let ws_public =
120            OKXWebSocketClient::new(Some(config.ws_public_url()), None, None, None, None, None)
121                .context("failed to construct OKX public websocket client")?;
122
123        let ws_business = if config.requires_business_ws() {
124            Some(
125                OKXWebSocketClient::new(
126                    Some(config.ws_business_url()),
127                    config.api_key.clone(),
128                    config.api_secret.clone(),
129                    config.api_passphrase.clone(),
130                    None,
131                    None,
132                )
133                .context("failed to construct OKX business websocket client")?,
134            )
135        } else {
136            None
137        };
138
139        Ok(Self {
140            client_id,
141            config,
142            http_client,
143            ws_public: Some(ws_public),
144            ws_business,
145            is_connected: AtomicBool::new(false),
146            cancellation_token: CancellationToken::new(),
147            tasks: Vec::new(),
148            data_sender,
149            instruments: Arc::new(RwLock::new(AHashMap::new())),
150            book_channels: Arc::new(RwLock::new(AHashMap::new())),
151            clock,
152            instrument_refresh_active: false,
153        })
154    }
155
156    fn venue(&self) -> Venue {
157        *OKX_VENUE
158    }
159
160    fn vip_level(&self) -> Option<OKXVipLevel> {
161        self.ws_public.as_ref().map(|ws| ws.vip_level())
162    }
163
164    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
165        self.ws_public
166            .as_ref()
167            .context("public websocket client not initialized")
168    }
169
170    fn public_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
171        self.ws_public
172            .as_mut()
173            .context("public websocket client not initialized")
174    }
175
176    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
177        self.ws_business
178            .as_ref()
179            .context("business websocket client not available (credentials required)")
180    }
181
182    fn business_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
183        self.ws_business
184            .as_mut()
185            .context("business websocket client not available (credentials required)")
186    }
187
188    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
189        if let Err(err) = sender.send(DataEvent::Data(data)) {
190            tracing::error!("Failed to emit data event: {err}");
191        }
192    }
193
194    fn spawn_ws<F>(&self, fut: F, context: &'static str)
195    where
196        F: Future<Output = anyhow::Result<()>> + Send + 'static,
197    {
198        tokio::spawn(async move {
199            if let Err(err) = fut.await {
200                tracing::error!("{context}: {err:?}");
201            }
202        });
203    }
204
205    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
206        let instrument_types = if self.config.instrument_types.is_empty() {
207            vec![OKXInstrumentType::Spot]
208        } else {
209            self.config.instrument_types.clone()
210        };
211
212        let mut collected: Vec<InstrumentAny> = Vec::new();
213
214        for inst_type in instrument_types {
215            let mut instruments = self
216                .http_client
217                .request_instruments(inst_type, None)
218                .await
219                .with_context(|| format!("failed to load instruments for {inst_type:?}"))?;
220            instruments.retain(|instrument| self.contract_filter(instrument));
221            tracing::debug!(
222                "loaded {count} instruments for {inst_type:?}",
223                count = instruments.len()
224            );
225            collected.extend(instruments);
226        }
227
228        if collected.is_empty() {
229            tracing::warn!("No OKX instruments were loaded");
230            return Ok(collected);
231        }
232
233        self.http_client.add_instruments(collected.clone());
234
235        if let Some(ws) = self.ws_public.as_mut() {
236            ws.initialize_instruments_cache(collected.clone());
237        }
238        if let Some(ws) = self.ws_business.as_mut() {
239            ws.initialize_instruments_cache(collected.clone());
240        }
241
242        {
243            let mut guard = self
244                .instruments
245                .write()
246                .expect("instrument cache lock poisoned");
247            guard.clear();
248            for instrument in &collected {
249                guard.insert(instrument.id(), instrument.clone());
250            }
251        }
252
253        Ok(collected)
254    }
255
256    fn contract_filter(&self, instrument: &InstrumentAny) -> bool {
257        contract_filter_with_config(&self.config, instrument)
258    }
259
260    fn handle_ws_message(
261        message: NautilusWsMessage,
262        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
263        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
264    ) {
265        match message {
266            NautilusWsMessage::Data(payloads) => {
267                for data in payloads {
268                    Self::send_data(data_sender, data);
269                }
270            }
271            NautilusWsMessage::Deltas(deltas) => {
272                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
273            }
274            NautilusWsMessage::FundingRates(updates) => {
275                emit_funding_rates(updates);
276            }
277            NautilusWsMessage::Instrument(instrument) => {
278                upsert_instrument(instruments, *instrument);
279            }
280            NautilusWsMessage::AccountUpdate(_)
281            | NautilusWsMessage::OrderRejected(_)
282            | NautilusWsMessage::OrderCancelRejected(_)
283            | NautilusWsMessage::OrderModifyRejected(_)
284            | NautilusWsMessage::ExecutionReports(_) => {
285                tracing::debug!("Ignoring trading message on data client");
286            }
287            NautilusWsMessage::Error(err) => {
288                tracing::error!("OKX websocket error: {err:?}");
289            }
290            NautilusWsMessage::Raw(value) => {
291                tracing::debug!("Unhandled websocket payload: {value:?}");
292            }
293            NautilusWsMessage::Reconnected => {
294                tracing::info!("Websocket reconnected");
295            }
296        }
297    }
298
299    fn spawn_public_stream(&mut self) -> anyhow::Result<()> {
300        let ws = self.public_ws_mut()?;
301        let stream = ws.stream();
302        self.spawn_stream_task(stream)
303    }
304
305    fn spawn_business_stream(&mut self) -> anyhow::Result<()> {
306        if self.ws_business.is_none() {
307            return Ok(());
308        }
309
310        let ws = self.business_ws_mut()?;
311        let stream = ws.stream();
312        self.spawn_stream_task(stream)
313    }
314
315    fn spawn_stream_task(
316        &mut self,
317        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
318    ) -> anyhow::Result<()> {
319        let data_sender = self.data_sender.clone();
320        let instruments = self.instruments.clone();
321        let cancellation = self.cancellation_token.clone();
322
323        let handle = tokio::spawn(async move {
324            tokio::pin!(stream);
325
326            loop {
327                tokio::select! {
328                    maybe_msg = stream.next() => {
329                        match maybe_msg {
330                            Some(msg) => {
331                                Self::handle_ws_message(msg, &data_sender, &instruments);
332                            }
333                            None => {
334                                tracing::debug!("Websocket stream ended");
335                                break;
336                            }
337                        }
338                    }
339                    _ = cancellation.cancelled() => {
340                        tracing::debug!("Websocket stream task cancelled");
341                        break;
342                    }
343                }
344            }
345        });
346
347        self.tasks.push(handle);
348        Ok(())
349    }
350
351    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
352        let Some(minutes) = self.config.update_instruments_interval_mins else {
353            return Ok(());
354        };
355
356        if minutes == 0 || self.instrument_refresh_active {
357            return Ok(());
358        }
359
360        let interval_secs = minutes.saturating_mul(60);
361        if interval_secs == 0 {
362            return Ok(());
363        }
364
365        let interval = Duration::from_secs(interval_secs);
366        let cancellation = self.cancellation_token.clone();
367        let instruments_cache = Arc::clone(&self.instruments);
368        let mut http_client = self.http_client.clone();
369        let config = self.config.clone();
370        let client_id = self.client_id;
371
372        let handle = tokio::spawn(async move {
373            loop {
374                let sleep = tokio::time::sleep(interval);
375                tokio::pin!(sleep);
376                tokio::select! {
377                    _ = cancellation.cancelled() => {
378                        tracing::debug!("OKX instrument refresh task cancelled");
379                        break;
380                    }
381                    _ = &mut sleep => {
382                        let instrument_types = if config.instrument_types.is_empty() {
383                            vec![OKXInstrumentType::Spot]
384                        } else {
385                            config.instrument_types.clone()
386                        };
387
388                        let mut collected: Vec<InstrumentAny> = Vec::new();
389
390                        for inst_type in instrument_types {
391                            match http_client.request_instruments(inst_type, None).await {
392                                Ok(mut instruments) => {
393                                    instruments.retain(|instrument| contract_filter_with_config(&config, instrument));
394                                    collected.extend(instruments);
395                                }
396                                Err(err) => {
397                                    tracing::warn!(client_id=%client_id, instrument_type=?inst_type, error=?err, "Failed to refresh OKX instruments for type");
398                                }
399                            }
400                        }
401
402                        if collected.is_empty() {
403                            tracing::debug!(client_id=%client_id, "OKX instrument refresh yielded no instruments");
404                            continue;
405                        }
406
407                        http_client.add_instruments(collected.clone());
408
409                        {
410                            let mut guard = instruments_cache
411                                .write()
412                                .expect("instrument cache lock poisoned");
413                            guard.clear();
414                            for instrument in &collected {
415                                guard.insert(instrument.id(), instrument.clone());
416                            }
417                        }
418
419                        tracing::debug!(client_id=%client_id, count=collected.len(), "OKX instruments refreshed");
420                    }
421                }
422            }
423        });
424
425        self.tasks.push(handle);
426        self.instrument_refresh_active = true;
427        Ok(())
428    }
429}
430
431fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
432    if updates.is_empty() {
433        return;
434    }
435
436    for update in updates {
437        tracing::debug!(
438            "Received funding rate update for {} but forwarding is not yet supported",
439            update.instrument_id
440        );
441    }
442}
443
444fn upsert_instrument(
445    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
446    instrument: InstrumentAny,
447) {
448    let mut guard = cache.write().expect("instrument cache lock poisoned");
449    guard.insert(instrument.id(), instrument);
450}
451
452fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
453    value
454        .and_then(|dt| dt.timestamp_nanos_opt())
455        .and_then(|nanos| u64::try_from(nanos).ok())
456        .map(UnixNanos::from)
457}
458
459fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
460    match config.contract_types.as_ref() {
461        None => true,
462        Some(filter) if filter.is_empty() => true,
463        Some(filter) => {
464            let is_inverse = instrument.is_inverse();
465            (is_inverse && filter.contains(&OKXContractType::Inverse))
466                || (!is_inverse && filter.contains(&OKXContractType::Linear))
467        }
468    }
469}
470
471#[async_trait::async_trait]
472impl DataClient for OKXDataClient {
473    fn client_id(&self) -> ClientId {
474        self.client_id
475    }
476
477    fn venue(&self) -> Option<Venue> {
478        Some(self.venue())
479    }
480
481    fn start(&mut self) -> anyhow::Result<()> {
482        tracing::info!(
483            client_id = %self.client_id,
484            vip_level = ?self.vip_level(),
485            instrument_types = ?self.config.instrument_types,
486            is_demo = self.config.is_demo,
487            "Starting OKX data client"
488        );
489        Ok(())
490    }
491
492    fn stop(&mut self) -> anyhow::Result<()> {
493        tracing::info!("Stopping OKX data client {id}", id = self.client_id);
494        self.cancellation_token.cancel();
495        self.is_connected.store(false, Ordering::Relaxed);
496        self.instrument_refresh_active = false;
497        Ok(())
498    }
499
500    fn reset(&mut self) -> anyhow::Result<()> {
501        tracing::debug!("Resetting OKX data client {id}", id = self.client_id);
502        self.is_connected.store(false, Ordering::Relaxed);
503        self.cancellation_token = CancellationToken::new();
504        self.tasks.clear();
505        self.book_channels
506            .write()
507            .expect("book channel cache lock poisoned")
508            .clear();
509        self.instrument_refresh_active = false;
510        Ok(())
511    }
512
513    fn dispose(&mut self) -> anyhow::Result<()> {
514        tracing::debug!("Disposing OKX data client {id}", id = self.client_id);
515        self.stop()
516    }
517
518    async fn connect(&mut self) -> anyhow::Result<()> {
519        if self.is_connected() {
520            return Ok(());
521        }
522
523        self.bootstrap_instruments().await?;
524
525        // Query VIP level and update websocket clients
526        if self.config.has_api_credentials()
527            && let Ok(Some(vip_level)) = self.http_client.request_vip_level().await
528        {
529            if let Some(ws) = self.ws_public.as_mut() {
530                ws.set_vip_level(vip_level);
531            }
532            if let Some(ws) = self.ws_business.as_mut() {
533                ws.set_vip_level(vip_level);
534            }
535        }
536
537        {
538            let ws_public = self.public_ws_mut()?;
539            ws_public
540                .connect()
541                .await
542                .context("failed to connect OKX public websocket")?;
543            ws_public
544                .wait_until_active(10.0)
545                .await
546                .context("public websocket did not become active")?;
547        }
548
549        let instrument_types = if self.config.instrument_types.is_empty() {
550            vec![OKXInstrumentType::Spot]
551        } else {
552            self.config.instrument_types.clone()
553        };
554
555        let public_clone = self.public_ws()?.clone();
556        self.spawn_ws(
557            async move {
558                for inst_type in instrument_types {
559                    public_clone
560                        .subscribe_instruments(inst_type)
561                        .await
562                        .with_context(|| {
563                            format!("failed to subscribe to instrument type {inst_type:?}")
564                        })?;
565                }
566                Ok(())
567            },
568            "instrument subscription",
569        );
570
571        self.spawn_public_stream()?;
572
573        if self.ws_business.is_some() {
574            {
575                let ws_business = self.business_ws_mut()?;
576                ws_business
577                    .connect()
578                    .await
579                    .context("failed to connect OKX business websocket")?;
580                ws_business
581                    .wait_until_active(10.0)
582                    .await
583                    .context("business websocket did not become active")?;
584            }
585            self.spawn_business_stream()?;
586        }
587
588        self.maybe_spawn_instrument_refresh()?;
589
590        self.is_connected.store(true, Ordering::Relaxed);
591        tracing::info!("OKX data client connected");
592        Ok(())
593    }
594
595    async fn disconnect(&mut self) -> anyhow::Result<()> {
596        if self.is_disconnected() {
597            return Ok(());
598        }
599
600        self.cancellation_token.cancel();
601
602        if let Some(ws) = self.ws_public.as_mut() {
603            let _ = ws.close().await;
604        }
605        if let Some(ws) = self.ws_business.as_mut() {
606            let _ = ws.close().await;
607        }
608
609        for handle in self.tasks.drain(..) {
610            if let Err(err) = handle.await {
611                tracing::error!("Error joining websocket task: {err}");
612            }
613        }
614
615        self.cancellation_token = CancellationToken::new();
616        self.is_connected.store(false, Ordering::Relaxed);
617        self.book_channels
618            .write()
619            .expect("book channel cache lock poisoned")
620            .clear();
621        self.instrument_refresh_active = false;
622        tracing::info!("OKX data client disconnected");
623        Ok(())
624    }
625
626    fn is_connected(&self) -> bool {
627        self.is_connected.load(Ordering::Relaxed)
628    }
629
630    fn is_disconnected(&self) -> bool {
631        !self.is_connected()
632    }
633
634    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
635        if cmd.book_type != BookType::L2_MBP {
636            anyhow::bail!("OKX only supports L2_MBP order book deltas");
637        }
638
639        let depth = cmd.depth.map(|d| d.get()).unwrap_or(0);
640        if !matches!(depth, 0 | 50 | 400) {
641            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
642        }
643
644        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
645        let channel = match depth {
646            50 => {
647                if vip < OKXVipLevel::Vip4 {
648                    anyhow::bail!(
649                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
650                    );
651                }
652                OKXBookChannel::Books50L2Tbt
653            }
654            0 | 400 => {
655                if vip >= OKXVipLevel::Vip5 {
656                    OKXBookChannel::BookL2Tbt
657                } else {
658                    OKXBookChannel::Book
659                }
660            }
661            _ => unreachable!(),
662        };
663
664        let instrument_id = cmd.instrument_id;
665        let ws = self.public_ws()?.clone();
666        let book_channels = Arc::clone(&self.book_channels);
667        self.spawn_ws(
668            async move {
669                match channel {
670                    OKXBookChannel::Books50L2Tbt => ws
671                        .subscribe_book50_l2_tbt(instrument_id)
672                        .await
673                        .context("books50-l2-tbt subscription")?,
674                    OKXBookChannel::BookL2Tbt => ws
675                        .subscribe_book_l2_tbt(instrument_id)
676                        .await
677                        .context("books-l2-tbt subscription")?,
678                    OKXBookChannel::Book => ws
679                        .subscribe_books_channel(instrument_id)
680                        .await
681                        .context("books subscription")?,
682                }
683                book_channels
684                    .write()
685                    .expect("book channel cache lock poisoned")
686                    .insert(instrument_id, channel);
687                Ok(())
688            },
689            "order book delta subscription",
690        );
691
692        Ok(())
693    }
694
695    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
696        if cmd.book_type != BookType::L2_MBP {
697            anyhow::bail!("OKX only supports L2_MBP order book snapshots");
698        }
699        let depth = cmd.depth.map(|d| d.get()).unwrap_or(5);
700        if depth != 5 {
701            anyhow::bail!("OKX only supports depth=5 snapshots");
702        }
703
704        let ws = self.public_ws()?.clone();
705        let instrument_id = cmd.instrument_id;
706        self.spawn_ws(
707            async move {
708                ws.subscribe_book_depth5(instrument_id)
709                    .await
710                    .context("books5 subscription")
711            },
712            "order book snapshot subscription",
713        );
714        Ok(())
715    }
716
717    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
718        let ws = self.public_ws()?.clone();
719        let instrument_id = cmd.instrument_id;
720        self.spawn_ws(
721            async move {
722                ws.subscribe_quotes(instrument_id)
723                    .await
724                    .context("quotes subscription")
725            },
726            "quote subscription",
727        );
728        Ok(())
729    }
730
731    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
732        let ws = self.public_ws()?.clone();
733        let instrument_id = cmd.instrument_id;
734        self.spawn_ws(
735            async move {
736                ws.subscribe_trades(instrument_id, false)
737                    .await
738                    .context("trades subscription")
739            },
740            "trade subscription",
741        );
742        Ok(())
743    }
744
745    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
746        let ws = self.public_ws()?.clone();
747        let instrument_id = cmd.instrument_id;
748        self.spawn_ws(
749            async move {
750                ws.subscribe_mark_prices(instrument_id)
751                    .await
752                    .context("mark price subscription")
753            },
754            "mark price subscription",
755        );
756        Ok(())
757    }
758
759    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
760        let ws = self.public_ws()?.clone();
761        let instrument_id = cmd.instrument_id;
762        self.spawn_ws(
763            async move {
764                ws.subscribe_index_prices(instrument_id)
765                    .await
766                    .context("index price subscription")
767            },
768            "index price subscription",
769        );
770        Ok(())
771    }
772
773    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
774        let ws = self.public_ws()?.clone();
775        let instrument_id = cmd.instrument_id;
776        self.spawn_ws(
777            async move {
778                ws.subscribe_funding_rates(instrument_id)
779                    .await
780                    .context("funding rate subscription")
781            },
782            "funding rate subscription",
783        );
784        Ok(())
785    }
786
787    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
788        let ws = self.business_ws()?.clone();
789        let bar_type = cmd.bar_type;
790        self.spawn_ws(
791            async move {
792                ws.subscribe_bars(bar_type)
793                    .await
794                    .context("bars subscription")
795            },
796            "bar subscription",
797        );
798        Ok(())
799    }
800
801    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
802        let ws = self.public_ws()?.clone();
803        let instrument_id = cmd.instrument_id;
804        let channel = self
805            .book_channels
806            .write()
807            .expect("book channel cache lock poisoned")
808            .remove(&instrument_id);
809        self.spawn_ws(
810            async move {
811                match channel {
812                    Some(OKXBookChannel::Books50L2Tbt) => ws
813                        .unsubscribe_book50_l2_tbt(instrument_id)
814                        .await
815                        .context("books50-l2-tbt unsubscribe")?,
816                    Some(OKXBookChannel::BookL2Tbt) => ws
817                        .unsubscribe_book_l2_tbt(instrument_id)
818                        .await
819                        .context("books-l2-tbt unsubscribe")?,
820                    Some(OKXBookChannel::Book) => ws
821                        .unsubscribe_book(instrument_id)
822                        .await
823                        .context("book unsubscribe")?,
824                    None => {
825                        tracing::warn!(
826                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
827                        );
828                        ws.unsubscribe_book(instrument_id)
829                            .await
830                            .context("book fallback unsubscribe")?;
831                    }
832                }
833                Ok(())
834            },
835            "order book unsubscribe",
836        );
837        Ok(())
838    }
839
840    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
841        let ws = self.public_ws()?.clone();
842        let instrument_id = cmd.instrument_id;
843        self.spawn_ws(
844            async move {
845                ws.unsubscribe_book_depth5(instrument_id)
846                    .await
847                    .context("book depth5 unsubscribe")
848            },
849            "order book snapshot unsubscribe",
850        );
851        Ok(())
852    }
853
854    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
855        let ws = self.public_ws()?.clone();
856        let instrument_id = cmd.instrument_id;
857        self.spawn_ws(
858            async move {
859                ws.unsubscribe_quotes(instrument_id)
860                    .await
861                    .context("quotes unsubscribe")
862            },
863            "quote unsubscribe",
864        );
865        Ok(())
866    }
867
868    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
869        let ws = self.public_ws()?.clone();
870        let instrument_id = cmd.instrument_id;
871        self.spawn_ws(
872            async move {
873                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
874                    .await
875                    .context("trades unsubscribe")
876            },
877            "trade unsubscribe",
878        );
879        Ok(())
880    }
881
882    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
883        let ws = self.public_ws()?.clone();
884        let instrument_id = cmd.instrument_id;
885        self.spawn_ws(
886            async move {
887                ws.unsubscribe_mark_prices(instrument_id)
888                    .await
889                    .context("mark price unsubscribe")
890            },
891            "mark price unsubscribe",
892        );
893        Ok(())
894    }
895
896    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
897        let ws = self.public_ws()?.clone();
898        let instrument_id = cmd.instrument_id;
899        self.spawn_ws(
900            async move {
901                ws.unsubscribe_index_prices(instrument_id)
902                    .await
903                    .context("index price unsubscribe")
904            },
905            "index price unsubscribe",
906        );
907        Ok(())
908    }
909
910    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
911        let ws = self.public_ws()?.clone();
912        let instrument_id = cmd.instrument_id;
913        self.spawn_ws(
914            async move {
915                ws.unsubscribe_funding_rates(instrument_id)
916                    .await
917                    .context("funding rate unsubscribe")
918            },
919            "funding rate unsubscribe",
920        );
921        Ok(())
922    }
923
924    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
925        let ws = self.business_ws()?.clone();
926        let bar_type = cmd.bar_type;
927        self.spawn_ws(
928            async move {
929                ws.unsubscribe_bars(bar_type)
930                    .await
931                    .context("bars unsubscribe")
932            },
933            "bar unsubscribe",
934        );
935        Ok(())
936    }
937
938    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
939        let instruments = {
940            let guard = self
941                .instruments
942                .read()
943                .expect("instrument cache lock poisoned");
944            guard.values().cloned().collect::<Vec<_>>()
945        };
946
947        let response = DataResponse::Instruments(InstrumentsResponse::new(
948            request.request_id,
949            request.client_id.unwrap_or(self.client_id),
950            self.venue(),
951            instruments,
952            datetime_to_unix_nanos(request.start),
953            datetime_to_unix_nanos(request.end),
954            self.clock.get_time_ns(),
955            request.params.clone(),
956        ));
957
958        if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
959            tracing::error!("Failed to send instruments response: {err}");
960        }
961
962        Ok(())
963    }
964
965    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
966        let instrument = {
967            let guard = self
968                .instruments
969                .read()
970                .expect("instrument cache lock poisoned");
971            guard
972                .get(&request.instrument_id)
973                .cloned()
974                .context("instrument not found in cache")?
975        };
976
977        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
978            request.request_id,
979            request.client_id.unwrap_or(self.client_id),
980            instrument.id(),
981            instrument,
982            datetime_to_unix_nanos(request.start),
983            datetime_to_unix_nanos(request.end),
984            self.clock.get_time_ns(),
985            request.params.clone(),
986        )));
987
988        if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
989            tracing::error!("Failed to send instrument response: {err}");
990        }
991
992        Ok(())
993    }
994
995    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
996        let http = self.http_client.clone();
997        let sender = self.data_sender.clone();
998        let instrument_id = request.instrument_id;
999        let start = request.start;
1000        let end = request.end;
1001        let limit = request.limit.map(|n| n.get() as u32);
1002        let request_id = request.request_id;
1003        let client_id = request.client_id.unwrap_or(self.client_id);
1004        let params = request.params.clone();
1005        let clock = self.clock;
1006        let start_nanos = datetime_to_unix_nanos(start);
1007        let end_nanos = datetime_to_unix_nanos(end);
1008
1009        tokio::spawn(async move {
1010            match http
1011                .request_trades(instrument_id, start, end, limit)
1012                .await
1013                .context("failed to request trades from OKX")
1014            {
1015                Ok(trades) => {
1016                    let response = DataResponse::Trades(TradesResponse::new(
1017                        request_id,
1018                        client_id,
1019                        instrument_id,
1020                        trades,
1021                        start_nanos,
1022                        end_nanos,
1023                        clock.get_time_ns(),
1024                        params,
1025                    ));
1026                    if let Err(err) = sender.send(DataEvent::Response(response)) {
1027                        tracing::error!("Failed to send trades response: {err}");
1028                    }
1029                }
1030                Err(err) => tracing::error!("Trade request failed: {err:?}"),
1031            }
1032        });
1033
1034        Ok(())
1035    }
1036
1037    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1038        let http = self.http_client.clone();
1039        let sender = self.data_sender.clone();
1040        let bar_type = request.bar_type;
1041        let start = request.start;
1042        let end = request.end;
1043        let limit = request.limit.map(|n| n.get() as u32);
1044        let request_id = request.request_id;
1045        let client_id = request.client_id.unwrap_or(self.client_id);
1046        let params = request.params.clone();
1047        let clock = self.clock;
1048        let start_nanos = datetime_to_unix_nanos(start);
1049        let end_nanos = datetime_to_unix_nanos(end);
1050
1051        tokio::spawn(async move {
1052            match http
1053                .request_bars(bar_type, start, end, limit)
1054                .await
1055                .context("failed to request bars from OKX")
1056            {
1057                Ok(bars) => {
1058                    let response = DataResponse::Bars(BarsResponse::new(
1059                        request_id,
1060                        client_id,
1061                        bar_type,
1062                        bars,
1063                        start_nanos,
1064                        end_nanos,
1065                        clock.get_time_ns(),
1066                        params,
1067                    ));
1068                    if let Err(err) = sender.send(DataEvent::Response(response)) {
1069                        tracing::error!("Failed to send bars response: {err}");
1070                    }
1071                }
1072                Err(err) => tracing::error!("Bar request failed: {err:?}"),
1073            }
1074        });
1075
1076        Ok(())
1077    }
1078}