1pub mod config;
19pub mod database;
20
21mod index;
22
23#[cfg(test)]
24mod tests;
25
26use std::{
27 collections::{HashMap, HashSet, VecDeque},
28 time::{SystemTime, UNIX_EPOCH},
29};
30
31use bytes::Bytes;
32pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
34use index::CacheIndex;
35use nautilus_core::{
36 UUID4,
37 correctness::{
38 FAILED, check_key_not_in_map, check_predicate_false, check_slice_not_empty,
39 check_valid_string,
40 },
41};
42use nautilus_model::{
43 accounts::AccountAny,
44 data::{Bar, BarType, QuoteTick, TradeTick},
45 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
46 identifiers::{
47 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
48 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
49 },
50 instruments::{InstrumentAny, SyntheticInstrument},
51 orderbook::OrderBook,
52 orders::{OrderAny, OrderList},
53 position::Position,
54 types::{Currency, Money, Price, Quantity},
55};
56use ustr::Ustr;
57
58use crate::xrate::get_exchange_rate;
59
60pub struct Cache {
62 config: CacheConfig,
63 index: CacheIndex,
64 database: Option<Box<dyn CacheDatabaseAdapter>>,
65 general: HashMap<String, Bytes>,
66 quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
67 trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
68 mark_prices: HashMap<InstrumentId, Price>,
69 mark_xrates: HashMap<(Currency, Currency), f64>,
70 books: HashMap<InstrumentId, OrderBook>,
71 bars: HashMap<BarType, VecDeque<Bar>>,
72 currencies: HashMap<Ustr, Currency>,
73 instruments: HashMap<InstrumentId, InstrumentAny>,
74 synthetics: HashMap<InstrumentId, SyntheticInstrument>,
75 accounts: HashMap<AccountId, AccountAny>,
76 orders: HashMap<ClientOrderId, OrderAny>,
77 order_lists: HashMap<OrderListId, OrderList>,
78 positions: HashMap<PositionId, Position>,
79 position_snapshots: HashMap<PositionId, Bytes>,
80}
81
82impl Default for Cache {
83 fn default() -> Self {
85 Self::new(Some(CacheConfig::default()), None)
86 }
87}
88
89impl Cache {
90 #[must_use]
92 pub fn new(
93 config: Option<CacheConfig>,
94 database: Option<Box<dyn CacheDatabaseAdapter>>,
95 ) -> Self {
96 Self {
97 config: config.unwrap_or_default(),
98 index: CacheIndex::default(),
99 database,
100 general: HashMap::new(),
101 mark_prices: HashMap::new(),
102 mark_xrates: HashMap::new(),
103 quotes: HashMap::new(),
104 trades: HashMap::new(),
105 books: HashMap::new(),
106 bars: HashMap::new(),
107 currencies: HashMap::new(),
108 instruments: HashMap::new(),
109 synthetics: HashMap::new(),
110 accounts: HashMap::new(),
111 orders: HashMap::new(),
112 order_lists: HashMap::new(),
113 positions: HashMap::new(),
114 position_snapshots: HashMap::new(),
115 }
116 }
117
118 #[must_use]
120 pub fn memory_address(&self) -> String {
121 format!("{:?}", std::ptr::from_ref(self))
122 }
123
124 pub fn cache_general(&mut self) -> anyhow::Result<()> {
128 self.general = match &mut self.database {
129 Some(db) => db.load()?,
130 None => HashMap::new(),
131 };
132
133 log::info!(
134 "Cached {} general object(s) from database",
135 self.general.len()
136 );
137 Ok(())
138 }
139
140 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
142 let cache_map = match &self.database {
143 Some(db) => db.load_all().await?,
144 None => CacheMap::default(),
145 };
146
147 self.currencies = cache_map.currencies;
148 self.instruments = cache_map.instruments;
149 self.synthetics = cache_map.synthetics;
150 self.accounts = cache_map.accounts;
151 self.orders = cache_map.orders;
152 self.positions = cache_map.positions;
153 Ok(())
154 }
155
156 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
158 self.currencies = match &mut self.database {
159 Some(db) => db.load_currencies().await?,
160 None => HashMap::new(),
161 };
162
163 log::info!("Cached {} currencies from database", self.general.len());
164 Ok(())
165 }
166
167 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
169 self.instruments = match &mut self.database {
170 Some(db) => db.load_instruments().await?,
171 None => HashMap::new(),
172 };
173
174 log::info!("Cached {} instruments from database", self.general.len());
175 Ok(())
176 }
177
178 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
181 self.synthetics = match &mut self.database {
182 Some(db) => db.load_synthetics().await?,
183 None => HashMap::new(),
184 };
185
186 log::info!(
187 "Cached {} synthetic instruments from database",
188 self.general.len()
189 );
190 Ok(())
191 }
192
193 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
195 self.accounts = match &mut self.database {
196 Some(db) => db.load_accounts().await?,
197 None => HashMap::new(),
198 };
199
200 log::info!(
201 "Cached {} synthetic instruments from database",
202 self.general.len()
203 );
204 Ok(())
205 }
206
207 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
209 self.orders = match &mut self.database {
210 Some(db) => db.load_orders().await?,
211 None => HashMap::new(),
212 };
213
214 log::info!("Cached {} orders from database", self.general.len());
215 Ok(())
216 }
217
218 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
220 self.positions = match &mut self.database {
221 Some(db) => db.load_positions().await?,
222 None => HashMap::new(),
223 };
224
225 log::info!("Cached {} positions from database", self.general.len());
226 Ok(())
227 }
228
229 pub fn build_index(&mut self) {
231 self.index.clear();
232 log::debug!("Building index");
233
234 for account_id in self.accounts.keys() {
236 self.index
237 .venue_account
238 .insert(account_id.get_issuer(), *account_id);
239 }
240
241 for (client_order_id, order) in &self.orders {
243 let instrument_id = order.instrument_id();
244 let venue = instrument_id.venue;
245 let strategy_id = order.strategy_id();
246
247 self.index
249 .venue_orders
250 .entry(venue)
251 .or_default()
252 .insert(*client_order_id);
253
254 if let Some(venue_order_id) = order.venue_order_id() {
256 self.index
257 .venue_order_ids
258 .insert(venue_order_id, *client_order_id);
259 }
260
261 if let Some(position_id) = order.position_id() {
263 self.index
264 .order_position
265 .insert(*client_order_id, position_id);
266 }
267
268 self.index
270 .order_strategy
271 .insert(*client_order_id, order.strategy_id());
272
273 self.index
275 .instrument_orders
276 .entry(instrument_id)
277 .or_default()
278 .insert(*client_order_id);
279
280 self.index
282 .strategy_orders
283 .entry(strategy_id)
284 .or_default()
285 .insert(*client_order_id);
286
287 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
289 self.index
290 .exec_algorithm_orders
291 .entry(exec_algorithm_id)
292 .or_default()
293 .insert(*client_order_id);
294 }
295
296 if let Some(exec_spawn_id) = order.exec_spawn_id() {
298 self.index
299 .exec_spawn_orders
300 .entry(exec_spawn_id)
301 .or_default()
302 .insert(*client_order_id);
303 }
304
305 self.index.orders.insert(*client_order_id);
307
308 if order.is_open() {
310 self.index.orders_open.insert(*client_order_id);
311 }
312
313 if order.is_closed() {
315 self.index.orders_closed.insert(*client_order_id);
316 }
317
318 if let Some(emulation_trigger) = order.emulation_trigger() {
320 if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
321 self.index.orders_emulated.insert(*client_order_id);
322 }
323 }
324
325 if order.is_inflight() {
327 self.index.orders_inflight.insert(*client_order_id);
328 }
329
330 self.index.strategies.insert(strategy_id);
332
333 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
335 self.index.exec_algorithms.insert(exec_algorithm_id);
336 }
337 }
338
339 for (position_id, position) in &self.positions {
341 let instrument_id = position.instrument_id;
342 let venue = instrument_id.venue;
343 let strategy_id = position.strategy_id;
344
345 self.index
347 .venue_positions
348 .entry(venue)
349 .or_default()
350 .insert(*position_id);
351
352 self.index
354 .position_strategy
355 .insert(*position_id, position.strategy_id);
356
357 self.index
359 .position_orders
360 .entry(*position_id)
361 .or_default()
362 .extend(position.client_order_ids().into_iter());
363
364 self.index
366 .instrument_positions
367 .entry(instrument_id)
368 .or_default()
369 .insert(*position_id);
370
371 self.index
373 .strategy_positions
374 .entry(strategy_id)
375 .or_default()
376 .insert(*position_id);
377
378 self.index.positions.insert(*position_id);
380
381 if position.is_open() {
383 self.index.positions_open.insert(*position_id);
384 }
385
386 if position.is_closed() {
388 self.index.positions_closed.insert(*position_id);
389 }
390
391 self.index.strategies.insert(strategy_id);
393 }
394 }
395
396 #[must_use]
398 pub const fn has_backing(&self) -> bool {
399 self.config.database.is_some()
400 }
401
402 #[must_use]
404 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
405 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
406 quote
407 } else {
408 log::warn!(
409 "Cannot calculate unrealized PnL for {}, no quotes for {}",
410 position.id,
411 position.instrument_id
412 );
413 return None;
414 };
415
416 let last = match position.side {
417 PositionSide::Flat | PositionSide::NoPositionSide => {
418 return Some(Money::new(0.0, position.settlement_currency));
419 }
420 PositionSide::Long => quote.ask_price,
421 PositionSide::Short => quote.bid_price,
422 };
423
424 Some(position.unrealized_pnl(last))
425 }
426
427 #[must_use]
432 pub fn check_integrity(&mut self) -> bool {
433 let mut error_count = 0;
434 let failure = "Integrity failure";
435
436 let timestamp_us = SystemTime::now()
438 .duration_since(UNIX_EPOCH)
439 .expect("Time went backwards")
440 .as_micros();
441
442 log::info!("Checking data integrity");
443
444 for account_id in self.accounts.keys() {
446 if !self
447 .index
448 .venue_account
449 .contains_key(&account_id.get_issuer())
450 {
451 log::error!(
452 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
453 );
454 error_count += 1;
455 }
456 }
457
458 for (client_order_id, order) in &self.orders {
459 if !self.index.order_strategy.contains_key(client_order_id) {
460 log::error!(
461 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
462 );
463 error_count += 1;
464 }
465 if !self.index.orders.contains(client_order_id) {
466 log::error!(
467 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
468 );
469 error_count += 1;
470 }
471 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
472 log::error!(
473 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
474 );
475 error_count += 1;
476 }
477 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
478 log::error!(
479 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
480 );
481 error_count += 1;
482 }
483 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
484 log::error!(
485 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
486 );
487 error_count += 1;
488 }
489 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
490 if !self
491 .index
492 .exec_algorithm_orders
493 .contains_key(&exec_algorithm_id)
494 {
495 log::error!(
496 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
497 );
498 error_count += 1;
499 }
500 if order.exec_spawn_id().is_none()
501 && !self.index.exec_spawn_orders.contains_key(client_order_id)
502 {
503 log::error!(
504 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
505 );
506 error_count += 1;
507 }
508 }
509 }
510
511 for (position_id, position) in &self.positions {
512 if !self.index.position_strategy.contains_key(position_id) {
513 log::error!(
514 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
515 );
516 error_count += 1;
517 }
518 if !self.index.position_orders.contains_key(position_id) {
519 log::error!(
520 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
521 );
522 error_count += 1;
523 }
524 if !self.index.positions.contains(position_id) {
525 log::error!(
526 "{failure} in positions: {position_id} not found in `self.index.positions`",
527 );
528 error_count += 1;
529 }
530 if position.is_open() && !self.index.positions_open.contains(position_id) {
531 log::error!(
532 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
533 );
534 error_count += 1;
535 }
536 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
537 log::error!(
538 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
539 );
540 error_count += 1;
541 }
542 }
543
544 for account_id in self.index.venue_account.values() {
546 if !self.accounts.contains_key(account_id) {
547 log::error!(
548 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
549 );
550 error_count += 1;
551 }
552 }
553
554 for client_order_id in self.index.venue_order_ids.values() {
555 if !self.orders.contains_key(client_order_id) {
556 log::error!(
557 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
558 );
559 error_count += 1;
560 }
561 }
562
563 for client_order_id in self.index.client_order_ids.keys() {
564 if !self.orders.contains_key(client_order_id) {
565 log::error!(
566 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
567 );
568 error_count += 1;
569 }
570 }
571
572 for client_order_id in self.index.order_position.keys() {
573 if !self.orders.contains_key(client_order_id) {
574 log::error!(
575 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
576 );
577 error_count += 1;
578 }
579 }
580
581 for client_order_id in self.index.order_strategy.keys() {
583 if !self.orders.contains_key(client_order_id) {
584 log::error!(
585 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
586 );
587 error_count += 1;
588 }
589 }
590
591 for position_id in self.index.position_strategy.keys() {
592 if !self.positions.contains_key(position_id) {
593 log::error!(
594 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
595 );
596 error_count += 1;
597 }
598 }
599
600 for position_id in self.index.position_orders.keys() {
601 if !self.positions.contains_key(position_id) {
602 log::error!(
603 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
604 );
605 error_count += 1;
606 }
607 }
608
609 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
610 for client_order_id in client_order_ids {
611 if !self.orders.contains_key(client_order_id) {
612 log::error!(
613 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
614 );
615 error_count += 1;
616 }
617 }
618 }
619
620 for instrument_id in self.index.instrument_positions.keys() {
621 if !self.index.instrument_orders.contains_key(instrument_id) {
622 log::error!(
623 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
624 );
625 error_count += 1;
626 }
627 }
628
629 for client_order_ids in self.index.strategy_orders.values() {
630 for client_order_id in client_order_ids {
631 if !self.orders.contains_key(client_order_id) {
632 log::error!(
633 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
634 );
635 error_count += 1;
636 }
637 }
638 }
639
640 for position_ids in self.index.strategy_positions.values() {
641 for position_id in position_ids {
642 if !self.positions.contains_key(position_id) {
643 log::error!(
644 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
645 );
646 error_count += 1;
647 }
648 }
649 }
650
651 for client_order_id in &self.index.orders {
652 if !self.orders.contains_key(client_order_id) {
653 log::error!(
654 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
655 );
656 error_count += 1;
657 }
658 }
659
660 for client_order_id in &self.index.orders_emulated {
661 if !self.orders.contains_key(client_order_id) {
662 log::error!(
663 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
664 );
665 error_count += 1;
666 }
667 }
668
669 for client_order_id in &self.index.orders_inflight {
670 if !self.orders.contains_key(client_order_id) {
671 log::error!(
672 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
673 );
674 error_count += 1;
675 }
676 }
677
678 for client_order_id in &self.index.orders_open {
679 if !self.orders.contains_key(client_order_id) {
680 log::error!(
681 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
682 );
683 error_count += 1;
684 }
685 }
686
687 for client_order_id in &self.index.orders_closed {
688 if !self.orders.contains_key(client_order_id) {
689 log::error!(
690 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
691 );
692 error_count += 1;
693 }
694 }
695
696 for position_id in &self.index.positions {
697 if !self.positions.contains_key(position_id) {
698 log::error!(
699 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
700 );
701 error_count += 1;
702 }
703 }
704
705 for position_id in &self.index.positions_open {
706 if !self.positions.contains_key(position_id) {
707 log::error!(
708 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
709 );
710 error_count += 1;
711 }
712 }
713
714 for position_id in &self.index.positions_closed {
715 if !self.positions.contains_key(position_id) {
716 log::error!(
717 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
718 );
719 error_count += 1;
720 }
721 }
722
723 for strategy_id in &self.index.strategies {
724 if !self.index.strategy_orders.contains_key(strategy_id) {
725 log::error!(
726 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
727 );
728 error_count += 1;
729 }
730 }
731
732 for exec_algorithm_id in &self.index.exec_algorithms {
733 if !self
734 .index
735 .exec_algorithm_orders
736 .contains_key(exec_algorithm_id)
737 {
738 log::error!(
739 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
740 );
741 error_count += 1;
742 }
743 }
744
745 let total_us = SystemTime::now()
746 .duration_since(UNIX_EPOCH)
747 .expect("Time went backwards")
748 .as_micros()
749 - timestamp_us;
750
751 if error_count == 0 {
752 log::info!("Integrity check passed in {total_us}μs");
753 true
754 } else {
755 log::error!(
756 "Integrity check failed with {error_count} error{} in {total_us}μs",
757 if error_count == 1 { "" } else { "s" },
758 );
759 false
760 }
761 }
762
763 #[must_use]
767 pub fn check_residuals(&self) -> bool {
768 log::debug!("Checking residuals");
769
770 let mut residuals = false;
771
772 for order in self.orders_open(None, None, None, None) {
774 residuals = true;
775 log::warn!("Residual {order:?}");
776 }
777
778 for position in self.positions_open(None, None, None, None) {
780 residuals = true;
781 log::warn!("Residual {position}");
782 }
783
784 residuals
785 }
786
787 pub fn clear_index(&mut self) {
789 self.index.clear();
790 log::debug!("Cleared index");
791 }
792
793 pub fn reset(&mut self) {
797 log::debug!("Resetting cache");
798
799 self.general.clear();
800 self.mark_prices.clear();
801 self.mark_xrates.clear();
802 self.quotes.clear();
803 self.trades.clear();
804 self.books.clear();
805 self.bars.clear();
806 self.currencies.clear();
807 self.instruments.clear();
808 self.synthetics.clear();
809 self.accounts.clear();
810 self.orders.clear();
811 self.order_lists.clear();
812 self.positions.clear();
813 self.position_snapshots.clear();
814
815 self.clear_index();
816
817 log::info!("Reset cache");
818 }
819
820 pub fn dispose(&mut self) {
822 if let Some(database) = &mut self.database {
823 database.close().expect("Failed to close database");
824 }
825 }
826
827 pub fn flush_db(&mut self) {
829 if let Some(database) = &mut self.database {
830 database.flush().expect("Failed to flush database");
831 }
832 }
833
834 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
839 check_valid_string(key, stringify!(key)).expect(FAILED);
840 check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
841
842 log::debug!("Adding general {key}");
843 self.general.insert(key.to_string(), value.clone());
844
845 if let Some(database) = &mut self.database {
846 database.add(key.to_string(), value)?;
847 }
848 Ok(())
849 }
850
851 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
853 log::debug!("Adding `OrderBook` {}", book.instrument_id);
854
855 if self.config.save_market_data {
856 if let Some(database) = &mut self.database {
857 database.add_order_book(&book)?;
858 }
859 }
860
861 self.books.insert(book.instrument_id, book);
862 Ok(())
863 }
864
865 pub fn add_mark_price(&mut self, instrument_id: &InstrumentId, price: Price) {
867 log::debug!("Adding mark `Price` for {instrument_id}");
868
869 self.mark_prices.insert(*instrument_id, price);
870 }
871
872 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
874 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
875
876 if self.config.save_market_data {
877 if let Some(database) = &mut self.database {
878 database.add_quote("e)?;
879 }
880 }
881
882 let quotes_deque = self
883 .quotes
884 .entry(quote.instrument_id)
885 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
886 quotes_deque.push_front(quote);
887 Ok(())
888 }
889
890 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
892 check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
893
894 let instrument_id = quotes[0].instrument_id;
895 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
896
897 if self.config.save_market_data {
898 if let Some(database) = &mut self.database {
899 for quote in quotes {
900 database.add_quote(quote).unwrap();
901 }
902 }
903 }
904
905 let quotes_deque = self
906 .quotes
907 .entry(instrument_id)
908 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
909
910 for quote in quotes {
911 quotes_deque.push_front(*quote);
912 }
913 Ok(())
914 }
915
916 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
918 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
919
920 if self.config.save_market_data {
921 if let Some(database) = &mut self.database {
922 database.add_trade(&trade)?;
923 }
924 }
925
926 let trades_deque = self
927 .trades
928 .entry(trade.instrument_id)
929 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
930 trades_deque.push_front(trade);
931 Ok(())
932 }
933
934 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
936 check_slice_not_empty(trades, stringify!(trades)).unwrap();
937
938 let instrument_id = trades[0].instrument_id;
939 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
940
941 if self.config.save_market_data {
942 if let Some(database) = &mut self.database {
943 for trade in trades {
944 database.add_trade(trade).unwrap();
945 }
946 }
947 }
948
949 let trades_deque = self
950 .trades
951 .entry(instrument_id)
952 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
953
954 for trade in trades {
955 trades_deque.push_front(*trade);
956 }
957 Ok(())
958 }
959
960 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
962 log::debug!("Adding `Bar` {}", bar.bar_type);
963
964 if self.config.save_market_data {
965 if let Some(database) = &mut self.database {
966 database.add_bar(&bar)?;
967 }
968 }
969
970 let bars = self
971 .bars
972 .entry(bar.bar_type)
973 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
974 bars.push_front(bar);
975 Ok(())
976 }
977
978 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
980 check_slice_not_empty(bars, stringify!(bars)).unwrap();
981
982 let bar_type = bars[0].bar_type;
983 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
984
985 if self.config.save_market_data {
986 if let Some(database) = &mut self.database {
987 for bar in bars {
988 database.add_bar(bar).unwrap();
989 }
990 }
991 }
992
993 let bars_deque = self
994 .bars
995 .entry(bar_type)
996 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
997
998 for bar in bars {
999 bars_deque.push_front(*bar);
1000 }
1001 Ok(())
1002 }
1003
1004 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1006 log::debug!("Adding `Currency` {}", currency.code);
1007
1008 if let Some(database) = &mut self.database {
1009 database.add_currency(¤cy)?;
1010 }
1011
1012 self.currencies.insert(currency.code, currency);
1013 Ok(())
1014 }
1015
1016 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1018 log::debug!("Adding `Instrument` {}", instrument.id());
1019
1020 if let Some(database) = &mut self.database {
1021 database.add_instrument(&instrument)?;
1022 }
1023
1024 self.instruments.insert(instrument.id(), instrument);
1025 Ok(())
1026 }
1027
1028 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1030 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1031
1032 if let Some(database) = &mut self.database {
1033 database.add_synthetic(&synthetic)?;
1034 }
1035
1036 self.synthetics.insert(synthetic.id, synthetic);
1037 Ok(())
1038 }
1039
1040 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1042 log::debug!("Adding `Account` {}", account.id());
1043
1044 if let Some(database) = &mut self.database {
1045 database.add_account(&account)?;
1046 }
1047
1048 let account_id = account.id();
1049 self.accounts.insert(account_id, account);
1050 self.index
1051 .venue_account
1052 .insert(account_id.get_issuer(), account_id);
1053 Ok(())
1054 }
1055
1056 pub fn add_venue_order_id(
1060 &mut self,
1061 client_order_id: &ClientOrderId,
1062 venue_order_id: &VenueOrderId,
1063 overwrite: bool,
1064 ) -> anyhow::Result<()> {
1065 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1066 if !overwrite && existing_venue_order_id != venue_order_id {
1067 anyhow::bail!(
1068 "Existing {existing_venue_order_id} for {client_order_id}
1069 did not match the given {venue_order_id}.
1070 If you are writing a test then try a different `venue_order_id`,
1071 otherwise this is probably a bug."
1072 );
1073 }
1074 }
1075
1076 self.index
1077 .client_order_ids
1078 .insert(*client_order_id, *venue_order_id);
1079 self.index
1080 .venue_order_ids
1081 .insert(*venue_order_id, *client_order_id);
1082
1083 Ok(())
1084 }
1085
1086 pub fn add_order(
1099 &mut self,
1100 order: OrderAny,
1101 position_id: Option<PositionId>,
1102 client_id: Option<ClientId>,
1103 replace_existing: bool,
1104 ) -> anyhow::Result<()> {
1105 let instrument_id = order.instrument_id();
1106 let venue = instrument_id.venue;
1107 let client_order_id = order.client_order_id();
1108 let strategy_id = order.strategy_id();
1109 let exec_algorithm_id = order.exec_algorithm_id();
1110 let exec_spawn_id = order.exec_spawn_id();
1111
1112 if !replace_existing {
1113 check_key_not_in_map(
1114 &client_order_id,
1115 &self.orders,
1116 stringify!(client_order_id),
1117 stringify!(orders),
1118 )
1119 .expect(FAILED);
1120 check_key_not_in_map(
1121 &client_order_id,
1122 &self.orders,
1123 stringify!(client_order_id),
1124 stringify!(orders),
1125 )
1126 .expect(FAILED);
1127 check_key_not_in_map(
1128 &client_order_id,
1129 &self.orders,
1130 stringify!(client_order_id),
1131 stringify!(orders),
1132 )
1133 .expect(FAILED);
1134 check_key_not_in_map(
1135 &client_order_id,
1136 &self.orders,
1137 stringify!(client_order_id),
1138 stringify!(orders),
1139 )
1140 .expect(FAILED);
1141 }
1142
1143 log::debug!("Adding {order:?}");
1144
1145 self.index.orders.insert(client_order_id);
1146 self.index
1147 .order_strategy
1148 .insert(client_order_id, strategy_id);
1149 self.index.strategies.insert(strategy_id);
1150
1151 self.index
1153 .venue_orders
1154 .entry(venue)
1155 .or_default()
1156 .insert(client_order_id);
1157
1158 self.index
1160 .instrument_orders
1161 .entry(instrument_id)
1162 .or_default()
1163 .insert(client_order_id);
1164
1165 self.index
1167 .strategy_orders
1168 .entry(strategy_id)
1169 .or_default()
1170 .insert(client_order_id);
1171
1172 if let Some(exec_algorithm_id) = exec_algorithm_id {
1174 self.index.exec_algorithms.insert(exec_algorithm_id);
1175
1176 self.index
1177 .exec_algorithm_orders
1178 .entry(exec_algorithm_id)
1179 .or_default()
1180 .insert(client_order_id);
1181
1182 self.index
1183 .exec_spawn_orders
1184 .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1185 .or_default()
1186 .insert(client_order_id);
1187 }
1188
1189 match order.emulation_trigger() {
1191 Some(_) => {
1192 self.index.orders_emulated.remove(&client_order_id);
1193 }
1194 None => {
1195 self.index.orders_emulated.insert(client_order_id);
1196 }
1197 }
1198
1199 if let Some(position_id) = position_id {
1201 self.add_position_id(
1202 &position_id,
1203 &order.instrument_id().venue,
1204 &client_order_id,
1205 &strategy_id,
1206 )?;
1207 }
1208
1209 if let Some(client_id) = client_id {
1211 self.index.order_client.insert(client_order_id, client_id);
1212 log::debug!("Indexed {client_id:?}");
1213 }
1214
1215 if let Some(database) = &mut self.database {
1216 database.add_order(&order, client_id)?;
1217 }
1222
1223 self.orders.insert(client_order_id, order);
1224
1225 Ok(())
1226 }
1227
1228 pub fn add_position_id(
1230 &mut self,
1231 position_id: &PositionId,
1232 _venue: &Venue,
1233 client_order_id: &ClientOrderId,
1234 strategy_id: &StrategyId,
1235 ) -> anyhow::Result<()> {
1236 self.index
1237 .order_position
1238 .insert(*client_order_id, *position_id);
1239
1240 if let Some(database) = &mut self.database {
1242 database.index_order_position(*client_order_id, *position_id)?;
1243 }
1244
1245 self.index
1247 .position_strategy
1248 .insert(*position_id, *strategy_id);
1249
1250 self.index
1252 .position_orders
1253 .entry(*position_id)
1254 .or_default()
1255 .insert(*client_order_id);
1256
1257 self.index
1259 .strategy_positions
1260 .entry(*strategy_id)
1261 .or_default()
1262 .insert(*position_id);
1263
1264 Ok(())
1265 }
1266
1267 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1269 self.positions.insert(position.id, position.clone());
1270 self.index.positions.insert(position.id);
1271 self.index.positions_open.insert(position.id);
1272
1273 log::debug!("Adding {position}");
1274
1275 self.add_position_id(
1276 &position.id,
1277 &position.instrument_id.venue,
1278 &position.opening_order_id,
1279 &position.strategy_id,
1280 )?;
1281
1282 let venue = position.instrument_id.venue;
1283 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1284 venue_positions.insert(position.id);
1285
1286 let instrument_id = position.instrument_id;
1288 let instrument_positions = self
1289 .index
1290 .instrument_positions
1291 .entry(instrument_id)
1292 .or_default();
1293 instrument_positions.insert(position.id);
1294
1295 if let Some(database) = &mut self.database {
1296 database.add_position(&position)?;
1297 }
1306
1307 Ok(())
1308 }
1309
1310 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1312 if let Some(database) = &mut self.database {
1313 database.update_account(&account)?;
1314 }
1315 Ok(())
1316 }
1317
1318 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1320 let client_order_id = order.client_order_id();
1321
1322 if let Some(venue_order_id) = order.venue_order_id() {
1324 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1327 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1329 }
1330 }
1331
1332 if order.is_inflight() {
1334 self.index.orders_inflight.insert(client_order_id);
1335 } else {
1336 self.index.orders_inflight.remove(&client_order_id);
1337 }
1338
1339 if order.is_open() {
1341 self.index.orders_closed.remove(&client_order_id);
1342 self.index.orders_open.insert(client_order_id);
1343 } else if order.is_closed() {
1344 self.index.orders_open.remove(&client_order_id);
1345 self.index.orders_pending_cancel.remove(&client_order_id);
1346 self.index.orders_closed.insert(client_order_id);
1347 }
1348
1349 if let Some(emulation_trigger) = order.emulation_trigger() {
1351 match emulation_trigger {
1352 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1353 _ => self.index.orders_emulated.insert(client_order_id),
1354 };
1355 }
1356
1357 if let Some(database) = &mut self.database {
1358 database.update_order(order.last_event())?;
1359 }
1364
1365 self.orders.insert(client_order_id, order.clone());
1367
1368 Ok(())
1369 }
1370
1371 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1373 self.index
1374 .orders_pending_cancel
1375 .insert(order.client_order_id());
1376 }
1377
1378 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1380 if position.is_open() {
1382 self.index.positions_open.insert(position.id);
1383 self.index.positions_closed.remove(&position.id);
1384 } else {
1385 self.index.positions_closed.insert(position.id);
1386 self.index.positions_open.remove(&position.id);
1387 }
1388
1389 if let Some(database) = &mut self.database {
1390 database.update_position(position)?;
1391 }
1396 Ok(())
1397 }
1398
1399 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1402 let position_id = position.id;
1403
1404 let mut copied_position = position.clone();
1405 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1406 copied_position.id = PositionId::new(new_id);
1407
1408 let position_serialized = bincode::serialize(&copied_position)?;
1410
1411 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1412 let new_snapshots = match snapshots {
1413 Some(existing_snapshots) => {
1414 let mut combined = existing_snapshots.to_vec();
1415 combined.extend(position_serialized);
1416 Bytes::from(combined)
1417 }
1418 None => Bytes::from(position_serialized),
1419 };
1420 self.position_snapshots.insert(position_id, new_snapshots);
1421
1422 log::debug!("Snapshot {}", copied_position);
1423 Ok(())
1424 }
1425
1426 pub fn snapshot_position_state(
1427 &mut self,
1428 position: &Position,
1429 open_only: Option<bool>,
1432 ) -> anyhow::Result<()> {
1433 let open_only = open_only.unwrap_or(true);
1434
1435 if open_only && !position.is_open() {
1436 return Ok(());
1437 }
1438
1439 if let Some(database) = &mut self.database {
1440 database.snapshot_position_state(position).map_err(|e| {
1441 log::error!(
1442 "Failed to snapshot position state for {}: {:?}",
1443 position.id,
1444 e
1445 );
1446 e
1447 })?;
1448 } else {
1449 log::warn!(
1450 "Cannot snapshot position state for {} (no database configured)",
1451 position.id
1452 );
1453 }
1454
1455 todo!()
1457 }
1458
1459 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1460 let database = if let Some(database) = &self.database {
1461 database
1462 } else {
1463 log::warn!(
1464 "Cannot snapshot order state for {} (no database configured)",
1465 order.client_order_id()
1466 );
1467 return Ok(());
1468 };
1469
1470 database.snapshot_order_state(order)
1471 }
1472
1473 fn build_order_query_filter_set(
1476 &self,
1477 venue: Option<&Venue>,
1478 instrument_id: Option<&InstrumentId>,
1479 strategy_id: Option<&StrategyId>,
1480 ) -> Option<HashSet<ClientOrderId>> {
1481 let mut query: Option<HashSet<ClientOrderId>> = None;
1482
1483 if let Some(venue) = venue {
1484 query = Some(
1485 self.index
1486 .venue_orders
1487 .get(venue)
1488 .cloned()
1489 .unwrap_or_default(),
1490 );
1491 }
1492
1493 if let Some(instrument_id) = instrument_id {
1494 let instrument_orders = self
1495 .index
1496 .instrument_orders
1497 .get(instrument_id)
1498 .cloned()
1499 .unwrap_or_default();
1500
1501 if let Some(existing_query) = &mut query {
1502 *existing_query = existing_query
1503 .intersection(&instrument_orders)
1504 .copied()
1505 .collect();
1506 } else {
1507 query = Some(instrument_orders);
1508 }
1509 }
1510
1511 if let Some(strategy_id) = strategy_id {
1512 let strategy_orders = self
1513 .index
1514 .strategy_orders
1515 .get(strategy_id)
1516 .cloned()
1517 .unwrap_or_default();
1518
1519 if let Some(existing_query) = &mut query {
1520 *existing_query = existing_query
1521 .intersection(&strategy_orders)
1522 .copied()
1523 .collect();
1524 } else {
1525 query = Some(strategy_orders);
1526 }
1527 }
1528
1529 query
1530 }
1531
1532 fn build_position_query_filter_set(
1533 &self,
1534 venue: Option<&Venue>,
1535 instrument_id: Option<&InstrumentId>,
1536 strategy_id: Option<&StrategyId>,
1537 ) -> Option<HashSet<PositionId>> {
1538 let mut query: Option<HashSet<PositionId>> = None;
1539
1540 if let Some(venue) = venue {
1541 query = Some(
1542 self.index
1543 .venue_positions
1544 .get(venue)
1545 .cloned()
1546 .unwrap_or_default(),
1547 );
1548 }
1549
1550 if let Some(instrument_id) = instrument_id {
1551 let instrument_positions = self
1552 .index
1553 .instrument_positions
1554 .get(instrument_id)
1555 .cloned()
1556 .unwrap_or_default();
1557
1558 if let Some(existing_query) = query {
1559 query = Some(
1560 existing_query
1561 .intersection(&instrument_positions)
1562 .copied()
1563 .collect(),
1564 );
1565 } else {
1566 query = Some(instrument_positions);
1567 }
1568 }
1569
1570 if let Some(strategy_id) = strategy_id {
1571 let strategy_positions = self
1572 .index
1573 .strategy_positions
1574 .get(strategy_id)
1575 .cloned()
1576 .unwrap_or_default();
1577
1578 if let Some(existing_query) = query {
1579 query = Some(
1580 existing_query
1581 .intersection(&strategy_positions)
1582 .copied()
1583 .collect(),
1584 );
1585 } else {
1586 query = Some(strategy_positions);
1587 }
1588 }
1589
1590 query
1591 }
1592
1593 fn get_orders_for_ids(
1594 &self,
1595 client_order_ids: &HashSet<ClientOrderId>,
1596 side: Option<OrderSide>,
1597 ) -> Vec<&OrderAny> {
1598 let side = side.unwrap_or(OrderSide::NoOrderSide);
1599 let mut orders = Vec::new();
1600
1601 for client_order_id in client_order_ids {
1602 let order = self
1603 .orders
1604 .get(client_order_id)
1605 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1606 if side == OrderSide::NoOrderSide || side == order.order_side() {
1607 orders.push(order);
1608 }
1609 }
1610
1611 orders
1612 }
1613
1614 fn get_positions_for_ids(
1615 &self,
1616 position_ids: &HashSet<PositionId>,
1617 side: Option<PositionSide>,
1618 ) -> Vec<&Position> {
1619 let side = side.unwrap_or(PositionSide::NoPositionSide);
1620 let mut positions = Vec::new();
1621
1622 for position_id in position_ids {
1623 let position = self
1624 .positions
1625 .get(position_id)
1626 .unwrap_or_else(|| panic!("Position {position_id} not found"));
1627 if side == PositionSide::NoPositionSide || side == position.side {
1628 positions.push(position);
1629 }
1630 }
1631
1632 positions
1633 }
1634
1635 #[must_use]
1637 pub fn client_order_ids(
1638 &self,
1639 venue: Option<&Venue>,
1640 instrument_id: Option<&InstrumentId>,
1641 strategy_id: Option<&StrategyId>,
1642 ) -> HashSet<ClientOrderId> {
1643 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1644 match query {
1645 Some(query) => self.index.orders.intersection(&query).copied().collect(),
1646 None => self.index.orders.clone(),
1647 }
1648 }
1649
1650 #[must_use]
1652 pub fn client_order_ids_open(
1653 &self,
1654 venue: Option<&Venue>,
1655 instrument_id: Option<&InstrumentId>,
1656 strategy_id: Option<&StrategyId>,
1657 ) -> HashSet<ClientOrderId> {
1658 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1659 match query {
1660 Some(query) => self
1661 .index
1662 .orders_open
1663 .intersection(&query)
1664 .copied()
1665 .collect(),
1666 None => self.index.orders_open.clone(),
1667 }
1668 }
1669
1670 #[must_use]
1672 pub fn client_order_ids_closed(
1673 &self,
1674 venue: Option<&Venue>,
1675 instrument_id: Option<&InstrumentId>,
1676 strategy_id: Option<&StrategyId>,
1677 ) -> HashSet<ClientOrderId> {
1678 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1679 match query {
1680 Some(query) => self
1681 .index
1682 .orders_closed
1683 .intersection(&query)
1684 .copied()
1685 .collect(),
1686 None => self.index.orders_closed.clone(),
1687 }
1688 }
1689
1690 #[must_use]
1692 pub fn client_order_ids_emulated(
1693 &self,
1694 venue: Option<&Venue>,
1695 instrument_id: Option<&InstrumentId>,
1696 strategy_id: Option<&StrategyId>,
1697 ) -> HashSet<ClientOrderId> {
1698 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1699 match query {
1700 Some(query) => self
1701 .index
1702 .orders_emulated
1703 .intersection(&query)
1704 .copied()
1705 .collect(),
1706 None => self.index.orders_emulated.clone(),
1707 }
1708 }
1709
1710 #[must_use]
1712 pub fn client_order_ids_inflight(
1713 &self,
1714 venue: Option<&Venue>,
1715 instrument_id: Option<&InstrumentId>,
1716 strategy_id: Option<&StrategyId>,
1717 ) -> HashSet<ClientOrderId> {
1718 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1719 match query {
1720 Some(query) => self
1721 .index
1722 .orders_inflight
1723 .intersection(&query)
1724 .copied()
1725 .collect(),
1726 None => self.index.orders_inflight.clone(),
1727 }
1728 }
1729
1730 #[must_use]
1732 pub fn position_ids(
1733 &self,
1734 venue: Option<&Venue>,
1735 instrument_id: Option<&InstrumentId>,
1736 strategy_id: Option<&StrategyId>,
1737 ) -> HashSet<PositionId> {
1738 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1739 match query {
1740 Some(query) => self.index.positions.intersection(&query).copied().collect(),
1741 None => self.index.positions.clone(),
1742 }
1743 }
1744
1745 #[must_use]
1747 pub fn position_open_ids(
1748 &self,
1749 venue: Option<&Venue>,
1750 instrument_id: Option<&InstrumentId>,
1751 strategy_id: Option<&StrategyId>,
1752 ) -> HashSet<PositionId> {
1753 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1754 match query {
1755 Some(query) => self
1756 .index
1757 .positions_open
1758 .intersection(&query)
1759 .copied()
1760 .collect(),
1761 None => self.index.positions_open.clone(),
1762 }
1763 }
1764
1765 #[must_use]
1767 pub fn position_closed_ids(
1768 &self,
1769 venue: Option<&Venue>,
1770 instrument_id: Option<&InstrumentId>,
1771 strategy_id: Option<&StrategyId>,
1772 ) -> HashSet<PositionId> {
1773 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1774 match query {
1775 Some(query) => self
1776 .index
1777 .positions_closed
1778 .intersection(&query)
1779 .copied()
1780 .collect(),
1781 None => self.index.positions_closed.clone(),
1782 }
1783 }
1784
1785 #[must_use]
1787 pub fn actor_ids(&self) -> HashSet<ComponentId> {
1788 self.index.actors.clone()
1789 }
1790
1791 #[must_use]
1793 pub fn strategy_ids(&self) -> HashSet<StrategyId> {
1794 self.index.strategies.clone()
1795 }
1796
1797 #[must_use]
1799 pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
1800 self.index.exec_algorithms.clone()
1801 }
1802
1803 #[must_use]
1807 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
1808 self.orders.get(client_order_id)
1809 }
1810
1811 #[must_use]
1813 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
1814 self.orders.get_mut(client_order_id)
1815 }
1816
1817 #[must_use]
1819 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
1820 self.index.venue_order_ids.get(venue_order_id)
1821 }
1822
1823 #[must_use]
1825 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
1826 self.index.client_order_ids.get(client_order_id)
1827 }
1828
1829 #[must_use]
1831 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
1832 self.index.order_client.get(client_order_id)
1833 }
1834
1835 #[must_use]
1837 pub fn orders(
1838 &self,
1839 venue: Option<&Venue>,
1840 instrument_id: Option<&InstrumentId>,
1841 strategy_id: Option<&StrategyId>,
1842 side: Option<OrderSide>,
1843 ) -> Vec<&OrderAny> {
1844 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
1845 self.get_orders_for_ids(&client_order_ids, side)
1846 }
1847
1848 #[must_use]
1850 pub fn orders_open(
1851 &self,
1852 venue: Option<&Venue>,
1853 instrument_id: Option<&InstrumentId>,
1854 strategy_id: Option<&StrategyId>,
1855 side: Option<OrderSide>,
1856 ) -> Vec<&OrderAny> {
1857 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
1858 self.get_orders_for_ids(&client_order_ids, side)
1859 }
1860
1861 #[must_use]
1863 pub fn orders_closed(
1864 &self,
1865 venue: Option<&Venue>,
1866 instrument_id: Option<&InstrumentId>,
1867 strategy_id: Option<&StrategyId>,
1868 side: Option<OrderSide>,
1869 ) -> Vec<&OrderAny> {
1870 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
1871 self.get_orders_for_ids(&client_order_ids, side)
1872 }
1873
1874 #[must_use]
1876 pub fn orders_emulated(
1877 &self,
1878 venue: Option<&Venue>,
1879 instrument_id: Option<&InstrumentId>,
1880 strategy_id: Option<&StrategyId>,
1881 side: Option<OrderSide>,
1882 ) -> Vec<&OrderAny> {
1883 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
1884 self.get_orders_for_ids(&client_order_ids, side)
1885 }
1886
1887 #[must_use]
1889 pub fn orders_inflight(
1890 &self,
1891 venue: Option<&Venue>,
1892 instrument_id: Option<&InstrumentId>,
1893 strategy_id: Option<&StrategyId>,
1894 side: Option<OrderSide>,
1895 ) -> Vec<&OrderAny> {
1896 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
1897 self.get_orders_for_ids(&client_order_ids, side)
1898 }
1899
1900 #[must_use]
1902 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
1903 let client_order_ids = self.index.position_orders.get(position_id);
1904 match client_order_ids {
1905 Some(client_order_ids) => {
1906 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
1907 }
1908 None => Vec::new(),
1909 }
1910 }
1911
1912 #[must_use]
1914 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
1915 self.index.orders.contains(client_order_id)
1916 }
1917
1918 #[must_use]
1920 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
1921 self.index.orders_open.contains(client_order_id)
1922 }
1923
1924 #[must_use]
1926 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
1927 self.index.orders_closed.contains(client_order_id)
1928 }
1929
1930 #[must_use]
1932 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
1933 self.index.orders_emulated.contains(client_order_id)
1934 }
1935
1936 #[must_use]
1938 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
1939 self.index.orders_inflight.contains(client_order_id)
1940 }
1941
1942 #[must_use]
1944 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
1945 self.index.orders_pending_cancel.contains(client_order_id)
1946 }
1947
1948 #[must_use]
1950 pub fn orders_open_count(
1951 &self,
1952 venue: Option<&Venue>,
1953 instrument_id: Option<&InstrumentId>,
1954 strategy_id: Option<&StrategyId>,
1955 side: Option<OrderSide>,
1956 ) -> usize {
1957 self.orders_open(venue, instrument_id, strategy_id, side)
1958 .len()
1959 }
1960
1961 #[must_use]
1963 pub fn orders_closed_count(
1964 &self,
1965 venue: Option<&Venue>,
1966 instrument_id: Option<&InstrumentId>,
1967 strategy_id: Option<&StrategyId>,
1968 side: Option<OrderSide>,
1969 ) -> usize {
1970 self.orders_closed(venue, instrument_id, strategy_id, side)
1971 .len()
1972 }
1973
1974 #[must_use]
1976 pub fn orders_emulated_count(
1977 &self,
1978 venue: Option<&Venue>,
1979 instrument_id: Option<&InstrumentId>,
1980 strategy_id: Option<&StrategyId>,
1981 side: Option<OrderSide>,
1982 ) -> usize {
1983 self.orders_emulated(venue, instrument_id, strategy_id, side)
1984 .len()
1985 }
1986
1987 #[must_use]
1989 pub fn orders_inflight_count(
1990 &self,
1991 venue: Option<&Venue>,
1992 instrument_id: Option<&InstrumentId>,
1993 strategy_id: Option<&StrategyId>,
1994 side: Option<OrderSide>,
1995 ) -> usize {
1996 self.orders_inflight(venue, instrument_id, strategy_id, side)
1997 .len()
1998 }
1999
2000 #[must_use]
2002 pub fn orders_total_count(
2003 &self,
2004 venue: Option<&Venue>,
2005 instrument_id: Option<&InstrumentId>,
2006 strategy_id: Option<&StrategyId>,
2007 side: Option<OrderSide>,
2008 ) -> usize {
2009 self.orders(venue, instrument_id, strategy_id, side).len()
2010 }
2011
2012 #[must_use]
2014 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2015 self.order_lists.get(order_list_id)
2016 }
2017
2018 #[must_use]
2020 pub fn order_lists(
2021 &self,
2022 venue: Option<&Venue>,
2023 instrument_id: Option<&InstrumentId>,
2024 strategy_id: Option<&StrategyId>,
2025 ) -> Vec<&OrderList> {
2026 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2027
2028 if let Some(venue) = venue {
2029 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2030 }
2031
2032 if let Some(instrument_id) = instrument_id {
2033 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2034 }
2035
2036 if let Some(strategy_id) = strategy_id {
2037 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2038 }
2039
2040 order_lists
2041 }
2042
2043 #[must_use]
2045 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2046 self.order_lists.contains_key(order_list_id)
2047 }
2048
2049 #[must_use]
2054 pub fn orders_for_exec_algorithm(
2055 &self,
2056 exec_algorithm_id: &ExecAlgorithmId,
2057 venue: Option<&Venue>,
2058 instrument_id: Option<&InstrumentId>,
2059 strategy_id: Option<&StrategyId>,
2060 side: Option<OrderSide>,
2061 ) -> Vec<&OrderAny> {
2062 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2063 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2064
2065 if let Some(query) = query {
2066 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2067 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2068 }
2069 }
2070
2071 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2072 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2073 } else {
2074 Vec::new()
2075 }
2076 }
2077
2078 #[must_use]
2080 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2081 self.get_orders_for_ids(
2082 self.index
2083 .exec_spawn_orders
2084 .get(exec_spawn_id)
2085 .unwrap_or(&HashSet::new()),
2086 None,
2087 )
2088 }
2089
2090 #[must_use]
2092 pub fn exec_spawn_total_quantity(
2093 &self,
2094 exec_spawn_id: &ClientOrderId,
2095 active_only: bool,
2096 ) -> Option<Quantity> {
2097 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2098
2099 let mut total_quantity: Option<Quantity> = None;
2100
2101 for spawn_order in exec_spawn_orders {
2102 if !active_only || !spawn_order.is_closed() {
2103 if let Some(mut total_quantity) = total_quantity {
2104 total_quantity += spawn_order.quantity();
2105 }
2106 } else {
2107 total_quantity = Some(spawn_order.quantity());
2108 }
2109 }
2110
2111 total_quantity
2112 }
2113
2114 #[must_use]
2116 pub fn exec_spawn_total_filled_qty(
2117 &self,
2118 exec_spawn_id: &ClientOrderId,
2119 active_only: bool,
2120 ) -> Option<Quantity> {
2121 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2122
2123 let mut total_quantity: Option<Quantity> = None;
2124
2125 for spawn_order in exec_spawn_orders {
2126 if !active_only || !spawn_order.is_closed() {
2127 if let Some(mut total_quantity) = total_quantity {
2128 total_quantity += spawn_order.filled_qty();
2129 }
2130 } else {
2131 total_quantity = Some(spawn_order.filled_qty());
2132 }
2133 }
2134
2135 total_quantity
2136 }
2137
2138 #[must_use]
2140 pub fn exec_spawn_total_leaves_qty(
2141 &self,
2142 exec_spawn_id: &ClientOrderId,
2143 active_only: bool,
2144 ) -> Option<Quantity> {
2145 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2146
2147 let mut total_quantity: Option<Quantity> = None;
2148
2149 for spawn_order in exec_spawn_orders {
2150 if !active_only || !spawn_order.is_closed() {
2151 if let Some(mut total_quantity) = total_quantity {
2152 total_quantity += spawn_order.leaves_qty();
2153 }
2154 } else {
2155 total_quantity = Some(spawn_order.leaves_qty());
2156 }
2157 }
2158
2159 total_quantity
2160 }
2161
2162 #[must_use]
2166 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2167 self.positions.get(position_id)
2168 }
2169
2170 #[must_use]
2172 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2173 self.index
2174 .order_position
2175 .get(client_order_id)
2176 .and_then(|position_id| self.positions.get(position_id))
2177 }
2178
2179 #[must_use]
2181 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2182 self.index.order_position.get(client_order_id)
2183 }
2184
2185 #[must_use]
2187 pub fn positions(
2188 &self,
2189 venue: Option<&Venue>,
2190 instrument_id: Option<&InstrumentId>,
2191 strategy_id: Option<&StrategyId>,
2192 side: Option<PositionSide>,
2193 ) -> Vec<&Position> {
2194 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2195 self.get_positions_for_ids(&position_ids, side)
2196 }
2197
2198 #[must_use]
2200 pub fn positions_open(
2201 &self,
2202 venue: Option<&Venue>,
2203 instrument_id: Option<&InstrumentId>,
2204 strategy_id: Option<&StrategyId>,
2205 side: Option<PositionSide>,
2206 ) -> Vec<&Position> {
2207 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2208 self.get_positions_for_ids(&position_ids, side)
2209 }
2210
2211 #[must_use]
2213 pub fn positions_closed(
2214 &self,
2215 venue: Option<&Venue>,
2216 instrument_id: Option<&InstrumentId>,
2217 strategy_id: Option<&StrategyId>,
2218 side: Option<PositionSide>,
2219 ) -> Vec<&Position> {
2220 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2221 self.get_positions_for_ids(&position_ids, side)
2222 }
2223
2224 #[must_use]
2226 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2227 self.index.positions.contains(position_id)
2228 }
2229
2230 #[must_use]
2232 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2233 self.index.positions_open.contains(position_id)
2234 }
2235
2236 #[must_use]
2238 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2239 self.index.positions_closed.contains(position_id)
2240 }
2241
2242 #[must_use]
2244 pub fn positions_open_count(
2245 &self,
2246 venue: Option<&Venue>,
2247 instrument_id: Option<&InstrumentId>,
2248 strategy_id: Option<&StrategyId>,
2249 side: Option<PositionSide>,
2250 ) -> usize {
2251 self.positions_open(venue, instrument_id, strategy_id, side)
2252 .len()
2253 }
2254
2255 #[must_use]
2257 pub fn positions_closed_count(
2258 &self,
2259 venue: Option<&Venue>,
2260 instrument_id: Option<&InstrumentId>,
2261 strategy_id: Option<&StrategyId>,
2262 side: Option<PositionSide>,
2263 ) -> usize {
2264 self.positions_closed(venue, instrument_id, strategy_id, side)
2265 .len()
2266 }
2267
2268 #[must_use]
2270 pub fn positions_total_count(
2271 &self,
2272 venue: Option<&Venue>,
2273 instrument_id: Option<&InstrumentId>,
2274 strategy_id: Option<&StrategyId>,
2275 side: Option<PositionSide>,
2276 ) -> usize {
2277 self.positions(venue, instrument_id, strategy_id, side)
2278 .len()
2279 }
2280
2281 #[must_use]
2285 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2286 self.index.order_strategy.get(client_order_id)
2287 }
2288
2289 #[must_use]
2291 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2292 self.index.position_strategy.get(position_id)
2293 }
2294
2295 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2299 check_valid_string(key, stringify!(key)).expect(FAILED);
2300
2301 Ok(self.general.get(key))
2302 }
2303
2304 #[must_use]
2308 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2309 match price_type {
2310 PriceType::Bid => self
2311 .quotes
2312 .get(instrument_id)
2313 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2314 PriceType::Ask => self
2315 .quotes
2316 .get(instrument_id)
2317 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2318 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2319 quotes.front().map(|quote| {
2320 Price::new(
2321 (quote.ask_price.as_f64() + quote.bid_price.as_f64()) / 2.0,
2322 quote.bid_price.precision + 1,
2323 )
2324 })
2325 }),
2326 PriceType::Last => self
2327 .trades
2328 .get(instrument_id)
2329 .and_then(|trades| trades.front().map(|trade| trade.price)),
2330 PriceType::Mark => self.mark_prices.get(instrument_id).copied(),
2331 }
2332 }
2333
2334 #[must_use]
2336 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2337 self.quotes
2338 .get(instrument_id)
2339 .map(|quotes| quotes.iter().copied().collect())
2340 }
2341
2342 #[must_use]
2344 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2345 self.trades
2346 .get(instrument_id)
2347 .map(|trades| trades.iter().copied().collect())
2348 }
2349
2350 #[must_use]
2352 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2353 self.bars
2354 .get(bar_type)
2355 .map(|bars| bars.iter().copied().collect())
2356 }
2357
2358 #[must_use]
2360 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2361 self.books.get(instrument_id)
2362 }
2363
2364 #[must_use]
2366 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2367 self.books.get_mut(instrument_id)
2368 }
2369
2370 #[must_use]
2372 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2373 self.quotes
2374 .get(instrument_id)
2375 .and_then(|quotes| quotes.front())
2376 }
2377
2378 #[must_use]
2380 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2381 self.trades
2382 .get(instrument_id)
2383 .and_then(|trades| trades.front())
2384 }
2385
2386 #[must_use]
2388 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2389 self.bars.get(bar_type).and_then(|bars| bars.front())
2390 }
2391
2392 #[must_use]
2394 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2395 self.books.get(instrument_id).map_or(0, |book| book.count) as usize
2396 }
2397
2398 #[must_use]
2400 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2401 self.quotes
2402 .get(instrument_id)
2403 .map_or(0, std::collections::VecDeque::len)
2404 }
2405
2406 #[must_use]
2408 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2409 self.trades
2410 .get(instrument_id)
2411 .map_or(0, std::collections::VecDeque::len)
2412 }
2413
2414 #[must_use]
2416 pub fn bar_count(&self, bar_type: &BarType) -> usize {
2417 self.bars
2418 .get(bar_type)
2419 .map_or(0, std::collections::VecDeque::len)
2420 }
2421
2422 #[must_use]
2424 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2425 self.books.contains_key(instrument_id)
2426 }
2427
2428 #[must_use]
2430 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2431 self.quote_count(instrument_id) > 0
2432 }
2433
2434 #[must_use]
2436 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2437 self.trade_count(instrument_id) > 0
2438 }
2439
2440 #[must_use]
2442 pub fn has_bars(&self, bar_type: &BarType) -> bool {
2443 self.bar_count(bar_type) > 0
2444 }
2445
2446 #[must_use]
2447 pub fn get_xrate(
2448 &self,
2449 venue: Venue,
2450 from_currency: Currency,
2451 to_currency: Currency,
2452 price_type: PriceType,
2453 ) -> Option<f64> {
2454 if from_currency == to_currency {
2455 return Some(1.0);
2458 }
2459
2460 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2461
2462 match get_exchange_rate(
2463 from_currency.code,
2464 to_currency.code,
2465 price_type,
2466 bid_quote,
2467 ask_quote,
2468 ) {
2469 Ok(rate) => rate,
2470 Err(e) => {
2471 log::error!("Failed to calculate xrate: {e}");
2472 None
2473 }
2474 }
2475 }
2476
2477 fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2478 let mut bid_quotes = HashMap::new();
2479 let mut ask_quotes = HashMap::new();
2480
2481 for instrument_id in self.instruments.keys() {
2482 if instrument_id.venue != *venue {
2483 continue;
2484 }
2485
2486 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2487 if let Some(tick) = ticks.front() {
2488 (tick.bid_price, tick.ask_price)
2489 } else {
2490 continue; }
2492 } else {
2493 let bid_bar = self
2494 .bars
2495 .iter()
2496 .find(|(k, _)| {
2497 k.instrument_id() == *instrument_id
2498 && matches!(k.spec().price_type, PriceType::Bid)
2499 })
2500 .map(|(_, v)| v);
2501
2502 let ask_bar = self
2503 .bars
2504 .iter()
2505 .find(|(k, _)| {
2506 k.instrument_id() == *instrument_id
2507 && matches!(k.spec().price_type, PriceType::Ask)
2508 })
2509 .map(|(_, v)| v);
2510
2511 match (bid_bar, ask_bar) {
2512 (Some(bid), Some(ask)) => {
2513 let bid_price = bid.front().unwrap().close;
2514 let ask_price = ask.front().unwrap().close;
2515
2516 (bid_price, ask_price)
2517 }
2518 _ => continue,
2519 }
2520 };
2521
2522 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
2523 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
2524 }
2525
2526 (bid_quotes, ask_quotes)
2527 }
2528
2529 #[must_use]
2531 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
2532 self.mark_xrates.get(&(from_currency, to_currency)).copied()
2533 }
2534
2535 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
2541 assert!(xrate > 0.0, "xrate was zero");
2542 self.mark_xrates.insert((from_currency, to_currency), xrate);
2543 self.mark_xrates
2544 .insert((to_currency, from_currency), 1.0 / xrate);
2545 }
2546
2547 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
2549 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
2550 }
2551
2552 pub fn clear_mark_xrates(&mut self) {
2554 self.mark_xrates.clear();
2555 }
2556
2557 #[must_use]
2561 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2562 self.instruments.get(instrument_id)
2563 }
2564
2565 #[must_use]
2567 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2568 self.instruments
2569 .keys()
2570 .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2571 .collect()
2572 }
2573
2574 #[must_use]
2576 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2577 self.instruments
2578 .values()
2579 .filter(|i| &i.id().venue == venue)
2580 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(u)))
2581 .collect()
2582 }
2583
2584 #[must_use]
2586 pub fn bar_types(
2587 &self,
2588 instrument_id: Option<&InstrumentId>,
2589 price_type: Option<&PriceType>,
2590 aggregation_source: AggregationSource,
2591 ) -> Vec<&BarType> {
2592 let mut bar_types = self
2593 .bars
2594 .keys()
2595 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2596 .collect::<Vec<&BarType>>();
2597
2598 if let Some(instrument_id) = instrument_id {
2599 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2600 }
2601
2602 if let Some(price_type) = price_type {
2603 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2604 }
2605
2606 bar_types
2607 }
2608
2609 #[must_use]
2613 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2614 self.synthetics.get(instrument_id)
2615 }
2616
2617 #[must_use]
2619 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2620 self.synthetics.keys().collect()
2621 }
2622
2623 #[must_use]
2625 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2626 self.synthetics.values().collect()
2627 }
2628
2629 #[must_use]
2633 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2634 self.accounts.get(account_id)
2635 }
2636
2637 #[must_use]
2639 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2640 self.index
2641 .venue_account
2642 .get(venue)
2643 .and_then(|account_id| self.accounts.get(account_id))
2644 }
2645
2646 #[must_use]
2648 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2649 self.index.venue_account.get(venue)
2650 }
2651
2652 #[must_use]
2654 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
2655 self.accounts
2656 .values()
2657 .filter(|account| &account.id() == account_id)
2658 .collect()
2659 }
2660}