1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22pub mod database;
23
24#[cfg(test)]
25mod tests;
26
27use std::{
28 collections::{HashMap, HashSet, VecDeque},
29 time::{SystemTime, UNIX_EPOCH},
30};
31
32use bytes::Bytes;
33use database::CacheDatabaseAdapter;
34use nautilus_core::{
35 correctness::{
36 check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
37 FAILED,
38 },
39 UUID4,
40};
41use nautilus_model::{
42 accounts::AccountAny,
43 data::{Bar, BarType, QuoteTick, TradeTick},
44 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
45 identifiers::{
46 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
47 OrderListId, PositionId, StrategyId, Symbol, Venue, VenueOrderId,
48 },
49 instruments::{InstrumentAny, SyntheticInstrument},
50 orderbook::OrderBook,
51 orders::{OrderAny, OrderList},
52 position::Position,
53 types::{Currency, Money, Price, Quantity},
54};
55use rust_decimal::Decimal;
56use serde::{Deserialize, Serialize};
57use ustr::Ustr;
58
59use crate::{
60 enums::SerializationEncoding, msgbus::database::DatabaseConfig, xrate::get_exchange_rate,
61};
62
63#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(default)]
66pub struct CacheConfig {
67 pub database: Option<DatabaseConfig>,
69 pub encoding: SerializationEncoding,
71 pub timestamps_as_iso8601: bool,
73 pub buffer_interval_ms: Option<usize>,
75 pub use_trader_prefix: bool,
77 pub use_instance_id: bool,
79 pub flush_on_start: bool,
81 pub drop_instruments_on_reset: bool,
83 pub tick_capacity: usize,
85 pub bar_capacity: usize,
87 pub save_market_data: bool,
89}
90
91impl Default for CacheConfig {
92 fn default() -> Self {
94 Self {
95 database: None,
96 encoding: SerializationEncoding::MsgPack,
97 timestamps_as_iso8601: false,
98 buffer_interval_ms: None,
99 use_trader_prefix: true,
100 use_instance_id: false,
101 flush_on_start: false,
102 drop_instruments_on_reset: true,
103 tick_capacity: 10_000,
104 bar_capacity: 10_000,
105 save_market_data: false,
106 }
107 }
108}
109
110impl CacheConfig {
111 #[allow(clippy::too_many_arguments)]
113 #[must_use]
114 pub const fn new(
115 database: Option<DatabaseConfig>,
116 encoding: SerializationEncoding,
117 timestamps_as_iso8601: bool,
118 buffer_interval_ms: Option<usize>,
119 use_trader_prefix: bool,
120 use_instance_id: bool,
121 flush_on_start: bool,
122 drop_instruments_on_reset: bool,
123 tick_capacity: usize,
124 bar_capacity: usize,
125 save_market_data: bool,
126 ) -> Self {
127 Self {
128 database,
129 encoding,
130 timestamps_as_iso8601,
131 buffer_interval_ms,
132 use_trader_prefix,
133 use_instance_id,
134 flush_on_start,
135 drop_instruments_on_reset,
136 tick_capacity,
137 bar_capacity,
138 save_market_data,
139 }
140 }
141}
142
143#[derive(Debug)]
145pub struct CacheIndex {
146 venue_account: HashMap<Venue, AccountId>,
147 venue_orders: HashMap<Venue, HashSet<ClientOrderId>>,
148 venue_positions: HashMap<Venue, HashSet<PositionId>>,
149 venue_order_ids: HashMap<VenueOrderId, ClientOrderId>,
150 client_order_ids: HashMap<ClientOrderId, VenueOrderId>,
151 order_position: HashMap<ClientOrderId, PositionId>,
152 order_strategy: HashMap<ClientOrderId, StrategyId>,
153 order_client: HashMap<ClientOrderId, ClientId>,
154 position_strategy: HashMap<PositionId, StrategyId>,
155 position_orders: HashMap<PositionId, HashSet<ClientOrderId>>,
156 instrument_orders: HashMap<InstrumentId, HashSet<ClientOrderId>>,
157 instrument_positions: HashMap<InstrumentId, HashSet<PositionId>>,
158 strategy_orders: HashMap<StrategyId, HashSet<ClientOrderId>>,
159 strategy_positions: HashMap<StrategyId, HashSet<PositionId>>,
160 exec_algorithm_orders: HashMap<ExecAlgorithmId, HashSet<ClientOrderId>>,
161 exec_spawn_orders: HashMap<ClientOrderId, HashSet<ClientOrderId>>,
162 orders: HashSet<ClientOrderId>,
163 orders_open: HashSet<ClientOrderId>,
164 orders_closed: HashSet<ClientOrderId>,
165 orders_emulated: HashSet<ClientOrderId>,
166 orders_inflight: HashSet<ClientOrderId>,
167 orders_pending_cancel: HashSet<ClientOrderId>,
168 positions: HashSet<PositionId>,
169 positions_open: HashSet<PositionId>,
170 positions_closed: HashSet<PositionId>,
171 actors: HashSet<ComponentId>,
172 strategies: HashSet<StrategyId>,
173 exec_algorithms: HashSet<ExecAlgorithmId>,
174}
175
176impl CacheIndex {
177 pub fn clear(&mut self) {
179 self.venue_account.clear();
180 self.venue_orders.clear();
181 self.venue_positions.clear();
182 self.venue_order_ids.clear();
183 self.client_order_ids.clear();
184 self.order_position.clear();
185 self.order_strategy.clear();
186 self.order_client.clear();
187 self.position_strategy.clear();
188 self.position_orders.clear();
189 self.instrument_orders.clear();
190 self.instrument_positions.clear();
191 self.strategy_orders.clear();
192 self.strategy_positions.clear();
193 self.exec_algorithm_orders.clear();
194 self.exec_spawn_orders.clear();
195 self.orders.clear();
196 self.orders_open.clear();
197 self.orders_closed.clear();
198 self.orders_emulated.clear();
199 self.orders_inflight.clear();
200 self.orders_pending_cancel.clear();
201 self.positions.clear();
202 self.positions_open.clear();
203 self.positions_closed.clear();
204 self.actors.clear();
205 self.strategies.clear();
206 self.exec_algorithms.clear();
207 }
208}
209
210pub struct Cache {
212 config: CacheConfig,
213 index: CacheIndex,
214 database: Option<Box<dyn CacheDatabaseAdapter>>,
215 general: HashMap<String, Bytes>,
216 quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
217 trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
218 books: HashMap<InstrumentId, OrderBook>,
219 bars: HashMap<BarType, VecDeque<Bar>>,
220 currencies: HashMap<Ustr, Currency>,
221 instruments: HashMap<InstrumentId, InstrumentAny>,
222 synthetics: HashMap<InstrumentId, SyntheticInstrument>,
223 accounts: HashMap<AccountId, AccountAny>,
224 orders: HashMap<ClientOrderId, OrderAny>,
225 order_lists: HashMap<OrderListId, OrderList>,
226 positions: HashMap<PositionId, Position>,
227 position_snapshots: HashMap<PositionId, Bytes>,
228}
229
230unsafe impl Send for Cache {}
232unsafe impl Sync for Cache {}
233
234impl Default for Cache {
235 fn default() -> Self {
237 Self::new(Some(CacheConfig::default()), None)
238 }
239}
240
241impl Cache {
242 #[must_use]
244 pub fn new(
245 config: Option<CacheConfig>,
246 database: Option<Box<dyn CacheDatabaseAdapter>>,
247 ) -> Self {
248 let index = CacheIndex {
249 venue_account: HashMap::new(),
250 venue_orders: HashMap::new(),
251 venue_positions: HashMap::new(),
252 venue_order_ids: HashMap::new(),
253 client_order_ids: HashMap::new(),
254 order_position: HashMap::new(),
255 order_strategy: HashMap::new(),
256 order_client: HashMap::new(),
257 position_strategy: HashMap::new(),
258 position_orders: HashMap::new(),
259 instrument_orders: HashMap::new(),
260 instrument_positions: HashMap::new(),
261 strategy_orders: HashMap::new(),
262 strategy_positions: HashMap::new(),
263 exec_algorithm_orders: HashMap::new(),
264 exec_spawn_orders: HashMap::new(),
265 orders: HashSet::new(),
266 orders_open: HashSet::new(),
267 orders_closed: HashSet::new(),
268 orders_emulated: HashSet::new(),
269 orders_inflight: HashSet::new(),
270 orders_pending_cancel: HashSet::new(),
271 positions: HashSet::new(),
272 positions_open: HashSet::new(),
273 positions_closed: HashSet::new(),
274 actors: HashSet::new(),
275 strategies: HashSet::new(),
276 exec_algorithms: HashSet::new(),
277 };
278
279 Self {
280 config: config.unwrap_or_default(),
281 index,
282 database,
283 general: HashMap::new(),
284 quotes: HashMap::new(),
285 trades: HashMap::new(),
286 books: HashMap::new(),
287 bars: HashMap::new(),
288 currencies: HashMap::new(),
289 instruments: HashMap::new(),
290 synthetics: HashMap::new(),
291 accounts: HashMap::new(),
292 orders: HashMap::new(),
293 order_lists: HashMap::new(),
294 positions: HashMap::new(),
295 position_snapshots: HashMap::new(),
296 }
297 }
298
299 #[must_use]
301 pub fn memory_address(&self) -> String {
302 format!("{:?}", std::ptr::from_ref(self))
303 }
304
305 pub fn cache_general(&mut self) -> anyhow::Result<()> {
309 self.general = match &mut self.database {
310 Some(db) => db.load()?,
311 None => HashMap::new(),
312 };
313
314 log::info!(
315 "Cached {} general object(s) from database",
316 self.general.len()
317 );
318 Ok(())
319 }
320
321 pub fn cache_currencies(&mut self) -> anyhow::Result<()> {
323 self.currencies = match &mut self.database {
324 Some(db) => db.load_currencies()?,
325 None => HashMap::new(),
326 };
327
328 log::info!("Cached {} currencies from database", self.general.len());
329 Ok(())
330 }
331
332 pub fn cache_instruments(&mut self) -> anyhow::Result<()> {
334 self.instruments = match &mut self.database {
335 Some(db) => db.load_instruments()?,
336 None => HashMap::new(),
337 };
338
339 log::info!("Cached {} instruments from database", self.general.len());
340 Ok(())
341 }
342
343 pub fn cache_synthetics(&mut self) -> anyhow::Result<()> {
346 self.synthetics = match &mut self.database {
347 Some(db) => db.load_synthetics()?,
348 None => HashMap::new(),
349 };
350
351 log::info!(
352 "Cached {} synthetic instruments from database",
353 self.general.len()
354 );
355 Ok(())
356 }
357
358 pub fn cache_accounts(&mut self) -> anyhow::Result<()> {
360 self.accounts = match &mut self.database {
361 Some(db) => db.load_accounts()?,
362 None => HashMap::new(),
363 };
364
365 log::info!(
366 "Cached {} synthetic instruments from database",
367 self.general.len()
368 );
369 Ok(())
370 }
371
372 pub fn cache_orders(&mut self) -> anyhow::Result<()> {
374 self.orders = match &mut self.database {
375 Some(db) => db.load_orders()?,
376 None => HashMap::new(),
377 };
378
379 log::info!("Cached {} orders from database", self.general.len());
380 Ok(())
381 }
382
383 pub fn cache_positions(&mut self) -> anyhow::Result<()> {
385 self.positions = match &mut self.database {
386 Some(db) => db.load_positions()?,
387 None => HashMap::new(),
388 };
389
390 log::info!("Cached {} positions from database", self.general.len());
391 Ok(())
392 }
393
394 pub fn build_index(&mut self) {
396 self.index.clear();
397 log::debug!("Building index");
398
399 for account_id in self.accounts.keys() {
401 self.index
402 .venue_account
403 .insert(account_id.get_issuer(), *account_id);
404 }
405
406 for (client_order_id, order) in &self.orders {
408 let instrument_id = order.instrument_id();
409 let venue = instrument_id.venue;
410 let strategy_id = order.strategy_id();
411
412 self.index
414 .venue_orders
415 .entry(venue)
416 .or_default()
417 .insert(*client_order_id);
418
419 if let Some(venue_order_id) = order.venue_order_id() {
421 self.index
422 .venue_order_ids
423 .insert(venue_order_id, *client_order_id);
424 }
425
426 if let Some(position_id) = order.position_id() {
428 self.index
429 .order_position
430 .insert(*client_order_id, position_id);
431 }
432
433 self.index
435 .order_strategy
436 .insert(*client_order_id, order.strategy_id());
437
438 self.index
440 .instrument_orders
441 .entry(instrument_id)
442 .or_default()
443 .insert(*client_order_id);
444
445 self.index
447 .strategy_orders
448 .entry(strategy_id)
449 .or_default()
450 .insert(*client_order_id);
451
452 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
454 self.index
455 .exec_algorithm_orders
456 .entry(exec_algorithm_id)
457 .or_default()
458 .insert(*client_order_id);
459 }
460
461 if let Some(exec_spawn_id) = order.exec_spawn_id() {
463 self.index
464 .exec_spawn_orders
465 .entry(exec_spawn_id)
466 .or_default()
467 .insert(*client_order_id);
468 }
469
470 self.index.orders.insert(*client_order_id);
472
473 if order.is_open() {
475 self.index.orders_open.insert(*client_order_id);
476 }
477
478 if order.is_closed() {
480 self.index.orders_closed.insert(*client_order_id);
481 }
482
483 if let Some(emulation_trigger) = order.emulation_trigger() {
485 if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
486 self.index.orders_emulated.insert(*client_order_id);
487 }
488 }
489
490 if order.is_inflight() {
492 self.index.orders_inflight.insert(*client_order_id);
493 }
494
495 self.index.strategies.insert(strategy_id);
497
498 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
500 self.index.exec_algorithms.insert(exec_algorithm_id);
501 }
502 }
503
504 for (position_id, position) in &self.positions {
506 let instrument_id = position.instrument_id;
507 let venue = instrument_id.venue;
508 let strategy_id = position.strategy_id;
509
510 self.index
512 .venue_positions
513 .entry(venue)
514 .or_default()
515 .insert(*position_id);
516
517 self.index
519 .position_strategy
520 .insert(*position_id, position.strategy_id);
521
522 self.index
524 .position_orders
525 .entry(*position_id)
526 .or_default()
527 .extend(position.client_order_ids().into_iter());
528
529 self.index
531 .instrument_positions
532 .entry(instrument_id)
533 .or_default()
534 .insert(*position_id);
535
536 self.index
538 .strategy_positions
539 .entry(strategy_id)
540 .or_default()
541 .insert(*position_id);
542
543 self.index.positions.insert(*position_id);
545
546 if position.is_open() {
548 self.index.positions_open.insert(*position_id);
549 }
550
551 if position.is_closed() {
553 self.index.positions_closed.insert(*position_id);
554 }
555
556 self.index.strategies.insert(strategy_id);
558 }
559 }
560
561 #[must_use]
563 pub const fn has_backing(&self) -> bool {
564 self.config.database.is_some()
565 }
566
567 #[must_use]
569 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
570 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
571 quote
572 } else {
573 log::warn!(
574 "Cannot calculate unrealized PnL for {}, no quotes for {}",
575 position.id,
576 position.instrument_id
577 );
578 return None;
579 };
580
581 let last = match position.side {
582 PositionSide::Flat | PositionSide::NoPositionSide => {
583 return Some(Money::new(0.0, position.settlement_currency));
584 }
585 PositionSide::Long => quote.ask_price,
586 PositionSide::Short => quote.bid_price,
587 };
588
589 Some(position.unrealized_pnl(last))
590 }
591
592 #[must_use]
597 pub fn check_integrity(&mut self) -> bool {
598 let mut error_count = 0;
599 let failure = "Integrity failure";
600
601 let timestamp_us = SystemTime::now()
603 .duration_since(UNIX_EPOCH)
604 .expect("Time went backwards")
605 .as_micros();
606
607 log::info!("Checking data integrity");
608
609 for account_id in self.accounts.keys() {
611 if !self
612 .index
613 .venue_account
614 .contains_key(&account_id.get_issuer())
615 {
616 log::error!(
617 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
618 );
619 error_count += 1;
620 }
621 }
622
623 for (client_order_id, order) in &self.orders {
624 if !self.index.order_strategy.contains_key(client_order_id) {
625 log::error!(
626 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
627 );
628 error_count += 1;
629 }
630 if !self.index.orders.contains(client_order_id) {
631 log::error!(
632 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
633 );
634 error_count += 1;
635 }
636 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
637 log::error!(
638 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
639 );
640 error_count += 1;
641 }
642 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
643 log::error!(
644 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
645 );
646 error_count += 1;
647 }
648 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
649 log::error!(
650 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
651 );
652 error_count += 1;
653 }
654 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
655 if !self
656 .index
657 .exec_algorithm_orders
658 .contains_key(&exec_algorithm_id)
659 {
660 log::error!(
661 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
662 );
663 error_count += 1;
664 }
665 if order.exec_spawn_id().is_none()
666 && !self.index.exec_spawn_orders.contains_key(client_order_id)
667 {
668 log::error!(
669 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
670 );
671 error_count += 1;
672 }
673 }
674 }
675
676 for (position_id, position) in &self.positions {
677 if !self.index.position_strategy.contains_key(position_id) {
678 log::error!(
679 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
680 );
681 error_count += 1;
682 }
683 if !self.index.position_orders.contains_key(position_id) {
684 log::error!(
685 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
686 );
687 error_count += 1;
688 }
689 if !self.index.positions.contains(position_id) {
690 log::error!(
691 "{failure} in positions: {position_id} not found in `self.index.positions`",
692 );
693 error_count += 1;
694 }
695 if position.is_open() && !self.index.positions_open.contains(position_id) {
696 log::error!(
697 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
698 );
699 error_count += 1;
700 }
701 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
702 log::error!(
703 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
704 );
705 error_count += 1;
706 }
707 }
708
709 for account_id in self.index.venue_account.values() {
711 if !self.accounts.contains_key(account_id) {
712 log::error!(
713 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
714 );
715 error_count += 1;
716 }
717 }
718
719 for client_order_id in self.index.venue_order_ids.values() {
720 if !self.orders.contains_key(client_order_id) {
721 log::error!(
722 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
723 );
724 error_count += 1;
725 }
726 }
727
728 for client_order_id in self.index.client_order_ids.keys() {
729 if !self.orders.contains_key(client_order_id) {
730 log::error!(
731 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
732 );
733 error_count += 1;
734 }
735 }
736
737 for client_order_id in self.index.order_position.keys() {
738 if !self.orders.contains_key(client_order_id) {
739 log::error!(
740 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
741 );
742 error_count += 1;
743 }
744 }
745
746 for client_order_id in self.index.order_strategy.keys() {
748 if !self.orders.contains_key(client_order_id) {
749 log::error!(
750 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
751 );
752 error_count += 1;
753 }
754 }
755
756 for position_id in self.index.position_strategy.keys() {
757 if !self.positions.contains_key(position_id) {
758 log::error!(
759 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
760 );
761 error_count += 1;
762 }
763 }
764
765 for position_id in self.index.position_orders.keys() {
766 if !self.positions.contains_key(position_id) {
767 log::error!(
768 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
769 );
770 error_count += 1;
771 }
772 }
773
774 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
775 for client_order_id in client_order_ids {
776 if !self.orders.contains_key(client_order_id) {
777 log::error!(
778 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
779 );
780 error_count += 1;
781 }
782 }
783 }
784
785 for instrument_id in self.index.instrument_positions.keys() {
786 if !self.index.instrument_orders.contains_key(instrument_id) {
787 log::error!(
788 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
789 );
790 error_count += 1;
791 }
792 }
793
794 for client_order_ids in self.index.strategy_orders.values() {
795 for client_order_id in client_order_ids {
796 if !self.orders.contains_key(client_order_id) {
797 log::error!(
798 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
799 );
800 error_count += 1;
801 }
802 }
803 }
804
805 for position_ids in self.index.strategy_positions.values() {
806 for position_id in position_ids {
807 if !self.positions.contains_key(position_id) {
808 log::error!(
809 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
810 );
811 error_count += 1;
812 }
813 }
814 }
815
816 for client_order_id in &self.index.orders {
817 if !self.orders.contains_key(client_order_id) {
818 log::error!(
819 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
820 );
821 error_count += 1;
822 }
823 }
824
825 for client_order_id in &self.index.orders_emulated {
826 if !self.orders.contains_key(client_order_id) {
827 log::error!(
828 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
829 );
830 error_count += 1;
831 }
832 }
833
834 for client_order_id in &self.index.orders_inflight {
835 if !self.orders.contains_key(client_order_id) {
836 log::error!(
837 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
838 );
839 error_count += 1;
840 }
841 }
842
843 for client_order_id in &self.index.orders_open {
844 if !self.orders.contains_key(client_order_id) {
845 log::error!(
846 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
847 );
848 error_count += 1;
849 }
850 }
851
852 for client_order_id in &self.index.orders_closed {
853 if !self.orders.contains_key(client_order_id) {
854 log::error!(
855 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
856 );
857 error_count += 1;
858 }
859 }
860
861 for position_id in &self.index.positions {
862 if !self.positions.contains_key(position_id) {
863 log::error!(
864 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
865 );
866 error_count += 1;
867 }
868 }
869
870 for position_id in &self.index.positions_open {
871 if !self.positions.contains_key(position_id) {
872 log::error!(
873 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
874 );
875 error_count += 1;
876 }
877 }
878
879 for position_id in &self.index.positions_closed {
880 if !self.positions.contains_key(position_id) {
881 log::error!(
882 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
883 );
884 error_count += 1;
885 }
886 }
887
888 for strategy_id in &self.index.strategies {
889 if !self.index.strategy_orders.contains_key(strategy_id) {
890 log::error!(
891 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
892 );
893 error_count += 1;
894 }
895 }
896
897 for exec_algorithm_id in &self.index.exec_algorithms {
898 if !self
899 .index
900 .exec_algorithm_orders
901 .contains_key(exec_algorithm_id)
902 {
903 log::error!(
904 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
905 );
906 error_count += 1;
907 }
908 }
909
910 let total_us = SystemTime::now()
911 .duration_since(UNIX_EPOCH)
912 .expect("Time went backwards")
913 .as_micros()
914 - timestamp_us;
915
916 if error_count == 0 {
917 log::info!("Integrity check passed in {total_us}μs");
918 true
919 } else {
920 log::error!(
921 "Integrity check failed with {error_count} error{} in {total_us}μs",
922 if error_count == 1 { "" } else { "s" },
923 );
924 false
925 }
926 }
927
928 #[must_use]
932 pub fn check_residuals(&self) -> bool {
933 log::debug!("Checking residuals");
934
935 let mut residuals = false;
936
937 for order in self.orders_open(None, None, None, None) {
939 residuals = true;
940 log::warn!("Residual {order:?}");
941 }
942
943 for position in self.positions_open(None, None, None, None) {
945 residuals = true;
946 log::warn!("Residual {position}");
947 }
948
949 residuals
950 }
951
952 pub fn clear_index(&mut self) {
954 self.index.clear();
955 log::debug!("Cleared index");
956 }
957
958 pub fn reset(&mut self) {
962 log::debug!("Resetting cache");
963
964 self.general.clear();
965 self.quotes.clear();
966 self.trades.clear();
967 self.books.clear();
968 self.bars.clear();
969 self.currencies.clear();
970 self.instruments.clear();
971 self.synthetics.clear();
972 self.accounts.clear();
973 self.orders.clear();
974 self.order_lists.clear();
975 self.positions.clear();
976 self.position_snapshots.clear();
977
978 self.clear_index();
979
980 log::info!("Reset cache");
981 }
982
983 pub fn dispose(&mut self) {
985 if let Some(database) = &mut self.database {
986 database.close().expect("Failed to close database");
987 }
988 }
989
990 pub fn flush_db(&mut self) {
992 if let Some(database) = &mut self.database {
993 database.flush().expect("Failed to flush database");
994 }
995 }
996
997 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1002 check_valid_string(key, stringify!(key)).expect(FAILED);
1003 check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
1004
1005 log::debug!("Adding general {key}");
1006 self.general.insert(key.to_string(), value.clone());
1007
1008 if let Some(database) = &mut self.database {
1009 database.add(key.to_string(), value)?;
1010 }
1011 Ok(())
1012 }
1013
1014 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1016 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1017
1018 if self.config.save_market_data {
1019 if let Some(database) = &mut self.database {
1020 database.add_order_book(&book)?;
1021 }
1022 }
1023
1024 self.books.insert(book.instrument_id, book);
1025 Ok(())
1026 }
1027
1028 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1030 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1031
1032 if self.config.save_market_data {
1033 if let Some(database) = &mut self.database {
1034 database.add_quote("e)?;
1035 }
1036 }
1037
1038 let quotes_deque = self
1039 .quotes
1040 .entry(quote.instrument_id)
1041 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1042 quotes_deque.push_front(quote);
1043 Ok(())
1044 }
1045
1046 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1048 check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
1049
1050 let instrument_id = quotes[0].instrument_id;
1051 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1052
1053 if self.config.save_market_data {
1054 if let Some(database) = &mut self.database {
1055 for quote in quotes {
1056 database.add_quote(quote).unwrap();
1057 }
1058 }
1059 }
1060
1061 let quotes_deque = self
1062 .quotes
1063 .entry(instrument_id)
1064 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1065
1066 for quote in quotes {
1067 quotes_deque.push_front(*quote);
1068 }
1069 Ok(())
1070 }
1071
1072 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1074 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1075
1076 if self.config.save_market_data {
1077 if let Some(database) = &mut self.database {
1078 database.add_trade(&trade)?;
1079 }
1080 }
1081
1082 let trades_deque = self
1083 .trades
1084 .entry(trade.instrument_id)
1085 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1086 trades_deque.push_front(trade);
1087 Ok(())
1088 }
1089
1090 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1092 check_slice_not_empty(trades, stringify!(trades)).unwrap();
1093
1094 let instrument_id = trades[0].instrument_id;
1095 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1096
1097 if self.config.save_market_data {
1098 if let Some(database) = &mut self.database {
1099 for trade in trades {
1100 database.add_trade(trade).unwrap();
1101 }
1102 }
1103 }
1104
1105 let trades_deque = self
1106 .trades
1107 .entry(instrument_id)
1108 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1109
1110 for trade in trades {
1111 trades_deque.push_front(*trade);
1112 }
1113 Ok(())
1114 }
1115
1116 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1118 log::debug!("Adding `Bar` {}", bar.bar_type);
1119
1120 if self.config.save_market_data {
1121 if let Some(database) = &mut self.database {
1122 database.add_bar(&bar)?;
1123 }
1124 }
1125
1126 let bars = self
1127 .bars
1128 .entry(bar.bar_type)
1129 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1130 bars.push_front(bar);
1131 Ok(())
1132 }
1133
1134 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1136 check_slice_not_empty(bars, stringify!(bars)).unwrap();
1137
1138 let bar_type = bars[0].bar_type;
1139 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1140
1141 if self.config.save_market_data {
1142 if let Some(database) = &mut self.database {
1143 for bar in bars {
1144 database.add_bar(bar).unwrap();
1145 }
1146 }
1147 }
1148
1149 let bars_deque = self
1150 .bars
1151 .entry(bar_type)
1152 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1153
1154 for bar in bars {
1155 bars_deque.push_front(*bar);
1156 }
1157 Ok(())
1158 }
1159
1160 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1162 log::debug!("Adding `Currency` {}", currency.code);
1163
1164 if let Some(database) = &mut self.database {
1165 database.add_currency(¤cy)?;
1166 }
1167
1168 self.currencies.insert(currency.code, currency);
1169 Ok(())
1170 }
1171
1172 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1174 log::debug!("Adding `Instrument` {}", instrument.id());
1175
1176 if let Some(database) = &mut self.database {
1177 database.add_instrument(&instrument)?;
1178 }
1179
1180 self.instruments.insert(instrument.id(), instrument);
1181 Ok(())
1182 }
1183
1184 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1186 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1187
1188 if let Some(database) = &mut self.database {
1189 database.add_synthetic(&synthetic)?;
1190 }
1191
1192 self.synthetics.insert(synthetic.id, synthetic);
1193 Ok(())
1194 }
1195
1196 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1198 log::debug!("Adding `Account` {}", account.id());
1199
1200 if let Some(database) = &mut self.database {
1201 database.add_account(&account)?;
1202 }
1203
1204 let account_id = account.id();
1205 self.accounts.insert(account_id, account);
1206 self.index
1207 .venue_account
1208 .insert(account_id.get_issuer(), account_id);
1209 Ok(())
1210 }
1211
1212 pub fn add_venue_order_id(
1216 &mut self,
1217 client_order_id: &ClientOrderId,
1218 venue_order_id: &VenueOrderId,
1219 overwrite: bool,
1220 ) -> anyhow::Result<()> {
1221 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1222 if !overwrite && existing_venue_order_id != venue_order_id {
1223 anyhow::bail!(
1224 "Existing {existing_venue_order_id} for {client_order_id}
1225 did not match the given {venue_order_id}.
1226 If you are writing a test then try a different `venue_order_id`,
1227 otherwise this is probably a bug."
1228 );
1229 }
1230 }
1231
1232 self.index
1233 .client_order_ids
1234 .insert(*client_order_id, *venue_order_id);
1235 self.index
1236 .venue_order_ids
1237 .insert(*venue_order_id, *client_order_id);
1238
1239 Ok(())
1240 }
1241
1242 pub fn add_order(
1255 &mut self,
1256 order: OrderAny,
1257 position_id: Option<PositionId>,
1258 client_id: Option<ClientId>,
1259 replace_existing: bool,
1260 ) -> anyhow::Result<()> {
1261 let instrument_id = order.instrument_id();
1262 let venue = instrument_id.venue;
1263 let client_order_id = order.client_order_id();
1264 let strategy_id = order.strategy_id();
1265 let exec_algorithm_id = order.exec_algorithm_id();
1266 let exec_spawn_id = order.exec_spawn_id();
1267
1268 if !replace_existing {
1269 check_key_not_in_map(
1270 &client_order_id,
1271 &self.orders,
1272 stringify!(client_order_id),
1273 stringify!(orders),
1274 )
1275 .expect(FAILED);
1276 check_key_not_in_map(
1277 &client_order_id,
1278 &self.orders,
1279 stringify!(client_order_id),
1280 stringify!(orders),
1281 )
1282 .expect(FAILED);
1283 check_key_not_in_map(
1284 &client_order_id,
1285 &self.orders,
1286 stringify!(client_order_id),
1287 stringify!(orders),
1288 )
1289 .expect(FAILED);
1290 check_key_not_in_map(
1291 &client_order_id,
1292 &self.orders,
1293 stringify!(client_order_id),
1294 stringify!(orders),
1295 )
1296 .expect(FAILED);
1297 }
1298
1299 log::debug!("Adding {order:?}");
1300
1301 self.index.orders.insert(client_order_id);
1302 self.index
1303 .order_strategy
1304 .insert(client_order_id, strategy_id);
1305 self.index.strategies.insert(strategy_id);
1306
1307 self.index
1309 .venue_orders
1310 .entry(venue)
1311 .or_default()
1312 .insert(client_order_id);
1313
1314 self.index
1316 .instrument_orders
1317 .entry(instrument_id)
1318 .or_default()
1319 .insert(client_order_id);
1320
1321 self.index
1323 .strategy_orders
1324 .entry(strategy_id)
1325 .or_default()
1326 .insert(client_order_id);
1327
1328 if let Some(exec_algorithm_id) = exec_algorithm_id {
1330 self.index.exec_algorithms.insert(exec_algorithm_id);
1331
1332 self.index
1333 .exec_algorithm_orders
1334 .entry(exec_algorithm_id)
1335 .or_default()
1336 .insert(client_order_id);
1337
1338 self.index
1339 .exec_spawn_orders
1340 .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1341 .or_default()
1342 .insert(client_order_id);
1343 }
1344
1345 match order.emulation_trigger() {
1347 Some(_) => {
1348 self.index.orders_emulated.remove(&client_order_id);
1349 }
1350 None => {
1351 self.index.orders_emulated.insert(client_order_id);
1352 }
1353 }
1354
1355 if let Some(position_id) = position_id {
1357 self.add_position_id(
1358 &position_id,
1359 &order.instrument_id().venue,
1360 &client_order_id,
1361 &strategy_id,
1362 )?;
1363 }
1364
1365 if let Some(client_id) = client_id {
1367 self.index.order_client.insert(client_order_id, client_id);
1368 log::debug!("Indexed {client_id:?}");
1369 }
1370
1371 if let Some(database) = &mut self.database {
1372 database.add_order(&order, client_id)?;
1373 }
1378
1379 self.orders.insert(client_order_id, order);
1380
1381 Ok(())
1382 }
1383
1384 pub fn add_position_id(
1386 &mut self,
1387 position_id: &PositionId,
1388 venue: &Venue,
1389 client_order_id: &ClientOrderId,
1390 strategy_id: &StrategyId,
1391 ) -> anyhow::Result<()> {
1392 self.index
1393 .order_position
1394 .insert(*client_order_id, *position_id);
1395
1396 if let Some(database) = &mut self.database {
1398 database.index_order_position(*client_order_id, *position_id)?;
1399 }
1400
1401 self.index
1403 .position_strategy
1404 .insert(*position_id, *strategy_id);
1405
1406 self.index
1408 .position_orders
1409 .entry(*position_id)
1410 .or_default()
1411 .insert(*client_order_id);
1412
1413 self.index
1415 .strategy_positions
1416 .entry(*strategy_id)
1417 .or_default()
1418 .insert(*position_id);
1419
1420 Ok(())
1421 }
1422
1423 pub fn add_position(&mut self, position: Position, oms_type: OmsType) -> anyhow::Result<()> {
1425 self.positions.insert(position.id, position.clone());
1426 self.index.positions.insert(position.id);
1427 self.index.positions_open.insert(position.id);
1428
1429 log::debug!("Adding {position}");
1430
1431 self.add_position_id(
1432 &position.id,
1433 &position.instrument_id.venue,
1434 &position.opening_order_id,
1435 &position.strategy_id,
1436 )?;
1437
1438 let venue = position.instrument_id.venue;
1439 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1440 venue_positions.insert(position.id);
1441
1442 let instrument_id = position.instrument_id;
1444 let instrument_positions = self
1445 .index
1446 .instrument_positions
1447 .entry(instrument_id)
1448 .or_default();
1449 instrument_positions.insert(position.id);
1450
1451 if let Some(database) = &mut self.database {
1452 database.add_position(&position)?;
1453 }
1462
1463 Ok(())
1464 }
1465
1466 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1468 if let Some(database) = &mut self.database {
1469 database.update_account(&account)?;
1470 }
1471 Ok(())
1472 }
1473
1474 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1476 let client_order_id = order.client_order_id();
1477
1478 if let Some(venue_order_id) = order.venue_order_id() {
1480 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1483 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1485 }
1486 }
1487
1488 if order.is_inflight() {
1490 self.index.orders_inflight.insert(client_order_id);
1491 } else {
1492 self.index.orders_inflight.remove(&client_order_id);
1493 }
1494
1495 if order.is_open() {
1497 self.index.orders_closed.remove(&client_order_id);
1498 self.index.orders_open.insert(client_order_id);
1499 } else if order.is_closed() {
1500 self.index.orders_open.remove(&client_order_id);
1501 self.index.orders_pending_cancel.remove(&client_order_id);
1502 self.index.orders_closed.insert(client_order_id);
1503 }
1504
1505 if let Some(emulation_trigger) = order.emulation_trigger() {
1507 match emulation_trigger {
1508 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1509 _ => self.index.orders_emulated.insert(client_order_id),
1510 };
1511 }
1512
1513 if let Some(database) = &mut self.database {
1514 database.update_order(order.last_event())?;
1515 }
1520
1521 self.orders.insert(client_order_id, order.clone());
1523
1524 Ok(())
1525 }
1526
1527 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1529 self.index
1530 .orders_pending_cancel
1531 .insert(order.client_order_id());
1532 }
1533
1534 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1536 if position.is_open() {
1538 self.index.positions_open.insert(position.id);
1539 self.index.positions_closed.remove(&position.id);
1540 } else {
1541 self.index.positions_closed.insert(position.id);
1542 self.index.positions_open.remove(&position.id);
1543 }
1544
1545 if let Some(database) = &mut self.database {
1546 database.update_position(position)?;
1547 }
1552 Ok(())
1553 }
1554
1555 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1558 let position_id = position.id;
1559
1560 let mut copied_position = position.clone();
1561 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1562 copied_position.id = PositionId::new(new_id);
1563
1564 let position_serialized = bincode::serialize(&copied_position)?;
1566
1567 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1568 let new_snapshots = match snapshots {
1569 Some(existing_snapshots) => {
1570 let mut combined = existing_snapshots.to_vec();
1571 combined.extend(position_serialized);
1572 Bytes::from(combined)
1573 }
1574 None => Bytes::from(position_serialized),
1575 };
1576 self.position_snapshots.insert(position_id, new_snapshots);
1577
1578 log::debug!("Snapshot {}", copied_position);
1579 Ok(())
1580 }
1581
1582 pub fn snapshot_position_state(
1583 &mut self,
1584 position: &Position,
1585 open_only: Option<bool>,
1588 ) -> anyhow::Result<()> {
1589 let open_only = open_only.unwrap_or(true);
1590
1591 if open_only && !position.is_open() {
1592 return Ok(());
1593 }
1594
1595 if let Some(database) = &mut self.database {
1596 database.snapshot_position_state(position).map_err(|e| {
1597 log::error!(
1598 "Failed to snapshot position state for {}: {:?}",
1599 position.id,
1600 e
1601 );
1602 e
1603 })?;
1604 } else {
1605 log::warn!(
1606 "Cannot snapshot position state for {} (no database configured)",
1607 position.id
1608 );
1609 }
1610
1611 todo!()
1613 }
1614
1615 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1616 let database = if let Some(database) = &self.database {
1617 database
1618 } else {
1619 log::warn!(
1620 "Cannot snapshot order state for {} (no database configured)",
1621 order.client_order_id()
1622 );
1623 return Ok(());
1624 };
1625
1626 database.snapshot_order_state(order)
1627 }
1628
1629 fn build_order_query_filter_set(
1632 &self,
1633 venue: Option<&Venue>,
1634 instrument_id: Option<&InstrumentId>,
1635 strategy_id: Option<&StrategyId>,
1636 ) -> Option<HashSet<ClientOrderId>> {
1637 let mut query: Option<HashSet<ClientOrderId>> = None;
1638
1639 if let Some(venue) = venue {
1640 query = Some(
1641 self.index
1642 .venue_orders
1643 .get(venue)
1644 .map_or(HashSet::new(), |o| o.iter().copied().collect()),
1645 );
1646 }
1647
1648 if let Some(instrument_id) = instrument_id {
1649 let instrument_orders = self
1650 .index
1651 .instrument_orders
1652 .get(instrument_id)
1653 .map_or(HashSet::new(), |o| o.iter().copied().collect());
1654
1655 if let Some(existing_query) = &mut query {
1656 *existing_query = existing_query
1657 .intersection(&instrument_orders)
1658 .copied()
1659 .collect();
1660 } else {
1661 query = Some(instrument_orders);
1662 }
1663 }
1664
1665 if let Some(strategy_id) = strategy_id {
1666 let strategy_orders = self
1667 .index
1668 .strategy_orders
1669 .get(strategy_id)
1670 .map_or(HashSet::new(), |o| o.iter().copied().collect());
1671
1672 if let Some(existing_query) = &mut query {
1673 *existing_query = existing_query
1674 .intersection(&strategy_orders)
1675 .copied()
1676 .collect();
1677 } else {
1678 query = Some(strategy_orders);
1679 }
1680 }
1681
1682 query
1683 }
1684
1685 fn build_position_query_filter_set(
1686 &self,
1687 venue: Option<&Venue>,
1688 instrument_id: Option<&InstrumentId>,
1689 strategy_id: Option<&StrategyId>,
1690 ) -> Option<HashSet<PositionId>> {
1691 let mut query: Option<HashSet<PositionId>> = None;
1692
1693 if let Some(venue) = venue {
1694 query = Some(
1695 self.index
1696 .venue_positions
1697 .get(venue)
1698 .map_or(HashSet::new(), |p| p.iter().copied().collect()),
1699 );
1700 }
1701
1702 if let Some(instrument_id) = instrument_id {
1703 let instrument_positions = self
1704 .index
1705 .instrument_positions
1706 .get(instrument_id)
1707 .map_or(HashSet::new(), |p| p.iter().copied().collect());
1708
1709 if let Some(existing_query) = query {
1710 query = Some(
1711 existing_query
1712 .intersection(&instrument_positions)
1713 .copied()
1714 .collect(),
1715 );
1716 } else {
1717 query = Some(instrument_positions);
1718 }
1719 }
1720
1721 if let Some(strategy_id) = strategy_id {
1722 let strategy_positions = self
1723 .index
1724 .strategy_positions
1725 .get(strategy_id)
1726 .map_or(HashSet::new(), |p| p.iter().copied().collect());
1727
1728 if let Some(existing_query) = query {
1729 query = Some(
1730 existing_query
1731 .intersection(&strategy_positions)
1732 .copied()
1733 .collect(),
1734 );
1735 } else {
1736 query = Some(strategy_positions);
1737 }
1738 }
1739
1740 query
1741 }
1742
1743 fn get_orders_for_ids(
1744 &self,
1745 client_order_ids: &HashSet<ClientOrderId>,
1746 side: Option<OrderSide>,
1747 ) -> Vec<&OrderAny> {
1748 let side = side.unwrap_or(OrderSide::NoOrderSide);
1749 let mut orders = Vec::new();
1750
1751 for client_order_id in client_order_ids {
1752 let order = self
1753 .orders
1754 .get(client_order_id)
1755 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1756 if side == OrderSide::NoOrderSide || side == order.order_side() {
1757 orders.push(order);
1758 }
1759 }
1760
1761 orders
1762 }
1763
1764 fn get_positions_for_ids(
1765 &self,
1766 position_ids: &HashSet<PositionId>,
1767 side: Option<PositionSide>,
1768 ) -> Vec<&Position> {
1769 let side = side.unwrap_or(PositionSide::NoPositionSide);
1770 let mut positions = Vec::new();
1771
1772 for position_id in position_ids {
1773 let position = self
1774 .positions
1775 .get(position_id)
1776 .unwrap_or_else(|| panic!("Position {position_id} not found"));
1777 if side == PositionSide::NoPositionSide || side == position.side {
1778 positions.push(position);
1779 }
1780 }
1781
1782 positions
1783 }
1784
1785 #[must_use]
1787 pub fn client_order_ids(
1788 &self,
1789 venue: Option<&Venue>,
1790 instrument_id: Option<&InstrumentId>,
1791 strategy_id: Option<&StrategyId>,
1792 ) -> HashSet<ClientOrderId> {
1793 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1794 match query {
1795 Some(query) => self.index.orders.intersection(&query).copied().collect(),
1796 None => self.index.orders.clone(),
1797 }
1798 }
1799
1800 #[must_use]
1802 pub fn client_order_ids_open(
1803 &self,
1804 venue: Option<&Venue>,
1805 instrument_id: Option<&InstrumentId>,
1806 strategy_id: Option<&StrategyId>,
1807 ) -> HashSet<ClientOrderId> {
1808 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1809 match query {
1810 Some(query) => self
1811 .index
1812 .orders_open
1813 .intersection(&query)
1814 .copied()
1815 .collect(),
1816 None => self.index.orders_open.clone(),
1817 }
1818 }
1819
1820 #[must_use]
1822 pub fn client_order_ids_closed(
1823 &self,
1824 venue: Option<&Venue>,
1825 instrument_id: Option<&InstrumentId>,
1826 strategy_id: Option<&StrategyId>,
1827 ) -> HashSet<ClientOrderId> {
1828 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1829 match query {
1830 Some(query) => self
1831 .index
1832 .orders_closed
1833 .intersection(&query)
1834 .copied()
1835 .collect(),
1836 None => self.index.orders_closed.clone(),
1837 }
1838 }
1839
1840 #[must_use]
1842 pub fn client_order_ids_emulated(
1843 &self,
1844 venue: Option<&Venue>,
1845 instrument_id: Option<&InstrumentId>,
1846 strategy_id: Option<&StrategyId>,
1847 ) -> HashSet<ClientOrderId> {
1848 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1849 match query {
1850 Some(query) => self
1851 .index
1852 .orders_emulated
1853 .intersection(&query)
1854 .copied()
1855 .collect(),
1856 None => self.index.orders_emulated.clone(),
1857 }
1858 }
1859
1860 #[must_use]
1862 pub fn client_order_ids_inflight(
1863 &self,
1864 venue: Option<&Venue>,
1865 instrument_id: Option<&InstrumentId>,
1866 strategy_id: Option<&StrategyId>,
1867 ) -> HashSet<ClientOrderId> {
1868 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1869 match query {
1870 Some(query) => self
1871 .index
1872 .orders_inflight
1873 .intersection(&query)
1874 .copied()
1875 .collect(),
1876 None => self.index.orders_inflight.clone(),
1877 }
1878 }
1879
1880 #[must_use]
1882 pub fn position_ids(
1883 &self,
1884 venue: Option<&Venue>,
1885 instrument_id: Option<&InstrumentId>,
1886 strategy_id: Option<&StrategyId>,
1887 ) -> HashSet<PositionId> {
1888 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1889 match query {
1890 Some(query) => self.index.positions.intersection(&query).copied().collect(),
1891 None => self.index.positions.clone(),
1892 }
1893 }
1894
1895 #[must_use]
1897 pub fn position_open_ids(
1898 &self,
1899 venue: Option<&Venue>,
1900 instrument_id: Option<&InstrumentId>,
1901 strategy_id: Option<&StrategyId>,
1902 ) -> HashSet<PositionId> {
1903 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1904 match query {
1905 Some(query) => self
1906 .index
1907 .positions_open
1908 .intersection(&query)
1909 .copied()
1910 .collect(),
1911 None => self.index.positions_open.clone(),
1912 }
1913 }
1914
1915 #[must_use]
1917 pub fn position_closed_ids(
1918 &self,
1919 venue: Option<&Venue>,
1920 instrument_id: Option<&InstrumentId>,
1921 strategy_id: Option<&StrategyId>,
1922 ) -> HashSet<PositionId> {
1923 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1924 match query {
1925 Some(query) => self
1926 .index
1927 .positions_closed
1928 .intersection(&query)
1929 .copied()
1930 .collect(),
1931 None => self.index.positions_closed.clone(),
1932 }
1933 }
1934
1935 #[must_use]
1937 pub fn actor_ids(&self) -> HashSet<ComponentId> {
1938 self.index.actors.clone()
1939 }
1940
1941 #[must_use]
1943 pub fn strategy_ids(&self) -> HashSet<StrategyId> {
1944 self.index.strategies.clone()
1945 }
1946
1947 #[must_use]
1949 pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
1950 self.index.exec_algorithms.clone()
1951 }
1952
1953 #[must_use]
1957 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
1958 self.orders.get(client_order_id)
1959 }
1960
1961 #[must_use]
1963 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
1964 self.orders.get_mut(client_order_id)
1965 }
1966
1967 #[must_use]
1969 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
1970 self.index.venue_order_ids.get(venue_order_id)
1971 }
1972
1973 #[must_use]
1975 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
1976 self.index.client_order_ids.get(client_order_id)
1977 }
1978
1979 #[must_use]
1981 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
1982 self.index.order_client.get(client_order_id)
1983 }
1984
1985 #[must_use]
1987 pub fn orders(
1988 &self,
1989 venue: Option<&Venue>,
1990 instrument_id: Option<&InstrumentId>,
1991 strategy_id: Option<&StrategyId>,
1992 side: Option<OrderSide>,
1993 ) -> Vec<&OrderAny> {
1994 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
1995 self.get_orders_for_ids(&client_order_ids, side)
1996 }
1997
1998 #[must_use]
2000 pub fn orders_open(
2001 &self,
2002 venue: Option<&Venue>,
2003 instrument_id: Option<&InstrumentId>,
2004 strategy_id: Option<&StrategyId>,
2005 side: Option<OrderSide>,
2006 ) -> Vec<&OrderAny> {
2007 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2008 self.get_orders_for_ids(&client_order_ids, side)
2009 }
2010
2011 #[must_use]
2013 pub fn orders_closed(
2014 &self,
2015 venue: Option<&Venue>,
2016 instrument_id: Option<&InstrumentId>,
2017 strategy_id: Option<&StrategyId>,
2018 side: Option<OrderSide>,
2019 ) -> Vec<&OrderAny> {
2020 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2021 self.get_orders_for_ids(&client_order_ids, side)
2022 }
2023
2024 #[must_use]
2026 pub fn orders_emulated(
2027 &self,
2028 venue: Option<&Venue>,
2029 instrument_id: Option<&InstrumentId>,
2030 strategy_id: Option<&StrategyId>,
2031 side: Option<OrderSide>,
2032 ) -> Vec<&OrderAny> {
2033 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2034 self.get_orders_for_ids(&client_order_ids, side)
2035 }
2036
2037 #[must_use]
2039 pub fn orders_inflight(
2040 &self,
2041 venue: Option<&Venue>,
2042 instrument_id: Option<&InstrumentId>,
2043 strategy_id: Option<&StrategyId>,
2044 side: Option<OrderSide>,
2045 ) -> Vec<&OrderAny> {
2046 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2047 self.get_orders_for_ids(&client_order_ids, side)
2048 }
2049
2050 #[must_use]
2052 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2053 let client_order_ids = self.index.position_orders.get(position_id);
2054 match client_order_ids {
2055 Some(client_order_ids) => {
2056 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2057 }
2058 None => Vec::new(),
2059 }
2060 }
2061
2062 #[must_use]
2064 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2065 self.index.orders.contains(client_order_id)
2066 }
2067
2068 #[must_use]
2070 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2071 self.index.orders_open.contains(client_order_id)
2072 }
2073
2074 #[must_use]
2076 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2077 self.index.orders_closed.contains(client_order_id)
2078 }
2079
2080 #[must_use]
2082 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2083 self.index.orders_emulated.contains(client_order_id)
2084 }
2085
2086 #[must_use]
2088 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2089 self.index.orders_inflight.contains(client_order_id)
2090 }
2091
2092 #[must_use]
2094 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2095 self.index.orders_pending_cancel.contains(client_order_id)
2096 }
2097
2098 #[must_use]
2100 pub fn orders_open_count(
2101 &self,
2102 venue: Option<&Venue>,
2103 instrument_id: Option<&InstrumentId>,
2104 strategy_id: Option<&StrategyId>,
2105 side: Option<OrderSide>,
2106 ) -> usize {
2107 self.orders_open(venue, instrument_id, strategy_id, side)
2108 .len()
2109 }
2110
2111 #[must_use]
2113 pub fn orders_closed_count(
2114 &self,
2115 venue: Option<&Venue>,
2116 instrument_id: Option<&InstrumentId>,
2117 strategy_id: Option<&StrategyId>,
2118 side: Option<OrderSide>,
2119 ) -> usize {
2120 self.orders_closed(venue, instrument_id, strategy_id, side)
2121 .len()
2122 }
2123
2124 #[must_use]
2126 pub fn orders_emulated_count(
2127 &self,
2128 venue: Option<&Venue>,
2129 instrument_id: Option<&InstrumentId>,
2130 strategy_id: Option<&StrategyId>,
2131 side: Option<OrderSide>,
2132 ) -> usize {
2133 self.orders_emulated(venue, instrument_id, strategy_id, side)
2134 .len()
2135 }
2136
2137 #[must_use]
2139 pub fn orders_inflight_count(
2140 &self,
2141 venue: Option<&Venue>,
2142 instrument_id: Option<&InstrumentId>,
2143 strategy_id: Option<&StrategyId>,
2144 side: Option<OrderSide>,
2145 ) -> usize {
2146 self.orders_inflight(venue, instrument_id, strategy_id, side)
2147 .len()
2148 }
2149
2150 #[must_use]
2152 pub fn orders_total_count(
2153 &self,
2154 venue: Option<&Venue>,
2155 instrument_id: Option<&InstrumentId>,
2156 strategy_id: Option<&StrategyId>,
2157 side: Option<OrderSide>,
2158 ) -> usize {
2159 self.orders(venue, instrument_id, strategy_id, side).len()
2160 }
2161
2162 #[must_use]
2164 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2165 self.order_lists.get(order_list_id)
2166 }
2167
2168 #[must_use]
2170 pub fn order_lists(
2171 &self,
2172 venue: Option<&Venue>,
2173 instrument_id: Option<&InstrumentId>,
2174 strategy_id: Option<&StrategyId>,
2175 ) -> Vec<&OrderList> {
2176 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2177
2178 if let Some(venue) = venue {
2179 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2180 }
2181
2182 if let Some(instrument_id) = instrument_id {
2183 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2184 }
2185
2186 if let Some(strategy_id) = strategy_id {
2187 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2188 }
2189
2190 order_lists
2191 }
2192
2193 #[must_use]
2195 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2196 self.order_lists.contains_key(order_list_id)
2197 }
2198
2199 #[must_use]
2204 pub fn orders_for_exec_algorithm(
2205 &self,
2206 exec_algorithm_id: &ExecAlgorithmId,
2207 venue: Option<&Venue>,
2208 instrument_id: Option<&InstrumentId>,
2209 strategy_id: Option<&StrategyId>,
2210 side: Option<OrderSide>,
2211 ) -> Vec<&OrderAny> {
2212 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2213 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2214
2215 if let Some(query) = query {
2216 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2217 let exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2218 }
2219 }
2220
2221 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2222 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2223 } else {
2224 Vec::new()
2225 }
2226 }
2227
2228 #[must_use]
2230 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2231 self.get_orders_for_ids(
2232 self.index
2233 .exec_spawn_orders
2234 .get(exec_spawn_id)
2235 .unwrap_or(&HashSet::new()),
2236 None,
2237 )
2238 }
2239
2240 #[must_use]
2242 pub fn exec_spawn_total_quantity(
2243 &self,
2244 exec_spawn_id: &ClientOrderId,
2245 active_only: bool,
2246 ) -> Option<Quantity> {
2247 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2248
2249 let mut total_quantity: Option<Quantity> = None;
2250
2251 for spawn_order in exec_spawn_orders {
2252 if !active_only || !spawn_order.is_closed() {
2253 if let Some(mut total_quantity) = total_quantity {
2254 total_quantity += spawn_order.quantity();
2255 }
2256 } else {
2257 total_quantity = Some(spawn_order.quantity());
2258 }
2259 }
2260
2261 total_quantity
2262 }
2263
2264 #[must_use]
2266 pub fn exec_spawn_total_filled_qty(
2267 &self,
2268 exec_spawn_id: &ClientOrderId,
2269 active_only: bool,
2270 ) -> Option<Quantity> {
2271 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2272
2273 let mut total_quantity: Option<Quantity> = None;
2274
2275 for spawn_order in exec_spawn_orders {
2276 if !active_only || !spawn_order.is_closed() {
2277 if let Some(mut total_quantity) = total_quantity {
2278 total_quantity += spawn_order.filled_qty();
2279 }
2280 } else {
2281 total_quantity = Some(spawn_order.filled_qty());
2282 }
2283 }
2284
2285 total_quantity
2286 }
2287
2288 #[must_use]
2290 pub fn exec_spawn_total_leaves_qty(
2291 &self,
2292 exec_spawn_id: &ClientOrderId,
2293 active_only: bool,
2294 ) -> Option<Quantity> {
2295 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2296
2297 let mut total_quantity: Option<Quantity> = None;
2298
2299 for spawn_order in exec_spawn_orders {
2300 if !active_only || !spawn_order.is_closed() {
2301 if let Some(mut total_quantity) = total_quantity {
2302 total_quantity += spawn_order.leaves_qty();
2303 }
2304 } else {
2305 total_quantity = Some(spawn_order.leaves_qty());
2306 }
2307 }
2308
2309 total_quantity
2310 }
2311
2312 #[must_use]
2316 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2317 self.positions.get(position_id)
2318 }
2319
2320 #[must_use]
2322 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2323 self.index
2324 .order_position
2325 .get(client_order_id)
2326 .and_then(|position_id| self.positions.get(position_id))
2327 }
2328
2329 #[must_use]
2331 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2332 self.index.order_position.get(client_order_id)
2333 }
2334
2335 #[must_use]
2337 pub fn positions(
2338 &self,
2339 venue: Option<&Venue>,
2340 instrument_id: Option<&InstrumentId>,
2341 strategy_id: Option<&StrategyId>,
2342 side: Option<PositionSide>,
2343 ) -> Vec<&Position> {
2344 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2345 self.get_positions_for_ids(&position_ids, side)
2346 }
2347
2348 #[must_use]
2350 pub fn positions_open(
2351 &self,
2352 venue: Option<&Venue>,
2353 instrument_id: Option<&InstrumentId>,
2354 strategy_id: Option<&StrategyId>,
2355 side: Option<PositionSide>,
2356 ) -> Vec<&Position> {
2357 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2358 self.get_positions_for_ids(&position_ids, side)
2359 }
2360
2361 #[must_use]
2363 pub fn positions_closed(
2364 &self,
2365 venue: Option<&Venue>,
2366 instrument_id: Option<&InstrumentId>,
2367 strategy_id: Option<&StrategyId>,
2368 side: Option<PositionSide>,
2369 ) -> Vec<&Position> {
2370 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2371 self.get_positions_for_ids(&position_ids, side)
2372 }
2373
2374 #[must_use]
2376 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2377 self.index.positions.contains(position_id)
2378 }
2379
2380 #[must_use]
2382 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2383 self.index.positions_open.contains(position_id)
2384 }
2385
2386 #[must_use]
2388 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2389 self.index.positions_closed.contains(position_id)
2390 }
2391
2392 #[must_use]
2394 pub fn positions_open_count(
2395 &self,
2396 venue: Option<&Venue>,
2397 instrument_id: Option<&InstrumentId>,
2398 strategy_id: Option<&StrategyId>,
2399 side: Option<PositionSide>,
2400 ) -> usize {
2401 self.positions_open(venue, instrument_id, strategy_id, side)
2402 .len()
2403 }
2404
2405 #[must_use]
2407 pub fn positions_closed_count(
2408 &self,
2409 venue: Option<&Venue>,
2410 instrument_id: Option<&InstrumentId>,
2411 strategy_id: Option<&StrategyId>,
2412 side: Option<PositionSide>,
2413 ) -> usize {
2414 self.positions_closed(venue, instrument_id, strategy_id, side)
2415 .len()
2416 }
2417
2418 #[must_use]
2420 pub fn positions_total_count(
2421 &self,
2422 venue: Option<&Venue>,
2423 instrument_id: Option<&InstrumentId>,
2424 strategy_id: Option<&StrategyId>,
2425 side: Option<PositionSide>,
2426 ) -> usize {
2427 self.positions(venue, instrument_id, strategy_id, side)
2428 .len()
2429 }
2430
2431 #[must_use]
2435 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2436 self.index.order_strategy.get(client_order_id)
2437 }
2438
2439 #[must_use]
2441 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2442 self.index.position_strategy.get(position_id)
2443 }
2444
2445 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2449 check_valid_string(key, stringify!(key)).expect(FAILED);
2450
2451 Ok(self.general.get(key))
2452 }
2453
2454 #[must_use]
2458 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2459 match price_type {
2460 PriceType::Bid => self
2461 .quotes
2462 .get(instrument_id)
2463 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2464 PriceType::Ask => self
2465 .quotes
2466 .get(instrument_id)
2467 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2468 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2469 quotes.front().map(|quote| {
2470 Price::new(
2471 (quote.ask_price.as_f64() + quote.bid_price.as_f64()) / 2.0,
2472 quote.bid_price.precision + 1,
2473 )
2474 })
2475 }),
2476 PriceType::Last => self
2477 .trades
2478 .get(instrument_id)
2479 .and_then(|trades| trades.front().map(|trade| trade.price)),
2480 }
2481 }
2482
2483 #[must_use]
2485 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2486 self.quotes
2487 .get(instrument_id)
2488 .map(|quotes| quotes.iter().copied().collect())
2489 }
2490
2491 #[must_use]
2493 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2494 self.trades
2495 .get(instrument_id)
2496 .map(|trades| trades.iter().copied().collect())
2497 }
2498
2499 #[must_use]
2501 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2502 self.bars
2503 .get(bar_type)
2504 .map(|bars| bars.iter().copied().collect())
2505 }
2506
2507 #[must_use]
2509 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2510 self.books.get(instrument_id)
2511 }
2512
2513 #[must_use]
2515 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2516 self.books.get_mut(instrument_id)
2517 }
2518
2519 #[must_use]
2521 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2522 self.quotes
2523 .get(instrument_id)
2524 .and_then(|quotes| quotes.front())
2525 }
2526
2527 #[must_use]
2529 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2530 self.trades
2531 .get(instrument_id)
2532 .and_then(|trades| trades.front())
2533 }
2534
2535 #[must_use]
2537 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2538 self.bars.get(bar_type).and_then(|bars| bars.front())
2539 }
2540
2541 #[must_use]
2543 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2544 self.books.get(instrument_id).map_or(0, |book| book.count) as usize
2545 }
2546
2547 #[must_use]
2549 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2550 self.quotes
2551 .get(instrument_id)
2552 .map_or(0, std::collections::VecDeque::len)
2553 }
2554
2555 #[must_use]
2557 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2558 self.trades
2559 .get(instrument_id)
2560 .map_or(0, std::collections::VecDeque::len)
2561 }
2562
2563 #[must_use]
2565 pub fn bar_count(&self, bar_type: &BarType) -> usize {
2566 self.bars
2567 .get(bar_type)
2568 .map_or(0, std::collections::VecDeque::len)
2569 }
2570
2571 #[must_use]
2573 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2574 self.books.contains_key(instrument_id)
2575 }
2576
2577 #[must_use]
2579 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2580 self.quote_count(instrument_id) > 0
2581 }
2582
2583 #[must_use]
2585 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2586 self.trade_count(instrument_id) > 0
2587 }
2588
2589 #[must_use]
2591 pub fn has_bars(&self, bar_type: &BarType) -> bool {
2592 self.bar_count(bar_type) > 0
2593 }
2594
2595 #[must_use]
2596 pub fn get_xrate(
2597 &self,
2598 venue: Venue,
2599 from_currency: Currency,
2600 to_currency: Currency,
2601 price_type: PriceType,
2602 ) -> Decimal {
2603 if from_currency == to_currency {
2604 return Decimal::ONE;
2605 }
2606
2607 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2608
2609 get_exchange_rate(from_currency, to_currency, price_type, bid_quote, ask_quote)
2610 }
2611
2612 fn build_quote_table(
2613 &self,
2614 venue: &Venue,
2615 ) -> (HashMap<Symbol, Decimal>, HashMap<Symbol, Decimal>) {
2616 let mut bid_quotes = HashMap::new();
2617 let mut ask_quotes = HashMap::new();
2618
2619 for instrument_id in self.instruments.keys() {
2620 if instrument_id.venue != *venue {
2621 continue;
2622 }
2623
2624 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2625 if let Some(tick) = ticks.front() {
2626 (tick.bid_price, tick.ask_price)
2627 } else {
2628 continue; }
2630 } else {
2631 let bid_bar = self
2632 .bars
2633 .iter()
2634 .find(|(k, _)| {
2635 k.instrument_id() == *instrument_id
2636 && matches!(k.spec().price_type, PriceType::Bid)
2637 })
2638 .map(|(_, v)| v);
2639
2640 let ask_bar = self
2641 .bars
2642 .iter()
2643 .find(|(k, _)| {
2644 k.instrument_id() == *instrument_id
2645 && matches!(k.spec().price_type, PriceType::Ask)
2646 })
2647 .map(|(_, v)| v);
2648
2649 match (bid_bar, ask_bar) {
2650 (Some(bid), Some(ask)) => {
2651 let bid_price = bid.front().unwrap().close;
2652 let ask_price = ask.front().unwrap().close;
2653
2654 (bid_price, ask_price)
2655 }
2656 _ => continue,
2657 }
2658 };
2659
2660 bid_quotes.insert(instrument_id.symbol, bid_price.as_decimal());
2661 ask_quotes.insert(instrument_id.symbol, ask_price.as_decimal());
2662 }
2663
2664 (bid_quotes, ask_quotes)
2665 }
2666
2667 #[must_use]
2671 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2672 self.instruments.get(instrument_id)
2673 }
2674
2675 #[must_use]
2677 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2678 self.instruments
2679 .keys()
2680 .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2681 .collect()
2682 }
2683
2684 #[must_use]
2686 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2687 self.instruments
2688 .values()
2689 .filter(|i| &i.id().venue == venue)
2690 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(u)))
2691 .collect()
2692 }
2693
2694 #[must_use]
2696 pub fn bar_types(
2697 &self,
2698 instrument_id: Option<&InstrumentId>,
2699 price_type: Option<&PriceType>,
2700 aggregation_source: AggregationSource,
2701 ) -> Vec<&BarType> {
2702 let mut bar_types = self
2703 .bars
2704 .keys()
2705 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2706 .collect::<Vec<&BarType>>();
2707
2708 if let Some(instrument_id) = instrument_id {
2709 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2710 }
2711
2712 if let Some(price_type) = price_type {
2713 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2714 }
2715
2716 bar_types
2717 }
2718
2719 #[must_use]
2723 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2724 self.synthetics.get(instrument_id)
2725 }
2726
2727 #[must_use]
2729 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2730 self.synthetics.keys().collect()
2731 }
2732
2733 #[must_use]
2735 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2736 self.synthetics.values().collect()
2737 }
2738
2739 #[must_use]
2743 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2744 self.accounts.get(account_id)
2745 }
2746
2747 #[must_use]
2749 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2750 self.index
2751 .venue_account
2752 .get(venue)
2753 .and_then(|account_id| self.accounts.get(account_id))
2754 }
2755
2756 #[must_use]
2758 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2759 self.index.venue_account.get(venue)
2760 }
2761
2762 #[must_use]
2764 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
2765 self.accounts
2766 .values()
2767 .filter(|account| &account.id() == account_id)
2768 .collect()
2769 }
2770}