nautilus_okx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
35            SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
36            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
37            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38        },
39    },
40};
41use nautilus_core::{
42    MUTEX_POISONED,
43    datetime::datetime_to_unix_nanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
48    enums::BookType,
49    identifiers::{ClientId, InstrumentId, Venue},
50    instruments::{Instrument, InstrumentAny},
51};
52use tokio::{task::JoinHandle, time::Duration};
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56    common::{
57        consts::OKX_VENUE,
58        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
59        parse::okx_instrument_type_from_symbol,
60    },
61    config::OKXDataClientConfig,
62    http::client::OKXHttpClient,
63    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
64};
65
66#[derive(Debug)]
67pub struct OKXDataClient {
68    client_id: ClientId,
69    config: OKXDataClientConfig,
70    http_client: OKXHttpClient,
71    ws_public: Option<OKXWebSocketClient>,
72    ws_business: Option<OKXWebSocketClient>,
73    is_connected: AtomicBool,
74    cancellation_token: CancellationToken,
75    tasks: Vec<JoinHandle<()>>,
76    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
77    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
78    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
79    clock: &'static AtomicTime,
80}
81
82impl OKXDataClient {
83    /// Creates a new [`OKXDataClient`] instance.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the client fails to initialize.
88    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
89        let clock = get_atomic_clock_realtime();
90        let data_sender = get_data_event_sender();
91
92        let http_client = if config.has_api_credentials() {
93            OKXHttpClient::with_credentials(
94                config.api_key.clone(),
95                config.api_secret.clone(),
96                config.api_passphrase.clone(),
97                config.base_url_http.clone(),
98                config.http_timeout_secs,
99                config.max_retries,
100                config.retry_delay_initial_ms,
101                config.retry_delay_max_ms,
102                config.is_demo,
103                config.http_proxy_url.clone(),
104            )?
105        } else {
106            OKXHttpClient::new(
107                config.base_url_http.clone(),
108                config.http_timeout_secs,
109                config.max_retries,
110                config.retry_delay_initial_ms,
111                config.retry_delay_max_ms,
112                config.is_demo,
113                config.http_proxy_url.clone(),
114            )?
115        };
116
117        let ws_public = OKXWebSocketClient::new(
118            Some(config.ws_public_url()),
119            None,
120            None,
121            None,
122            None,
123            Some(20), // Heartbeat
124        )
125        .context("failed to construct OKX public websocket client")?;
126
127        let ws_business = if config.requires_business_ws() {
128            Some(
129                OKXWebSocketClient::new(
130                    Some(config.ws_business_url()),
131                    config.api_key.clone(),
132                    config.api_secret.clone(),
133                    config.api_passphrase.clone(),
134                    None,
135                    Some(20), // Heartbeat
136                )
137                .context("failed to construct OKX business websocket client")?,
138            )
139        } else {
140            None
141        };
142
143        if let Some(vip_level) = config.vip_level {
144            ws_public.set_vip_level(vip_level);
145            if let Some(ref ws) = ws_business {
146                ws.set_vip_level(vip_level);
147            }
148        }
149
150        Ok(Self {
151            client_id,
152            config,
153            http_client,
154            ws_public: Some(ws_public),
155            ws_business,
156            is_connected: AtomicBool::new(false),
157            cancellation_token: CancellationToken::new(),
158            tasks: Vec::new(),
159            data_sender,
160            instruments: Arc::new(RwLock::new(AHashMap::new())),
161            book_channels: Arc::new(RwLock::new(AHashMap::new())),
162            clock,
163        })
164    }
165
166    fn venue(&self) -> Venue {
167        *OKX_VENUE
168    }
169
170    fn vip_level(&self) -> Option<OKXVipLevel> {
171        self.ws_public.as_ref().map(|ws| ws.vip_level())
172    }
173
174    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
175        self.ws_public
176            .as_ref()
177            .context("public websocket client not initialized")
178    }
179
180    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
181        self.ws_business
182            .as_ref()
183            .context("business websocket client not available (credentials required)")
184    }
185
186    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
187        if let Err(e) = sender.send(DataEvent::Data(data)) {
188            log::error!("Failed to emit data event: {e}");
189        }
190    }
191
192    fn spawn_ws<F>(&self, fut: F, context: &'static str)
193    where
194        F: Future<Output = anyhow::Result<()>> + Send + 'static,
195    {
196        get_runtime().spawn(async move {
197            if let Err(e) = fut.await {
198                log::error!("{context}: {e:?}");
199            }
200        });
201    }
202
203    fn handle_ws_message(
204        message: NautilusWsMessage,
205        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
206        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
207    ) {
208        match message {
209            NautilusWsMessage::Data(payloads) => {
210                for data in payloads {
211                    Self::send_data(data_sender, data);
212                }
213            }
214            NautilusWsMessage::Deltas(deltas) => {
215                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
216            }
217            NautilusWsMessage::FundingRates(updates) => {
218                emit_funding_rates(updates);
219            }
220            NautilusWsMessage::Instrument(instrument) => {
221                upsert_instrument(instruments, *instrument);
222            }
223            NautilusWsMessage::AccountUpdate(_)
224            | NautilusWsMessage::PositionUpdate(_)
225            | NautilusWsMessage::OrderAccepted(_)
226            | NautilusWsMessage::OrderCanceled(_)
227            | NautilusWsMessage::OrderExpired(_)
228            | NautilusWsMessage::OrderRejected(_)
229            | NautilusWsMessage::OrderCancelRejected(_)
230            | NautilusWsMessage::OrderModifyRejected(_)
231            | NautilusWsMessage::OrderTriggered(_)
232            | NautilusWsMessage::OrderUpdated(_)
233            | NautilusWsMessage::ExecutionReports(_) => {
234                log::debug!("Ignoring trading message on data client");
235            }
236            NautilusWsMessage::Error(e) => {
237                log::error!("OKX websocket error: {e:?}");
238            }
239            NautilusWsMessage::Raw(value) => {
240                log::debug!("Unhandled websocket payload: {value:?}");
241            }
242            NautilusWsMessage::Reconnected => {
243                log::info!("Websocket reconnected");
244            }
245            NautilusWsMessage::Authenticated => {
246                log::debug!("Websocket authenticated");
247            }
248        }
249    }
250}
251
252fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
253    if updates.is_empty() {
254        return;
255    }
256
257    for update in updates {
258        log::debug!(
259            "Received funding rate update for {} but forwarding is not yet supported",
260            update.instrument_id
261        );
262    }
263}
264
265fn upsert_instrument(
266    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
267    instrument: InstrumentAny,
268) {
269    let mut guard = cache.write().expect(MUTEX_POISONED);
270    guard.insert(instrument.id(), instrument);
271}
272
273fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
274    contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
275}
276
277fn contract_filter_with_config_types(
278    contract_types: Option<&Vec<OKXContractType>>,
279    instrument: &InstrumentAny,
280) -> bool {
281    match contract_types {
282        None => true,
283        Some(filter) if filter.is_empty() => true,
284        Some(filter) => {
285            let is_inverse = instrument.is_inverse();
286            (is_inverse && filter.contains(&OKXContractType::Inverse))
287                || (!is_inverse && filter.contains(&OKXContractType::Linear))
288        }
289    }
290}
291
292#[async_trait::async_trait(?Send)]
293impl DataClient for OKXDataClient {
294    fn client_id(&self) -> ClientId {
295        self.client_id
296    }
297
298    fn venue(&self) -> Option<Venue> {
299        Some(self.venue())
300    }
301
302    fn start(&mut self) -> anyhow::Result<()> {
303        log::info!(
304            "Started: client_id={}, vip_level={:?}, instrument_types={:?}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
305            self.client_id,
306            self.vip_level(),
307            self.config.instrument_types,
308            self.config.is_demo,
309            self.config.http_proxy_url,
310            self.config.ws_proxy_url,
311        );
312        Ok(())
313    }
314
315    fn stop(&mut self) -> anyhow::Result<()> {
316        log::info!("Stopping {id}", id = self.client_id);
317        self.cancellation_token.cancel();
318        self.is_connected.store(false, Ordering::Relaxed);
319        Ok(())
320    }
321
322    fn reset(&mut self) -> anyhow::Result<()> {
323        log::debug!("Resetting {id}", id = self.client_id);
324        self.is_connected.store(false, Ordering::Relaxed);
325        self.cancellation_token = CancellationToken::new();
326        self.tasks.clear();
327        self.book_channels
328            .write()
329            .expect("book channel cache lock poisoned")
330            .clear();
331        Ok(())
332    }
333
334    fn dispose(&mut self) -> anyhow::Result<()> {
335        log::debug!("Disposing {id}", id = self.client_id);
336        self.stop()
337    }
338
339    async fn connect(&mut self) -> anyhow::Result<()> {
340        if self.is_connected() {
341            return Ok(());
342        }
343
344        let instrument_types = if self.config.instrument_types.is_empty() {
345            vec![OKXInstrumentType::Spot]
346        } else {
347            self.config.instrument_types.clone()
348        };
349
350        let mut all_instruments = Vec::new();
351        for inst_type in &instrument_types {
352            let mut fetched = self
353                .http_client
354                .request_instruments(*inst_type, None)
355                .await
356                .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
357
358            fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
359            self.http_client.cache_instruments(fetched.clone());
360
361            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
362            for instrument in &fetched {
363                guard.insert(instrument.id(), instrument.clone());
364            }
365            drop(guard);
366
367            all_instruments.extend(fetched);
368        }
369
370        for instrument in all_instruments {
371            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
372                log::warn!("Failed to send instrument: {e}");
373            }
374        }
375
376        if let Some(ref mut ws) = self.ws_public {
377            // Cache instruments to websocket before connecting so handler has them
378            let instruments: Vec<_> = self
379                .instruments
380                .read()
381                .expect(MUTEX_POISONED)
382                .values()
383                .cloned()
384                .collect();
385            ws.cache_instruments(instruments);
386
387            ws.connect()
388                .await
389                .context("failed to connect OKX public websocket")?;
390            ws.wait_until_active(10.0)
391                .await
392                .context("public websocket did not become active")?;
393
394            let stream = ws.stream();
395            let sender = self.data_sender.clone();
396            let insts = self.instruments.clone();
397            let cancel = self.cancellation_token.clone();
398            let handle = get_runtime().spawn(async move {
399                pin_mut!(stream);
400                loop {
401                    tokio::select! {
402                        Some(message) = stream.next() => {
403                            Self::handle_ws_message(message, &sender, &insts);
404                        }
405                        () = cancel.cancelled() => {
406                            log::debug!("Public websocket stream task cancelled");
407                            break;
408                        }
409                    }
410                }
411            });
412            self.tasks.push(handle);
413
414            for inst_type in &instrument_types {
415                ws.subscribe_instruments(*inst_type)
416                    .await
417                    .with_context(|| {
418                        format!("failed to subscribe to instrument type {inst_type:?}")
419                    })?;
420            }
421        }
422
423        if let Some(ref mut ws) = self.ws_business {
424            // Cache instruments to websocket before connecting so handler has them
425            let instruments: Vec<_> = self
426                .instruments
427                .read()
428                .expect(MUTEX_POISONED)
429                .values()
430                .cloned()
431                .collect();
432            ws.cache_instruments(instruments);
433
434            ws.connect()
435                .await
436                .context("failed to connect OKX business websocket")?;
437            ws.wait_until_active(10.0)
438                .await
439                .context("business websocket did not become active")?;
440
441            let stream = ws.stream();
442            let sender = self.data_sender.clone();
443            let insts = self.instruments.clone();
444            let cancel = self.cancellation_token.clone();
445            let handle = get_runtime().spawn(async move {
446                pin_mut!(stream);
447                loop {
448                    tokio::select! {
449                        Some(message) = stream.next() => {
450                            Self::handle_ws_message(message, &sender, &insts);
451                        }
452                        () = cancel.cancelled() => {
453                            log::debug!("Business websocket stream task cancelled");
454                            break;
455                        }
456                    }
457                }
458            });
459            self.tasks.push(handle);
460        }
461
462        self.is_connected.store(true, Ordering::Release);
463        log::info!("Connected: client_id={}", self.client_id);
464        Ok(())
465    }
466
467    async fn disconnect(&mut self) -> anyhow::Result<()> {
468        if self.is_disconnected() {
469            return Ok(());
470        }
471
472        self.cancellation_token.cancel();
473
474        if let Some(ref ws) = self.ws_public
475            && let Err(e) = ws.unsubscribe_all().await
476        {
477            log::warn!("Failed to unsubscribe all from public websocket: {e:?}");
478        }
479        if let Some(ref ws) = self.ws_business
480            && let Err(e) = ws.unsubscribe_all().await
481        {
482            log::warn!("Failed to unsubscribe all from business websocket: {e:?}");
483        }
484
485        // Allow time for unsubscribe confirmations
486        tokio::time::sleep(Duration::from_millis(500)).await;
487
488        if let Some(ref mut ws) = self.ws_public {
489            let _ = ws.close().await;
490        }
491        if let Some(ref mut ws) = self.ws_business {
492            let _ = ws.close().await;
493        }
494
495        let handles: Vec<_> = self.tasks.drain(..).collect();
496        for handle in handles {
497            if let Err(e) = handle.await {
498                log::error!("Error joining websocket task: {e}");
499            }
500        }
501
502        self.book_channels.write().expect(MUTEX_POISONED).clear();
503        self.is_connected.store(false, Ordering::Release);
504        log::info!("Disconnected: client_id={}", self.client_id);
505        Ok(())
506    }
507
508    fn is_connected(&self) -> bool {
509        self.is_connected.load(Ordering::Relaxed)
510    }
511
512    fn is_disconnected(&self) -> bool {
513        !self.is_connected()
514    }
515
516    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
517        for inst_type in &self.config.instrument_types {
518            let ws = self.public_ws()?.clone();
519            let inst_type = *inst_type;
520
521            self.spawn_ws(
522                async move {
523                    ws.subscribe_instruments(inst_type)
524                        .await
525                        .context("instruments subscription")?;
526                    Ok(())
527                },
528                "subscribe_instruments",
529            );
530        }
531        Ok(())
532    }
533
534    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
535        // OKX instruments channel doesn't support subscribing to individual instruments via instId
536        // Instead, subscribe to the instrument type if not already subscribed
537        let instrument_id = cmd.instrument_id;
538        let ws = self.public_ws()?.clone();
539
540        self.spawn_ws(
541            async move {
542                ws.subscribe_instrument(instrument_id)
543                    .await
544                    .context("instrument type subscription")?;
545                Ok(())
546            },
547            "subscribe_instrument",
548        );
549        Ok(())
550    }
551
552    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
553        if cmd.book_type != BookType::L2_MBP {
554            anyhow::bail!("OKX only supports L2_MBP order book deltas");
555        }
556
557        let depth = cmd.depth.map_or(0, |d| d.get());
558        if !matches!(depth, 0 | 50 | 400) {
559            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
560        }
561
562        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
563        let channel = match depth {
564            50 => {
565                if vip < OKXVipLevel::Vip4 {
566                    anyhow::bail!(
567                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
568                    );
569                }
570                OKXBookChannel::Books50L2Tbt
571            }
572            0 | 400 => {
573                if vip >= OKXVipLevel::Vip5 {
574                    OKXBookChannel::BookL2Tbt
575                } else {
576                    OKXBookChannel::Book
577                }
578            }
579            _ => unreachable!(),
580        };
581
582        let instrument_id = cmd.instrument_id;
583        let ws = self.public_ws()?.clone();
584        let book_channels = Arc::clone(&self.book_channels);
585
586        self.spawn_ws(
587            async move {
588                match channel {
589                    OKXBookChannel::Books50L2Tbt => ws
590                        .subscribe_book50_l2_tbt(instrument_id)
591                        .await
592                        .context("books50-l2-tbt subscription")?,
593                    OKXBookChannel::BookL2Tbt => ws
594                        .subscribe_book_l2_tbt(instrument_id)
595                        .await
596                        .context("books-l2-tbt subscription")?,
597                    OKXBookChannel::Book => ws
598                        .subscribe_books_channel(instrument_id)
599                        .await
600                        .context("books subscription")?,
601                }
602                book_channels
603                    .write()
604                    .expect("book channel cache lock poisoned")
605                    .insert(instrument_id, channel);
606                Ok(())
607            },
608            "order book delta subscription",
609        );
610
611        Ok(())
612    }
613
614    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
615        let ws = self.public_ws()?.clone();
616        let instrument_id = cmd.instrument_id;
617
618        self.spawn_ws(
619            async move {
620                ws.subscribe_quotes(instrument_id)
621                    .await
622                    .context("quotes subscription")
623            },
624            "quote subscription",
625        );
626        Ok(())
627    }
628
629    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
630        let ws = self.public_ws()?.clone();
631        let instrument_id = cmd.instrument_id;
632
633        self.spawn_ws(
634            async move {
635                ws.subscribe_trades(instrument_id, false)
636                    .await
637                    .context("trades subscription")
638            },
639            "trade subscription",
640        );
641        Ok(())
642    }
643
644    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
645        let ws = self.public_ws()?.clone();
646        let instrument_id = cmd.instrument_id;
647
648        self.spawn_ws(
649            async move {
650                ws.subscribe_mark_prices(instrument_id)
651                    .await
652                    .context("mark price subscription")
653            },
654            "mark price subscription",
655        );
656        Ok(())
657    }
658
659    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
660        let ws = self.public_ws()?.clone();
661        let instrument_id = cmd.instrument_id;
662
663        self.spawn_ws(
664            async move {
665                ws.subscribe_index_prices(instrument_id)
666                    .await
667                    .context("index price subscription")
668            },
669            "index price subscription",
670        );
671        Ok(())
672    }
673
674    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
675        let ws = self.public_ws()?.clone();
676        let instrument_id = cmd.instrument_id;
677
678        self.spawn_ws(
679            async move {
680                ws.subscribe_funding_rates(instrument_id)
681                    .await
682                    .context("funding rate subscription")
683            },
684            "funding rate subscription",
685        );
686        Ok(())
687    }
688
689    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
690        let ws = self.business_ws()?.clone();
691        let bar_type = cmd.bar_type;
692
693        self.spawn_ws(
694            async move {
695                ws.subscribe_bars(bar_type)
696                    .await
697                    .context("bars subscription")
698            },
699            "bar subscription",
700        );
701        Ok(())
702    }
703
704    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
705        let ws = self.public_ws()?.clone();
706        let instrument_id = cmd.instrument_id;
707        let channel = self
708            .book_channels
709            .write()
710            .expect("book channel cache lock poisoned")
711            .remove(&instrument_id);
712
713        self.spawn_ws(
714            async move {
715                match channel {
716                    Some(OKXBookChannel::Books50L2Tbt) => ws
717                        .unsubscribe_book50_l2_tbt(instrument_id)
718                        .await
719                        .context("books50-l2-tbt unsubscribe")?,
720                    Some(OKXBookChannel::BookL2Tbt) => ws
721                        .unsubscribe_book_l2_tbt(instrument_id)
722                        .await
723                        .context("books-l2-tbt unsubscribe")?,
724                    Some(OKXBookChannel::Book) => ws
725                        .unsubscribe_book(instrument_id)
726                        .await
727                        .context("book unsubscribe")?,
728                    None => {
729                        log::warn!(
730                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
731                        );
732                        ws.unsubscribe_book(instrument_id)
733                            .await
734                            .context("book fallback unsubscribe")?;
735                    }
736                }
737                Ok(())
738            },
739            "order book unsubscribe",
740        );
741        Ok(())
742    }
743
744    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
745        let ws = self.public_ws()?.clone();
746        let instrument_id = cmd.instrument_id;
747
748        self.spawn_ws(
749            async move {
750                ws.unsubscribe_quotes(instrument_id)
751                    .await
752                    .context("quotes unsubscribe")
753            },
754            "quote unsubscribe",
755        );
756        Ok(())
757    }
758
759    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
760        let ws = self.public_ws()?.clone();
761        let instrument_id = cmd.instrument_id;
762
763        self.spawn_ws(
764            async move {
765                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
766                    .await
767                    .context("trades unsubscribe")
768            },
769            "trade unsubscribe",
770        );
771        Ok(())
772    }
773
774    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
775        let ws = self.public_ws()?.clone();
776        let instrument_id = cmd.instrument_id;
777
778        self.spawn_ws(
779            async move {
780                ws.unsubscribe_mark_prices(instrument_id)
781                    .await
782                    .context("mark price unsubscribe")
783            },
784            "mark price unsubscribe",
785        );
786        Ok(())
787    }
788
789    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
790        let ws = self.public_ws()?.clone();
791        let instrument_id = cmd.instrument_id;
792
793        self.spawn_ws(
794            async move {
795                ws.unsubscribe_index_prices(instrument_id)
796                    .await
797                    .context("index price unsubscribe")
798            },
799            "index price unsubscribe",
800        );
801        Ok(())
802    }
803
804    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
805        let ws = self.public_ws()?.clone();
806        let instrument_id = cmd.instrument_id;
807
808        self.spawn_ws(
809            async move {
810                ws.unsubscribe_funding_rates(instrument_id)
811                    .await
812                    .context("funding rate unsubscribe")
813            },
814            "funding rate unsubscribe",
815        );
816        Ok(())
817    }
818
819    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
820        let ws = self.business_ws()?.clone();
821        let bar_type = cmd.bar_type;
822
823        self.spawn_ws(
824            async move {
825                ws.unsubscribe_bars(bar_type)
826                    .await
827                    .context("bars unsubscribe")
828            },
829            "bar unsubscribe",
830        );
831        Ok(())
832    }
833
834    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
835        let http = self.http_client.clone();
836        let sender = self.data_sender.clone();
837        let instruments_cache = self.instruments.clone();
838        let request_id = request.request_id;
839        let client_id = request.client_id.unwrap_or(self.client_id);
840        let venue = self.venue();
841        let start = request.start;
842        let end = request.end;
843        let params = request.params.clone();
844        let clock = self.clock;
845        let start_nanos = datetime_to_unix_nanos(start);
846        let end_nanos = datetime_to_unix_nanos(end);
847        let instrument_types = if self.config.instrument_types.is_empty() {
848            vec![OKXInstrumentType::Spot]
849        } else {
850            self.config.instrument_types.clone()
851        };
852        let contract_types = self.config.contract_types.clone();
853        let instrument_families = self.config.instrument_families.clone();
854
855        get_runtime().spawn(async move {
856            let mut all_instruments = Vec::new();
857
858            for inst_type in instrument_types {
859                let supports_family = matches!(
860                    inst_type,
861                    OKXInstrumentType::Futures
862                        | OKXInstrumentType::Swap
863                        | OKXInstrumentType::Option
864                );
865
866                let families = match (&instrument_families, inst_type, supports_family) {
867                    (Some(families), OKXInstrumentType::Option, true) => families.clone(),
868                    (Some(families), _, true) => families.clone(),
869                    (None, OKXInstrumentType::Option, _) => {
870                        log::warn!(
871                            "Skipping OPTION type: instrument_families required but not configured"
872                        );
873                        continue;
874                    }
875                    _ => vec![],
876                };
877
878                if families.is_empty() {
879                    match http.request_instruments(inst_type, None).await {
880                        Ok(instruments) => {
881                            for instrument in instruments {
882                                if !contract_filter_with_config_types(
883                                    contract_types.as_ref(),
884                                    &instrument,
885                                ) {
886                                    continue;
887                                }
888
889                                upsert_instrument(&instruments_cache, instrument.clone());
890                                all_instruments.push(instrument);
891                            }
892                        }
893                        Err(e) => {
894                            log::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
895                        }
896                    }
897                } else {
898                    for family in families {
899                        match http
900                            .request_instruments(inst_type, Some(family.clone()))
901                            .await
902                        {
903                            Ok(instruments) => {
904                                for instrument in instruments {
905                                    if !contract_filter_with_config_types(
906                                        contract_types.as_ref(),
907                                        &instrument,
908                                    ) {
909                                        continue;
910                                    }
911
912                                    upsert_instrument(&instruments_cache, instrument.clone());
913                                    all_instruments.push(instrument);
914                                }
915                            }
916                            Err(e) => {
917                                log::error!(
918                                    "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
919                                );
920                            }
921                        }
922                    }
923                }
924            }
925
926            let response = DataResponse::Instruments(InstrumentsResponse::new(
927                request_id,
928                client_id,
929                venue,
930                all_instruments,
931                start_nanos,
932                end_nanos,
933                clock.get_time_ns(),
934                params,
935            ));
936
937            if let Err(e) = sender.send(DataEvent::Response(response)) {
938                log::error!("Failed to send instruments response: {e}");
939            }
940        });
941
942        Ok(())
943    }
944
945    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
946        let http = self.http_client.clone();
947        let sender = self.data_sender.clone();
948        let instruments = self.instruments.clone();
949        let instrument_id = request.instrument_id;
950        let request_id = request.request_id;
951        let client_id = request.client_id.unwrap_or(self.client_id);
952        let start = request.start;
953        let end = request.end;
954        let params = request.params.clone();
955        let clock = self.clock;
956        let start_nanos = datetime_to_unix_nanos(start);
957        let end_nanos = datetime_to_unix_nanos(end);
958        let instrument_types = if self.config.instrument_types.is_empty() {
959            vec![OKXInstrumentType::Spot]
960        } else {
961            self.config.instrument_types.clone()
962        };
963        let contract_types = self.config.contract_types.clone();
964
965        get_runtime().spawn(async move {
966            match http
967                .request_instrument(instrument_id)
968                .await
969                .context("fetch instrument from API")
970            {
971                Ok(instrument) => {
972                    let inst_id = instrument.id();
973                    let symbol = inst_id.symbol.as_str();
974                    let inst_type = okx_instrument_type_from_symbol(symbol);
975                    if !instrument_types.contains(&inst_type) {
976                        log::error!(
977                            "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
978                        );
979                        return;
980                    }
981
982                    if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
983                        log::error!(
984                            "Instrument {instrument_id} filtered out by contract_types config"
985                        );
986                        return;
987                    }
988
989                    upsert_instrument(&instruments, instrument.clone());
990
991                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
992                        request_id,
993                        client_id,
994                        instrument.id(),
995                        instrument,
996                        start_nanos,
997                        end_nanos,
998                        clock.get_time_ns(),
999                        params,
1000                    )));
1001
1002                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1003                        log::error!("Failed to send instrument response: {e}");
1004                    }
1005                }
1006                Err(e) => log::error!("Instrument request failed: {e:?}"),
1007            }
1008        });
1009
1010        Ok(())
1011    }
1012
1013    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1014        let http = self.http_client.clone();
1015        let sender = self.data_sender.clone();
1016        let instrument_id = request.instrument_id;
1017        let start = request.start;
1018        let end = request.end;
1019        let limit = request.limit.map(|n| n.get() as u32);
1020        let request_id = request.request_id;
1021        let client_id = request.client_id.unwrap_or(self.client_id);
1022        let params = request.params.clone();
1023        let clock = self.clock;
1024        let start_nanos = datetime_to_unix_nanos(start);
1025        let end_nanos = datetime_to_unix_nanos(end);
1026
1027        get_runtime().spawn(async move {
1028            match http
1029                .request_trades(instrument_id, start, end, limit)
1030                .await
1031                .context("failed to request trades from OKX")
1032            {
1033                Ok(trades) => {
1034                    let response = DataResponse::Trades(TradesResponse::new(
1035                        request_id,
1036                        client_id,
1037                        instrument_id,
1038                        trades,
1039                        start_nanos,
1040                        end_nanos,
1041                        clock.get_time_ns(),
1042                        params,
1043                    ));
1044                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1045                        log::error!("Failed to send trades response: {e}");
1046                    }
1047                }
1048                Err(e) => log::error!("Trade request failed: {e:?}"),
1049            }
1050        });
1051
1052        Ok(())
1053    }
1054
1055    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1056        let http = self.http_client.clone();
1057        let sender = self.data_sender.clone();
1058        let bar_type = request.bar_type;
1059        let start = request.start;
1060        let end = request.end;
1061        let limit = request.limit.map(|n| n.get() as u32);
1062        let request_id = request.request_id;
1063        let client_id = request.client_id.unwrap_or(self.client_id);
1064        let params = request.params.clone();
1065        let clock = self.clock;
1066        let start_nanos = datetime_to_unix_nanos(start);
1067        let end_nanos = datetime_to_unix_nanos(end);
1068
1069        get_runtime().spawn(async move {
1070            match http
1071                .request_bars(bar_type, start, end, limit)
1072                .await
1073                .context("failed to request bars from OKX")
1074            {
1075                Ok(bars) => {
1076                    let response = DataResponse::Bars(BarsResponse::new(
1077                        request_id,
1078                        client_id,
1079                        bar_type,
1080                        bars,
1081                        start_nanos,
1082                        end_nanos,
1083                        clock.get_time_ns(),
1084                        params,
1085                    ));
1086                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1087                        log::error!("Failed to send bars response: {e}");
1088                    }
1089                }
1090                Err(e) => log::error!("Bar request failed: {e:?}"),
1091            }
1092        });
1093
1094        Ok(())
1095    }
1096}