1pub mod config;
19pub mod database;
20
21mod index;
22
23#[cfg(test)]
24mod tests;
25
26use std::{
27 collections::{HashMap, HashSet, VecDeque},
28 time::{SystemTime, UNIX_EPOCH},
29};
30
31use bytes::Bytes;
32pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
34use index::CacheIndex;
35use nautilus_core::{
36 UUID4, UnixNanos,
37 correctness::{
38 FAILED, check_key_not_in_map, check_predicate_false, check_slice_not_empty,
39 check_valid_string,
40 },
41 datetime::secs_to_nanos,
42};
43use nautilus_model::{
44 accounts::AccountAny,
45 data::{
46 Bar, BarType, GreeksData, QuoteTick, TradeTick, YieldCurveData,
47 prices::{IndexPriceUpdate, MarkPriceUpdate},
48 },
49 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
50 identifiers::{
51 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
52 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
53 },
54 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
55 orderbook::{OrderBook, own::OwnOrderBook},
56 orders::{Order, OrderAny, OrderList},
57 position::Position,
58 types::{Currency, Money, Price, Quantity},
59};
60use ustr::Ustr;
61
62use crate::xrate::get_exchange_rate;
63
64#[allow(missing_debug_implementations)]
66pub struct Cache {
67 config: CacheConfig,
68 index: CacheIndex,
69 database: Option<Box<dyn CacheDatabaseAdapter>>,
70 general: HashMap<String, Bytes>,
71 currencies: HashMap<Ustr, Currency>,
72 instruments: HashMap<InstrumentId, InstrumentAny>,
73 synthetics: HashMap<InstrumentId, SyntheticInstrument>,
74 books: HashMap<InstrumentId, OrderBook>,
75 own_books: HashMap<InstrumentId, OwnOrderBook>,
76 quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
77 trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
78 mark_xrates: HashMap<(Currency, Currency), f64>,
79 mark_prices: HashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
80 index_prices: HashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
81 bars: HashMap<BarType, VecDeque<Bar>>,
82 greeks: HashMap<InstrumentId, GreeksData>,
83 yield_curves: HashMap<String, YieldCurveData>,
84 accounts: HashMap<AccountId, AccountAny>,
85 orders: HashMap<ClientOrderId, OrderAny>,
86 order_lists: HashMap<OrderListId, OrderList>,
87 positions: HashMap<PositionId, Position>,
88 position_snapshots: HashMap<PositionId, Bytes>,
89}
90
91impl Default for Cache {
92 fn default() -> Self {
94 Self::new(Some(CacheConfig::default()), None)
95 }
96}
97
98impl Cache {
99 #[must_use]
101 pub fn new(
102 config: Option<CacheConfig>,
103 database: Option<Box<dyn CacheDatabaseAdapter>>,
104 ) -> Self {
105 Self {
106 config: config.unwrap_or_default(),
107 index: CacheIndex::default(),
108 database,
109 general: HashMap::new(),
110 currencies: HashMap::new(),
111 instruments: HashMap::new(),
112 synthetics: HashMap::new(),
113 books: HashMap::new(),
114 own_books: HashMap::new(),
115 quotes: HashMap::new(),
116 trades: HashMap::new(),
117 mark_xrates: HashMap::new(),
118 mark_prices: HashMap::new(),
119 index_prices: HashMap::new(),
120 bars: HashMap::new(),
121 greeks: HashMap::new(),
122 yield_curves: HashMap::new(),
123 accounts: HashMap::new(),
124 orders: HashMap::new(),
125 order_lists: HashMap::new(),
126 positions: HashMap::new(),
127 position_snapshots: HashMap::new(),
128 }
129 }
130
131 #[must_use]
133 pub fn memory_address(&self) -> String {
134 format!("{:?}", std::ptr::from_ref(self))
135 }
136
137 pub fn cache_general(&mut self) -> anyhow::Result<()> {
141 self.general = match &mut self.database {
142 Some(db) => db.load()?,
143 None => HashMap::new(),
144 };
145
146 log::info!(
147 "Cached {} general object(s) from database",
148 self.general.len()
149 );
150 Ok(())
151 }
152
153 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
155 let cache_map = match &self.database {
156 Some(db) => db.load_all().await?,
157 None => CacheMap::default(),
158 };
159
160 self.currencies = cache_map.currencies;
161 self.instruments = cache_map.instruments;
162 self.synthetics = cache_map.synthetics;
163 self.accounts = cache_map.accounts;
164 self.orders = cache_map.orders;
165 self.positions = cache_map.positions;
166 Ok(())
167 }
168
169 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
171 self.currencies = match &mut self.database {
172 Some(db) => db.load_currencies().await?,
173 None => HashMap::new(),
174 };
175
176 log::info!("Cached {} currencies from database", self.general.len());
177 Ok(())
178 }
179
180 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
182 self.instruments = match &mut self.database {
183 Some(db) => db.load_instruments().await?,
184 None => HashMap::new(),
185 };
186
187 log::info!("Cached {} instruments from database", self.general.len());
188 Ok(())
189 }
190
191 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
194 self.synthetics = match &mut self.database {
195 Some(db) => db.load_synthetics().await?,
196 None => HashMap::new(),
197 };
198
199 log::info!(
200 "Cached {} synthetic instruments from database",
201 self.general.len()
202 );
203 Ok(())
204 }
205
206 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
208 self.accounts = match &mut self.database {
209 Some(db) => db.load_accounts().await?,
210 None => HashMap::new(),
211 };
212
213 log::info!(
214 "Cached {} synthetic instruments from database",
215 self.general.len()
216 );
217 Ok(())
218 }
219
220 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
222 self.orders = match &mut self.database {
223 Some(db) => db.load_orders().await?,
224 None => HashMap::new(),
225 };
226
227 log::info!("Cached {} orders from database", self.general.len());
228 Ok(())
229 }
230
231 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
233 self.positions = match &mut self.database {
234 Some(db) => db.load_positions().await?,
235 None => HashMap::new(),
236 };
237
238 log::info!("Cached {} positions from database", self.general.len());
239 Ok(())
240 }
241
242 pub fn build_index(&mut self) {
244 log::debug!("Building index");
245
246 for account_id in self.accounts.keys() {
248 self.index
249 .venue_account
250 .insert(account_id.get_issuer(), *account_id);
251 }
252
253 for (client_order_id, order) in &self.orders {
255 let instrument_id = order.instrument_id();
256 let venue = instrument_id.venue;
257 let strategy_id = order.strategy_id();
258
259 self.index
261 .venue_orders
262 .entry(venue)
263 .or_default()
264 .insert(*client_order_id);
265
266 if let Some(venue_order_id) = order.venue_order_id() {
268 self.index
269 .venue_order_ids
270 .insert(venue_order_id, *client_order_id);
271 }
272
273 if let Some(position_id) = order.position_id() {
275 self.index
276 .order_position
277 .insert(*client_order_id, position_id);
278 }
279
280 self.index
282 .order_strategy
283 .insert(*client_order_id, order.strategy_id());
284
285 self.index
287 .instrument_orders
288 .entry(instrument_id)
289 .or_default()
290 .insert(*client_order_id);
291
292 self.index
294 .strategy_orders
295 .entry(strategy_id)
296 .or_default()
297 .insert(*client_order_id);
298
299 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
301 self.index
302 .exec_algorithm_orders
303 .entry(exec_algorithm_id)
304 .or_default()
305 .insert(*client_order_id);
306 }
307
308 if let Some(exec_spawn_id) = order.exec_spawn_id() {
310 self.index
311 .exec_spawn_orders
312 .entry(exec_spawn_id)
313 .or_default()
314 .insert(*client_order_id);
315 }
316
317 self.index.orders.insert(*client_order_id);
319
320 if order.is_open() {
322 self.index.orders_open.insert(*client_order_id);
323 }
324
325 if order.is_closed() {
327 self.index.orders_closed.insert(*client_order_id);
328 }
329
330 if let Some(emulation_trigger) = order.emulation_trigger() {
332 if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
333 self.index.orders_emulated.insert(*client_order_id);
334 }
335 }
336
337 if order.is_inflight() {
339 self.index.orders_inflight.insert(*client_order_id);
340 }
341
342 self.index.strategies.insert(strategy_id);
344
345 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
347 self.index.exec_algorithms.insert(exec_algorithm_id);
348 }
349 }
350
351 for (position_id, position) in &self.positions {
353 let instrument_id = position.instrument_id;
354 let venue = instrument_id.venue;
355 let strategy_id = position.strategy_id;
356
357 self.index
359 .venue_positions
360 .entry(venue)
361 .or_default()
362 .insert(*position_id);
363
364 self.index
366 .position_strategy
367 .insert(*position_id, position.strategy_id);
368
369 self.index
371 .position_orders
372 .entry(*position_id)
373 .or_default()
374 .extend(position.client_order_ids().into_iter());
375
376 self.index
378 .instrument_positions
379 .entry(instrument_id)
380 .or_default()
381 .insert(*position_id);
382
383 self.index
385 .strategy_positions
386 .entry(strategy_id)
387 .or_default()
388 .insert(*position_id);
389
390 self.index.positions.insert(*position_id);
392
393 if position.is_open() {
395 self.index.positions_open.insert(*position_id);
396 }
397
398 if position.is_closed() {
400 self.index.positions_closed.insert(*position_id);
401 }
402
403 self.index.strategies.insert(strategy_id);
405 }
406 }
407
408 #[must_use]
410 pub const fn has_backing(&self) -> bool {
411 self.config.database.is_some()
412 }
413
414 #[must_use]
416 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
417 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
418 quote
419 } else {
420 log::warn!(
421 "Cannot calculate unrealized PnL for {}, no quotes for {}",
422 position.id,
423 position.instrument_id
424 );
425 return None;
426 };
427
428 let last = match position.side {
429 PositionSide::Flat | PositionSide::NoPositionSide => {
430 return Some(Money::new(0.0, position.settlement_currency));
431 }
432 PositionSide::Long => quote.ask_price,
433 PositionSide::Short => quote.bid_price,
434 };
435
436 Some(position.unrealized_pnl(last))
437 }
438
439 #[must_use]
444 pub fn check_integrity(&mut self) -> bool {
445 let mut error_count = 0;
446 let failure = "Integrity failure";
447
448 let timestamp_us = SystemTime::now()
450 .duration_since(UNIX_EPOCH)
451 .expect("Time went backwards")
452 .as_micros();
453
454 log::info!("Checking data integrity");
455
456 for account_id in self.accounts.keys() {
458 if !self
459 .index
460 .venue_account
461 .contains_key(&account_id.get_issuer())
462 {
463 log::error!(
464 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
465 );
466 error_count += 1;
467 }
468 }
469
470 for (client_order_id, order) in &self.orders {
471 if !self.index.order_strategy.contains_key(client_order_id) {
472 log::error!(
473 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
474 );
475 error_count += 1;
476 }
477 if !self.index.orders.contains(client_order_id) {
478 log::error!(
479 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
480 );
481 error_count += 1;
482 }
483 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
484 log::error!(
485 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
486 );
487 error_count += 1;
488 }
489 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
490 log::error!(
491 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
492 );
493 error_count += 1;
494 }
495 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
496 log::error!(
497 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
498 );
499 error_count += 1;
500 }
501 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
502 if !self
503 .index
504 .exec_algorithm_orders
505 .contains_key(&exec_algorithm_id)
506 {
507 log::error!(
508 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
509 );
510 error_count += 1;
511 }
512 if order.exec_spawn_id().is_none()
513 && !self.index.exec_spawn_orders.contains_key(client_order_id)
514 {
515 log::error!(
516 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
517 );
518 error_count += 1;
519 }
520 }
521 }
522
523 for (position_id, position) in &self.positions {
524 if !self.index.position_strategy.contains_key(position_id) {
525 log::error!(
526 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
527 );
528 error_count += 1;
529 }
530 if !self.index.position_orders.contains_key(position_id) {
531 log::error!(
532 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
533 );
534 error_count += 1;
535 }
536 if !self.index.positions.contains(position_id) {
537 log::error!(
538 "{failure} in positions: {position_id} not found in `self.index.positions`",
539 );
540 error_count += 1;
541 }
542 if position.is_open() && !self.index.positions_open.contains(position_id) {
543 log::error!(
544 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
545 );
546 error_count += 1;
547 }
548 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
549 log::error!(
550 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
551 );
552 error_count += 1;
553 }
554 }
555
556 for account_id in self.index.venue_account.values() {
558 if !self.accounts.contains_key(account_id) {
559 log::error!(
560 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
561 );
562 error_count += 1;
563 }
564 }
565
566 for client_order_id in self.index.venue_order_ids.values() {
567 if !self.orders.contains_key(client_order_id) {
568 log::error!(
569 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
570 );
571 error_count += 1;
572 }
573 }
574
575 for client_order_id in self.index.client_order_ids.keys() {
576 if !self.orders.contains_key(client_order_id) {
577 log::error!(
578 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
579 );
580 error_count += 1;
581 }
582 }
583
584 for client_order_id in self.index.order_position.keys() {
585 if !self.orders.contains_key(client_order_id) {
586 log::error!(
587 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
588 );
589 error_count += 1;
590 }
591 }
592
593 for client_order_id in self.index.order_strategy.keys() {
595 if !self.orders.contains_key(client_order_id) {
596 log::error!(
597 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
598 );
599 error_count += 1;
600 }
601 }
602
603 for position_id in self.index.position_strategy.keys() {
604 if !self.positions.contains_key(position_id) {
605 log::error!(
606 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
607 );
608 error_count += 1;
609 }
610 }
611
612 for position_id in self.index.position_orders.keys() {
613 if !self.positions.contains_key(position_id) {
614 log::error!(
615 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
616 );
617 error_count += 1;
618 }
619 }
620
621 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
622 for client_order_id in client_order_ids {
623 if !self.orders.contains_key(client_order_id) {
624 log::error!(
625 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
626 );
627 error_count += 1;
628 }
629 }
630 }
631
632 for instrument_id in self.index.instrument_positions.keys() {
633 if !self.index.instrument_orders.contains_key(instrument_id) {
634 log::error!(
635 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
636 );
637 error_count += 1;
638 }
639 }
640
641 for client_order_ids in self.index.strategy_orders.values() {
642 for client_order_id in client_order_ids {
643 if !self.orders.contains_key(client_order_id) {
644 log::error!(
645 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
646 );
647 error_count += 1;
648 }
649 }
650 }
651
652 for position_ids in self.index.strategy_positions.values() {
653 for position_id in position_ids {
654 if !self.positions.contains_key(position_id) {
655 log::error!(
656 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
657 );
658 error_count += 1;
659 }
660 }
661 }
662
663 for client_order_id in &self.index.orders {
664 if !self.orders.contains_key(client_order_id) {
665 log::error!(
666 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
667 );
668 error_count += 1;
669 }
670 }
671
672 for client_order_id in &self.index.orders_emulated {
673 if !self.orders.contains_key(client_order_id) {
674 log::error!(
675 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
676 );
677 error_count += 1;
678 }
679 }
680
681 for client_order_id in &self.index.orders_inflight {
682 if !self.orders.contains_key(client_order_id) {
683 log::error!(
684 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
685 );
686 error_count += 1;
687 }
688 }
689
690 for client_order_id in &self.index.orders_open {
691 if !self.orders.contains_key(client_order_id) {
692 log::error!(
693 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
694 );
695 error_count += 1;
696 }
697 }
698
699 for client_order_id in &self.index.orders_closed {
700 if !self.orders.contains_key(client_order_id) {
701 log::error!(
702 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
703 );
704 error_count += 1;
705 }
706 }
707
708 for position_id in &self.index.positions {
709 if !self.positions.contains_key(position_id) {
710 log::error!(
711 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
712 );
713 error_count += 1;
714 }
715 }
716
717 for position_id in &self.index.positions_open {
718 if !self.positions.contains_key(position_id) {
719 log::error!(
720 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
721 );
722 error_count += 1;
723 }
724 }
725
726 for position_id in &self.index.positions_closed {
727 if !self.positions.contains_key(position_id) {
728 log::error!(
729 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
730 );
731 error_count += 1;
732 }
733 }
734
735 for strategy_id in &self.index.strategies {
736 if !self.index.strategy_orders.contains_key(strategy_id) {
737 log::error!(
738 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
739 );
740 error_count += 1;
741 }
742 }
743
744 for exec_algorithm_id in &self.index.exec_algorithms {
745 if !self
746 .index
747 .exec_algorithm_orders
748 .contains_key(exec_algorithm_id)
749 {
750 log::error!(
751 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
752 );
753 error_count += 1;
754 }
755 }
756
757 let total_us = SystemTime::now()
758 .duration_since(UNIX_EPOCH)
759 .expect("Time went backwards")
760 .as_micros()
761 - timestamp_us;
762
763 if error_count == 0 {
764 log::info!("Integrity check passed in {total_us}μs");
765 true
766 } else {
767 log::error!(
768 "Integrity check failed with {error_count} error{} in {total_us}μs",
769 if error_count == 1 { "" } else { "s" },
770 );
771 false
772 }
773 }
774
775 #[must_use]
779 pub fn check_residuals(&self) -> bool {
780 log::debug!("Checking residuals");
781
782 let mut residuals = false;
783
784 for order in self.orders_open(None, None, None, None) {
786 residuals = true;
787 log::warn!("Residual {order:?}");
788 }
789
790 for position in self.positions_open(None, None, None, None) {
792 residuals = true;
793 log::warn!("Residual {position}");
794 }
795
796 residuals
797 }
798
799 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
805 log::debug!(
806 "Purging closed orders{}",
807 if buffer_secs > 0 {
808 format!(" with buffer_secs={buffer_secs}")
809 } else {
810 String::new()
811 }
812 );
813
814 let buffer_ns = secs_to_nanos(buffer_secs as f64);
815
816 for client_order_id in self.index.orders_closed.clone() {
817 if let Some(order) = self.orders.get(&client_order_id) {
818 if let Some(ts_closed) = order.ts_closed() {
819 if ts_closed + buffer_ns <= ts_now {
820 self.purge_order(client_order_id);
821 }
822 }
823 }
824 }
825 }
826
827 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
829 log::debug!(
830 "Purging closed positions{}",
831 if buffer_secs > 0 {
832 format!(" with buffer_secs={buffer_secs}")
833 } else {
834 String::new()
835 }
836 );
837
838 let buffer_ns = secs_to_nanos(buffer_secs as f64);
839
840 for position_id in self.index.positions_closed.clone() {
841 if let Some(position) = self.positions.get(&position_id) {
842 if let Some(ts_closed) = position.ts_closed {
843 if ts_closed + buffer_ns <= ts_now {
844 self.purge_position(position_id);
845 }
846 }
847 }
848 }
849 }
850
851 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
855 if let Some(position_id) = self.index.order_position.get(&client_order_id) {
857 if let Some(position) = self.positions.get_mut(position_id) {
858 position.purge_events_for_order(client_order_id);
859 }
860 }
861
862 if let Some(order) = self.orders.remove(&client_order_id) {
863 if let Some(venue_orders) = self
865 .index
866 .venue_orders
867 .get_mut(&order.instrument_id().venue)
868 {
869 venue_orders.remove(&client_order_id);
870 }
871
872 if let Some(venue_order_id) = order.venue_order_id() {
874 self.index.venue_order_ids.remove(&venue_order_id);
875 }
876
877 if let Some(instrument_orders) =
879 self.index.instrument_orders.get_mut(&order.instrument_id())
880 {
881 instrument_orders.remove(&client_order_id);
882 }
883
884 if let Some(position_id) = order.position_id() {
886 if let Some(position_orders) = self.index.position_orders.get_mut(&position_id) {
887 position_orders.remove(&client_order_id);
888 }
889 }
890
891 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
893 if let Some(exec_algorithm_orders) =
894 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
895 {
896 exec_algorithm_orders.remove(&client_order_id);
897 }
898 }
899
900 log::info!("Purged order {client_order_id}");
901 } else {
902 log::warn!("Order {client_order_id} not found when purging");
903 }
904
905 self.index.order_position.remove(&client_order_id);
907 self.index.order_strategy.remove(&client_order_id);
908 self.index.order_client.remove(&client_order_id);
909 self.index.client_order_ids.remove(&client_order_id);
910 self.index.exec_spawn_orders.remove(&client_order_id);
911 self.index.orders.remove(&client_order_id);
912 self.index.orders_closed.remove(&client_order_id);
913 self.index.orders_emulated.remove(&client_order_id);
914 self.index.orders_inflight.remove(&client_order_id);
915 self.index.orders_pending_cancel.remove(&client_order_id);
916 }
917
918 pub fn purge_position(&mut self, position_id: PositionId) {
920 if let Some(position) = self.positions.remove(&position_id) {
921 if let Some(venue_positions) = self
923 .index
924 .venue_positions
925 .get_mut(&position.instrument_id.venue)
926 {
927 venue_positions.remove(&position_id);
928 }
929
930 if let Some(instrument_positions) = self
932 .index
933 .instrument_positions
934 .get_mut(&position.instrument_id)
935 {
936 instrument_positions.remove(&position_id);
937 }
938
939 if let Some(strategy_positions) =
941 self.index.strategy_positions.get_mut(&position.strategy_id)
942 {
943 strategy_positions.remove(&position_id);
944 }
945
946 for client_order_id in position.client_order_ids() {
948 self.index.order_position.remove(&client_order_id);
949 }
950
951 log::info!("Purged position {position_id}");
952 } else {
953 log::warn!("Position {position_id} not found when purging");
954 }
955
956 self.index.position_strategy.remove(&position_id);
958 self.index.position_orders.remove(&position_id);
959 self.index.positions.remove(&position_id);
960 self.index.positions_open.remove(&position_id);
961 self.index.positions_closed.remove(&position_id);
962 }
963
964 pub fn purge_account_events(&mut self, _ts_now: UnixNanos, lookback_secs: u64) {
969 log::debug!(
970 "Purging account events{}",
971 if lookback_secs > 0 {
972 format!(" with lookback_secs={lookback_secs}")
973 } else {
974 String::new()
975 }
976 );
977
978 }
992
993 pub fn clear_index(&mut self) {
995 self.index.clear();
996 log::debug!("Cleared index");
997 }
998
999 pub fn reset(&mut self) {
1003 log::debug!("Resetting cache");
1004
1005 self.general.clear();
1006 self.currencies.clear();
1007 self.instruments.clear();
1008 self.synthetics.clear();
1009 self.books.clear();
1010 self.own_books.clear();
1011 self.quotes.clear();
1012 self.trades.clear();
1013 self.mark_xrates.clear();
1014 self.mark_prices.clear();
1015 self.index_prices.clear();
1016 self.bars.clear();
1017 self.accounts.clear();
1018 self.orders.clear();
1019 self.order_lists.clear();
1020 self.positions.clear();
1021 self.position_snapshots.clear();
1022 self.greeks.clear();
1023 self.yield_curves.clear();
1024
1025 self.clear_index();
1026
1027 log::info!("Reset cache");
1028 }
1029
1030 pub fn dispose(&mut self) {
1032 if let Some(database) = &mut self.database {
1033 database.close().expect("Failed to close database");
1034 }
1035 }
1036
1037 pub fn flush_db(&mut self) {
1039 if let Some(database) = &mut self.database {
1040 database.flush().expect("Failed to flush database");
1041 }
1042 }
1043
1044 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1049 check_valid_string(key, stringify!(key)).expect(FAILED);
1050 check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
1051
1052 log::debug!("Adding general {key}");
1053 self.general.insert(key.to_string(), value.clone());
1054
1055 if let Some(database) = &mut self.database {
1056 database.add(key.to_string(), value)?;
1057 }
1058 Ok(())
1059 }
1060
1061 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1063 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1064
1065 if self.config.save_market_data {
1066 if let Some(database) = &mut self.database {
1067 database.add_order_book(&book)?;
1068 }
1069 }
1070
1071 self.books.insert(book.instrument_id, book);
1072 Ok(())
1073 }
1074
1075 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1077 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1078
1079 self.own_books.insert(own_book.instrument_id, own_book);
1080 Ok(())
1081 }
1082
1083 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1085 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1086
1087 if self.config.save_market_data {
1088 }
1090
1091 let mark_prices_deque = self
1092 .mark_prices
1093 .entry(mark_price.instrument_id)
1094 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1095 mark_prices_deque.push_front(mark_price);
1096 Ok(())
1097 }
1098
1099 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1101 log::debug!(
1102 "Adding `IndexPriceUpdate` for {}",
1103 index_price.instrument_id
1104 );
1105
1106 if self.config.save_market_data {
1107 }
1109
1110 let index_prices_deque = self
1111 .index_prices
1112 .entry(index_price.instrument_id)
1113 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1114 index_prices_deque.push_front(index_price);
1115 Ok(())
1116 }
1117
1118 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1120 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1121
1122 if self.config.save_market_data {
1123 if let Some(database) = &mut self.database {
1124 database.add_quote("e)?;
1125 }
1126 }
1127
1128 let quotes_deque = self
1129 .quotes
1130 .entry(quote.instrument_id)
1131 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1132 quotes_deque.push_front(quote);
1133 Ok(())
1134 }
1135
1136 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1138 check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
1139
1140 let instrument_id = quotes[0].instrument_id;
1141 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1142
1143 if self.config.save_market_data {
1144 if let Some(database) = &mut self.database {
1145 for quote in quotes {
1146 database.add_quote(quote).unwrap();
1147 }
1148 }
1149 }
1150
1151 let quotes_deque = self
1152 .quotes
1153 .entry(instrument_id)
1154 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1155
1156 for quote in quotes {
1157 quotes_deque.push_front(*quote);
1158 }
1159 Ok(())
1160 }
1161
1162 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1164 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1165
1166 if self.config.save_market_data {
1167 if let Some(database) = &mut self.database {
1168 database.add_trade(&trade)?;
1169 }
1170 }
1171
1172 let trades_deque = self
1173 .trades
1174 .entry(trade.instrument_id)
1175 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1176 trades_deque.push_front(trade);
1177 Ok(())
1178 }
1179
1180 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1182 check_slice_not_empty(trades, stringify!(trades)).unwrap();
1183
1184 let instrument_id = trades[0].instrument_id;
1185 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1186
1187 if self.config.save_market_data {
1188 if let Some(database) = &mut self.database {
1189 for trade in trades {
1190 database.add_trade(trade).unwrap();
1191 }
1192 }
1193 }
1194
1195 let trades_deque = self
1196 .trades
1197 .entry(instrument_id)
1198 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1199
1200 for trade in trades {
1201 trades_deque.push_front(*trade);
1202 }
1203 Ok(())
1204 }
1205
1206 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1208 log::debug!("Adding `Bar` {}", bar.bar_type);
1209
1210 if self.config.save_market_data {
1211 if let Some(database) = &mut self.database {
1212 database.add_bar(&bar)?;
1213 }
1214 }
1215
1216 let bars = self
1217 .bars
1218 .entry(bar.bar_type)
1219 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1220 bars.push_front(bar);
1221 Ok(())
1222 }
1223
1224 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1226 check_slice_not_empty(bars, stringify!(bars)).unwrap();
1227
1228 let bar_type = bars[0].bar_type;
1229 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1230
1231 if self.config.save_market_data {
1232 if let Some(database) = &mut self.database {
1233 for bar in bars {
1234 database.add_bar(bar).unwrap();
1235 }
1236 }
1237 }
1238
1239 let bars_deque = self
1240 .bars
1241 .entry(bar_type)
1242 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1243
1244 for bar in bars {
1245 bars_deque.push_front(*bar);
1246 }
1247 Ok(())
1248 }
1249
1250 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1252 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1253
1254 if self.config.save_market_data {
1255 if let Some(_database) = &mut self.database {
1256 }
1258 }
1259
1260 self.greeks.insert(greeks.instrument_id, greeks);
1261 Ok(())
1262 }
1263
1264 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1266 self.greeks.get(instrument_id).cloned()
1267 }
1268
1269 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1271 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1272
1273 if self.config.save_market_data {
1274 if let Some(_database) = &mut self.database {
1275 }
1277 }
1278
1279 self.yield_curves
1280 .insert(yield_curve.curve_name.clone(), yield_curve);
1281 Ok(())
1282 }
1283
1284 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1286 self.yield_curves.get(key).map(|curve| {
1287 let curve_clone = curve.clone();
1288 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1289 as Box<dyn Fn(f64) -> f64>
1290 })
1291 }
1292
1293 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1295 log::debug!("Adding `Currency` {}", currency.code);
1296
1297 if let Some(database) = &mut self.database {
1298 database.add_currency(¤cy)?;
1299 }
1300
1301 self.currencies.insert(currency.code, currency);
1302 Ok(())
1303 }
1304
1305 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1307 log::debug!("Adding `Instrument` {}", instrument.id());
1308
1309 if let Some(database) = &mut self.database {
1310 database.add_instrument(&instrument)?;
1311 }
1312
1313 self.instruments.insert(instrument.id(), instrument);
1314 Ok(())
1315 }
1316
1317 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1319 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1320
1321 if let Some(database) = &mut self.database {
1322 database.add_synthetic(&synthetic)?;
1323 }
1324
1325 self.synthetics.insert(synthetic.id, synthetic);
1326 Ok(())
1327 }
1328
1329 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1331 log::debug!("Adding `Account` {}", account.id());
1332
1333 if let Some(database) = &mut self.database {
1334 database.add_account(&account)?;
1335 }
1336
1337 let account_id = account.id();
1338 self.accounts.insert(account_id, account);
1339 self.index
1340 .venue_account
1341 .insert(account_id.get_issuer(), account_id);
1342 Ok(())
1343 }
1344
1345 pub fn add_venue_order_id(
1349 &mut self,
1350 client_order_id: &ClientOrderId,
1351 venue_order_id: &VenueOrderId,
1352 overwrite: bool,
1353 ) -> anyhow::Result<()> {
1354 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1355 if !overwrite && existing_venue_order_id != venue_order_id {
1356 anyhow::bail!(
1357 "Existing {existing_venue_order_id} for {client_order_id}
1358 did not match the given {venue_order_id}.
1359 If you are writing a test then try a different `venue_order_id`,
1360 otherwise this is probably a bug."
1361 );
1362 }
1363 }
1364
1365 self.index
1366 .client_order_ids
1367 .insert(*client_order_id, *venue_order_id);
1368 self.index
1369 .venue_order_ids
1370 .insert(*venue_order_id, *client_order_id);
1371
1372 Ok(())
1373 }
1374
1375 pub fn add_order(
1388 &mut self,
1389 order: OrderAny,
1390 position_id: Option<PositionId>,
1391 client_id: Option<ClientId>,
1392 replace_existing: bool,
1393 ) -> anyhow::Result<()> {
1394 let instrument_id = order.instrument_id();
1395 let venue = instrument_id.venue;
1396 let client_order_id = order.client_order_id();
1397 let strategy_id = order.strategy_id();
1398 let exec_algorithm_id = order.exec_algorithm_id();
1399 let exec_spawn_id = order.exec_spawn_id();
1400
1401 if !replace_existing {
1402 check_key_not_in_map(
1403 &client_order_id,
1404 &self.orders,
1405 stringify!(client_order_id),
1406 stringify!(orders),
1407 )
1408 .expect(FAILED);
1409 check_key_not_in_map(
1410 &client_order_id,
1411 &self.orders,
1412 stringify!(client_order_id),
1413 stringify!(orders),
1414 )
1415 .expect(FAILED);
1416 check_key_not_in_map(
1417 &client_order_id,
1418 &self.orders,
1419 stringify!(client_order_id),
1420 stringify!(orders),
1421 )
1422 .expect(FAILED);
1423 check_key_not_in_map(
1424 &client_order_id,
1425 &self.orders,
1426 stringify!(client_order_id),
1427 stringify!(orders),
1428 )
1429 .expect(FAILED);
1430 }
1431
1432 log::debug!("Adding {order:?}");
1433
1434 self.index.orders.insert(client_order_id);
1435 self.index
1436 .order_strategy
1437 .insert(client_order_id, strategy_id);
1438 self.index.strategies.insert(strategy_id);
1439
1440 self.index
1442 .venue_orders
1443 .entry(venue)
1444 .or_default()
1445 .insert(client_order_id);
1446
1447 self.index
1449 .instrument_orders
1450 .entry(instrument_id)
1451 .or_default()
1452 .insert(client_order_id);
1453
1454 self.index
1456 .strategy_orders
1457 .entry(strategy_id)
1458 .or_default()
1459 .insert(client_order_id);
1460
1461 if let Some(exec_algorithm_id) = exec_algorithm_id {
1463 self.index.exec_algorithms.insert(exec_algorithm_id);
1464
1465 self.index
1466 .exec_algorithm_orders
1467 .entry(exec_algorithm_id)
1468 .or_default()
1469 .insert(client_order_id);
1470
1471 self.index
1472 .exec_spawn_orders
1473 .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1474 .or_default()
1475 .insert(client_order_id);
1476 }
1477
1478 match order.emulation_trigger() {
1480 Some(_) => {
1481 self.index.orders_emulated.remove(&client_order_id);
1482 }
1483 None => {
1484 self.index.orders_emulated.insert(client_order_id);
1485 }
1486 }
1487
1488 if let Some(position_id) = position_id {
1490 self.add_position_id(
1491 &position_id,
1492 &order.instrument_id().venue,
1493 &client_order_id,
1494 &strategy_id,
1495 )?;
1496 }
1497
1498 if let Some(client_id) = client_id {
1500 self.index.order_client.insert(client_order_id, client_id);
1501 log::debug!("Indexed {client_id:?}");
1502 }
1503
1504 if let Some(database) = &mut self.database {
1505 database.add_order(&order, client_id)?;
1506 }
1511
1512 self.orders.insert(client_order_id, order);
1513
1514 Ok(())
1515 }
1516
1517 pub fn add_position_id(
1519 &mut self,
1520 position_id: &PositionId,
1521 venue: &Venue,
1522 client_order_id: &ClientOrderId,
1523 strategy_id: &StrategyId,
1524 ) -> anyhow::Result<()> {
1525 self.index
1526 .order_position
1527 .insert(*client_order_id, *position_id);
1528
1529 if let Some(database) = &mut self.database {
1531 database.index_order_position(*client_order_id, *position_id)?;
1532 }
1533
1534 self.index
1536 .position_strategy
1537 .insert(*position_id, *strategy_id);
1538
1539 self.index
1541 .position_orders
1542 .entry(*position_id)
1543 .or_default()
1544 .insert(*client_order_id);
1545
1546 self.index
1548 .strategy_positions
1549 .entry(*strategy_id)
1550 .or_default()
1551 .insert(*position_id);
1552
1553 self.index
1555 .venue_positions
1556 .entry(*venue)
1557 .or_default()
1558 .insert(*position_id);
1559
1560 Ok(())
1561 }
1562
1563 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1565 self.positions.insert(position.id, position.clone());
1566 self.index.positions.insert(position.id);
1567 self.index.positions_open.insert(position.id);
1568
1569 log::debug!("Adding {position}");
1570
1571 self.add_position_id(
1572 &position.id,
1573 &position.instrument_id.venue,
1574 &position.opening_order_id,
1575 &position.strategy_id,
1576 )?;
1577
1578 let venue = position.instrument_id.venue;
1579 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1580 venue_positions.insert(position.id);
1581
1582 let instrument_id = position.instrument_id;
1584 let instrument_positions = self
1585 .index
1586 .instrument_positions
1587 .entry(instrument_id)
1588 .or_default();
1589 instrument_positions.insert(position.id);
1590
1591 if let Some(database) = &mut self.database {
1592 database.add_position(&position)?;
1593 }
1602
1603 Ok(())
1604 }
1605
1606 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1608 if let Some(database) = &mut self.database {
1609 database.update_account(&account)?;
1610 }
1611 Ok(())
1612 }
1613
1614 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1616 let client_order_id = order.client_order_id();
1617
1618 if let Some(venue_order_id) = order.venue_order_id() {
1620 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1623 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1625 }
1626 }
1627
1628 if order.is_inflight() {
1630 self.index.orders_inflight.insert(client_order_id);
1631 } else {
1632 self.index.orders_inflight.remove(&client_order_id);
1633 }
1634
1635 if order.is_open() {
1637 self.index.orders_closed.remove(&client_order_id);
1638 self.index.orders_open.insert(client_order_id);
1639 } else if order.is_closed() {
1640 self.index.orders_open.remove(&client_order_id);
1641 self.index.orders_pending_cancel.remove(&client_order_id);
1642 self.index.orders_closed.insert(client_order_id);
1643 }
1644
1645 if let Some(emulation_trigger) = order.emulation_trigger() {
1647 match emulation_trigger {
1648 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1649 _ => self.index.orders_emulated.insert(client_order_id),
1650 };
1651 }
1652
1653 if let Some(database) = &mut self.database {
1654 database.update_order(order.last_event())?;
1655 }
1660
1661 self.orders.insert(client_order_id, order.clone());
1663
1664 Ok(())
1665 }
1666
1667 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1669 self.index
1670 .orders_pending_cancel
1671 .insert(order.client_order_id());
1672 }
1673
1674 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1676 if position.is_open() {
1678 self.index.positions_open.insert(position.id);
1679 self.index.positions_closed.remove(&position.id);
1680 } else {
1681 self.index.positions_closed.insert(position.id);
1682 self.index.positions_open.remove(&position.id);
1683 }
1684
1685 if let Some(database) = &mut self.database {
1686 database.update_position(position)?;
1687 }
1692 Ok(())
1693 }
1694
1695 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1698 let position_id = position.id;
1699
1700 let mut copied_position = position.clone();
1701 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1702 copied_position.id = PositionId::new(new_id);
1703
1704 let position_serialized = serde_json::to_vec(&copied_position)?;
1706
1707 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1708 let new_snapshots = match snapshots {
1709 Some(existing_snapshots) => {
1710 let mut combined = existing_snapshots.to_vec();
1711 combined.extend(position_serialized);
1712 Bytes::from(combined)
1713 }
1714 None => Bytes::from(position_serialized),
1715 };
1716 self.position_snapshots.insert(position_id, new_snapshots);
1717
1718 log::debug!("Snapshot {}", copied_position);
1719 Ok(())
1720 }
1721
1722 pub fn snapshot_position_state(
1723 &mut self,
1724 position: &Position,
1725 open_only: Option<bool>,
1728 ) -> anyhow::Result<()> {
1729 let open_only = open_only.unwrap_or(true);
1730
1731 if open_only && !position.is_open() {
1732 return Ok(());
1733 }
1734
1735 if let Some(database) = &mut self.database {
1736 database.snapshot_position_state(position).map_err(|e| {
1737 log::error!(
1738 "Failed to snapshot position state for {}: {e:?}",
1739 position.id
1740 );
1741 e
1742 })?;
1743 } else {
1744 log::warn!(
1745 "Cannot snapshot position state for {} (no database configured)",
1746 position.id
1747 );
1748 }
1749
1750 todo!()
1752 }
1753
1754 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1755 let database = if let Some(database) = &self.database {
1756 database
1757 } else {
1758 log::warn!(
1759 "Cannot snapshot order state for {} (no database configured)",
1760 order.client_order_id()
1761 );
1762 return Ok(());
1763 };
1764
1765 database.snapshot_order_state(order)
1766 }
1767
1768 fn build_order_query_filter_set(
1771 &self,
1772 venue: Option<&Venue>,
1773 instrument_id: Option<&InstrumentId>,
1774 strategy_id: Option<&StrategyId>,
1775 ) -> Option<HashSet<ClientOrderId>> {
1776 let mut query: Option<HashSet<ClientOrderId>> = None;
1777
1778 if let Some(venue) = venue {
1779 query = Some(
1780 self.index
1781 .venue_orders
1782 .get(venue)
1783 .cloned()
1784 .unwrap_or_default(),
1785 );
1786 }
1787
1788 if let Some(instrument_id) = instrument_id {
1789 let instrument_orders = self
1790 .index
1791 .instrument_orders
1792 .get(instrument_id)
1793 .cloned()
1794 .unwrap_or_default();
1795
1796 if let Some(existing_query) = &mut query {
1797 *existing_query = existing_query
1798 .intersection(&instrument_orders)
1799 .copied()
1800 .collect();
1801 } else {
1802 query = Some(instrument_orders);
1803 }
1804 }
1805
1806 if let Some(strategy_id) = strategy_id {
1807 let strategy_orders = self
1808 .index
1809 .strategy_orders
1810 .get(strategy_id)
1811 .cloned()
1812 .unwrap_or_default();
1813
1814 if let Some(existing_query) = &mut query {
1815 *existing_query = existing_query
1816 .intersection(&strategy_orders)
1817 .copied()
1818 .collect();
1819 } else {
1820 query = Some(strategy_orders);
1821 }
1822 }
1823
1824 query
1825 }
1826
1827 fn build_position_query_filter_set(
1828 &self,
1829 venue: Option<&Venue>,
1830 instrument_id: Option<&InstrumentId>,
1831 strategy_id: Option<&StrategyId>,
1832 ) -> Option<HashSet<PositionId>> {
1833 let mut query: Option<HashSet<PositionId>> = None;
1834
1835 if let Some(venue) = venue {
1836 query = Some(
1837 self.index
1838 .venue_positions
1839 .get(venue)
1840 .cloned()
1841 .unwrap_or_default(),
1842 );
1843 }
1844
1845 if let Some(instrument_id) = instrument_id {
1846 let instrument_positions = self
1847 .index
1848 .instrument_positions
1849 .get(instrument_id)
1850 .cloned()
1851 .unwrap_or_default();
1852
1853 if let Some(existing_query) = query {
1854 query = Some(
1855 existing_query
1856 .intersection(&instrument_positions)
1857 .copied()
1858 .collect(),
1859 );
1860 } else {
1861 query = Some(instrument_positions);
1862 }
1863 }
1864
1865 if let Some(strategy_id) = strategy_id {
1866 let strategy_positions = self
1867 .index
1868 .strategy_positions
1869 .get(strategy_id)
1870 .cloned()
1871 .unwrap_or_default();
1872
1873 if let Some(existing_query) = query {
1874 query = Some(
1875 existing_query
1876 .intersection(&strategy_positions)
1877 .copied()
1878 .collect(),
1879 );
1880 } else {
1881 query = Some(strategy_positions);
1882 }
1883 }
1884
1885 query
1886 }
1887
1888 fn get_orders_for_ids(
1889 &self,
1890 client_order_ids: &HashSet<ClientOrderId>,
1891 side: Option<OrderSide>,
1892 ) -> Vec<&OrderAny> {
1893 let side = side.unwrap_or(OrderSide::NoOrderSide);
1894 let mut orders = Vec::new();
1895
1896 for client_order_id in client_order_ids {
1897 let order = self
1898 .orders
1899 .get(client_order_id)
1900 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1901 if side == OrderSide::NoOrderSide || side == order.order_side() {
1902 orders.push(order);
1903 }
1904 }
1905
1906 orders
1907 }
1908
1909 fn get_positions_for_ids(
1910 &self,
1911 position_ids: &HashSet<PositionId>,
1912 side: Option<PositionSide>,
1913 ) -> Vec<&Position> {
1914 let side = side.unwrap_or(PositionSide::NoPositionSide);
1915 let mut positions = Vec::new();
1916
1917 for position_id in position_ids {
1918 let position = self
1919 .positions
1920 .get(position_id)
1921 .unwrap_or_else(|| panic!("Position {position_id} not found"));
1922 if side == PositionSide::NoPositionSide || side == position.side {
1923 positions.push(position);
1924 }
1925 }
1926
1927 positions
1928 }
1929
1930 #[must_use]
1932 pub fn client_order_ids(
1933 &self,
1934 venue: Option<&Venue>,
1935 instrument_id: Option<&InstrumentId>,
1936 strategy_id: Option<&StrategyId>,
1937 ) -> HashSet<ClientOrderId> {
1938 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1939 match query {
1940 Some(query) => self.index.orders.intersection(&query).copied().collect(),
1941 None => self.index.orders.clone(),
1942 }
1943 }
1944
1945 #[must_use]
1947 pub fn client_order_ids_open(
1948 &self,
1949 venue: Option<&Venue>,
1950 instrument_id: Option<&InstrumentId>,
1951 strategy_id: Option<&StrategyId>,
1952 ) -> HashSet<ClientOrderId> {
1953 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1954 match query {
1955 Some(query) => self
1956 .index
1957 .orders_open
1958 .intersection(&query)
1959 .copied()
1960 .collect(),
1961 None => self.index.orders_open.clone(),
1962 }
1963 }
1964
1965 #[must_use]
1967 pub fn client_order_ids_closed(
1968 &self,
1969 venue: Option<&Venue>,
1970 instrument_id: Option<&InstrumentId>,
1971 strategy_id: Option<&StrategyId>,
1972 ) -> HashSet<ClientOrderId> {
1973 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1974 match query {
1975 Some(query) => self
1976 .index
1977 .orders_closed
1978 .intersection(&query)
1979 .copied()
1980 .collect(),
1981 None => self.index.orders_closed.clone(),
1982 }
1983 }
1984
1985 #[must_use]
1987 pub fn client_order_ids_emulated(
1988 &self,
1989 venue: Option<&Venue>,
1990 instrument_id: Option<&InstrumentId>,
1991 strategy_id: Option<&StrategyId>,
1992 ) -> HashSet<ClientOrderId> {
1993 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1994 match query {
1995 Some(query) => self
1996 .index
1997 .orders_emulated
1998 .intersection(&query)
1999 .copied()
2000 .collect(),
2001 None => self.index.orders_emulated.clone(),
2002 }
2003 }
2004
2005 #[must_use]
2007 pub fn client_order_ids_inflight(
2008 &self,
2009 venue: Option<&Venue>,
2010 instrument_id: Option<&InstrumentId>,
2011 strategy_id: Option<&StrategyId>,
2012 ) -> HashSet<ClientOrderId> {
2013 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2014 match query {
2015 Some(query) => self
2016 .index
2017 .orders_inflight
2018 .intersection(&query)
2019 .copied()
2020 .collect(),
2021 None => self.index.orders_inflight.clone(),
2022 }
2023 }
2024
2025 #[must_use]
2027 pub fn position_ids(
2028 &self,
2029 venue: Option<&Venue>,
2030 instrument_id: Option<&InstrumentId>,
2031 strategy_id: Option<&StrategyId>,
2032 ) -> HashSet<PositionId> {
2033 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2034 match query {
2035 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2036 None => self.index.positions.clone(),
2037 }
2038 }
2039
2040 #[must_use]
2042 pub fn position_open_ids(
2043 &self,
2044 venue: Option<&Venue>,
2045 instrument_id: Option<&InstrumentId>,
2046 strategy_id: Option<&StrategyId>,
2047 ) -> HashSet<PositionId> {
2048 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2049 match query {
2050 Some(query) => self
2051 .index
2052 .positions_open
2053 .intersection(&query)
2054 .copied()
2055 .collect(),
2056 None => self.index.positions_open.clone(),
2057 }
2058 }
2059
2060 #[must_use]
2062 pub fn position_closed_ids(
2063 &self,
2064 venue: Option<&Venue>,
2065 instrument_id: Option<&InstrumentId>,
2066 strategy_id: Option<&StrategyId>,
2067 ) -> HashSet<PositionId> {
2068 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2069 match query {
2070 Some(query) => self
2071 .index
2072 .positions_closed
2073 .intersection(&query)
2074 .copied()
2075 .collect(),
2076 None => self.index.positions_closed.clone(),
2077 }
2078 }
2079
2080 #[must_use]
2082 pub fn actor_ids(&self) -> HashSet<ComponentId> {
2083 self.index.actors.clone()
2084 }
2085
2086 #[must_use]
2088 pub fn strategy_ids(&self) -> HashSet<StrategyId> {
2089 self.index.strategies.clone()
2090 }
2091
2092 #[must_use]
2094 pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
2095 self.index.exec_algorithms.clone()
2096 }
2097
2098 #[must_use]
2102 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2103 self.orders.get(client_order_id)
2104 }
2105
2106 #[must_use]
2108 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2109 self.orders.get_mut(client_order_id)
2110 }
2111
2112 #[must_use]
2114 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2115 self.index.venue_order_ids.get(venue_order_id)
2116 }
2117
2118 #[must_use]
2120 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2121 self.index.client_order_ids.get(client_order_id)
2122 }
2123
2124 #[must_use]
2126 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2127 self.index.order_client.get(client_order_id)
2128 }
2129
2130 #[must_use]
2132 pub fn orders(
2133 &self,
2134 venue: Option<&Venue>,
2135 instrument_id: Option<&InstrumentId>,
2136 strategy_id: Option<&StrategyId>,
2137 side: Option<OrderSide>,
2138 ) -> Vec<&OrderAny> {
2139 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2140 self.get_orders_for_ids(&client_order_ids, side)
2141 }
2142
2143 #[must_use]
2145 pub fn orders_open(
2146 &self,
2147 venue: Option<&Venue>,
2148 instrument_id: Option<&InstrumentId>,
2149 strategy_id: Option<&StrategyId>,
2150 side: Option<OrderSide>,
2151 ) -> Vec<&OrderAny> {
2152 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2153 self.get_orders_for_ids(&client_order_ids, side)
2154 }
2155
2156 #[must_use]
2158 pub fn orders_closed(
2159 &self,
2160 venue: Option<&Venue>,
2161 instrument_id: Option<&InstrumentId>,
2162 strategy_id: Option<&StrategyId>,
2163 side: Option<OrderSide>,
2164 ) -> Vec<&OrderAny> {
2165 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2166 self.get_orders_for_ids(&client_order_ids, side)
2167 }
2168
2169 #[must_use]
2171 pub fn orders_emulated(
2172 &self,
2173 venue: Option<&Venue>,
2174 instrument_id: Option<&InstrumentId>,
2175 strategy_id: Option<&StrategyId>,
2176 side: Option<OrderSide>,
2177 ) -> Vec<&OrderAny> {
2178 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2179 self.get_orders_for_ids(&client_order_ids, side)
2180 }
2181
2182 #[must_use]
2184 pub fn orders_inflight(
2185 &self,
2186 venue: Option<&Venue>,
2187 instrument_id: Option<&InstrumentId>,
2188 strategy_id: Option<&StrategyId>,
2189 side: Option<OrderSide>,
2190 ) -> Vec<&OrderAny> {
2191 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2192 self.get_orders_for_ids(&client_order_ids, side)
2193 }
2194
2195 #[must_use]
2197 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2198 let client_order_ids = self.index.position_orders.get(position_id);
2199 match client_order_ids {
2200 Some(client_order_ids) => {
2201 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2202 }
2203 None => Vec::new(),
2204 }
2205 }
2206
2207 #[must_use]
2209 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2210 self.index.orders.contains(client_order_id)
2211 }
2212
2213 #[must_use]
2215 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2216 self.index.orders_open.contains(client_order_id)
2217 }
2218
2219 #[must_use]
2221 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2222 self.index.orders_closed.contains(client_order_id)
2223 }
2224
2225 #[must_use]
2227 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2228 self.index.orders_emulated.contains(client_order_id)
2229 }
2230
2231 #[must_use]
2233 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2234 self.index.orders_inflight.contains(client_order_id)
2235 }
2236
2237 #[must_use]
2239 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2240 self.index.orders_pending_cancel.contains(client_order_id)
2241 }
2242
2243 #[must_use]
2245 pub fn orders_open_count(
2246 &self,
2247 venue: Option<&Venue>,
2248 instrument_id: Option<&InstrumentId>,
2249 strategy_id: Option<&StrategyId>,
2250 side: Option<OrderSide>,
2251 ) -> usize {
2252 self.orders_open(venue, instrument_id, strategy_id, side)
2253 .len()
2254 }
2255
2256 #[must_use]
2258 pub fn orders_closed_count(
2259 &self,
2260 venue: Option<&Venue>,
2261 instrument_id: Option<&InstrumentId>,
2262 strategy_id: Option<&StrategyId>,
2263 side: Option<OrderSide>,
2264 ) -> usize {
2265 self.orders_closed(venue, instrument_id, strategy_id, side)
2266 .len()
2267 }
2268
2269 #[must_use]
2271 pub fn orders_emulated_count(
2272 &self,
2273 venue: Option<&Venue>,
2274 instrument_id: Option<&InstrumentId>,
2275 strategy_id: Option<&StrategyId>,
2276 side: Option<OrderSide>,
2277 ) -> usize {
2278 self.orders_emulated(venue, instrument_id, strategy_id, side)
2279 .len()
2280 }
2281
2282 #[must_use]
2284 pub fn orders_inflight_count(
2285 &self,
2286 venue: Option<&Venue>,
2287 instrument_id: Option<&InstrumentId>,
2288 strategy_id: Option<&StrategyId>,
2289 side: Option<OrderSide>,
2290 ) -> usize {
2291 self.orders_inflight(venue, instrument_id, strategy_id, side)
2292 .len()
2293 }
2294
2295 #[must_use]
2297 pub fn orders_total_count(
2298 &self,
2299 venue: Option<&Venue>,
2300 instrument_id: Option<&InstrumentId>,
2301 strategy_id: Option<&StrategyId>,
2302 side: Option<OrderSide>,
2303 ) -> usize {
2304 self.orders(venue, instrument_id, strategy_id, side).len()
2305 }
2306
2307 #[must_use]
2309 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2310 self.order_lists.get(order_list_id)
2311 }
2312
2313 #[must_use]
2315 pub fn order_lists(
2316 &self,
2317 venue: Option<&Venue>,
2318 instrument_id: Option<&InstrumentId>,
2319 strategy_id: Option<&StrategyId>,
2320 ) -> Vec<&OrderList> {
2321 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2322
2323 if let Some(venue) = venue {
2324 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2325 }
2326
2327 if let Some(instrument_id) = instrument_id {
2328 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2329 }
2330
2331 if let Some(strategy_id) = strategy_id {
2332 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2333 }
2334
2335 order_lists
2336 }
2337
2338 #[must_use]
2340 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2341 self.order_lists.contains_key(order_list_id)
2342 }
2343
2344 #[must_use]
2349 pub fn orders_for_exec_algorithm(
2350 &self,
2351 exec_algorithm_id: &ExecAlgorithmId,
2352 venue: Option<&Venue>,
2353 instrument_id: Option<&InstrumentId>,
2354 strategy_id: Option<&StrategyId>,
2355 side: Option<OrderSide>,
2356 ) -> Vec<&OrderAny> {
2357 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2358 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2359
2360 if let Some(query) = query {
2361 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2362 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2363 }
2364 }
2365
2366 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2367 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2368 } else {
2369 Vec::new()
2370 }
2371 }
2372
2373 #[must_use]
2375 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2376 self.get_orders_for_ids(
2377 self.index
2378 .exec_spawn_orders
2379 .get(exec_spawn_id)
2380 .unwrap_or(&HashSet::new()),
2381 None,
2382 )
2383 }
2384
2385 #[must_use]
2387 pub fn exec_spawn_total_quantity(
2388 &self,
2389 exec_spawn_id: &ClientOrderId,
2390 active_only: bool,
2391 ) -> Option<Quantity> {
2392 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2393
2394 let mut total_quantity: Option<Quantity> = None;
2395
2396 for spawn_order in exec_spawn_orders {
2397 if !active_only || !spawn_order.is_closed() {
2398 if let Some(mut total_quantity) = total_quantity {
2399 total_quantity += spawn_order.quantity();
2400 }
2401 } else {
2402 total_quantity = Some(spawn_order.quantity());
2403 }
2404 }
2405
2406 total_quantity
2407 }
2408
2409 #[must_use]
2411 pub fn exec_spawn_total_filled_qty(
2412 &self,
2413 exec_spawn_id: &ClientOrderId,
2414 active_only: bool,
2415 ) -> Option<Quantity> {
2416 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2417
2418 let mut total_quantity: Option<Quantity> = None;
2419
2420 for spawn_order in exec_spawn_orders {
2421 if !active_only || !spawn_order.is_closed() {
2422 if let Some(mut total_quantity) = total_quantity {
2423 total_quantity += spawn_order.filled_qty();
2424 }
2425 } else {
2426 total_quantity = Some(spawn_order.filled_qty());
2427 }
2428 }
2429
2430 total_quantity
2431 }
2432
2433 #[must_use]
2435 pub fn exec_spawn_total_leaves_qty(
2436 &self,
2437 exec_spawn_id: &ClientOrderId,
2438 active_only: bool,
2439 ) -> Option<Quantity> {
2440 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2441
2442 let mut total_quantity: Option<Quantity> = None;
2443
2444 for spawn_order in exec_spawn_orders {
2445 if !active_only || !spawn_order.is_closed() {
2446 if let Some(mut total_quantity) = total_quantity {
2447 total_quantity += spawn_order.leaves_qty();
2448 }
2449 } else {
2450 total_quantity = Some(spawn_order.leaves_qty());
2451 }
2452 }
2453
2454 total_quantity
2455 }
2456
2457 #[must_use]
2461 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2462 self.positions.get(position_id)
2463 }
2464
2465 #[must_use]
2467 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2468 self.index
2469 .order_position
2470 .get(client_order_id)
2471 .and_then(|position_id| self.positions.get(position_id))
2472 }
2473
2474 #[must_use]
2476 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2477 self.index.order_position.get(client_order_id)
2478 }
2479
2480 #[must_use]
2482 pub fn positions(
2483 &self,
2484 venue: Option<&Venue>,
2485 instrument_id: Option<&InstrumentId>,
2486 strategy_id: Option<&StrategyId>,
2487 side: Option<PositionSide>,
2488 ) -> Vec<&Position> {
2489 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2490 self.get_positions_for_ids(&position_ids, side)
2491 }
2492
2493 #[must_use]
2495 pub fn positions_open(
2496 &self,
2497 venue: Option<&Venue>,
2498 instrument_id: Option<&InstrumentId>,
2499 strategy_id: Option<&StrategyId>,
2500 side: Option<PositionSide>,
2501 ) -> Vec<&Position> {
2502 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2503 self.get_positions_for_ids(&position_ids, side)
2504 }
2505
2506 #[must_use]
2508 pub fn positions_closed(
2509 &self,
2510 venue: Option<&Venue>,
2511 instrument_id: Option<&InstrumentId>,
2512 strategy_id: Option<&StrategyId>,
2513 side: Option<PositionSide>,
2514 ) -> Vec<&Position> {
2515 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2516 self.get_positions_for_ids(&position_ids, side)
2517 }
2518
2519 #[must_use]
2521 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2522 self.index.positions.contains(position_id)
2523 }
2524
2525 #[must_use]
2527 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2528 self.index.positions_open.contains(position_id)
2529 }
2530
2531 #[must_use]
2533 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2534 self.index.positions_closed.contains(position_id)
2535 }
2536
2537 #[must_use]
2539 pub fn positions_open_count(
2540 &self,
2541 venue: Option<&Venue>,
2542 instrument_id: Option<&InstrumentId>,
2543 strategy_id: Option<&StrategyId>,
2544 side: Option<PositionSide>,
2545 ) -> usize {
2546 self.positions_open(venue, instrument_id, strategy_id, side)
2547 .len()
2548 }
2549
2550 #[must_use]
2552 pub fn positions_closed_count(
2553 &self,
2554 venue: Option<&Venue>,
2555 instrument_id: Option<&InstrumentId>,
2556 strategy_id: Option<&StrategyId>,
2557 side: Option<PositionSide>,
2558 ) -> usize {
2559 self.positions_closed(venue, instrument_id, strategy_id, side)
2560 .len()
2561 }
2562
2563 #[must_use]
2565 pub fn positions_total_count(
2566 &self,
2567 venue: Option<&Venue>,
2568 instrument_id: Option<&InstrumentId>,
2569 strategy_id: Option<&StrategyId>,
2570 side: Option<PositionSide>,
2571 ) -> usize {
2572 self.positions(venue, instrument_id, strategy_id, side)
2573 .len()
2574 }
2575
2576 #[must_use]
2580 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2581 self.index.order_strategy.get(client_order_id)
2582 }
2583
2584 #[must_use]
2586 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2587 self.index.position_strategy.get(position_id)
2588 }
2589
2590 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2594 check_valid_string(key, stringify!(key)).expect(FAILED);
2595
2596 Ok(self.general.get(key))
2597 }
2598
2599 #[must_use]
2603 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2604 match price_type {
2605 PriceType::Bid => self
2606 .quotes
2607 .get(instrument_id)
2608 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2609 PriceType::Ask => self
2610 .quotes
2611 .get(instrument_id)
2612 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2613 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2614 quotes.front().map(|quote| {
2615 Price::new(
2616 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2617 quote.bid_price.precision + 1,
2618 )
2619 })
2620 }),
2621 PriceType::Last => self
2622 .trades
2623 .get(instrument_id)
2624 .and_then(|trades| trades.front().map(|trade| trade.price)),
2625 PriceType::Mark => self
2626 .mark_prices
2627 .get(instrument_id)
2628 .and_then(|marks| marks.front().map(|mark| mark.value)),
2629 }
2630 }
2631
2632 #[must_use]
2634 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2635 self.quotes
2636 .get(instrument_id)
2637 .map(|quotes| quotes.iter().copied().collect())
2638 }
2639
2640 #[must_use]
2642 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2643 self.trades
2644 .get(instrument_id)
2645 .map(|trades| trades.iter().copied().collect())
2646 }
2647
2648 #[must_use]
2650 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2651 self.mark_prices
2652 .get(instrument_id)
2653 .map(|mark_prices| mark_prices.iter().copied().collect())
2654 }
2655
2656 #[must_use]
2658 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2659 self.index_prices
2660 .get(instrument_id)
2661 .map(|index_prices| index_prices.iter().copied().collect())
2662 }
2663
2664 #[must_use]
2666 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2667 self.bars
2668 .get(bar_type)
2669 .map(|bars| bars.iter().copied().collect())
2670 }
2671
2672 #[must_use]
2674 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2675 self.books.get(instrument_id)
2676 }
2677
2678 #[must_use]
2680 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2681 self.books.get_mut(instrument_id)
2682 }
2683
2684 #[must_use]
2686 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2687 self.own_books.get(instrument_id)
2688 }
2689
2690 #[must_use]
2692 pub fn own_order_book_mut(
2693 &mut self,
2694 instrument_id: &InstrumentId,
2695 ) -> Option<&mut OwnOrderBook> {
2696 self.own_books.get_mut(instrument_id)
2697 }
2698
2699 #[must_use]
2701 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2702 self.quotes
2703 .get(instrument_id)
2704 .and_then(|quotes| quotes.front())
2705 }
2706
2707 #[must_use]
2709 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2710 self.trades
2711 .get(instrument_id)
2712 .and_then(|trades| trades.front())
2713 }
2714
2715 #[must_use]
2717 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
2718 self.mark_prices
2719 .get(instrument_id)
2720 .and_then(|mark_prices| mark_prices.front())
2721 }
2722
2723 #[must_use]
2725 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
2726 self.index_prices
2727 .get(instrument_id)
2728 .and_then(|index_prices| index_prices.front())
2729 }
2730
2731 #[must_use]
2733 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2734 self.bars.get(bar_type).and_then(|bars| bars.front())
2735 }
2736
2737 #[must_use]
2739 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2740 self.books
2741 .get(instrument_id)
2742 .map_or(0, |book| book.update_count) as usize
2743 }
2744
2745 #[must_use]
2747 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2748 self.quotes
2749 .get(instrument_id)
2750 .map_or(0, std::collections::VecDeque::len)
2751 }
2752
2753 #[must_use]
2755 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2756 self.trades
2757 .get(instrument_id)
2758 .map_or(0, std::collections::VecDeque::len)
2759 }
2760
2761 #[must_use]
2763 pub fn bar_count(&self, bar_type: &BarType) -> usize {
2764 self.bars
2765 .get(bar_type)
2766 .map_or(0, std::collections::VecDeque::len)
2767 }
2768
2769 #[must_use]
2771 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2772 self.books.contains_key(instrument_id)
2773 }
2774
2775 #[must_use]
2777 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2778 self.quote_count(instrument_id) > 0
2779 }
2780
2781 #[must_use]
2783 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2784 self.trade_count(instrument_id) > 0
2785 }
2786
2787 #[must_use]
2789 pub fn has_bars(&self, bar_type: &BarType) -> bool {
2790 self.bar_count(bar_type) > 0
2791 }
2792
2793 #[must_use]
2794 pub fn get_xrate(
2795 &self,
2796 venue: Venue,
2797 from_currency: Currency,
2798 to_currency: Currency,
2799 price_type: PriceType,
2800 ) -> Option<f64> {
2801 if from_currency == to_currency {
2802 return Some(1.0);
2805 }
2806
2807 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2808
2809 match get_exchange_rate(
2810 from_currency.code,
2811 to_currency.code,
2812 price_type,
2813 bid_quote,
2814 ask_quote,
2815 ) {
2816 Ok(rate) => rate,
2817 Err(e) => {
2818 log::error!("Failed to calculate xrate: {e}");
2819 None
2820 }
2821 }
2822 }
2823
2824 fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2825 let mut bid_quotes = HashMap::new();
2826 let mut ask_quotes = HashMap::new();
2827
2828 for instrument_id in self.instruments.keys() {
2829 if instrument_id.venue != *venue {
2830 continue;
2831 }
2832
2833 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2834 if let Some(tick) = ticks.front() {
2835 (tick.bid_price, tick.ask_price)
2836 } else {
2837 continue; }
2839 } else {
2840 let bid_bar = self
2841 .bars
2842 .iter()
2843 .find(|(k, _)| {
2844 k.instrument_id() == *instrument_id
2845 && matches!(k.spec().price_type, PriceType::Bid)
2846 })
2847 .map(|(_, v)| v);
2848
2849 let ask_bar = self
2850 .bars
2851 .iter()
2852 .find(|(k, _)| {
2853 k.instrument_id() == *instrument_id
2854 && matches!(k.spec().price_type, PriceType::Ask)
2855 })
2856 .map(|(_, v)| v);
2857
2858 match (bid_bar, ask_bar) {
2859 (Some(bid), Some(ask)) => {
2860 let bid_price = bid.front().unwrap().close;
2861 let ask_price = ask.front().unwrap().close;
2862
2863 (bid_price, ask_price)
2864 }
2865 _ => continue,
2866 }
2867 };
2868
2869 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
2870 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
2871 }
2872
2873 (bid_quotes, ask_quotes)
2874 }
2875
2876 #[must_use]
2878 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
2879 self.mark_xrates.get(&(from_currency, to_currency)).copied()
2880 }
2881
2882 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
2888 assert!(xrate > 0.0, "xrate was zero");
2889 self.mark_xrates.insert((from_currency, to_currency), xrate);
2890 self.mark_xrates
2891 .insert((to_currency, from_currency), 1.0 / xrate);
2892 }
2893
2894 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
2896 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
2897 }
2898
2899 pub fn clear_mark_xrates(&mut self) {
2901 self.mark_xrates.clear();
2902 }
2903
2904 #[must_use]
2908 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2909 self.instruments.get(instrument_id)
2910 }
2911
2912 #[must_use]
2914 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2915 self.instruments
2916 .keys()
2917 .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2918 .collect()
2919 }
2920
2921 #[must_use]
2923 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2924 self.instruments
2925 .values()
2926 .filter(|i| &i.id().venue == venue)
2927 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
2928 .collect()
2929 }
2930
2931 #[must_use]
2933 pub fn bar_types(
2934 &self,
2935 instrument_id: Option<&InstrumentId>,
2936 price_type: Option<&PriceType>,
2937 aggregation_source: AggregationSource,
2938 ) -> Vec<&BarType> {
2939 let mut bar_types = self
2940 .bars
2941 .keys()
2942 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2943 .collect::<Vec<&BarType>>();
2944
2945 if let Some(instrument_id) = instrument_id {
2946 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2947 }
2948
2949 if let Some(price_type) = price_type {
2950 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2951 }
2952
2953 bar_types
2954 }
2955
2956 #[must_use]
2960 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2961 self.synthetics.get(instrument_id)
2962 }
2963
2964 #[must_use]
2966 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2967 self.synthetics.keys().collect()
2968 }
2969
2970 #[must_use]
2972 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2973 self.synthetics.values().collect()
2974 }
2975
2976 #[must_use]
2980 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2981 self.accounts.get(account_id)
2982 }
2983
2984 #[must_use]
2986 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2987 self.index
2988 .venue_account
2989 .get(venue)
2990 .and_then(|account_id| self.accounts.get(account_id))
2991 }
2992
2993 #[must_use]
2995 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2996 self.index.venue_account.get(venue)
2997 }
2998
2999 #[must_use]
3001 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3002 self.accounts
3003 .values()
3004 .filter(|account| &account.id() == account_id)
3005 .collect()
3006 }
3007}