1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::HashSet,
25 fmt::Debug,
26 ops::{Deref, DerefMut},
27 rc::Rc,
28 sync::Arc,
29};
30
31use indexmap::IndexMap;
32use nautilus_common::{
33 clock::Clock,
34 messages::data::{
35 DataResponse, RequestBars, RequestBookSnapshot, RequestData, RequestInstrument,
36 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
37 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, SubscribeData,
38 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
39 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
40 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
41 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeData, UnsubscribeIndexPrices,
42 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
43 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
44 },
45};
46use nautilus_core::UUID4;
47use nautilus_model::{
48 data::{Bar, BarType, DataType, QuoteTick, TradeTick},
49 identifiers::{ClientId, InstrumentId, Venue},
50 instruments::{Instrument, InstrumentAny},
51};
52
53pub trait DataClient {
54 fn client_id(&self) -> ClientId;
55 fn venue(&self) -> Option<Venue>;
56 fn start(&self);
57 fn stop(&self);
58 fn reset(&self);
59 fn dispose(&self);
60 fn is_connected(&self) -> bool;
61 fn is_disconnected(&self) -> bool;
62
63 fn subscribe(&mut self, cmd: SubscribeData) -> anyhow::Result<()> {
69 Ok(())
70 }
71 fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
72 Ok(())
73 }
74 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
75 Ok(())
76 }
77 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
78 Ok(())
79 }
80 fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
81 Ok(())
82 }
83 fn subscribe_book_snapshots(&mut self, cmd: SubscribeBookSnapshots) -> anyhow::Result<()> {
84 Ok(())
85 }
86 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
87 Ok(())
88 }
89 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
90 Ok(())
91 }
92 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
93 Ok(())
94 }
95 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
96 Ok(())
97 }
98 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
99 Ok(())
100 }
101 fn subscribe_instrument_status(
102 &mut self,
103 cmd: SubscribeInstrumentStatus,
104 ) -> anyhow::Result<()> {
105 Ok(())
106 }
107 fn subscribe_instrument_close(&mut self, cmd: SubscribeInstrumentClose) -> anyhow::Result<()> {
108 Ok(())
109 }
110 fn unsubscribe(&mut self, cmd: UnsubscribeData) -> anyhow::Result<()> {
111 Ok(())
112 }
113 fn unsubscribe_instruments(&mut self, cmd: UnsubscribeInstruments) -> anyhow::Result<()> {
114 Ok(())
115 }
116 fn unsubscribe_instrument(&mut self, cmd: UnsubscribeInstrument) -> anyhow::Result<()> {
117 Ok(())
118 }
119 fn unsubscribe_book_deltas(&mut self, cmd: UnsubscribeBookDeltas) -> anyhow::Result<()> {
120 Ok(())
121 }
122 fn unsubscribe_book_depth10(&mut self, cmd: UnsubscribeBookDepth10) -> anyhow::Result<()> {
123 Ok(())
124 }
125 fn unsubscribe_book_snapshots(&mut self, cmd: UnsubscribeBookSnapshots) -> anyhow::Result<()> {
126 Ok(())
127 }
128 fn unsubscribe_quotes(&mut self, cmd: UnsubscribeQuotes) -> anyhow::Result<()> {
129 Ok(())
130 }
131 fn unsubscribe_trades(&mut self, cmd: UnsubscribeTrades) -> anyhow::Result<()> {
132 Ok(())
133 }
134 fn unsubscribe_mark_prices(&mut self, cmd: UnsubscribeMarkPrices) -> anyhow::Result<()> {
135 Ok(())
136 }
137 fn unsubscribe_index_prices(&mut self, cmd: UnsubscribeIndexPrices) -> anyhow::Result<()> {
138 Ok(())
139 }
140 fn unsubscribe_bars(&mut self, cmd: UnsubscribeBars) -> anyhow::Result<()> {
141 Ok(())
142 }
143 fn unsubscribe_instrument_status(
144 &mut self,
145 cmd: UnsubscribeInstrumentStatus,
146 ) -> anyhow::Result<()> {
147 Ok(())
148 }
149 fn unsubscribe_instrument_close(
150 &mut self,
151 cmd: UnsubscribeInstrumentClose,
152 ) -> anyhow::Result<()> {
153 Ok(())
154 }
155
156 fn request_data(&self, request: RequestData) -> anyhow::Result<()>;
157
158 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
160 Ok(())
161 }
162
163 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
165 Ok(())
166 }
167
168 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
170 Ok(())
171 }
172
173 fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
175 Ok(())
176 }
177
178 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
180 Ok(())
181 }
182
183 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
185 Ok(())
186 }
187}
188
189pub struct DataClientAdapter {
190 client: Box<dyn DataClient>,
191 clock: Rc<RefCell<dyn Clock>>,
192 pub client_id: ClientId,
193 pub venue: Venue,
194 pub handles_book_deltas: bool,
195 pub handles_book_snapshots: bool,
196 pub subscriptions_generic: HashSet<DataType>,
197 pub subscriptions_book_deltas: HashSet<InstrumentId>,
198 pub subscriptions_book_depth10: HashSet<InstrumentId>,
199 pub subscriptions_book_snapshots: HashSet<InstrumentId>,
200 pub subscriptions_quotes: HashSet<InstrumentId>,
201 pub subscriptions_trades: HashSet<InstrumentId>,
202 pub subscriptions_bars: HashSet<BarType>,
203 pub subscriptions_instrument_status: HashSet<InstrumentId>,
204 pub subscriptions_instrument_close: HashSet<InstrumentId>,
205 pub subscriptions_instrument: HashSet<InstrumentId>,
206 pub subscriptions_instrument_venue: HashSet<Venue>,
207 pub subscriptions_mark_prices: HashSet<InstrumentId>,
208 pub subscriptions_index_prices: HashSet<InstrumentId>,
209}
210
211impl Deref for DataClientAdapter {
212 type Target = Box<dyn DataClient>;
213
214 fn deref(&self) -> &Self::Target {
215 &self.client
216 }
217}
218
219impl DerefMut for DataClientAdapter {
220 fn deref_mut(&mut self) -> &mut Self::Target {
221 &mut self.client
222 }
223}
224
225impl Debug for DataClientAdapter {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 f.debug_struct("DataClientAdapter")
228 .field("client_id", &self.client_id)
229 .field("venue", &self.venue)
230 .field("handles_book_deltas", &self.handles_book_deltas)
231 .field("handles_book_snapshots", &self.handles_book_snapshots)
232 .field("subscriptions_generic", &self.subscriptions_generic)
233 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
234 .field(
235 "subscriptions_book_depth10",
236 &self.subscriptions_book_depth10,
237 )
238 .field(
239 "subscriptions_book_snapshot",
240 &self.subscriptions_book_snapshots,
241 )
242 .field("subscriptions_quotes", &self.subscriptions_quotes)
243 .field("subscriptions_trades", &self.subscriptions_trades)
244 .field("subscriptions_bars", &self.subscriptions_bars)
245 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
246 .field(
247 "subscriptions_index_prices",
248 &self.subscriptions_index_prices,
249 )
250 .field(
251 "subscriptions_instrument_status",
252 &self.subscriptions_instrument_status,
253 )
254 .field(
255 "subscriptions_instrument_close",
256 &self.subscriptions_instrument_close,
257 )
258 .field("subscriptions_instrument", &self.subscriptions_instrument)
259 .field(
260 "subscriptions_instrument_venue",
261 &self.subscriptions_instrument_venue,
262 )
263 .finish()
264 }
265}
266
267impl DataClientAdapter {
268 #[must_use]
270 pub fn new(
271 client_id: ClientId,
272 venue: Venue,
273 handles_order_book_deltas: bool,
274 handles_order_book_snapshots: bool,
275 client: Box<dyn DataClient>,
276 clock: Rc<RefCell<dyn Clock>>,
277 ) -> Self {
278 Self {
279 client,
280 clock,
281 client_id,
282 venue,
283 handles_book_deltas: handles_order_book_deltas,
284 handles_book_snapshots: handles_order_book_snapshots,
285 subscriptions_generic: HashSet::new(),
286 subscriptions_book_deltas: HashSet::new(),
287 subscriptions_book_depth10: HashSet::new(),
288 subscriptions_book_snapshots: HashSet::new(),
289 subscriptions_quotes: HashSet::new(),
290 subscriptions_trades: HashSet::new(),
291 subscriptions_mark_prices: HashSet::new(),
292 subscriptions_index_prices: HashSet::new(),
293 subscriptions_bars: HashSet::new(),
294 subscriptions_instrument_status: HashSet::new(),
295 subscriptions_instrument_close: HashSet::new(),
296 subscriptions_instrument: HashSet::new(),
297 subscriptions_instrument_venue: HashSet::new(),
298 }
299 }
300
301 pub fn through_execute(&self, command: SubscribeCommand) {}
303
304 #[inline]
313 pub fn execute_subscribe_command(&mut self, cmd: SubscribeCommand) {
314 let result = match cmd.clone() {
315 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
316 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
317 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
318 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
319 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
320 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
321 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
322 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
323 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
324 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
325 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
326 SubscribeCommand::InstrumentStatus(cmd) => todo!(),
327 SubscribeCommand::InstrumentClose(cmd) => todo!(),
328 };
329
330 if let Err(e) = result {
331 log::debug!("Error on subscribe: {cmd:?}");
332 }
333 }
334
335 #[inline]
336 pub fn execute_unsubscribe_command(&mut self, cmd: UnsubscribeCommand) {
337 let result = match cmd.clone() {
338 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
339 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
340 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
341 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
342 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
343 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
344 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
345 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
346 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
347 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
348 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
349 UnsubscribeCommand::InstrumentStatus(cmd) => todo!(),
350 UnsubscribeCommand::InstrumentClose(cmd) => todo!(),
351 };
352
353 if let Err(e) = result {
354 log::debug!("Error on unsubscribe: {cmd:?}");
355 }
356 }
357
358 fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
359 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
360 self.subscriptions_instrument_venue.insert(cmd.venue);
361 self.client.subscribe_instruments(cmd)?;
362 }
363
364 Ok(())
365 }
366
367 fn unsubscribe_instruments(&mut self, cmd: UnsubscribeInstruments) -> anyhow::Result<()> {
368 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
369 self.subscriptions_instrument_venue.remove(&cmd.venue);
370 self.client.unsubscribe_instruments(cmd)?;
371 }
372
373 Ok(())
374 }
375
376 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
377 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
378 self.subscriptions_instrument.insert(cmd.instrument_id);
379 self.client.subscribe_instrument(cmd)?;
380 }
381
382 Ok(())
383 }
384
385 fn unsubscribe_instrument(&mut self, cmd: UnsubscribeInstrument) -> anyhow::Result<()> {
386 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
387 self.subscriptions_instrument.remove(&cmd.instrument_id);
388 self.client.unsubscribe_instrument(cmd)?;
389 }
390
391 Ok(())
392 }
393
394 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
395 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
396 self.subscriptions_book_deltas.insert(cmd.instrument_id);
397 self.client.subscribe_book_deltas(cmd)?;
398 }
399
400 Ok(())
401 }
402
403 fn unsubscribe_book_deltas(&mut self, cmd: UnsubscribeBookDeltas) -> anyhow::Result<()> {
404 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
405 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
406 self.client.unsubscribe_book_deltas(cmd)?;
407 }
408
409 Ok(())
410 }
411
412 fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
413 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
414 self.subscriptions_book_depth10.insert(cmd.instrument_id);
415 self.client.subscribe_book_depth10(cmd)?;
416 }
417
418 Ok(())
419 }
420
421 fn unsubscribe_book_depth10(&mut self, cmd: UnsubscribeBookDepth10) -> anyhow::Result<()> {
422 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
423 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
424 self.client.unsubscribe_book_depth10(cmd)?;
425 }
426
427 Ok(())
428 }
429
430 fn subscribe_book_snapshots(&mut self, cmd: SubscribeBookSnapshots) -> anyhow::Result<()> {
431 if !self
432 .subscriptions_book_snapshots
433 .contains(&cmd.instrument_id)
434 {
435 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
436 self.client.subscribe_book_snapshots(cmd)?;
437 }
438
439 Ok(())
440 }
441
442 fn unsubscribe_snapshots(&mut self, cmd: UnsubscribeBookSnapshots) -> anyhow::Result<()> {
443 if self
444 .subscriptions_book_snapshots
445 .contains(&cmd.instrument_id)
446 {
447 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
448 self.client.unsubscribe_book_snapshots(cmd)?;
449 }
450
451 Ok(())
452 }
453
454 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
455 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
456 self.subscriptions_quotes.insert(cmd.instrument_id);
457 self.client.subscribe_quotes(cmd)?;
458 }
459 Ok(())
460 }
461
462 fn unsubscribe_quotes(&mut self, cmd: UnsubscribeQuotes) -> anyhow::Result<()> {
463 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
464 self.subscriptions_quotes.remove(&cmd.instrument_id);
465 self.client.unsubscribe_quotes(cmd)?;
466 }
467 Ok(())
468 }
469
470 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
471 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
472 self.subscriptions_trades.insert(cmd.instrument_id);
473 self.client.subscribe_trades(cmd)?;
474 }
475 Ok(())
476 }
477
478 fn unsubscribe_trades(&mut self, cmd: UnsubscribeTrades) -> anyhow::Result<()> {
479 if self.subscriptions_trades.contains(&cmd.instrument_id) {
480 self.subscriptions_trades.remove(&cmd.instrument_id);
481 self.client.unsubscribe_trades(cmd)?;
482 }
483 Ok(())
484 }
485
486 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
487 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
488 self.subscriptions_mark_prices.insert(cmd.instrument_id);
489 self.client.subscribe_mark_prices(cmd)?;
490 }
491 Ok(())
492 }
493
494 fn unsubscribe_mark_prices(&mut self, cmd: UnsubscribeMarkPrices) -> anyhow::Result<()> {
495 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
496 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
497 self.client.unsubscribe_mark_prices(cmd)?;
498 }
499 Ok(())
500 }
501
502 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
503 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
504 self.subscriptions_index_prices.insert(cmd.instrument_id);
505 self.client.subscribe_index_prices(cmd)?;
506 }
507 Ok(())
508 }
509
510 fn unsubscribe_index_prices(&mut self, cmd: UnsubscribeIndexPrices) -> anyhow::Result<()> {
511 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
512 self.subscriptions_index_prices.remove(&cmd.instrument_id);
513 self.client.unsubscribe_index_prices(cmd)?;
514 }
515 Ok(())
516 }
517
518 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
519 if !self.subscriptions_bars.contains(&cmd.bar_type) {
520 self.subscriptions_bars.insert(cmd.bar_type);
521 self.client.subscribe_bars(cmd)?;
522 }
523 Ok(())
524 }
525
526 fn unsubscribe_bars(&mut self, cmd: UnsubscribeBars) -> anyhow::Result<()> {
527 if self.subscriptions_bars.contains(&cmd.bar_type) {
528 self.subscriptions_bars.remove(&cmd.bar_type);
529 self.client.unsubscribe_bars(cmd)?;
530 }
531 Ok(())
532 }
533
534 pub fn subscribe(&mut self, cmd: SubscribeData) -> anyhow::Result<()> {
535 if !self.subscriptions_generic.contains(&cmd.data_type) {
536 self.subscriptions_generic.insert(cmd.data_type.clone());
537 self.client.subscribe(cmd)?;
538 }
539 Ok(())
540 }
541
542 pub fn unsubscribe(&mut self, cmd: UnsubscribeData) -> anyhow::Result<()> {
543 if self.subscriptions_generic.contains(&cmd.data_type) {
544 self.subscriptions_generic.remove(&cmd.data_type);
545 self.client.unsubscribe(cmd)?;
546 }
547 Ok(())
548 }
549
550 pub fn request_data(&self, req: RequestData) -> anyhow::Result<()> {
553 self.client.request_data(req)
554 }
555
556 pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
557 self.client.request_instrument(req)
558 }
559
560 pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
561 self.client.request_instruments(req)
562 }
563
564 pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
565 self.client.request_quotes(req)
566 }
567
568 pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
569 self.client.request_trades(req)
570 }
571
572 pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
573 self.client.request_bars(req)
574 }
575
576 #[must_use]
577 pub fn handle_instrument(
578 &self,
579 instrument: InstrumentAny,
580 correlation_id: UUID4,
581 ) -> DataResponse {
582 let instrument_id = instrument.id();
583 let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
584 let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
585 let data = Arc::new(instrument);
586
587 DataResponse::new(
588 correlation_id,
589 self.client_id,
590 instrument_id.venue,
591 data_type,
592 data,
593 self.clock.borrow().timestamp_ns(),
594 None,
595 )
596 }
597
598 #[must_use]
599 pub fn handle_instruments(
600 &self,
601 venue: Venue,
602 instruments: Vec<InstrumentAny>,
603 correlation_id: UUID4,
604 ) -> DataResponse {
605 let metadata = IndexMap::from([("venue".to_string(), venue.to_string())]);
606 let data_type = DataType::new(stringify!(InstrumentAny), Some(metadata));
607 let data = Arc::new(instruments);
608
609 DataResponse::new(
610 correlation_id,
611 self.client_id,
612 venue,
613 data_type,
614 data,
615 self.clock.borrow().timestamp_ns(),
616 None,
617 )
618 }
619
620 #[must_use]
621 pub fn handle_quotes(
622 &self,
623 instrument_id: &InstrumentId,
624 quotes: Vec<QuoteTick>,
625 correlation_id: UUID4,
626 ) -> DataResponse {
627 let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
628 let data_type = DataType::new(stringify!(QuoteTick), Some(metadata));
629 let data = Arc::new(quotes);
630
631 DataResponse::new(
632 correlation_id,
633 self.client_id,
634 instrument_id.venue,
635 data_type,
636 data,
637 self.clock.borrow().timestamp_ns(),
638 None,
639 )
640 }
641
642 #[must_use]
643 pub fn handle_trades(
644 &self,
645 instrument_id: &InstrumentId,
646 trades: Vec<TradeTick>,
647 correlation_id: UUID4,
648 ) -> DataResponse {
649 let metadata = IndexMap::from([("instrument_id".to_string(), instrument_id.to_string())]);
650 let data_type = DataType::new(stringify!(TradeTick), Some(metadata));
651 let data = Arc::new(trades);
652
653 DataResponse::new(
654 correlation_id,
655 self.client_id,
656 instrument_id.venue,
657 data_type,
658 data,
659 self.clock.borrow().timestamp_ns(),
660 None,
661 )
662 }
663
664 #[must_use]
665 pub fn handle_bars(
666 &self,
667 bar_type: &BarType,
668 bars: Vec<Bar>,
669 correlation_id: UUID4,
670 ) -> DataResponse {
671 let metadata = IndexMap::from([("bar_type".to_string(), bar_type.to_string())]);
672 let data_type = DataType::new(stringify!(Bar), Some(metadata));
673 let data = Arc::new(bars);
674
675 DataResponse::new(
676 correlation_id,
677 self.client_id,
678 bar_type.instrument_id().venue,
679 data_type,
680 data,
681 self.clock.borrow().timestamp_ns(),
682 None,
683 )
684 }
685}