1use std::{
19 future::Future,
20 sync::{
21 Arc, Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26
27use ahash::AHashMap;
28use anyhow::Context;
29use async_trait::async_trait;
30use chrono::{DateTime, Duration as ChronoDuration, Utc};
31use dashmap::DashMap;
32use futures_util::StreamExt;
33use nautilus_common::{
34 clients::DataClient,
35 live::{runner::get_data_event_sender, runtime::get_runtime},
36 messages::{
37 DataEvent, DataResponse,
38 data::{
39 BarsResponse, BookResponse, FundingRatesResponse, InstrumentResponse,
40 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
41 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
42 SubscribeBookDeltas, SubscribeFundingRates, SubscribeInstrument, SubscribeInstruments,
43 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
44 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeInstrument,
45 UnsubscribeInstruments, UnsubscribeQuotes, UnsubscribeTrades,
46 },
47 },
48};
49use nautilus_core::{
50 datetime::datetime_to_unix_nanos,
51 time::{AtomicTime, get_atomic_clock_realtime},
52};
53use nautilus_model::{
54 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
55 enums::BookType,
56 identifiers::{ClientId, InstrumentId, Venue},
57 instruments::InstrumentAny,
58};
59use tokio::task::JoinHandle;
60use tokio_util::sync::CancellationToken;
61use ustr::Ustr;
62
63use crate::{
64 common::{consts::AX_VENUE, enums::AxMarketDataLevel, parse::map_bar_spec_to_candle_width},
65 config::AxDataClientConfig,
66 http::client::AxHttpClient,
67 websocket::{data::client::AxMdWebSocketClient, messages::NautilusDataWsMessage},
68};
69
/// Data client for the AX venue.
///
/// Combines an HTTP client (authentication, instrument loading, historical
/// requests, funding-rate polling) with a market-data WebSocket client
/// (live subscriptions), and forwards everything it produces over the
/// data event channel.
#[derive(Debug)]
pub struct AxDataClient {
    /// Identifier reported by `client_id()` and used as the default client ID on responses.
    client_id: ClientId,
    /// Client configuration; read during `connect` to resolve API credentials.
    config: AxDataClientConfig,
    /// HTTP client used for authentication and all request/response style calls.
    http_client: AxHttpClient,
    /// WebSocket client used for streaming market data subscriptions.
    ws_client: AxMdWebSocketClient,
    /// Whether the client currently considers itself connected.
    is_connected: AtomicBool,
    /// Token observed by background tasks so they can be asked to stop.
    cancellation_token: CancellationToken,
    /// Handles of background tasks spawned by this client (the WebSocket message handler).
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache handle, cloned from the HTTP client's `instruments_cache`.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Clock used to timestamp responses and to anchor funding-rate polling windows.
    clock: &'static AtomicTime,
    /// Funding-rate polling tasks, keyed by the instrument they poll.
    funding_rate_tasks: AHashMap<InstrumentId, JoinHandle<()>>,
    /// Last funding-rate update emitted per instrument; used to suppress duplicates.
    funding_rate_cache: Arc<Mutex<AHashMap<InstrumentId, FundingRateUpdate>>>,
}
102
103impl AxDataClient {
104 pub fn new(
110 client_id: ClientId,
111 config: AxDataClientConfig,
112 http_client: AxHttpClient,
113 ws_client: AxMdWebSocketClient,
114 ) -> anyhow::Result<Self> {
115 let clock = get_atomic_clock_realtime();
116 let data_sender = get_data_event_sender();
117
118 let instruments = http_client.instruments_cache.clone();
120
121 Ok(Self {
122 client_id,
123 config,
124 http_client,
125 ws_client,
126 is_connected: AtomicBool::new(false),
127 cancellation_token: CancellationToken::new(),
128 tasks: Vec::new(),
129 data_sender,
130 instruments,
131 clock,
132 funding_rate_tasks: AHashMap::new(),
133 funding_rate_cache: Arc::new(Mutex::new(AHashMap::new())),
134 })
135 }
136
137 #[must_use]
139 pub fn venue(&self) -> Venue {
140 *AX_VENUE
141 }
142
143 fn map_book_type_to_market_data_level(book_type: BookType) -> AxMarketDataLevel {
144 match book_type {
145 BookType::L3_MBO => AxMarketDataLevel::Level3,
146 BookType::L1_MBP | BookType::L2_MBP => AxMarketDataLevel::Level2,
147 }
148 }
149
150 #[must_use]
152 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
153 &self.instruments
154 }
155
156 fn spawn_message_handler(&mut self) {
158 let stream = self.ws_client.stream();
159 let data_sender = self.data_sender.clone();
160 let cancellation_token = self.cancellation_token.clone();
161
162 let handle = get_runtime().spawn(async move {
163 tokio::pin!(stream);
164
165 loop {
166 tokio::select! {
167 () = cancellation_token.cancelled() => {
168 log::debug!("Message handler cancelled");
169 break;
170 }
171 msg = stream.next() => {
172 match msg {
173 Some(ws_msg) => {
174 Self::handle_ws_message(ws_msg, &data_sender);
175 }
176 None => {
177 log::debug!("WebSocket stream ended");
178 break;
179 }
180 }
181 }
182 }
183 }
184 });
185
186 self.tasks.push(handle);
187 }
188
189 fn handle_ws_message(
191 msg: NautilusDataWsMessage,
192 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
193 ) {
194 match msg {
195 NautilusDataWsMessage::Data(data_vec) => {
196 for data in data_vec {
197 if let Err(e) = sender.send(DataEvent::Data(data)) {
198 log::error!("Failed to send data event: {e}");
199 }
200 }
201 }
202 NautilusDataWsMessage::Deltas(deltas) => {
203 let api_deltas = OrderBookDeltas_API::new(deltas);
204 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
205 log::error!("Failed to send deltas event: {e}");
206 }
207 }
208 NautilusDataWsMessage::Bar(bar) => {
209 if let Err(e) = sender.send(DataEvent::Data(Data::Bar(bar))) {
210 log::error!("Failed to send bar event: {e}");
211 }
212 }
213 NautilusDataWsMessage::Heartbeat => {
214 log::trace!("Received heartbeat");
215 }
216 NautilusDataWsMessage::Reconnected => {
217 log::info!("WebSocket reconnected");
218 }
219 NautilusDataWsMessage::Error(err) => {
220 if err.message.contains("already subscribed")
222 || err.message.contains("not subscribed")
223 {
224 log::warn!("WebSocket subscription state: {err:?}");
225 } else {
226 log::error!("WebSocket error: {err:?}");
227 }
228 }
229 }
230 }
231
232 fn spawn_ws<F>(&self, fut: F, context: &'static str)
233 where
234 F: Future<Output = anyhow::Result<()>> + Send + 'static,
235 {
236 get_runtime().spawn(async move {
237 if let Err(e) = fut.await {
238 log::error!("{context}: {e:?}");
239 }
240 });
241 }
242}
243
244#[async_trait(?Send)]
245impl DataClient for AxDataClient {
246 fn client_id(&self) -> ClientId {
247 self.client_id
248 }
249
250 fn venue(&self) -> Option<Venue> {
251 Some(*AX_VENUE)
252 }
253
254 fn start(&mut self) -> anyhow::Result<()> {
255 log::debug!("Starting {}", self.client_id);
256 Ok(())
257 }
258
259 fn stop(&mut self) -> anyhow::Result<()> {
260 log::debug!("Stopping {}", self.client_id);
261 self.cancellation_token.cancel();
262 self.is_connected.store(false, Ordering::Release);
263 Ok(())
264 }
265
266 fn reset(&mut self) -> anyhow::Result<()> {
267 log::debug!("Resetting {}", self.client_id);
268 self.cancellation_token.cancel();
269 for task in self.tasks.drain(..) {
270 task.abort();
271 }
272 for (_, task) in self.funding_rate_tasks.drain() {
273 task.abort();
274 }
275 self.funding_rate_cache.lock().unwrap().clear();
276 self.cancellation_token = CancellationToken::new();
277 Ok(())
278 }
279
280 fn dispose(&mut self) -> anyhow::Result<()> {
281 log::debug!("Disposing {}", self.client_id);
282 self.cancellation_token.cancel();
283 self.is_connected.store(false, Ordering::Release);
284 Ok(())
285 }
286
287 fn is_connected(&self) -> bool {
288 self.is_connected.load(Ordering::Acquire)
289 }
290
291 fn is_disconnected(&self) -> bool {
292 !self.is_connected()
293 }
294
295 async fn connect(&mut self) -> anyhow::Result<()> {
296 log::info!("Connecting {}", self.client_id);
297
298 self.cancellation_token = CancellationToken::new();
300
301 if self.config.has_api_credentials() {
302 let api_key = self
303 .config
304 .api_key
305 .clone()
306 .or_else(|| std::env::var("AX_API_KEY").ok())
307 .context("AX_API_KEY not configured")?;
308
309 let api_secret = self
310 .config
311 .api_secret
312 .clone()
313 .or_else(|| std::env::var("AX_API_SECRET").ok())
314 .context("AX_API_SECRET not configured")?;
315
316 let token = self
317 .http_client
318 .authenticate(&api_key, &api_secret, 86400)
319 .await
320 .context("Failed to authenticate with Ax")?;
321 log::info!("Authenticated with Ax");
322 self.ws_client.set_auth_token(token);
323 }
324
325 let instruments = self
326 .http_client
327 .request_instruments(None, None)
328 .await
329 .context("Failed to fetch instruments")?;
330
331 for instrument in &instruments {
332 self.ws_client.cache_instrument(instrument.clone());
333 if let Err(e) = self
334 .data_sender
335 .send(DataEvent::Instrument(instrument.clone()))
336 {
337 log::warn!("Failed to send instrument: {e}");
338 }
339 }
340 self.http_client.cache_instruments(instruments);
341 log::info!(
342 "Cached {} instruments",
343 self.http_client.get_cached_symbols().len()
344 );
345
346 self.ws_client
347 .connect()
348 .await
349 .context("Failed to connect WebSocket")?;
350 log::info!("WebSocket connected");
351 self.spawn_message_handler();
352
353 self.is_connected.store(true, Ordering::Release);
354 log::info!("Connected {}", self.client_id);
355
356 Ok(())
357 }
358
359 async fn disconnect(&mut self) -> anyhow::Result<()> {
360 log::info!("Disconnecting {}", self.client_id);
361 self.cancellation_token.cancel();
362 self.ws_client.close().await;
363
364 for task in self.tasks.drain(..) {
365 task.abort();
366 }
367 for (_, task) in self.funding_rate_tasks.drain() {
368 task.abort();
369 }
370 self.funding_rate_cache.lock().unwrap().clear();
371
372 self.is_connected.store(false, Ordering::Release);
373 log::info!("Disconnected {}", self.client_id);
374
375 Ok(())
376 }
377
378 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
379 log::debug!("Instruments subscription not applicable for AX (use request_instruments)");
381 Ok(())
382 }
383
384 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
385 log::debug!("Instrument subscription not applicable for AX (use request_instrument)");
387 Ok(())
388 }
389
390 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
391 let symbol = cmd.instrument_id.symbol.to_string();
392 let level = Self::map_book_type_to_market_data_level(cmd.book_type);
393 if cmd.book_type == BookType::L1_MBP {
394 log::warn!(
395 "Book type L1_MBP not supported by AX for deltas, downgrading {symbol} to LEVEL_2"
396 );
397 }
398 log::debug!("Subscribing to book deltas for {symbol} at {level:?}");
399
400 let ws = self.ws_client.clone();
401 self.spawn_ws(
402 async move {
403 ws.subscribe_book_deltas(&symbol, level)
404 .await
405 .map_err(|e| anyhow::anyhow!(e))
406 },
407 "subscribe book deltas",
408 );
409
410 Ok(())
411 }
412
413 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
414 let symbol = cmd.instrument_id.symbol.to_string();
415 log::debug!("Subscribing to quotes for {symbol}");
416
417 let ws = self.ws_client.clone();
418 self.spawn_ws(
419 async move {
420 ws.subscribe_quotes(&symbol)
421 .await
422 .map_err(|e| anyhow::anyhow!(e))
423 },
424 "subscribe quotes",
425 );
426
427 Ok(())
428 }
429
430 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
431 let symbol = cmd.instrument_id.symbol.to_string();
432 log::debug!("Subscribing to trades for {symbol}");
433
434 let ws = self.ws_client.clone();
435 self.spawn_ws(
436 async move {
437 ws.subscribe_trades(&symbol)
438 .await
439 .map_err(|e| anyhow::anyhow!(e))
440 },
441 "subscribe trades",
442 );
443
444 Ok(())
445 }
446
447 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
448 let bar_type = cmd.bar_type;
449 let symbol = bar_type.instrument_id().symbol.to_string();
450 let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
451 log::debug!("Subscribing to bars for {bar_type} (width: {width:?})");
452
453 let ws = self.ws_client.clone();
454 self.spawn_ws(
455 async move {
456 ws.subscribe_candles(&symbol, width)
457 .await
458 .map_err(|e| anyhow::anyhow!(e))
459 },
460 "subscribe bars",
461 );
462
463 Ok(())
464 }
465
466 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
467 const POLL_INTERVAL_SECS: u64 = 900; let lookback = ChronoDuration::days(7);
472
473 let instrument_id = cmd.instrument_id;
474
475 if self.funding_rate_tasks.contains_key(&instrument_id) {
476 log::debug!("Already subscribed to funding rates for {instrument_id}");
477 return Ok(());
478 }
479
480 log::debug!("Subscribing to funding rates for {instrument_id} (HTTP polling)");
481
482 let http = self.http_client.clone();
483 let sender = self.data_sender.clone();
484 let symbol = instrument_id.symbol.inner();
485 let cancel = self.cancellation_token.clone();
486 let cache = Arc::clone(&self.funding_rate_cache);
487 let clock = self.clock;
488
489 let handle = get_runtime().spawn(async move {
490 let mut interval = tokio::time::interval(Duration::from_secs(POLL_INTERVAL_SECS));
492
493 loop {
494 tokio::select! {
495 () = cancel.cancelled() => {
496 log::debug!("Funding rate polling cancelled for {symbol}");
497 break;
498 }
499 _ = interval.tick() => {
500 let now: DateTime<Utc> = clock.get_time_ns().into();
501 let start = now - lookback;
502
503 match http.request_funding_rates(instrument_id, Some(start), Some(now)).await {
504 Ok(funding_rates) => {
505 if funding_rates.is_empty() {
506 log::warn!(
507 "No funding rates returned for {symbol}"
508 );
509 } else if let Some(update) = funding_rates.last() {
510 let should_emit = cache.lock().unwrap()
512 .get(&instrument_id) != Some(update);
513
514 if should_emit {
515 log::info!(
516 "Funding rate for {symbol}: {}",
517 update.rate,
518 );
519 let update = *update;
520 cache.lock().unwrap()
521 .insert(instrument_id, update);
522 if let Err(e) = sender.send(
523 DataEvent::FundingRate(update),
524 ) {
525 log::error!(
526 "Failed to send funding rate for {symbol}: {e}"
527 );
528 }
529 }
530 }
531 }
532 Err(e) => {
533 log::error!(
534 "Failed to poll funding rates for {symbol}: {e}"
535 );
536 }
537 }
538 }
539 }
540 }
541 });
542
543 self.funding_rate_tasks.insert(instrument_id, handle);
544 Ok(())
545 }
546
547 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
548 Ok(())
549 }
550
551 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
552 Ok(())
553 }
554
555 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
556 let symbol = cmd.instrument_id.symbol.to_string();
557 log::debug!("Unsubscribing from book deltas for {symbol}");
558
559 let ws = self.ws_client.clone();
560 self.spawn_ws(
561 async move {
562 ws.unsubscribe_book_deltas(&symbol)
563 .await
564 .map_err(|e| anyhow::anyhow!(e))
565 },
566 "unsubscribe book deltas",
567 );
568
569 Ok(())
570 }
571
572 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
573 let symbol = cmd.instrument_id.symbol.to_string();
574 log::debug!("Unsubscribing from quotes for {symbol}");
575
576 let ws = self.ws_client.clone();
577 self.spawn_ws(
578 async move {
579 ws.unsubscribe_quotes(&symbol)
580 .await
581 .map_err(|e| anyhow::anyhow!(e))
582 },
583 "unsubscribe quotes",
584 );
585
586 Ok(())
587 }
588
589 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
590 let symbol = cmd.instrument_id.symbol.to_string();
591 log::debug!("Unsubscribing from trades for {symbol}");
592
593 let ws = self.ws_client.clone();
594 self.spawn_ws(
595 async move {
596 ws.unsubscribe_trades(&symbol)
597 .await
598 .map_err(|e| anyhow::anyhow!(e))
599 },
600 "unsubscribe trades",
601 );
602
603 Ok(())
604 }
605
606 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
607 let bar_type = cmd.bar_type;
608 let symbol = bar_type.instrument_id().symbol.to_string();
609 let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
610 log::debug!("Unsubscribing from bars for {bar_type}");
611
612 let ws = self.ws_client.clone();
613 self.spawn_ws(
614 async move {
615 ws.unsubscribe_candles(&symbol, width)
616 .await
617 .map_err(|e| anyhow::anyhow!(e))
618 },
619 "unsubscribe bars",
620 );
621
622 Ok(())
623 }
624
625 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
626 let instrument_id = cmd.instrument_id;
627
628 if let Some(task) = self.funding_rate_tasks.remove(&instrument_id) {
629 log::debug!("Unsubscribing from funding rates for {instrument_id}");
630 task.abort();
631 self.funding_rate_cache
632 .lock()
633 .unwrap()
634 .remove(&instrument_id);
635 } else {
636 log::debug!("Not subscribed to funding rates for {instrument_id}");
637 }
638
639 Ok(())
640 }
641
642 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
643 let http = self.http_client.clone();
644 let ws = self.ws_client.clone();
645 let sender = self.data_sender.clone();
646 let request_id = request.request_id;
647 let client_id = request.client_id.unwrap_or(self.client_id);
648 let venue = *AX_VENUE;
649 let start_nanos = datetime_to_unix_nanos(request.start);
650 let end_nanos = datetime_to_unix_nanos(request.end);
651 let params = request.params;
652 let clock = self.clock;
653
654 get_runtime().spawn(async move {
655 match http.request_instruments(None, None).await {
656 Ok(instruments) => {
657 log::info!("Fetched {} instruments from Ax", instruments.len());
658 for inst in &instruments {
659 ws.cache_instrument(inst.clone());
660 }
661 http.cache_instruments(instruments.clone());
662
663 let response = DataResponse::Instruments(InstrumentsResponse::new(
664 request_id,
665 client_id,
666 venue,
667 instruments,
668 start_nanos,
669 end_nanos,
670 clock.get_time_ns(),
671 params,
672 ));
673
674 if let Err(e) = sender.send(DataEvent::Response(response)) {
675 log::error!("Failed to send instruments response: {e}");
676 }
677 }
678 Err(e) => {
679 log::error!("Failed to request instruments: {e}");
680 }
681 }
682 });
683
684 Ok(())
685 }
686
687 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
688 let http = self.http_client.clone();
689 let ws = self.ws_client.clone();
690 let sender = self.data_sender.clone();
691 let request_id = request.request_id;
692 let client_id = request.client_id.unwrap_or(self.client_id);
693 let instrument_id = request.instrument_id;
694 let symbol = instrument_id.symbol.inner();
695 let start_nanos = datetime_to_unix_nanos(request.start);
696 let end_nanos = datetime_to_unix_nanos(request.end);
697 let params = request.params;
698 let clock = self.clock;
699
700 get_runtime().spawn(async move {
701 match http.request_instrument(symbol, None, None).await {
702 Ok(instrument) => {
703 log::debug!("Fetched instrument {symbol} from Ax");
704 ws.cache_instrument(instrument.clone());
705 http.cache_instrument(instrument.clone());
706
707 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
708 request_id,
709 client_id,
710 instrument_id,
711 instrument,
712 start_nanos,
713 end_nanos,
714 clock.get_time_ns(),
715 params,
716 )));
717
718 if let Err(e) = sender.send(DataEvent::Response(response)) {
719 log::error!("Failed to send instrument response: {e}");
720 }
721 }
722 Err(e) => {
723 log::error!("Failed to request instrument {symbol}: {e}");
724 }
725 }
726 });
727
728 Ok(())
729 }
730
731 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
732 let http = self.http_client.clone();
733 let sender = self.data_sender.clone();
734 let request_id = request.request_id;
735 let client_id = request.client_id.unwrap_or(self.client_id);
736 let instrument_id = request.instrument_id;
737 let symbol = instrument_id.symbol.inner();
738 let depth = request.depth.map(|n| n.get());
739 let params = request.params;
740 let clock = self.clock;
741
742 get_runtime().spawn(async move {
743 match http.request_book_snapshot(symbol, depth).await {
744 Ok(book) => {
745 log::debug!(
746 "Fetched book snapshot for {symbol} ({} bids, {} asks)",
747 book.bids(None).count(),
748 book.asks(None).count(),
749 );
750
751 let response = DataResponse::Book(BookResponse::new(
752 request_id,
753 client_id,
754 instrument_id,
755 book,
756 None,
757 None,
758 clock.get_time_ns(),
759 params,
760 ));
761
762 if let Err(e) = sender.send(DataEvent::Response(response)) {
763 log::error!("Failed to send book snapshot response: {e}");
764 }
765 }
766 Err(e) => {
767 log::error!("Failed to request book snapshot for {symbol}: {e}");
768 }
769 }
770 });
771
772 Ok(())
773 }
774
775 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
776 let http = self.http_client.clone();
777 let sender = self.data_sender.clone();
778 let request_id = request.request_id;
779 let client_id = request.client_id.unwrap_or(self.client_id);
780 let instrument_id = request.instrument_id;
781 let symbol = instrument_id.symbol.inner();
782 let limit = request.limit.map(|n| n.get() as i32);
783 let start_nanos = datetime_to_unix_nanos(request.start);
784 let end_nanos = datetime_to_unix_nanos(request.end);
785 let params = request.params;
786 let clock = self.clock;
787
788 get_runtime().spawn(async move {
789 match http
790 .request_trade_ticks(symbol, limit, start_nanos, end_nanos)
791 .await
792 {
793 Ok(ticks) => {
794 log::debug!("Fetched {} trades for {symbol}", ticks.len());
795
796 let response = DataResponse::Trades(TradesResponse::new(
797 request_id,
798 client_id,
799 instrument_id,
800 ticks,
801 start_nanos,
802 end_nanos,
803 clock.get_time_ns(),
804 params,
805 ));
806
807 if let Err(e) = sender.send(DataEvent::Response(response)) {
808 log::error!("Failed to send trades response: {e}");
809 }
810 }
811 Err(e) => {
812 log::error!("Failed to request trades for {symbol}: {e}");
813 }
814 }
815 });
816
817 Ok(())
818 }
819
820 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
821 let http = self.http_client.clone();
822 let sender = self.data_sender.clone();
823 let request_id = request.request_id;
824 let client_id = request.client_id.unwrap_or(self.client_id);
825 let bar_type = request.bar_type;
826 let symbol = bar_type.instrument_id().symbol.inner();
827 let start = request.start;
828 let end = request.end;
829 let start_nanos = datetime_to_unix_nanos(start);
830 let end_nanos = datetime_to_unix_nanos(end);
831 let params = request.params;
832 let clock = self.clock;
833 let width = match map_bar_spec_to_candle_width(&bar_type.spec()) {
834 Ok(w) => w,
835 Err(e) => {
836 log::error!("Failed to map bar type {bar_type}: {e}");
837 return Err(e);
838 }
839 };
840
841 get_runtime().spawn(async move {
842 match http.request_bars(symbol, start, end, width).await {
843 Ok(bars) => {
844 log::debug!("Fetched {} bars for {symbol}", bars.len());
845
846 let response = DataResponse::Bars(BarsResponse::new(
847 request_id,
848 client_id,
849 bar_type,
850 bars,
851 start_nanos,
852 end_nanos,
853 clock.get_time_ns(),
854 params,
855 ));
856
857 if let Err(e) = sender.send(DataEvent::Response(response)) {
858 log::error!("Failed to send bars response: {e}");
859 }
860 }
861 Err(e) => {
862 log::error!("Failed to request bars for {symbol}: {e}");
863 }
864 }
865 });
866
867 Ok(())
868 }
869
870 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
871 let http = self.http_client.clone();
872 let sender = self.data_sender.clone();
873 let request_id = request.request_id;
874 let client_id = request.client_id.unwrap_or(self.client_id);
875 let instrument_id = request.instrument_id;
876 let symbol = instrument_id.symbol.inner();
877 let start = request.start;
878 let end = request.end;
879 let start_nanos = datetime_to_unix_nanos(start);
880 let end_nanos = datetime_to_unix_nanos(end);
881 let params = request.params;
882 let clock = self.clock;
883
884 get_runtime().spawn(async move {
885 match http.request_funding_rates(instrument_id, start, end).await {
886 Ok(funding_rates) => {
887 log::debug!("Fetched {} funding rates for {symbol}", funding_rates.len());
888
889 let ts_init = clock.get_time_ns();
890 let response = DataResponse::FundingRates(FundingRatesResponse::new(
891 request_id,
892 client_id,
893 instrument_id,
894 funding_rates,
895 start_nanos,
896 end_nanos,
897 ts_init,
898 params,
899 ));
900
901 if let Err(e) = sender.send(DataEvent::Response(response)) {
902 log::error!("Failed to send funding rates response: {e}");
903 }
904 }
905 Err(e) => {
906 log::error!("Failed to request funding rates for {symbol}: {e}");
907 }
908 }
909 });
910
911 Ok(())
912 }
913}