1use std::{
22 fmt::Debug,
23 ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use nautilus_common::{
28 clients::{DataClient, log_command_error},
29 messages::data::{
30 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData, RequestInstrument,
31 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
32 SubscribeBookDepth10, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
33 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
34 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
35 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
36 UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
37 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
38 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39 },
40};
41#[cfg(feature = "defi")]
42use nautilus_model::defi::Blockchain;
43use nautilus_model::{
44 data::{BarType, DataType},
45 identifiers::{ClientId, InstrumentId, Venue},
46};
47
48#[cfg(feature = "defi")]
49#[allow(unused_imports)] use crate::defi::client as _;
51
/// Wraps a boxed [`DataClient`] together with the subscription sets recorded for it.
///
/// The subscribe/unsubscribe methods on this type update these sets and forward the
/// command to the inner client only when the set membership actually changes.
pub struct DataClientAdapter {
    /// The wrapped data client that commands and requests are forwarded to.
    pub(crate) client: Box<dyn DataClient>,
    /// Client identifier supplied at construction.
    pub client_id: ClientId,
    /// Venue supplied at construction, if any.
    pub venue: Option<Venue>,
    /// Value of the `handles_order_book_deltas` argument given to `new`.
    pub handles_book_deltas: bool,
    /// Value of the `handles_order_book_snapshots` argument given to `new`.
    pub handles_book_snapshots: bool,
    /// Custom data types currently recorded as subscribed.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instruments currently recorded as subscribed for book deltas.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for depth-10 book data.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for quotes.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for trades.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types currently recorded as subscribed.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instruments currently recorded as subscribed for instrument status.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for instrument close.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for instrument updates.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues currently recorded as subscribed for venue-wide instrument updates.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instruments currently recorded as subscribed for mark prices.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for index prices.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
    /// Instruments currently recorded as subscribed for funding rates.
    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
    // NOTE(review): the `defi`-gated sets below are only initialized (empty) in the code
    // visible here; presumably they are maintained by DeFi-specific impl blocks elsewhere.
    /// Blockchains recorded for block subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_blocks: AHashSet<Blockchain>,
    /// Instruments recorded for pool subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pools: AHashSet<InstrumentId>,
    /// Instruments recorded for pool swap subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
    /// Instruments recorded for pool liquidity update subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
    /// Instruments recorded for pool fee collect subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
    /// Instruments recorded for pool flash subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
}
85
86impl Deref for DataClientAdapter {
87 type Target = Box<dyn DataClient>;
88
89 fn deref(&self) -> &Self::Target {
90 &self.client
91 }
92}
93
94impl DerefMut for DataClientAdapter {
95 fn deref_mut(&mut self) -> &mut Self::Target {
96 &mut self.client
97 }
98}
99
100impl Debug for DataClientAdapter {
101 #[rustfmt::skip]
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct(stringify!(DataClientAdapter))
104 .field("client_id", &self.client_id)
105 .field("venue", &self.venue)
106 .field("handles_book_deltas", &self.handles_book_deltas)
107 .field("handles_book_snapshots", &self.handles_book_snapshots)
108 .field("subscriptions_custom", &self.subscriptions_custom)
109 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
110 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
111 .field("subscriptions_quotes", &self.subscriptions_quotes)
112 .field("subscriptions_trades", &self.subscriptions_trades)
113 .field("subscriptions_bars", &self.subscriptions_bars)
114 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
115 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
116 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
117 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
118 .field("subscriptions_instrument", &self.subscriptions_instrument)
119 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
120 .finish()
121 }
122}
123
124impl DataClientAdapter {
125 #[must_use]
127 pub fn new(
128 client_id: ClientId,
129 venue: Option<Venue>,
130 handles_order_book_deltas: bool,
131 handles_order_book_snapshots: bool,
132 client: Box<dyn DataClient>,
133 ) -> Self {
134 Self {
135 client,
136 client_id,
137 venue,
138 handles_book_deltas: handles_order_book_deltas,
139 handles_book_snapshots: handles_order_book_snapshots,
140 subscriptions_custom: AHashSet::new(),
141 subscriptions_book_deltas: AHashSet::new(),
142 subscriptions_book_depth10: AHashSet::new(),
143 subscriptions_quotes: AHashSet::new(),
144 subscriptions_trades: AHashSet::new(),
145 subscriptions_mark_prices: AHashSet::new(),
146 subscriptions_index_prices: AHashSet::new(),
147 subscriptions_funding_rates: AHashSet::new(),
148 subscriptions_bars: AHashSet::new(),
149 subscriptions_instrument_status: AHashSet::new(),
150 subscriptions_instrument_close: AHashSet::new(),
151 subscriptions_instrument: AHashSet::new(),
152 subscriptions_instrument_venue: AHashSet::new(),
153 #[cfg(feature = "defi")]
154 subscriptions_blocks: AHashSet::new(),
155 #[cfg(feature = "defi")]
156 subscriptions_pools: AHashSet::new(),
157 #[cfg(feature = "defi")]
158 subscriptions_pool_swaps: AHashSet::new(),
159 #[cfg(feature = "defi")]
160 subscriptions_pool_liquidity_updates: AHashSet::new(),
161 #[cfg(feature = "defi")]
162 subscriptions_pool_fee_collects: AHashSet::new(),
163 #[cfg(feature = "defi")]
164 subscriptions_pool_flash: AHashSet::new(),
165 }
166 }
167
168 #[allow(clippy::borrowed_box)]
169 #[must_use]
170 pub fn get_client(&self) -> &Box<dyn DataClient> {
171 &self.client
172 }
173
174 pub async fn connect(&mut self) -> anyhow::Result<()> {
180 self.client.connect().await
181 }
182
183 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
189 self.client.disconnect().await
190 }
191
192 #[inline]
193 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
194 if let Err(e) = match cmd {
195 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
196 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
197 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
198 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
199 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
200 SubscribeCommand::BookSnapshots(_) => Ok(()), SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
202 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
203 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
204 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
205 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
206 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
207 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
208 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
209 } {
210 log_command_error(&cmd, &e);
211 }
212 }
213
214 #[inline]
215 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
216 if let Err(e) = match cmd {
217 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
218 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
219 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
220 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
221 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
222 UnsubscribeCommand::BookSnapshots(_) => Ok(()), UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
224 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
225 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
226 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
227 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
228 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
229 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
230 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
231 } {
232 log_command_error(&cmd, &e);
233 }
234 }
235
236 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
244 if !self.subscriptions_custom.contains(&cmd.data_type) {
245 self.subscriptions_custom.insert(cmd.data_type.clone());
246 self.client.subscribe(cmd)?;
247 }
248 Ok(())
249 }
250
251 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
257 if self.subscriptions_custom.contains(&cmd.data_type) {
258 self.subscriptions_custom.remove(&cmd.data_type);
259 self.client.unsubscribe(cmd)?;
260 }
261 Ok(())
262 }
263
264 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
270 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
271 self.subscriptions_instrument_venue.insert(cmd.venue);
272 self.client.subscribe_instruments(cmd)?;
273 }
274
275 Ok(())
276 }
277
278 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
284 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
285 self.subscriptions_instrument_venue.remove(&cmd.venue);
286 self.client.unsubscribe_instruments(cmd)?;
287 }
288
289 Ok(())
290 }
291
292 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
298 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
299 self.subscriptions_instrument.insert(cmd.instrument_id);
300 self.client.subscribe_instrument(cmd)?;
301 }
302
303 Ok(())
304 }
305
306 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
312 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
313 self.subscriptions_instrument.remove(&cmd.instrument_id);
314 self.client.unsubscribe_instrument(cmd)?;
315 }
316
317 Ok(())
318 }
319
320 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
326 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
327 self.subscriptions_book_deltas.insert(cmd.instrument_id);
328 self.client.subscribe_book_deltas(cmd)?;
329 }
330
331 Ok(())
332 }
333
334 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
340 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
341 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
342 self.client.unsubscribe_book_deltas(cmd)?;
343 }
344
345 Ok(())
346 }
347
348 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
354 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
355 self.subscriptions_book_depth10.insert(cmd.instrument_id);
356 self.client.subscribe_book_depth10(cmd)?;
357 }
358
359 Ok(())
360 }
361
362 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
368 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
369 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
370 self.client.unsubscribe_book_depth10(cmd)?;
371 }
372
373 Ok(())
374 }
375
376 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
382 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
383 self.subscriptions_quotes.insert(cmd.instrument_id);
384 self.client.subscribe_quotes(cmd)?;
385 }
386 Ok(())
387 }
388
389 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
395 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
396 self.subscriptions_quotes.remove(&cmd.instrument_id);
397 self.client.unsubscribe_quotes(cmd)?;
398 }
399 Ok(())
400 }
401
402 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
408 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
409 self.subscriptions_trades.insert(cmd.instrument_id);
410 self.client.subscribe_trades(cmd)?;
411 }
412 Ok(())
413 }
414
415 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
421 if self.subscriptions_trades.contains(&cmd.instrument_id) {
422 self.subscriptions_trades.remove(&cmd.instrument_id);
423 self.client.unsubscribe_trades(cmd)?;
424 }
425 Ok(())
426 }
427
428 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
434 if !self.subscriptions_bars.contains(&cmd.bar_type) {
435 self.subscriptions_bars.insert(cmd.bar_type);
436 self.client.subscribe_bars(cmd)?;
437 }
438 Ok(())
439 }
440
441 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
447 if self.subscriptions_bars.contains(&cmd.bar_type) {
448 self.subscriptions_bars.remove(&cmd.bar_type);
449 self.client.unsubscribe_bars(cmd)?;
450 }
451 Ok(())
452 }
453
454 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
460 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
461 self.subscriptions_mark_prices.insert(cmd.instrument_id);
462 self.client.subscribe_mark_prices(cmd)?;
463 }
464 Ok(())
465 }
466
467 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
473 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
474 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
475 self.client.unsubscribe_mark_prices(cmd)?;
476 }
477 Ok(())
478 }
479
480 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
486 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
487 self.subscriptions_index_prices.insert(cmd.instrument_id);
488 self.client.subscribe_index_prices(cmd)?;
489 }
490 Ok(())
491 }
492
493 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
499 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
500 self.subscriptions_index_prices.remove(&cmd.instrument_id);
501 self.client.unsubscribe_index_prices(cmd)?;
502 }
503 Ok(())
504 }
505
506 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
512 if !self
513 .subscriptions_funding_rates
514 .contains(&cmd.instrument_id)
515 {
516 self.subscriptions_funding_rates.insert(cmd.instrument_id);
517 self.client.subscribe_funding_rates(cmd)?;
518 }
519 Ok(())
520 }
521
522 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
528 if self
529 .subscriptions_funding_rates
530 .contains(&cmd.instrument_id)
531 {
532 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
533 self.client.unsubscribe_funding_rates(cmd)?;
534 }
535 Ok(())
536 }
537
538 fn subscribe_instrument_status(
544 &mut self,
545 cmd: &SubscribeInstrumentStatus,
546 ) -> anyhow::Result<()> {
547 if !self
548 .subscriptions_instrument_status
549 .contains(&cmd.instrument_id)
550 {
551 self.subscriptions_instrument_status
552 .insert(cmd.instrument_id);
553 self.client.subscribe_instrument_status(cmd)?;
554 }
555 Ok(())
556 }
557
558 fn unsubscribe_instrument_status(
564 &mut self,
565 cmd: &UnsubscribeInstrumentStatus,
566 ) -> anyhow::Result<()> {
567 if self
568 .subscriptions_instrument_status
569 .contains(&cmd.instrument_id)
570 {
571 self.subscriptions_instrument_status
572 .remove(&cmd.instrument_id);
573 self.client.unsubscribe_instrument_status(cmd)?;
574 }
575 Ok(())
576 }
577
578 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
584 if !self
585 .subscriptions_instrument_close
586 .contains(&cmd.instrument_id)
587 {
588 self.subscriptions_instrument_close
589 .insert(cmd.instrument_id);
590 self.client.subscribe_instrument_close(cmd)?;
591 }
592 Ok(())
593 }
594
595 fn unsubscribe_instrument_close(
601 &mut self,
602 cmd: &UnsubscribeInstrumentClose,
603 ) -> anyhow::Result<()> {
604 if self
605 .subscriptions_instrument_close
606 .contains(&cmd.instrument_id)
607 {
608 self.subscriptions_instrument_close
609 .remove(&cmd.instrument_id);
610 self.client.unsubscribe_instrument_close(cmd)?;
611 }
612 Ok(())
613 }
614
615 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
623 self.client.request_data(req)
624 }
625
626 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
632 self.client.request_instrument(req)
633 }
634
635 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
641 self.client.request_instruments(req)
642 }
643
644 pub fn request_book_snapshot(&self, req: &RequestBookSnapshot) -> anyhow::Result<()> {
650 self.client.request_book_snapshot(req)
651 }
652
653 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
659 self.client.request_quotes(req)
660 }
661
662 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
668 self.client.request_trades(req)
669 }
670
671 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
677 self.client.request_bars(req)
678 }
679
680 pub fn request_book_depth(&self, req: &RequestBookDepth) -> anyhow::Result<()> {
686 self.client.request_book_depth(req)
687 }
688}