1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{HashMap, HashSet},
25 fmt::Debug,
26 ops::{Deref, DerefMut},
27 rc::Rc,
28 sync::Arc,
29};
30
31use indexmap::IndexMap;
32use nautilus_common::{
33 clock::{Clock, TestClock},
34 messages::data::{Action, DataRequest, DataResponse, Payload, SubscriptionCommand},
35};
36use nautilus_core::{UUID4, UnixNanos};
37use nautilus_model::{
38 data::{Bar, BarType, DataType, QuoteTick, TradeTick},
39 enums::BookType,
40 identifiers::{ClientId, InstrumentId, Venue},
41 instruments::InstrumentAny,
42};
43
44pub trait DataClient {
45 fn client_id(&self) -> ClientId;
46 fn venue(&self) -> Option<Venue>;
47 fn start(&self);
48 fn stop(&self);
49 fn reset(&self);
50 fn dispose(&self);
51 fn is_connected(&self) -> bool;
52 fn is_disconnected(&self) -> bool;
53
54 fn subscribe(
63 &mut self,
64 data_type: &DataType,
65 params: &Option<HashMap<String, String>>,
66 ) -> anyhow::Result<()>;
67 fn subscribe_instruments(
68 &mut self,
69 venue: Option<&Venue>,
70 params: &Option<HashMap<String, String>>,
71 ) -> anyhow::Result<()>;
72 fn subscribe_instrument(
73 &mut self,
74 instrument_id: &InstrumentId,
75 params: &Option<HashMap<String, String>>,
76 ) -> anyhow::Result<()>;
77 fn subscribe_order_book_deltas(
78 &mut self,
79 instrument_id: &InstrumentId,
80 book_type: BookType,
81 depth: Option<usize>,
82 params: &Option<HashMap<String, String>>,
83 ) -> anyhow::Result<()>;
84 fn subscribe_order_book_snapshots(
85 &mut self,
86 instrument_id: &InstrumentId,
87 book_type: BookType,
88 depth: Option<usize>,
89 params: &Option<HashMap<String, String>>,
90 ) -> anyhow::Result<()>;
91 fn subscribe_quote_ticks(
92 &mut self,
93 instrument_id: &InstrumentId,
94 params: &Option<HashMap<String, String>>,
95 ) -> anyhow::Result<()>;
96 fn subscribe_trade_ticks(
97 &mut self,
98 instrument_id: &InstrumentId,
99 params: &Option<HashMap<String, String>>,
100 ) -> anyhow::Result<()>;
101 fn subscribe_bars(
102 &mut self,
103 bar_type: &BarType,
104 params: &Option<HashMap<String, String>>,
105 ) -> anyhow::Result<()>;
106 fn subscribe_instrument_status(
107 &mut self,
108 instrument_id: &InstrumentId,
109 params: &Option<HashMap<String, String>>,
110 ) -> anyhow::Result<()>;
111 fn subscribe_instrument_close(
112 &mut self,
113 instrument_id: &InstrumentId,
114 params: &Option<HashMap<String, String>>,
115 ) -> anyhow::Result<()>;
116 fn unsubscribe(
117 &mut self,
118 data_type: &DataType,
119 params: &Option<HashMap<String, String>>,
120 ) -> anyhow::Result<()>;
121 fn unsubscribe_instruments(
122 &mut self,
123 venue: Option<&Venue>,
124 params: &Option<HashMap<String, String>>,
125 ) -> anyhow::Result<()>;
126 fn unsubscribe_instrument(
127 &mut self,
128 instrument_id: &InstrumentId,
129 params: &Option<HashMap<String, String>>,
130 ) -> anyhow::Result<()>;
131 fn unsubscribe_order_book_deltas(
132 &mut self,
133 instrument_id: &InstrumentId,
134 params: &Option<HashMap<String, String>>,
135 ) -> anyhow::Result<()>;
136 fn unsubscribe_order_book_snapshots(
137 &mut self,
138 instrument_id: &InstrumentId,
139 params: &Option<HashMap<String, String>>,
140 ) -> anyhow::Result<()>;
141 fn unsubscribe_quote_ticks(
142 &mut self,
143 instrument_id: &InstrumentId,
144 params: &Option<HashMap<String, String>>,
145 ) -> anyhow::Result<()>;
146 fn unsubscribe_trade_ticks(
147 &mut self,
148 instrument_id: &InstrumentId,
149 params: &Option<HashMap<String, String>>,
150 ) -> anyhow::Result<()>;
151 fn unsubscribe_bars(
152 &mut self,
153 bar_type: &BarType,
154 params: &Option<HashMap<String, String>>,
155 ) -> anyhow::Result<()>;
156 fn unsubscribe_instrument_status(
157 &mut self,
158 instrument_id: &InstrumentId,
159 params: &Option<HashMap<String, String>>,
160 ) -> anyhow::Result<()>;
161 fn unsubscribe_instrument_close(
162 &mut self,
163 instrument_id: &InstrumentId,
164 params: &Option<HashMap<String, String>>,
165 ) -> anyhow::Result<()>;
166
167 fn request_data(&self, request: DataRequest);
170 fn request_instruments(
171 &self,
172 correlation_id: UUID4,
173 venue: Venue,
174 start: Option<UnixNanos>,
175 end: Option<UnixNanos>,
176 params: &Option<HashMap<String, String>>,
177 ) -> Vec<InstrumentAny>;
178 fn request_instrument(
179 &self,
180 correlation_id: UUID4,
181 instrument_id: InstrumentId,
182 start: Option<UnixNanos>,
183 end: Option<UnixNanos>,
184 params: &Option<HashMap<String, String>>,
185 ) -> InstrumentAny;
186 fn request_order_book_snapshot(
188 &self,
189 correlation_id: UUID4,
190 instrument_id: InstrumentId,
191 depth: Option<usize>,
192 params: &Option<HashMap<String, String>>,
193 ) -> Payload;
194 fn request_quote_ticks(
195 &self,
196 correlation_id: UUID4,
197 instrument_id: InstrumentId,
198 start: Option<UnixNanos>,
199 end: Option<UnixNanos>,
200 limit: Option<usize>,
201 params: &Option<HashMap<String, String>>,
202 ) -> Vec<QuoteTick>;
203 fn request_trade_ticks(
204 &self,
205 correlation_id: UUID4,
206 instrument_id: InstrumentId,
207 start: Option<UnixNanos>,
208 end: Option<UnixNanos>,
209 limit: Option<usize>,
210 params: &Option<HashMap<String, String>>,
211 ) -> Vec<TradeTick>;
212 fn request_bars(
213 &self,
214 correlation_id: UUID4,
215 bar_type: BarType,
216 start: Option<UnixNanos>,
217 end: Option<UnixNanos>,
218 limit: Option<usize>,
219 params: &Option<HashMap<String, String>>,
220 ) -> Vec<Bar>;
221}
222
223pub struct DataClientAdapter {
224 client: Box<dyn DataClient>,
225 clock: Rc<RefCell<TestClock>>,
226 pub client_id: ClientId,
227 pub venue: Venue,
228 pub handles_order_book_deltas: bool,
229 pub handles_order_book_snapshots: bool,
230 pub subscriptions_generic: HashSet<DataType>,
231 pub subscriptions_order_book_delta: HashSet<InstrumentId>,
232 pub subscriptions_order_book_snapshot: HashSet<InstrumentId>,
233 pub subscriptions_quote_tick: HashSet<InstrumentId>,
234 pub subscriptions_trade_tick: HashSet<InstrumentId>,
235 pub subscriptions_bar: HashSet<BarType>,
236 pub subscriptions_instrument_status: HashSet<InstrumentId>,
237 pub subscriptions_instrument_close: HashSet<InstrumentId>,
238 pub subscriptions_instrument: HashSet<InstrumentId>,
239 pub subscriptions_instrument_venue: HashSet<Venue>,
240}
241
242impl Deref for DataClientAdapter {
243 type Target = Box<dyn DataClient>;
244
245 fn deref(&self) -> &Self::Target {
246 &self.client
247 }
248}
249
250impl DerefMut for DataClientAdapter {
251 fn deref_mut(&mut self) -> &mut Self::Target {
252 &mut self.client
253 }
254}
255
256impl Debug for DataClientAdapter {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("DataClientAdapter")
259 .field("client_id", &self.client_id)
260 .field("venue", &self.venue)
261 .field("handles_order_book_deltas", &self.handles_order_book_deltas)
262 .field(
263 "handles_order_book_snapshots",
264 &self.handles_order_book_snapshots,
265 )
266 .field("subscriptions_generic", &self.subscriptions_generic)
267 .field(
268 "subscriptions_order_book_delta",
269 &self.subscriptions_order_book_delta,
270 )
271 .field(
272 "subscriptions_order_book_snapshot",
273 &self.subscriptions_order_book_snapshot,
274 )
275 .field("subscriptions_quote_tick", &self.subscriptions_quote_tick)
276 .field("subscriptions_trade_tick", &self.subscriptions_trade_tick)
277 .field("subscriptions_bar", &self.subscriptions_bar)
278 .field(
279 "subscriptions_instrument_status",
280 &self.subscriptions_instrument_status,
281 )
282 .field(
283 "subscriptions_instrument_close",
284 &self.subscriptions_instrument_close,
285 )
286 .field("subscriptions_instrument", &self.subscriptions_instrument)
287 .field(
288 "subscriptions_instrument_venue",
289 &self.subscriptions_instrument_venue,
290 )
291 .finish()
292 }
293}
294
295impl DataClientAdapter {
296 #[must_use]
298 pub fn new(
299 client_id: ClientId,
300 venue: Venue,
301 handles_order_book_deltas: bool,
302 handles_order_book_snapshots: bool,
303 client: Box<dyn DataClient>,
304 clock: Rc<RefCell<TestClock>>,
305 ) -> Self {
306 Self {
307 client,
308 clock,
309 client_id,
310 venue,
311 handles_order_book_deltas,
312 handles_order_book_snapshots,
313 subscriptions_generic: HashSet::new(),
314 subscriptions_order_book_delta: HashSet::new(),
315 subscriptions_order_book_snapshot: HashSet::new(),
316 subscriptions_quote_tick: HashSet::new(),
317 subscriptions_trade_tick: HashSet::new(),
318 subscriptions_bar: HashSet::new(),
319 subscriptions_instrument_status: HashSet::new(),
320 subscriptions_instrument_close: HashSet::new(),
321 subscriptions_instrument: HashSet::new(),
322 subscriptions_instrument_venue: HashSet::new(),
323 }
324 }
325
326 pub fn through_execute(&self, command: SubscriptionCommand) {}
328
329 pub fn execute(&mut self, command: SubscriptionCommand) {
330 match command.action {
331 Action::Subscribe => self.execute_subscribe_command(command),
332 Action::Unsubscribe => self.execute_unsubscribe_command(command),
333 }
334 }
335
336 #[inline]
337 fn execute_subscribe_command(&mut self, command: SubscriptionCommand) {
338 match command.data_type.type_name() {
339 stringify!(InstrumentAny) => Self::subscribe_instrument(self, command),
340 stringify!(OrderBookDelta) => Self::subscribe_order_book_deltas(self, command),
341 stringify!(OrderBookDeltas) | stringify!(OrderBookDepth10) => {
342 Self::subscribe_snapshots(self, command);
343 }
344 stringify!(QuoteTick) => Self::subscribe_quote_ticks(self, command),
345 stringify!(TradeTick) => Self::subscribe_trade_ticks(self, command),
346 stringify!(Bar) => Self::subscribe_bars(self, command),
347 _ => Self::subscribe(self, command),
348 }
349 }
350
351 #[inline]
352 fn execute_unsubscribe_command(&mut self, command: SubscriptionCommand) {
353 match command.data_type.type_name() {
354 stringify!(InstrumentAny) => Self::unsubscribe_instrument(self, command),
355 stringify!(OrderBookDelta) => Self::unsubscribe_order_book_deltas(self, command),
356 stringify!(OrderBookDeltas) | stringify!(OrderBookDepth10) => {
357 Self::unsubscribe_snapshots(self, command);
358 }
359 stringify!(QuoteTick) => Self::unsubscribe_quote_ticks(self, command),
360 stringify!(TradeTick) => Self::unsubscribe_trade_ticks(self, command),
361 stringify!(Bar) => Self::unsubscribe_bars(self, command),
362 _ => Self::unsubscribe(self, command),
363 }
364 }
365
366 fn subscribe_instrument(&mut self, command: SubscriptionCommand) {
367 let instrument_id = command.data_type.instrument_id();
368 let venue = command.data_type.venue();
369
370 if let Some(instrument_id) = instrument_id {
371 if !self.subscriptions_instrument.contains(&instrument_id) {
374 self.client
375 .subscribe_instrument(&instrument_id, &command.params)
376 .expect("Error on subscribe");
377 }
378
379 self.subscriptions_instrument.insert(instrument_id);
380 }
381
382 if let Some(venue) = venue {
383 if !self.subscriptions_instrument_venue.contains(&venue) {
384 self.client
385 .subscribe_instruments(Some(&venue), &command.params)
386 .expect("Error on subscribe");
387 }
388
389 self.subscriptions_instrument_venue.insert(venue);
390 }
391 }
392
393 fn unsubscribe_instrument(&mut self, command: SubscriptionCommand) {
394 let instrument_id = command.data_type.instrument_id();
395 let venue = command.data_type.venue();
396
397 if let Some(instrument_id) = instrument_id {
398 if self.subscriptions_instrument.contains(&instrument_id) {
399 self.client
400 .unsubscribe_instrument(&instrument_id, &command.params)
401 .expect("Error on subscribe");
402 }
403
404 self.subscriptions_instrument.remove(&instrument_id);
405 }
406
407 if let Some(venue) = venue {
408 if self.subscriptions_instrument_venue.contains(&venue) {
409 self.client
410 .unsubscribe_instruments(Some(&venue), &command.params)
411 .expect("Error on subscribe");
412 }
413
414 self.subscriptions_instrument_venue.remove(&venue);
415 }
416 }
417
418 fn subscribe_order_book_deltas(&mut self, command: SubscriptionCommand) {
419 let instrument_id = command
420 .data_type
421 .instrument_id()
422 .expect("Error on subscribe: no 'instrument_id' in metadata");
423
424 let book_type = command.data_type.book_type();
425 let depth = command.data_type.depth();
426
427 if !self.subscriptions_order_book_delta.contains(&instrument_id) {
428 self.client
429 .subscribe_order_book_deltas(&instrument_id, book_type, depth, &command.params)
430 .expect("Error on subscribe");
431 }
432
433 self.subscriptions_order_book_delta.insert(instrument_id);
434 }
435
436 fn unsubscribe_order_book_deltas(&mut self, command: SubscriptionCommand) {
437 let instrument_id = command
438 .data_type
439 .instrument_id()
440 .expect("Error on subscribe: no 'instrument_id' in metadata");
441
442 if self.subscriptions_order_book_delta.contains(&instrument_id) {
443 self.client
444 .unsubscribe_order_book_deltas(&instrument_id, &command.params)
445 .expect("Error on subscribe");
446 }
447
448 self.subscriptions_order_book_delta.remove(&instrument_id);
449 }
450
451 fn subscribe_snapshots(&mut self, command: SubscriptionCommand) {
452 let instrument_id = command
453 .data_type
454 .instrument_id()
455 .expect("Error on subscribe: no 'instrument_id' in metadata");
456
457 let book_type = command.data_type.book_type();
458 let depth = command.data_type.depth();
459
460 if !self
461 .subscriptions_order_book_snapshot
462 .contains(&instrument_id)
463 {
464 self.client
465 .subscribe_order_book_snapshots(&instrument_id, book_type, depth, &command.params)
466 .expect("Error on subscribe");
467 }
468
469 self.subscriptions_order_book_snapshot.insert(instrument_id);
470 }
471
472 fn unsubscribe_snapshots(&mut self, command: SubscriptionCommand) {
473 let instrument_id = command
474 .data_type
475 .instrument_id()
476 .expect("Error on subscribe: no 'instrument_id' in metadata");
477
478 if self
479 .subscriptions_order_book_snapshot
480 .contains(&instrument_id)
481 {
482 self.client
483 .unsubscribe_order_book_snapshots(&instrument_id, &command.params)
484 .expect("Error on subscribe");
485 }
486
487 self.subscriptions_order_book_snapshot
488 .remove(&instrument_id);
489 }
490
491 fn subscribe_quote_ticks(&mut self, command: SubscriptionCommand) {
492 let instrument_id = command
493 .data_type
494 .instrument_id()
495 .expect("Error on subscribe: no 'instrument_id' in metadata");
496
497 if !self.subscriptions_quote_tick.contains(&instrument_id) {
498 self.client
499 .subscribe_quote_ticks(&instrument_id, &command.params)
500 .expect("Error on subscribe");
501 }
502 self.subscriptions_quote_tick.insert(instrument_id);
503 }
504
505 fn unsubscribe_quote_ticks(&mut self, command: SubscriptionCommand) {
506 let instrument_id = command
507 .data_type
508 .instrument_id()
509 .expect("Error on subscribe: no 'instrument_id' in metadata");
510
511 if self.subscriptions_quote_tick.contains(&instrument_id) {
512 self.client
513 .unsubscribe_quote_ticks(&instrument_id, &command.params)
514 .expect("Error on subscribe");
515 }
516 self.subscriptions_quote_tick.remove(&instrument_id);
517 }
518
519 fn unsubscribe_trade_ticks(&mut self, command: SubscriptionCommand) {
520 let instrument_id = command
521 .data_type
522 .instrument_id()
523 .expect("Error on subscribe: no 'instrument_id' in metadata");
524
525 if self.subscriptions_trade_tick.contains(&instrument_id) {
526 self.client
527 .unsubscribe_trade_ticks(&instrument_id, &command.params)
528 .expect("Error on subscribe");
529 }
530 self.subscriptions_trade_tick.remove(&instrument_id);
531 }
532
533 fn subscribe_trade_ticks(&mut self, command: SubscriptionCommand) {
534 let instrument_id = command
535 .data_type
536 .instrument_id()
537 .expect("Error on subscribe: no 'instrument_id' in metadata");
538
539 if !self.subscriptions_trade_tick.contains(&instrument_id) {
540 self.client
541 .subscribe_trade_ticks(&instrument_id, &command.params)
542 .expect("Error on subscribe");
543 }
544 self.subscriptions_trade_tick.insert(instrument_id);
545 }
546
547 fn subscribe_bars(&mut self, command: SubscriptionCommand) {
548 let bar_type = command.data_type.bar_type();
549
550 if !self.subscriptions_bar.contains(&bar_type) {
551 self.client
552 .subscribe_bars(&bar_type, &command.params)
553 .expect("Error on subscribe");
554 }
555 self.subscriptions_bar.insert(bar_type);
556 }
557
558 fn unsubscribe_bars(&mut self, command: SubscriptionCommand) {
559 let bar_type = command.data_type.bar_type();
560
561 if self.subscriptions_bar.contains(&bar_type) {
562 self.client
563 .subscribe_bars(&bar_type, &command.params)
564 .expect("Error on subscribe");
565 }
566 self.subscriptions_bar.remove(&bar_type);
567 }
568
569 pub fn subscribe(&mut self, command: SubscriptionCommand) {
570 let data_type = command.data_type;
571 if !self.subscriptions_generic.contains(&data_type) {
572 self.client
573 .subscribe(&data_type, &command.params)
574 .expect("Error on subscribe");
575 }
576 self.subscriptions_generic.insert(data_type);
577 }
578
579 pub fn unsubscribe(&mut self, command: SubscriptionCommand) {
580 let data_type = command.data_type;
581 if self.subscriptions_generic.contains(&data_type) {
582 self.client
583 .unsubscribe(&data_type, &command.params)
584 .expect("Error on unsubscribe");
585 }
586 self.subscriptions_generic.remove(&data_type);
587 }
588
589 pub fn through_request(&self, req: DataRequest) {
596 self.client.request_data(req);
597 }
598
599 #[must_use]
600 pub fn request(&self, req: DataRequest) -> DataResponse {
601 let instrument_id = req.data_type.instrument_id();
602 let venue = req.data_type.venue();
603 let start = req.data_type.start();
604 let end = req.data_type.end();
605 let limit = req.data_type.limit();
606
607 match req.data_type.type_name() {
608 stringify!(InstrumentAny) => match (instrument_id, venue) {
609 (None, Some(venue)) => {
610 let instruments = self.client.request_instruments(
611 req.correlation_id,
612 venue,
613 start,
614 end,
615 &req.params,
616 );
617 self.handle_instruments(venue, instruments, req.correlation_id)
618 }
619 (Some(instrument_id), None) => {
620 let instrument = self.client.request_instrument(
621 req.correlation_id,
622 instrument_id,
623 start,
624 end,
625 &req.params,
626 );
627 self.handle_instrument(instrument, req.correlation_id)
628 }
629 _ => {
630 todo!()
631 }
632 },
633 stringify!(QuoteTick) => {
634 let instrument_id =
635 instrument_id.expect("Error on request: no 'instrument_id' found in metadata");
636 let quotes = self.client.request_quote_ticks(
637 req.correlation_id,
638 instrument_id,
639 start,
640 end,
641 limit,
642 &req.params,
643 );
644 self.handle_quote_ticks(&instrument_id, quotes, req.correlation_id)
645 }
646 stringify!(TradeTick) => {
647 let instrument_id =
648 instrument_id.expect("Error on request: no 'instrument_id' found in metadata");
649 let trades = self.client.request_trade_ticks(
650 req.correlation_id,
651 instrument_id,
652 start,
653 end,
654 limit,
655 &req.params,
656 );
657 self.handle_trade_ticks(&instrument_id, trades, req.correlation_id)
658 }
659 stringify!(Bar) => {
660 let bar_type = req.data_type.bar_type();
661 let bars = self.client.request_bars(
662 req.correlation_id,
663 bar_type,
664 start,
665 end,
666 limit,
667 &req.params,
668 );
669 self.handle_bars(&bar_type, bars, req.correlation_id)
670 }
671 _ => {
672 todo!()
673 }
674 }
675 }
676
677 #[must_use]
678 pub fn handle_instrument(
679 &self,
680 instrument: InstrumentAny,
681 correlation_id: UUID4,
682 ) -> DataResponse {
683 let instrument_id = instrument.id();
684 let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
685 let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
686 let data = Arc::new(instrument);
687
688 DataResponse::new(
689 correlation_id,
690 self.client_id,
691 instrument_id.venue,
692 data_type,
693 data,
694 self.clock.borrow().timestamp_ns(),
695 None,
696 )
697 }
698
699 #[must_use]
700 pub fn handle_instruments(
701 &self,
702 venue: Venue,
703 instruments: Vec<InstrumentAny>,
704 correlation_id: UUID4,
705 ) -> DataResponse {
706 let metadata = IndexMap::from([("venue".to_string(), venue.to_string())]);
707 let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
708 let data = Arc::new(instruments);
709
710 DataResponse::new(
711 correlation_id,
712 self.client_id,
713 venue,
714 data_type,
715 data,
716 self.clock.borrow().timestamp_ns(),
717 None,
718 )
719 }
720
721 #[must_use]
722 pub fn handle_quote_ticks(
723 &self,
724 instrument_id: &InstrumentId,
725 quotes: Vec<QuoteTick>,
726 correlation_id: UUID4,
727 ) -> DataResponse {
728 let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
729 let data_type = DataType::new(stringify!(QuoteTick), Some(metadata));
730 let data = Arc::new(quotes);
731
732 DataResponse::new(
733 correlation_id,
734 self.client_id,
735 instrument_id.venue,
736 data_type,
737 data,
738 self.clock.borrow().timestamp_ns(),
739 None,
740 )
741 }
742
743 #[must_use]
744 pub fn handle_trade_ticks(
745 &self,
746 instrument_id: &InstrumentId,
747 trades: Vec<TradeTick>,
748 correlation_id: UUID4,
749 ) -> DataResponse {
750 let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
751 let data_type = DataType::new(stringify!(TradeTick), Some(metadata));
752 let data = Arc::new(trades);
753
754 DataResponse::new(
755 correlation_id,
756 self.client_id,
757 instrument_id.venue,
758 data_type,
759 data,
760 self.clock.borrow().timestamp_ns(),
761 None,
762 )
763 }
764
765 #[must_use]
766 pub fn handle_bars(
767 &self,
768 bar_type: &BarType,
769 bars: Vec<Bar>,
770 correlation_id: UUID4,
771 ) -> DataResponse {
772 let metadata = IndexMap::from([("bar_type".to_string(), bar_type.to_string())]);
773 let data_type = DataType::new(stringify!(Bar), Some(metadata));
774 let data = Arc::new(bars);
775
776 DataResponse::new(
777 correlation_id,
778 self.client_id,
779 bar_type.instrument_id().venue,
780 data_type,
781 data,
782 self.clock.borrow().timestamp_ns(),
783 None,
784 )
785 }
786}