nautilus_okx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28    live::runner::get_data_event_sender,
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
35            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
36            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
37            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
38            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39        },
40    },
41};
42use nautilus_core::{
43    MUTEX_POISONED, UnixNanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_data::client::DataClient;
47use nautilus_model::{
48    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
49    enums::BookType,
50    identifiers::{ClientId, InstrumentId, Venue},
51    instruments::{Instrument, InstrumentAny},
52};
53use tokio::{task::JoinHandle, time::Duration};
54use tokio_util::sync::CancellationToken;
55
56use crate::{
57    common::{
58        consts::OKX_VENUE,
59        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
60        parse::okx_instrument_type_from_symbol,
61    },
62    config::OKXDataClientConfig,
63    http::client::OKXHttpClient,
64    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
65};
66
67#[derive(Debug)]
68pub struct OKXDataClient {
69    client_id: ClientId,
70    config: OKXDataClientConfig,
71    http_client: OKXHttpClient,
72    ws_public: Option<OKXWebSocketClient>,
73    ws_business: Option<OKXWebSocketClient>,
74    is_connected: AtomicBool,
75    cancellation_token: CancellationToken,
76    tasks: Vec<JoinHandle<()>>,
77    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
78    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
79    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
80    clock: &'static AtomicTime,
81}
82
83impl OKXDataClient {
84    /// Creates a new [`OKXDataClient`] instance.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the client fails to initialize.
89    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
90        let clock = get_atomic_clock_realtime();
91        let data_sender = get_data_event_sender();
92
93        let http_client = if config.has_api_credentials() {
94            OKXHttpClient::with_credentials(
95                config.api_key.clone(),
96                config.api_secret.clone(),
97                config.api_passphrase.clone(),
98                config.base_url_http.clone(),
99                config.http_timeout_secs,
100                config.max_retries,
101                config.retry_delay_initial_ms,
102                config.retry_delay_max_ms,
103                config.is_demo,
104                config.http_proxy_url.clone(),
105            )?
106        } else {
107            OKXHttpClient::new(
108                config.base_url_http.clone(),
109                config.http_timeout_secs,
110                config.max_retries,
111                config.retry_delay_initial_ms,
112                config.retry_delay_max_ms,
113                config.is_demo,
114                config.http_proxy_url.clone(),
115            )?
116        };
117
118        let ws_public = OKXWebSocketClient::new(
119            Some(config.ws_public_url()),
120            None,
121            None,
122            None,
123            None,
124            Some(20), // Heartbeat
125        )
126        .context("failed to construct OKX public websocket client")?;
127
128        let ws_business = if config.requires_business_ws() {
129            Some(
130                OKXWebSocketClient::new(
131                    Some(config.ws_business_url()),
132                    config.api_key.clone(),
133                    config.api_secret.clone(),
134                    config.api_passphrase.clone(),
135                    None,
136                    Some(20), // Heartbeat
137                )
138                .context("failed to construct OKX business websocket client")?,
139            )
140        } else {
141            None
142        };
143
144        if let Some(vip_level) = config.vip_level {
145            ws_public.set_vip_level(vip_level);
146            if let Some(ref ws) = ws_business {
147                ws.set_vip_level(vip_level);
148            }
149        }
150
151        Ok(Self {
152            client_id,
153            config,
154            http_client,
155            ws_public: Some(ws_public),
156            ws_business,
157            is_connected: AtomicBool::new(false),
158            cancellation_token: CancellationToken::new(),
159            tasks: Vec::new(),
160            data_sender,
161            instruments: Arc::new(RwLock::new(AHashMap::new())),
162            book_channels: Arc::new(RwLock::new(AHashMap::new())),
163            clock,
164        })
165    }
166
167    fn venue(&self) -> Venue {
168        *OKX_VENUE
169    }
170
171    fn vip_level(&self) -> Option<OKXVipLevel> {
172        self.ws_public.as_ref().map(|ws| ws.vip_level())
173    }
174
175    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
176        self.ws_public
177            .as_ref()
178            .context("public websocket client not initialized")
179    }
180
181    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
182        self.ws_business
183            .as_ref()
184            .context("business websocket client not available (credentials required)")
185    }
186
187    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
188        if let Err(e) = sender.send(DataEvent::Data(data)) {
189            tracing::error!("Failed to emit data event: {e}");
190        }
191    }
192
193    fn spawn_ws<F>(&self, fut: F, context: &'static str)
194    where
195        F: Future<Output = anyhow::Result<()>> + Send + 'static,
196    {
197        tokio::spawn(async move {
198            if let Err(e) = fut.await {
199                tracing::error!("{context}: {e:?}");
200            }
201        });
202    }
203
204    fn handle_ws_message(
205        message: NautilusWsMessage,
206        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
207        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
208    ) {
209        match message {
210            NautilusWsMessage::Data(payloads) => {
211                for data in payloads {
212                    Self::send_data(data_sender, data);
213                }
214            }
215            NautilusWsMessage::Deltas(deltas) => {
216                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
217            }
218            NautilusWsMessage::FundingRates(updates) => {
219                emit_funding_rates(updates);
220            }
221            NautilusWsMessage::Instrument(instrument) => {
222                upsert_instrument(instruments, *instrument);
223            }
224            NautilusWsMessage::AccountUpdate(_)
225            | NautilusWsMessage::PositionUpdate(_)
226            | NautilusWsMessage::OrderAccepted(_)
227            | NautilusWsMessage::OrderCanceled(_)
228            | NautilusWsMessage::OrderExpired(_)
229            | NautilusWsMessage::OrderRejected(_)
230            | NautilusWsMessage::OrderCancelRejected(_)
231            | NautilusWsMessage::OrderModifyRejected(_)
232            | NautilusWsMessage::OrderTriggered(_)
233            | NautilusWsMessage::OrderUpdated(_)
234            | NautilusWsMessage::ExecutionReports(_) => {
235                tracing::debug!("Ignoring trading message on data client");
236            }
237            NautilusWsMessage::Error(e) => {
238                tracing::error!("OKX websocket error: {e:?}");
239            }
240            NautilusWsMessage::Raw(value) => {
241                tracing::debug!("Unhandled websocket payload: {value:?}");
242            }
243            NautilusWsMessage::Reconnected => {
244                tracing::info!("Websocket reconnected");
245            }
246            NautilusWsMessage::Authenticated => {
247                tracing::debug!("Websocket authenticated");
248            }
249        }
250    }
251}
252
253fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
254    if updates.is_empty() {
255        return;
256    }
257
258    for update in updates {
259        tracing::debug!(
260            "Received funding rate update for {} but forwarding is not yet supported",
261            update.instrument_id
262        );
263    }
264}
265
266fn upsert_instrument(
267    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
268    instrument: InstrumentAny,
269) {
270    let mut guard = cache.write().expect(MUTEX_POISONED);
271    guard.insert(instrument.id(), instrument);
272}
273
274fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
275    value
276        .and_then(|dt| dt.timestamp_nanos_opt())
277        .and_then(|nanos| u64::try_from(nanos).ok())
278        .map(UnixNanos::from)
279}
280
281fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
282    contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
283}
284
285fn contract_filter_with_config_types(
286    contract_types: Option<&Vec<OKXContractType>>,
287    instrument: &InstrumentAny,
288) -> bool {
289    match contract_types {
290        None => true,
291        Some(filter) if filter.is_empty() => true,
292        Some(filter) => {
293            let is_inverse = instrument.is_inverse();
294            (is_inverse && filter.contains(&OKXContractType::Inverse))
295                || (!is_inverse && filter.contains(&OKXContractType::Linear))
296        }
297    }
298}
299
300#[async_trait::async_trait(?Send)]
301impl DataClient for OKXDataClient {
302    fn client_id(&self) -> ClientId {
303        self.client_id
304    }
305
306    fn venue(&self) -> Option<Venue> {
307        Some(self.venue())
308    }
309
310    fn start(&mut self) -> anyhow::Result<()> {
311        tracing::info!(
312            client_id = %self.client_id,
313            vip_level = ?self.vip_level(),
314            instrument_types = ?self.config.instrument_types,
315            is_demo = self.config.is_demo,
316            http_proxy_url = ?self.config.http_proxy_url,
317            ws_proxy_url = ?self.config.ws_proxy_url,
318            "Started"
319        );
320        Ok(())
321    }
322
323    fn stop(&mut self) -> anyhow::Result<()> {
324        tracing::info!("Stopping {id}", id = self.client_id);
325        self.cancellation_token.cancel();
326        self.is_connected.store(false, Ordering::Relaxed);
327        Ok(())
328    }
329
330    fn reset(&mut self) -> anyhow::Result<()> {
331        tracing::debug!("Resetting {id}", id = self.client_id);
332        self.is_connected.store(false, Ordering::Relaxed);
333        self.cancellation_token = CancellationToken::new();
334        self.tasks.clear();
335        self.book_channels
336            .write()
337            .expect("book channel cache lock poisoned")
338            .clear();
339        Ok(())
340    }
341
342    fn dispose(&mut self) -> anyhow::Result<()> {
343        tracing::debug!("Disposing {id}", id = self.client_id);
344        self.stop()
345    }
346
347    async fn connect(&mut self) -> anyhow::Result<()> {
348        if self.is_connected() {
349            return Ok(());
350        }
351
352        let instrument_types = if self.config.instrument_types.is_empty() {
353            vec![OKXInstrumentType::Spot]
354        } else {
355            self.config.instrument_types.clone()
356        };
357
358        let mut all_instruments = Vec::new();
359        for inst_type in &instrument_types {
360            let mut fetched = self
361                .http_client
362                .request_instruments(*inst_type, None)
363                .await
364                .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
365
366            fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
367            self.http_client.cache_instruments(fetched.clone());
368
369            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
370            for instrument in &fetched {
371                guard.insert(instrument.id(), instrument.clone());
372            }
373            drop(guard);
374
375            all_instruments.extend(fetched);
376        }
377
378        for instrument in all_instruments {
379            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
380                tracing::warn!("Failed to send instrument: {e}");
381            }
382        }
383
384        if let Some(ref mut ws) = self.ws_public {
385            // Cache instruments to websocket before connecting so handler has them
386            let instruments: Vec<_> = self
387                .instruments
388                .read()
389                .expect(MUTEX_POISONED)
390                .values()
391                .cloned()
392                .collect();
393            ws.cache_instruments(instruments);
394
395            ws.connect()
396                .await
397                .context("failed to connect OKX public websocket")?;
398            ws.wait_until_active(10.0)
399                .await
400                .context("public websocket did not become active")?;
401
402            let stream = ws.stream();
403            let sender = self.data_sender.clone();
404            let insts = self.instruments.clone();
405            let cancel = self.cancellation_token.clone();
406            let handle = tokio::spawn(async move {
407                pin_mut!(stream);
408                loop {
409                    tokio::select! {
410                        Some(message) = stream.next() => {
411                            Self::handle_ws_message(message, &sender, &insts);
412                        }
413                        _ = cancel.cancelled() => {
414                            tracing::debug!("Public websocket stream task cancelled");
415                            break;
416                        }
417                    }
418                }
419            });
420            self.tasks.push(handle);
421
422            for inst_type in &instrument_types {
423                ws.subscribe_instruments(*inst_type)
424                    .await
425                    .with_context(|| {
426                        format!("failed to subscribe to instrument type {inst_type:?}")
427                    })?;
428            }
429        }
430
431        if let Some(ref mut ws) = self.ws_business {
432            // Cache instruments to websocket before connecting so handler has them
433            let instruments: Vec<_> = self
434                .instruments
435                .read()
436                .expect(MUTEX_POISONED)
437                .values()
438                .cloned()
439                .collect();
440            ws.cache_instruments(instruments);
441
442            ws.connect()
443                .await
444                .context("failed to connect OKX business websocket")?;
445            ws.wait_until_active(10.0)
446                .await
447                .context("business websocket did not become active")?;
448
449            let stream = ws.stream();
450            let sender = self.data_sender.clone();
451            let insts = self.instruments.clone();
452            let cancel = self.cancellation_token.clone();
453            let handle = tokio::spawn(async move {
454                pin_mut!(stream);
455                loop {
456                    tokio::select! {
457                        Some(message) = stream.next() => {
458                            Self::handle_ws_message(message, &sender, &insts);
459                        }
460                        _ = cancel.cancelled() => {
461                            tracing::debug!("Business websocket stream task cancelled");
462                            break;
463                        }
464                    }
465                }
466            });
467            self.tasks.push(handle);
468        }
469
470        self.is_connected.store(true, Ordering::Release);
471        tracing::info!(client_id = %self.client_id, "Connected");
472        Ok(())
473    }
474
475    async fn disconnect(&mut self) -> anyhow::Result<()> {
476        if self.is_disconnected() {
477            return Ok(());
478        }
479
480        self.cancellation_token.cancel();
481
482        if let Some(ref ws) = self.ws_public
483            && let Err(e) = ws.unsubscribe_all().await
484        {
485            tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
486        }
487        if let Some(ref ws) = self.ws_business
488            && let Err(e) = ws.unsubscribe_all().await
489        {
490            tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
491        }
492
493        // Allow time for unsubscribe confirmations
494        tokio::time::sleep(Duration::from_millis(500)).await;
495
496        if let Some(ref mut ws) = self.ws_public {
497            let _ = ws.close().await;
498        }
499        if let Some(ref mut ws) = self.ws_business {
500            let _ = ws.close().await;
501        }
502
503        let handles: Vec<_> = self.tasks.drain(..).collect();
504        for handle in handles {
505            if let Err(e) = handle.await {
506                tracing::error!("Error joining websocket task: {e}");
507            }
508        }
509
510        self.book_channels.write().expect(MUTEX_POISONED).clear();
511        self.is_connected.store(false, Ordering::Release);
512        tracing::info!(client_id = %self.client_id, "Disconnected");
513        Ok(())
514    }
515
516    fn is_connected(&self) -> bool {
517        self.is_connected.load(Ordering::Relaxed)
518    }
519
520    fn is_disconnected(&self) -> bool {
521        !self.is_connected()
522    }
523
524    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
525        for inst_type in &self.config.instrument_types {
526            let ws = self.public_ws()?.clone();
527            let inst_type = *inst_type;
528
529            self.spawn_ws(
530                async move {
531                    ws.subscribe_instruments(inst_type)
532                        .await
533                        .context("instruments subscription")?;
534                    Ok(())
535                },
536                "subscribe_instruments",
537            );
538        }
539        Ok(())
540    }
541
542    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
543        // OKX instruments channel doesn't support subscribing to individual instruments via instId
544        // Instead, subscribe to the instrument type if not already subscribed
545        let instrument_id = cmd.instrument_id;
546        let ws = self.public_ws()?.clone();
547
548        self.spawn_ws(
549            async move {
550                ws.subscribe_instrument(instrument_id)
551                    .await
552                    .context("instrument type subscription")?;
553                Ok(())
554            },
555            "subscribe_instrument",
556        );
557        Ok(())
558    }
559
560    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
561        if cmd.book_type != BookType::L2_MBP {
562            anyhow::bail!("OKX only supports L2_MBP order book deltas");
563        }
564
565        let depth = cmd.depth.map_or(0, |d| d.get());
566        if !matches!(depth, 0 | 50 | 400) {
567            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
568        }
569
570        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
571        let channel = match depth {
572            50 => {
573                if vip < OKXVipLevel::Vip4 {
574                    anyhow::bail!(
575                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
576                    );
577                }
578                OKXBookChannel::Books50L2Tbt
579            }
580            0 | 400 => {
581                if vip >= OKXVipLevel::Vip5 {
582                    OKXBookChannel::BookL2Tbt
583                } else {
584                    OKXBookChannel::Book
585                }
586            }
587            _ => unreachable!(),
588        };
589
590        let instrument_id = cmd.instrument_id;
591        let ws = self.public_ws()?.clone();
592        let book_channels = Arc::clone(&self.book_channels);
593
594        self.spawn_ws(
595            async move {
596                match channel {
597                    OKXBookChannel::Books50L2Tbt => ws
598                        .subscribe_book50_l2_tbt(instrument_id)
599                        .await
600                        .context("books50-l2-tbt subscription")?,
601                    OKXBookChannel::BookL2Tbt => ws
602                        .subscribe_book_l2_tbt(instrument_id)
603                        .await
604                        .context("books-l2-tbt subscription")?,
605                    OKXBookChannel::Book => ws
606                        .subscribe_books_channel(instrument_id)
607                        .await
608                        .context("books subscription")?,
609                }
610                book_channels
611                    .write()
612                    .expect("book channel cache lock poisoned")
613                    .insert(instrument_id, channel);
614                Ok(())
615            },
616            "order book delta subscription",
617        );
618
619        Ok(())
620    }
621
622    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
623        if cmd.book_type != BookType::L2_MBP {
624            anyhow::bail!("OKX only supports L2_MBP order book snapshots");
625        }
626        let depth = cmd.depth.map_or(5, |d| d.get());
627        if depth != 5 {
628            anyhow::bail!("OKX only supports depth=5 snapshots");
629        }
630
631        let ws = self.public_ws()?.clone();
632        let instrument_id = cmd.instrument_id;
633
634        self.spawn_ws(
635            async move {
636                ws.subscribe_book_depth5(instrument_id)
637                    .await
638                    .context("books5 subscription")
639            },
640            "order book snapshot subscription",
641        );
642        Ok(())
643    }
644
645    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
646        let ws = self.public_ws()?.clone();
647        let instrument_id = cmd.instrument_id;
648
649        self.spawn_ws(
650            async move {
651                ws.subscribe_quotes(instrument_id)
652                    .await
653                    .context("quotes subscription")
654            },
655            "quote subscription",
656        );
657        Ok(())
658    }
659
660    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
661        let ws = self.public_ws()?.clone();
662        let instrument_id = cmd.instrument_id;
663
664        self.spawn_ws(
665            async move {
666                ws.subscribe_trades(instrument_id, false)
667                    .await
668                    .context("trades subscription")
669            },
670            "trade subscription",
671        );
672        Ok(())
673    }
674
675    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
676        let ws = self.public_ws()?.clone();
677        let instrument_id = cmd.instrument_id;
678
679        self.spawn_ws(
680            async move {
681                ws.subscribe_mark_prices(instrument_id)
682                    .await
683                    .context("mark price subscription")
684            },
685            "mark price subscription",
686        );
687        Ok(())
688    }
689
690    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
691        let ws = self.public_ws()?.clone();
692        let instrument_id = cmd.instrument_id;
693
694        self.spawn_ws(
695            async move {
696                ws.subscribe_index_prices(instrument_id)
697                    .await
698                    .context("index price subscription")
699            },
700            "index price subscription",
701        );
702        Ok(())
703    }
704
705    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
706        let ws = self.public_ws()?.clone();
707        let instrument_id = cmd.instrument_id;
708
709        self.spawn_ws(
710            async move {
711                ws.subscribe_funding_rates(instrument_id)
712                    .await
713                    .context("funding rate subscription")
714            },
715            "funding rate subscription",
716        );
717        Ok(())
718    }
719
720    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
721        let ws = self.business_ws()?.clone();
722        let bar_type = cmd.bar_type;
723
724        self.spawn_ws(
725            async move {
726                ws.subscribe_bars(bar_type)
727                    .await
728                    .context("bars subscription")
729            },
730            "bar subscription",
731        );
732        Ok(())
733    }
734
735    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
736        let ws = self.public_ws()?.clone();
737        let instrument_id = cmd.instrument_id;
738        let channel = self
739            .book_channels
740            .write()
741            .expect("book channel cache lock poisoned")
742            .remove(&instrument_id);
743
744        self.spawn_ws(
745            async move {
746                match channel {
747                    Some(OKXBookChannel::Books50L2Tbt) => ws
748                        .unsubscribe_book50_l2_tbt(instrument_id)
749                        .await
750                        .context("books50-l2-tbt unsubscribe")?,
751                    Some(OKXBookChannel::BookL2Tbt) => ws
752                        .unsubscribe_book_l2_tbt(instrument_id)
753                        .await
754                        .context("books-l2-tbt unsubscribe")?,
755                    Some(OKXBookChannel::Book) => ws
756                        .unsubscribe_book(instrument_id)
757                        .await
758                        .context("book unsubscribe")?,
759                    None => {
760                        tracing::warn!(
761                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
762                        );
763                        ws.unsubscribe_book(instrument_id)
764                            .await
765                            .context("book fallback unsubscribe")?;
766                    }
767                }
768                Ok(())
769            },
770            "order book unsubscribe",
771        );
772        Ok(())
773    }
774
775    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
776        let ws = self.public_ws()?.clone();
777        let instrument_id = cmd.instrument_id;
778
779        self.spawn_ws(
780            async move {
781                ws.unsubscribe_book_depth5(instrument_id)
782                    .await
783                    .context("book depth5 unsubscribe")
784            },
785            "order book snapshot unsubscribe",
786        );
787        Ok(())
788    }
789
790    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
791        let ws = self.public_ws()?.clone();
792        let instrument_id = cmd.instrument_id;
793
794        self.spawn_ws(
795            async move {
796                ws.unsubscribe_quotes(instrument_id)
797                    .await
798                    .context("quotes unsubscribe")
799            },
800            "quote unsubscribe",
801        );
802        Ok(())
803    }
804
805    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
806        let ws = self.public_ws()?.clone();
807        let instrument_id = cmd.instrument_id;
808
809        self.spawn_ws(
810            async move {
811                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
812                    .await
813                    .context("trades unsubscribe")
814            },
815            "trade unsubscribe",
816        );
817        Ok(())
818    }
819
820    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
821        let ws = self.public_ws()?.clone();
822        let instrument_id = cmd.instrument_id;
823
824        self.spawn_ws(
825            async move {
826                ws.unsubscribe_mark_prices(instrument_id)
827                    .await
828                    .context("mark price unsubscribe")
829            },
830            "mark price unsubscribe",
831        );
832        Ok(())
833    }
834
835    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
836        let ws = self.public_ws()?.clone();
837        let instrument_id = cmd.instrument_id;
838
839        self.spawn_ws(
840            async move {
841                ws.unsubscribe_index_prices(instrument_id)
842                    .await
843                    .context("index price unsubscribe")
844            },
845            "index price unsubscribe",
846        );
847        Ok(())
848    }
849
850    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
851        let ws = self.public_ws()?.clone();
852        let instrument_id = cmd.instrument_id;
853
854        self.spawn_ws(
855            async move {
856                ws.unsubscribe_funding_rates(instrument_id)
857                    .await
858                    .context("funding rate unsubscribe")
859            },
860            "funding rate unsubscribe",
861        );
862        Ok(())
863    }
864
865    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
866        let ws = self.business_ws()?.clone();
867        let bar_type = cmd.bar_type;
868
869        self.spawn_ws(
870            async move {
871                ws.unsubscribe_bars(bar_type)
872                    .await
873                    .context("bars unsubscribe")
874            },
875            "bar unsubscribe",
876        );
877        Ok(())
878    }
879
880    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
881        let http = self.http_client.clone();
882        let sender = self.data_sender.clone();
883        let instruments_cache = self.instruments.clone();
884        let request_id = request.request_id;
885        let client_id = request.client_id.unwrap_or(self.client_id);
886        let venue = self.venue();
887        let start = request.start;
888        let end = request.end;
889        let params = request.params.clone();
890        let clock = self.clock;
891        let start_nanos = datetime_to_unix_nanos(start);
892        let end_nanos = datetime_to_unix_nanos(end);
893        let instrument_types = if self.config.instrument_types.is_empty() {
894            vec![OKXInstrumentType::Spot]
895        } else {
896            self.config.instrument_types.clone()
897        };
898        let contract_types = self.config.contract_types.clone();
899        let instrument_families = self.config.instrument_families.clone();
900
901        tokio::spawn(async move {
902            let mut all_instruments = Vec::new();
903
904            for inst_type in instrument_types {
905                let supports_family = matches!(
906                    inst_type,
907                    OKXInstrumentType::Futures
908                        | OKXInstrumentType::Swap
909                        | OKXInstrumentType::Option
910                );
911
912                let families = match (&instrument_families, inst_type, supports_family) {
913                    (Some(families), OKXInstrumentType::Option, true) => families.clone(),
914                    (Some(families), _, true) => families.clone(),
915                    (None, OKXInstrumentType::Option, _) => {
916                        tracing::warn!(
917                            "Skipping OPTION type: instrument_families required but not configured"
918                        );
919                        continue;
920                    }
921                    _ => vec![],
922                };
923
924                if families.is_empty() {
925                    match http.request_instruments(inst_type, None).await {
926                        Ok(instruments) => {
927                            for instrument in instruments {
928                                if !contract_filter_with_config_types(
929                                    contract_types.as_ref(),
930                                    &instrument,
931                                ) {
932                                    continue;
933                                }
934
935                                upsert_instrument(&instruments_cache, instrument.clone());
936                                all_instruments.push(instrument);
937                            }
938                        }
939                        Err(e) => {
940                            tracing::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
941                        }
942                    }
943                } else {
944                    for family in families {
945                        match http
946                            .request_instruments(inst_type, Some(family.clone()))
947                            .await
948                        {
949                            Ok(instruments) => {
950                                for instrument in instruments {
951                                    if !contract_filter_with_config_types(
952                                        contract_types.as_ref(),
953                                        &instrument,
954                                    ) {
955                                        continue;
956                                    }
957
958                                    upsert_instrument(&instruments_cache, instrument.clone());
959                                    all_instruments.push(instrument);
960                                }
961                            }
962                            Err(e) => {
963                                tracing::error!(
964                                    "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
965                                );
966                            }
967                        }
968                    }
969                }
970            }
971
972            let response = DataResponse::Instruments(InstrumentsResponse::new(
973                request_id,
974                client_id,
975                venue,
976                all_instruments,
977                start_nanos,
978                end_nanos,
979                clock.get_time_ns(),
980                params,
981            ));
982
983            if let Err(e) = sender.send(DataEvent::Response(response)) {
984                tracing::error!("Failed to send instruments response: {e}");
985            }
986        });
987
988        Ok(())
989    }
990
991    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
992        let http = self.http_client.clone();
993        let sender = self.data_sender.clone();
994        let instruments = self.instruments.clone();
995        let instrument_id = request.instrument_id;
996        let request_id = request.request_id;
997        let client_id = request.client_id.unwrap_or(self.client_id);
998        let start = request.start;
999        let end = request.end;
1000        let params = request.params.clone();
1001        let clock = self.clock;
1002        let start_nanos = datetime_to_unix_nanos(start);
1003        let end_nanos = datetime_to_unix_nanos(end);
1004        let instrument_types = if self.config.instrument_types.is_empty() {
1005            vec![OKXInstrumentType::Spot]
1006        } else {
1007            self.config.instrument_types.clone()
1008        };
1009        let contract_types = self.config.contract_types.clone();
1010
1011        tokio::spawn(async move {
1012            match http
1013                .request_instrument(instrument_id)
1014                .await
1015                .context("fetch instrument from API")
1016            {
1017                Ok(instrument) => {
1018                    let inst_id = instrument.id();
1019                    let symbol = inst_id.symbol.as_str();
1020                    let inst_type = okx_instrument_type_from_symbol(symbol);
1021                    if !instrument_types.contains(&inst_type) {
1022                        tracing::error!(
1023                            "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1024                        );
1025                        return;
1026                    }
1027
1028                    if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1029                        tracing::error!(
1030                            "Instrument {instrument_id} filtered out by contract_types config"
1031                        );
1032                        return;
1033                    }
1034
1035                    upsert_instrument(&instruments, instrument.clone());
1036
1037                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1038                        request_id,
1039                        client_id,
1040                        instrument.id(),
1041                        instrument,
1042                        start_nanos,
1043                        end_nanos,
1044                        clock.get_time_ns(),
1045                        params,
1046                    )));
1047
1048                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1049                        tracing::error!("Failed to send instrument response: {e}");
1050                    }
1051                }
1052                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
1053            }
1054        });
1055
1056        Ok(())
1057    }
1058
1059    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1060        let http = self.http_client.clone();
1061        let sender = self.data_sender.clone();
1062        let instrument_id = request.instrument_id;
1063        let start = request.start;
1064        let end = request.end;
1065        let limit = request.limit.map(|n| n.get() as u32);
1066        let request_id = request.request_id;
1067        let client_id = request.client_id.unwrap_or(self.client_id);
1068        let params = request.params.clone();
1069        let clock = self.clock;
1070        let start_nanos = datetime_to_unix_nanos(start);
1071        let end_nanos = datetime_to_unix_nanos(end);
1072
1073        tokio::spawn(async move {
1074            match http
1075                .request_trades(instrument_id, start, end, limit)
1076                .await
1077                .context("failed to request trades from OKX")
1078            {
1079                Ok(trades) => {
1080                    let response = DataResponse::Trades(TradesResponse::new(
1081                        request_id,
1082                        client_id,
1083                        instrument_id,
1084                        trades,
1085                        start_nanos,
1086                        end_nanos,
1087                        clock.get_time_ns(),
1088                        params,
1089                    ));
1090                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1091                        tracing::error!("Failed to send trades response: {e}");
1092                    }
1093                }
1094                Err(e) => tracing::error!("Trade request failed: {e:?}"),
1095            }
1096        });
1097
1098        Ok(())
1099    }
1100
1101    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1102        let http = self.http_client.clone();
1103        let sender = self.data_sender.clone();
1104        let bar_type = request.bar_type;
1105        let start = request.start;
1106        let end = request.end;
1107        let limit = request.limit.map(|n| n.get() as u32);
1108        let request_id = request.request_id;
1109        let client_id = request.client_id.unwrap_or(self.client_id);
1110        let params = request.params.clone();
1111        let clock = self.clock;
1112        let start_nanos = datetime_to_unix_nanos(start);
1113        let end_nanos = datetime_to_unix_nanos(end);
1114
1115        tokio::spawn(async move {
1116            match http
1117                .request_bars(bar_type, start, end, limit)
1118                .await
1119                .context("failed to request bars from OKX")
1120            {
1121                Ok(bars) => {
1122                    let response = DataResponse::Bars(BarsResponse::new(
1123                        request_id,
1124                        client_id,
1125                        bar_type,
1126                        bars,
1127                        start_nanos,
1128                        end_nanos,
1129                        clock.get_time_ns(),
1130                        params,
1131                    ));
1132                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1133                        tracing::error!("Failed to send bars response: {e}");
1134                    }
1135                }
1136                Err(e) => tracing::error!("Bar request failed: {e:?}"),
1137            }
1138        });
1139
1140        Ok(())
1141    }
1142}