1pub mod config;
19pub mod database;
20
21mod index;
22
23#[cfg(test)]
24mod tests;
25
26use std::{
27 collections::{HashMap, HashSet, VecDeque},
28 time::{SystemTime, UNIX_EPOCH},
29};
30
31use bytes::Bytes;
32pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
34use index::CacheIndex;
35use nautilus_core::{
36 UUID4, UnixNanos,
37 correctness::{
38 FAILED, check_key_not_in_map, check_predicate_false, check_slice_not_empty,
39 check_valid_string,
40 },
41 datetime::secs_to_nanos,
42};
43use nautilus_model::{
44 accounts::AccountAny,
45 data::{
46 Bar, BarType, QuoteTick, TradeTick,
47 prices::{IndexPriceUpdate, MarkPriceUpdate},
48 },
49 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
50 identifiers::{
51 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
52 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
53 },
54 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
55 orderbook::{OrderBook, own::OwnOrderBook},
56 orders::{Order, OrderAny, OrderList},
57 position::Position,
58 types::{Currency, Money, Price, Quantity},
59};
60use ustr::Ustr;
61
62use crate::xrate::get_exchange_rate;
63
/// In-memory store of market data, instruments, accounts, orders and positions,
/// optionally paired with a database adapter.
pub struct Cache {
    /// Cache settings (read here for `tick_capacity`, `bar_capacity`,
    /// `save_market_data` and `database`).
    config: CacheConfig,
    /// Secondary lookups (by venue, strategy, instrument, state, ...) kept
    /// alongside the primary maps below.
    index: CacheIndex,
    /// Optional adapter used to load state from, and write state to, a database.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General-purpose key/value store of raw bytes.
    general: HashMap<String, Bytes>,
    /// Currencies keyed by currency code.
    currencies: HashMap<Ustr, Currency>,
    /// Instruments keyed by instrument ID.
    instruments: HashMap<InstrumentId, InstrumentAny>,
    /// Synthetic instruments keyed by instrument ID.
    synthetics: HashMap<InstrumentId, SyntheticInstrument>,
    /// Order books keyed by instrument ID.
    books: HashMap<InstrumentId, OrderBook>,
    /// Own order books keyed by instrument ID.
    own_books: HashMap<InstrumentId, OwnOrderBook>,
    /// Quotes per instrument; new entries are pushed to the front.
    quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
    /// Trades per instrument; new entries are pushed to the front.
    trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
    /// Exchange rates keyed by a currency pair.
    mark_xrates: HashMap<(Currency, Currency), f64>,
    /// Mark price updates per instrument; new entries are pushed to the front.
    mark_prices: HashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    /// Index price updates per instrument; new entries are pushed to the front.
    index_prices: HashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    /// Bars per bar type; new entries are pushed to the front.
    bars: HashMap<BarType, VecDeque<Bar>>,
    /// Accounts keyed by account ID.
    accounts: HashMap<AccountId, AccountAny>,
    /// Orders keyed by client order ID.
    orders: HashMap<ClientOrderId, OrderAny>,
    /// Order lists keyed by order list ID.
    order_lists: HashMap<OrderListId, OrderList>,
    /// Positions keyed by position ID.
    positions: HashMap<PositionId, Position>,
    /// Raw position snapshot bytes keyed by position ID.
    position_snapshots: HashMap<PositionId, Bytes>,
}
87
88impl Default for Cache {
89 fn default() -> Self {
91 Self::new(Some(CacheConfig::default()), None)
92 }
93}
94
95impl Cache {
96 #[must_use]
98 pub fn new(
99 config: Option<CacheConfig>,
100 database: Option<Box<dyn CacheDatabaseAdapter>>,
101 ) -> Self {
102 Self {
103 config: config.unwrap_or_default(),
104 index: CacheIndex::default(),
105 database,
106 general: HashMap::new(),
107 currencies: HashMap::new(),
108 instruments: HashMap::new(),
109 synthetics: HashMap::new(),
110 books: HashMap::new(),
111 own_books: HashMap::new(),
112 quotes: HashMap::new(),
113 trades: HashMap::new(),
114 mark_xrates: HashMap::new(),
115 mark_prices: HashMap::new(),
116 index_prices: HashMap::new(),
117 bars: HashMap::new(),
118 accounts: HashMap::new(),
119 orders: HashMap::new(),
120 order_lists: HashMap::new(),
121 positions: HashMap::new(),
122 position_snapshots: HashMap::new(),
123 }
124 }
125
126 #[must_use]
128 pub fn memory_address(&self) -> String {
129 format!("{:?}", std::ptr::from_ref(self))
130 }
131
132 pub fn cache_general(&mut self) -> anyhow::Result<()> {
136 self.general = match &mut self.database {
137 Some(db) => db.load()?,
138 None => HashMap::new(),
139 };
140
141 log::info!(
142 "Cached {} general object(s) from database",
143 self.general.len()
144 );
145 Ok(())
146 }
147
148 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
150 let cache_map = match &self.database {
151 Some(db) => db.load_all().await?,
152 None => CacheMap::default(),
153 };
154
155 self.currencies = cache_map.currencies;
156 self.instruments = cache_map.instruments;
157 self.synthetics = cache_map.synthetics;
158 self.accounts = cache_map.accounts;
159 self.orders = cache_map.orders;
160 self.positions = cache_map.positions;
161 Ok(())
162 }
163
164 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
166 self.currencies = match &mut self.database {
167 Some(db) => db.load_currencies().await?,
168 None => HashMap::new(),
169 };
170
171 log::info!("Cached {} currencies from database", self.general.len());
172 Ok(())
173 }
174
175 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
177 self.instruments = match &mut self.database {
178 Some(db) => db.load_instruments().await?,
179 None => HashMap::new(),
180 };
181
182 log::info!("Cached {} instruments from database", self.general.len());
183 Ok(())
184 }
185
186 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
189 self.synthetics = match &mut self.database {
190 Some(db) => db.load_synthetics().await?,
191 None => HashMap::new(),
192 };
193
194 log::info!(
195 "Cached {} synthetic instruments from database",
196 self.general.len()
197 );
198 Ok(())
199 }
200
201 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
203 self.accounts = match &mut self.database {
204 Some(db) => db.load_accounts().await?,
205 None => HashMap::new(),
206 };
207
208 log::info!(
209 "Cached {} synthetic instruments from database",
210 self.general.len()
211 );
212 Ok(())
213 }
214
215 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
217 self.orders = match &mut self.database {
218 Some(db) => db.load_orders().await?,
219 None => HashMap::new(),
220 };
221
222 log::info!("Cached {} orders from database", self.general.len());
223 Ok(())
224 }
225
226 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
228 self.positions = match &mut self.database {
229 Some(db) => db.load_positions().await?,
230 None => HashMap::new(),
231 };
232
233 log::info!("Cached {} positions from database", self.general.len());
234 Ok(())
235 }
236
237 pub fn build_index(&mut self) {
239 log::debug!("Building index");
240
241 for account_id in self.accounts.keys() {
243 self.index
244 .venue_account
245 .insert(account_id.get_issuer(), *account_id);
246 }
247
248 for (client_order_id, order) in &self.orders {
250 let instrument_id = order.instrument_id();
251 let venue = instrument_id.venue;
252 let strategy_id = order.strategy_id();
253
254 self.index
256 .venue_orders
257 .entry(venue)
258 .or_default()
259 .insert(*client_order_id);
260
261 if let Some(venue_order_id) = order.venue_order_id() {
263 self.index
264 .venue_order_ids
265 .insert(venue_order_id, *client_order_id);
266 }
267
268 if let Some(position_id) = order.position_id() {
270 self.index
271 .order_position
272 .insert(*client_order_id, position_id);
273 }
274
275 self.index
277 .order_strategy
278 .insert(*client_order_id, order.strategy_id());
279
280 self.index
282 .instrument_orders
283 .entry(instrument_id)
284 .or_default()
285 .insert(*client_order_id);
286
287 self.index
289 .strategy_orders
290 .entry(strategy_id)
291 .or_default()
292 .insert(*client_order_id);
293
294 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
296 self.index
297 .exec_algorithm_orders
298 .entry(exec_algorithm_id)
299 .or_default()
300 .insert(*client_order_id);
301 }
302
303 if let Some(exec_spawn_id) = order.exec_spawn_id() {
305 self.index
306 .exec_spawn_orders
307 .entry(exec_spawn_id)
308 .or_default()
309 .insert(*client_order_id);
310 }
311
312 self.index.orders.insert(*client_order_id);
314
315 if order.is_open() {
317 self.index.orders_open.insert(*client_order_id);
318 }
319
320 if order.is_closed() {
322 self.index.orders_closed.insert(*client_order_id);
323 }
324
325 if let Some(emulation_trigger) = order.emulation_trigger() {
327 if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
328 self.index.orders_emulated.insert(*client_order_id);
329 }
330 }
331
332 if order.is_inflight() {
334 self.index.orders_inflight.insert(*client_order_id);
335 }
336
337 self.index.strategies.insert(strategy_id);
339
340 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
342 self.index.exec_algorithms.insert(exec_algorithm_id);
343 }
344 }
345
346 for (position_id, position) in &self.positions {
348 let instrument_id = position.instrument_id;
349 let venue = instrument_id.venue;
350 let strategy_id = position.strategy_id;
351
352 self.index
354 .venue_positions
355 .entry(venue)
356 .or_default()
357 .insert(*position_id);
358
359 self.index
361 .position_strategy
362 .insert(*position_id, position.strategy_id);
363
364 self.index
366 .position_orders
367 .entry(*position_id)
368 .or_default()
369 .extend(position.client_order_ids().into_iter());
370
371 self.index
373 .instrument_positions
374 .entry(instrument_id)
375 .or_default()
376 .insert(*position_id);
377
378 self.index
380 .strategy_positions
381 .entry(strategy_id)
382 .or_default()
383 .insert(*position_id);
384
385 self.index.positions.insert(*position_id);
387
388 if position.is_open() {
390 self.index.positions_open.insert(*position_id);
391 }
392
393 if position.is_closed() {
395 self.index.positions_closed.insert(*position_id);
396 }
397
398 self.index.strategies.insert(strategy_id);
400 }
401 }
402
    /// Returns whether the cache configuration specifies a database.
    ///
    /// This inspects `config.database` only; it does not look at the `database`
    /// adapter field held by the cache.
    #[must_use]
    pub const fn has_backing(&self) -> bool {
        self.config.database.is_some()
    }
408
409 #[must_use]
411 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
412 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
413 quote
414 } else {
415 log::warn!(
416 "Cannot calculate unrealized PnL for {}, no quotes for {}",
417 position.id,
418 position.instrument_id
419 );
420 return None;
421 };
422
423 let last = match position.side {
424 PositionSide::Flat | PositionSide::NoPositionSide => {
425 return Some(Money::new(0.0, position.settlement_currency));
426 }
427 PositionSide::Long => quote.ask_price,
428 PositionSide::Short => quote.bid_price,
429 };
430
431 Some(position.unrealized_pnl(last))
432 }
433
434 #[must_use]
439 pub fn check_integrity(&mut self) -> bool {
440 let mut error_count = 0;
441 let failure = "Integrity failure";
442
443 let timestamp_us = SystemTime::now()
445 .duration_since(UNIX_EPOCH)
446 .expect("Time went backwards")
447 .as_micros();
448
449 log::info!("Checking data integrity");
450
451 for account_id in self.accounts.keys() {
453 if !self
454 .index
455 .venue_account
456 .contains_key(&account_id.get_issuer())
457 {
458 log::error!(
459 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
460 );
461 error_count += 1;
462 }
463 }
464
465 for (client_order_id, order) in &self.orders {
466 if !self.index.order_strategy.contains_key(client_order_id) {
467 log::error!(
468 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
469 );
470 error_count += 1;
471 }
472 if !self.index.orders.contains(client_order_id) {
473 log::error!(
474 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
475 );
476 error_count += 1;
477 }
478 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
479 log::error!(
480 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
481 );
482 error_count += 1;
483 }
484 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
485 log::error!(
486 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
487 );
488 error_count += 1;
489 }
490 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
491 log::error!(
492 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
493 );
494 error_count += 1;
495 }
496 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
497 if !self
498 .index
499 .exec_algorithm_orders
500 .contains_key(&exec_algorithm_id)
501 {
502 log::error!(
503 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
504 );
505 error_count += 1;
506 }
507 if order.exec_spawn_id().is_none()
508 && !self.index.exec_spawn_orders.contains_key(client_order_id)
509 {
510 log::error!(
511 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
512 );
513 error_count += 1;
514 }
515 }
516 }
517
518 for (position_id, position) in &self.positions {
519 if !self.index.position_strategy.contains_key(position_id) {
520 log::error!(
521 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
522 );
523 error_count += 1;
524 }
525 if !self.index.position_orders.contains_key(position_id) {
526 log::error!(
527 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
528 );
529 error_count += 1;
530 }
531 if !self.index.positions.contains(position_id) {
532 log::error!(
533 "{failure} in positions: {position_id} not found in `self.index.positions`",
534 );
535 error_count += 1;
536 }
537 if position.is_open() && !self.index.positions_open.contains(position_id) {
538 log::error!(
539 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
540 );
541 error_count += 1;
542 }
543 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
544 log::error!(
545 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
546 );
547 error_count += 1;
548 }
549 }
550
551 for account_id in self.index.venue_account.values() {
553 if !self.accounts.contains_key(account_id) {
554 log::error!(
555 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
556 );
557 error_count += 1;
558 }
559 }
560
561 for client_order_id in self.index.venue_order_ids.values() {
562 if !self.orders.contains_key(client_order_id) {
563 log::error!(
564 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
565 );
566 error_count += 1;
567 }
568 }
569
570 for client_order_id in self.index.client_order_ids.keys() {
571 if !self.orders.contains_key(client_order_id) {
572 log::error!(
573 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
574 );
575 error_count += 1;
576 }
577 }
578
579 for client_order_id in self.index.order_position.keys() {
580 if !self.orders.contains_key(client_order_id) {
581 log::error!(
582 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
583 );
584 error_count += 1;
585 }
586 }
587
588 for client_order_id in self.index.order_strategy.keys() {
590 if !self.orders.contains_key(client_order_id) {
591 log::error!(
592 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
593 );
594 error_count += 1;
595 }
596 }
597
598 for position_id in self.index.position_strategy.keys() {
599 if !self.positions.contains_key(position_id) {
600 log::error!(
601 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
602 );
603 error_count += 1;
604 }
605 }
606
607 for position_id in self.index.position_orders.keys() {
608 if !self.positions.contains_key(position_id) {
609 log::error!(
610 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
611 );
612 error_count += 1;
613 }
614 }
615
616 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
617 for client_order_id in client_order_ids {
618 if !self.orders.contains_key(client_order_id) {
619 log::error!(
620 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
621 );
622 error_count += 1;
623 }
624 }
625 }
626
627 for instrument_id in self.index.instrument_positions.keys() {
628 if !self.index.instrument_orders.contains_key(instrument_id) {
629 log::error!(
630 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
631 );
632 error_count += 1;
633 }
634 }
635
636 for client_order_ids in self.index.strategy_orders.values() {
637 for client_order_id in client_order_ids {
638 if !self.orders.contains_key(client_order_id) {
639 log::error!(
640 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
641 );
642 error_count += 1;
643 }
644 }
645 }
646
647 for position_ids in self.index.strategy_positions.values() {
648 for position_id in position_ids {
649 if !self.positions.contains_key(position_id) {
650 log::error!(
651 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
652 );
653 error_count += 1;
654 }
655 }
656 }
657
658 for client_order_id in &self.index.orders {
659 if !self.orders.contains_key(client_order_id) {
660 log::error!(
661 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
662 );
663 error_count += 1;
664 }
665 }
666
667 for client_order_id in &self.index.orders_emulated {
668 if !self.orders.contains_key(client_order_id) {
669 log::error!(
670 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
671 );
672 error_count += 1;
673 }
674 }
675
676 for client_order_id in &self.index.orders_inflight {
677 if !self.orders.contains_key(client_order_id) {
678 log::error!(
679 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
680 );
681 error_count += 1;
682 }
683 }
684
685 for client_order_id in &self.index.orders_open {
686 if !self.orders.contains_key(client_order_id) {
687 log::error!(
688 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
689 );
690 error_count += 1;
691 }
692 }
693
694 for client_order_id in &self.index.orders_closed {
695 if !self.orders.contains_key(client_order_id) {
696 log::error!(
697 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
698 );
699 error_count += 1;
700 }
701 }
702
703 for position_id in &self.index.positions {
704 if !self.positions.contains_key(position_id) {
705 log::error!(
706 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
707 );
708 error_count += 1;
709 }
710 }
711
712 for position_id in &self.index.positions_open {
713 if !self.positions.contains_key(position_id) {
714 log::error!(
715 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
716 );
717 error_count += 1;
718 }
719 }
720
721 for position_id in &self.index.positions_closed {
722 if !self.positions.contains_key(position_id) {
723 log::error!(
724 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
725 );
726 error_count += 1;
727 }
728 }
729
730 for strategy_id in &self.index.strategies {
731 if !self.index.strategy_orders.contains_key(strategy_id) {
732 log::error!(
733 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
734 );
735 error_count += 1;
736 }
737 }
738
739 for exec_algorithm_id in &self.index.exec_algorithms {
740 if !self
741 .index
742 .exec_algorithm_orders
743 .contains_key(exec_algorithm_id)
744 {
745 log::error!(
746 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
747 );
748 error_count += 1;
749 }
750 }
751
752 let total_us = SystemTime::now()
753 .duration_since(UNIX_EPOCH)
754 .expect("Time went backwards")
755 .as_micros()
756 - timestamp_us;
757
758 if error_count == 0 {
759 log::info!("Integrity check passed in {total_us}μs");
760 true
761 } else {
762 log::error!(
763 "Integrity check failed with {error_count} error{} in {total_us}μs",
764 if error_count == 1 { "" } else { "s" },
765 );
766 false
767 }
768 }
769
770 #[must_use]
774 pub fn check_residuals(&self) -> bool {
775 log::debug!("Checking residuals");
776
777 let mut residuals = false;
778
779 for order in self.orders_open(None, None, None, None) {
781 residuals = true;
782 log::warn!("Residual {order:?}");
783 }
784
785 for position in self.positions_open(None, None, None, None) {
787 residuals = true;
788 log::warn!("Residual {position}");
789 }
790
791 residuals
792 }
793
794 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
800 log::debug!(
801 "Purging closed orders{}",
802 if buffer_secs > 0 {
803 format!(" with buffer_secs={buffer_secs}")
804 } else {
805 String::new()
806 }
807 );
808
809 let buffer_ns = secs_to_nanos(buffer_secs as f64);
810
811 for client_order_id in self.index.orders_closed.clone() {
812 if let Some(order) = self.orders.get(&client_order_id) {
813 if let Some(ts_closed) = order.ts_closed() {
814 if ts_closed + buffer_ns <= ts_now {
815 self.purge_order(client_order_id);
816 }
817 }
818 }
819 }
820 }
821
822 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
824 log::debug!(
825 "Purging closed positions{}",
826 if buffer_secs > 0 {
827 format!(" with buffer_secs={buffer_secs}")
828 } else {
829 String::new()
830 }
831 );
832
833 let buffer_ns = secs_to_nanos(buffer_secs as f64);
834
835 for position_id in self.index.positions_closed.clone() {
836 if let Some(position) = self.positions.get(&position_id) {
837 if let Some(ts_closed) = position.ts_closed {
838 if ts_closed + buffer_ns <= ts_now {
839 self.purge_position(position_id);
840 }
841 }
842 }
843 }
844 }
845
846 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
850 if let Some(order) = self.orders.remove(&client_order_id) {
851 if let Some(venue_orders) = self
853 .index
854 .venue_orders
855 .get_mut(&order.instrument_id().venue)
856 {
857 venue_orders.remove(&client_order_id);
858 }
859
860 if let Some(venue_order_id) = order.venue_order_id() {
862 self.index.venue_order_ids.remove(&venue_order_id);
863 }
864
865 if let Some(instrument_orders) =
867 self.index.instrument_orders.get_mut(&order.instrument_id())
868 {
869 instrument_orders.remove(&client_order_id);
870 }
871
872 if let Some(position_id) = order.position_id() {
874 if let Some(position_orders) = self.index.position_orders.get_mut(&position_id) {
875 position_orders.remove(&client_order_id);
876 }
877 }
878
879 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
881 if let Some(exec_algorithm_orders) =
882 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
883 {
884 exec_algorithm_orders.remove(&client_order_id);
885 }
886 }
887
888 log::info!("Purged order {client_order_id}");
889 } else {
890 log::warn!("Order {client_order_id} not found when purging");
891 }
892
893 self.index.order_position.remove(&client_order_id);
895 self.index.order_strategy.remove(&client_order_id);
896 self.index.order_client.remove(&client_order_id);
897 self.index.client_order_ids.remove(&client_order_id);
898 self.index.exec_spawn_orders.remove(&client_order_id);
899 self.index.orders.remove(&client_order_id);
900 self.index.orders_closed.remove(&client_order_id);
901 self.index.orders_emulated.remove(&client_order_id);
902 self.index.orders_inflight.remove(&client_order_id);
903 self.index.orders_pending_cancel.remove(&client_order_id);
904
905 if let Some(position_id) = self.index.order_position.get(&client_order_id) {
907 if let Some(position) = self.positions.get_mut(position_id) {
908 position.purge_events_for_order(client_order_id);
909 }
910 }
911 }
912
913 pub fn purge_position(&mut self, position_id: PositionId) {
915 if let Some(position) = self.positions.remove(&position_id) {
916 if let Some(venue_positions) = self
918 .index
919 .venue_positions
920 .get_mut(&position.instrument_id.venue)
921 {
922 venue_positions.remove(&position_id);
923 }
924
925 if let Some(instrument_positions) = self
927 .index
928 .instrument_positions
929 .get_mut(&position.instrument_id)
930 {
931 instrument_positions.remove(&position_id);
932 }
933
934 if let Some(strategy_positions) =
936 self.index.strategy_positions.get_mut(&position.strategy_id)
937 {
938 strategy_positions.remove(&position_id);
939 }
940
941 for client_order_id in position.client_order_ids() {
943 self.index.order_position.remove(&client_order_id);
944 }
945
946 log::info!("Purged position {position_id}");
947 } else {
948 log::warn!("Position {position_id} not found when purging");
949 }
950
951 self.index.position_strategy.remove(&position_id);
953 self.index.position_orders.remove(&position_id);
954 self.index.positions.remove(&position_id);
955 self.index.positions_open.remove(&position_id);
956 self.index.positions_closed.remove(&position_id);
957 }
958
959 pub fn purge_account_events(&mut self, _ts_now: UnixNanos, lookback_secs: u64) {
964 log::debug!(
965 "Purging account events{}",
966 if lookback_secs > 0 {
967 format!(" with lookback_secs={lookback_secs}")
968 } else {
969 String::new()
970 }
971 );
972
973 }
987
    /// Clears the cache index and logs that it was cleared.
    ///
    /// The cached objects themselves are left untouched.
    pub fn clear_index(&mut self) {
        self.index.clear();
        log::debug!("Cleared index");
    }
993
994 pub fn reset(&mut self) {
998 log::debug!("Resetting cache");
999
1000 self.general.clear();
1001 self.currencies.clear();
1002 self.instruments.clear();
1003 self.synthetics.clear();
1004 self.books.clear();
1005 self.own_books.clear();
1006 self.quotes.clear();
1007 self.trades.clear();
1008 self.mark_xrates.clear();
1009 self.mark_prices.clear();
1010 self.index_prices.clear();
1011 self.bars.clear();
1012 self.accounts.clear();
1013 self.orders.clear();
1014 self.order_lists.clear();
1015 self.positions.clear();
1016 self.position_snapshots.clear();
1017
1018 self.clear_index();
1019
1020 log::info!("Reset cache");
1021 }
1022
1023 pub fn dispose(&mut self) {
1025 if let Some(database) = &mut self.database {
1026 database.close().expect("Failed to close database");
1027 }
1028 }
1029
1030 pub fn flush_db(&mut self) {
1032 if let Some(database) = &mut self.database {
1033 database.flush().expect("Failed to flush database");
1034 }
1035 }
1036
1037 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1042 check_valid_string(key, stringify!(key)).expect(FAILED);
1043 check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
1044
1045 log::debug!("Adding general {key}");
1046 self.general.insert(key.to_string(), value.clone());
1047
1048 if let Some(database) = &mut self.database {
1049 database.add(key.to_string(), value)?;
1050 }
1051 Ok(())
1052 }
1053
1054 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1056 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1057
1058 if self.config.save_market_data {
1059 if let Some(database) = &mut self.database {
1060 database.add_order_book(&book)?;
1061 }
1062 }
1063
1064 self.books.insert(book.instrument_id, book);
1065 Ok(())
1066 }
1067
1068 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1070 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1071
1072 self.own_books.insert(own_book.instrument_id, own_book);
1073 Ok(())
1074 }
1075
1076 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1078 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1079
1080 if self.config.save_market_data {
1081 }
1083
1084 let mark_prices_deque = self
1085 .mark_prices
1086 .entry(mark_price.instrument_id)
1087 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1088 mark_prices_deque.push_front(mark_price);
1089 Ok(())
1090 }
1091
1092 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1094 log::debug!(
1095 "Adding `IndexPriceUpdate` for {}",
1096 index_price.instrument_id
1097 );
1098
1099 if self.config.save_market_data {
1100 }
1102
1103 let index_prices_deque = self
1104 .index_prices
1105 .entry(index_price.instrument_id)
1106 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1107 index_prices_deque.push_front(index_price);
1108 Ok(())
1109 }
1110
1111 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1113 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1114
1115 if self.config.save_market_data {
1116 if let Some(database) = &mut self.database {
1117 database.add_quote("e)?;
1118 }
1119 }
1120
1121 let quotes_deque = self
1122 .quotes
1123 .entry(quote.instrument_id)
1124 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1125 quotes_deque.push_front(quote);
1126 Ok(())
1127 }
1128
1129 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1131 check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
1132
1133 let instrument_id = quotes[0].instrument_id;
1134 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1135
1136 if self.config.save_market_data {
1137 if let Some(database) = &mut self.database {
1138 for quote in quotes {
1139 database.add_quote(quote).unwrap();
1140 }
1141 }
1142 }
1143
1144 let quotes_deque = self
1145 .quotes
1146 .entry(instrument_id)
1147 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1148
1149 for quote in quotes {
1150 quotes_deque.push_front(*quote);
1151 }
1152 Ok(())
1153 }
1154
1155 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1157 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1158
1159 if self.config.save_market_data {
1160 if let Some(database) = &mut self.database {
1161 database.add_trade(&trade)?;
1162 }
1163 }
1164
1165 let trades_deque = self
1166 .trades
1167 .entry(trade.instrument_id)
1168 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1169 trades_deque.push_front(trade);
1170 Ok(())
1171 }
1172
1173 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1175 check_slice_not_empty(trades, stringify!(trades)).unwrap();
1176
1177 let instrument_id = trades[0].instrument_id;
1178 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1179
1180 if self.config.save_market_data {
1181 if let Some(database) = &mut self.database {
1182 for trade in trades {
1183 database.add_trade(trade).unwrap();
1184 }
1185 }
1186 }
1187
1188 let trades_deque = self
1189 .trades
1190 .entry(instrument_id)
1191 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1192
1193 for trade in trades {
1194 trades_deque.push_front(*trade);
1195 }
1196 Ok(())
1197 }
1198
1199 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1201 log::debug!("Adding `Bar` {}", bar.bar_type);
1202
1203 if self.config.save_market_data {
1204 if let Some(database) = &mut self.database {
1205 database.add_bar(&bar)?;
1206 }
1207 }
1208
1209 let bars = self
1210 .bars
1211 .entry(bar.bar_type)
1212 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1213 bars.push_front(bar);
1214 Ok(())
1215 }
1216
1217 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1219 check_slice_not_empty(bars, stringify!(bars)).unwrap();
1220
1221 let bar_type = bars[0].bar_type;
1222 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1223
1224 if self.config.save_market_data {
1225 if let Some(database) = &mut self.database {
1226 for bar in bars {
1227 database.add_bar(bar).unwrap();
1228 }
1229 }
1230 }
1231
1232 let bars_deque = self
1233 .bars
1234 .entry(bar_type)
1235 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1236
1237 for bar in bars {
1238 bars_deque.push_front(*bar);
1239 }
1240 Ok(())
1241 }
1242
1243 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1245 log::debug!("Adding `Currency` {}", currency.code);
1246
1247 if let Some(database) = &mut self.database {
1248 database.add_currency(¤cy)?;
1249 }
1250
1251 self.currencies.insert(currency.code, currency);
1252 Ok(())
1253 }
1254
1255 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1257 log::debug!("Adding `Instrument` {}", instrument.id());
1258
1259 if let Some(database) = &mut self.database {
1260 database.add_instrument(&instrument)?;
1261 }
1262
1263 self.instruments.insert(instrument.id(), instrument);
1264 Ok(())
1265 }
1266
1267 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1269 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1270
1271 if let Some(database) = &mut self.database {
1272 database.add_synthetic(&synthetic)?;
1273 }
1274
1275 self.synthetics.insert(synthetic.id, synthetic);
1276 Ok(())
1277 }
1278
1279 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1281 log::debug!("Adding `Account` {}", account.id());
1282
1283 if let Some(database) = &mut self.database {
1284 database.add_account(&account)?;
1285 }
1286
1287 let account_id = account.id();
1288 self.accounts.insert(account_id, account);
1289 self.index
1290 .venue_account
1291 .insert(account_id.get_issuer(), account_id);
1292 Ok(())
1293 }
1294
1295 pub fn add_venue_order_id(
1299 &mut self,
1300 client_order_id: &ClientOrderId,
1301 venue_order_id: &VenueOrderId,
1302 overwrite: bool,
1303 ) -> anyhow::Result<()> {
1304 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1305 if !overwrite && existing_venue_order_id != venue_order_id {
1306 anyhow::bail!(
1307 "Existing {existing_venue_order_id} for {client_order_id}
1308 did not match the given {venue_order_id}.
1309 If you are writing a test then try a different `venue_order_id`,
1310 otherwise this is probably a bug."
1311 );
1312 }
1313 }
1314
1315 self.index
1316 .client_order_ids
1317 .insert(*client_order_id, *venue_order_id);
1318 self.index
1319 .venue_order_ids
1320 .insert(*venue_order_id, *client_order_id);
1321
1322 Ok(())
1323 }
1324
1325 pub fn add_order(
1338 &mut self,
1339 order: OrderAny,
1340 position_id: Option<PositionId>,
1341 client_id: Option<ClientId>,
1342 replace_existing: bool,
1343 ) -> anyhow::Result<()> {
1344 let instrument_id = order.instrument_id();
1345 let venue = instrument_id.venue;
1346 let client_order_id = order.client_order_id();
1347 let strategy_id = order.strategy_id();
1348 let exec_algorithm_id = order.exec_algorithm_id();
1349 let exec_spawn_id = order.exec_spawn_id();
1350
1351 if !replace_existing {
1352 check_key_not_in_map(
1353 &client_order_id,
1354 &self.orders,
1355 stringify!(client_order_id),
1356 stringify!(orders),
1357 )
1358 .expect(FAILED);
1359 check_key_not_in_map(
1360 &client_order_id,
1361 &self.orders,
1362 stringify!(client_order_id),
1363 stringify!(orders),
1364 )
1365 .expect(FAILED);
1366 check_key_not_in_map(
1367 &client_order_id,
1368 &self.orders,
1369 stringify!(client_order_id),
1370 stringify!(orders),
1371 )
1372 .expect(FAILED);
1373 check_key_not_in_map(
1374 &client_order_id,
1375 &self.orders,
1376 stringify!(client_order_id),
1377 stringify!(orders),
1378 )
1379 .expect(FAILED);
1380 }
1381
1382 log::debug!("Adding {order:?}");
1383
1384 self.index.orders.insert(client_order_id);
1385 self.index
1386 .order_strategy
1387 .insert(client_order_id, strategy_id);
1388 self.index.strategies.insert(strategy_id);
1389
1390 self.index
1392 .venue_orders
1393 .entry(venue)
1394 .or_default()
1395 .insert(client_order_id);
1396
1397 self.index
1399 .instrument_orders
1400 .entry(instrument_id)
1401 .or_default()
1402 .insert(client_order_id);
1403
1404 self.index
1406 .strategy_orders
1407 .entry(strategy_id)
1408 .or_default()
1409 .insert(client_order_id);
1410
1411 if let Some(exec_algorithm_id) = exec_algorithm_id {
1413 self.index.exec_algorithms.insert(exec_algorithm_id);
1414
1415 self.index
1416 .exec_algorithm_orders
1417 .entry(exec_algorithm_id)
1418 .or_default()
1419 .insert(client_order_id);
1420
1421 self.index
1422 .exec_spawn_orders
1423 .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1424 .or_default()
1425 .insert(client_order_id);
1426 }
1427
1428 match order.emulation_trigger() {
1430 Some(_) => {
1431 self.index.orders_emulated.remove(&client_order_id);
1432 }
1433 None => {
1434 self.index.orders_emulated.insert(client_order_id);
1435 }
1436 }
1437
1438 if let Some(position_id) = position_id {
1440 self.add_position_id(
1441 &position_id,
1442 &order.instrument_id().venue,
1443 &client_order_id,
1444 &strategy_id,
1445 )?;
1446 }
1447
1448 if let Some(client_id) = client_id {
1450 self.index.order_client.insert(client_order_id, client_id);
1451 log::debug!("Indexed {client_id:?}");
1452 }
1453
1454 if let Some(database) = &mut self.database {
1455 database.add_order(&order, client_id)?;
1456 }
1461
1462 self.orders.insert(client_order_id, order);
1463
1464 Ok(())
1465 }
1466
1467 pub fn add_position_id(
1469 &mut self,
1470 position_id: &PositionId,
1471 venue: &Venue,
1472 client_order_id: &ClientOrderId,
1473 strategy_id: &StrategyId,
1474 ) -> anyhow::Result<()> {
1475 self.index
1476 .order_position
1477 .insert(*client_order_id, *position_id);
1478
1479 if let Some(database) = &mut self.database {
1481 database.index_order_position(*client_order_id, *position_id)?;
1482 }
1483
1484 self.index
1486 .position_strategy
1487 .insert(*position_id, *strategy_id);
1488
1489 self.index
1491 .position_orders
1492 .entry(*position_id)
1493 .or_default()
1494 .insert(*client_order_id);
1495
1496 self.index
1498 .strategy_positions
1499 .entry(*strategy_id)
1500 .or_default()
1501 .insert(*position_id);
1502
1503 self.index
1505 .venue_positions
1506 .entry(*venue)
1507 .or_default()
1508 .insert(*position_id);
1509
1510 Ok(())
1511 }
1512
1513 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1515 self.positions.insert(position.id, position.clone());
1516 self.index.positions.insert(position.id);
1517 self.index.positions_open.insert(position.id);
1518
1519 log::debug!("Adding {position}");
1520
1521 self.add_position_id(
1522 &position.id,
1523 &position.instrument_id.venue,
1524 &position.opening_order_id,
1525 &position.strategy_id,
1526 )?;
1527
1528 let venue = position.instrument_id.venue;
1529 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1530 venue_positions.insert(position.id);
1531
1532 let instrument_id = position.instrument_id;
1534 let instrument_positions = self
1535 .index
1536 .instrument_positions
1537 .entry(instrument_id)
1538 .or_default();
1539 instrument_positions.insert(position.id);
1540
1541 if let Some(database) = &mut self.database {
1542 database.add_position(&position)?;
1543 }
1552
1553 Ok(())
1554 }
1555
1556 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1558 if let Some(database) = &mut self.database {
1559 database.update_account(&account)?;
1560 }
1561 Ok(())
1562 }
1563
1564 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1566 let client_order_id = order.client_order_id();
1567
1568 if let Some(venue_order_id) = order.venue_order_id() {
1570 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1573 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1575 }
1576 }
1577
1578 if order.is_inflight() {
1580 self.index.orders_inflight.insert(client_order_id);
1581 } else {
1582 self.index.orders_inflight.remove(&client_order_id);
1583 }
1584
1585 if order.is_open() {
1587 self.index.orders_closed.remove(&client_order_id);
1588 self.index.orders_open.insert(client_order_id);
1589 } else if order.is_closed() {
1590 self.index.orders_open.remove(&client_order_id);
1591 self.index.orders_pending_cancel.remove(&client_order_id);
1592 self.index.orders_closed.insert(client_order_id);
1593 }
1594
1595 if let Some(emulation_trigger) = order.emulation_trigger() {
1597 match emulation_trigger {
1598 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1599 _ => self.index.orders_emulated.insert(client_order_id),
1600 };
1601 }
1602
1603 if let Some(database) = &mut self.database {
1604 database.update_order(order.last_event())?;
1605 }
1610
1611 self.orders.insert(client_order_id, order.clone());
1613
1614 Ok(())
1615 }
1616
1617 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1619 self.index
1620 .orders_pending_cancel
1621 .insert(order.client_order_id());
1622 }
1623
1624 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1626 if position.is_open() {
1628 self.index.positions_open.insert(position.id);
1629 self.index.positions_closed.remove(&position.id);
1630 } else {
1631 self.index.positions_closed.insert(position.id);
1632 self.index.positions_open.remove(&position.id);
1633 }
1634
1635 if let Some(database) = &mut self.database {
1636 database.update_position(position)?;
1637 }
1642 Ok(())
1643 }
1644
1645 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1648 let position_id = position.id;
1649
1650 let mut copied_position = position.clone();
1651 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1652 copied_position.id = PositionId::new(new_id);
1653
1654 let position_serialized = serde_json::to_vec(&copied_position)?;
1656
1657 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1658 let new_snapshots = match snapshots {
1659 Some(existing_snapshots) => {
1660 let mut combined = existing_snapshots.to_vec();
1661 combined.extend(position_serialized);
1662 Bytes::from(combined)
1663 }
1664 None => Bytes::from(position_serialized),
1665 };
1666 self.position_snapshots.insert(position_id, new_snapshots);
1667
1668 log::debug!("Snapshot {}", copied_position);
1669 Ok(())
1670 }
1671
1672 pub fn snapshot_position_state(
1673 &mut self,
1674 position: &Position,
1675 open_only: Option<bool>,
1678 ) -> anyhow::Result<()> {
1679 let open_only = open_only.unwrap_or(true);
1680
1681 if open_only && !position.is_open() {
1682 return Ok(());
1683 }
1684
1685 if let Some(database) = &mut self.database {
1686 database.snapshot_position_state(position).map_err(|e| {
1687 log::error!(
1688 "Failed to snapshot position state for {}: {e:?}",
1689 position.id
1690 );
1691 e
1692 })?;
1693 } else {
1694 log::warn!(
1695 "Cannot snapshot position state for {} (no database configured)",
1696 position.id
1697 );
1698 }
1699
1700 todo!()
1702 }
1703
1704 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1705 let database = if let Some(database) = &self.database {
1706 database
1707 } else {
1708 log::warn!(
1709 "Cannot snapshot order state for {} (no database configured)",
1710 order.client_order_id()
1711 );
1712 return Ok(());
1713 };
1714
1715 database.snapshot_order_state(order)
1716 }
1717
1718 fn build_order_query_filter_set(
1721 &self,
1722 venue: Option<&Venue>,
1723 instrument_id: Option<&InstrumentId>,
1724 strategy_id: Option<&StrategyId>,
1725 ) -> Option<HashSet<ClientOrderId>> {
1726 let mut query: Option<HashSet<ClientOrderId>> = None;
1727
1728 if let Some(venue) = venue {
1729 query = Some(
1730 self.index
1731 .venue_orders
1732 .get(venue)
1733 .cloned()
1734 .unwrap_or_default(),
1735 );
1736 }
1737
1738 if let Some(instrument_id) = instrument_id {
1739 let instrument_orders = self
1740 .index
1741 .instrument_orders
1742 .get(instrument_id)
1743 .cloned()
1744 .unwrap_or_default();
1745
1746 if let Some(existing_query) = &mut query {
1747 *existing_query = existing_query
1748 .intersection(&instrument_orders)
1749 .copied()
1750 .collect();
1751 } else {
1752 query = Some(instrument_orders);
1753 }
1754 }
1755
1756 if let Some(strategy_id) = strategy_id {
1757 let strategy_orders = self
1758 .index
1759 .strategy_orders
1760 .get(strategy_id)
1761 .cloned()
1762 .unwrap_or_default();
1763
1764 if let Some(existing_query) = &mut query {
1765 *existing_query = existing_query
1766 .intersection(&strategy_orders)
1767 .copied()
1768 .collect();
1769 } else {
1770 query = Some(strategy_orders);
1771 }
1772 }
1773
1774 query
1775 }
1776
1777 fn build_position_query_filter_set(
1778 &self,
1779 venue: Option<&Venue>,
1780 instrument_id: Option<&InstrumentId>,
1781 strategy_id: Option<&StrategyId>,
1782 ) -> Option<HashSet<PositionId>> {
1783 let mut query: Option<HashSet<PositionId>> = None;
1784
1785 if let Some(venue) = venue {
1786 query = Some(
1787 self.index
1788 .venue_positions
1789 .get(venue)
1790 .cloned()
1791 .unwrap_or_default(),
1792 );
1793 }
1794
1795 if let Some(instrument_id) = instrument_id {
1796 let instrument_positions = self
1797 .index
1798 .instrument_positions
1799 .get(instrument_id)
1800 .cloned()
1801 .unwrap_or_default();
1802
1803 if let Some(existing_query) = query {
1804 query = Some(
1805 existing_query
1806 .intersection(&instrument_positions)
1807 .copied()
1808 .collect(),
1809 );
1810 } else {
1811 query = Some(instrument_positions);
1812 }
1813 }
1814
1815 if let Some(strategy_id) = strategy_id {
1816 let strategy_positions = self
1817 .index
1818 .strategy_positions
1819 .get(strategy_id)
1820 .cloned()
1821 .unwrap_or_default();
1822
1823 if let Some(existing_query) = query {
1824 query = Some(
1825 existing_query
1826 .intersection(&strategy_positions)
1827 .copied()
1828 .collect(),
1829 );
1830 } else {
1831 query = Some(strategy_positions);
1832 }
1833 }
1834
1835 query
1836 }
1837
1838 fn get_orders_for_ids(
1839 &self,
1840 client_order_ids: &HashSet<ClientOrderId>,
1841 side: Option<OrderSide>,
1842 ) -> Vec<&OrderAny> {
1843 let side = side.unwrap_or(OrderSide::NoOrderSide);
1844 let mut orders = Vec::new();
1845
1846 for client_order_id in client_order_ids {
1847 let order = self
1848 .orders
1849 .get(client_order_id)
1850 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1851 if side == OrderSide::NoOrderSide || side == order.order_side() {
1852 orders.push(order);
1853 }
1854 }
1855
1856 orders
1857 }
1858
1859 fn get_positions_for_ids(
1860 &self,
1861 position_ids: &HashSet<PositionId>,
1862 side: Option<PositionSide>,
1863 ) -> Vec<&Position> {
1864 let side = side.unwrap_or(PositionSide::NoPositionSide);
1865 let mut positions = Vec::new();
1866
1867 for position_id in position_ids {
1868 let position = self
1869 .positions
1870 .get(position_id)
1871 .unwrap_or_else(|| panic!("Position {position_id} not found"));
1872 if side == PositionSide::NoPositionSide || side == position.side {
1873 positions.push(position);
1874 }
1875 }
1876
1877 positions
1878 }
1879
1880 #[must_use]
1882 pub fn client_order_ids(
1883 &self,
1884 venue: Option<&Venue>,
1885 instrument_id: Option<&InstrumentId>,
1886 strategy_id: Option<&StrategyId>,
1887 ) -> HashSet<ClientOrderId> {
1888 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1889 match query {
1890 Some(query) => self.index.orders.intersection(&query).copied().collect(),
1891 None => self.index.orders.clone(),
1892 }
1893 }
1894
1895 #[must_use]
1897 pub fn client_order_ids_open(
1898 &self,
1899 venue: Option<&Venue>,
1900 instrument_id: Option<&InstrumentId>,
1901 strategy_id: Option<&StrategyId>,
1902 ) -> HashSet<ClientOrderId> {
1903 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1904 match query {
1905 Some(query) => self
1906 .index
1907 .orders_open
1908 .intersection(&query)
1909 .copied()
1910 .collect(),
1911 None => self.index.orders_open.clone(),
1912 }
1913 }
1914
1915 #[must_use]
1917 pub fn client_order_ids_closed(
1918 &self,
1919 venue: Option<&Venue>,
1920 instrument_id: Option<&InstrumentId>,
1921 strategy_id: Option<&StrategyId>,
1922 ) -> HashSet<ClientOrderId> {
1923 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1924 match query {
1925 Some(query) => self
1926 .index
1927 .orders_closed
1928 .intersection(&query)
1929 .copied()
1930 .collect(),
1931 None => self.index.orders_closed.clone(),
1932 }
1933 }
1934
1935 #[must_use]
1937 pub fn client_order_ids_emulated(
1938 &self,
1939 venue: Option<&Venue>,
1940 instrument_id: Option<&InstrumentId>,
1941 strategy_id: Option<&StrategyId>,
1942 ) -> HashSet<ClientOrderId> {
1943 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1944 match query {
1945 Some(query) => self
1946 .index
1947 .orders_emulated
1948 .intersection(&query)
1949 .copied()
1950 .collect(),
1951 None => self.index.orders_emulated.clone(),
1952 }
1953 }
1954
1955 #[must_use]
1957 pub fn client_order_ids_inflight(
1958 &self,
1959 venue: Option<&Venue>,
1960 instrument_id: Option<&InstrumentId>,
1961 strategy_id: Option<&StrategyId>,
1962 ) -> HashSet<ClientOrderId> {
1963 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1964 match query {
1965 Some(query) => self
1966 .index
1967 .orders_inflight
1968 .intersection(&query)
1969 .copied()
1970 .collect(),
1971 None => self.index.orders_inflight.clone(),
1972 }
1973 }
1974
1975 #[must_use]
1977 pub fn position_ids(
1978 &self,
1979 venue: Option<&Venue>,
1980 instrument_id: Option<&InstrumentId>,
1981 strategy_id: Option<&StrategyId>,
1982 ) -> HashSet<PositionId> {
1983 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1984 match query {
1985 Some(query) => self.index.positions.intersection(&query).copied().collect(),
1986 None => self.index.positions.clone(),
1987 }
1988 }
1989
1990 #[must_use]
1992 pub fn position_open_ids(
1993 &self,
1994 venue: Option<&Venue>,
1995 instrument_id: Option<&InstrumentId>,
1996 strategy_id: Option<&StrategyId>,
1997 ) -> HashSet<PositionId> {
1998 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1999 match query {
2000 Some(query) => self
2001 .index
2002 .positions_open
2003 .intersection(&query)
2004 .copied()
2005 .collect(),
2006 None => self.index.positions_open.clone(),
2007 }
2008 }
2009
2010 #[must_use]
2012 pub fn position_closed_ids(
2013 &self,
2014 venue: Option<&Venue>,
2015 instrument_id: Option<&InstrumentId>,
2016 strategy_id: Option<&StrategyId>,
2017 ) -> HashSet<PositionId> {
2018 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2019 match query {
2020 Some(query) => self
2021 .index
2022 .positions_closed
2023 .intersection(&query)
2024 .copied()
2025 .collect(),
2026 None => self.index.positions_closed.clone(),
2027 }
2028 }
2029
    /// Returns a clone of the indexed actor component IDs.
    #[must_use]
    pub fn actor_ids(&self) -> HashSet<ComponentId> {
        self.index.actors.clone()
    }
2035
    /// Returns a clone of the indexed strategy IDs.
    #[must_use]
    pub fn strategy_ids(&self) -> HashSet<StrategyId> {
        self.index.strategies.clone()
    }
2041
    /// Returns a clone of the indexed execution algorithm IDs.
    #[must_use]
    pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
        self.index.exec_algorithms.clone()
    }
2047
    /// Returns a reference to the order with the given `client_order_id`, if cached.
    #[must_use]
    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
        self.orders.get(client_order_id)
    }
2055
    /// Returns a mutable reference to the order with the given `client_order_id`, if cached.
    #[must_use]
    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
        self.orders.get_mut(client_order_id)
    }
2061
    /// Returns the client order ID indexed for the given `venue_order_id`, if any.
    #[must_use]
    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
        self.index.venue_order_ids.get(venue_order_id)
    }
2067
    /// Returns the venue order ID indexed for the given `client_order_id`, if any.
    #[must_use]
    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
        self.index.client_order_ids.get(client_order_id)
    }
2073
    /// Returns the client ID indexed for the given `client_order_id`, if any.
    #[must_use]
    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
        self.index.order_client.get(client_order_id)
    }
2079
2080 #[must_use]
2082 pub fn orders(
2083 &self,
2084 venue: Option<&Venue>,
2085 instrument_id: Option<&InstrumentId>,
2086 strategy_id: Option<&StrategyId>,
2087 side: Option<OrderSide>,
2088 ) -> Vec<&OrderAny> {
2089 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2090 self.get_orders_for_ids(&client_order_ids, side)
2091 }
2092
2093 #[must_use]
2095 pub fn orders_open(
2096 &self,
2097 venue: Option<&Venue>,
2098 instrument_id: Option<&InstrumentId>,
2099 strategy_id: Option<&StrategyId>,
2100 side: Option<OrderSide>,
2101 ) -> Vec<&OrderAny> {
2102 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2103 self.get_orders_for_ids(&client_order_ids, side)
2104 }
2105
2106 #[must_use]
2108 pub fn orders_closed(
2109 &self,
2110 venue: Option<&Venue>,
2111 instrument_id: Option<&InstrumentId>,
2112 strategy_id: Option<&StrategyId>,
2113 side: Option<OrderSide>,
2114 ) -> Vec<&OrderAny> {
2115 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2116 self.get_orders_for_ids(&client_order_ids, side)
2117 }
2118
2119 #[must_use]
2121 pub fn orders_emulated(
2122 &self,
2123 venue: Option<&Venue>,
2124 instrument_id: Option<&InstrumentId>,
2125 strategy_id: Option<&StrategyId>,
2126 side: Option<OrderSide>,
2127 ) -> Vec<&OrderAny> {
2128 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2129 self.get_orders_for_ids(&client_order_ids, side)
2130 }
2131
2132 #[must_use]
2134 pub fn orders_inflight(
2135 &self,
2136 venue: Option<&Venue>,
2137 instrument_id: Option<&InstrumentId>,
2138 strategy_id: Option<&StrategyId>,
2139 side: Option<OrderSide>,
2140 ) -> Vec<&OrderAny> {
2141 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2142 self.get_orders_for_ids(&client_order_ids, side)
2143 }
2144
2145 #[must_use]
2147 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2148 let client_order_ids = self.index.position_orders.get(position_id);
2149 match client_order_ids {
2150 Some(client_order_ids) => {
2151 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2152 }
2153 None => Vec::new(),
2154 }
2155 }
2156
    /// Returns whether the given `client_order_id` is present in the orders index.
    #[must_use]
    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
        self.index.orders.contains(client_order_id)
    }
2162
    /// Returns whether the given `client_order_id` is present in the open orders index.
    #[must_use]
    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
        self.index.orders_open.contains(client_order_id)
    }
2168
    /// Returns whether the given `client_order_id` is present in the closed orders index.
    #[must_use]
    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
        self.index.orders_closed.contains(client_order_id)
    }
2174
    /// Returns whether the given `client_order_id` is present in the emulated orders index.
    #[must_use]
    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
        self.index.orders_emulated.contains(client_order_id)
    }
2180
    /// Returns whether the given `client_order_id` is present in the in-flight orders index.
    #[must_use]
    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
        self.index.orders_inflight.contains(client_order_id)
    }
2186
    /// Returns whether the given `client_order_id` is present in the pending-cancel index.
    #[must_use]
    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
        self.index.orders_pending_cancel.contains(client_order_id)
    }
2192
2193 #[must_use]
2195 pub fn orders_open_count(
2196 &self,
2197 venue: Option<&Venue>,
2198 instrument_id: Option<&InstrumentId>,
2199 strategy_id: Option<&StrategyId>,
2200 side: Option<OrderSide>,
2201 ) -> usize {
2202 self.orders_open(venue, instrument_id, strategy_id, side)
2203 .len()
2204 }
2205
2206 #[must_use]
2208 pub fn orders_closed_count(
2209 &self,
2210 venue: Option<&Venue>,
2211 instrument_id: Option<&InstrumentId>,
2212 strategy_id: Option<&StrategyId>,
2213 side: Option<OrderSide>,
2214 ) -> usize {
2215 self.orders_closed(venue, instrument_id, strategy_id, side)
2216 .len()
2217 }
2218
2219 #[must_use]
2221 pub fn orders_emulated_count(
2222 &self,
2223 venue: Option<&Venue>,
2224 instrument_id: Option<&InstrumentId>,
2225 strategy_id: Option<&StrategyId>,
2226 side: Option<OrderSide>,
2227 ) -> usize {
2228 self.orders_emulated(venue, instrument_id, strategy_id, side)
2229 .len()
2230 }
2231
2232 #[must_use]
2234 pub fn orders_inflight_count(
2235 &self,
2236 venue: Option<&Venue>,
2237 instrument_id: Option<&InstrumentId>,
2238 strategy_id: Option<&StrategyId>,
2239 side: Option<OrderSide>,
2240 ) -> usize {
2241 self.orders_inflight(venue, instrument_id, strategy_id, side)
2242 .len()
2243 }
2244
2245 #[must_use]
2247 pub fn orders_total_count(
2248 &self,
2249 venue: Option<&Venue>,
2250 instrument_id: Option<&InstrumentId>,
2251 strategy_id: Option<&StrategyId>,
2252 side: Option<OrderSide>,
2253 ) -> usize {
2254 self.orders(venue, instrument_id, strategy_id, side).len()
2255 }
2256
    /// Returns a reference to the order list with the given `order_list_id`, if cached.
    #[must_use]
    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
        self.order_lists.get(order_list_id)
    }
2262
2263 #[must_use]
2265 pub fn order_lists(
2266 &self,
2267 venue: Option<&Venue>,
2268 instrument_id: Option<&InstrumentId>,
2269 strategy_id: Option<&StrategyId>,
2270 ) -> Vec<&OrderList> {
2271 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2272
2273 if let Some(venue) = venue {
2274 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2275 }
2276
2277 if let Some(instrument_id) = instrument_id {
2278 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2279 }
2280
2281 if let Some(strategy_id) = strategy_id {
2282 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2283 }
2284
2285 order_lists
2286 }
2287
    /// Returns whether an order list with the given `order_list_id` is cached.
    #[must_use]
    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
        self.order_lists.contains_key(order_list_id)
    }
2293
2294 #[must_use]
2299 pub fn orders_for_exec_algorithm(
2300 &self,
2301 exec_algorithm_id: &ExecAlgorithmId,
2302 venue: Option<&Venue>,
2303 instrument_id: Option<&InstrumentId>,
2304 strategy_id: Option<&StrategyId>,
2305 side: Option<OrderSide>,
2306 ) -> Vec<&OrderAny> {
2307 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2308 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2309
2310 if let Some(query) = query {
2311 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2312 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2313 }
2314 }
2315
2316 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2317 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2318 } else {
2319 Vec::new()
2320 }
2321 }
2322
2323 #[must_use]
2325 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2326 self.get_orders_for_ids(
2327 self.index
2328 .exec_spawn_orders
2329 .get(exec_spawn_id)
2330 .unwrap_or(&HashSet::new()),
2331 None,
2332 )
2333 }
2334
2335 #[must_use]
2337 pub fn exec_spawn_total_quantity(
2338 &self,
2339 exec_spawn_id: &ClientOrderId,
2340 active_only: bool,
2341 ) -> Option<Quantity> {
2342 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2343
2344 let mut total_quantity: Option<Quantity> = None;
2345
2346 for spawn_order in exec_spawn_orders {
2347 if !active_only || !spawn_order.is_closed() {
2348 if let Some(mut total_quantity) = total_quantity {
2349 total_quantity += spawn_order.quantity();
2350 }
2351 } else {
2352 total_quantity = Some(spawn_order.quantity());
2353 }
2354 }
2355
2356 total_quantity
2357 }
2358
2359 #[must_use]
2361 pub fn exec_spawn_total_filled_qty(
2362 &self,
2363 exec_spawn_id: &ClientOrderId,
2364 active_only: bool,
2365 ) -> Option<Quantity> {
2366 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2367
2368 let mut total_quantity: Option<Quantity> = None;
2369
2370 for spawn_order in exec_spawn_orders {
2371 if !active_only || !spawn_order.is_closed() {
2372 if let Some(mut total_quantity) = total_quantity {
2373 total_quantity += spawn_order.filled_qty();
2374 }
2375 } else {
2376 total_quantity = Some(spawn_order.filled_qty());
2377 }
2378 }
2379
2380 total_quantity
2381 }
2382
2383 #[must_use]
2385 pub fn exec_spawn_total_leaves_qty(
2386 &self,
2387 exec_spawn_id: &ClientOrderId,
2388 active_only: bool,
2389 ) -> Option<Quantity> {
2390 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2391
2392 let mut total_quantity: Option<Quantity> = None;
2393
2394 for spawn_order in exec_spawn_orders {
2395 if !active_only || !spawn_order.is_closed() {
2396 if let Some(mut total_quantity) = total_quantity {
2397 total_quantity += spawn_order.leaves_qty();
2398 }
2399 } else {
2400 total_quantity = Some(spawn_order.leaves_qty());
2401 }
2402 }
2403
2404 total_quantity
2405 }
2406
    /// Returns a reference to the position with the given `position_id`, if cached.
    #[must_use]
    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
        self.positions.get(position_id)
    }
2414
2415 #[must_use]
2417 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2418 self.index
2419 .order_position
2420 .get(client_order_id)
2421 .and_then(|position_id| self.positions.get(position_id))
2422 }
2423
    /// Returns the position ID indexed for the given `client_order_id`, if any.
    #[must_use]
    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
        self.index.order_position.get(client_order_id)
    }
2429
2430 #[must_use]
2432 pub fn positions(
2433 &self,
2434 venue: Option<&Venue>,
2435 instrument_id: Option<&InstrumentId>,
2436 strategy_id: Option<&StrategyId>,
2437 side: Option<PositionSide>,
2438 ) -> Vec<&Position> {
2439 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2440 self.get_positions_for_ids(&position_ids, side)
2441 }
2442
2443 #[must_use]
2445 pub fn positions_open(
2446 &self,
2447 venue: Option<&Venue>,
2448 instrument_id: Option<&InstrumentId>,
2449 strategy_id: Option<&StrategyId>,
2450 side: Option<PositionSide>,
2451 ) -> Vec<&Position> {
2452 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2453 self.get_positions_for_ids(&position_ids, side)
2454 }
2455
2456 #[must_use]
2458 pub fn positions_closed(
2459 &self,
2460 venue: Option<&Venue>,
2461 instrument_id: Option<&InstrumentId>,
2462 strategy_id: Option<&StrategyId>,
2463 side: Option<PositionSide>,
2464 ) -> Vec<&Position> {
2465 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2466 self.get_positions_for_ids(&position_ids, side)
2467 }
2468
    /// Returns whether the given `position_id` is present in the positions index.
    #[must_use]
    pub fn position_exists(&self, position_id: &PositionId) -> bool {
        self.index.positions.contains(position_id)
    }
2474
    /// Returns whether the given `position_id` is present in the open positions index.
    #[must_use]
    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
        self.index.positions_open.contains(position_id)
    }
2480
    /// Returns whether the given `position_id` is present in the closed positions index.
    #[must_use]
    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
        self.index.positions_closed.contains(position_id)
    }
2486
2487 #[must_use]
2489 pub fn positions_open_count(
2490 &self,
2491 venue: Option<&Venue>,
2492 instrument_id: Option<&InstrumentId>,
2493 strategy_id: Option<&StrategyId>,
2494 side: Option<PositionSide>,
2495 ) -> usize {
2496 self.positions_open(venue, instrument_id, strategy_id, side)
2497 .len()
2498 }
2499
2500 #[must_use]
2502 pub fn positions_closed_count(
2503 &self,
2504 venue: Option<&Venue>,
2505 instrument_id: Option<&InstrumentId>,
2506 strategy_id: Option<&StrategyId>,
2507 side: Option<PositionSide>,
2508 ) -> usize {
2509 self.positions_closed(venue, instrument_id, strategy_id, side)
2510 .len()
2511 }
2512
2513 #[must_use]
2515 pub fn positions_total_count(
2516 &self,
2517 venue: Option<&Venue>,
2518 instrument_id: Option<&InstrumentId>,
2519 strategy_id: Option<&StrategyId>,
2520 side: Option<PositionSide>,
2521 ) -> usize {
2522 self.positions(venue, instrument_id, strategy_id, side)
2523 .len()
2524 }
2525
    /// Returns the strategy ID indexed for the given `client_order_id`, if any.
    #[must_use]
    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
        self.index.order_strategy.get(client_order_id)
    }
2533
    /// Returns the strategy ID indexed for the given `position_id`, if any.
    #[must_use]
    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
        self.index.position_strategy.get(position_id)
    }
2539
    /// Returns the bytes stored in the general cache under the given `key`, if any.
    ///
    /// # Errors
    ///
    /// The visible code path always returns `Ok`; the `Result` wrapper is part of the signature.
    ///
    /// # Panics
    ///
    /// Panics if `key` fails the `check_valid_string` validation.
    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
        check_valid_string(key, stringify!(key)).expect(FAILED);

        Ok(self.general.get(key))
    }
2548
2549 #[must_use]
2553 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2554 match price_type {
2555 PriceType::Bid => self
2556 .quotes
2557 .get(instrument_id)
2558 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2559 PriceType::Ask => self
2560 .quotes
2561 .get(instrument_id)
2562 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2563 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2564 quotes.front().map(|quote| {
2565 Price::new(
2566 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2567 quote.bid_price.precision + 1,
2568 )
2569 })
2570 }),
2571 PriceType::Last => self
2572 .trades
2573 .get(instrument_id)
2574 .and_then(|trades| trades.front().map(|trade| trade.price)),
2575 PriceType::Mark => self
2576 .mark_prices
2577 .get(instrument_id)
2578 .and_then(|marks| marks.front().map(|mark| mark.value)),
2579 }
2580 }
2581
2582 #[must_use]
2584 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2585 self.quotes
2586 .get(instrument_id)
2587 .map(|quotes| quotes.iter().copied().collect())
2588 }
2589
2590 #[must_use]
2592 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2593 self.trades
2594 .get(instrument_id)
2595 .map(|trades| trades.iter().copied().collect())
2596 }
2597
2598 #[must_use]
2600 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2601 self.mark_prices
2602 .get(instrument_id)
2603 .map(|mark_prices| mark_prices.iter().copied().collect())
2604 }
2605
2606 #[must_use]
2608 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2609 self.index_prices
2610 .get(instrument_id)
2611 .map(|index_prices| index_prices.iter().copied().collect())
2612 }
2613
2614 #[must_use]
2616 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2617 self.bars
2618 .get(bar_type)
2619 .map(|bars| bars.iter().copied().collect())
2620 }
2621
2622 #[must_use]
2624 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2625 self.books.get(instrument_id)
2626 }
2627
2628 #[must_use]
2630 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2631 self.books.get_mut(instrument_id)
2632 }
2633
2634 #[must_use]
2636 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2637 self.own_books.get(instrument_id)
2638 }
2639
2640 #[must_use]
2642 pub fn own_order_book_mut(
2643 &mut self,
2644 instrument_id: &InstrumentId,
2645 ) -> Option<&mut OwnOrderBook> {
2646 self.own_books.get_mut(instrument_id)
2647 }
2648
2649 #[must_use]
2651 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2652 self.quotes
2653 .get(instrument_id)
2654 .and_then(|quotes| quotes.front())
2655 }
2656
2657 #[must_use]
2659 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2660 self.trades
2661 .get(instrument_id)
2662 .and_then(|trades| trades.front())
2663 }
2664
2665 #[must_use]
2667 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
2668 self.mark_prices
2669 .get(instrument_id)
2670 .and_then(|mark_prices| mark_prices.front())
2671 }
2672
2673 #[must_use]
2675 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
2676 self.index_prices
2677 .get(instrument_id)
2678 .and_then(|index_prices| index_prices.front())
2679 }
2680
2681 #[must_use]
2683 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2684 self.bars.get(bar_type).and_then(|bars| bars.front())
2685 }
2686
2687 #[must_use]
2689 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2690 self.books
2691 .get(instrument_id)
2692 .map_or(0, |book| book.update_count) as usize
2693 }
2694
2695 #[must_use]
2697 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2698 self.quotes
2699 .get(instrument_id)
2700 .map_or(0, std::collections::VecDeque::len)
2701 }
2702
2703 #[must_use]
2705 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2706 self.trades
2707 .get(instrument_id)
2708 .map_or(0, std::collections::VecDeque::len)
2709 }
2710
2711 #[must_use]
2713 pub fn bar_count(&self, bar_type: &BarType) -> usize {
2714 self.bars
2715 .get(bar_type)
2716 .map_or(0, std::collections::VecDeque::len)
2717 }
2718
2719 #[must_use]
2721 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2722 self.books.contains_key(instrument_id)
2723 }
2724
2725 #[must_use]
2727 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2728 self.quote_count(instrument_id) > 0
2729 }
2730
2731 #[must_use]
2733 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2734 self.trade_count(instrument_id) > 0
2735 }
2736
2737 #[must_use]
2739 pub fn has_bars(&self, bar_type: &BarType) -> bool {
2740 self.bar_count(bar_type) > 0
2741 }
2742
2743 #[must_use]
2744 pub fn get_xrate(
2745 &self,
2746 venue: Venue,
2747 from_currency: Currency,
2748 to_currency: Currency,
2749 price_type: PriceType,
2750 ) -> Option<f64> {
2751 if from_currency == to_currency {
2752 return Some(1.0);
2755 }
2756
2757 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2758
2759 match get_exchange_rate(
2760 from_currency.code,
2761 to_currency.code,
2762 price_type,
2763 bid_quote,
2764 ask_quote,
2765 ) {
2766 Ok(rate) => rate,
2767 Err(e) => {
2768 log::error!("Failed to calculate xrate: {e}");
2769 None
2770 }
2771 }
2772 }
2773
2774 fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2775 let mut bid_quotes = HashMap::new();
2776 let mut ask_quotes = HashMap::new();
2777
2778 for instrument_id in self.instruments.keys() {
2779 if instrument_id.venue != *venue {
2780 continue;
2781 }
2782
2783 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2784 if let Some(tick) = ticks.front() {
2785 (tick.bid_price, tick.ask_price)
2786 } else {
2787 continue; }
2789 } else {
2790 let bid_bar = self
2791 .bars
2792 .iter()
2793 .find(|(k, _)| {
2794 k.instrument_id() == *instrument_id
2795 && matches!(k.spec().price_type, PriceType::Bid)
2796 })
2797 .map(|(_, v)| v);
2798
2799 let ask_bar = self
2800 .bars
2801 .iter()
2802 .find(|(k, _)| {
2803 k.instrument_id() == *instrument_id
2804 && matches!(k.spec().price_type, PriceType::Ask)
2805 })
2806 .map(|(_, v)| v);
2807
2808 match (bid_bar, ask_bar) {
2809 (Some(bid), Some(ask)) => {
2810 let bid_price = bid.front().unwrap().close;
2811 let ask_price = ask.front().unwrap().close;
2812
2813 (bid_price, ask_price)
2814 }
2815 _ => continue,
2816 }
2817 };
2818
2819 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
2820 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
2821 }
2822
2823 (bid_quotes, ask_quotes)
2824 }
2825
2826 #[must_use]
2828 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
2829 self.mark_xrates.get(&(from_currency, to_currency)).copied()
2830 }
2831
2832 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
2838 assert!(xrate > 0.0, "xrate was zero");
2839 self.mark_xrates.insert((from_currency, to_currency), xrate);
2840 self.mark_xrates
2841 .insert((to_currency, from_currency), 1.0 / xrate);
2842 }
2843
2844 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
2846 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
2847 }
2848
2849 pub fn clear_mark_xrates(&mut self) {
2851 self.mark_xrates.clear();
2852 }
2853
2854 #[must_use]
2858 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2859 self.instruments.get(instrument_id)
2860 }
2861
2862 #[must_use]
2864 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2865 self.instruments
2866 .keys()
2867 .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2868 .collect()
2869 }
2870
2871 #[must_use]
2873 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2874 self.instruments
2875 .values()
2876 .filter(|i| &i.id().venue == venue)
2877 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
2878 .collect()
2879 }
2880
2881 #[must_use]
2883 pub fn bar_types(
2884 &self,
2885 instrument_id: Option<&InstrumentId>,
2886 price_type: Option<&PriceType>,
2887 aggregation_source: AggregationSource,
2888 ) -> Vec<&BarType> {
2889 let mut bar_types = self
2890 .bars
2891 .keys()
2892 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2893 .collect::<Vec<&BarType>>();
2894
2895 if let Some(instrument_id) = instrument_id {
2896 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2897 }
2898
2899 if let Some(price_type) = price_type {
2900 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2901 }
2902
2903 bar_types
2904 }
2905
2906 #[must_use]
2910 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2911 self.synthetics.get(instrument_id)
2912 }
2913
2914 #[must_use]
2916 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2917 self.synthetics.keys().collect()
2918 }
2919
2920 #[must_use]
2922 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2923 self.synthetics.values().collect()
2924 }
2925
2926 #[must_use]
2930 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2931 self.accounts.get(account_id)
2932 }
2933
2934 #[must_use]
2936 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2937 self.index
2938 .venue_account
2939 .get(venue)
2940 .and_then(|account_id| self.accounts.get(account_id))
2941 }
2942
2943 #[must_use]
2945 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2946 self.index.venue_account.get(venue)
2947 }
2948
2949 #[must_use]
2951 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
2952 self.accounts
2953 .values()
2954 .filter(|account| &account.id() == account_id)
2955 .collect()
2956 }
2957}