Skip to main content

nautilus_architect_ax/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the AX Exchange adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26
27use ahash::AHashMap;
28use anyhow::Context;
29use async_trait::async_trait;
30use chrono::{DateTime, Duration as ChronoDuration, Utc};
31use dashmap::DashMap;
32use futures_util::StreamExt;
33use nautilus_common::{
34    clients::DataClient,
35    live::{runner::get_data_event_sender, runtime::get_runtime},
36    messages::{
37        DataEvent, DataResponse,
38        data::{
39            BarsResponse, BookResponse, FundingRatesResponse, InstrumentResponse,
40            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
41            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
42            SubscribeBookDeltas, SubscribeFundingRates, SubscribeInstrument, SubscribeInstruments,
43            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
44            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeInstrument,
45            UnsubscribeInstruments, UnsubscribeQuotes, UnsubscribeTrades,
46        },
47    },
48};
49use nautilus_core::{
50    datetime::datetime_to_unix_nanos,
51    time::{AtomicTime, get_atomic_clock_realtime},
52};
53use nautilus_model::{
54    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
55    enums::BookType,
56    identifiers::{ClientId, InstrumentId, Venue},
57    instruments::InstrumentAny,
58};
59use tokio::task::JoinHandle;
60use tokio_util::sync::CancellationToken;
61use ustr::Ustr;
62
63use crate::{
64    common::{consts::AX_VENUE, enums::AxMarketDataLevel, parse::map_bar_spec_to_candle_width},
65    config::AxDataClientConfig,
66    http::client::AxHttpClient,
67    websocket::{data::client::AxMdWebSocketClient, messages::NautilusDataWsMessage},
68};
69
70/// AX Exchange data client for live market data streaming and historical data requests.
71///
72/// This client integrates with the Nautilus DataEngine to provide:
73/// - Real-time market data via WebSocket subscriptions
74/// - Historical data via REST API requests
75/// - Automatic instrument discovery and caching
76/// - Connection lifecycle management
77#[derive(Debug)]
78pub struct AxDataClient {
79    /// The client ID for this data client.
80    client_id: ClientId,
81    /// Configuration for the data client.
82    config: AxDataClientConfig,
83    /// HTTP client for REST API requests.
84    http_client: AxHttpClient,
85    /// WebSocket client for real-time data streaming.
86    ws_client: AxMdWebSocketClient,
87    /// Whether the client is currently connected.
88    is_connected: AtomicBool,
89    /// Cancellation token for async operations.
90    cancellation_token: CancellationToken,
91    /// Background task handles.
92    tasks: Vec<JoinHandle<()>>,
93    /// Channel sender for emitting data events to the DataEngine.
94    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
95    /// Cached instruments by symbol (shared with HTTP client).
96    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
97    /// High-resolution clock for timestamps.
98    clock: &'static AtomicTime,
99    funding_rate_tasks: AHashMap<InstrumentId, JoinHandle<()>>,
100    funding_rate_cache: Arc<Mutex<AHashMap<InstrumentId, FundingRateUpdate>>>,
101}
102
103impl AxDataClient {
104    /// Creates a new [`AxDataClient`] instance.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the data event sender cannot be obtained.
109    pub fn new(
110        client_id: ClientId,
111        config: AxDataClientConfig,
112        http_client: AxHttpClient,
113        ws_client: AxMdWebSocketClient,
114    ) -> anyhow::Result<Self> {
115        let clock = get_atomic_clock_realtime();
116        let data_sender = get_data_event_sender();
117
118        // Share instruments cache with HTTP client
119        let instruments = http_client.instruments_cache.clone();
120
121        Ok(Self {
122            client_id,
123            config,
124            http_client,
125            ws_client,
126            is_connected: AtomicBool::new(false),
127            cancellation_token: CancellationToken::new(),
128            tasks: Vec::new(),
129            data_sender,
130            instruments,
131            clock,
132            funding_rate_tasks: AHashMap::new(),
133            funding_rate_cache: Arc::new(Mutex::new(AHashMap::new())),
134        })
135    }
136
137    /// Returns the venue for this data client.
138    #[must_use]
139    pub fn venue(&self) -> Venue {
140        *AX_VENUE
141    }
142
143    fn map_book_type_to_market_data_level(book_type: BookType) -> AxMarketDataLevel {
144        match book_type {
145            BookType::L3_MBO => AxMarketDataLevel::Level3,
146            BookType::L1_MBP | BookType::L2_MBP => AxMarketDataLevel::Level2,
147        }
148    }
149
150    /// Returns a reference to the instruments cache.
151    #[must_use]
152    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
153        &self.instruments
154    }
155
156    /// Spawns a message handler task to forward WebSocket data to the DataEngine.
157    fn spawn_message_handler(&mut self) {
158        let stream = self.ws_client.stream();
159        let data_sender = self.data_sender.clone();
160        let cancellation_token = self.cancellation_token.clone();
161
162        let handle = get_runtime().spawn(async move {
163            tokio::pin!(stream);
164
165            loop {
166                tokio::select! {
167                    () = cancellation_token.cancelled() => {
168                        log::debug!("Message handler cancelled");
169                        break;
170                    }
171                    msg = stream.next() => {
172                        match msg {
173                            Some(ws_msg) => {
174                                Self::handle_ws_message(ws_msg, &data_sender);
175                            }
176                            None => {
177                                log::debug!("WebSocket stream ended");
178                                break;
179                            }
180                        }
181                    }
182                }
183            }
184        });
185
186        self.tasks.push(handle);
187    }
188
189    /// Handles a WebSocket message and forwards data to the DataEngine.
190    fn handle_ws_message(
191        msg: NautilusDataWsMessage,
192        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
193    ) {
194        match msg {
195            NautilusDataWsMessage::Data(data_vec) => {
196                for data in data_vec {
197                    if let Err(e) = sender.send(DataEvent::Data(data)) {
198                        log::error!("Failed to send data event: {e}");
199                    }
200                }
201            }
202            NautilusDataWsMessage::Deltas(deltas) => {
203                let api_deltas = OrderBookDeltas_API::new(deltas);
204                if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
205                    log::error!("Failed to send deltas event: {e}");
206                }
207            }
208            NautilusDataWsMessage::Bar(bar) => {
209                if let Err(e) = sender.send(DataEvent::Data(Data::Bar(bar))) {
210                    log::error!("Failed to send bar event: {e}");
211                }
212            }
213            NautilusDataWsMessage::Heartbeat => {
214                log::trace!("Received heartbeat");
215            }
216            NautilusDataWsMessage::Reconnected => {
217                log::info!("WebSocket reconnected");
218            }
219            NautilusDataWsMessage::Error(err) => {
220                // Subscription state messages are benign (e.g. duplicate subscribe/unsubscribe)
221                if err.message.contains("already subscribed")
222                    || err.message.contains("not subscribed")
223                {
224                    log::warn!("WebSocket subscription state: {err:?}");
225                } else {
226                    log::error!("WebSocket error: {err:?}");
227                }
228            }
229        }
230    }
231
232    fn spawn_ws<F>(&self, fut: F, context: &'static str)
233    where
234        F: Future<Output = anyhow::Result<()>> + Send + 'static,
235    {
236        get_runtime().spawn(async move {
237            if let Err(e) = fut.await {
238                log::error!("{context}: {e:?}");
239            }
240        });
241    }
242}
243
244#[async_trait(?Send)]
245impl DataClient for AxDataClient {
246    fn client_id(&self) -> ClientId {
247        self.client_id
248    }
249
250    fn venue(&self) -> Option<Venue> {
251        Some(*AX_VENUE)
252    }
253
254    fn start(&mut self) -> anyhow::Result<()> {
255        log::debug!("Starting {}", self.client_id);
256        Ok(())
257    }
258
259    fn stop(&mut self) -> anyhow::Result<()> {
260        log::debug!("Stopping {}", self.client_id);
261        self.cancellation_token.cancel();
262        self.is_connected.store(false, Ordering::Release);
263        Ok(())
264    }
265
266    fn reset(&mut self) -> anyhow::Result<()> {
267        log::debug!("Resetting {}", self.client_id);
268        self.cancellation_token.cancel();
269        for task in self.tasks.drain(..) {
270            task.abort();
271        }
272        for (_, task) in self.funding_rate_tasks.drain() {
273            task.abort();
274        }
275        self.funding_rate_cache.lock().unwrap().clear();
276        self.cancellation_token = CancellationToken::new();
277        Ok(())
278    }
279
280    fn dispose(&mut self) -> anyhow::Result<()> {
281        log::debug!("Disposing {}", self.client_id);
282        self.cancellation_token.cancel();
283        self.is_connected.store(false, Ordering::Release);
284        Ok(())
285    }
286
287    fn is_connected(&self) -> bool {
288        self.is_connected.load(Ordering::Acquire)
289    }
290
291    fn is_disconnected(&self) -> bool {
292        !self.is_connected()
293    }
294
295    async fn connect(&mut self) -> anyhow::Result<()> {
296        log::info!("Connecting {}", self.client_id);
297
298        // Recreate token so a previous disconnect/stop doesn't block new operations
299        self.cancellation_token = CancellationToken::new();
300
301        if self.config.has_api_credentials() {
302            let api_key = self
303                .config
304                .api_key
305                .clone()
306                .or_else(|| std::env::var("AX_API_KEY").ok())
307                .context("AX_API_KEY not configured")?;
308
309            let api_secret = self
310                .config
311                .api_secret
312                .clone()
313                .or_else(|| std::env::var("AX_API_SECRET").ok())
314                .context("AX_API_SECRET not configured")?;
315
316            let token = self
317                .http_client
318                .authenticate(&api_key, &api_secret, 86400)
319                .await
320                .context("Failed to authenticate with Ax")?;
321            log::info!("Authenticated with Ax");
322            self.ws_client.set_auth_token(token);
323        }
324
325        let instruments = self
326            .http_client
327            .request_instruments(None, None)
328            .await
329            .context("Failed to fetch instruments")?;
330
331        for instrument in &instruments {
332            self.ws_client.cache_instrument(instrument.clone());
333            if let Err(e) = self
334                .data_sender
335                .send(DataEvent::Instrument(instrument.clone()))
336            {
337                log::warn!("Failed to send instrument: {e}");
338            }
339        }
340        self.http_client.cache_instruments(instruments);
341        log::info!(
342            "Cached {} instruments",
343            self.http_client.get_cached_symbols().len()
344        );
345
346        self.ws_client
347            .connect()
348            .await
349            .context("Failed to connect WebSocket")?;
350        log::info!("WebSocket connected");
351        self.spawn_message_handler();
352
353        self.is_connected.store(true, Ordering::Release);
354        log::info!("Connected {}", self.client_id);
355
356        Ok(())
357    }
358
359    async fn disconnect(&mut self) -> anyhow::Result<()> {
360        log::info!("Disconnecting {}", self.client_id);
361        self.cancellation_token.cancel();
362        self.ws_client.close().await;
363
364        for task in self.tasks.drain(..) {
365            task.abort();
366        }
367        for (_, task) in self.funding_rate_tasks.drain() {
368            task.abort();
369        }
370        self.funding_rate_cache.lock().unwrap().clear();
371
372        self.is_connected.store(false, Ordering::Release);
373        log::info!("Disconnected {}", self.client_id);
374
375        Ok(())
376    }
377
378    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
379        // AX does not have a real-time instruments channel; instruments are fetched via HTTP
380        log::debug!("Instruments subscription not applicable for AX (use request_instruments)");
381        Ok(())
382    }
383
384    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
385        // AX does not have a real-time instrument channel; instruments are fetched via HTTP
386        log::debug!("Instrument subscription not applicable for AX (use request_instrument)");
387        Ok(())
388    }
389
390    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
391        let symbol = cmd.instrument_id.symbol.to_string();
392        let level = Self::map_book_type_to_market_data_level(cmd.book_type);
393        if cmd.book_type == BookType::L1_MBP {
394            log::warn!(
395                "Book type L1_MBP not supported by AX for deltas, downgrading {symbol} to LEVEL_2"
396            );
397        }
398        log::debug!("Subscribing to book deltas for {symbol} at {level:?}");
399
400        let ws = self.ws_client.clone();
401        self.spawn_ws(
402            async move {
403                ws.subscribe_book_deltas(&symbol, level)
404                    .await
405                    .map_err(|e| anyhow::anyhow!(e))
406            },
407            "subscribe book deltas",
408        );
409
410        Ok(())
411    }
412
413    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
414        let symbol = cmd.instrument_id.symbol.to_string();
415        log::debug!("Subscribing to quotes for {symbol}");
416
417        let ws = self.ws_client.clone();
418        self.spawn_ws(
419            async move {
420                ws.subscribe_quotes(&symbol)
421                    .await
422                    .map_err(|e| anyhow::anyhow!(e))
423            },
424            "subscribe quotes",
425        );
426
427        Ok(())
428    }
429
430    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
431        let symbol = cmd.instrument_id.symbol.to_string();
432        log::debug!("Subscribing to trades for {symbol}");
433
434        let ws = self.ws_client.clone();
435        self.spawn_ws(
436            async move {
437                ws.subscribe_trades(&symbol)
438                    .await
439                    .map_err(|e| anyhow::anyhow!(e))
440            },
441            "subscribe trades",
442        );
443
444        Ok(())
445    }
446
447    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
448        let bar_type = cmd.bar_type;
449        let symbol = bar_type.instrument_id().symbol.to_string();
450        let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
451        log::debug!("Subscribing to bars for {bar_type} (width: {width:?})");
452
453        let ws = self.ws_client.clone();
454        self.spawn_ws(
455            async move {
456                ws.subscribe_candles(&symbol, width)
457                    .await
458                    .map_err(|e| anyhow::anyhow!(e))
459            },
460            "subscribe bars",
461        );
462
463        Ok(())
464    }
465
466    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
467        // TODO: Hardcoded for now
468        const POLL_INTERVAL_SECS: u64 = 900; // 15 minutes
469
470        // Use 7-day lookback to capture latest rate across weekends/holidays
471        let lookback = ChronoDuration::days(7);
472
473        let instrument_id = cmd.instrument_id;
474
475        if self.funding_rate_tasks.contains_key(&instrument_id) {
476            log::debug!("Already subscribed to funding rates for {instrument_id}");
477            return Ok(());
478        }
479
480        log::debug!("Subscribing to funding rates for {instrument_id} (HTTP polling)");
481
482        let http = self.http_client.clone();
483        let sender = self.data_sender.clone();
484        let symbol = instrument_id.symbol.inner();
485        let cancel = self.cancellation_token.clone();
486        let cache = Arc::clone(&self.funding_rate_cache);
487        let clock = self.clock;
488
489        let handle = get_runtime().spawn(async move {
490            // First tick fires immediately for initial emission
491            let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
492
493            loop {
494                tokio::select! {
495                    () = cancel.cancelled() => {
496                        log::debug!("Funding rate polling cancelled for {symbol}");
497                        break;
498                    }
499                    _ = interval.tick() => {
500                        let now: DateTime<Utc> = clock.get_time_ns().into();
501                        let start = now - lookback;
502
503                        match http.request_funding_rates(instrument_id, Some(start), Some(now)).await {
504                            Ok(funding_rates) => {
505                                if funding_rates.is_empty() {
506                                    log::warn!(
507                                        "No funding rates returned for {symbol}"
508                                    );
509                                } else if let Some(update) = funding_rates.last() {
510                                    // Only emit if rate changed
511                                    let should_emit = cache.lock().unwrap()
512                                        .get(&instrument_id) != Some(update);
513
514                                    if should_emit {
515                                        log::info!(
516                                            "Funding rate for {symbol}: {}",
517                                            update.rate,
518                                        );
519                                        let update = *update;
520                                        cache.lock().unwrap()
521                                            .insert(instrument_id, update);
522                                        if let Err(e) = sender.send(
523                                            DataEvent::FundingRate(update),
524                                        ) {
525                                            log::error!(
526                                                "Failed to send funding rate for {symbol}: {e}"
527                                            );
528                                        }
529                                    }
530                                }
531                            }
532                            Err(e) => {
533                                log::error!(
534                                    "Failed to poll funding rates for {symbol}: {e}"
535                                );
536                            }
537                        }
538                    }
539                }
540            }
541        });
542
543        self.funding_rate_tasks.insert(instrument_id, handle);
544        Ok(())
545    }
546
547    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
548        Ok(())
549    }
550
551    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
552        Ok(())
553    }
554
555    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
556        let symbol = cmd.instrument_id.symbol.to_string();
557        log::debug!("Unsubscribing from book deltas for {symbol}");
558
559        let ws = self.ws_client.clone();
560        self.spawn_ws(
561            async move {
562                ws.unsubscribe_book_deltas(&symbol)
563                    .await
564                    .map_err(|e| anyhow::anyhow!(e))
565            },
566            "unsubscribe book deltas",
567        );
568
569        Ok(())
570    }
571
572    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
573        let symbol = cmd.instrument_id.symbol.to_string();
574        log::debug!("Unsubscribing from quotes for {symbol}");
575
576        let ws = self.ws_client.clone();
577        self.spawn_ws(
578            async move {
579                ws.unsubscribe_quotes(&symbol)
580                    .await
581                    .map_err(|e| anyhow::anyhow!(e))
582            },
583            "unsubscribe quotes",
584        );
585
586        Ok(())
587    }
588
589    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
590        let symbol = cmd.instrument_id.symbol.to_string();
591        log::debug!("Unsubscribing from trades for {symbol}");
592
593        let ws = self.ws_client.clone();
594        self.spawn_ws(
595            async move {
596                ws.unsubscribe_trades(&symbol)
597                    .await
598                    .map_err(|e| anyhow::anyhow!(e))
599            },
600            "unsubscribe trades",
601        );
602
603        Ok(())
604    }
605
606    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
607        let bar_type = cmd.bar_type;
608        let symbol = bar_type.instrument_id().symbol.to_string();
609        let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
610        log::debug!("Unsubscribing from bars for {bar_type}");
611
612        let ws = self.ws_client.clone();
613        self.spawn_ws(
614            async move {
615                ws.unsubscribe_candles(&symbol, width)
616                    .await
617                    .map_err(|e| anyhow::anyhow!(e))
618            },
619            "unsubscribe bars",
620        );
621
622        Ok(())
623    }
624
625    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
626        let instrument_id = cmd.instrument_id;
627
628        if let Some(task) = self.funding_rate_tasks.remove(&instrument_id) {
629            log::debug!("Unsubscribing from funding rates for {instrument_id}");
630            task.abort();
631            self.funding_rate_cache
632                .lock()
633                .unwrap()
634                .remove(&instrument_id);
635        } else {
636            log::debug!("Not subscribed to funding rates for {instrument_id}");
637        }
638
639        Ok(())
640    }
641
642    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
643        let http = self.http_client.clone();
644        let ws = self.ws_client.clone();
645        let sender = self.data_sender.clone();
646        let request_id = request.request_id;
647        let client_id = request.client_id.unwrap_or(self.client_id);
648        let venue = *AX_VENUE;
649        let start_nanos = datetime_to_unix_nanos(request.start);
650        let end_nanos = datetime_to_unix_nanos(request.end);
651        let params = request.params;
652        let clock = self.clock;
653
654        get_runtime().spawn(async move {
655            match http.request_instruments(None, None).await {
656                Ok(instruments) => {
657                    log::info!("Fetched {} instruments from Ax", instruments.len());
658                    for inst in &instruments {
659                        ws.cache_instrument(inst.clone());
660                    }
661                    http.cache_instruments(instruments.clone());
662
663                    let response = DataResponse::Instruments(InstrumentsResponse::new(
664                        request_id,
665                        client_id,
666                        venue,
667                        instruments,
668                        start_nanos,
669                        end_nanos,
670                        clock.get_time_ns(),
671                        params,
672                    ));
673
674                    if let Err(e) = sender.send(DataEvent::Response(response)) {
675                        log::error!("Failed to send instruments response: {e}");
676                    }
677                }
678                Err(e) => {
679                    log::error!("Failed to request instruments: {e}");
680                }
681            }
682        });
683
684        Ok(())
685    }
686
687    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
688        let http = self.http_client.clone();
689        let ws = self.ws_client.clone();
690        let sender = self.data_sender.clone();
691        let request_id = request.request_id;
692        let client_id = request.client_id.unwrap_or(self.client_id);
693        let instrument_id = request.instrument_id;
694        let symbol = instrument_id.symbol.inner();
695        let start_nanos = datetime_to_unix_nanos(request.start);
696        let end_nanos = datetime_to_unix_nanos(request.end);
697        let params = request.params;
698        let clock = self.clock;
699
700        get_runtime().spawn(async move {
701            match http.request_instrument(symbol, None, None).await {
702                Ok(instrument) => {
703                    log::debug!("Fetched instrument {symbol} from Ax");
704                    ws.cache_instrument(instrument.clone());
705                    http.cache_instrument(instrument.clone());
706
707                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
708                        request_id,
709                        client_id,
710                        instrument_id,
711                        instrument,
712                        start_nanos,
713                        end_nanos,
714                        clock.get_time_ns(),
715                        params,
716                    )));
717
718                    if let Err(e) = sender.send(DataEvent::Response(response)) {
719                        log::error!("Failed to send instrument response: {e}");
720                    }
721                }
722                Err(e) => {
723                    log::error!("Failed to request instrument {symbol}: {e}");
724                }
725            }
726        });
727
728        Ok(())
729    }
730
731    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
732        let http = self.http_client.clone();
733        let sender = self.data_sender.clone();
734        let request_id = request.request_id;
735        let client_id = request.client_id.unwrap_or(self.client_id);
736        let instrument_id = request.instrument_id;
737        let symbol = instrument_id.symbol.inner();
738        let depth = request.depth.map(|n| n.get());
739        let params = request.params;
740        let clock = self.clock;
741
742        get_runtime().spawn(async move {
743            match http.request_book_snapshot(symbol, depth).await {
744                Ok(book) => {
745                    log::debug!(
746                        "Fetched book snapshot for {symbol} ({} bids, {} asks)",
747                        book.bids(None).count(),
748                        book.asks(None).count(),
749                    );
750
751                    let response = DataResponse::Book(BookResponse::new(
752                        request_id,
753                        client_id,
754                        instrument_id,
755                        book,
756                        None,
757                        None,
758                        clock.get_time_ns(),
759                        params,
760                    ));
761
762                    if let Err(e) = sender.send(DataEvent::Response(response)) {
763                        log::error!("Failed to send book snapshot response: {e}");
764                    }
765                }
766                Err(e) => {
767                    log::error!("Failed to request book snapshot for {symbol}: {e}");
768                }
769            }
770        });
771
772        Ok(())
773    }
774
775    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
776        let http = self.http_client.clone();
777        let sender = self.data_sender.clone();
778        let request_id = request.request_id;
779        let client_id = request.client_id.unwrap_or(self.client_id);
780        let instrument_id = request.instrument_id;
781        let symbol = instrument_id.symbol.inner();
782        let limit = request.limit.map(|n| n.get() as i32);
783        let start_nanos = datetime_to_unix_nanos(request.start);
784        let end_nanos = datetime_to_unix_nanos(request.end);
785        let params = request.params;
786        let clock = self.clock;
787
788        get_runtime().spawn(async move {
789            match http
790                .request_trade_ticks(symbol, limit, start_nanos, end_nanos)
791                .await
792            {
793                Ok(ticks) => {
794                    log::debug!("Fetched {} trades for {symbol}", ticks.len());
795
796                    let response = DataResponse::Trades(TradesResponse::new(
797                        request_id,
798                        client_id,
799                        instrument_id,
800                        ticks,
801                        start_nanos,
802                        end_nanos,
803                        clock.get_time_ns(),
804                        params,
805                    ));
806
807                    if let Err(e) = sender.send(DataEvent::Response(response)) {
808                        log::error!("Failed to send trades response: {e}");
809                    }
810                }
811                Err(e) => {
812                    log::error!("Failed to request trades for {symbol}: {e}");
813                }
814            }
815        });
816
817        Ok(())
818    }
819
820    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
821        let http = self.http_client.clone();
822        let sender = self.data_sender.clone();
823        let request_id = request.request_id;
824        let client_id = request.client_id.unwrap_or(self.client_id);
825        let bar_type = request.bar_type;
826        let symbol = bar_type.instrument_id().symbol.inner();
827        let start = request.start;
828        let end = request.end;
829        let start_nanos = datetime_to_unix_nanos(start);
830        let end_nanos = datetime_to_unix_nanos(end);
831        let params = request.params;
832        let clock = self.clock;
833        let width = match map_bar_spec_to_candle_width(&bar_type.spec()) {
834            Ok(w) => w,
835            Err(e) => {
836                log::error!("Failed to map bar type {bar_type}: {e}");
837                return Err(e);
838            }
839        };
840
841        get_runtime().spawn(async move {
842            match http.request_bars(symbol, start, end, width).await {
843                Ok(bars) => {
844                    log::debug!("Fetched {} bars for {symbol}", bars.len());
845
846                    let response = DataResponse::Bars(BarsResponse::new(
847                        request_id,
848                        client_id,
849                        bar_type,
850                        bars,
851                        start_nanos,
852                        end_nanos,
853                        clock.get_time_ns(),
854                        params,
855                    ));
856
857                    if let Err(e) = sender.send(DataEvent::Response(response)) {
858                        log::error!("Failed to send bars response: {e}");
859                    }
860                }
861                Err(e) => {
862                    log::error!("Failed to request bars for {symbol}: {e}");
863                }
864            }
865        });
866
867        Ok(())
868    }
869
870    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
871        let http = self.http_client.clone();
872        let sender = self.data_sender.clone();
873        let request_id = request.request_id;
874        let client_id = request.client_id.unwrap_or(self.client_id);
875        let instrument_id = request.instrument_id;
876        let symbol = instrument_id.symbol.inner();
877        let start = request.start;
878        let end = request.end;
879        let start_nanos = datetime_to_unix_nanos(start);
880        let end_nanos = datetime_to_unix_nanos(end);
881        let params = request.params;
882        let clock = self.clock;
883
884        get_runtime().spawn(async move {
885            match http.request_funding_rates(instrument_id, start, end).await {
886                Ok(funding_rates) => {
887                    log::debug!("Fetched {} funding rates for {symbol}", funding_rates.len());
888
889                    let ts_init = clock.get_time_ns();
890                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
891                        request_id,
892                        client_id,
893                        instrument_id,
894                        funding_rates,
895                        start_nanos,
896                        end_nanos,
897                        ts_init,
898                        params,
899                    ));
900
901                    if let Err(e) = sender.send(DataEvent::Response(response)) {
902                        log::error!("Failed to send funding rates response: {e}");
903                    }
904                }
905                Err(e) => {
906                    log::error!("Failed to request funding rates for {symbol}: {e}");
907                }
908            }
909        });
910
911        Ok(())
912    }
913}