nautilus_okx/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    live::{runner::get_data_event_sender, runtime::get_runtime},
28    messages::{
29        DataEvent,
30        data::{
31            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
32            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
33            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeFundingRates,
34            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices,
35            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
36            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
37            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38        },
39    },
40};
41use nautilus_core::{
42    MUTEX_POISONED,
43    datetime::datetime_to_unix_nanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_data::client::DataClient;
47use nautilus_model::{
48    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
49    enums::BookType,
50    identifiers::{ClientId, InstrumentId, Venue},
51    instruments::{Instrument, InstrumentAny},
52};
53use tokio::{task::JoinHandle, time::Duration};
54use tokio_util::sync::CancellationToken;
55
56use crate::{
57    common::{
58        consts::OKX_VENUE,
59        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
60        parse::okx_instrument_type_from_symbol,
61    },
62    config::OKXDataClientConfig,
63    http::client::OKXHttpClient,
64    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
65};
66
67#[derive(Debug)]
68pub struct OKXDataClient {
69    client_id: ClientId,
70    config: OKXDataClientConfig,
71    http_client: OKXHttpClient,
72    ws_public: Option<OKXWebSocketClient>,
73    ws_business: Option<OKXWebSocketClient>,
74    is_connected: AtomicBool,
75    cancellation_token: CancellationToken,
76    tasks: Vec<JoinHandle<()>>,
77    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
78    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
79    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
80    clock: &'static AtomicTime,
81}
82
83impl OKXDataClient {
84    /// Creates a new [`OKXDataClient`] instance.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the client fails to initialize.
89    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
90        let clock = get_atomic_clock_realtime();
91        let data_sender = get_data_event_sender();
92
93        let http_client = if config.has_api_credentials() {
94            OKXHttpClient::with_credentials(
95                config.api_key.clone(),
96                config.api_secret.clone(),
97                config.api_passphrase.clone(),
98                config.base_url_http.clone(),
99                config.http_timeout_secs,
100                config.max_retries,
101                config.retry_delay_initial_ms,
102                config.retry_delay_max_ms,
103                config.is_demo,
104                config.http_proxy_url.clone(),
105            )?
106        } else {
107            OKXHttpClient::new(
108                config.base_url_http.clone(),
109                config.http_timeout_secs,
110                config.max_retries,
111                config.retry_delay_initial_ms,
112                config.retry_delay_max_ms,
113                config.is_demo,
114                config.http_proxy_url.clone(),
115            )?
116        };
117
118        let ws_public = OKXWebSocketClient::new(
119            Some(config.ws_public_url()),
120            None,
121            None,
122            None,
123            None,
124            Some(20), // Heartbeat
125        )
126        .context("failed to construct OKX public websocket client")?;
127
128        let ws_business = if config.requires_business_ws() {
129            Some(
130                OKXWebSocketClient::new(
131                    Some(config.ws_business_url()),
132                    config.api_key.clone(),
133                    config.api_secret.clone(),
134                    config.api_passphrase.clone(),
135                    None,
136                    Some(20), // Heartbeat
137                )
138                .context("failed to construct OKX business websocket client")?,
139            )
140        } else {
141            None
142        };
143
144        if let Some(vip_level) = config.vip_level {
145            ws_public.set_vip_level(vip_level);
146            if let Some(ref ws) = ws_business {
147                ws.set_vip_level(vip_level);
148            }
149        }
150
151        Ok(Self {
152            client_id,
153            config,
154            http_client,
155            ws_public: Some(ws_public),
156            ws_business,
157            is_connected: AtomicBool::new(false),
158            cancellation_token: CancellationToken::new(),
159            tasks: Vec::new(),
160            data_sender,
161            instruments: Arc::new(RwLock::new(AHashMap::new())),
162            book_channels: Arc::new(RwLock::new(AHashMap::new())),
163            clock,
164        })
165    }
166
167    fn venue(&self) -> Venue {
168        *OKX_VENUE
169    }
170
171    fn vip_level(&self) -> Option<OKXVipLevel> {
172        self.ws_public.as_ref().map(|ws| ws.vip_level())
173    }
174
175    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
176        self.ws_public
177            .as_ref()
178            .context("public websocket client not initialized")
179    }
180
181    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
182        self.ws_business
183            .as_ref()
184            .context("business websocket client not available (credentials required)")
185    }
186
187    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
188        if let Err(e) = sender.send(DataEvent::Data(data)) {
189            tracing::error!("Failed to emit data event: {e}");
190        }
191    }
192
193    fn spawn_ws<F>(&self, fut: F, context: &'static str)
194    where
195        F: Future<Output = anyhow::Result<()>> + Send + 'static,
196    {
197        get_runtime().spawn(async move {
198            if let Err(e) = fut.await {
199                tracing::error!("{context}: {e:?}");
200            }
201        });
202    }
203
204    fn handle_ws_message(
205        message: NautilusWsMessage,
206        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
207        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
208    ) {
209        match message {
210            NautilusWsMessage::Data(payloads) => {
211                for data in payloads {
212                    Self::send_data(data_sender, data);
213                }
214            }
215            NautilusWsMessage::Deltas(deltas) => {
216                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
217            }
218            NautilusWsMessage::FundingRates(updates) => {
219                emit_funding_rates(updates);
220            }
221            NautilusWsMessage::Instrument(instrument) => {
222                upsert_instrument(instruments, *instrument);
223            }
224            NautilusWsMessage::AccountUpdate(_)
225            | NautilusWsMessage::PositionUpdate(_)
226            | NautilusWsMessage::OrderAccepted(_)
227            | NautilusWsMessage::OrderCanceled(_)
228            | NautilusWsMessage::OrderExpired(_)
229            | NautilusWsMessage::OrderRejected(_)
230            | NautilusWsMessage::OrderCancelRejected(_)
231            | NautilusWsMessage::OrderModifyRejected(_)
232            | NautilusWsMessage::OrderTriggered(_)
233            | NautilusWsMessage::OrderUpdated(_)
234            | NautilusWsMessage::ExecutionReports(_) => {
235                tracing::debug!("Ignoring trading message on data client");
236            }
237            NautilusWsMessage::Error(e) => {
238                tracing::error!("OKX websocket error: {e:?}");
239            }
240            NautilusWsMessage::Raw(value) => {
241                tracing::debug!("Unhandled websocket payload: {value:?}");
242            }
243            NautilusWsMessage::Reconnected => {
244                tracing::info!("Websocket reconnected");
245            }
246            NautilusWsMessage::Authenticated => {
247                tracing::debug!("Websocket authenticated");
248            }
249        }
250    }
251}
252
253fn emit_funding_rates(updates: Vec<FundingRateUpdate>) {
254    if updates.is_empty() {
255        return;
256    }
257
258    for update in updates {
259        tracing::debug!(
260            "Received funding rate update for {} but forwarding is not yet supported",
261            update.instrument_id
262        );
263    }
264}
265
266fn upsert_instrument(
267    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
268    instrument: InstrumentAny,
269) {
270    let mut guard = cache.write().expect(MUTEX_POISONED);
271    guard.insert(instrument.id(), instrument);
272}
273
274fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
275    contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
276}
277
278fn contract_filter_with_config_types(
279    contract_types: Option<&Vec<OKXContractType>>,
280    instrument: &InstrumentAny,
281) -> bool {
282    match contract_types {
283        None => true,
284        Some(filter) if filter.is_empty() => true,
285        Some(filter) => {
286            let is_inverse = instrument.is_inverse();
287            (is_inverse && filter.contains(&OKXContractType::Inverse))
288                || (!is_inverse && filter.contains(&OKXContractType::Linear))
289        }
290    }
291}
292
293#[async_trait::async_trait(?Send)]
294impl DataClient for OKXDataClient {
295    fn client_id(&self) -> ClientId {
296        self.client_id
297    }
298
299    fn venue(&self) -> Option<Venue> {
300        Some(self.venue())
301    }
302
303    fn start(&mut self) -> anyhow::Result<()> {
304        tracing::info!(
305            client_id = %self.client_id,
306            vip_level = ?self.vip_level(),
307            instrument_types = ?self.config.instrument_types,
308            is_demo = self.config.is_demo,
309            http_proxy_url = ?self.config.http_proxy_url,
310            ws_proxy_url = ?self.config.ws_proxy_url,
311            "Started"
312        );
313        Ok(())
314    }
315
316    fn stop(&mut self) -> anyhow::Result<()> {
317        tracing::info!("Stopping {id}", id = self.client_id);
318        self.cancellation_token.cancel();
319        self.is_connected.store(false, Ordering::Relaxed);
320        Ok(())
321    }
322
323    fn reset(&mut self) -> anyhow::Result<()> {
324        tracing::debug!("Resetting {id}", id = self.client_id);
325        self.is_connected.store(false, Ordering::Relaxed);
326        self.cancellation_token = CancellationToken::new();
327        self.tasks.clear();
328        self.book_channels
329            .write()
330            .expect("book channel cache lock poisoned")
331            .clear();
332        Ok(())
333    }
334
335    fn dispose(&mut self) -> anyhow::Result<()> {
336        tracing::debug!("Disposing {id}", id = self.client_id);
337        self.stop()
338    }
339
340    async fn connect(&mut self) -> anyhow::Result<()> {
341        if self.is_connected() {
342            return Ok(());
343        }
344
345        let instrument_types = if self.config.instrument_types.is_empty() {
346            vec![OKXInstrumentType::Spot]
347        } else {
348            self.config.instrument_types.clone()
349        };
350
351        let mut all_instruments = Vec::new();
352        for inst_type in &instrument_types {
353            let mut fetched = self
354                .http_client
355                .request_instruments(*inst_type, None)
356                .await
357                .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
358
359            fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
360            self.http_client.cache_instruments(fetched.clone());
361
362            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
363            for instrument in &fetched {
364                guard.insert(instrument.id(), instrument.clone());
365            }
366            drop(guard);
367
368            all_instruments.extend(fetched);
369        }
370
371        for instrument in all_instruments {
372            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
373                tracing::warn!("Failed to send instrument: {e}");
374            }
375        }
376
377        if let Some(ref mut ws) = self.ws_public {
378            // Cache instruments to websocket before connecting so handler has them
379            let instruments: Vec<_> = self
380                .instruments
381                .read()
382                .expect(MUTEX_POISONED)
383                .values()
384                .cloned()
385                .collect();
386            ws.cache_instruments(instruments);
387
388            ws.connect()
389                .await
390                .context("failed to connect OKX public websocket")?;
391            ws.wait_until_active(10.0)
392                .await
393                .context("public websocket did not become active")?;
394
395            let stream = ws.stream();
396            let sender = self.data_sender.clone();
397            let insts = self.instruments.clone();
398            let cancel = self.cancellation_token.clone();
399            let handle = get_runtime().spawn(async move {
400                pin_mut!(stream);
401                loop {
402                    tokio::select! {
403                        Some(message) = stream.next() => {
404                            Self::handle_ws_message(message, &sender, &insts);
405                        }
406                        _ = cancel.cancelled() => {
407                            tracing::debug!("Public websocket stream task cancelled");
408                            break;
409                        }
410                    }
411                }
412            });
413            self.tasks.push(handle);
414
415            for inst_type in &instrument_types {
416                ws.subscribe_instruments(*inst_type)
417                    .await
418                    .with_context(|| {
419                        format!("failed to subscribe to instrument type {inst_type:?}")
420                    })?;
421            }
422        }
423
424        if let Some(ref mut ws) = self.ws_business {
425            // Cache instruments to websocket before connecting so handler has them
426            let instruments: Vec<_> = self
427                .instruments
428                .read()
429                .expect(MUTEX_POISONED)
430                .values()
431                .cloned()
432                .collect();
433            ws.cache_instruments(instruments);
434
435            ws.connect()
436                .await
437                .context("failed to connect OKX business websocket")?;
438            ws.wait_until_active(10.0)
439                .await
440                .context("business websocket did not become active")?;
441
442            let stream = ws.stream();
443            let sender = self.data_sender.clone();
444            let insts = self.instruments.clone();
445            let cancel = self.cancellation_token.clone();
446            let handle = get_runtime().spawn(async move {
447                pin_mut!(stream);
448                loop {
449                    tokio::select! {
450                        Some(message) = stream.next() => {
451                            Self::handle_ws_message(message, &sender, &insts);
452                        }
453                        _ = cancel.cancelled() => {
454                            tracing::debug!("Business websocket stream task cancelled");
455                            break;
456                        }
457                    }
458                }
459            });
460            self.tasks.push(handle);
461        }
462
463        self.is_connected.store(true, Ordering::Release);
464        tracing::info!(client_id = %self.client_id, "Connected");
465        Ok(())
466    }
467
468    async fn disconnect(&mut self) -> anyhow::Result<()> {
469        if self.is_disconnected() {
470            return Ok(());
471        }
472
473        self.cancellation_token.cancel();
474
475        if let Some(ref ws) = self.ws_public
476            && let Err(e) = ws.unsubscribe_all().await
477        {
478            tracing::warn!("Failed to unsubscribe all from public websocket: {e:?}");
479        }
480        if let Some(ref ws) = self.ws_business
481            && let Err(e) = ws.unsubscribe_all().await
482        {
483            tracing::warn!("Failed to unsubscribe all from business websocket: {e:?}");
484        }
485
486        // Allow time for unsubscribe confirmations
487        tokio::time::sleep(Duration::from_millis(500)).await;
488
489        if let Some(ref mut ws) = self.ws_public {
490            let _ = ws.close().await;
491        }
492        if let Some(ref mut ws) = self.ws_business {
493            let _ = ws.close().await;
494        }
495
496        let handles: Vec<_> = self.tasks.drain(..).collect();
497        for handle in handles {
498            if let Err(e) = handle.await {
499                tracing::error!("Error joining websocket task: {e}");
500            }
501        }
502
503        self.book_channels.write().expect(MUTEX_POISONED).clear();
504        self.is_connected.store(false, Ordering::Release);
505        tracing::info!(client_id = %self.client_id, "Disconnected");
506        Ok(())
507    }
508
509    fn is_connected(&self) -> bool {
510        self.is_connected.load(Ordering::Relaxed)
511    }
512
513    fn is_disconnected(&self) -> bool {
514        !self.is_connected()
515    }
516
517    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
518        for inst_type in &self.config.instrument_types {
519            let ws = self.public_ws()?.clone();
520            let inst_type = *inst_type;
521
522            self.spawn_ws(
523                async move {
524                    ws.subscribe_instruments(inst_type)
525                        .await
526                        .context("instruments subscription")?;
527                    Ok(())
528                },
529                "subscribe_instruments",
530            );
531        }
532        Ok(())
533    }
534
535    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
536        // OKX instruments channel doesn't support subscribing to individual instruments via instId
537        // Instead, subscribe to the instrument type if not already subscribed
538        let instrument_id = cmd.instrument_id;
539        let ws = self.public_ws()?.clone();
540
541        self.spawn_ws(
542            async move {
543                ws.subscribe_instrument(instrument_id)
544                    .await
545                    .context("instrument type subscription")?;
546                Ok(())
547            },
548            "subscribe_instrument",
549        );
550        Ok(())
551    }
552
553    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
554        if cmd.book_type != BookType::L2_MBP {
555            anyhow::bail!("OKX only supports L2_MBP order book deltas");
556        }
557
558        let depth = cmd.depth.map_or(0, |d| d.get());
559        if !matches!(depth, 0 | 50 | 400) {
560            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
561        }
562
563        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
564        let channel = match depth {
565            50 => {
566                if vip < OKXVipLevel::Vip4 {
567                    anyhow::bail!(
568                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
569                    );
570                }
571                OKXBookChannel::Books50L2Tbt
572            }
573            0 | 400 => {
574                if vip >= OKXVipLevel::Vip5 {
575                    OKXBookChannel::BookL2Tbt
576                } else {
577                    OKXBookChannel::Book
578                }
579            }
580            _ => unreachable!(),
581        };
582
583        let instrument_id = cmd.instrument_id;
584        let ws = self.public_ws()?.clone();
585        let book_channels = Arc::clone(&self.book_channels);
586
587        self.spawn_ws(
588            async move {
589                match channel {
590                    OKXBookChannel::Books50L2Tbt => ws
591                        .subscribe_book50_l2_tbt(instrument_id)
592                        .await
593                        .context("books50-l2-tbt subscription")?,
594                    OKXBookChannel::BookL2Tbt => ws
595                        .subscribe_book_l2_tbt(instrument_id)
596                        .await
597                        .context("books-l2-tbt subscription")?,
598                    OKXBookChannel::Book => ws
599                        .subscribe_books_channel(instrument_id)
600                        .await
601                        .context("books subscription")?,
602                }
603                book_channels
604                    .write()
605                    .expect("book channel cache lock poisoned")
606                    .insert(instrument_id, channel);
607                Ok(())
608            },
609            "order book delta subscription",
610        );
611
612        Ok(())
613    }
614
615    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
616        if cmd.book_type != BookType::L2_MBP {
617            anyhow::bail!("OKX only supports L2_MBP order book snapshots");
618        }
619        let depth = cmd.depth.map_or(5, |d| d.get());
620        if depth != 5 {
621            anyhow::bail!("OKX only supports depth=5 snapshots");
622        }
623
624        let ws = self.public_ws()?.clone();
625        let instrument_id = cmd.instrument_id;
626
627        self.spawn_ws(
628            async move {
629                ws.subscribe_book_depth5(instrument_id)
630                    .await
631                    .context("books5 subscription")
632            },
633            "order book snapshot subscription",
634        );
635        Ok(())
636    }
637
638    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
639        let ws = self.public_ws()?.clone();
640        let instrument_id = cmd.instrument_id;
641
642        self.spawn_ws(
643            async move {
644                ws.subscribe_quotes(instrument_id)
645                    .await
646                    .context("quotes subscription")
647            },
648            "quote subscription",
649        );
650        Ok(())
651    }
652
653    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
654        let ws = self.public_ws()?.clone();
655        let instrument_id = cmd.instrument_id;
656
657        self.spawn_ws(
658            async move {
659                ws.subscribe_trades(instrument_id, false)
660                    .await
661                    .context("trades subscription")
662            },
663            "trade subscription",
664        );
665        Ok(())
666    }
667
668    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
669        let ws = self.public_ws()?.clone();
670        let instrument_id = cmd.instrument_id;
671
672        self.spawn_ws(
673            async move {
674                ws.subscribe_mark_prices(instrument_id)
675                    .await
676                    .context("mark price subscription")
677            },
678            "mark price subscription",
679        );
680        Ok(())
681    }
682
683    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
684        let ws = self.public_ws()?.clone();
685        let instrument_id = cmd.instrument_id;
686
687        self.spawn_ws(
688            async move {
689                ws.subscribe_index_prices(instrument_id)
690                    .await
691                    .context("index price subscription")
692            },
693            "index price subscription",
694        );
695        Ok(())
696    }
697
698    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
699        let ws = self.public_ws()?.clone();
700        let instrument_id = cmd.instrument_id;
701
702        self.spawn_ws(
703            async move {
704                ws.subscribe_funding_rates(instrument_id)
705                    .await
706                    .context("funding rate subscription")
707            },
708            "funding rate subscription",
709        );
710        Ok(())
711    }
712
713    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
714        let ws = self.business_ws()?.clone();
715        let bar_type = cmd.bar_type;
716
717        self.spawn_ws(
718            async move {
719                ws.subscribe_bars(bar_type)
720                    .await
721                    .context("bars subscription")
722            },
723            "bar subscription",
724        );
725        Ok(())
726    }
727
728    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
729        let ws = self.public_ws()?.clone();
730        let instrument_id = cmd.instrument_id;
731        let channel = self
732            .book_channels
733            .write()
734            .expect("book channel cache lock poisoned")
735            .remove(&instrument_id);
736
737        self.spawn_ws(
738            async move {
739                match channel {
740                    Some(OKXBookChannel::Books50L2Tbt) => ws
741                        .unsubscribe_book50_l2_tbt(instrument_id)
742                        .await
743                        .context("books50-l2-tbt unsubscribe")?,
744                    Some(OKXBookChannel::BookL2Tbt) => ws
745                        .unsubscribe_book_l2_tbt(instrument_id)
746                        .await
747                        .context("books-l2-tbt unsubscribe")?,
748                    Some(OKXBookChannel::Book) => ws
749                        .unsubscribe_book(instrument_id)
750                        .await
751                        .context("book unsubscribe")?,
752                    None => {
753                        tracing::warn!(
754                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
755                        );
756                        ws.unsubscribe_book(instrument_id)
757                            .await
758                            .context("book fallback unsubscribe")?;
759                    }
760                }
761                Ok(())
762            },
763            "order book unsubscribe",
764        );
765        Ok(())
766    }
767
768    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
769        let ws = self.public_ws()?.clone();
770        let instrument_id = cmd.instrument_id;
771
772        self.spawn_ws(
773            async move {
774                ws.unsubscribe_book_depth5(instrument_id)
775                    .await
776                    .context("book depth5 unsubscribe")
777            },
778            "order book snapshot unsubscribe",
779        );
780        Ok(())
781    }
782
783    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
784        let ws = self.public_ws()?.clone();
785        let instrument_id = cmd.instrument_id;
786
787        self.spawn_ws(
788            async move {
789                ws.unsubscribe_quotes(instrument_id)
790                    .await
791                    .context("quotes unsubscribe")
792            },
793            "quote unsubscribe",
794        );
795        Ok(())
796    }
797
798    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
799        let ws = self.public_ws()?.clone();
800        let instrument_id = cmd.instrument_id;
801
802        self.spawn_ws(
803            async move {
804                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
805                    .await
806                    .context("trades unsubscribe")
807            },
808            "trade unsubscribe",
809        );
810        Ok(())
811    }
812
813    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
814        let ws = self.public_ws()?.clone();
815        let instrument_id = cmd.instrument_id;
816
817        self.spawn_ws(
818            async move {
819                ws.unsubscribe_mark_prices(instrument_id)
820                    .await
821                    .context("mark price unsubscribe")
822            },
823            "mark price unsubscribe",
824        );
825        Ok(())
826    }
827
828    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
829        let ws = self.public_ws()?.clone();
830        let instrument_id = cmd.instrument_id;
831
832        self.spawn_ws(
833            async move {
834                ws.unsubscribe_index_prices(instrument_id)
835                    .await
836                    .context("index price unsubscribe")
837            },
838            "index price unsubscribe",
839        );
840        Ok(())
841    }
842
843    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
844        let ws = self.public_ws()?.clone();
845        let instrument_id = cmd.instrument_id;
846
847        self.spawn_ws(
848            async move {
849                ws.unsubscribe_funding_rates(instrument_id)
850                    .await
851                    .context("funding rate unsubscribe")
852            },
853            "funding rate unsubscribe",
854        );
855        Ok(())
856    }
857
858    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
859        let ws = self.business_ws()?.clone();
860        let bar_type = cmd.bar_type;
861
862        self.spawn_ws(
863            async move {
864                ws.unsubscribe_bars(bar_type)
865                    .await
866                    .context("bars unsubscribe")
867            },
868            "bar unsubscribe",
869        );
870        Ok(())
871    }
872
873    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
874        let http = self.http_client.clone();
875        let sender = self.data_sender.clone();
876        let instruments_cache = self.instruments.clone();
877        let request_id = request.request_id;
878        let client_id = request.client_id.unwrap_or(self.client_id);
879        let venue = self.venue();
880        let start = request.start;
881        let end = request.end;
882        let params = request.params.clone();
883        let clock = self.clock;
884        let start_nanos = datetime_to_unix_nanos(start);
885        let end_nanos = datetime_to_unix_nanos(end);
886        let instrument_types = if self.config.instrument_types.is_empty() {
887            vec![OKXInstrumentType::Spot]
888        } else {
889            self.config.instrument_types.clone()
890        };
891        let contract_types = self.config.contract_types.clone();
892        let instrument_families = self.config.instrument_families.clone();
893
894        get_runtime().spawn(async move {
895            let mut all_instruments = Vec::new();
896
897            for inst_type in instrument_types {
898                let supports_family = matches!(
899                    inst_type,
900                    OKXInstrumentType::Futures
901                        | OKXInstrumentType::Swap
902                        | OKXInstrumentType::Option
903                );
904
905                let families = match (&instrument_families, inst_type, supports_family) {
906                    (Some(families), OKXInstrumentType::Option, true) => families.clone(),
907                    (Some(families), _, true) => families.clone(),
908                    (None, OKXInstrumentType::Option, _) => {
909                        tracing::warn!(
910                            "Skipping OPTION type: instrument_families required but not configured"
911                        );
912                        continue;
913                    }
914                    _ => vec![],
915                };
916
917                if families.is_empty() {
918                    match http.request_instruments(inst_type, None).await {
919                        Ok(instruments) => {
920                            for instrument in instruments {
921                                if !contract_filter_with_config_types(
922                                    contract_types.as_ref(),
923                                    &instrument,
924                                ) {
925                                    continue;
926                                }
927
928                                upsert_instrument(&instruments_cache, instrument.clone());
929                                all_instruments.push(instrument);
930                            }
931                        }
932                        Err(e) => {
933                            tracing::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
934                        }
935                    }
936                } else {
937                    for family in families {
938                        match http
939                            .request_instruments(inst_type, Some(family.clone()))
940                            .await
941                        {
942                            Ok(instruments) => {
943                                for instrument in instruments {
944                                    if !contract_filter_with_config_types(
945                                        contract_types.as_ref(),
946                                        &instrument,
947                                    ) {
948                                        continue;
949                                    }
950
951                                    upsert_instrument(&instruments_cache, instrument.clone());
952                                    all_instruments.push(instrument);
953                                }
954                            }
955                            Err(e) => {
956                                tracing::error!(
957                                    "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
958                                );
959                            }
960                        }
961                    }
962                }
963            }
964
965            let response = DataResponse::Instruments(InstrumentsResponse::new(
966                request_id,
967                client_id,
968                venue,
969                all_instruments,
970                start_nanos,
971                end_nanos,
972                clock.get_time_ns(),
973                params,
974            ));
975
976            if let Err(e) = sender.send(DataEvent::Response(response)) {
977                tracing::error!("Failed to send instruments response: {e}");
978            }
979        });
980
981        Ok(())
982    }
983
984    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
985        let http = self.http_client.clone();
986        let sender = self.data_sender.clone();
987        let instruments = self.instruments.clone();
988        let instrument_id = request.instrument_id;
989        let request_id = request.request_id;
990        let client_id = request.client_id.unwrap_or(self.client_id);
991        let start = request.start;
992        let end = request.end;
993        let params = request.params.clone();
994        let clock = self.clock;
995        let start_nanos = datetime_to_unix_nanos(start);
996        let end_nanos = datetime_to_unix_nanos(end);
997        let instrument_types = if self.config.instrument_types.is_empty() {
998            vec![OKXInstrumentType::Spot]
999        } else {
1000            self.config.instrument_types.clone()
1001        };
1002        let contract_types = self.config.contract_types.clone();
1003
1004        get_runtime().spawn(async move {
1005            match http
1006                .request_instrument(instrument_id)
1007                .await
1008                .context("fetch instrument from API")
1009            {
1010                Ok(instrument) => {
1011                    let inst_id = instrument.id();
1012                    let symbol = inst_id.symbol.as_str();
1013                    let inst_type = okx_instrument_type_from_symbol(symbol);
1014                    if !instrument_types.contains(&inst_type) {
1015                        tracing::error!(
1016                            "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1017                        );
1018                        return;
1019                    }
1020
1021                    if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1022                        tracing::error!(
1023                            "Instrument {instrument_id} filtered out by contract_types config"
1024                        );
1025                        return;
1026                    }
1027
1028                    upsert_instrument(&instruments, instrument.clone());
1029
1030                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1031                        request_id,
1032                        client_id,
1033                        instrument.id(),
1034                        instrument,
1035                        start_nanos,
1036                        end_nanos,
1037                        clock.get_time_ns(),
1038                        params,
1039                    )));
1040
1041                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1042                        tracing::error!("Failed to send instrument response: {e}");
1043                    }
1044                }
1045                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
1046            }
1047        });
1048
1049        Ok(())
1050    }
1051
1052    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
1053        let http = self.http_client.clone();
1054        let sender = self.data_sender.clone();
1055        let instrument_id = request.instrument_id;
1056        let start = request.start;
1057        let end = request.end;
1058        let limit = request.limit.map(|n| n.get() as u32);
1059        let request_id = request.request_id;
1060        let client_id = request.client_id.unwrap_or(self.client_id);
1061        let params = request.params.clone();
1062        let clock = self.clock;
1063        let start_nanos = datetime_to_unix_nanos(start);
1064        let end_nanos = datetime_to_unix_nanos(end);
1065
1066        get_runtime().spawn(async move {
1067            match http
1068                .request_trades(instrument_id, start, end, limit)
1069                .await
1070                .context("failed to request trades from OKX")
1071            {
1072                Ok(trades) => {
1073                    let response = DataResponse::Trades(TradesResponse::new(
1074                        request_id,
1075                        client_id,
1076                        instrument_id,
1077                        trades,
1078                        start_nanos,
1079                        end_nanos,
1080                        clock.get_time_ns(),
1081                        params,
1082                    ));
1083                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1084                        tracing::error!("Failed to send trades response: {e}");
1085                    }
1086                }
1087                Err(e) => tracing::error!("Trade request failed: {e:?}"),
1088            }
1089        });
1090
1091        Ok(())
1092    }
1093
1094    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1095        let http = self.http_client.clone();
1096        let sender = self.data_sender.clone();
1097        let bar_type = request.bar_type;
1098        let start = request.start;
1099        let end = request.end;
1100        let limit = request.limit.map(|n| n.get() as u32);
1101        let request_id = request.request_id;
1102        let client_id = request.client_id.unwrap_or(self.client_id);
1103        let params = request.params.clone();
1104        let clock = self.clock;
1105        let start_nanos = datetime_to_unix_nanos(start);
1106        let end_nanos = datetime_to_unix_nanos(end);
1107
1108        get_runtime().spawn(async move {
1109            match http
1110                .request_bars(bar_type, start, end, limit)
1111                .await
1112                .context("failed to request bars from OKX")
1113            {
1114                Ok(bars) => {
1115                    let response = DataResponse::Bars(BarsResponse::new(
1116                        request_id,
1117                        client_id,
1118                        bar_type,
1119                        bars,
1120                        start_nanos,
1121                        end_nanos,
1122                        clock.get_time_ns(),
1123                        params,
1124                    ));
1125                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1126                        tracing::error!("Failed to send bars response: {e}");
1127                    }
1128                }
1129                Err(e) => tracing::error!("Bar request failed: {e:?}"),
1130            }
1131        });
1132
1133        Ok(())
1134    }
1135}