nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{HashMap, HashSet},
25    fmt::Debug,
26    ops::{Deref, DerefMut},
27    rc::Rc,
28    sync::Arc,
29};
30
31use indexmap::IndexMap;
32use nautilus_common::{
33    clock::{Clock, TestClock},
34    messages::data::{Action, DataRequest, DataResponse, Payload, SubscriptionCommand},
35};
36use nautilus_core::{UUID4, UnixNanos};
37use nautilus_model::{
38    data::{Bar, BarType, DataType, QuoteTick, TradeTick},
39    enums::BookType,
40    identifiers::{ClientId, InstrumentId, Venue},
41    instruments::InstrumentAny,
42};
43
44pub trait DataClient {
45    fn client_id(&self) -> ClientId;
46    fn venue(&self) -> Option<Venue>;
47    fn start(&self);
48    fn stop(&self);
49    fn reset(&self);
50    fn dispose(&self);
51    fn is_connected(&self) -> bool;
52    fn is_disconnected(&self) -> bool;
53
54    // TODO: Move to separate trait
55    // A [`LiveDataClient`] must have two channels to send back data and data responses
56    // fn get_response_data_channel(&self) -> tokio::sync::mpsc::UnboundedSender<DataResponse>;
57    // fn get_subscriber_data_channel(&self) -> tokio::sync::mpsc::UnboundedSender<Data>;
58
59    // -- COMMAND HANDLERS ------------------------------------------------------------------------
60
61    /// Parse command and call specific function
62    fn subscribe(
63        &mut self,
64        data_type: &DataType,
65        params: &Option<HashMap<String, String>>,
66    ) -> anyhow::Result<()>;
67    fn subscribe_instruments(
68        &mut self,
69        venue: Option<&Venue>,
70        params: &Option<HashMap<String, String>>,
71    ) -> anyhow::Result<()>;
72    fn subscribe_instrument(
73        &mut self,
74        instrument_id: &InstrumentId,
75        params: &Option<HashMap<String, String>>,
76    ) -> anyhow::Result<()>;
77    fn subscribe_order_book_deltas(
78        &mut self,
79        instrument_id: &InstrumentId,
80        book_type: BookType,
81        depth: Option<usize>,
82        params: &Option<HashMap<String, String>>,
83    ) -> anyhow::Result<()>;
84    fn subscribe_order_book_snapshots(
85        &mut self,
86        instrument_id: &InstrumentId,
87        book_type: BookType,
88        depth: Option<usize>,
89        params: &Option<HashMap<String, String>>,
90    ) -> anyhow::Result<()>;
91    fn subscribe_quote_ticks(
92        &mut self,
93        instrument_id: &InstrumentId,
94        params: &Option<HashMap<String, String>>,
95    ) -> anyhow::Result<()>;
96    fn subscribe_trade_ticks(
97        &mut self,
98        instrument_id: &InstrumentId,
99        params: &Option<HashMap<String, String>>,
100    ) -> anyhow::Result<()>;
101    fn subscribe_bars(
102        &mut self,
103        bar_type: &BarType,
104        params: &Option<HashMap<String, String>>,
105    ) -> anyhow::Result<()>;
106    fn subscribe_instrument_status(
107        &mut self,
108        instrument_id: &InstrumentId,
109        params: &Option<HashMap<String, String>>,
110    ) -> anyhow::Result<()>;
111    fn subscribe_instrument_close(
112        &mut self,
113        instrument_id: &InstrumentId,
114        params: &Option<HashMap<String, String>>,
115    ) -> anyhow::Result<()>;
116    fn unsubscribe(
117        &mut self,
118        data_type: &DataType,
119        params: &Option<HashMap<String, String>>,
120    ) -> anyhow::Result<()>;
121    fn unsubscribe_instruments(
122        &mut self,
123        venue: Option<&Venue>,
124        params: &Option<HashMap<String, String>>,
125    ) -> anyhow::Result<()>;
126    fn unsubscribe_instrument(
127        &mut self,
128        instrument_id: &InstrumentId,
129        params: &Option<HashMap<String, String>>,
130    ) -> anyhow::Result<()>;
131    fn unsubscribe_order_book_deltas(
132        &mut self,
133        instrument_id: &InstrumentId,
134        params: &Option<HashMap<String, String>>,
135    ) -> anyhow::Result<()>;
136    fn unsubscribe_order_book_snapshots(
137        &mut self,
138        instrument_id: &InstrumentId,
139        params: &Option<HashMap<String, String>>,
140    ) -> anyhow::Result<()>;
141    fn unsubscribe_quote_ticks(
142        &mut self,
143        instrument_id: &InstrumentId,
144        params: &Option<HashMap<String, String>>,
145    ) -> anyhow::Result<()>;
146    fn unsubscribe_trade_ticks(
147        &mut self,
148        instrument_id: &InstrumentId,
149        params: &Option<HashMap<String, String>>,
150    ) -> anyhow::Result<()>;
151    fn unsubscribe_bars(
152        &mut self,
153        bar_type: &BarType,
154        params: &Option<HashMap<String, String>>,
155    ) -> anyhow::Result<()>;
156    fn unsubscribe_instrument_status(
157        &mut self,
158        instrument_id: &InstrumentId,
159        params: &Option<HashMap<String, String>>,
160    ) -> anyhow::Result<()>;
161    fn unsubscribe_instrument_close(
162        &mut self,
163        instrument_id: &InstrumentId,
164        params: &Option<HashMap<String, String>>,
165    ) -> anyhow::Result<()>;
166
167    // -- DATA REQUEST HANDLERS -------------------------------------------------------------------
168
169    fn request_data(&self, request: DataRequest);
170    fn request_instruments(
171        &self,
172        correlation_id: UUID4,
173        venue: Venue,
174        start: Option<UnixNanos>,
175        end: Option<UnixNanos>,
176        params: &Option<HashMap<String, String>>,
177    ) -> Vec<InstrumentAny>;
178    fn request_instrument(
179        &self,
180        correlation_id: UUID4,
181        instrument_id: InstrumentId,
182        start: Option<UnixNanos>,
183        end: Option<UnixNanos>,
184        params: &Option<HashMap<String, String>>,
185    ) -> InstrumentAny;
186    // TODO: figure out where to call this and it's return type
187    fn request_order_book_snapshot(
188        &self,
189        correlation_id: UUID4,
190        instrument_id: InstrumentId,
191        depth: Option<usize>,
192        params: &Option<HashMap<String, String>>,
193    ) -> Payload;
194    fn request_quote_ticks(
195        &self,
196        correlation_id: UUID4,
197        instrument_id: InstrumentId,
198        start: Option<UnixNanos>,
199        end: Option<UnixNanos>,
200        limit: Option<usize>,
201        params: &Option<HashMap<String, String>>,
202    ) -> Vec<QuoteTick>;
203    fn request_trade_ticks(
204        &self,
205        correlation_id: UUID4,
206        instrument_id: InstrumentId,
207        start: Option<UnixNanos>,
208        end: Option<UnixNanos>,
209        limit: Option<usize>,
210        params: &Option<HashMap<String, String>>,
211    ) -> Vec<TradeTick>;
212    fn request_bars(
213        &self,
214        correlation_id: UUID4,
215        bar_type: BarType,
216        start: Option<UnixNanos>,
217        end: Option<UnixNanos>,
218        limit: Option<usize>,
219        params: &Option<HashMap<String, String>>,
220    ) -> Vec<Bar>;
221}
222
223pub struct DataClientAdapter {
224    client: Box<dyn DataClient>,
225    clock: Rc<RefCell<TestClock>>,
226    pub client_id: ClientId,
227    pub venue: Venue,
228    pub handles_order_book_deltas: bool,
229    pub handles_order_book_snapshots: bool,
230    pub subscriptions_generic: HashSet<DataType>,
231    pub subscriptions_order_book_delta: HashSet<InstrumentId>,
232    pub subscriptions_order_book_snapshot: HashSet<InstrumentId>,
233    pub subscriptions_quote_tick: HashSet<InstrumentId>,
234    pub subscriptions_trade_tick: HashSet<InstrumentId>,
235    pub subscriptions_bar: HashSet<BarType>,
236    pub subscriptions_instrument_status: HashSet<InstrumentId>,
237    pub subscriptions_instrument_close: HashSet<InstrumentId>,
238    pub subscriptions_instrument: HashSet<InstrumentId>,
239    pub subscriptions_instrument_venue: HashSet<Venue>,
240}
241
242impl Deref for DataClientAdapter {
243    type Target = Box<dyn DataClient>;
244
245    fn deref(&self) -> &Self::Target {
246        &self.client
247    }
248}
249
250impl DerefMut for DataClientAdapter {
251    fn deref_mut(&mut self) -> &mut Self::Target {
252        &mut self.client
253    }
254}
255
256impl Debug for DataClientAdapter {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        f.debug_struct("DataClientAdapter")
259            .field("client_id", &self.client_id)
260            .field("venue", &self.venue)
261            .field("handles_order_book_deltas", &self.handles_order_book_deltas)
262            .field(
263                "handles_order_book_snapshots",
264                &self.handles_order_book_snapshots,
265            )
266            .field("subscriptions_generic", &self.subscriptions_generic)
267            .field(
268                "subscriptions_order_book_delta",
269                &self.subscriptions_order_book_delta,
270            )
271            .field(
272                "subscriptions_order_book_snapshot",
273                &self.subscriptions_order_book_snapshot,
274            )
275            .field("subscriptions_quote_tick", &self.subscriptions_quote_tick)
276            .field("subscriptions_trade_tick", &self.subscriptions_trade_tick)
277            .field("subscriptions_bar", &self.subscriptions_bar)
278            .field(
279                "subscriptions_instrument_status",
280                &self.subscriptions_instrument_status,
281            )
282            .field(
283                "subscriptions_instrument_close",
284                &self.subscriptions_instrument_close,
285            )
286            .field("subscriptions_instrument", &self.subscriptions_instrument)
287            .field(
288                "subscriptions_instrument_venue",
289                &self.subscriptions_instrument_venue,
290            )
291            .finish()
292    }
293}
294
295impl DataClientAdapter {
296    /// Creates a new [`DataClientAdapter`] instance.
297    #[must_use]
298    pub fn new(
299        client_id: ClientId,
300        venue: Venue,
301        handles_order_book_deltas: bool,
302        handles_order_book_snapshots: bool,
303        client: Box<dyn DataClient>,
304        clock: Rc<RefCell<TestClock>>,
305    ) -> Self {
306        Self {
307            client,
308            clock,
309            client_id,
310            venue,
311            handles_order_book_deltas,
312            handles_order_book_snapshots,
313            subscriptions_generic: HashSet::new(),
314            subscriptions_order_book_delta: HashSet::new(),
315            subscriptions_order_book_snapshot: HashSet::new(),
316            subscriptions_quote_tick: HashSet::new(),
317            subscriptions_trade_tick: HashSet::new(),
318            subscriptions_bar: HashSet::new(),
319            subscriptions_instrument_status: HashSet::new(),
320            subscriptions_instrument_close: HashSet::new(),
321            subscriptions_instrument: HashSet::new(),
322            subscriptions_instrument_venue: HashSet::new(),
323        }
324    }
325
326    /// TODO: Decide whether to use mut references for subscription commands
327    pub fn through_execute(&self, command: SubscriptionCommand) {}
328
329    pub fn execute(&mut self, command: SubscriptionCommand) {
330        match command.action {
331            Action::Subscribe => self.execute_subscribe_command(command),
332            Action::Unsubscribe => self.execute_unsubscribe_command(command),
333        }
334    }
335
336    #[inline]
337    fn execute_subscribe_command(&mut self, command: SubscriptionCommand) {
338        match command.data_type.type_name() {
339            stringify!(InstrumentAny) => Self::subscribe_instrument(self, command),
340            stringify!(OrderBookDelta) => Self::subscribe_order_book_deltas(self, command),
341            stringify!(OrderBookDeltas) | stringify!(OrderBookDepth10) => {
342                Self::subscribe_snapshots(self, command);
343            }
344            stringify!(QuoteTick) => Self::subscribe_quote_ticks(self, command),
345            stringify!(TradeTick) => Self::subscribe_trade_ticks(self, command),
346            stringify!(Bar) => Self::subscribe_bars(self, command),
347            _ => Self::subscribe(self, command),
348        }
349    }
350
351    #[inline]
352    fn execute_unsubscribe_command(&mut self, command: SubscriptionCommand) {
353        match command.data_type.type_name() {
354            stringify!(InstrumentAny) => Self::unsubscribe_instrument(self, command),
355            stringify!(OrderBookDelta) => Self::unsubscribe_order_book_deltas(self, command),
356            stringify!(OrderBookDeltas) | stringify!(OrderBookDepth10) => {
357                Self::unsubscribe_snapshots(self, command);
358            }
359            stringify!(QuoteTick) => Self::unsubscribe_quote_ticks(self, command),
360            stringify!(TradeTick) => Self::unsubscribe_trade_ticks(self, command),
361            stringify!(Bar) => Self::unsubscribe_bars(self, command),
362            _ => Self::unsubscribe(self, command),
363        }
364    }
365
366    fn subscribe_instrument(&mut self, command: SubscriptionCommand) {
367        let instrument_id = command.data_type.instrument_id();
368        let venue = command.data_type.venue();
369
370        if let Some(instrument_id) = instrument_id {
371            // TODO: consider using insert_with once it stabilizes
372            // https://github.com/rust-lang/rust/issues/60896
373            if !self.subscriptions_instrument.contains(&instrument_id) {
374                self.client
375                    .subscribe_instrument(&instrument_id, &command.params)
376                    .expect("Error on subscribe");
377            }
378
379            self.subscriptions_instrument.insert(instrument_id);
380        }
381
382        if let Some(venue) = venue {
383            if !self.subscriptions_instrument_venue.contains(&venue) {
384                self.client
385                    .subscribe_instruments(Some(&venue), &command.params)
386                    .expect("Error on subscribe");
387            }
388
389            self.subscriptions_instrument_venue.insert(venue);
390        }
391    }
392
393    fn unsubscribe_instrument(&mut self, command: SubscriptionCommand) {
394        let instrument_id = command.data_type.instrument_id();
395        let venue = command.data_type.venue();
396
397        if let Some(instrument_id) = instrument_id {
398            if self.subscriptions_instrument.contains(&instrument_id) {
399                self.client
400                    .unsubscribe_instrument(&instrument_id, &command.params)
401                    .expect("Error on subscribe");
402            }
403
404            self.subscriptions_instrument.remove(&instrument_id);
405        }
406
407        if let Some(venue) = venue {
408            if self.subscriptions_instrument_venue.contains(&venue) {
409                self.client
410                    .unsubscribe_instruments(Some(&venue), &command.params)
411                    .expect("Error on subscribe");
412            }
413
414            self.subscriptions_instrument_venue.remove(&venue);
415        }
416    }
417
418    fn subscribe_order_book_deltas(&mut self, command: SubscriptionCommand) {
419        let instrument_id = command
420            .data_type
421            .instrument_id()
422            .expect("Error on subscribe: no 'instrument_id' in metadata");
423
424        let book_type = command.data_type.book_type();
425        let depth = command.data_type.depth();
426
427        if !self.subscriptions_order_book_delta.contains(&instrument_id) {
428            self.client
429                .subscribe_order_book_deltas(&instrument_id, book_type, depth, &command.params)
430                .expect("Error on subscribe");
431        }
432
433        self.subscriptions_order_book_delta.insert(instrument_id);
434    }
435
436    fn unsubscribe_order_book_deltas(&mut self, command: SubscriptionCommand) {
437        let instrument_id = command
438            .data_type
439            .instrument_id()
440            .expect("Error on subscribe: no 'instrument_id' in metadata");
441
442        if self.subscriptions_order_book_delta.contains(&instrument_id) {
443            self.client
444                .unsubscribe_order_book_deltas(&instrument_id, &command.params)
445                .expect("Error on subscribe");
446        }
447
448        self.subscriptions_order_book_delta.remove(&instrument_id);
449    }
450
451    fn subscribe_snapshots(&mut self, command: SubscriptionCommand) {
452        let instrument_id = command
453            .data_type
454            .instrument_id()
455            .expect("Error on subscribe: no 'instrument_id' in metadata");
456
457        let book_type = command.data_type.book_type();
458        let depth = command.data_type.depth();
459
460        if !self
461            .subscriptions_order_book_snapshot
462            .contains(&instrument_id)
463        {
464            self.client
465                .subscribe_order_book_snapshots(&instrument_id, book_type, depth, &command.params)
466                .expect("Error on subscribe");
467        }
468
469        self.subscriptions_order_book_snapshot.insert(instrument_id);
470    }
471
472    fn unsubscribe_snapshots(&mut self, command: SubscriptionCommand) {
473        let instrument_id = command
474            .data_type
475            .instrument_id()
476            .expect("Error on subscribe: no 'instrument_id' in metadata");
477
478        if self
479            .subscriptions_order_book_snapshot
480            .contains(&instrument_id)
481        {
482            self.client
483                .unsubscribe_order_book_snapshots(&instrument_id, &command.params)
484                .expect("Error on subscribe");
485        }
486
487        self.subscriptions_order_book_snapshot
488            .remove(&instrument_id);
489    }
490
491    fn subscribe_quote_ticks(&mut self, command: SubscriptionCommand) {
492        let instrument_id = command
493            .data_type
494            .instrument_id()
495            .expect("Error on subscribe: no 'instrument_id' in metadata");
496
497        if !self.subscriptions_quote_tick.contains(&instrument_id) {
498            self.client
499                .subscribe_quote_ticks(&instrument_id, &command.params)
500                .expect("Error on subscribe");
501        }
502        self.subscriptions_quote_tick.insert(instrument_id);
503    }
504
505    fn unsubscribe_quote_ticks(&mut self, command: SubscriptionCommand) {
506        let instrument_id = command
507            .data_type
508            .instrument_id()
509            .expect("Error on subscribe: no 'instrument_id' in metadata");
510
511        if self.subscriptions_quote_tick.contains(&instrument_id) {
512            self.client
513                .unsubscribe_quote_ticks(&instrument_id, &command.params)
514                .expect("Error on subscribe");
515        }
516        self.subscriptions_quote_tick.remove(&instrument_id);
517    }
518
519    fn unsubscribe_trade_ticks(&mut self, command: SubscriptionCommand) {
520        let instrument_id = command
521            .data_type
522            .instrument_id()
523            .expect("Error on subscribe: no 'instrument_id' in metadata");
524
525        if self.subscriptions_trade_tick.contains(&instrument_id) {
526            self.client
527                .unsubscribe_trade_ticks(&instrument_id, &command.params)
528                .expect("Error on subscribe");
529        }
530        self.subscriptions_trade_tick.remove(&instrument_id);
531    }
532
533    fn subscribe_trade_ticks(&mut self, command: SubscriptionCommand) {
534        let instrument_id = command
535            .data_type
536            .instrument_id()
537            .expect("Error on subscribe: no 'instrument_id' in metadata");
538
539        if !self.subscriptions_trade_tick.contains(&instrument_id) {
540            self.client
541                .subscribe_trade_ticks(&instrument_id, &command.params)
542                .expect("Error on subscribe");
543        }
544        self.subscriptions_trade_tick.insert(instrument_id);
545    }
546
547    fn subscribe_bars(&mut self, command: SubscriptionCommand) {
548        let bar_type = command.data_type.bar_type();
549
550        if !self.subscriptions_bar.contains(&bar_type) {
551            self.client
552                .subscribe_bars(&bar_type, &command.params)
553                .expect("Error on subscribe");
554        }
555        self.subscriptions_bar.insert(bar_type);
556    }
557
558    fn unsubscribe_bars(&mut self, command: SubscriptionCommand) {
559        let bar_type = command.data_type.bar_type();
560
561        if self.subscriptions_bar.contains(&bar_type) {
562            self.client
563                .subscribe_bars(&bar_type, &command.params)
564                .expect("Error on subscribe");
565        }
566        self.subscriptions_bar.remove(&bar_type);
567    }
568
569    pub fn subscribe(&mut self, command: SubscriptionCommand) {
570        let data_type = command.data_type;
571        if !self.subscriptions_generic.contains(&data_type) {
572            self.client
573                .subscribe(&data_type, &command.params)
574                .expect("Error on subscribe");
575        }
576        self.subscriptions_generic.insert(data_type);
577    }
578
579    pub fn unsubscribe(&mut self, command: SubscriptionCommand) {
580        let data_type = command.data_type;
581        if self.subscriptions_generic.contains(&data_type) {
582            self.client
583                .unsubscribe(&data_type, &command.params)
584                .expect("Error on unsubscribe");
585        }
586        self.subscriptions_generic.remove(&data_type);
587    }
588
589    // -- DATA REQUEST HANDLERS IMPLEMENTATION ---------------------------------------------------------------------------
590
591    /// TODO: New clients implement a request pattern
592    /// that does not return a `DataResponse` directly
593    /// it internally uses a queue/channel to send
594    /// back response
595    pub fn through_request(&self, req: DataRequest) {
596        self.client.request_data(req);
597    }
598
599    #[must_use]
600    pub fn request(&self, req: DataRequest) -> DataResponse {
601        let instrument_id = req.data_type.instrument_id();
602        let venue = req.data_type.venue();
603        let start = req.data_type.start();
604        let end = req.data_type.end();
605        let limit = req.data_type.limit();
606
607        match req.data_type.type_name() {
608            stringify!(InstrumentAny) => match (instrument_id, venue) {
609                (None, Some(venue)) => {
610                    let instruments = self.client.request_instruments(
611                        req.correlation_id,
612                        venue,
613                        start,
614                        end,
615                        &req.params,
616                    );
617                    self.handle_instruments(venue, instruments, req.correlation_id)
618                }
619                (Some(instrument_id), None) => {
620                    let instrument = self.client.request_instrument(
621                        req.correlation_id,
622                        instrument_id,
623                        start,
624                        end,
625                        &req.params,
626                    );
627                    self.handle_instrument(instrument, req.correlation_id)
628                }
629                _ => {
630                    todo!()
631                }
632            },
633            stringify!(QuoteTick) => {
634                let instrument_id =
635                    instrument_id.expect("Error on request: no 'instrument_id' found in metadata");
636                let quotes = self.client.request_quote_ticks(
637                    req.correlation_id,
638                    instrument_id,
639                    start,
640                    end,
641                    limit,
642                    &req.params,
643                );
644                self.handle_quote_ticks(&instrument_id, quotes, req.correlation_id)
645            }
646            stringify!(TradeTick) => {
647                let instrument_id =
648                    instrument_id.expect("Error on request: no 'instrument_id' found in metadata");
649                let trades = self.client.request_trade_ticks(
650                    req.correlation_id,
651                    instrument_id,
652                    start,
653                    end,
654                    limit,
655                    &req.params,
656                );
657                self.handle_trade_ticks(&instrument_id, trades, req.correlation_id)
658            }
659            stringify!(Bar) => {
660                let bar_type = req.data_type.bar_type();
661                let bars = self.client.request_bars(
662                    req.correlation_id,
663                    bar_type,
664                    start,
665                    end,
666                    limit,
667                    &req.params,
668                );
669                self.handle_bars(&bar_type, bars, req.correlation_id)
670            }
671            _ => {
672                todo!()
673            }
674        }
675    }
676
677    #[must_use]
678    pub fn handle_instrument(
679        &self,
680        instrument: InstrumentAny,
681        correlation_id: UUID4,
682    ) -> DataResponse {
683        let instrument_id = instrument.id();
684        let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
685        let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
686        let data = Arc::new(instrument);
687
688        DataResponse::new(
689            correlation_id,
690            self.client_id,
691            instrument_id.venue,
692            data_type,
693            data,
694            self.clock.borrow().timestamp_ns(),
695            None,
696        )
697    }
698
699    #[must_use]
700    pub fn handle_instruments(
701        &self,
702        venue: Venue,
703        instruments: Vec<InstrumentAny>,
704        correlation_id: UUID4,
705    ) -> DataResponse {
706        let metadata = IndexMap::from([("venue".to_string(), venue.to_string())]);
707        let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
708        let data = Arc::new(instruments);
709
710        DataResponse::new(
711            correlation_id,
712            self.client_id,
713            venue,
714            data_type,
715            data,
716            self.clock.borrow().timestamp_ns(),
717            None,
718        )
719    }
720
721    #[must_use]
722    pub fn handle_quote_ticks(
723        &self,
724        instrument_id: &InstrumentId,
725        quotes: Vec<QuoteTick>,
726        correlation_id: UUID4,
727    ) -> DataResponse {
728        let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
729        let data_type = DataType::new(stringify!(QuoteTick), Some(metadata));
730        let data = Arc::new(quotes);
731
732        DataResponse::new(
733            correlation_id,
734            self.client_id,
735            instrument_id.venue,
736            data_type,
737            data,
738            self.clock.borrow().timestamp_ns(),
739            None,
740        )
741    }
742
743    #[must_use]
744    pub fn handle_trade_ticks(
745        &self,
746        instrument_id: &InstrumentId,
747        trades: Vec<TradeTick>,
748        correlation_id: UUID4,
749    ) -> DataResponse {
750        let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
751        let data_type = DataType::new(stringify!(TradeTick), Some(metadata));
752        let data = Arc::new(trades);
753
754        DataResponse::new(
755            correlation_id,
756            self.client_id,
757            instrument_id.venue,
758            data_type,
759            data,
760            self.clock.borrow().timestamp_ns(),
761            None,
762        )
763    }
764
765    #[must_use]
766    pub fn handle_bars(
767        &self,
768        bar_type: &BarType,
769        bars: Vec<Bar>,
770        correlation_id: UUID4,
771    ) -> DataResponse {
772        let metadata = IndexMap::from([("bar_type".to_string(), bar_type.to_string())]);
773        let data_type = DataType::new(stringify!(Bar), Some(metadata));
774        let data = Arc::new(bars);
775
776        DataResponse::new(
777            correlation_id,
778            self.client_id,
779            bar_type.instrument_id().venue,
780            data_type,
781            data,
782            self.clock.borrow().timestamp_ns(),
783            None,
784        )
785    }
786}