1pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31 collections::VecDeque,
32 fmt::{Debug, Display},
33 time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42 UUID4, UnixNanos,
43 correctness::{
44 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45 check_valid_string_ascii,
46 },
47 datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50 accounts::{Account, AccountAny},
51 data::{
52 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
53 TradeTick, YieldCurveData,
54 },
55 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
56 identifiers::{
57 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
58 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
59 },
60 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
61 orderbook::{
62 OrderBook,
63 own::{OwnOrderBook, should_handle_own_book_order},
64 },
65 orders::{Order, OrderAny, OrderList},
66 position::Position,
67 types::{Currency, Money, Price, Quantity},
68};
69use ustr::Ustr;
70
71use crate::xrate::get_exchange_rate;
72
73#[cfg_attr(
75 feature = "python",
76 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
77)]
78pub struct Cache {
79 config: CacheConfig,
80 index: CacheIndex,
81 database: Option<Box<dyn CacheDatabaseAdapter>>,
82 general: AHashMap<String, Bytes>,
83 currencies: AHashMap<Ustr, Currency>,
84 instruments: AHashMap<InstrumentId, InstrumentAny>,
85 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
86 books: AHashMap<InstrumentId, OrderBook>,
87 own_books: AHashMap<InstrumentId, OwnOrderBook>,
88 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
89 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
90 mark_xrates: AHashMap<(Currency, Currency), f64>,
91 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
92 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
93 funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
94 bars: AHashMap<BarType, VecDeque<Bar>>,
95 greeks: AHashMap<InstrumentId, GreeksData>,
96 yield_curves: AHashMap<String, YieldCurveData>,
97 accounts: AHashMap<AccountId, AccountAny>,
98 orders: AHashMap<ClientOrderId, OrderAny>,
99 order_lists: AHashMap<OrderListId, OrderList>,
100 positions: AHashMap<PositionId, Position>,
101 position_snapshots: AHashMap<PositionId, Bytes>,
102 #[cfg(feature = "defi")]
103 pub(crate) defi: crate::defi::cache::DefiCache,
104}
105
106impl Debug for Cache {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct(stringify!(Cache))
109 .field("config", &self.config)
110 .field("index", &self.index)
111 .field("general", &self.general)
112 .field("currencies", &self.currencies)
113 .field("instruments", &self.instruments)
114 .field("synthetics", &self.synthetics)
115 .field("books", &self.books)
116 .field("own_books", &self.own_books)
117 .field("quotes", &self.quotes)
118 .field("trades", &self.trades)
119 .field("mark_xrates", &self.mark_xrates)
120 .field("mark_prices", &self.mark_prices)
121 .field("index_prices", &self.index_prices)
122 .field("funding_rates", &self.funding_rates)
123 .field("bars", &self.bars)
124 .field("greeks", &self.greeks)
125 .field("yield_curves", &self.yield_curves)
126 .field("accounts", &self.accounts)
127 .field("orders", &self.orders)
128 .field("order_lists", &self.order_lists)
129 .field("positions", &self.positions)
130 .field("position_snapshots", &self.position_snapshots)
131 .finish()
132 }
133}
134
135impl Default for Cache {
136 fn default() -> Self {
138 Self::new(Some(CacheConfig::default()), None)
139 }
140}
141
142impl Cache {
143 #[must_use]
145 pub fn new(
149 config: Option<CacheConfig>,
150 database: Option<Box<dyn CacheDatabaseAdapter>>,
151 ) -> Self {
152 Self {
153 config: config.unwrap_or_default(),
154 index: CacheIndex::default(),
155 database,
156 general: AHashMap::new(),
157 currencies: AHashMap::new(),
158 instruments: AHashMap::new(),
159 synthetics: AHashMap::new(),
160 books: AHashMap::new(),
161 own_books: AHashMap::new(),
162 quotes: AHashMap::new(),
163 trades: AHashMap::new(),
164 mark_xrates: AHashMap::new(),
165 mark_prices: AHashMap::new(),
166 index_prices: AHashMap::new(),
167 funding_rates: AHashMap::new(),
168 bars: AHashMap::new(),
169 greeks: AHashMap::new(),
170 yield_curves: AHashMap::new(),
171 accounts: AHashMap::new(),
172 orders: AHashMap::new(),
173 order_lists: AHashMap::new(),
174 positions: AHashMap::new(),
175 position_snapshots: AHashMap::new(),
176 #[cfg(feature = "defi")]
177 defi: crate::defi::cache::DefiCache::default(),
178 }
179 }
180
181 #[must_use]
183 pub fn memory_address(&self) -> String {
184 format!("{:?}", std::ptr::from_ref(self))
185 }
186
187 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
191 let type_name = std::any::type_name_of_val(&*database);
192 log::info!("Cache database adapter set: {type_name}");
193 self.database = Some(database);
194 }
195
196 pub fn cache_general(&mut self) -> anyhow::Result<()> {
204 self.general = match &mut self.database {
205 Some(db) => db.load()?,
206 None => AHashMap::new(),
207 };
208
209 log::info!(
210 "Cached {} general object(s) from database",
211 self.general.len()
212 );
213 Ok(())
214 }
215
216 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
222 let cache_map = match &self.database {
223 Some(db) => db.load_all().await?,
224 None => CacheMap::default(),
225 };
226
227 self.currencies = cache_map.currencies;
228 self.instruments = cache_map.instruments;
229 self.synthetics = cache_map.synthetics;
230 self.accounts = cache_map.accounts;
231 self.orders = cache_map.orders;
232 self.positions = cache_map.positions;
233 Ok(())
234 }
235
236 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
242 self.currencies = match &mut self.database {
243 Some(db) => db.load_currencies().await?,
244 None => AHashMap::new(),
245 };
246
247 log::info!("Cached {} currencies from database", self.general.len());
248 Ok(())
249 }
250
251 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
257 self.instruments = match &mut self.database {
258 Some(db) => db.load_instruments().await?,
259 None => AHashMap::new(),
260 };
261
262 log::info!("Cached {} instruments from database", self.general.len());
263 Ok(())
264 }
265
266 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
272 self.synthetics = match &mut self.database {
273 Some(db) => db.load_synthetics().await?,
274 None => AHashMap::new(),
275 };
276
277 log::info!(
278 "Cached {} synthetic instruments from database",
279 self.general.len()
280 );
281 Ok(())
282 }
283
284 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
290 self.accounts = match &mut self.database {
291 Some(db) => db.load_accounts().await?,
292 None => AHashMap::new(),
293 };
294
295 log::info!(
296 "Cached {} synthetic instruments from database",
297 self.general.len()
298 );
299 Ok(())
300 }
301
302 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
308 self.orders = match &mut self.database {
309 Some(db) => db.load_orders().await?,
310 None => AHashMap::new(),
311 };
312
313 log::info!("Cached {} orders from database", self.general.len());
314 Ok(())
315 }
316
317 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
323 self.positions = match &mut self.database {
324 Some(db) => db.load_positions().await?,
325 None => AHashMap::new(),
326 };
327
328 log::info!("Cached {} positions from database", self.general.len());
329 Ok(())
330 }
331
332 pub fn build_index(&mut self) {
334 log::debug!("Building index");
335
336 for account_id in self.accounts.keys() {
338 self.index
339 .venue_account
340 .insert(account_id.get_issuer(), *account_id);
341 }
342
343 for (client_order_id, order) in &self.orders {
345 let instrument_id = order.instrument_id();
346 let venue = instrument_id.venue;
347 let strategy_id = order.strategy_id();
348
349 self.index
351 .venue_orders
352 .entry(venue)
353 .or_default()
354 .insert(*client_order_id);
355
356 if let Some(venue_order_id) = order.venue_order_id() {
358 self.index
359 .venue_order_ids
360 .insert(venue_order_id, *client_order_id);
361 }
362
363 if let Some(position_id) = order.position_id() {
365 self.index
366 .order_position
367 .insert(*client_order_id, position_id);
368 }
369
370 self.index
372 .order_strategy
373 .insert(*client_order_id, order.strategy_id());
374
375 self.index
377 .instrument_orders
378 .entry(instrument_id)
379 .or_default()
380 .insert(*client_order_id);
381
382 self.index
384 .strategy_orders
385 .entry(strategy_id)
386 .or_default()
387 .insert(*client_order_id);
388
389 if let Some(account_id) = order.account_id() {
391 self.index
392 .account_orders
393 .entry(account_id)
394 .or_default()
395 .insert(*client_order_id);
396 }
397
398 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
400 self.index
401 .exec_algorithm_orders
402 .entry(exec_algorithm_id)
403 .or_default()
404 .insert(*client_order_id);
405 }
406
407 if let Some(exec_spawn_id) = order.exec_spawn_id() {
409 self.index
410 .exec_spawn_orders
411 .entry(exec_spawn_id)
412 .or_default()
413 .insert(*client_order_id);
414 }
415
416 self.index.orders.insert(*client_order_id);
418
419 if order.is_open() {
421 self.index.orders_open.insert(*client_order_id);
422 }
423
424 if order.is_closed() {
426 self.index.orders_closed.insert(*client_order_id);
427 }
428
429 if let Some(emulation_trigger) = order.emulation_trigger()
431 && emulation_trigger != TriggerType::NoTrigger
432 && !order.is_closed()
433 {
434 self.index.orders_emulated.insert(*client_order_id);
435 }
436
437 if order.is_inflight() {
439 self.index.orders_inflight.insert(*client_order_id);
440 }
441
442 self.index.strategies.insert(strategy_id);
444
445 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
447 self.index.exec_algorithms.insert(exec_algorithm_id);
448 }
449 }
450
451 for (position_id, position) in &self.positions {
453 let instrument_id = position.instrument_id;
454 let venue = instrument_id.venue;
455 let strategy_id = position.strategy_id;
456
457 self.index
459 .venue_positions
460 .entry(venue)
461 .or_default()
462 .insert(*position_id);
463
464 self.index
466 .position_strategy
467 .insert(*position_id, position.strategy_id);
468
469 self.index
471 .position_orders
472 .entry(*position_id)
473 .or_default()
474 .extend(position.client_order_ids().into_iter());
475
476 self.index
478 .instrument_positions
479 .entry(instrument_id)
480 .or_default()
481 .insert(*position_id);
482
483 self.index
485 .strategy_positions
486 .entry(strategy_id)
487 .or_default()
488 .insert(*position_id);
489
490 self.index
492 .account_positions
493 .entry(position.account_id)
494 .or_default()
495 .insert(*position_id);
496
497 self.index.positions.insert(*position_id);
499
500 if position.is_open() {
502 self.index.positions_open.insert(*position_id);
503 }
504
505 if position.is_closed() {
507 self.index.positions_closed.insert(*position_id);
508 }
509
510 self.index.strategies.insert(strategy_id);
512 }
513 }
514
515 #[must_use]
517 pub const fn has_backing(&self) -> bool {
518 self.config.database.is_some()
519 }
520
521 #[must_use]
523 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
524 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
525 quote
526 } else {
527 log::warn!(
528 "Cannot calculate unrealized PnL for {}, no quotes for {}",
529 position.id,
530 position.instrument_id
531 );
532 return None;
533 };
534
535 let last = match position.side {
537 PositionSide::Flat | PositionSide::NoPositionSide => {
538 return Some(Money::new(0.0, position.settlement_currency));
539 }
540 PositionSide::Long => quote.bid_price,
541 PositionSide::Short => quote.ask_price,
542 };
543
544 Some(position.unrealized_pnl(last))
545 }
546
547 #[must_use]
556 pub fn check_integrity(&mut self) -> bool {
557 let mut error_count = 0;
558 let failure = "Integrity failure";
559
560 let timestamp_us = SystemTime::now()
562 .duration_since(UNIX_EPOCH)
563 .expect("Time went backwards")
564 .as_micros();
565
566 log::info!("Checking data integrity");
567
568 for account_id in self.accounts.keys() {
570 if !self
571 .index
572 .venue_account
573 .contains_key(&account_id.get_issuer())
574 {
575 log::error!(
576 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
577 );
578 error_count += 1;
579 }
580 }
581
582 for (client_order_id, order) in &self.orders {
583 if !self.index.order_strategy.contains_key(client_order_id) {
584 log::error!(
585 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
586 );
587 error_count += 1;
588 }
589 if !self.index.orders.contains(client_order_id) {
590 log::error!(
591 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
592 );
593 error_count += 1;
594 }
595 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
596 log::error!(
597 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
598 );
599 error_count += 1;
600 }
601 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
602 log::error!(
603 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
604 );
605 error_count += 1;
606 }
607 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
608 log::error!(
609 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
610 );
611 error_count += 1;
612 }
613 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
614 if !self
615 .index
616 .exec_algorithm_orders
617 .contains_key(&exec_algorithm_id)
618 {
619 log::error!(
620 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
621 );
622 error_count += 1;
623 }
624 if order.exec_spawn_id().is_none()
625 && !self.index.exec_spawn_orders.contains_key(client_order_id)
626 {
627 log::error!(
628 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
629 );
630 error_count += 1;
631 }
632 }
633 }
634
635 for (position_id, position) in &self.positions {
636 if !self.index.position_strategy.contains_key(position_id) {
637 log::error!(
638 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
639 );
640 error_count += 1;
641 }
642 if !self.index.position_orders.contains_key(position_id) {
643 log::error!(
644 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
645 );
646 error_count += 1;
647 }
648 if !self.index.positions.contains(position_id) {
649 log::error!(
650 "{failure} in positions: {position_id} not found in `self.index.positions`",
651 );
652 error_count += 1;
653 }
654 if position.is_open() && !self.index.positions_open.contains(position_id) {
655 log::error!(
656 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
657 );
658 error_count += 1;
659 }
660 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
661 log::error!(
662 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
663 );
664 error_count += 1;
665 }
666 }
667
668 for account_id in self.index.venue_account.values() {
670 if !self.accounts.contains_key(account_id) {
671 log::error!(
672 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
673 );
674 error_count += 1;
675 }
676 }
677
678 for client_order_id in self.index.venue_order_ids.values() {
679 if !self.orders.contains_key(client_order_id) {
680 log::error!(
681 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
682 );
683 error_count += 1;
684 }
685 }
686
687 for client_order_id in self.index.client_order_ids.keys() {
688 if !self.orders.contains_key(client_order_id) {
689 log::error!(
690 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
691 );
692 error_count += 1;
693 }
694 }
695
696 for client_order_id in self.index.order_position.keys() {
697 if !self.orders.contains_key(client_order_id) {
698 log::error!(
699 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
700 );
701 error_count += 1;
702 }
703 }
704
705 for client_order_id in self.index.order_strategy.keys() {
707 if !self.orders.contains_key(client_order_id) {
708 log::error!(
709 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
710 );
711 error_count += 1;
712 }
713 }
714
715 for position_id in self.index.position_strategy.keys() {
716 if !self.positions.contains_key(position_id) {
717 log::error!(
718 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
719 );
720 error_count += 1;
721 }
722 }
723
724 for position_id in self.index.position_orders.keys() {
725 if !self.positions.contains_key(position_id) {
726 log::error!(
727 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
728 );
729 error_count += 1;
730 }
731 }
732
733 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
734 for client_order_id in client_order_ids {
735 if !self.orders.contains_key(client_order_id) {
736 log::error!(
737 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
738 );
739 error_count += 1;
740 }
741 }
742 }
743
744 for instrument_id in self.index.instrument_positions.keys() {
745 if !self.index.instrument_orders.contains_key(instrument_id) {
746 log::error!(
747 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
748 );
749 error_count += 1;
750 }
751 }
752
753 for client_order_ids in self.index.strategy_orders.values() {
754 for client_order_id in client_order_ids {
755 if !self.orders.contains_key(client_order_id) {
756 log::error!(
757 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
758 );
759 error_count += 1;
760 }
761 }
762 }
763
764 for position_ids in self.index.strategy_positions.values() {
765 for position_id in position_ids {
766 if !self.positions.contains_key(position_id) {
767 log::error!(
768 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
769 );
770 error_count += 1;
771 }
772 }
773 }
774
775 for client_order_id in &self.index.orders {
776 if !self.orders.contains_key(client_order_id) {
777 log::error!(
778 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
779 );
780 error_count += 1;
781 }
782 }
783
784 for client_order_id in &self.index.orders_emulated {
785 if !self.orders.contains_key(client_order_id) {
786 log::error!(
787 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
788 );
789 error_count += 1;
790 }
791 }
792
793 for client_order_id in &self.index.orders_inflight {
794 if !self.orders.contains_key(client_order_id) {
795 log::error!(
796 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
797 );
798 error_count += 1;
799 }
800 }
801
802 for client_order_id in &self.index.orders_open {
803 if !self.orders.contains_key(client_order_id) {
804 log::error!(
805 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
806 );
807 error_count += 1;
808 }
809 }
810
811 for client_order_id in &self.index.orders_closed {
812 if !self.orders.contains_key(client_order_id) {
813 log::error!(
814 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
815 );
816 error_count += 1;
817 }
818 }
819
820 for position_id in &self.index.positions {
821 if !self.positions.contains_key(position_id) {
822 log::error!(
823 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
824 );
825 error_count += 1;
826 }
827 }
828
829 for position_id in &self.index.positions_open {
830 if !self.positions.contains_key(position_id) {
831 log::error!(
832 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
833 );
834 error_count += 1;
835 }
836 }
837
838 for position_id in &self.index.positions_closed {
839 if !self.positions.contains_key(position_id) {
840 log::error!(
841 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
842 );
843 error_count += 1;
844 }
845 }
846
847 for strategy_id in &self.index.strategies {
848 if !self.index.strategy_orders.contains_key(strategy_id) {
849 log::error!(
850 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
851 );
852 error_count += 1;
853 }
854 }
855
856 for exec_algorithm_id in &self.index.exec_algorithms {
857 if !self
858 .index
859 .exec_algorithm_orders
860 .contains_key(exec_algorithm_id)
861 {
862 log::error!(
863 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
864 );
865 error_count += 1;
866 }
867 }
868
869 let total_us = SystemTime::now()
870 .duration_since(UNIX_EPOCH)
871 .expect("Time went backwards")
872 .as_micros()
873 - timestamp_us;
874
875 if error_count == 0 {
876 log::info!("Integrity check passed in {total_us}μs");
877 true
878 } else {
879 log::error!(
880 "Integrity check failed with {error_count} error{} in {total_us}μs",
881 if error_count == 1 { "" } else { "s" },
882 );
883 false
884 }
885 }
886
887 #[must_use]
891 pub fn check_residuals(&self) -> bool {
892 log::debug!("Checking residuals");
893
894 let mut residuals = false;
895
896 for order in self.orders_open(None, None, None, None, None) {
898 residuals = true;
899 log::warn!("Residual {order}");
900 }
901
902 for position in self.positions_open(None, None, None, None, None) {
904 residuals = true;
905 log::warn!("Residual {position}");
906 }
907
908 residuals
909 }
910
911 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
917 log::debug!(
918 "Purging closed orders{}",
919 if buffer_secs > 0 {
920 format!(" with buffer_secs={buffer_secs}")
921 } else {
922 String::new()
923 }
924 );
925
926 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
927
928 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
929
930 'outer: for client_order_id in self.index.orders_closed.clone() {
931 if let Some(order) = self.orders.get(&client_order_id)
932 && order.is_closed()
933 && let Some(ts_closed) = order.ts_closed()
934 && ts_closed + buffer_ns <= ts_now
935 {
936 if let Some(linked_order_ids) = order.linked_order_ids() {
938 for linked_order_id in linked_order_ids {
939 if let Some(linked_order) = self.orders.get(linked_order_id)
940 && linked_order.is_open()
941 {
942 continue 'outer;
944 }
945 }
946 }
947
948 if let Some(order_list_id) = order.order_list_id() {
949 affected_order_list_ids.insert(order_list_id);
950 }
951
952 self.purge_order(client_order_id);
953 }
954 }
955
956 for order_list_id in affected_order_list_ids {
957 if let Some(order_list) = self.order_lists.get(&order_list_id) {
958 let all_purged = order_list
959 .client_order_ids
960 .iter()
961 .all(|id| !self.orders.contains_key(id));
962 if all_purged {
963 self.order_lists.remove(&order_list_id);
964 log::info!("Purged {order_list_id}");
965 }
966 }
967 }
968 }
969
970 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
972 log::debug!(
973 "Purging closed positions{}",
974 if buffer_secs > 0 {
975 format!(" with buffer_secs={buffer_secs}")
976 } else {
977 String::new()
978 }
979 );
980
981 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
982
983 for position_id in self.index.positions_closed.clone() {
984 if let Some(position) = self.positions.get(&position_id)
985 && position.is_closed()
986 && let Some(ts_closed) = position.ts_closed
987 && ts_closed + buffer_ns <= ts_now
988 {
989 self.purge_position(position_id);
990 }
991 }
992 }
993
994 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
998 let order = self.orders.get(&client_order_id).cloned();
1000
1001 if let Some(ref ord) = order
1003 && ord.is_open()
1004 {
1005 log::warn!("Order {client_order_id} found open when purging, skipping purge");
1006 return;
1007 }
1008
1009 if let Some(ref ord) = order {
1011 self.orders.remove(&client_order_id);
1013
1014 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1016 {
1017 venue_orders.remove(&client_order_id);
1018 }
1019
1020 if let Some(venue_order_id) = ord.venue_order_id() {
1022 self.index.venue_order_ids.remove(&venue_order_id);
1023 }
1024
1025 if let Some(instrument_orders) =
1027 self.index.instrument_orders.get_mut(&ord.instrument_id())
1028 {
1029 instrument_orders.remove(&client_order_id);
1030 }
1031
1032 if let Some(position_id) = ord.position_id()
1034 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1035 {
1036 position_orders.remove(&client_order_id);
1037 }
1038
1039 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1041 && let Some(exec_algorithm_orders) =
1042 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1043 {
1044 exec_algorithm_orders.remove(&client_order_id);
1045 }
1046
1047 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1049 strategy_orders.remove(&client_order_id);
1050 if strategy_orders.is_empty() {
1051 self.index.strategy_orders.remove(&ord.strategy_id());
1052 }
1053 }
1054
1055 if let Some(account_id) = ord.account_id()
1057 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1058 {
1059 account_orders.remove(&client_order_id);
1060 if account_orders.is_empty() {
1061 self.index.account_orders.remove(&account_id);
1062 }
1063 }
1064
1065 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1067 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1068 {
1069 spawn_orders.remove(&client_order_id);
1070 if spawn_orders.is_empty() {
1071 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1072 }
1073 }
1074
1075 log::info!("Purged order {client_order_id}");
1076 } else {
1077 log::warn!("Order {client_order_id} not found when purging");
1078 }
1079
1080 self.index.order_position.remove(&client_order_id);
1082 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1083 self.index.order_client.remove(&client_order_id);
1084 self.index.client_order_ids.remove(&client_order_id);
1085
1086 if let Some(strategy_id) = strategy_id
1088 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1089 {
1090 strategy_orders.remove(&client_order_id);
1091 if strategy_orders.is_empty() {
1092 self.index.strategy_orders.remove(&strategy_id);
1093 }
1094 }
1095
1096 self.index.exec_spawn_orders.remove(&client_order_id);
1098
1099 self.index.orders.remove(&client_order_id);
1100 self.index.orders_open.remove(&client_order_id);
1101 self.index.orders_closed.remove(&client_order_id);
1102 self.index.orders_emulated.remove(&client_order_id);
1103 self.index.orders_inflight.remove(&client_order_id);
1104 self.index.orders_pending_cancel.remove(&client_order_id);
1105 }
1106
1107 pub fn purge_position(&mut self, position_id: PositionId) {
1111 let position = self.positions.get(&position_id).cloned();
1113
1114 if let Some(ref pos) = position
1116 && pos.is_open()
1117 {
1118 log::warn!("Position {position_id} found open when purging, skipping purge");
1119 return;
1120 }
1121
1122 if let Some(ref pos) = position {
1124 self.positions.remove(&position_id);
1125
1126 if let Some(venue_positions) =
1128 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1129 {
1130 venue_positions.remove(&position_id);
1131 }
1132
1133 if let Some(instrument_positions) =
1135 self.index.instrument_positions.get_mut(&pos.instrument_id)
1136 {
1137 instrument_positions.remove(&position_id);
1138 }
1139
1140 if let Some(strategy_positions) =
1142 self.index.strategy_positions.get_mut(&pos.strategy_id)
1143 {
1144 strategy_positions.remove(&position_id);
1145 }
1146
1147 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1149 account_positions.remove(&position_id);
1150 if account_positions.is_empty() {
1151 self.index.account_positions.remove(&pos.account_id);
1152 }
1153 }
1154
1155 for client_order_id in pos.client_order_ids() {
1157 self.index.order_position.remove(&client_order_id);
1158 }
1159
1160 log::info!("Purged position {position_id}");
1161 } else {
1162 log::warn!("Position {position_id} not found when purging");
1163 }
1164
1165 self.index.position_strategy.remove(&position_id);
1167 self.index.position_orders.remove(&position_id);
1168 self.index.positions.remove(&position_id);
1169 self.index.positions_open.remove(&position_id);
1170 self.index.positions_closed.remove(&position_id);
1171
1172 self.position_snapshots.remove(&position_id);
1174 }
1175
1176 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1181 log::debug!(
1182 "Purging account events{}",
1183 if lookback_secs > 0 {
1184 format!(" with lookback_secs={lookback_secs}")
1185 } else {
1186 String::new()
1187 }
1188 );
1189
1190 for account in self.accounts.values_mut() {
1191 let event_count = account.event_count();
1192 account.purge_account_events(ts_now, lookback_secs);
1193 let count_diff = event_count - account.event_count();
1194 if count_diff > 0 {
1195 log::info!(
1196 "Purged {} event(s) from account {}",
1197 count_diff,
1198 account.id()
1199 );
1200 }
1201 }
1202 }
1203
1204 pub fn clear_index(&mut self) {
1206 self.index.clear();
1207 log::debug!("Cleared index");
1208 }
1209
1210 pub fn reset(&mut self) {
1214 log::debug!("Resetting cache");
1215
1216 self.general.clear();
1217 self.currencies.clear();
1218 self.instruments.clear();
1219 self.synthetics.clear();
1220 self.books.clear();
1221 self.own_books.clear();
1222 self.quotes.clear();
1223 self.trades.clear();
1224 self.mark_xrates.clear();
1225 self.mark_prices.clear();
1226 self.index_prices.clear();
1227 self.funding_rates.clear();
1228 self.bars.clear();
1229 self.accounts.clear();
1230 self.orders.clear();
1231 self.order_lists.clear();
1232 self.positions.clear();
1233 self.position_snapshots.clear();
1234 self.greeks.clear();
1235 self.yield_curves.clear();
1236
1237 #[cfg(feature = "defi")]
1238 {
1239 self.defi.pools.clear();
1240 self.defi.pool_profilers.clear();
1241 }
1242
1243 self.clear_index();
1244
1245 log::info!("Reset cache");
1246 }
1247
1248 pub fn dispose(&mut self) {
1252 if let Some(database) = &mut self.database
1253 && let Err(e) = database.close()
1254 {
1255 log::error!("Failed to close database during dispose: {e}");
1256 }
1257 }
1258
1259 pub fn flush_db(&mut self) {
1263 if let Some(database) = &mut self.database
1264 && let Err(e) = database.flush()
1265 {
1266 log::error!("Failed to flush database: {e}");
1267 }
1268 }
1269
1270 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1278 check_valid_string_ascii(key, stringify!(key))?;
1279 check_predicate_false(value.is_empty(), stringify!(value))?;
1280
1281 log::debug!("Adding general {key}");
1282 self.general.insert(key.to_string(), value.clone());
1283
1284 if let Some(database) = &mut self.database {
1285 database.add(key.to_string(), value)?;
1286 }
1287 Ok(())
1288 }
1289
1290 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1296 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1297
1298 if self.config.save_market_data
1299 && let Some(database) = &mut self.database
1300 {
1301 database.add_order_book(&book)?;
1302 }
1303
1304 self.books.insert(book.instrument_id, book);
1305 Ok(())
1306 }
1307
1308 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1314 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1315
1316 self.own_books.insert(own_book.instrument_id, own_book);
1317 Ok(())
1318 }
1319
1320 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1326 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1327
1328 if self.config.save_market_data {
1329 }
1331
1332 let mark_prices_deque = self
1333 .mark_prices
1334 .entry(mark_price.instrument_id)
1335 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1336 mark_prices_deque.push_front(mark_price);
1337 Ok(())
1338 }
1339
1340 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1346 log::debug!(
1347 "Adding `IndexPriceUpdate` for {}",
1348 index_price.instrument_id
1349 );
1350
1351 if self.config.save_market_data {
1352 }
1354
1355 let index_prices_deque = self
1356 .index_prices
1357 .entry(index_price.instrument_id)
1358 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1359 index_prices_deque.push_front(index_price);
1360 Ok(())
1361 }
1362
1363 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1369 log::debug!(
1370 "Adding `FundingRateUpdate` for {}",
1371 funding_rate.instrument_id
1372 );
1373
1374 if self.config.save_market_data {
1375 }
1377
1378 let funding_rates_deque = self
1379 .funding_rates
1380 .entry(funding_rate.instrument_id)
1381 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1382 funding_rates_deque.push_front(funding_rate);
1383 Ok(())
1384 }
1385
1386 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1392 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1393
1394 let instrument_id = funding_rates[0].instrument_id;
1395 log::debug!(
1396 "Adding `FundingRateUpdate`[{}] {instrument_id}",
1397 funding_rates.len()
1398 );
1399
1400 if self.config.save_market_data
1401 && let Some(database) = &mut self.database
1402 {
1403 for funding_rate in funding_rates {
1404 database.add_funding_rate(funding_rate)?;
1405 }
1406 }
1407
1408 let funding_rate_deque = self
1409 .funding_rates
1410 .entry(instrument_id)
1411 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1412
1413 for funding_rate in funding_rates {
1414 funding_rate_deque.push_front(*funding_rate);
1415 }
1416 Ok(())
1417 }
1418
1419 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1425 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1426
1427 if self.config.save_market_data
1428 && let Some(database) = &mut self.database
1429 {
1430 database.add_quote("e)?;
1431 }
1432
1433 let quotes_deque = self
1434 .quotes
1435 .entry(quote.instrument_id)
1436 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1437 quotes_deque.push_front(quote);
1438 Ok(())
1439 }
1440
1441 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1447 check_slice_not_empty(quotes, stringify!(quotes))?;
1448
1449 let instrument_id = quotes[0].instrument_id;
1450 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1451
1452 if self.config.save_market_data
1453 && let Some(database) = &mut self.database
1454 {
1455 for quote in quotes {
1456 database.add_quote(quote)?;
1457 }
1458 }
1459
1460 let quotes_deque = self
1461 .quotes
1462 .entry(instrument_id)
1463 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1464
1465 for quote in quotes {
1466 quotes_deque.push_front(*quote);
1467 }
1468 Ok(())
1469 }
1470
1471 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1477 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1478
1479 if self.config.save_market_data
1480 && let Some(database) = &mut self.database
1481 {
1482 database.add_trade(&trade)?;
1483 }
1484
1485 let trades_deque = self
1486 .trades
1487 .entry(trade.instrument_id)
1488 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1489 trades_deque.push_front(trade);
1490 Ok(())
1491 }
1492
1493 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1499 check_slice_not_empty(trades, stringify!(trades))?;
1500
1501 let instrument_id = trades[0].instrument_id;
1502 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1503
1504 if self.config.save_market_data
1505 && let Some(database) = &mut self.database
1506 {
1507 for trade in trades {
1508 database.add_trade(trade)?;
1509 }
1510 }
1511
1512 let trades_deque = self
1513 .trades
1514 .entry(instrument_id)
1515 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1516
1517 for trade in trades {
1518 trades_deque.push_front(*trade);
1519 }
1520 Ok(())
1521 }
1522
1523 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1529 log::debug!("Adding `Bar` {}", bar.bar_type);
1530
1531 if self.config.save_market_data
1532 && let Some(database) = &mut self.database
1533 {
1534 database.add_bar(&bar)?;
1535 }
1536
1537 let bars = self
1538 .bars
1539 .entry(bar.bar_type)
1540 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1541 bars.push_front(bar);
1542 Ok(())
1543 }
1544
1545 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1551 check_slice_not_empty(bars, stringify!(bars))?;
1552
1553 let bar_type = bars[0].bar_type;
1554 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1555
1556 if self.config.save_market_data
1557 && let Some(database) = &mut self.database
1558 {
1559 for bar in bars {
1560 database.add_bar(bar)?;
1561 }
1562 }
1563
1564 let bars_deque = self
1565 .bars
1566 .entry(bar_type)
1567 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1568
1569 for bar in bars {
1570 bars_deque.push_front(*bar);
1571 }
1572 Ok(())
1573 }
1574
1575 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1581 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1582
1583 if self.config.save_market_data
1584 && let Some(_database) = &mut self.database
1585 {
1586 }
1588
1589 self.greeks.insert(greeks.instrument_id, greeks);
1590 Ok(())
1591 }
1592
1593 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1595 self.greeks.get(instrument_id).cloned()
1596 }
1597
1598 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1604 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1605
1606 if self.config.save_market_data
1607 && let Some(_database) = &mut self.database
1608 {
1609 }
1611
1612 self.yield_curves
1613 .insert(yield_curve.curve_name.clone(), yield_curve);
1614 Ok(())
1615 }
1616
1617 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1619 self.yield_curves.get(key).map(|curve| {
1620 let curve_clone = curve.clone();
1621 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1622 as Box<dyn Fn(f64) -> f64>
1623 })
1624 }
1625
1626 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1632 if self.currencies.contains_key(¤cy.code) {
1633 return Ok(());
1634 }
1635 log::debug!("Adding `Currency` {}", currency.code);
1636
1637 if let Some(database) = &mut self.database {
1638 database.add_currency(¤cy)?;
1639 }
1640
1641 self.currencies.insert(currency.code, currency);
1642 Ok(())
1643 }
1644
1645 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1651 log::debug!("Adding `Instrument` {}", instrument.id());
1652
1653 if let Some(base_currency) = instrument.base_currency() {
1655 self.add_currency(base_currency)?;
1656 }
1657 self.add_currency(instrument.quote_currency())?;
1658 self.add_currency(instrument.settlement_currency())?;
1659
1660 if let Some(database) = &mut self.database {
1661 database.add_instrument(&instrument)?;
1662 }
1663
1664 self.instruments.insert(instrument.id(), instrument);
1665 Ok(())
1666 }
1667
1668 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1674 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1675
1676 if let Some(database) = &mut self.database {
1677 database.add_synthetic(&synthetic)?;
1678 }
1679
1680 self.synthetics.insert(synthetic.id, synthetic);
1681 Ok(())
1682 }
1683
1684 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1690 log::debug!("Adding `Account` {}", account.id());
1691
1692 if let Some(database) = &mut self.database {
1693 database.add_account(&account)?;
1694 }
1695
1696 let account_id = account.id();
1697 self.accounts.insert(account_id, account);
1698 self.index
1699 .venue_account
1700 .insert(account_id.get_issuer(), account_id);
1701 Ok(())
1702 }
1703
1704 pub fn add_venue_order_id(
1712 &mut self,
1713 client_order_id: &ClientOrderId,
1714 venue_order_id: &VenueOrderId,
1715 overwrite: bool,
1716 ) -> anyhow::Result<()> {
1717 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1718 && !overwrite
1719 && existing_venue_order_id != venue_order_id
1720 {
1721 anyhow::bail!(
1722 "Existing {existing_venue_order_id} for {client_order_id}
1723 did not match the given {venue_order_id}.
1724 If you are writing a test then try a different `venue_order_id`,
1725 otherwise this is probably a bug."
1726 );
1727 }
1728
1729 self.index
1730 .client_order_ids
1731 .insert(*client_order_id, *venue_order_id);
1732 self.index
1733 .venue_order_ids
1734 .insert(*venue_order_id, *client_order_id);
1735
1736 Ok(())
1737 }
1738
1739 pub fn add_order(
1751 &mut self,
1752 order: OrderAny,
1753 position_id: Option<PositionId>,
1754 client_id: Option<ClientId>,
1755 replace_existing: bool,
1756 ) -> anyhow::Result<()> {
1757 let instrument_id = order.instrument_id();
1758 let venue = instrument_id.venue;
1759 let client_order_id = order.client_order_id();
1760 let strategy_id = order.strategy_id();
1761 let exec_algorithm_id = order.exec_algorithm_id();
1762 let exec_spawn_id = order.exec_spawn_id();
1763
1764 if !replace_existing {
1765 check_key_not_in_map(
1766 &client_order_id,
1767 &self.orders,
1768 stringify!(client_order_id),
1769 stringify!(orders),
1770 )?;
1771 }
1772
1773 log::debug!("Adding {order:?}");
1774
1775 self.index.orders.insert(client_order_id);
1776 self.index
1777 .order_strategy
1778 .insert(client_order_id, strategy_id);
1779 self.index.strategies.insert(strategy_id);
1780
1781 self.index
1783 .venue_orders
1784 .entry(venue)
1785 .or_default()
1786 .insert(client_order_id);
1787
1788 self.index
1790 .instrument_orders
1791 .entry(instrument_id)
1792 .or_default()
1793 .insert(client_order_id);
1794
1795 self.index
1797 .strategy_orders
1798 .entry(strategy_id)
1799 .or_default()
1800 .insert(client_order_id);
1801
1802 if let Some(account_id) = order.account_id() {
1804 self.index
1805 .account_orders
1806 .entry(account_id)
1807 .or_default()
1808 .insert(client_order_id);
1809 }
1810
1811 if let Some(exec_algorithm_id) = exec_algorithm_id {
1813 self.index.exec_algorithms.insert(exec_algorithm_id);
1814
1815 self.index
1816 .exec_algorithm_orders
1817 .entry(exec_algorithm_id)
1818 .or_default()
1819 .insert(client_order_id);
1820 }
1821
1822 if let Some(exec_spawn_id) = exec_spawn_id {
1824 self.index
1825 .exec_spawn_orders
1826 .entry(exec_spawn_id)
1827 .or_default()
1828 .insert(client_order_id);
1829 }
1830
1831 if let Some(emulation_trigger) = order.emulation_trigger()
1833 && emulation_trigger != TriggerType::NoTrigger
1834 {
1835 self.index.orders_emulated.insert(client_order_id);
1836 }
1837
1838 if let Some(position_id) = position_id {
1840 self.add_position_id(
1841 &position_id,
1842 &order.instrument_id().venue,
1843 &client_order_id,
1844 &strategy_id,
1845 )?;
1846 }
1847
1848 if let Some(client_id) = client_id {
1850 self.index.order_client.insert(client_order_id, client_id);
1851 log::debug!("Indexed {client_id:?}");
1852 }
1853
1854 if let Some(database) = &mut self.database {
1855 database.add_order(&order, client_id)?;
1856 }
1861
1862 self.orders.insert(client_order_id, order);
1863
1864 Ok(())
1865 }
1866
1867 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1873 let order_list_id = order_list.id;
1874 check_key_not_in_map(
1875 &order_list_id,
1876 &self.order_lists,
1877 stringify!(order_list_id),
1878 stringify!(order_lists),
1879 )?;
1880
1881 log::debug!("Adding {order_list:?}");
1882 self.order_lists.insert(order_list_id, order_list);
1883 Ok(())
1884 }
1885
1886 pub fn add_position_id(
1892 &mut self,
1893 position_id: &PositionId,
1894 venue: &Venue,
1895 client_order_id: &ClientOrderId,
1896 strategy_id: &StrategyId,
1897 ) -> anyhow::Result<()> {
1898 self.index
1899 .order_position
1900 .insert(*client_order_id, *position_id);
1901
1902 if let Some(database) = &mut self.database {
1904 database.index_order_position(*client_order_id, *position_id)?;
1905 }
1906
1907 self.index
1909 .position_strategy
1910 .insert(*position_id, *strategy_id);
1911
1912 self.index
1914 .position_orders
1915 .entry(*position_id)
1916 .or_default()
1917 .insert(*client_order_id);
1918
1919 self.index
1921 .strategy_positions
1922 .entry(*strategy_id)
1923 .or_default()
1924 .insert(*position_id);
1925
1926 self.index
1928 .venue_positions
1929 .entry(*venue)
1930 .or_default()
1931 .insert(*position_id);
1932
1933 Ok(())
1934 }
1935
1936 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1942 self.positions.insert(position.id, position.clone());
1943 self.index.positions.insert(position.id);
1944 self.index.positions_open.insert(position.id);
1945 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
1948
1949 self.add_position_id(
1950 &position.id,
1951 &position.instrument_id.venue,
1952 &position.opening_order_id,
1953 &position.strategy_id,
1954 )?;
1955
1956 let venue = position.instrument_id.venue;
1957 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1958 venue_positions.insert(position.id);
1959
1960 let instrument_id = position.instrument_id;
1962 let instrument_positions = self
1963 .index
1964 .instrument_positions
1965 .entry(instrument_id)
1966 .or_default();
1967 instrument_positions.insert(position.id);
1968
1969 self.index
1971 .account_positions
1972 .entry(position.account_id)
1973 .or_default()
1974 .insert(position.id);
1975
1976 if let Some(database) = &mut self.database {
1977 database.add_position(&position)?;
1978 }
1987
1988 Ok(())
1989 }
1990
1991 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1997 let account_id = account.id();
1998 self.accounts.insert(account_id, account.clone());
1999
2000 if let Some(database) = &mut self.database {
2001 database.update_account(&account)?;
2002 }
2003 Ok(())
2004 }
2005
2006 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2012 let client_order_id = order.client_order_id();
2013
2014 if let Some(venue_order_id) = order.venue_order_id() {
2016 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2019 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2021 }
2022 }
2023
2024 if order.is_inflight() {
2026 self.index.orders_inflight.insert(client_order_id);
2027 } else {
2028 self.index.orders_inflight.remove(&client_order_id);
2029 }
2030
2031 if order.is_open() {
2033 self.index.orders_closed.remove(&client_order_id);
2034 self.index.orders_open.insert(client_order_id);
2035 } else if order.is_closed() {
2036 self.index.orders_open.remove(&client_order_id);
2037 self.index.orders_pending_cancel.remove(&client_order_id);
2038 self.index.orders_closed.insert(client_order_id);
2039 }
2040
2041 if let Some(emulation_trigger) = order.emulation_trigger()
2043 && emulation_trigger != TriggerType::NoTrigger
2044 && !order.is_closed()
2045 {
2046 self.index.orders_emulated.insert(client_order_id);
2047 } else {
2048 self.index.orders_emulated.remove(&client_order_id);
2049 }
2050
2051 if let Some(account_id) = order.account_id() {
2053 self.index
2054 .account_orders
2055 .entry(account_id)
2056 .or_default()
2057 .insert(client_order_id);
2058 }
2059
2060 if self.own_order_book(&order.instrument_id()).is_some()
2062 && should_handle_own_book_order(order)
2063 {
2064 self.update_own_order_book(order);
2065 }
2066
2067 if let Some(database) = &mut self.database {
2068 database.update_order(order.last_event())?;
2069 }
2074
2075 self.orders.insert(client_order_id, order.clone());
2077
2078 Ok(())
2079 }
2080
2081 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2083 self.index
2084 .orders_pending_cancel
2085 .insert(order.client_order_id());
2086 }
2087
2088 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2094 if position.is_open() {
2097 self.index.positions_open.insert(position.id);
2098 self.index.positions_closed.remove(&position.id);
2099 } else {
2100 self.index.positions_closed.insert(position.id);
2101 self.index.positions_open.remove(&position.id);
2102 }
2103
2104 if let Some(database) = &mut self.database {
2105 database.update_position(position)?;
2106 }
2111
2112 self.positions.insert(position.id, position.clone());
2113
2114 Ok(())
2115 }
2116
2117 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2124 let position_id = position.id;
2125
2126 let mut copied_position = position.clone();
2127 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2128 copied_position.id = PositionId::new(new_id);
2129
2130 let position_serialized = serde_json::to_vec(&copied_position)?;
2132
2133 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
2134 let new_snapshots = match snapshots {
2135 Some(existing_snapshots) => {
2136 let mut combined = existing_snapshots.to_vec();
2137 combined.extend(position_serialized);
2138 Bytes::from(combined)
2139 }
2140 None => Bytes::from(position_serialized),
2141 };
2142 self.position_snapshots.insert(position_id, new_snapshots);
2143
2144 log::debug!("Snapshot {copied_position}");
2145 Ok(())
2146 }
2147
2148 pub fn snapshot_position_state(
2154 &mut self,
2155 position: &Position,
2156 open_only: Option<bool>,
2159 ) -> anyhow::Result<()> {
2160 let open_only = open_only.unwrap_or(true);
2161
2162 if open_only && !position.is_open() {
2163 return Ok(());
2164 }
2165
2166 if let Some(database) = &mut self.database {
2167 database.snapshot_position_state(position).map_err(|e| {
2168 log::error!(
2169 "Failed to snapshot position state for {}: {e:?}",
2170 position.id
2171 );
2172 e
2173 })?;
2174 } else {
2175 log::warn!(
2176 "Cannot snapshot position state for {} (no database configured)",
2177 position.id
2178 );
2179 }
2180
2181 todo!()
2183 }
2184
2185 #[must_use]
2187 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2188 if self.index.position_strategy.contains_key(position_id) {
2190 Some(OmsType::Netting)
2193 } else {
2194 None
2195 }
2196 }
2197
2198 #[must_use]
2200 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2201 self.position_snapshots.get(position_id).map(|b| b.to_vec())
2202 }
2203
2204 #[must_use]
2206 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2207 let mut result = AHashSet::new();
2209 for (position_id, _) in &self.position_snapshots {
2210 if let Some(position) = self.positions.get(position_id)
2212 && position.instrument_id == *instrument_id
2213 {
2214 result.insert(*position_id);
2215 }
2216 }
2217 result
2218 }
2219
2220 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2226 let database = if let Some(database) = &self.database {
2227 database
2228 } else {
2229 log::warn!(
2230 "Cannot snapshot order state for {} (no database configured)",
2231 order.client_order_id()
2232 );
2233 return Ok(());
2234 };
2235
2236 database.snapshot_order_state(order)
2237 }
2238
2239 fn build_order_query_filter_set(
2242 &self,
2243 venue: Option<&Venue>,
2244 instrument_id: Option<&InstrumentId>,
2245 strategy_id: Option<&StrategyId>,
2246 account_id: Option<&AccountId>,
2247 ) -> Option<AHashSet<ClientOrderId>> {
2248 let mut query: Option<AHashSet<ClientOrderId>> = None;
2249
2250 if let Some(venue) = venue {
2251 query = Some(
2252 self.index
2253 .venue_orders
2254 .get(venue)
2255 .cloned()
2256 .unwrap_or_default(),
2257 );
2258 }
2259
2260 if let Some(instrument_id) = instrument_id {
2261 let instrument_orders = self
2262 .index
2263 .instrument_orders
2264 .get(instrument_id)
2265 .cloned()
2266 .unwrap_or_default();
2267
2268 if let Some(existing_query) = &mut query {
2269 *existing_query = existing_query
2270 .intersection(&instrument_orders)
2271 .copied()
2272 .collect();
2273 } else {
2274 query = Some(instrument_orders);
2275 }
2276 }
2277
2278 if let Some(strategy_id) = strategy_id {
2279 let strategy_orders = self
2280 .index
2281 .strategy_orders
2282 .get(strategy_id)
2283 .cloned()
2284 .unwrap_or_default();
2285
2286 if let Some(existing_query) = &mut query {
2287 *existing_query = existing_query
2288 .intersection(&strategy_orders)
2289 .copied()
2290 .collect();
2291 } else {
2292 query = Some(strategy_orders);
2293 }
2294 }
2295
2296 if let Some(account_id) = account_id {
2297 let account_orders = self
2298 .index
2299 .account_orders
2300 .get(account_id)
2301 .cloned()
2302 .unwrap_or_default();
2303
2304 if let Some(existing_query) = &mut query {
2305 *existing_query = existing_query
2306 .intersection(&account_orders)
2307 .copied()
2308 .collect();
2309 } else {
2310 query = Some(account_orders);
2311 }
2312 }
2313
2314 query
2315 }
2316
2317 fn build_position_query_filter_set(
2318 &self,
2319 venue: Option<&Venue>,
2320 instrument_id: Option<&InstrumentId>,
2321 strategy_id: Option<&StrategyId>,
2322 account_id: Option<&AccountId>,
2323 ) -> Option<AHashSet<PositionId>> {
2324 let mut query: Option<AHashSet<PositionId>> = None;
2325
2326 if let Some(venue) = venue {
2327 query = Some(
2328 self.index
2329 .venue_positions
2330 .get(venue)
2331 .cloned()
2332 .unwrap_or_default(),
2333 );
2334 }
2335
2336 if let Some(instrument_id) = instrument_id {
2337 let instrument_positions = self
2338 .index
2339 .instrument_positions
2340 .get(instrument_id)
2341 .cloned()
2342 .unwrap_or_default();
2343
2344 if let Some(existing_query) = query {
2345 query = Some(
2346 existing_query
2347 .intersection(&instrument_positions)
2348 .copied()
2349 .collect(),
2350 );
2351 } else {
2352 query = Some(instrument_positions);
2353 }
2354 }
2355
2356 if let Some(strategy_id) = strategy_id {
2357 let strategy_positions = self
2358 .index
2359 .strategy_positions
2360 .get(strategy_id)
2361 .cloned()
2362 .unwrap_or_default();
2363
2364 if let Some(existing_query) = query {
2365 query = Some(
2366 existing_query
2367 .intersection(&strategy_positions)
2368 .copied()
2369 .collect(),
2370 );
2371 } else {
2372 query = Some(strategy_positions);
2373 }
2374 }
2375
2376 if let Some(account_id) = account_id {
2377 let account_positions = self
2378 .index
2379 .account_positions
2380 .get(account_id)
2381 .cloned()
2382 .unwrap_or_default();
2383
2384 if let Some(existing_query) = query {
2385 query = Some(
2386 existing_query
2387 .intersection(&account_positions)
2388 .copied()
2389 .collect(),
2390 );
2391 } else {
2392 query = Some(account_positions);
2393 }
2394 }
2395
2396 query
2397 }
2398
2399 fn get_orders_for_ids(
2405 &self,
2406 client_order_ids: &AHashSet<ClientOrderId>,
2407 side: Option<OrderSide>,
2408 ) -> Vec<&OrderAny> {
2409 let side = side.unwrap_or(OrderSide::NoOrderSide);
2410 let mut orders = Vec::new();
2411
2412 for client_order_id in client_order_ids {
2413 let order = self
2414 .orders
2415 .get(client_order_id)
2416 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2417 if side == OrderSide::NoOrderSide || side == order.order_side() {
2418 orders.push(order);
2419 }
2420 }
2421
2422 orders
2423 }
2424
2425 fn get_positions_for_ids(
2431 &self,
2432 position_ids: &AHashSet<PositionId>,
2433 side: Option<PositionSide>,
2434 ) -> Vec<&Position> {
2435 let side = side.unwrap_or(PositionSide::NoPositionSide);
2436 let mut positions = Vec::new();
2437
2438 for position_id in position_ids {
2439 let position = self
2440 .positions
2441 .get(position_id)
2442 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2443 if side == PositionSide::NoPositionSide || side == position.side {
2444 positions.push(position);
2445 }
2446 }
2447
2448 positions
2449 }
2450
2451 #[must_use]
2453 pub fn client_order_ids(
2454 &self,
2455 venue: Option<&Venue>,
2456 instrument_id: Option<&InstrumentId>,
2457 strategy_id: Option<&StrategyId>,
2458 account_id: Option<&AccountId>,
2459 ) -> AHashSet<ClientOrderId> {
2460 let query =
2461 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2462 match query {
2463 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2464 None => self.index.orders.clone(),
2465 }
2466 }
2467
2468 #[must_use]
2470 pub fn client_order_ids_open(
2471 &self,
2472 venue: Option<&Venue>,
2473 instrument_id: Option<&InstrumentId>,
2474 strategy_id: Option<&StrategyId>,
2475 account_id: Option<&AccountId>,
2476 ) -> AHashSet<ClientOrderId> {
2477 let query =
2478 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2479 match query {
2480 Some(query) => self
2481 .index
2482 .orders_open
2483 .intersection(&query)
2484 .copied()
2485 .collect(),
2486 None => self.index.orders_open.clone(),
2487 }
2488 }
2489
2490 #[must_use]
2492 pub fn client_order_ids_closed(
2493 &self,
2494 venue: Option<&Venue>,
2495 instrument_id: Option<&InstrumentId>,
2496 strategy_id: Option<&StrategyId>,
2497 account_id: Option<&AccountId>,
2498 ) -> AHashSet<ClientOrderId> {
2499 let query =
2500 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2501 match query {
2502 Some(query) => self
2503 .index
2504 .orders_closed
2505 .intersection(&query)
2506 .copied()
2507 .collect(),
2508 None => self.index.orders_closed.clone(),
2509 }
2510 }
2511
2512 #[must_use]
2514 pub fn client_order_ids_emulated(
2515 &self,
2516 venue: Option<&Venue>,
2517 instrument_id: Option<&InstrumentId>,
2518 strategy_id: Option<&StrategyId>,
2519 account_id: Option<&AccountId>,
2520 ) -> AHashSet<ClientOrderId> {
2521 let query =
2522 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2523 match query {
2524 Some(query) => self
2525 .index
2526 .orders_emulated
2527 .intersection(&query)
2528 .copied()
2529 .collect(),
2530 None => self.index.orders_emulated.clone(),
2531 }
2532 }
2533
2534 #[must_use]
2536 pub fn client_order_ids_inflight(
2537 &self,
2538 venue: Option<&Venue>,
2539 instrument_id: Option<&InstrumentId>,
2540 strategy_id: Option<&StrategyId>,
2541 account_id: Option<&AccountId>,
2542 ) -> AHashSet<ClientOrderId> {
2543 let query =
2544 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2545 match query {
2546 Some(query) => self
2547 .index
2548 .orders_inflight
2549 .intersection(&query)
2550 .copied()
2551 .collect(),
2552 None => self.index.orders_inflight.clone(),
2553 }
2554 }
2555
2556 #[must_use]
2558 pub fn position_ids(
2559 &self,
2560 venue: Option<&Venue>,
2561 instrument_id: Option<&InstrumentId>,
2562 strategy_id: Option<&StrategyId>,
2563 account_id: Option<&AccountId>,
2564 ) -> AHashSet<PositionId> {
2565 let query =
2566 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2567 match query {
2568 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2569 None => self.index.positions.clone(),
2570 }
2571 }
2572
2573 #[must_use]
2575 pub fn position_open_ids(
2576 &self,
2577 venue: Option<&Venue>,
2578 instrument_id: Option<&InstrumentId>,
2579 strategy_id: Option<&StrategyId>,
2580 account_id: Option<&AccountId>,
2581 ) -> AHashSet<PositionId> {
2582 let query =
2583 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2584 match query {
2585 Some(query) => self
2586 .index
2587 .positions_open
2588 .intersection(&query)
2589 .copied()
2590 .collect(),
2591 None => self.index.positions_open.clone(),
2592 }
2593 }
2594
2595 #[must_use]
2597 pub fn position_closed_ids(
2598 &self,
2599 venue: Option<&Venue>,
2600 instrument_id: Option<&InstrumentId>,
2601 strategy_id: Option<&StrategyId>,
2602 account_id: Option<&AccountId>,
2603 ) -> AHashSet<PositionId> {
2604 let query =
2605 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2606 match query {
2607 Some(query) => self
2608 .index
2609 .positions_closed
2610 .intersection(&query)
2611 .copied()
2612 .collect(),
2613 None => self.index.positions_closed.clone(),
2614 }
2615 }
2616
2617 #[must_use]
2619 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2620 self.index.actors.clone()
2621 }
2622
2623 #[must_use]
2625 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2626 self.index.strategies.clone()
2627 }
2628
2629 #[must_use]
2631 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2632 self.index.exec_algorithms.clone()
2633 }
2634
2635 #[must_use]
2639 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2640 self.orders.get(client_order_id)
2641 }
2642
2643 #[must_use]
2645 pub fn orders_for_ids(
2646 &self,
2647 client_order_ids: &[ClientOrderId],
2648 context: &dyn Display,
2649 ) -> Vec<OrderAny> {
2650 let mut orders = Vec::with_capacity(client_order_ids.len());
2651 for id in client_order_ids {
2652 match self.orders.get(id) {
2653 Some(order) => orders.push(order.clone()),
2654 None => log::error!("Order {id} not found in cache for {context}"),
2655 }
2656 }
2657 orders
2658 }
2659
2660 #[must_use]
2662 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2663 self.orders.get_mut(client_order_id)
2664 }
2665
2666 #[must_use]
2668 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2669 self.index.venue_order_ids.get(venue_order_id)
2670 }
2671
2672 #[must_use]
2674 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2675 self.index.client_order_ids.get(client_order_id)
2676 }
2677
2678 #[must_use]
2680 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2681 self.index.order_client.get(client_order_id)
2682 }
2683
2684 #[must_use]
2686 pub fn orders(
2687 &self,
2688 venue: Option<&Venue>,
2689 instrument_id: Option<&InstrumentId>,
2690 strategy_id: Option<&StrategyId>,
2691 account_id: Option<&AccountId>,
2692 side: Option<OrderSide>,
2693 ) -> Vec<&OrderAny> {
2694 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2695 self.get_orders_for_ids(&client_order_ids, side)
2696 }
2697
2698 #[must_use]
2700 pub fn orders_open(
2701 &self,
2702 venue: Option<&Venue>,
2703 instrument_id: Option<&InstrumentId>,
2704 strategy_id: Option<&StrategyId>,
2705 account_id: Option<&AccountId>,
2706 side: Option<OrderSide>,
2707 ) -> Vec<&OrderAny> {
2708 let client_order_ids =
2709 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
2710 self.get_orders_for_ids(&client_order_ids, side)
2711 }
2712
2713 #[must_use]
2715 pub fn orders_closed(
2716 &self,
2717 venue: Option<&Venue>,
2718 instrument_id: Option<&InstrumentId>,
2719 strategy_id: Option<&StrategyId>,
2720 account_id: Option<&AccountId>,
2721 side: Option<OrderSide>,
2722 ) -> Vec<&OrderAny> {
2723 let client_order_ids =
2724 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
2725 self.get_orders_for_ids(&client_order_ids, side)
2726 }
2727
2728 #[must_use]
2730 pub fn orders_emulated(
2731 &self,
2732 venue: Option<&Venue>,
2733 instrument_id: Option<&InstrumentId>,
2734 strategy_id: Option<&StrategyId>,
2735 account_id: Option<&AccountId>,
2736 side: Option<OrderSide>,
2737 ) -> Vec<&OrderAny> {
2738 let client_order_ids =
2739 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
2740 self.get_orders_for_ids(&client_order_ids, side)
2741 }
2742
2743 #[must_use]
2745 pub fn orders_inflight(
2746 &self,
2747 venue: Option<&Venue>,
2748 instrument_id: Option<&InstrumentId>,
2749 strategy_id: Option<&StrategyId>,
2750 account_id: Option<&AccountId>,
2751 side: Option<OrderSide>,
2752 ) -> Vec<&OrderAny> {
2753 let client_order_ids =
2754 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
2755 self.get_orders_for_ids(&client_order_ids, side)
2756 }
2757
2758 #[must_use]
2760 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2761 let client_order_ids = self.index.position_orders.get(position_id);
2762 match client_order_ids {
2763 Some(client_order_ids) => {
2764 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2765 }
2766 None => Vec::new(),
2767 }
2768 }
2769
2770 #[must_use]
2772 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2773 self.index.orders.contains(client_order_id)
2774 }
2775
2776 #[must_use]
2778 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2779 self.index.orders_open.contains(client_order_id)
2780 }
2781
2782 #[must_use]
2784 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2785 self.index.orders_closed.contains(client_order_id)
2786 }
2787
2788 #[must_use]
2790 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2791 self.index.orders_emulated.contains(client_order_id)
2792 }
2793
2794 #[must_use]
2796 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2797 self.index.orders_inflight.contains(client_order_id)
2798 }
2799
2800 #[must_use]
2802 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2803 self.index.orders_pending_cancel.contains(client_order_id)
2804 }
2805
2806 #[must_use]
2808 pub fn orders_open_count(
2809 &self,
2810 venue: Option<&Venue>,
2811 instrument_id: Option<&InstrumentId>,
2812 strategy_id: Option<&StrategyId>,
2813 account_id: Option<&AccountId>,
2814 side: Option<OrderSide>,
2815 ) -> usize {
2816 self.orders_open(venue, instrument_id, strategy_id, account_id, side)
2817 .len()
2818 }
2819
2820 #[must_use]
2822 pub fn orders_closed_count(
2823 &self,
2824 venue: Option<&Venue>,
2825 instrument_id: Option<&InstrumentId>,
2826 strategy_id: Option<&StrategyId>,
2827 account_id: Option<&AccountId>,
2828 side: Option<OrderSide>,
2829 ) -> usize {
2830 self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
2831 .len()
2832 }
2833
2834 #[must_use]
2836 pub fn orders_emulated_count(
2837 &self,
2838 venue: Option<&Venue>,
2839 instrument_id: Option<&InstrumentId>,
2840 strategy_id: Option<&StrategyId>,
2841 account_id: Option<&AccountId>,
2842 side: Option<OrderSide>,
2843 ) -> usize {
2844 self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
2845 .len()
2846 }
2847
2848 #[must_use]
2850 pub fn orders_inflight_count(
2851 &self,
2852 venue: Option<&Venue>,
2853 instrument_id: Option<&InstrumentId>,
2854 strategy_id: Option<&StrategyId>,
2855 account_id: Option<&AccountId>,
2856 side: Option<OrderSide>,
2857 ) -> usize {
2858 self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
2859 .len()
2860 }
2861
2862 #[must_use]
2864 pub fn orders_total_count(
2865 &self,
2866 venue: Option<&Venue>,
2867 instrument_id: Option<&InstrumentId>,
2868 strategy_id: Option<&StrategyId>,
2869 account_id: Option<&AccountId>,
2870 side: Option<OrderSide>,
2871 ) -> usize {
2872 self.orders(venue, instrument_id, strategy_id, account_id, side)
2873 .len()
2874 }
2875
2876 #[must_use]
2878 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2879 self.order_lists.get(order_list_id)
2880 }
2881
2882 #[must_use]
2884 pub fn order_lists(
2885 &self,
2886 venue: Option<&Venue>,
2887 instrument_id: Option<&InstrumentId>,
2888 strategy_id: Option<&StrategyId>,
2889 account_id: Option<&AccountId>,
2890 ) -> Vec<&OrderList> {
2891 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2892
2893 if let Some(venue) = venue {
2894 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2895 }
2896
2897 if let Some(instrument_id) = instrument_id {
2898 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2899 }
2900
2901 if let Some(strategy_id) = strategy_id {
2902 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2903 }
2904
2905 if let Some(account_id) = account_id {
2906 order_lists.retain(|ol| {
2907 ol.client_order_ids.iter().any(|client_order_id| {
2908 self.orders
2909 .get(client_order_id)
2910 .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
2911 })
2912 });
2913 }
2914
2915 order_lists
2916 }
2917
2918 #[must_use]
2920 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2921 self.order_lists.contains_key(order_list_id)
2922 }
2923
2924 #[must_use]
2929 pub fn orders_for_exec_algorithm(
2930 &self,
2931 exec_algorithm_id: &ExecAlgorithmId,
2932 venue: Option<&Venue>,
2933 instrument_id: Option<&InstrumentId>,
2934 strategy_id: Option<&StrategyId>,
2935 account_id: Option<&AccountId>,
2936 side: Option<OrderSide>,
2937 ) -> Vec<&OrderAny> {
2938 let query =
2939 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2940 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2941
2942 if let Some(query) = query
2943 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2944 {
2945 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2946 }
2947
2948 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2949 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2950 } else {
2951 Vec::new()
2952 }
2953 }
2954
2955 #[must_use]
2957 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2958 self.get_orders_for_ids(
2959 self.index
2960 .exec_spawn_orders
2961 .get(exec_spawn_id)
2962 .unwrap_or(&AHashSet::new()),
2963 None,
2964 )
2965 }
2966
2967 #[must_use]
2969 pub fn exec_spawn_total_quantity(
2970 &self,
2971 exec_spawn_id: &ClientOrderId,
2972 active_only: bool,
2973 ) -> Option<Quantity> {
2974 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2975
2976 let mut total_quantity: Option<Quantity> = None;
2977
2978 for spawn_order in exec_spawn_orders {
2979 if active_only && spawn_order.is_closed() {
2980 continue;
2981 }
2982
2983 match total_quantity.as_mut() {
2984 Some(total) => *total = *total + spawn_order.quantity(),
2985 None => total_quantity = Some(spawn_order.quantity()),
2986 }
2987 }
2988
2989 total_quantity
2990 }
2991
2992 #[must_use]
2994 pub fn exec_spawn_total_filled_qty(
2995 &self,
2996 exec_spawn_id: &ClientOrderId,
2997 active_only: bool,
2998 ) -> Option<Quantity> {
2999 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3000
3001 let mut total_quantity: Option<Quantity> = None;
3002
3003 for spawn_order in exec_spawn_orders {
3004 if active_only && spawn_order.is_closed() {
3005 continue;
3006 }
3007
3008 match total_quantity.as_mut() {
3009 Some(total) => *total = *total + spawn_order.filled_qty(),
3010 None => total_quantity = Some(spawn_order.filled_qty()),
3011 }
3012 }
3013
3014 total_quantity
3015 }
3016
3017 #[must_use]
3019 pub fn exec_spawn_total_leaves_qty(
3020 &self,
3021 exec_spawn_id: &ClientOrderId,
3022 active_only: bool,
3023 ) -> Option<Quantity> {
3024 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3025
3026 let mut total_quantity: Option<Quantity> = None;
3027
3028 for spawn_order in exec_spawn_orders {
3029 if active_only && spawn_order.is_closed() {
3030 continue;
3031 }
3032
3033 match total_quantity.as_mut() {
3034 Some(total) => *total = *total + spawn_order.leaves_qty(),
3035 None => total_quantity = Some(spawn_order.leaves_qty()),
3036 }
3037 }
3038
3039 total_quantity
3040 }
3041
3042 #[must_use]
3046 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3047 self.positions.get(position_id)
3048 }
3049
3050 #[must_use]
3052 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3053 self.index
3054 .order_position
3055 .get(client_order_id)
3056 .and_then(|position_id| self.positions.get(position_id))
3057 }
3058
3059 #[must_use]
3061 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3062 self.index.order_position.get(client_order_id)
3063 }
3064
3065 #[must_use]
3067 pub fn positions(
3068 &self,
3069 venue: Option<&Venue>,
3070 instrument_id: Option<&InstrumentId>,
3071 strategy_id: Option<&StrategyId>,
3072 account_id: Option<&AccountId>,
3073 side: Option<PositionSide>,
3074 ) -> Vec<&Position> {
3075 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3076 self.get_positions_for_ids(&position_ids, side)
3077 }
3078
3079 #[must_use]
3081 pub fn positions_open(
3082 &self,
3083 venue: Option<&Venue>,
3084 instrument_id: Option<&InstrumentId>,
3085 strategy_id: Option<&StrategyId>,
3086 account_id: Option<&AccountId>,
3087 side: Option<PositionSide>,
3088 ) -> Vec<&Position> {
3089 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3090 self.get_positions_for_ids(&position_ids, side)
3091 }
3092
3093 #[must_use]
3095 pub fn positions_closed(
3096 &self,
3097 venue: Option<&Venue>,
3098 instrument_id: Option<&InstrumentId>,
3099 strategy_id: Option<&StrategyId>,
3100 account_id: Option<&AccountId>,
3101 side: Option<PositionSide>,
3102 ) -> Vec<&Position> {
3103 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3104 self.get_positions_for_ids(&position_ids, side)
3105 }
3106
3107 #[must_use]
3109 pub fn position_exists(&self, position_id: &PositionId) -> bool {
3110 self.index.positions.contains(position_id)
3111 }
3112
3113 #[must_use]
3115 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3116 self.index.positions_open.contains(position_id)
3117 }
3118
3119 #[must_use]
3121 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3122 self.index.positions_closed.contains(position_id)
3123 }
3124
3125 #[must_use]
3127 pub fn positions_open_count(
3128 &self,
3129 venue: Option<&Venue>,
3130 instrument_id: Option<&InstrumentId>,
3131 strategy_id: Option<&StrategyId>,
3132 account_id: Option<&AccountId>,
3133 side: Option<PositionSide>,
3134 ) -> usize {
3135 self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3136 .len()
3137 }
3138
3139 #[must_use]
3141 pub fn positions_closed_count(
3142 &self,
3143 venue: Option<&Venue>,
3144 instrument_id: Option<&InstrumentId>,
3145 strategy_id: Option<&StrategyId>,
3146 account_id: Option<&AccountId>,
3147 side: Option<PositionSide>,
3148 ) -> usize {
3149 self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3150 .len()
3151 }
3152
3153 #[must_use]
3155 pub fn positions_total_count(
3156 &self,
3157 venue: Option<&Venue>,
3158 instrument_id: Option<&InstrumentId>,
3159 strategy_id: Option<&StrategyId>,
3160 account_id: Option<&AccountId>,
3161 side: Option<PositionSide>,
3162 ) -> usize {
3163 self.positions(venue, instrument_id, strategy_id, account_id, side)
3164 .len()
3165 }
3166
3167 #[must_use]
3171 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3172 self.index.order_strategy.get(client_order_id)
3173 }
3174
3175 #[must_use]
3177 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3178 self.index.position_strategy.get(position_id)
3179 }
3180
3181 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3189 check_valid_string_ascii(key, stringify!(key))?;
3190
3191 Ok(self.general.get(key))
3192 }
3193
3194 #[must_use]
3198 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3199 match price_type {
3200 PriceType::Bid => self
3201 .quotes
3202 .get(instrument_id)
3203 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3204 PriceType::Ask => self
3205 .quotes
3206 .get(instrument_id)
3207 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3208 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3209 quotes.front().map(|quote| {
3210 Price::new(
3211 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3212 quote.bid_price.precision + 1,
3213 )
3214 })
3215 }),
3216 PriceType::Last => self
3217 .trades
3218 .get(instrument_id)
3219 .and_then(|trades| trades.front().map(|trade| trade.price)),
3220 PriceType::Mark => self
3221 .mark_prices
3222 .get(instrument_id)
3223 .and_then(|marks| marks.front().map(|mark| mark.value)),
3224 }
3225 }
3226
3227 #[must_use]
3229 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3230 self.quotes
3231 .get(instrument_id)
3232 .map(|quotes| quotes.iter().copied().collect())
3233 }
3234
3235 #[must_use]
3237 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3238 self.trades
3239 .get(instrument_id)
3240 .map(|trades| trades.iter().copied().collect())
3241 }
3242
3243 #[must_use]
3245 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3246 self.mark_prices
3247 .get(instrument_id)
3248 .map(|mark_prices| mark_prices.iter().copied().collect())
3249 }
3250
3251 #[must_use]
3253 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3254 self.index_prices
3255 .get(instrument_id)
3256 .map(|index_prices| index_prices.iter().copied().collect())
3257 }
3258
3259 #[must_use]
3261 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3262 self.funding_rates
3263 .get(instrument_id)
3264 .map(|funding_rates| funding_rates.iter().copied().collect())
3265 }
3266
3267 #[must_use]
3269 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3270 self.bars
3271 .get(bar_type)
3272 .map(|bars| bars.iter().copied().collect())
3273 }
3274
3275 #[must_use]
3277 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3278 self.books.get(instrument_id)
3279 }
3280
3281 #[must_use]
3283 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3284 self.books.get_mut(instrument_id)
3285 }
3286
3287 #[must_use]
3289 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3290 self.own_books.get(instrument_id)
3291 }
3292
3293 #[must_use]
3295 pub fn own_order_book_mut(
3296 &mut self,
3297 instrument_id: &InstrumentId,
3298 ) -> Option<&mut OwnOrderBook> {
3299 self.own_books.get_mut(instrument_id)
3300 }
3301
3302 #[must_use]
3304 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3305 self.quotes
3306 .get(instrument_id)
3307 .and_then(|quotes| quotes.front())
3308 }
3309
3310 #[must_use]
3312 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3313 self.trades
3314 .get(instrument_id)
3315 .and_then(|trades| trades.front())
3316 }
3317
3318 #[must_use]
3320 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3321 self.mark_prices
3322 .get(instrument_id)
3323 .and_then(|mark_prices| mark_prices.front())
3324 }
3325
3326 #[must_use]
3328 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3329 self.index_prices
3330 .get(instrument_id)
3331 .and_then(|index_prices| index_prices.front())
3332 }
3333
3334 #[must_use]
3336 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3337 self.funding_rates
3338 .get(instrument_id)
3339 .and_then(|funding_rates| funding_rates.front())
3340 }
3341
3342 #[must_use]
3344 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3345 self.bars.get(bar_type).and_then(|bars| bars.front())
3346 }
3347
3348 #[must_use]
3350 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3351 self.books
3352 .get(instrument_id)
3353 .map_or(0, |book| book.update_count) as usize
3354 }
3355
3356 #[must_use]
3358 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3359 self.quotes
3360 .get(instrument_id)
3361 .map_or(0, std::collections::VecDeque::len)
3362 }
3363
3364 #[must_use]
3366 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3367 self.trades
3368 .get(instrument_id)
3369 .map_or(0, std::collections::VecDeque::len)
3370 }
3371
3372 #[must_use]
3374 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3375 self.bars
3376 .get(bar_type)
3377 .map_or(0, std::collections::VecDeque::len)
3378 }
3379
3380 #[must_use]
3382 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3383 self.books.contains_key(instrument_id)
3384 }
3385
3386 #[must_use]
3388 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3389 self.quote_count(instrument_id) > 0
3390 }
3391
3392 #[must_use]
3394 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3395 self.trade_count(instrument_id) > 0
3396 }
3397
3398 #[must_use]
3400 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3401 self.bar_count(bar_type) > 0
3402 }
3403
3404 #[must_use]
3405 pub fn get_xrate(
3406 &self,
3407 venue: Venue,
3408 from_currency: Currency,
3409 to_currency: Currency,
3410 price_type: PriceType,
3411 ) -> Option<f64> {
3412 if from_currency == to_currency {
3413 return Some(1.0);
3416 }
3417
3418 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3419
3420 match get_exchange_rate(
3421 from_currency.code,
3422 to_currency.code,
3423 price_type,
3424 bid_quote,
3425 ask_quote,
3426 ) {
3427 Ok(rate) => rate,
3428 Err(e) => {
3429 log::error!("Failed to calculate xrate: {e}");
3430 None
3431 }
3432 }
3433 }
3434
3435 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3436 let mut bid_quotes = AHashMap::new();
3437 let mut ask_quotes = AHashMap::new();
3438
3439 for instrument_id in self.instruments.keys() {
3440 if instrument_id.venue != *venue {
3441 continue;
3442 }
3443
3444 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3445 if let Some(tick) = ticks.front() {
3446 (tick.bid_price, tick.ask_price)
3447 } else {
3448 continue; }
3450 } else {
3451 let bid_bar = self
3452 .bars
3453 .iter()
3454 .find(|(k, _)| {
3455 k.instrument_id() == *instrument_id
3456 && matches!(k.spec().price_type, PriceType::Bid)
3457 })
3458 .map(|(_, v)| v);
3459
3460 let ask_bar = self
3461 .bars
3462 .iter()
3463 .find(|(k, _)| {
3464 k.instrument_id() == *instrument_id
3465 && matches!(k.spec().price_type, PriceType::Ask)
3466 })
3467 .map(|(_, v)| v);
3468
3469 match (bid_bar, ask_bar) {
3470 (Some(bid), Some(ask)) => {
3471 match (bid.front(), ask.front()) {
3472 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3473 _ => {
3474 continue;
3476 }
3477 }
3478 }
3479 _ => continue,
3480 }
3481 };
3482
3483 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3484 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3485 }
3486
3487 (bid_quotes, ask_quotes)
3488 }
3489
3490 #[must_use]
3492 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3493 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3494 }
3495
3496 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3502 assert!(xrate > 0.0, "xrate was zero");
3503 self.mark_xrates.insert((from_currency, to_currency), xrate);
3504 self.mark_xrates
3505 .insert((to_currency, from_currency), 1.0 / xrate);
3506 }
3507
3508 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3510 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3511 }
3512
3513 pub fn clear_mark_xrates(&mut self) {
3515 self.mark_xrates.clear();
3516 }
3517
3518 #[must_use]
3522 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3523 self.instruments.get(instrument_id)
3524 }
3525
3526 #[must_use]
3528 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3529 match venue {
3530 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3531 None => self.instruments.keys().collect(),
3532 }
3533 }
3534
3535 #[must_use]
3537 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3538 self.instruments
3539 .values()
3540 .filter(|i| &i.id().venue == venue)
3541 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3542 .collect()
3543 }
3544
3545 #[must_use]
3547 pub fn bar_types(
3548 &self,
3549 instrument_id: Option<&InstrumentId>,
3550 price_type: Option<&PriceType>,
3551 aggregation_source: AggregationSource,
3552 ) -> Vec<&BarType> {
3553 let mut bar_types = self
3554 .bars
3555 .keys()
3556 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3557 .collect::<Vec<&BarType>>();
3558
3559 if let Some(instrument_id) = instrument_id {
3560 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3561 }
3562
3563 if let Some(price_type) = price_type {
3564 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3565 }
3566
3567 bar_types
3568 }
3569
3570 #[must_use]
3574 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3575 self.synthetics.get(instrument_id)
3576 }
3577
3578 #[must_use]
3580 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3581 self.synthetics.keys().collect()
3582 }
3583
3584 #[must_use]
3586 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3587 self.synthetics.values().collect()
3588 }
3589
3590 #[must_use]
3594 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3595 self.accounts.get(account_id)
3596 }
3597
3598 #[must_use]
3600 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3601 self.index
3602 .venue_account
3603 .get(venue)
3604 .and_then(|account_id| self.accounts.get(account_id))
3605 }
3606
3607 #[must_use]
3609 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3610 self.index.venue_account.get(venue)
3611 }
3612
3613 #[must_use]
3615 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3616 self.accounts
3617 .values()
3618 .filter(|account| &account.id() == account_id)
3619 .collect()
3620 }
3621
3622 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3630 if !order.has_price() {
3631 return;
3632 }
3633
3634 let instrument_id = order.instrument_id();
3635
3636 let own_book = self
3637 .own_books
3638 .entry(instrument_id)
3639 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3640
3641 let own_book_order = order.to_own_book_order();
3642
3643 if order.is_closed() {
3644 if let Err(e) = own_book.delete(own_book_order) {
3645 log::debug!(
3646 "Failed to delete order {} from own book: {e}",
3647 order.client_order_id(),
3648 );
3649 } else {
3650 log::debug!("Deleted order {} from own book", order.client_order_id());
3651 }
3652 } else {
3653 if let Err(e) = own_book.update(own_book_order) {
3655 log::debug!(
3656 "Failed to update order {} in own book: {e}; inserting instead",
3657 order.client_order_id(),
3658 );
3659 own_book.add(own_book_order);
3660 }
3661 log::debug!("Updated order {} in own book", order.client_order_id());
3662 }
3663 }
3664
3665 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3671 let order = match self.orders.get(client_order_id) {
3672 Some(order) => order,
3673 None => return,
3674 };
3675
3676 self.index.orders_open.remove(client_order_id);
3677 self.index.orders_pending_cancel.remove(client_order_id);
3678 self.index.orders_inflight.remove(client_order_id);
3679 self.index.orders_emulated.remove(client_order_id);
3680
3681 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3682 && order.has_price()
3683 {
3684 let own_book_order = order.to_own_book_order();
3685 if let Err(e) = own_book.delete(own_book_order) {
3686 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3687 } else {
3688 log::debug!("Force deleted {client_order_id} from own book");
3689 }
3690 }
3691
3692 self.index.orders_closed.insert(*client_order_id);
3693 }
3694
3695 pub fn audit_own_order_books(&mut self) {
3702 log::debug!("Starting own books audit");
3703 let start = std::time::Instant::now();
3704
3705 let valid_order_ids: AHashSet<ClientOrderId> = self
3708 .index
3709 .orders_open
3710 .union(&self.index.orders_inflight)
3711 .copied()
3712 .collect();
3713
3714 for own_book in self.own_books.values_mut() {
3715 own_book.audit_open_orders(&valid_order_ids);
3716 }
3717
3718 log::debug!("Completed own books audit in {:?}", start.elapsed());
3719 }
3720}