nautilus_okx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31    messages::{
32        DataEvent,
33        data::{
34            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
38            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43    runner::get_data_event_sender,
44};
45use nautilus_core::{
46    MUTEX_POISONED, UnixNanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
52    enums::BookType,
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60    common::{
61        consts::OKX_VENUE,
62        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
63        parse::okx_instrument_type_from_symbol,
64    },
65    config::OKXDataClientConfig,
66    http::client::OKXHttpClient,
67    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
68};
69
/// Live market data client for the OKX venue.
///
/// Bundles the HTTP client, the public and (optional) business websocket
/// clients, the handles of the background tasks it spawns and the caches
/// shared with those tasks.
#[derive(Debug)]
pub struct OKXDataClient {
    /// Identifier returned from `DataClient::client_id`.
    client_id: ClientId,
    /// Configuration the client was constructed with.
    config: OKXDataClientConfig,
    /// HTTP client used to request instruments.
    http_client: OKXHttpClient,
    /// Public websocket client; set to `Some` by `new`.
    ws_public: Option<OKXWebSocketClient>,
    /// Business websocket client; only present when
    /// `config.requires_business_ws()` returned `true` at construction.
    ws_business: Option<OKXWebSocketClient>,
    /// Connection flag set by `connect` and cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token cloned into every tracked background task to request shutdown.
    cancellation_token: CancellationToken,
    /// Join handles of the stream and instrument refresh tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache shared with the websocket and refresh tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Book channel chosen for each instrument with an order book delta subscription.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
    /// Realtime atomic clock obtained at construction.
    clock: &'static AtomicTime,
    /// Whether the periodic instrument refresh task has been spawned.
    instrument_refresh_active: bool,
}
86
87impl OKXDataClient {
88    /// Creates a new [`OKXDataClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the client fails to initialize.
93    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
94        let clock = get_atomic_clock_realtime();
95        let data_sender = get_data_event_sender();
96
97        let http_client = if config.has_api_credentials() {
98            OKXHttpClient::with_credentials(
99                config.api_key.clone(),
100                config.api_secret.clone(),
101                config.api_passphrase.clone(),
102                config.base_url_http.clone(),
103                config.http_timeout_secs,
104                config.max_retries,
105                config.retry_delay_initial_ms,
106                config.retry_delay_max_ms,
107                config.is_demo,
108                config.http_proxy_url.clone(),
109            )?
110        } else {
111            OKXHttpClient::new(
112                config.base_url_http.clone(),
113                config.http_timeout_secs,
114                config.max_retries,
115                config.retry_delay_initial_ms,
116                config.retry_delay_max_ms,
117                config.is_demo,
118                config.http_proxy_url.clone(),
119            )?
120        };
121
122        let ws_public = OKXWebSocketClient::new(
123            Some(config.ws_public_url()),
124            None,
125            None,
126            None,
127            None,
128            Some(20), // Heartbeat
129        )
130        .context("failed to construct OKX public websocket client")?;
131
132        let ws_business = if config.requires_business_ws() {
133            Some(
134                OKXWebSocketClient::new(
135                    Some(config.ws_business_url()),
136                    config.api_key.clone(),
137                    config.api_secret.clone(),
138                    config.api_passphrase.clone(),
139                    None,
140                    Some(20), // Heartbeat
141                )
142                .context("failed to construct OKX business websocket client")?,
143            )
144        } else {
145            None
146        };
147
148        if let Some(vip_level) = config.vip_level {
149            ws_public.set_vip_level(vip_level);
150            if let Some(ref ws) = ws_business {
151                ws.set_vip_level(vip_level);
152            }
153        }
154
155        Ok(Self {
156            client_id,
157            config,
158            http_client,
159            ws_public: Some(ws_public),
160            ws_business,
161            is_connected: AtomicBool::new(false),
162            cancellation_token: CancellationToken::new(),
163            tasks: Vec::new(),
164            data_sender,
165            instruments: Arc::new(RwLock::new(AHashMap::new())),
166            book_channels: Arc::new(RwLock::new(AHashMap::new())),
167            clock,
168            instrument_refresh_active: false,
169        })
170    }
171
    /// Returns a copy of the `OKX_VENUE` constant.
    fn venue(&self) -> Venue {
        *OKX_VENUE
    }
175
    /// Returns the VIP level reported by the public websocket client, or
    /// `None` when that client is not present.
    fn vip_level(&self) -> Option<OKXVipLevel> {
        self.ws_public.as_ref().map(|ws| ws.vip_level())
    }
179
    /// Borrows the public websocket client.
    ///
    /// # Errors
    ///
    /// Returns an error if the public websocket client is `None`.
    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
        self.ws_public
            .as_ref()
            .context("public websocket client not initialized")
    }
185
    /// Mutably borrows the public websocket client.
    ///
    /// # Errors
    ///
    /// Returns an error if the public websocket client is `None`.
    fn public_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
        self.ws_public
            .as_mut()
            .context("public websocket client not initialized")
    }
191
    /// Borrows the business websocket client.
    ///
    /// # Errors
    ///
    /// Returns an error if no business websocket client was created.
    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
        self.ws_business
            .as_ref()
            .context("business websocket client not available (credentials required)")
    }
197
    /// Mutably borrows the business websocket client.
    ///
    /// # Errors
    ///
    /// Returns an error if no business websocket client was created.
    fn business_ws_mut(&mut self) -> anyhow::Result<&mut OKXWebSocketClient> {
        self.ws_business
            .as_mut()
            .context("business websocket client not available (credentials required)")
    }
203
204    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
205        if let Err(e) = sender.send(DataEvent::Data(data)) {
206            tracing::error!("Failed to emit data event: {e}");
207        }
208    }
209
    /// Runs `fut` on a new Tokio task and logs any error it returns, prefixed
    /// with `context`.
    ///
    /// The join handle returned by `tokio::spawn` is discarded, so the task is
    /// not added to `self.tasks` and does not observe the cancellation token.
    /// `self` is not read by the body.
    fn spawn_ws<F>(&self, fut: F, context: &'static str)
    where
        F: Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        tokio::spawn(async move {
            if let Err(e) = fut.await {
                // Debug formatting so the full anyhow context chain is printed
                tracing::error!("{context}: {e:?}");
            }
        });
    }
220
221    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
222        let instrument_types = if self.config.instrument_types.is_empty() {
223            vec![OKXInstrumentType::Spot]
224        } else {
225            self.config.instrument_types.clone()
226        };
227
228        let mut collected: Vec<InstrumentAny> = Vec::new();
229
230        for inst_type in instrument_types {
231            let mut instruments = self
232                .http_client
233                .request_instruments(inst_type, None)
234                .await
235                .with_context(|| format!("failed to load instruments for {inst_type:?}"))?;
236            instruments.retain(|instrument| self.contract_filter(instrument));
237            tracing::debug!(
238                "loaded {count} instruments for {inst_type:?}",
239                count = instruments.len()
240            );
241            collected.extend(instruments);
242        }
243
244        if collected.is_empty() {
245            tracing::warn!("No OKX instruments were loaded");
246            return Ok(collected);
247        }
248
249        self.http_client.cache_instruments(collected.clone());
250
251        if let Some(ws) = self.ws_public.as_mut() {
252            ws.cache_instruments(collected.clone());
253        }
254
255        if let Some(ws) = self.ws_business.as_mut() {
256            ws.cache_instruments(collected.clone());
257        }
258
259        {
260            let mut guard = self
261                .instruments
262                .write()
263                .expect("instrument cache lock poisoned");
264            guard.clear();
265            for instrument in &collected {
266                guard.insert(instrument.id(), instrument.clone());
267            }
268        }
269
270        Ok(collected)
271    }
272
    /// Returns whether `instrument` passes the contract type filter of this
    /// client's configuration (delegates to `contract_filter_with_config`).
    fn contract_filter(&self, instrument: &InstrumentAny) -> bool {
        contract_filter_with_config(&self.config, instrument)
    }
276
277    fn handle_ws_message(
278        message: NautilusWsMessage,
279        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
280        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
281    ) {
282        match message {
283            NautilusWsMessage::Data(payloads) => {
284                for data in payloads {
285                    Self::send_data(data_sender, data);
286                }
287            }
288            NautilusWsMessage::Deltas(deltas) => {
289                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
290            }
291            NautilusWsMessage::FundingRates(updates) => {
292                emit_funding_rates(updates);
293            }
294            NautilusWsMessage::Instrument(instrument) => {
295                upsert_instrument(instruments, *instrument);
296            }
297            NautilusWsMessage::AccountUpdate(_)
298            | NautilusWsMessage::PositionUpdate(_)
299            | NautilusWsMessage::OrderRejected(_)
300            | NautilusWsMessage::OrderCancelRejected(_)
301            | NautilusWsMessage::OrderModifyRejected(_)
302            | NautilusWsMessage::ExecutionReports(_) => {
303                tracing::debug!("Ignoring trading message on data client");
304            }
305            NautilusWsMessage::Error(e) => {
306                tracing::error!("OKX websocket error: {e:?}");
307            }
308            NautilusWsMessage::Raw(value) => {
309                tracing::debug!("Unhandled websocket payload: {value:?}");
310            }
311            NautilusWsMessage::Reconnected => {
312                tracing::info!("Websocket reconnected");
313            }
314            NautilusWsMessage::Authenticated => {
315                tracing::debug!("Websocket authenticated");
316            }
317        }
318    }
319
320    fn spawn_public_stream(&mut self) -> anyhow::Result<()> {
321        let ws = self.public_ws_mut()?;
322        let stream = ws.stream();
323        self.spawn_stream_task(stream)
324    }
325
326    fn spawn_business_stream(&mut self) -> anyhow::Result<()> {
327        if self.ws_business.is_none() {
328            return Ok(());
329        }
330
331        let ws = self.business_ws_mut()?;
332        let stream = ws.stream();
333        self.spawn_stream_task(stream)
334    }
335
    /// Spawns a task that drains `stream`, passing every message to
    /// `handle_ws_message`, until the stream ends or the client's
    /// cancellation token is cancelled.
    ///
    /// The task's join handle is stored in `self.tasks`.
    fn spawn_stream_task(
        &mut self,
        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
    ) -> anyhow::Result<()> {
        // Clone everything the task needs so it owns its state
        let data_sender = self.data_sender.clone();
        let instruments = self.instruments.clone();
        let cancellation = self.cancellation_token.clone();

        let handle = tokio::spawn(async move {
            tokio::pin!(stream);

            loop {
                tokio::select! {
                    maybe_msg = stream.next() => {
                        match maybe_msg {
                            Some(msg) => {
                                Self::handle_ws_message(msg, &data_sender, &instruments);
                            }
                            None => {
                                tracing::debug!("Websocket stream ended");
                                break;
                            }
                        }
                    }
                    _ = cancellation.cancelled() => {
                        tracing::debug!("Websocket stream task cancelled");
                        break;
                    }
                }
            }
        });

        self.tasks.push(handle);
        Ok(())
    }
371
372    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
373        let Some(minutes) = self.config.update_instruments_interval_mins else {
374            return Ok(());
375        };
376
377        if minutes == 0 || self.instrument_refresh_active {
378            return Ok(());
379        }
380
381        let interval_secs = minutes.saturating_mul(60);
382        if interval_secs == 0 {
383            return Ok(());
384        }
385
386        let interval = Duration::from_secs(interval_secs);
387        let cancellation = self.cancellation_token.clone();
388        let instruments_cache = Arc::clone(&self.instruments);
389        let http_client = self.http_client.clone();
390        let config = self.config.clone();
391        let client_id = self.client_id;
392
393        let handle = tokio::spawn(async move {
394            loop {
395                let sleep = tokio::time::sleep(interval);
396                tokio::pin!(sleep);
397                tokio::select! {
398                    _ = cancellation.cancelled() => {
399                        tracing::debug!("OKX instrument refresh task cancelled");
400                        break;
401                    }
402                    _ = &mut sleep => {
403                        let instrument_types = if config.instrument_types.is_empty() {
404                            vec![OKXInstrumentType::Spot]
405                        } else {
406                            config.instrument_types.clone()
407                        };
408
409                        let mut collected: Vec<InstrumentAny> = Vec::new();
410
411                        for inst_type in instrument_types {
412                            match http_client.request_instruments(inst_type, None).await {
413                                Ok(mut instruments) => {
414                                    instruments.retain(|instrument| contract_filter_with_config(&config, instrument));
415                                    collected.extend(instruments);
416                                }
417                                Err(e) => {
418                                    tracing::warn!(client_id=%client_id, instrument_type=?inst_type, error=?e, "Failed to refresh OKX instruments for type");
419                                }
420                            }
421                        }
422
423                        if collected.is_empty() {
424                            tracing::debug!(client_id=%client_id, "OKX instrument refresh yielded no instruments");
425                            continue;
426                        }
427
428                        http_client.cache_instruments(collected.clone());
429
430                        {
431                            let mut guard = instruments_cache
432                                .write()
433                                .expect("instrument cache lock poisoned");
434                            guard.clear();
435                            for instrument in &collected {
436                                guard.insert(instrument.id(), instrument.clone());
437                            }
438                        }
439
440                        tracing::debug!(client_id=%client_id, count=collected.len(), "OKX instruments refreshed");
441                    }
442                }
443            }
444        });
445
446        self.tasks.push(handle);
447        self.instrument_refresh_active = true;
448        Ok(())
449    }
450}
451
452fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
453    if updates.is_empty() {
454        return;
455    }
456
457    for update in updates {
458        tracing::debug!(
459            "Received funding rate update for {} but forwarding is not yet supported",
460            update.instrument_id
461        );
462    }
463}
464
465fn upsert_instrument(
466    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
467    instrument: InstrumentAny,
468) {
469    let mut guard = cache.write().expect(MUTEX_POISONED);
470    guard.insert(instrument.id(), instrument);
471}
472
473fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
474    value
475        .and_then(|dt| dt.timestamp_nanos_opt())
476        .and_then(|nanos| u64::try_from(nanos).ok())
477        .map(UnixNanos::from)
478}
479
/// Returns whether `instrument` passes the contract type filter stored in
/// `config.contract_types`.
fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
    contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
}
483
484fn contract_filter_with_config_types(
485    contract_types: Option<&Vec<OKXContractType>>,
486    instrument: &InstrumentAny,
487) -> bool {
488    match contract_types {
489        None => true,
490        Some(filter) if filter.is_empty() => true,
491        Some(filter) => {
492            let is_inverse = instrument.is_inverse();
493            (is_inverse && filter.contains(&OKXContractType::Inverse))
494                || (!is_inverse && filter.contains(&OKXContractType::Linear))
495        }
496    }
497}
498
499#[async_trait::async_trait]
500impl DataClient for OKXDataClient {
    /// Returns the identifier this client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
504
    /// Returns the OKX venue; always `Some`.
    fn venue(&self) -> Option<Venue> {
        Some(self.venue())
    }
508
    /// Logs the client's key configuration values; no connection work is
    /// performed here.
    fn start(&mut self) -> anyhow::Result<()> {
        tracing::info!(
            client_id = %self.client_id,
            vip_level = ?self.vip_level(),
            instrument_types = ?self.config.instrument_types,
            is_demo = self.config.is_demo,
            http_proxy_url = ?self.config.http_proxy_url,
            ws_proxy_url = ?self.config.ws_proxy_url,
            "Starting OKX data client"
        );
        Ok(())
    }
521
    /// Cancels the cancellation token, marks the client as disconnected and
    /// clears the refresh-active flag.
    ///
    /// The token is not replaced here and the websocket clients are not
    /// closed.
    fn stop(&mut self) -> anyhow::Result<()> {
        tracing::info!("Stopping OKX data client {id}", id = self.client_id);
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        self.instrument_refresh_active = false;
        Ok(())
    }
529
530    fn reset(&mut self) -> anyhow::Result<()> {
531        tracing::debug!("Resetting OKX data client {id}", id = self.client_id);
532        self.is_connected.store(false, Ordering::Relaxed);
533        self.cancellation_token = CancellationToken::new();
534        self.tasks.clear();
535        self.book_channels
536            .write()
537            .expect("book channel cache lock poisoned")
538            .clear();
539        self.instrument_refresh_active = false;
540        Ok(())
541    }
542
    /// Logs the disposal and then delegates to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        tracing::debug!("Disposing OKX data client {id}", id = self.client_id);
        self.stop()
    }
547
    /// Connects the client; a no-op when already connected.
    ///
    /// Loads instruments over HTTP, connects the public websocket and spawns
    /// its stream task, subscribes to the instruments channel for every
    /// configured instrument type (spot only when none are configured),
    /// connects the business websocket when present, and finally starts the
    /// optional instrument refresh task.
    ///
    /// # Errors
    ///
    /// Returns an error if loading instruments fails, or if either websocket
    /// fails to connect or does not become active.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.is_connected() {
            return Ok(());
        }

        // Prime the instrument caches before any websocket traffic arrives
        self.bootstrap_instruments().await?;

        {
            let ws_public = self.public_ws_mut()?;
            ws_public
                .connect()
                .await
                .context("failed to connect OKX public websocket")?;
            ws_public
                .wait_until_active(10.0)
                .await
                .context("public websocket did not become active")?;
        }

        let instrument_types = if self.config.instrument_types.is_empty() {
            vec![OKXInstrumentType::Spot]
        } else {
            self.config.instrument_types.clone()
        };

        // Start consuming messages before the subscriptions are issued
        self.spawn_public_stream()?;

        let public_clone = self.public_ws()?.clone();

        // Subscriptions run on a detached task; failures are logged, not returned
        self.spawn_ws(
            async move {
                for inst_type in instrument_types {
                    public_clone
                        .subscribe_instruments(inst_type)
                        .await
                        .with_context(|| {
                            format!("failed to subscribe to instrument type {inst_type:?}")
                        })?;
                }
                Ok(())
            },
            "instrument subscription",
        );

        if self.ws_business.is_some() {
            {
                let ws_business = self.business_ws_mut()?;
                ws_business
                    .connect()
                    .await
                    .context("failed to connect OKX business websocket")?;
                ws_business
                    .wait_until_active(10.0)
                    .await
                    .context("business websocket did not become active")?;
            }
            self.spawn_business_stream()?;
        }

        self.maybe_spawn_instrument_refresh()?;

        // Only flagged as connected once every step above has succeeded
        self.is_connected.store(true, Ordering::Relaxed);
        tracing::info!(client_id = %self.client_id, "Connected");
        Ok(())
    }
613
    /// Disconnects the client; a no-op when already disconnected.
    ///
    /// Unsubscribes from everything on both websockets, waits briefly,
    /// cancels the background tasks, closes the websockets, joins the tracked
    /// tasks and resets the per-connection state.
    ///
    /// # Panics
    ///
    /// Panics if the book channel cache lock is poisoned.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.is_disconnected() {
            return Ok(());
        }

        // Unsubscribe failures are logged and do not abort the disconnect
        if let Some(ws) = self.ws_public.as_ref()
            && let Err(e) = ws.unsubscribe_all().await
        {
            tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
        }
        if let Some(ws) = self.ws_business.as_ref()
            && let Err(e) = ws.unsubscribe_all().await
        {
            tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
        }

        // Allow time for unsubscribe confirmations to be processed
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

        self.cancellation_token.cancel();

        // NOTE(review): the results of `close` are discarded without logging
        if let Some(ws) = self.ws_public.as_mut() {
            let _ = ws.close().await;
        }
        if let Some(ws) = self.ws_business.as_mut() {
            let _ = ws.close().await;
        }

        // Wait for every tracked task to finish after cancellation
        for handle in self.tasks.drain(..) {
            if let Err(e) = handle.await {
                tracing::error!("Error joining websocket task: {e}");
            }
        }

        // Fresh token so that a later `connect` can spawn cancellable tasks again
        self.cancellation_token = CancellationToken::new();
        self.is_connected.store(false, Ordering::Relaxed);
        self.book_channels
            .write()
            .expect("book channel cache lock poisoned")
            .clear();
        self.instrument_refresh_active = false;

        tracing::info!(client_id = %self.client_id, "Disconnected");
        Ok(())
    }
659
    /// Returns the current value of the connection flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
663
    /// Returns the negation of `is_connected`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
667
668    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
669        // Subscribe to instruments channel for all configured instrument types
670        for inst_type in &self.config.instrument_types {
671            let ws = self.public_ws()?.clone();
672            let inst_type = *inst_type;
673
674            self.spawn_ws(
675                async move {
676                    ws.subscribe_instruments(inst_type)
677                        .await
678                        .context("instruments subscription")?;
679                    Ok(())
680                },
681                "subscribe_instruments",
682            );
683        }
684        Ok(())
685    }
686
    /// Requests an instrument subscription for `cmd.instrument_id` on the
    /// public websocket.
    ///
    /// The request runs on a detached task; failures are logged rather than
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the public websocket client is not initialized.
    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
        // OKX instruments channel doesn't support subscribing to individual instruments via instId
        // Instead, subscribe to the instrument type if not already subscribed
        let instrument_id = cmd.instrument_id;
        let ws = self.public_ws()?.clone();

        self.spawn_ws(
            async move {
                ws.subscribe_instrument(instrument_id)
                    .await
                    .context("instrument type subscription")?;
                Ok(())
            },
            "subscribe_instrument",
        );
        Ok(())
    }
704
705    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
706        if cmd.book_type != BookType::L2_MBP {
707            anyhow::bail!("OKX only supports L2_MBP order book deltas");
708        }
709
710        let depth = cmd.depth.map_or(0, |d| d.get());
711        if !matches!(depth, 0 | 50 | 400) {
712            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
713        }
714
715        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
716        let channel = match depth {
717            50 => {
718                if vip < OKXVipLevel::Vip4 {
719                    anyhow::bail!(
720                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
721                    );
722                }
723                OKXBookChannel::Books50L2Tbt
724            }
725            0 | 400 => {
726                if vip >= OKXVipLevel::Vip5 {
727                    OKXBookChannel::BookL2Tbt
728                } else {
729                    OKXBookChannel::Book
730                }
731            }
732            _ => unreachable!(),
733        };
734
735        let instrument_id = cmd.instrument_id;
736        let ws = self.public_ws()?.clone();
737        let book_channels = Arc::clone(&self.book_channels);
738
739        self.spawn_ws(
740            async move {
741                match channel {
742                    OKXBookChannel::Books50L2Tbt => ws
743                        .subscribe_book50_l2_tbt(instrument_id)
744                        .await
745                        .context("books50-l2-tbt subscription")?,
746                    OKXBookChannel::BookL2Tbt => ws
747                        .subscribe_book_l2_tbt(instrument_id)
748                        .await
749                        .context("books-l2-tbt subscription")?,
750                    OKXBookChannel::Book => ws
751                        .subscribe_books_channel(instrument_id)
752                        .await
753                        .context("books subscription")?,
754                }
755                book_channels
756                    .write()
757                    .expect("book channel cache lock poisoned")
758                    .insert(instrument_id, channel);
759                Ok(())
760            },
761            "order book delta subscription",
762        );
763
764        Ok(())
765    }
766
767    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
768        if cmd.book_type != BookType::L2_MBP {
769            anyhow::bail!("OKX only supports L2_MBP order book snapshots");
770        }
771        let depth = cmd.depth.map_or(5, |d| d.get());
772        if depth != 5 {
773            anyhow::bail!("OKX only supports depth=5 snapshots");
774        }
775
776        let ws = self.public_ws()?.clone();
777        let instrument_id = cmd.instrument_id;
778
779        self.spawn_ws(
780            async move {
781                ws.subscribe_book_depth5(instrument_id)
782                    .await
783                    .context("books5 subscription")
784            },
785            "order book snapshot subscription",
786        );
787        Ok(())
788    }
789
790    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
791        let ws = self.public_ws()?.clone();
792        let instrument_id = cmd.instrument_id;
793
794        self.spawn_ws(
795            async move {
796                ws.subscribe_quotes(instrument_id)
797                    .await
798                    .context("quotes subscription")
799            },
800            "quote subscription",
801        );
802        Ok(())
803    }
804
805    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
806        let ws = self.public_ws()?.clone();
807        let instrument_id = cmd.instrument_id;
808
809        self.spawn_ws(
810            async move {
811                ws.subscribe_trades(instrument_id, false)
812                    .await
813                    .context("trades subscription")
814            },
815            "trade subscription",
816        );
817        Ok(())
818    }
819
820    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
821        let ws = self.public_ws()?.clone();
822        let instrument_id = cmd.instrument_id;
823
824        self.spawn_ws(
825            async move {
826                ws.subscribe_mark_prices(instrument_id)
827                    .await
828                    .context("mark price subscription")
829            },
830            "mark price subscription",
831        );
832        Ok(())
833    }
834
835    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
836        let ws = self.public_ws()?.clone();
837        let instrument_id = cmd.instrument_id;
838
839        self.spawn_ws(
840            async move {
841                ws.subscribe_index_prices(instrument_id)
842                    .await
843                    .context("index price subscription")
844            },
845            "index price subscription",
846        );
847        Ok(())
848    }
849
850    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
851        let ws = self.public_ws()?.clone();
852        let instrument_id = cmd.instrument_id;
853
854        self.spawn_ws(
855            async move {
856                ws.subscribe_funding_rates(instrument_id)
857                    .await
858                    .context("funding rate subscription")
859            },
860            "funding rate subscription",
861        );
862        Ok(())
863    }
864
865    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
866        let ws = self.business_ws()?.clone();
867        let bar_type = cmd.bar_type;
868
869        self.spawn_ws(
870            async move {
871                ws.subscribe_bars(bar_type)
872                    .await
873                    .context("bars subscription")
874            },
875            "bar subscription",
876        );
877        Ok(())
878    }
879
880    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
881        let ws = self.public_ws()?.clone();
882        let instrument_id = cmd.instrument_id;
883        let channel = self
884            .book_channels
885            .write()
886            .expect("book channel cache lock poisoned")
887            .remove(&instrument_id);
888
889        self.spawn_ws(
890            async move {
891                match channel {
892                    Some(OKXBookChannel::Books50L2Tbt) => ws
893                        .unsubscribe_book50_l2_tbt(instrument_id)
894                        .await
895                        .context("books50-l2-tbt unsubscribe")?,
896                    Some(OKXBookChannel::BookL2Tbt) => ws
897                        .unsubscribe_book_l2_tbt(instrument_id)
898                        .await
899                        .context("books-l2-tbt unsubscribe")?,
900                    Some(OKXBookChannel::Book) => ws
901                        .unsubscribe_book(instrument_id)
902                        .await
903                        .context("book unsubscribe")?,
904                    None => {
905                        tracing::warn!(
906                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
907                        );
908                        ws.unsubscribe_book(instrument_id)
909                            .await
910                            .context("book fallback unsubscribe")?;
911                    }
912                }
913                Ok(())
914            },
915            "order book unsubscribe",
916        );
917        Ok(())
918    }
919
920    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
921        let ws = self.public_ws()?.clone();
922        let instrument_id = cmd.instrument_id;
923
924        self.spawn_ws(
925            async move {
926                ws.unsubscribe_book_depth5(instrument_id)
927                    .await
928                    .context("book depth5 unsubscribe")
929            },
930            "order book snapshot unsubscribe",
931        );
932        Ok(())
933    }
934
935    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
936        let ws = self.public_ws()?.clone();
937        let instrument_id = cmd.instrument_id;
938
939        self.spawn_ws(
940            async move {
941                ws.unsubscribe_quotes(instrument_id)
942                    .await
943                    .context("quotes unsubscribe")
944            },
945            "quote unsubscribe",
946        );
947        Ok(())
948    }
949
950    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
951        let ws = self.public_ws()?.clone();
952        let instrument_id = cmd.instrument_id;
953
954        self.spawn_ws(
955            async move {
956                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
957                    .await
958                    .context("trades unsubscribe")
959            },
960            "trade unsubscribe",
961        );
962        Ok(())
963    }
964
965    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
966        let ws = self.public_ws()?.clone();
967        let instrument_id = cmd.instrument_id;
968
969        self.spawn_ws(
970            async move {
971                ws.unsubscribe_mark_prices(instrument_id)
972                    .await
973                    .context("mark price unsubscribe")
974            },
975            "mark price unsubscribe",
976        );
977        Ok(())
978    }
979
980    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
981        let ws = self.public_ws()?.clone();
982        let instrument_id = cmd.instrument_id;
983
984        self.spawn_ws(
985            async move {
986                ws.unsubscribe_index_prices(instrument_id)
987                    .await
988                    .context("index price unsubscribe")
989            },
990            "index price unsubscribe",
991        );
992        Ok(())
993    }
994
995    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
996        let ws = self.public_ws()?.clone();
997        let instrument_id = cmd.instrument_id;
998
999        self.spawn_ws(
1000            async move {
1001                ws.unsubscribe_funding_rates(instrument_id)
1002                    .await
1003                    .context("funding rate unsubscribe")
1004            },
1005            "funding rate unsubscribe",
1006        );
1007        Ok(())
1008    }
1009
1010    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1011        let ws = self.business_ws()?.clone();
1012        let bar_type = cmd.bar_type;
1013
1014        self.spawn_ws(
1015            async move {
1016                ws.unsubscribe_bars(bar_type)
1017                    .await
1018                    .context("bars unsubscribe")
1019            },
1020            "bar unsubscribe",
1021        );
1022        Ok(())
1023    }
1024
1025    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
1026        let http = self.http_client.clone();
1027        let sender = self.data_sender.clone();
1028        let instruments_cache = self.instruments.clone();
1029        let request_id = request.request_id;
1030        let client_id = request.client_id.unwrap_or(self.client_id);
1031        let venue = self.venue();
1032        let start = request.start;
1033        let end = request.end;
1034        let params = request.params.clone();
1035        let clock = self.clock;
1036        let start_nanos = datetime_to_unix_nanos(start);
1037        let end_nanos = datetime_to_unix_nanos(end);
1038        let instrument_types = if self.config.instrument_types.is_empty() {
1039            vec![OKXInstrumentType::Spot]
1040        } else {
1041            self.config.instrument_types.clone()
1042        };
1043        let contract_types = self.config.contract_types.clone();
1044        let instrument_families = self.config.instrument_families.clone();
1045
1046        tokio::spawn(async move {
1047            let mut all_instruments = Vec::new();
1048
1049            for inst_type in instrument_types {
1050                let supports_family = matches!(
1051                    inst_type,
1052                    OKXInstrumentType::Futures
1053                        | OKXInstrumentType::Swap
1054                        | OKXInstrumentType::Option
1055                );
1056
1057                let families = match (&instrument_families, inst_type, supports_family) {
1058                    (Some(families), OKXInstrumentType::Option, true) => families.clone(),
1059                    (Some(families), _, true) => families.clone(),
1060                    (None, OKXInstrumentType::Option, _) => {
1061                        tracing::warn!(
1062                            "Skipping OPTION type: instrument_families required but not configured"
1063                        );
1064                        continue;
1065                    }
1066                    _ => vec![],
1067                };
1068
1069                if families.is_empty() {
1070                    match http.request_instruments(inst_type, None).await {
1071                        Ok(instruments) => {
1072                            for instrument in instruments {
1073                                if !contract_filter_with_config_types(
1074                                    contract_types.as_ref(),
1075                                    &instrument,
1076                                ) {
1077                                    continue;
1078                                }
1079
1080                                upsert_instrument(&instruments_cache, instrument.clone());
1081                                all_instruments.push(instrument);
1082                            }
1083                        }
1084                        Err(e) => {
1085                            tracing::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
1086                        }
1087                    }
1088                } else {
1089                    for family in families {
1090                        match http
1091                            .request_instruments(inst_type, Some(family.clone()))
1092                            .await
1093                        {
1094                            Ok(instruments) => {
1095                                for instrument in instruments {
1096                                    if !contract_filter_with_config_types(
1097                                        contract_types.as_ref(),
1098                                        &instrument,
1099                                    ) {
1100                                        continue;
1101                                    }
1102
1103                                    upsert_instrument(&instruments_cache, instrument.clone());
1104                                    all_instruments.push(instrument);
1105                                }
1106                            }
1107                            Err(e) => {
1108                                tracing::error!(
1109                                    "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
1110                                );
1111                            }
1112                        }
1113                    }
1114                }
1115            }
1116
1117            let response = DataResponse::Instruments(InstrumentsResponse::new(
1118                request_id,
1119                client_id,
1120                venue,
1121                all_instruments,
1122                start_nanos,
1123                end_nanos,
1124                clock.get_time_ns(),
1125                params,
1126            ));
1127
1128            if let Err(e) = sender.send(DataEvent::Response(response)) {
1129                tracing::error!("Failed to send instruments response: {e}");
1130            }
1131        });
1132
1133        Ok(())
1134    }
1135
1136    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
1137        let http = self.http_client.clone();
1138        let sender = self.data_sender.clone();
1139        let instruments = self.instruments.clone();
1140        let instrument_id = request.instrument_id;
1141        let request_id = request.request_id;
1142        let client_id = request.client_id.unwrap_or(self.client_id);
1143        let start = request.start;
1144        let end = request.end;
1145        let params = request.params.clone();
1146        let clock = self.clock;
1147        let start_nanos = datetime_to_unix_nanos(start);
1148        let end_nanos = datetime_to_unix_nanos(end);
1149        let instrument_types = if self.config.instrument_types.is_empty() {
1150            vec![OKXInstrumentType::Spot]
1151        } else {
1152            self.config.instrument_types.clone()
1153        };
1154        let contract_types = self.config.contract_types.clone();
1155
1156        tokio::spawn(async move {
1157            match http
1158                .request_instrument(instrument_id)
1159                .await
1160                .context("fetch instrument from API")
1161            {
1162                Ok(instrument) => {
1163                    let inst_id = instrument.id();
1164                    let symbol = inst_id.symbol.as_str();
1165                    let inst_type = okx_instrument_type_from_symbol(symbol);
1166                    if !instrument_types.contains(&inst_type) {
1167                        tracing::error!(
1168                            "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1169                        );
1170                        return;
1171                    }
1172
1173                    if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1174                        tracing::error!(
1175                            "Instrument {instrument_id} filtered out by contract_types config"
1176                        );
1177                        return;
1178                    }
1179
1180                    upsert_instrument(&instruments, instrument.clone());
1181
1182                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1183                        request_id,
1184                        client_id,
1185                        instrument.id(),
1186                        instrument,
1187                        start_nanos,
1188                        end_nanos,
1189                        clock.get_time_ns(),
1190                        params,
1191                    )));
1192
1193                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1194                        tracing::error!("Failed to send instrument response: {e}");
1195                    }
1196                }
1197                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
1198            }
1199        });
1200
1201        Ok(())
1202    }
1203
1204    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1205        let http = self.http_client.clone();
1206        let sender = self.data_sender.clone();
1207        let instrument_id = request.instrument_id;
1208        let start = request.start;
1209        let end = request.end;
1210        let limit = request.limit.map(|n| n.get() as u32);
1211        let request_id = request.request_id;
1212        let client_id = request.client_id.unwrap_or(self.client_id);
1213        let params = request.params.clone();
1214        let clock = self.clock;
1215        let start_nanos = datetime_to_unix_nanos(start);
1216        let end_nanos = datetime_to_unix_nanos(end);
1217
1218        tokio::spawn(async move {
1219            match http
1220                .request_trades(instrument_id, start, end, limit)
1221                .await
1222                .context("failed to request trades from OKX")
1223            {
1224                Ok(trades) => {
1225                    let response = DataResponse::Trades(TradesResponse::new(
1226                        request_id,
1227                        client_id,
1228                        instrument_id,
1229                        trades,
1230                        start_nanos,
1231                        end_nanos,
1232                        clock.get_time_ns(),
1233                        params,
1234                    ));
1235                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1236                        tracing::error!("Failed to send trades response: {e}");
1237                    }
1238                }
1239                Err(e) => tracing::error!("Trade request failed: {e:?}"),
1240            }
1241        });
1242
1243        Ok(())
1244    }
1245
1246    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1247        let http = self.http_client.clone();
1248        let sender = self.data_sender.clone();
1249        let bar_type = request.bar_type;
1250        let start = request.start;
1251        let end = request.end;
1252        let limit = request.limit.map(|n| n.get() as u32);
1253        let request_id = request.request_id;
1254        let client_id = request.client_id.unwrap_or(self.client_id);
1255        let params = request.params.clone();
1256        let clock = self.clock;
1257        let start_nanos = datetime_to_unix_nanos(start);
1258        let end_nanos = datetime_to_unix_nanos(end);
1259
1260        tokio::spawn(async move {
1261            match http
1262                .request_bars(bar_type, start, end, limit)
1263                .await
1264                .context("failed to request bars from OKX")
1265            {
1266                Ok(bars) => {
1267                    let response = DataResponse::Bars(BarsResponse::new(
1268                        request_id,
1269                        client_id,
1270                        bar_type,
1271                        bars,
1272                        start_nanos,
1273                        end_nanos,
1274                        clock.get_time_ns(),
1275                        params,
1276                    ));
1277                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1278                        tracing::error!("Failed to send bars response: {e}");
1279                    }
1280                }
1281                Err(e) => tracing::error!("Bar request failed: {e:?}"),
1282            }
1283        });
1284
1285        Ok(())
1286    }
1287}