nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::HashSet,
25    fmt::Debug,
26    ops::{Deref, DerefMut},
27    rc::Rc,
28    sync::Arc,
29};
30
31use indexmap::IndexMap;
32use nautilus_common::{
33    clock::Clock,
34    messages::data::{
35        DataResponse, RequestBars, RequestBookSnapshot, RequestData, RequestInstrument,
36        RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
37        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeData,
38        SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
39        SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
40        SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
41        UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeData, UnsubscribeIndexPrices,
42        UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
43        UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
44    },
45};
46use nautilus_core::UUID4;
47use nautilus_model::{
48    data::{Bar, BarType, DataType, QuoteTick, TradeTick},
49    identifiers::{ClientId, InstrumentId, Venue},
50    instruments::{Instrument, InstrumentAny},
51};
52
/// Interface implemented by a concrete data client.
///
/// Implementors must provide the identity, lifecycle and connection-state methods plus
/// [`DataClient::request_data`]. Every `subscribe_*`, `unsubscribe_*` and typed `request_*`
/// method has a default implementation that ignores its argument and returns `Ok(())`,
/// so a client only overrides the operations it actually supports.
pub trait DataClient {
    /// Returns the identifier of this client.
    fn client_id(&self) -> ClientId;
    /// Returns the venue associated with this client, if any.
    fn venue(&self) -> Option<Venue>;
    /// Starts the client.
    fn start(&self);
    /// Stops the client.
    fn stop(&self);
    /// Resets the client.
    fn reset(&self);
    /// Disposes of the client.
    fn dispose(&self);
    /// Returns whether the client reports itself as connected.
    fn is_connected(&self) -> bool;
    /// Returns whether the client reports itself as disconnected.
    fn is_disconnected(&self) -> bool;

    // TODO: Move to separate trait
    // A [`LiveDataClient`] must have two channels to send back data and data responses
    // fn get_response_data_channel(&self) -> tokio::sync::mpsc::UnboundedSender<DataResponse>;
    // fn get_subscriber_data_channel(&self) -> tokio::sync::mpsc::UnboundedSender<Data>;

    /// Handles a generic data subscription; the default does nothing and returns `Ok(())`.
    fn subscribe(&mut self, cmd: SubscribeData) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an instruments subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a single-instrument subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a book deltas subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a book depth-10 subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a book snapshots subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_book_snapshots(&mut self, cmd: SubscribeBookSnapshots) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a quotes subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a trades subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a mark prices subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an index prices subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a bars subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an instrument status subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_instrument_status(
        &mut self,
        cmd: SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an instrument close subscription; the default does nothing and returns `Ok(())`.
    fn subscribe_instrument_close(&mut self, cmd: SubscribeInstrumentClose) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a generic data unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe(&mut self, cmd: UnsubscribeData) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an instruments unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_instruments(&mut self, cmd: UnsubscribeInstruments) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a single-instrument unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_instrument(&mut self, cmd: UnsubscribeInstrument) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a book deltas unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_book_deltas(&mut self, cmd: UnsubscribeBookDeltas) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a book depth-10 unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_book_depth10(&mut self, cmd: UnsubscribeBookDepth10) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a book snapshots unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_book_snapshots(&mut self, cmd: UnsubscribeBookSnapshots) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a quotes unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_quotes(&mut self, cmd: UnsubscribeQuotes) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a trades unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_trades(&mut self, cmd: UnsubscribeTrades) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a mark prices unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_mark_prices(&mut self, cmd: UnsubscribeMarkPrices) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an index prices unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_index_prices(&mut self, cmd: UnsubscribeIndexPrices) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a bars unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_bars(&mut self, cmd: UnsubscribeBars) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an instrument status unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_instrument_status(
        &mut self,
        cmd: UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles an instrument close unsubscription; the default does nothing and returns `Ok(())`.
    fn unsubscribe_instrument_close(
        &mut self,
        cmd: UnsubscribeInstrumentClose,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// Requests generic data from the data provider.
    ///
    /// This is the only request method without a default implementation.
    fn request_data(&self, request: RequestData) -> anyhow::Result<()>;

    /// Requests instruments data from the data provider.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
        Ok(())
    }

    /// Requests instrument data from the data provider.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
        Ok(())
    }

    /// Requests a book snapshot from the data provider.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
        Ok(())
    }

    /// Requests quotes data from the data provider.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
        Ok(())
    }

    /// Requests trades data from the data provider.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
        Ok(())
    }

    /// Requests bars data from the data provider.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
        Ok(())
    }
}
188
/// Wraps a boxed [`DataClient`] together with sets recording what has been subscribed
/// through the adapter.
///
/// The adapter's subscribe/unsubscribe methods consult these sets so that a command is
/// forwarded to the wrapped client only when it changes the recorded state.
pub struct DataClientAdapter {
    /// The wrapped client that commands and requests are forwarded to.
    client: Box<dyn DataClient>,
    /// Clock used to timestamp responses built by the `handle_*` methods.
    clock: Rc<RefCell<dyn Clock>>,
    /// Client identifier supplied at construction; stamped on built responses.
    pub client_id: ClientId,
    /// Venue supplied at construction.
    pub venue: Venue,
    /// Set from the `handles_order_book_deltas` constructor argument.
    // NOTE(review): presumably signals that the client handles book deltas itself — confirm.
    pub handles_book_deltas: bool,
    /// Set from the `handles_order_book_snapshots` constructor argument.
    // NOTE(review): presumably signals that the client handles book snapshots itself — confirm.
    pub handles_book_snapshots: bool,
    /// Data types with a generic data subscription.
    pub subscriptions_generic: HashSet<DataType>,
    /// Instrument IDs with a book deltas subscription.
    pub subscriptions_book_deltas: HashSet<InstrumentId>,
    /// Instrument IDs with a book depth-10 subscription.
    pub subscriptions_book_depth10: HashSet<InstrumentId>,
    /// Instrument IDs with a book snapshots subscription.
    pub subscriptions_book_snapshots: HashSet<InstrumentId>,
    /// Instrument IDs with a quotes subscription.
    pub subscriptions_quotes: HashSet<InstrumentId>,
    /// Instrument IDs with a trades subscription.
    pub subscriptions_trades: HashSet<InstrumentId>,
    /// Bar types with a bars subscription.
    pub subscriptions_bars: HashSet<BarType>,
    /// Instrument IDs tracked for instrument status subscriptions.
    pub subscriptions_instrument_status: HashSet<InstrumentId>,
    /// Instrument IDs tracked for instrument close subscriptions.
    pub subscriptions_instrument_close: HashSet<InstrumentId>,
    /// Instrument IDs with a single-instrument subscription.
    pub subscriptions_instrument: HashSet<InstrumentId>,
    /// Venues with a venue-wide instruments subscription.
    pub subscriptions_instrument_venue: HashSet<Venue>,
    /// Instrument IDs with a mark prices subscription.
    pub subscriptions_mark_prices: HashSet<InstrumentId>,
    /// Instrument IDs with an index prices subscription.
    pub subscriptions_index_prices: HashSet<InstrumentId>,
}
210
211impl Deref for DataClientAdapter {
212    type Target = Box<dyn DataClient>;
213
214    fn deref(&self) -> &Self::Target {
215        &self.client
216    }
217}
218
219impl DerefMut for DataClientAdapter {
220    fn deref_mut(&mut self) -> &mut Self::Target {
221        &mut self.client
222    }
223}
224
225impl Debug for DataClientAdapter {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.debug_struct("DataClientAdapter")
228            .field("client_id", &self.client_id)
229            .field("venue", &self.venue)
230            .field("handles_book_deltas", &self.handles_book_deltas)
231            .field("handles_book_snapshots", &self.handles_book_snapshots)
232            .field("subscriptions_generic", &self.subscriptions_generic)
233            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
234            .field(
235                "subscriptions_book_depth10",
236                &self.subscriptions_book_depth10,
237            )
238            .field(
239                "subscriptions_book_snapshot",
240                &self.subscriptions_book_snapshots,
241            )
242            .field("subscriptions_quotes", &self.subscriptions_quotes)
243            .field("subscriptions_trades", &self.subscriptions_trades)
244            .field("subscriptions_bars", &self.subscriptions_bars)
245            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
246            .field(
247                "subscriptions_index_prices",
248                &self.subscriptions_index_prices,
249            )
250            .field(
251                "subscriptions_instrument_status",
252                &self.subscriptions_instrument_status,
253            )
254            .field(
255                "subscriptions_instrument_close",
256                &self.subscriptions_instrument_close,
257            )
258            .field("subscriptions_instrument", &self.subscriptions_instrument)
259            .field(
260                "subscriptions_instrument_venue",
261                &self.subscriptions_instrument_venue,
262            )
263            .finish()
264    }
265}
266
267impl DataClientAdapter {
268    /// Creates a new [`DataClientAdapter`] instance.
269    #[must_use]
270    pub fn new(
271        client_id: ClientId,
272        venue: Venue,
273        handles_order_book_deltas: bool,
274        handles_order_book_snapshots: bool,
275        client: Box<dyn DataClient>,
276        clock: Rc<RefCell<dyn Clock>>,
277    ) -> Self {
278        Self {
279            client,
280            clock,
281            client_id,
282            venue,
283            handles_book_deltas: handles_order_book_deltas,
284            handles_book_snapshots: handles_order_book_snapshots,
285            subscriptions_generic: HashSet::new(),
286            subscriptions_book_deltas: HashSet::new(),
287            subscriptions_book_depth10: HashSet::new(),
288            subscriptions_book_snapshots: HashSet::new(),
289            subscriptions_quotes: HashSet::new(),
290            subscriptions_trades: HashSet::new(),
291            subscriptions_mark_prices: HashSet::new(),
292            subscriptions_index_prices: HashSet::new(),
293            subscriptions_bars: HashSet::new(),
294            subscriptions_instrument_status: HashSet::new(),
295            subscriptions_instrument_close: HashSet::new(),
296            subscriptions_instrument: HashSet::new(),
297            subscriptions_instrument_venue: HashSet::new(),
298        }
299    }
300
301    /// TODO: Decide whether to use mut references for subscription commands
302    pub fn through_execute(&self, command: SubscribeCommand) {}
303
304    // // TODO: Deprecated
305    // pub fn execute(&mut self, command: SubscribeCommand) {
306    //     match command.action {
307    //         Action::Subscribe => self.execute_subscribe_command(command),
308    //         Action::Unsubscribe => self.execute_unsubscribe_command(command),
309    //     }
310    // }
311
312    #[inline]
313    pub fn execute_subscribe_command(&mut self, cmd: SubscribeCommand) {
314        let result = match cmd.clone() {
315            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
316            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
317            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
318            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
319            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
320            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
321            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
322            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
323            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
324            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
325            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
326            SubscribeCommand::InstrumentStatus(cmd) => todo!(),
327            SubscribeCommand::InstrumentClose(cmd) => todo!(),
328        };
329
330        if let Err(e) = result {
331            log::debug!("Error on subscribe: {cmd:?}");
332        }
333    }
334
335    #[inline]
336    pub fn execute_unsubscribe_command(&mut self, cmd: UnsubscribeCommand) {
337        let result = match cmd.clone() {
338            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
339            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
340            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
341            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
342            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
343            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
344            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
345            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
346            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
347            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
348            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
349            UnsubscribeCommand::InstrumentStatus(cmd) => todo!(),
350            UnsubscribeCommand::InstrumentClose(cmd) => todo!(),
351        };
352
353        if let Err(e) = result {
354            log::debug!("Error on unsubscribe: {cmd:?}");
355        }
356    }
357
358    fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
359        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
360            self.subscriptions_instrument_venue.insert(cmd.venue);
361            self.client.subscribe_instruments(cmd)?;
362        }
363
364        Ok(())
365    }
366
367    fn unsubscribe_instruments(&mut self, cmd: UnsubscribeInstruments) -> anyhow::Result<()> {
368        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
369            self.subscriptions_instrument_venue.remove(&cmd.venue);
370            self.client.unsubscribe_instruments(cmd)?;
371        }
372
373        Ok(())
374    }
375
376    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
377        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
378            self.subscriptions_instrument.insert(cmd.instrument_id);
379            self.client.subscribe_instrument(cmd)?;
380        }
381
382        Ok(())
383    }
384
385    fn unsubscribe_instrument(&mut self, cmd: UnsubscribeInstrument) -> anyhow::Result<()> {
386        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
387            self.subscriptions_instrument.remove(&cmd.instrument_id);
388            self.client.unsubscribe_instrument(cmd)?;
389        }
390
391        Ok(())
392    }
393
394    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
395        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
396            self.subscriptions_book_deltas.insert(cmd.instrument_id);
397            self.client.subscribe_book_deltas(cmd)?;
398        }
399
400        Ok(())
401    }
402
403    fn unsubscribe_book_deltas(&mut self, cmd: UnsubscribeBookDeltas) -> anyhow::Result<()> {
404        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
405            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
406            self.client.unsubscribe_book_deltas(cmd)?;
407        }
408
409        Ok(())
410    }
411
412    fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
413        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
414            self.subscriptions_book_depth10.insert(cmd.instrument_id);
415            self.client.subscribe_book_depth10(cmd)?;
416        }
417
418        Ok(())
419    }
420
421    fn unsubscribe_book_depth10(&mut self, cmd: UnsubscribeBookDepth10) -> anyhow::Result<()> {
422        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
423            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
424            self.client.unsubscribe_book_depth10(cmd)?;
425        }
426
427        Ok(())
428    }
429
430    fn subscribe_book_snapshots(&mut self, cmd: SubscribeBookSnapshots) -> anyhow::Result<()> {
431        if !self
432            .subscriptions_book_snapshots
433            .contains(&cmd.instrument_id)
434        {
435            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
436            self.client.subscribe_book_snapshots(cmd)?;
437        }
438
439        Ok(())
440    }
441
442    fn unsubscribe_snapshots(&mut self, cmd: UnsubscribeBookSnapshots) -> anyhow::Result<()> {
443        if self
444            .subscriptions_book_snapshots
445            .contains(&cmd.instrument_id)
446        {
447            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
448            self.client.unsubscribe_book_snapshots(cmd)?;
449        }
450
451        Ok(())
452    }
453
454    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
455        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
456            self.subscriptions_quotes.insert(cmd.instrument_id);
457            self.client.subscribe_quotes(cmd)?;
458        }
459        Ok(())
460    }
461
462    fn unsubscribe_quotes(&mut self, cmd: UnsubscribeQuotes) -> anyhow::Result<()> {
463        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
464            self.subscriptions_quotes.remove(&cmd.instrument_id);
465            self.client.unsubscribe_quotes(cmd)?;
466        }
467        Ok(())
468    }
469
470    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
471        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
472            self.subscriptions_trades.insert(cmd.instrument_id);
473            self.client.subscribe_trades(cmd)?;
474        }
475        Ok(())
476    }
477
478    fn unsubscribe_trades(&mut self, cmd: UnsubscribeTrades) -> anyhow::Result<()> {
479        if self.subscriptions_trades.contains(&cmd.instrument_id) {
480            self.subscriptions_trades.remove(&cmd.instrument_id);
481            self.client.unsubscribe_trades(cmd)?;
482        }
483        Ok(())
484    }
485
486    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
487        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
488            self.subscriptions_mark_prices.insert(cmd.instrument_id);
489            self.client.subscribe_mark_prices(cmd)?;
490        }
491        Ok(())
492    }
493
494    fn unsubscribe_mark_prices(&mut self, cmd: UnsubscribeMarkPrices) -> anyhow::Result<()> {
495        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
496            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
497            self.client.unsubscribe_mark_prices(cmd)?;
498        }
499        Ok(())
500    }
501
502    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
503        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
504            self.subscriptions_index_prices.insert(cmd.instrument_id);
505            self.client.subscribe_index_prices(cmd)?;
506        }
507        Ok(())
508    }
509
510    fn unsubscribe_index_prices(&mut self, cmd: UnsubscribeIndexPrices) -> anyhow::Result<()> {
511        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
512            self.subscriptions_index_prices.remove(&cmd.instrument_id);
513            self.client.unsubscribe_index_prices(cmd)?;
514        }
515        Ok(())
516    }
517
518    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
519        if !self.subscriptions_bars.contains(&cmd.bar_type) {
520            self.subscriptions_bars.insert(cmd.bar_type);
521            self.client.subscribe_bars(cmd)?;
522        }
523        Ok(())
524    }
525
526    fn unsubscribe_bars(&mut self, cmd: UnsubscribeBars) -> anyhow::Result<()> {
527        if self.subscriptions_bars.contains(&cmd.bar_type) {
528            self.subscriptions_bars.remove(&cmd.bar_type);
529            self.client.unsubscribe_bars(cmd)?;
530        }
531        Ok(())
532    }
533
534    pub fn subscribe(&mut self, cmd: SubscribeData) -> anyhow::Result<()> {
535        if !self.subscriptions_generic.contains(&cmd.data_type) {
536            self.subscriptions_generic.insert(cmd.data_type.clone());
537            self.client.subscribe(cmd)?;
538        }
539        Ok(())
540    }
541
542    pub fn unsubscribe(&mut self, cmd: UnsubscribeData) -> anyhow::Result<()> {
543        if self.subscriptions_generic.contains(&cmd.data_type) {
544            self.subscriptions_generic.remove(&cmd.data_type);
545            self.client.unsubscribe(cmd)?;
546        }
547        Ok(())
548    }
549
550    // -- DATA REQUEST HANDLERS IMPLEMENTATION ---------------------------------------------------------------------------
551
552    pub fn request_data(&self, req: RequestData) -> anyhow::Result<()> {
553        self.client.request_data(req)
554    }
555
556    pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
557        self.client.request_instrument(req)
558    }
559
560    pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
561        self.client.request_instruments(req)
562    }
563
564    pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
565        self.client.request_quotes(req)
566    }
567
568    pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
569        self.client.request_trades(req)
570    }
571
572    pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
573        self.client.request_bars(req)
574    }
575
576    #[must_use]
577    pub fn handle_instrument(
578        &self,
579        instrument: InstrumentAny,
580        correlation_id: UUID4,
581    ) -> DataResponse {
582        let instrument_id = instrument.id();
583        let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
584        let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
585        let data = Arc::new(instrument);
586
587        DataResponse::new(
588            correlation_id,
589            self.client_id,
590            instrument_id.venue,
591            data_type,
592            data,
593            self.clock.borrow().timestamp_ns(),
594            None,
595        )
596    }
597
598    #[must_use]
599    pub fn handle_instruments(
600        &self,
601        venue: Venue,
602        instruments: Vec<InstrumentAny>,
603        correlation_id: UUID4,
604    ) -> DataResponse {
605        let metadata = IndexMap::from([("venue".to_string(), venue.to_string())]);
606        let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
607        let data = Arc::new(instruments);
608
609        DataResponse::new(
610            correlation_id,
611            self.client_id,
612            venue,
613            data_type,
614            data,
615            self.clock.borrow().timestamp_ns(),
616            None,
617        )
618    }
619
620    #[must_use]
621    pub fn handle_quotes(
622        &self,
623        instrument_id: &InstrumentId,
624        quotes: Vec<QuoteTick>,
625        correlation_id: UUID4,
626    ) -> DataResponse {
627        let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
628        let data_type = DataType::new(stringify!(QuoteTick), Some(metadata));
629        let data = Arc::new(quotes);
630
631        DataResponse::new(
632            correlation_id,
633            self.client_id,
634            instrument_id.venue,
635            data_type,
636            data,
637            self.clock.borrow().timestamp_ns(),
638            None,
639        )
640    }
641
642    #[must_use]
643    pub fn handle_trades(
644        &self,
645        instrument_id: &InstrumentId,
646        trades: Vec<TradeTick>,
647        correlation_id: UUID4,
648    ) -> DataResponse {
649        let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
650        let data_type = DataType::new(stringify!(TradeTick), Some(metadata));
651        let data = Arc::new(trades);
652
653        DataResponse::new(
654            correlation_id,
655            self.client_id,
656            instrument_id.venue,
657            data_type,
658            data,
659            self.clock.borrow().timestamp_ns(),
660            None,
661        )
662    }
663
664    #[must_use]
665    pub fn handle_bars(
666        &self,
667        bar_type: &BarType,
668        bars: Vec<Bar>,
669        correlation_id: UUID4,
670    ) -> DataResponse {
671        let metadata = IndexMap::from([("bar_type".to_string(), bar_type.to_string())]);
672        let data_type = DataType::new(stringify!(Bar), Some(metadata));
673        let data = Arc::new(bars);
674
675        DataResponse::new(
676            correlation_id,
677            self.client_id,
678            bar_type.instrument_id().venue,
679            data_type,
680            data,
681            self.clock.borrow().timestamp_ns(),
682            None,
683        )
684    }
685}