1pub mod config;
21pub mod database;
22pub mod quote;
23
24mod index;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30 collections::VecDeque,
31 fmt::Debug,
32 time::{SystemTime, UNIX_EPOCH},
33};
34
35use ahash::{AHashMap, AHashSet};
36use bytes::Bytes;
37pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
39use index::CacheIndex;
40use nautilus_core::{
41 UUID4, UnixNanos,
42 correctness::{
43 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
44 check_valid_string_ascii,
45 },
46 datetime::secs_to_nanos_unchecked,
47};
48use nautilus_model::{
49 accounts::{Account, AccountAny},
50 data::{
51 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52 TradeTick, YieldCurveData,
53 },
54 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55 identifiers::{
56 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58 },
59 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60 orderbook::{
61 OrderBook,
62 own::{OwnOrderBook, should_handle_own_book_order},
63 },
64 orders::{Order, OrderAny, OrderList},
65 position::Position,
66 types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
72#[cfg_attr(
74 feature = "python",
75 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
76)]
77pub struct Cache {
78 config: CacheConfig,
79 index: CacheIndex,
80 database: Option<Box<dyn CacheDatabaseAdapter>>,
81 general: AHashMap<String, Bytes>,
82 currencies: AHashMap<Ustr, Currency>,
83 instruments: AHashMap<InstrumentId, InstrumentAny>,
84 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
85 books: AHashMap<InstrumentId, OrderBook>,
86 own_books: AHashMap<InstrumentId, OwnOrderBook>,
87 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
88 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
89 mark_xrates: AHashMap<(Currency, Currency), f64>,
90 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
91 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
92 funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
93 bars: AHashMap<BarType, VecDeque<Bar>>,
94 greeks: AHashMap<InstrumentId, GreeksData>,
95 yield_curves: AHashMap<String, YieldCurveData>,
96 accounts: AHashMap<AccountId, AccountAny>,
97 orders: AHashMap<ClientOrderId, OrderAny>,
98 order_lists: AHashMap<OrderListId, OrderList>,
99 positions: AHashMap<PositionId, Position>,
100 position_snapshots: AHashMap<PositionId, Bytes>,
101 #[cfg(feature = "defi")]
102 pub(crate) defi: crate::defi::cache::DefiCache,
103}
104
105impl Debug for Cache {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct(stringify!(Cache))
108 .field("config", &self.config)
109 .field("index", &self.index)
110 .field("general", &self.general)
111 .field("currencies", &self.currencies)
112 .field("instruments", &self.instruments)
113 .field("synthetics", &self.synthetics)
114 .field("books", &self.books)
115 .field("own_books", &self.own_books)
116 .field("quotes", &self.quotes)
117 .field("trades", &self.trades)
118 .field("mark_xrates", &self.mark_xrates)
119 .field("mark_prices", &self.mark_prices)
120 .field("index_prices", &self.index_prices)
121 .field("funding_rates", &self.funding_rates)
122 .field("bars", &self.bars)
123 .field("greeks", &self.greeks)
124 .field("yield_curves", &self.yield_curves)
125 .field("accounts", &self.accounts)
126 .field("orders", &self.orders)
127 .field("order_lists", &self.order_lists)
128 .field("positions", &self.positions)
129 .field("position_snapshots", &self.position_snapshots)
130 .finish()
131 }
132}
133
134impl Default for Cache {
135 fn default() -> Self {
137 Self::new(Some(CacheConfig::default()), None)
138 }
139}
140
141impl Cache {
142 #[must_use]
144 pub fn new(
148 config: Option<CacheConfig>,
149 database: Option<Box<dyn CacheDatabaseAdapter>>,
150 ) -> Self {
151 Self {
152 config: config.unwrap_or_default(),
153 index: CacheIndex::default(),
154 database,
155 general: AHashMap::new(),
156 currencies: AHashMap::new(),
157 instruments: AHashMap::new(),
158 synthetics: AHashMap::new(),
159 books: AHashMap::new(),
160 own_books: AHashMap::new(),
161 quotes: AHashMap::new(),
162 trades: AHashMap::new(),
163 mark_xrates: AHashMap::new(),
164 mark_prices: AHashMap::new(),
165 index_prices: AHashMap::new(),
166 funding_rates: AHashMap::new(),
167 bars: AHashMap::new(),
168 greeks: AHashMap::new(),
169 yield_curves: AHashMap::new(),
170 accounts: AHashMap::new(),
171 orders: AHashMap::new(),
172 order_lists: AHashMap::new(),
173 positions: AHashMap::new(),
174 position_snapshots: AHashMap::new(),
175 #[cfg(feature = "defi")]
176 defi: crate::defi::cache::DefiCache::default(),
177 }
178 }
179
180 #[must_use]
182 pub fn memory_address(&self) -> String {
183 format!("{:?}", std::ptr::from_ref(self))
184 }
185
186 pub fn cache_general(&mut self) -> anyhow::Result<()> {
194 self.general = match &mut self.database {
195 Some(db) => db.load()?,
196 None => AHashMap::new(),
197 };
198
199 log::info!(
200 "Cached {} general object(s) from database",
201 self.general.len()
202 );
203 Ok(())
204 }
205
206 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
212 let cache_map = match &self.database {
213 Some(db) => db.load_all().await?,
214 None => CacheMap::default(),
215 };
216
217 self.currencies = cache_map.currencies;
218 self.instruments = cache_map.instruments;
219 self.synthetics = cache_map.synthetics;
220 self.accounts = cache_map.accounts;
221 self.orders = cache_map.orders;
222 self.positions = cache_map.positions;
223 Ok(())
224 }
225
226 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
232 self.currencies = match &mut self.database {
233 Some(db) => db.load_currencies().await?,
234 None => AHashMap::new(),
235 };
236
237 log::info!("Cached {} currencies from database", self.general.len());
238 Ok(())
239 }
240
241 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
247 self.instruments = match &mut self.database {
248 Some(db) => db.load_instruments().await?,
249 None => AHashMap::new(),
250 };
251
252 log::info!("Cached {} instruments from database", self.general.len());
253 Ok(())
254 }
255
256 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
262 self.synthetics = match &mut self.database {
263 Some(db) => db.load_synthetics().await?,
264 None => AHashMap::new(),
265 };
266
267 log::info!(
268 "Cached {} synthetic instruments from database",
269 self.general.len()
270 );
271 Ok(())
272 }
273
274 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
280 self.accounts = match &mut self.database {
281 Some(db) => db.load_accounts().await?,
282 None => AHashMap::new(),
283 };
284
285 log::info!(
286 "Cached {} synthetic instruments from database",
287 self.general.len()
288 );
289 Ok(())
290 }
291
292 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
298 self.orders = match &mut self.database {
299 Some(db) => db.load_orders().await?,
300 None => AHashMap::new(),
301 };
302
303 log::info!("Cached {} orders from database", self.general.len());
304 Ok(())
305 }
306
307 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
313 self.positions = match &mut self.database {
314 Some(db) => db.load_positions().await?,
315 None => AHashMap::new(),
316 };
317
318 log::info!("Cached {} positions from database", self.general.len());
319 Ok(())
320 }
321
322 pub fn build_index(&mut self) {
324 log::debug!("Building index");
325
326 for account_id in self.accounts.keys() {
328 self.index
329 .venue_account
330 .insert(account_id.get_issuer(), *account_id);
331 }
332
333 for (client_order_id, order) in &self.orders {
335 let instrument_id = order.instrument_id();
336 let venue = instrument_id.venue;
337 let strategy_id = order.strategy_id();
338
339 self.index
341 .venue_orders
342 .entry(venue)
343 .or_default()
344 .insert(*client_order_id);
345
346 if let Some(venue_order_id) = order.venue_order_id() {
348 self.index
349 .venue_order_ids
350 .insert(venue_order_id, *client_order_id);
351 }
352
353 if let Some(position_id) = order.position_id() {
355 self.index
356 .order_position
357 .insert(*client_order_id, position_id);
358 }
359
360 self.index
362 .order_strategy
363 .insert(*client_order_id, order.strategy_id());
364
365 self.index
367 .instrument_orders
368 .entry(instrument_id)
369 .or_default()
370 .insert(*client_order_id);
371
372 self.index
374 .strategy_orders
375 .entry(strategy_id)
376 .or_default()
377 .insert(*client_order_id);
378
379 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
381 self.index
382 .exec_algorithm_orders
383 .entry(exec_algorithm_id)
384 .or_default()
385 .insert(*client_order_id);
386 }
387
388 if let Some(exec_spawn_id) = order.exec_spawn_id() {
390 self.index
391 .exec_spawn_orders
392 .entry(exec_spawn_id)
393 .or_default()
394 .insert(*client_order_id);
395 }
396
397 self.index.orders.insert(*client_order_id);
399
400 if order.is_open() {
402 self.index.orders_open.insert(*client_order_id);
403 }
404
405 if order.is_closed() {
407 self.index.orders_closed.insert(*client_order_id);
408 }
409
410 if let Some(emulation_trigger) = order.emulation_trigger()
412 && emulation_trigger != TriggerType::NoTrigger
413 && !order.is_closed()
414 {
415 self.index.orders_emulated.insert(*client_order_id);
416 }
417
418 if order.is_inflight() {
420 self.index.orders_inflight.insert(*client_order_id);
421 }
422
423 self.index.strategies.insert(strategy_id);
425
426 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
428 self.index.exec_algorithms.insert(exec_algorithm_id);
429 }
430 }
431
432 for (position_id, position) in &self.positions {
434 let instrument_id = position.instrument_id;
435 let venue = instrument_id.venue;
436 let strategy_id = position.strategy_id;
437
438 self.index
440 .venue_positions
441 .entry(venue)
442 .or_default()
443 .insert(*position_id);
444
445 self.index
447 .position_strategy
448 .insert(*position_id, position.strategy_id);
449
450 self.index
452 .position_orders
453 .entry(*position_id)
454 .or_default()
455 .extend(position.client_order_ids().into_iter());
456
457 self.index
459 .instrument_positions
460 .entry(instrument_id)
461 .or_default()
462 .insert(*position_id);
463
464 self.index
466 .strategy_positions
467 .entry(strategy_id)
468 .or_default()
469 .insert(*position_id);
470
471 self.index.positions.insert(*position_id);
473
474 if position.is_open() {
476 self.index.positions_open.insert(*position_id);
477 }
478
479 if position.is_closed() {
481 self.index.positions_closed.insert(*position_id);
482 }
483
484 self.index.strategies.insert(strategy_id);
486 }
487 }
488
489 #[must_use]
491 pub const fn has_backing(&self) -> bool {
492 self.config.database.is_some()
493 }
494
495 #[must_use]
497 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
498 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
499 quote
500 } else {
501 log::warn!(
502 "Cannot calculate unrealized PnL for {}, no quotes for {}",
503 position.id,
504 position.instrument_id
505 );
506 return None;
507 };
508
509 let last = match position.side {
510 PositionSide::Flat | PositionSide::NoPositionSide => {
511 return Some(Money::new(0.0, position.settlement_currency));
512 }
513 PositionSide::Long => quote.ask_price,
514 PositionSide::Short => quote.bid_price,
515 };
516
517 Some(position.unrealized_pnl(last))
518 }
519
520 #[must_use]
529 pub fn check_integrity(&mut self) -> bool {
530 let mut error_count = 0;
531 let failure = "Integrity failure";
532
533 let timestamp_us = SystemTime::now()
535 .duration_since(UNIX_EPOCH)
536 .expect("Time went backwards")
537 .as_micros();
538
539 log::info!("Checking data integrity");
540
541 for account_id in self.accounts.keys() {
543 if !self
544 .index
545 .venue_account
546 .contains_key(&account_id.get_issuer())
547 {
548 log::error!(
549 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
550 );
551 error_count += 1;
552 }
553 }
554
555 for (client_order_id, order) in &self.orders {
556 if !self.index.order_strategy.contains_key(client_order_id) {
557 log::error!(
558 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
559 );
560 error_count += 1;
561 }
562 if !self.index.orders.contains(client_order_id) {
563 log::error!(
564 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
565 );
566 error_count += 1;
567 }
568 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
569 log::error!(
570 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
571 );
572 error_count += 1;
573 }
574 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
575 log::error!(
576 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
577 );
578 error_count += 1;
579 }
580 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
581 log::error!(
582 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
583 );
584 error_count += 1;
585 }
586 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
587 if !self
588 .index
589 .exec_algorithm_orders
590 .contains_key(&exec_algorithm_id)
591 {
592 log::error!(
593 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
594 );
595 error_count += 1;
596 }
597 if order.exec_spawn_id().is_none()
598 && !self.index.exec_spawn_orders.contains_key(client_order_id)
599 {
600 log::error!(
601 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
602 );
603 error_count += 1;
604 }
605 }
606 }
607
608 for (position_id, position) in &self.positions {
609 if !self.index.position_strategy.contains_key(position_id) {
610 log::error!(
611 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
612 );
613 error_count += 1;
614 }
615 if !self.index.position_orders.contains_key(position_id) {
616 log::error!(
617 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
618 );
619 error_count += 1;
620 }
621 if !self.index.positions.contains(position_id) {
622 log::error!(
623 "{failure} in positions: {position_id} not found in `self.index.positions`",
624 );
625 error_count += 1;
626 }
627 if position.is_open() && !self.index.positions_open.contains(position_id) {
628 log::error!(
629 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
630 );
631 error_count += 1;
632 }
633 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
634 log::error!(
635 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
636 );
637 error_count += 1;
638 }
639 }
640
641 for account_id in self.index.venue_account.values() {
643 if !self.accounts.contains_key(account_id) {
644 log::error!(
645 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
646 );
647 error_count += 1;
648 }
649 }
650
651 for client_order_id in self.index.venue_order_ids.values() {
652 if !self.orders.contains_key(client_order_id) {
653 log::error!(
654 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
655 );
656 error_count += 1;
657 }
658 }
659
660 for client_order_id in self.index.client_order_ids.keys() {
661 if !self.orders.contains_key(client_order_id) {
662 log::error!(
663 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
664 );
665 error_count += 1;
666 }
667 }
668
669 for client_order_id in self.index.order_position.keys() {
670 if !self.orders.contains_key(client_order_id) {
671 log::error!(
672 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
673 );
674 error_count += 1;
675 }
676 }
677
678 for client_order_id in self.index.order_strategy.keys() {
680 if !self.orders.contains_key(client_order_id) {
681 log::error!(
682 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
683 );
684 error_count += 1;
685 }
686 }
687
688 for position_id in self.index.position_strategy.keys() {
689 if !self.positions.contains_key(position_id) {
690 log::error!(
691 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
692 );
693 error_count += 1;
694 }
695 }
696
697 for position_id in self.index.position_orders.keys() {
698 if !self.positions.contains_key(position_id) {
699 log::error!(
700 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
701 );
702 error_count += 1;
703 }
704 }
705
706 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
707 for client_order_id in client_order_ids {
708 if !self.orders.contains_key(client_order_id) {
709 log::error!(
710 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
711 );
712 error_count += 1;
713 }
714 }
715 }
716
717 for instrument_id in self.index.instrument_positions.keys() {
718 if !self.index.instrument_orders.contains_key(instrument_id) {
719 log::error!(
720 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
721 );
722 error_count += 1;
723 }
724 }
725
726 for client_order_ids in self.index.strategy_orders.values() {
727 for client_order_id in client_order_ids {
728 if !self.orders.contains_key(client_order_id) {
729 log::error!(
730 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
731 );
732 error_count += 1;
733 }
734 }
735 }
736
737 for position_ids in self.index.strategy_positions.values() {
738 for position_id in position_ids {
739 if !self.positions.contains_key(position_id) {
740 log::error!(
741 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
742 );
743 error_count += 1;
744 }
745 }
746 }
747
748 for client_order_id in &self.index.orders {
749 if !self.orders.contains_key(client_order_id) {
750 log::error!(
751 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
752 );
753 error_count += 1;
754 }
755 }
756
757 for client_order_id in &self.index.orders_emulated {
758 if !self.orders.contains_key(client_order_id) {
759 log::error!(
760 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
761 );
762 error_count += 1;
763 }
764 }
765
766 for client_order_id in &self.index.orders_inflight {
767 if !self.orders.contains_key(client_order_id) {
768 log::error!(
769 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
770 );
771 error_count += 1;
772 }
773 }
774
775 for client_order_id in &self.index.orders_open {
776 if !self.orders.contains_key(client_order_id) {
777 log::error!(
778 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
779 );
780 error_count += 1;
781 }
782 }
783
784 for client_order_id in &self.index.orders_closed {
785 if !self.orders.contains_key(client_order_id) {
786 log::error!(
787 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
788 );
789 error_count += 1;
790 }
791 }
792
793 for position_id in &self.index.positions {
794 if !self.positions.contains_key(position_id) {
795 log::error!(
796 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
797 );
798 error_count += 1;
799 }
800 }
801
802 for position_id in &self.index.positions_open {
803 if !self.positions.contains_key(position_id) {
804 log::error!(
805 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
806 );
807 error_count += 1;
808 }
809 }
810
811 for position_id in &self.index.positions_closed {
812 if !self.positions.contains_key(position_id) {
813 log::error!(
814 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
815 );
816 error_count += 1;
817 }
818 }
819
820 for strategy_id in &self.index.strategies {
821 if !self.index.strategy_orders.contains_key(strategy_id) {
822 log::error!(
823 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
824 );
825 error_count += 1;
826 }
827 }
828
829 for exec_algorithm_id in &self.index.exec_algorithms {
830 if !self
831 .index
832 .exec_algorithm_orders
833 .contains_key(exec_algorithm_id)
834 {
835 log::error!(
836 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
837 );
838 error_count += 1;
839 }
840 }
841
842 let total_us = SystemTime::now()
843 .duration_since(UNIX_EPOCH)
844 .expect("Time went backwards")
845 .as_micros()
846 - timestamp_us;
847
848 if error_count == 0 {
849 log::info!("Integrity check passed in {total_us}μs");
850 true
851 } else {
852 log::error!(
853 "Integrity check failed with {error_count} error{} in {total_us}μs",
854 if error_count == 1 { "" } else { "s" },
855 );
856 false
857 }
858 }
859
860 #[must_use]
864 pub fn check_residuals(&self) -> bool {
865 log::debug!("Checking residuals");
866
867 let mut residuals = false;
868
869 for order in self.orders_open(None, None, None, None) {
871 residuals = true;
872 log::warn!("Residual {order:?}");
873 }
874
875 for position in self.positions_open(None, None, None, None) {
877 residuals = true;
878 log::warn!("Residual {position}");
879 }
880
881 residuals
882 }
883
884 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
890 log::debug!(
891 "Purging closed orders{}",
892 if buffer_secs > 0 {
893 format!(" with buffer_secs={buffer_secs}")
894 } else {
895 String::new()
896 }
897 );
898
899 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
900
901 'outer: for client_order_id in self.index.orders_closed.clone() {
902 if let Some(order) = self.orders.get(&client_order_id)
903 && order.is_closed()
904 && let Some(ts_closed) = order.ts_closed()
905 && ts_closed + buffer_ns <= ts_now
906 {
907 if let Some(linked_order_ids) = order.linked_order_ids() {
909 for linked_order_id in linked_order_ids {
910 if let Some(linked_order) = self.orders.get(linked_order_id)
911 && linked_order.is_open()
912 {
913 continue 'outer;
915 }
916 }
917 }
918
919 self.purge_order(client_order_id);
920 }
921 }
922 }
923
924 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
926 log::debug!(
927 "Purging closed positions{}",
928 if buffer_secs > 0 {
929 format!(" with buffer_secs={buffer_secs}")
930 } else {
931 String::new()
932 }
933 );
934
935 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
936
937 for position_id in self.index.positions_closed.clone() {
938 if let Some(position) = self.positions.get(&position_id)
939 && position.is_closed()
940 && let Some(ts_closed) = position.ts_closed
941 && ts_closed + buffer_ns <= ts_now
942 {
943 self.purge_position(position_id);
944 }
945 }
946 }
947
948 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
952 let order = self.orders.get(&client_order_id).cloned();
954
955 if let Some(ref ord) = order
957 && ord.is_open()
958 {
959 log::warn!("Order {client_order_id} found open when purging, skipping purge");
960 return;
961 }
962
963 if let Some(ref ord) = order {
965 self.orders.remove(&client_order_id);
967
968 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
970 {
971 venue_orders.remove(&client_order_id);
972 }
973
974 if let Some(venue_order_id) = ord.venue_order_id() {
976 self.index.venue_order_ids.remove(&venue_order_id);
977 }
978
979 if let Some(instrument_orders) =
981 self.index.instrument_orders.get_mut(&ord.instrument_id())
982 {
983 instrument_orders.remove(&client_order_id);
984 }
985
986 if let Some(position_id) = ord.position_id()
988 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
989 {
990 position_orders.remove(&client_order_id);
991 }
992
993 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
995 && let Some(exec_algorithm_orders) =
996 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
997 {
998 exec_algorithm_orders.remove(&client_order_id);
999 }
1000
1001 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1003 strategy_orders.remove(&client_order_id);
1004 if strategy_orders.is_empty() {
1005 self.index.strategy_orders.remove(&ord.strategy_id());
1006 }
1007 }
1008
1009 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1011 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1012 {
1013 spawn_orders.remove(&client_order_id);
1014 if spawn_orders.is_empty() {
1015 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1016 }
1017 }
1018
1019 log::info!("Purged order {client_order_id}");
1020 } else {
1021 log::warn!("Order {client_order_id} not found when purging");
1022 }
1023
1024 self.index.order_position.remove(&client_order_id);
1026 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1027 self.index.order_client.remove(&client_order_id);
1028 self.index.client_order_ids.remove(&client_order_id);
1029
1030 if let Some(strategy_id) = strategy_id
1032 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1033 {
1034 strategy_orders.remove(&client_order_id);
1035 if strategy_orders.is_empty() {
1036 self.index.strategy_orders.remove(&strategy_id);
1037 }
1038 }
1039
1040 self.index.exec_spawn_orders.remove(&client_order_id);
1042
1043 self.index.orders.remove(&client_order_id);
1044 self.index.orders_closed.remove(&client_order_id);
1045 self.index.orders_emulated.remove(&client_order_id);
1046 self.index.orders_inflight.remove(&client_order_id);
1047 self.index.orders_pending_cancel.remove(&client_order_id);
1048 }
1049
1050 pub fn purge_position(&mut self, position_id: PositionId) {
1054 let position = self.positions.get(&position_id).cloned();
1056
1057 if let Some(ref pos) = position
1059 && pos.is_open()
1060 {
1061 log::warn!("Position {position_id} found open when purging, skipping purge");
1062 return;
1063 }
1064
1065 if let Some(ref pos) = position {
1067 self.positions.remove(&position_id);
1068
1069 if let Some(venue_positions) =
1071 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1072 {
1073 venue_positions.remove(&position_id);
1074 }
1075
1076 if let Some(instrument_positions) =
1078 self.index.instrument_positions.get_mut(&pos.instrument_id)
1079 {
1080 instrument_positions.remove(&position_id);
1081 }
1082
1083 if let Some(strategy_positions) =
1085 self.index.strategy_positions.get_mut(&pos.strategy_id)
1086 {
1087 strategy_positions.remove(&position_id);
1088 }
1089
1090 for client_order_id in pos.client_order_ids() {
1092 self.index.order_position.remove(&client_order_id);
1093 }
1094
1095 log::info!("Purged position {position_id}");
1096 } else {
1097 log::warn!("Position {position_id} not found when purging");
1098 }
1099
1100 self.index.position_strategy.remove(&position_id);
1102 self.index.position_orders.remove(&position_id);
1103 self.index.positions.remove(&position_id);
1104 self.index.positions_open.remove(&position_id);
1105 self.index.positions_closed.remove(&position_id);
1106
1107 self.position_snapshots.remove(&position_id);
1109 }
1110
1111 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1116 log::debug!(
1117 "Purging account events{}",
1118 if lookback_secs > 0 {
1119 format!(" with lookback_secs={lookback_secs}")
1120 } else {
1121 String::new()
1122 }
1123 );
1124
1125 for account in self.accounts.values_mut() {
1126 let event_count = account.event_count();
1127 account.purge_account_events(ts_now, lookback_secs);
1128 let count_diff = event_count - account.event_count();
1129 if count_diff > 0 {
1130 log::info!(
1131 "Purged {} event(s) from account {}",
1132 count_diff,
1133 account.id()
1134 );
1135 }
1136 }
1137 }
1138
1139 pub fn clear_index(&mut self) {
1141 self.index.clear();
1142 log::debug!("Cleared index");
1143 }
1144
1145 pub fn reset(&mut self) {
1149 log::debug!("Resetting cache");
1150
1151 self.general.clear();
1152 self.currencies.clear();
1153 self.instruments.clear();
1154 self.synthetics.clear();
1155 self.books.clear();
1156 self.own_books.clear();
1157 self.quotes.clear();
1158 self.trades.clear();
1159 self.mark_xrates.clear();
1160 self.mark_prices.clear();
1161 self.index_prices.clear();
1162 self.bars.clear();
1163 self.accounts.clear();
1164 self.orders.clear();
1165 self.order_lists.clear();
1166 self.positions.clear();
1167 self.position_snapshots.clear();
1168 self.greeks.clear();
1169 self.yield_curves.clear();
1170
1171 #[cfg(feature = "defi")]
1172 {
1173 self.defi.pools.clear();
1174 self.defi.pool_profilers.clear();
1175 }
1176
1177 self.clear_index();
1178
1179 log::info!("Reset cache");
1180 }
1181
1182 pub fn dispose(&mut self) {
1186 if let Some(database) = &mut self.database
1187 && let Err(e) = database.close()
1188 {
1189 log::error!("Failed to close database during dispose: {e}");
1190 }
1191 }
1192
1193 pub fn flush_db(&mut self) {
1197 if let Some(database) = &mut self.database
1198 && let Err(e) = database.flush()
1199 {
1200 log::error!("Failed to flush database: {e}");
1201 }
1202 }
1203
1204 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1212 check_valid_string_ascii(key, stringify!(key))?;
1213 check_predicate_false(value.is_empty(), stringify!(value))?;
1214
1215 log::debug!("Adding general {key}");
1216 self.general.insert(key.to_string(), value.clone());
1217
1218 if let Some(database) = &mut self.database {
1219 database.add(key.to_string(), value)?;
1220 }
1221 Ok(())
1222 }
1223
1224 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1230 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1231
1232 if self.config.save_market_data
1233 && let Some(database) = &mut self.database
1234 {
1235 database.add_order_book(&book)?;
1236 }
1237
1238 self.books.insert(book.instrument_id, book);
1239 Ok(())
1240 }
1241
1242 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1248 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1249
1250 self.own_books.insert(own_book.instrument_id, own_book);
1251 Ok(())
1252 }
1253
1254 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1260 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1261
1262 if self.config.save_market_data {
1263 }
1265
1266 let mark_prices_deque = self
1267 .mark_prices
1268 .entry(mark_price.instrument_id)
1269 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1270 mark_prices_deque.push_front(mark_price);
1271 Ok(())
1272 }
1273
1274 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1280 log::debug!(
1281 "Adding `IndexPriceUpdate` for {}",
1282 index_price.instrument_id
1283 );
1284
1285 if self.config.save_market_data {
1286 }
1288
1289 let index_prices_deque = self
1290 .index_prices
1291 .entry(index_price.instrument_id)
1292 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1293 index_prices_deque.push_front(index_price);
1294 Ok(())
1295 }
1296
1297 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1303 log::debug!(
1304 "Adding `FundingRateUpdate` for {}",
1305 funding_rate.instrument_id
1306 );
1307
1308 if self.config.save_market_data {
1309 }
1311
1312 self.funding_rates
1313 .insert(funding_rate.instrument_id, funding_rate);
1314 Ok(())
1315 }
1316
1317 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1323 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1324
1325 if self.config.save_market_data
1326 && let Some(database) = &mut self.database
1327 {
1328 database.add_quote("e)?;
1329 }
1330
1331 let quotes_deque = self
1332 .quotes
1333 .entry(quote.instrument_id)
1334 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1335 quotes_deque.push_front(quote);
1336 Ok(())
1337 }
1338
1339 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1345 check_slice_not_empty(quotes, stringify!(quotes))?;
1346
1347 let instrument_id = quotes[0].instrument_id;
1348 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1349
1350 if self.config.save_market_data
1351 && let Some(database) = &mut self.database
1352 {
1353 for quote in quotes {
1354 database.add_quote(quote)?;
1355 }
1356 }
1357
1358 let quotes_deque = self
1359 .quotes
1360 .entry(instrument_id)
1361 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1362
1363 for quote in quotes {
1364 quotes_deque.push_front(*quote);
1365 }
1366 Ok(())
1367 }
1368
1369 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1375 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1376
1377 if self.config.save_market_data
1378 && let Some(database) = &mut self.database
1379 {
1380 database.add_trade(&trade)?;
1381 }
1382
1383 let trades_deque = self
1384 .trades
1385 .entry(trade.instrument_id)
1386 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1387 trades_deque.push_front(trade);
1388 Ok(())
1389 }
1390
1391 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1397 check_slice_not_empty(trades, stringify!(trades))?;
1398
1399 let instrument_id = trades[0].instrument_id;
1400 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1401
1402 if self.config.save_market_data
1403 && let Some(database) = &mut self.database
1404 {
1405 for trade in trades {
1406 database.add_trade(trade)?;
1407 }
1408 }
1409
1410 let trades_deque = self
1411 .trades
1412 .entry(instrument_id)
1413 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1414
1415 for trade in trades {
1416 trades_deque.push_front(*trade);
1417 }
1418 Ok(())
1419 }
1420
1421 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1427 log::debug!("Adding `Bar` {}", bar.bar_type);
1428
1429 if self.config.save_market_data
1430 && let Some(database) = &mut self.database
1431 {
1432 database.add_bar(&bar)?;
1433 }
1434
1435 let bars = self
1436 .bars
1437 .entry(bar.bar_type)
1438 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1439 bars.push_front(bar);
1440 Ok(())
1441 }
1442
1443 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1449 check_slice_not_empty(bars, stringify!(bars))?;
1450
1451 let bar_type = bars[0].bar_type;
1452 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1453
1454 if self.config.save_market_data
1455 && let Some(database) = &mut self.database
1456 {
1457 for bar in bars {
1458 database.add_bar(bar)?;
1459 }
1460 }
1461
1462 let bars_deque = self
1463 .bars
1464 .entry(bar_type)
1465 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1466
1467 for bar in bars {
1468 bars_deque.push_front(*bar);
1469 }
1470 Ok(())
1471 }
1472
1473 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1479 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1480
1481 if self.config.save_market_data
1482 && let Some(_database) = &mut self.database
1483 {
1484 }
1486
1487 self.greeks.insert(greeks.instrument_id, greeks);
1488 Ok(())
1489 }
1490
1491 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1493 self.greeks.get(instrument_id).cloned()
1494 }
1495
1496 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1502 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1503
1504 if self.config.save_market_data
1505 && let Some(_database) = &mut self.database
1506 {
1507 }
1509
1510 self.yield_curves
1511 .insert(yield_curve.curve_name.clone(), yield_curve);
1512 Ok(())
1513 }
1514
1515 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1517 self.yield_curves.get(key).map(|curve| {
1518 let curve_clone = curve.clone();
1519 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1520 as Box<dyn Fn(f64) -> f64>
1521 })
1522 }
1523
1524 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1530 log::debug!("Adding `Currency` {}", currency.code);
1531
1532 if let Some(database) = &mut self.database {
1533 database.add_currency(¤cy)?;
1534 }
1535
1536 self.currencies.insert(currency.code, currency);
1537 Ok(())
1538 }
1539
1540 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1546 log::debug!("Adding `Instrument` {}", instrument.id());
1547
1548 if let Some(database) = &mut self.database {
1549 database.add_instrument(&instrument)?;
1550 }
1551
1552 self.instruments.insert(instrument.id(), instrument);
1553 Ok(())
1554 }
1555
1556 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1562 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1563
1564 if let Some(database) = &mut self.database {
1565 database.add_synthetic(&synthetic)?;
1566 }
1567
1568 self.synthetics.insert(synthetic.id, synthetic);
1569 Ok(())
1570 }
1571
1572 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1578 log::debug!("Adding `Account` {}", account.id());
1579
1580 if let Some(database) = &mut self.database {
1581 database.add_account(&account)?;
1582 }
1583
1584 let account_id = account.id();
1585 self.accounts.insert(account_id, account);
1586 self.index
1587 .venue_account
1588 .insert(account_id.get_issuer(), account_id);
1589 Ok(())
1590 }
1591
1592 pub fn add_venue_order_id(
1600 &mut self,
1601 client_order_id: &ClientOrderId,
1602 venue_order_id: &VenueOrderId,
1603 overwrite: bool,
1604 ) -> anyhow::Result<()> {
1605 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1606 && !overwrite
1607 && existing_venue_order_id != venue_order_id
1608 {
1609 anyhow::bail!(
1610 "Existing {existing_venue_order_id} for {client_order_id}
1611 did not match the given {venue_order_id}.
1612 If you are writing a test then try a different `venue_order_id`,
1613 otherwise this is probably a bug."
1614 );
1615 }
1616
1617 self.index
1618 .client_order_ids
1619 .insert(*client_order_id, *venue_order_id);
1620 self.index
1621 .venue_order_ids
1622 .insert(*venue_order_id, *client_order_id);
1623
1624 Ok(())
1625 }
1626
1627 pub fn add_order(
1639 &mut self,
1640 order: OrderAny,
1641 position_id: Option<PositionId>,
1642 client_id: Option<ClientId>,
1643 replace_existing: bool,
1644 ) -> anyhow::Result<()> {
1645 let instrument_id = order.instrument_id();
1646 let venue = instrument_id.venue;
1647 let client_order_id = order.client_order_id();
1648 let strategy_id = order.strategy_id();
1649 let exec_algorithm_id = order.exec_algorithm_id();
1650 let exec_spawn_id = order.exec_spawn_id();
1651
1652 if !replace_existing {
1653 check_key_not_in_map(
1654 &client_order_id,
1655 &self.orders,
1656 stringify!(client_order_id),
1657 stringify!(orders),
1658 )?;
1659 }
1660
1661 log::debug!("Adding {order:?}");
1662
1663 self.index.orders.insert(client_order_id);
1664 self.index
1665 .order_strategy
1666 .insert(client_order_id, strategy_id);
1667 self.index.strategies.insert(strategy_id);
1668
1669 self.index
1671 .venue_orders
1672 .entry(venue)
1673 .or_default()
1674 .insert(client_order_id);
1675
1676 self.index
1678 .instrument_orders
1679 .entry(instrument_id)
1680 .or_default()
1681 .insert(client_order_id);
1682
1683 self.index
1685 .strategy_orders
1686 .entry(strategy_id)
1687 .or_default()
1688 .insert(client_order_id);
1689
1690 if let Some(exec_algorithm_id) = exec_algorithm_id {
1692 self.index.exec_algorithms.insert(exec_algorithm_id);
1693
1694 self.index
1695 .exec_algorithm_orders
1696 .entry(exec_algorithm_id)
1697 .or_default()
1698 .insert(client_order_id);
1699 }
1700
1701 if let Some(exec_spawn_id) = exec_spawn_id {
1703 self.index
1704 .exec_spawn_orders
1705 .entry(exec_spawn_id)
1706 .or_default()
1707 .insert(client_order_id);
1708 }
1709
1710 match order.emulation_trigger() {
1712 Some(_) => {
1713 self.index.orders_emulated.remove(&client_order_id);
1714 }
1715 None => {
1716 self.index.orders_emulated.insert(client_order_id);
1717 }
1718 }
1719
1720 if let Some(position_id) = position_id {
1722 self.add_position_id(
1723 &position_id,
1724 &order.instrument_id().venue,
1725 &client_order_id,
1726 &strategy_id,
1727 )?;
1728 }
1729
1730 if let Some(client_id) = client_id {
1732 self.index.order_client.insert(client_order_id, client_id);
1733 log::debug!("Indexed {client_id:?}");
1734 }
1735
1736 if let Some(database) = &mut self.database {
1737 database.add_order(&order, client_id)?;
1738 }
1743
1744 self.orders.insert(client_order_id, order);
1745
1746 Ok(())
1747 }
1748
1749 pub fn add_position_id(
1755 &mut self,
1756 position_id: &PositionId,
1757 venue: &Venue,
1758 client_order_id: &ClientOrderId,
1759 strategy_id: &StrategyId,
1760 ) -> anyhow::Result<()> {
1761 self.index
1762 .order_position
1763 .insert(*client_order_id, *position_id);
1764
1765 if let Some(database) = &mut self.database {
1767 database.index_order_position(*client_order_id, *position_id)?;
1768 }
1769
1770 self.index
1772 .position_strategy
1773 .insert(*position_id, *strategy_id);
1774
1775 self.index
1777 .position_orders
1778 .entry(*position_id)
1779 .or_default()
1780 .insert(*client_order_id);
1781
1782 self.index
1784 .strategy_positions
1785 .entry(*strategy_id)
1786 .or_default()
1787 .insert(*position_id);
1788
1789 self.index
1791 .venue_positions
1792 .entry(*venue)
1793 .or_default()
1794 .insert(*position_id);
1795
1796 Ok(())
1797 }
1798
1799 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1805 self.positions.insert(position.id, position.clone());
1806 self.index.positions.insert(position.id);
1807 self.index.positions_open.insert(position.id);
1808 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
1811
1812 self.add_position_id(
1813 &position.id,
1814 &position.instrument_id.venue,
1815 &position.opening_order_id,
1816 &position.strategy_id,
1817 )?;
1818
1819 let venue = position.instrument_id.venue;
1820 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1821 venue_positions.insert(position.id);
1822
1823 let instrument_id = position.instrument_id;
1825 let instrument_positions = self
1826 .index
1827 .instrument_positions
1828 .entry(instrument_id)
1829 .or_default();
1830 instrument_positions.insert(position.id);
1831
1832 if let Some(database) = &mut self.database {
1833 database.add_position(&position)?;
1834 }
1843
1844 Ok(())
1845 }
1846
1847 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1853 if let Some(database) = &mut self.database {
1854 database.update_account(&account)?;
1855 }
1856 Ok(())
1857 }
1858
1859 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1865 let client_order_id = order.client_order_id();
1866
1867 if let Some(venue_order_id) = order.venue_order_id() {
1869 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1872 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1874 }
1875 }
1876
1877 if order.is_inflight() {
1879 self.index.orders_inflight.insert(client_order_id);
1880 } else {
1881 self.index.orders_inflight.remove(&client_order_id);
1882 }
1883
1884 if order.is_open() {
1886 self.index.orders_closed.remove(&client_order_id);
1887 self.index.orders_open.insert(client_order_id);
1888 } else if order.is_closed() {
1889 self.index.orders_open.remove(&client_order_id);
1890 self.index.orders_pending_cancel.remove(&client_order_id);
1891 self.index.orders_closed.insert(client_order_id);
1892 }
1893
1894 if let Some(emulation_trigger) = order.emulation_trigger() {
1896 match emulation_trigger {
1897 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1898 _ => self.index.orders_emulated.insert(client_order_id),
1899 };
1900 }
1901
1902 if self.own_order_book(&order.instrument_id()).is_some()
1904 && should_handle_own_book_order(order)
1905 {
1906 self.update_own_order_book(order);
1907 }
1908
1909 if let Some(database) = &mut self.database {
1910 database.update_order(order.last_event())?;
1911 }
1916
1917 self.orders.insert(client_order_id, order.clone());
1919
1920 Ok(())
1921 }
1922
1923 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1925 self.index
1926 .orders_pending_cancel
1927 .insert(order.client_order_id());
1928 }
1929
1930 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1936 if position.is_open() {
1939 self.index.positions_open.insert(position.id);
1940 self.index.positions_closed.remove(&position.id);
1941 } else {
1942 self.index.positions_closed.insert(position.id);
1943 self.index.positions_open.remove(&position.id);
1944 }
1945
1946 if let Some(database) = &mut self.database {
1947 database.update_position(position)?;
1948 }
1953
1954 self.positions.insert(position.id, position.clone());
1955
1956 Ok(())
1957 }
1958
1959 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1966 let position_id = position.id;
1967
1968 let mut copied_position = position.clone();
1969 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1970 copied_position.id = PositionId::new(new_id);
1971
1972 let position_serialized = serde_json::to_vec(&copied_position)?;
1974
1975 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1976 let new_snapshots = match snapshots {
1977 Some(existing_snapshots) => {
1978 let mut combined = existing_snapshots.to_vec();
1979 combined.extend(position_serialized);
1980 Bytes::from(combined)
1981 }
1982 None => Bytes::from(position_serialized),
1983 };
1984 self.position_snapshots.insert(position_id, new_snapshots);
1985
1986 log::debug!("Snapshot {copied_position}");
1987 Ok(())
1988 }
1989
1990 pub fn snapshot_position_state(
1996 &mut self,
1997 position: &Position,
1998 open_only: Option<bool>,
2001 ) -> anyhow::Result<()> {
2002 let open_only = open_only.unwrap_or(true);
2003
2004 if open_only && !position.is_open() {
2005 return Ok(());
2006 }
2007
2008 if let Some(database) = &mut self.database {
2009 database.snapshot_position_state(position).map_err(|e| {
2010 log::error!(
2011 "Failed to snapshot position state for {}: {e:?}",
2012 position.id
2013 );
2014 e
2015 })?;
2016 } else {
2017 log::warn!(
2018 "Cannot snapshot position state for {} (no database configured)",
2019 position.id
2020 );
2021 }
2022
2023 todo!()
2025 }
2026
2027 #[must_use]
2029 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2030 if self.index.position_strategy.contains_key(position_id) {
2032 Some(OmsType::Netting)
2035 } else {
2036 None
2037 }
2038 }
2039
2040 #[must_use]
2042 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2043 self.position_snapshots.get(position_id).map(|b| b.to_vec())
2044 }
2045
2046 #[must_use]
2048 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2049 let mut result = AHashSet::new();
2051 for (position_id, _) in &self.position_snapshots {
2052 if let Some(position) = self.positions.get(position_id)
2054 && position.instrument_id == *instrument_id
2055 {
2056 result.insert(*position_id);
2057 }
2058 }
2059 result
2060 }
2061
2062 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2068 let database = if let Some(database) = &self.database {
2069 database
2070 } else {
2071 log::warn!(
2072 "Cannot snapshot order state for {} (no database configured)",
2073 order.client_order_id()
2074 );
2075 return Ok(());
2076 };
2077
2078 database.snapshot_order_state(order)
2079 }
2080
2081 fn build_order_query_filter_set(
2084 &self,
2085 venue: Option<&Venue>,
2086 instrument_id: Option<&InstrumentId>,
2087 strategy_id: Option<&StrategyId>,
2088 ) -> Option<AHashSet<ClientOrderId>> {
2089 let mut query: Option<AHashSet<ClientOrderId>> = None;
2090
2091 if let Some(venue) = venue {
2092 query = Some(
2093 self.index
2094 .venue_orders
2095 .get(venue)
2096 .cloned()
2097 .unwrap_or_default(),
2098 );
2099 }
2100
2101 if let Some(instrument_id) = instrument_id {
2102 let instrument_orders = self
2103 .index
2104 .instrument_orders
2105 .get(instrument_id)
2106 .cloned()
2107 .unwrap_or_default();
2108
2109 if let Some(existing_query) = &mut query {
2110 *existing_query = existing_query
2111 .intersection(&instrument_orders)
2112 .copied()
2113 .collect();
2114 } else {
2115 query = Some(instrument_orders);
2116 }
2117 }
2118
2119 if let Some(strategy_id) = strategy_id {
2120 let strategy_orders = self
2121 .index
2122 .strategy_orders
2123 .get(strategy_id)
2124 .cloned()
2125 .unwrap_or_default();
2126
2127 if let Some(existing_query) = &mut query {
2128 *existing_query = existing_query
2129 .intersection(&strategy_orders)
2130 .copied()
2131 .collect();
2132 } else {
2133 query = Some(strategy_orders);
2134 }
2135 }
2136
2137 query
2138 }
2139
2140 fn build_position_query_filter_set(
2141 &self,
2142 venue: Option<&Venue>,
2143 instrument_id: Option<&InstrumentId>,
2144 strategy_id: Option<&StrategyId>,
2145 ) -> Option<AHashSet<PositionId>> {
2146 let mut query: Option<AHashSet<PositionId>> = None;
2147
2148 if let Some(venue) = venue {
2149 query = Some(
2150 self.index
2151 .venue_positions
2152 .get(venue)
2153 .cloned()
2154 .unwrap_or_default(),
2155 );
2156 }
2157
2158 if let Some(instrument_id) = instrument_id {
2159 let instrument_positions = self
2160 .index
2161 .instrument_positions
2162 .get(instrument_id)
2163 .cloned()
2164 .unwrap_or_default();
2165
2166 if let Some(existing_query) = query {
2167 query = Some(
2168 existing_query
2169 .intersection(&instrument_positions)
2170 .copied()
2171 .collect(),
2172 );
2173 } else {
2174 query = Some(instrument_positions);
2175 }
2176 }
2177
2178 if let Some(strategy_id) = strategy_id {
2179 let strategy_positions = self
2180 .index
2181 .strategy_positions
2182 .get(strategy_id)
2183 .cloned()
2184 .unwrap_or_default();
2185
2186 if let Some(existing_query) = query {
2187 query = Some(
2188 existing_query
2189 .intersection(&strategy_positions)
2190 .copied()
2191 .collect(),
2192 );
2193 } else {
2194 query = Some(strategy_positions);
2195 }
2196 }
2197
2198 query
2199 }
2200
2201 fn get_orders_for_ids(
2207 &self,
2208 client_order_ids: &AHashSet<ClientOrderId>,
2209 side: Option<OrderSide>,
2210 ) -> Vec<&OrderAny> {
2211 let side = side.unwrap_or(OrderSide::NoOrderSide);
2212 let mut orders = Vec::new();
2213
2214 for client_order_id in client_order_ids {
2215 let order = self
2216 .orders
2217 .get(client_order_id)
2218 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2219 if side == OrderSide::NoOrderSide || side == order.order_side() {
2220 orders.push(order);
2221 }
2222 }
2223
2224 orders
2225 }
2226
2227 fn get_positions_for_ids(
2233 &self,
2234 position_ids: &AHashSet<PositionId>,
2235 side: Option<PositionSide>,
2236 ) -> Vec<&Position> {
2237 let side = side.unwrap_or(PositionSide::NoPositionSide);
2238 let mut positions = Vec::new();
2239
2240 for position_id in position_ids {
2241 let position = self
2242 .positions
2243 .get(position_id)
2244 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2245 if side == PositionSide::NoPositionSide || side == position.side {
2246 positions.push(position);
2247 }
2248 }
2249
2250 positions
2251 }
2252
2253 #[must_use]
2255 pub fn client_order_ids(
2256 &self,
2257 venue: Option<&Venue>,
2258 instrument_id: Option<&InstrumentId>,
2259 strategy_id: Option<&StrategyId>,
2260 ) -> AHashSet<ClientOrderId> {
2261 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2262 match query {
2263 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2264 None => self.index.orders.clone(),
2265 }
2266 }
2267
2268 #[must_use]
2270 pub fn client_order_ids_open(
2271 &self,
2272 venue: Option<&Venue>,
2273 instrument_id: Option<&InstrumentId>,
2274 strategy_id: Option<&StrategyId>,
2275 ) -> AHashSet<ClientOrderId> {
2276 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2277 match query {
2278 Some(query) => self
2279 .index
2280 .orders_open
2281 .intersection(&query)
2282 .copied()
2283 .collect(),
2284 None => self.index.orders_open.clone(),
2285 }
2286 }
2287
2288 #[must_use]
2290 pub fn client_order_ids_closed(
2291 &self,
2292 venue: Option<&Venue>,
2293 instrument_id: Option<&InstrumentId>,
2294 strategy_id: Option<&StrategyId>,
2295 ) -> AHashSet<ClientOrderId> {
2296 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2297 match query {
2298 Some(query) => self
2299 .index
2300 .orders_closed
2301 .intersection(&query)
2302 .copied()
2303 .collect(),
2304 None => self.index.orders_closed.clone(),
2305 }
2306 }
2307
2308 #[must_use]
2310 pub fn client_order_ids_emulated(
2311 &self,
2312 venue: Option<&Venue>,
2313 instrument_id: Option<&InstrumentId>,
2314 strategy_id: Option<&StrategyId>,
2315 ) -> AHashSet<ClientOrderId> {
2316 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2317 match query {
2318 Some(query) => self
2319 .index
2320 .orders_emulated
2321 .intersection(&query)
2322 .copied()
2323 .collect(),
2324 None => self.index.orders_emulated.clone(),
2325 }
2326 }
2327
2328 #[must_use]
2330 pub fn client_order_ids_inflight(
2331 &self,
2332 venue: Option<&Venue>,
2333 instrument_id: Option<&InstrumentId>,
2334 strategy_id: Option<&StrategyId>,
2335 ) -> AHashSet<ClientOrderId> {
2336 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2337 match query {
2338 Some(query) => self
2339 .index
2340 .orders_inflight
2341 .intersection(&query)
2342 .copied()
2343 .collect(),
2344 None => self.index.orders_inflight.clone(),
2345 }
2346 }
2347
2348 #[must_use]
2350 pub fn position_ids(
2351 &self,
2352 venue: Option<&Venue>,
2353 instrument_id: Option<&InstrumentId>,
2354 strategy_id: Option<&StrategyId>,
2355 ) -> AHashSet<PositionId> {
2356 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2357 match query {
2358 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2359 None => self.index.positions.clone(),
2360 }
2361 }
2362
2363 #[must_use]
2365 pub fn position_open_ids(
2366 &self,
2367 venue: Option<&Venue>,
2368 instrument_id: Option<&InstrumentId>,
2369 strategy_id: Option<&StrategyId>,
2370 ) -> AHashSet<PositionId> {
2371 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2372 match query {
2373 Some(query) => self
2374 .index
2375 .positions_open
2376 .intersection(&query)
2377 .copied()
2378 .collect(),
2379 None => self.index.positions_open.clone(),
2380 }
2381 }
2382
2383 #[must_use]
2385 pub fn position_closed_ids(
2386 &self,
2387 venue: Option<&Venue>,
2388 instrument_id: Option<&InstrumentId>,
2389 strategy_id: Option<&StrategyId>,
2390 ) -> AHashSet<PositionId> {
2391 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2392 match query {
2393 Some(query) => self
2394 .index
2395 .positions_closed
2396 .intersection(&query)
2397 .copied()
2398 .collect(),
2399 None => self.index.positions_closed.clone(),
2400 }
2401 }
2402
2403 #[must_use]
2405 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2406 self.index.actors.clone()
2407 }
2408
2409 #[must_use]
2411 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2412 self.index.strategies.clone()
2413 }
2414
2415 #[must_use]
2417 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2418 self.index.exec_algorithms.clone()
2419 }
2420
2421 #[must_use]
2425 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2426 self.orders.get(client_order_id)
2427 }
2428
2429 #[must_use]
2431 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2432 self.orders.get_mut(client_order_id)
2433 }
2434
2435 #[must_use]
2437 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2438 self.index.venue_order_ids.get(venue_order_id)
2439 }
2440
2441 #[must_use]
2443 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2444 self.index.client_order_ids.get(client_order_id)
2445 }
2446
2447 #[must_use]
2449 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2450 self.index.order_client.get(client_order_id)
2451 }
2452
2453 #[must_use]
2455 pub fn orders(
2456 &self,
2457 venue: Option<&Venue>,
2458 instrument_id: Option<&InstrumentId>,
2459 strategy_id: Option<&StrategyId>,
2460 side: Option<OrderSide>,
2461 ) -> Vec<&OrderAny> {
2462 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2463 self.get_orders_for_ids(&client_order_ids, side)
2464 }
2465
2466 #[must_use]
2468 pub fn orders_open(
2469 &self,
2470 venue: Option<&Venue>,
2471 instrument_id: Option<&InstrumentId>,
2472 strategy_id: Option<&StrategyId>,
2473 side: Option<OrderSide>,
2474 ) -> Vec<&OrderAny> {
2475 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2476 self.get_orders_for_ids(&client_order_ids, side)
2477 }
2478
2479 #[must_use]
2481 pub fn orders_closed(
2482 &self,
2483 venue: Option<&Venue>,
2484 instrument_id: Option<&InstrumentId>,
2485 strategy_id: Option<&StrategyId>,
2486 side: Option<OrderSide>,
2487 ) -> Vec<&OrderAny> {
2488 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2489 self.get_orders_for_ids(&client_order_ids, side)
2490 }
2491
2492 #[must_use]
2494 pub fn orders_emulated(
2495 &self,
2496 venue: Option<&Venue>,
2497 instrument_id: Option<&InstrumentId>,
2498 strategy_id: Option<&StrategyId>,
2499 side: Option<OrderSide>,
2500 ) -> Vec<&OrderAny> {
2501 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2502 self.get_orders_for_ids(&client_order_ids, side)
2503 }
2504
2505 #[must_use]
2507 pub fn orders_inflight(
2508 &self,
2509 venue: Option<&Venue>,
2510 instrument_id: Option<&InstrumentId>,
2511 strategy_id: Option<&StrategyId>,
2512 side: Option<OrderSide>,
2513 ) -> Vec<&OrderAny> {
2514 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2515 self.get_orders_for_ids(&client_order_ids, side)
2516 }
2517
2518 #[must_use]
2520 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2521 let client_order_ids = self.index.position_orders.get(position_id);
2522 match client_order_ids {
2523 Some(client_order_ids) => {
2524 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2525 }
2526 None => Vec::new(),
2527 }
2528 }
2529
2530 #[must_use]
2532 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2533 self.index.orders.contains(client_order_id)
2534 }
2535
2536 #[must_use]
2538 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2539 self.index.orders_open.contains(client_order_id)
2540 }
2541
2542 #[must_use]
2544 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2545 self.index.orders_closed.contains(client_order_id)
2546 }
2547
2548 #[must_use]
2550 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2551 self.index.orders_emulated.contains(client_order_id)
2552 }
2553
2554 #[must_use]
2556 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2557 self.index.orders_inflight.contains(client_order_id)
2558 }
2559
2560 #[must_use]
2562 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2563 self.index.orders_pending_cancel.contains(client_order_id)
2564 }
2565
2566 #[must_use]
2568 pub fn orders_open_count(
2569 &self,
2570 venue: Option<&Venue>,
2571 instrument_id: Option<&InstrumentId>,
2572 strategy_id: Option<&StrategyId>,
2573 side: Option<OrderSide>,
2574 ) -> usize {
2575 self.orders_open(venue, instrument_id, strategy_id, side)
2576 .len()
2577 }
2578
2579 #[must_use]
2581 pub fn orders_closed_count(
2582 &self,
2583 venue: Option<&Venue>,
2584 instrument_id: Option<&InstrumentId>,
2585 strategy_id: Option<&StrategyId>,
2586 side: Option<OrderSide>,
2587 ) -> usize {
2588 self.orders_closed(venue, instrument_id, strategy_id, side)
2589 .len()
2590 }
2591
2592 #[must_use]
2594 pub fn orders_emulated_count(
2595 &self,
2596 venue: Option<&Venue>,
2597 instrument_id: Option<&InstrumentId>,
2598 strategy_id: Option<&StrategyId>,
2599 side: Option<OrderSide>,
2600 ) -> usize {
2601 self.orders_emulated(venue, instrument_id, strategy_id, side)
2602 .len()
2603 }
2604
2605 #[must_use]
2607 pub fn orders_inflight_count(
2608 &self,
2609 venue: Option<&Venue>,
2610 instrument_id: Option<&InstrumentId>,
2611 strategy_id: Option<&StrategyId>,
2612 side: Option<OrderSide>,
2613 ) -> usize {
2614 self.orders_inflight(venue, instrument_id, strategy_id, side)
2615 .len()
2616 }
2617
2618 #[must_use]
2620 pub fn orders_total_count(
2621 &self,
2622 venue: Option<&Venue>,
2623 instrument_id: Option<&InstrumentId>,
2624 strategy_id: Option<&StrategyId>,
2625 side: Option<OrderSide>,
2626 ) -> usize {
2627 self.orders(venue, instrument_id, strategy_id, side).len()
2628 }
2629
2630 #[must_use]
2632 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2633 self.order_lists.get(order_list_id)
2634 }
2635
2636 #[must_use]
2638 pub fn order_lists(
2639 &self,
2640 venue: Option<&Venue>,
2641 instrument_id: Option<&InstrumentId>,
2642 strategy_id: Option<&StrategyId>,
2643 ) -> Vec<&OrderList> {
2644 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2645
2646 if let Some(venue) = venue {
2647 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2648 }
2649
2650 if let Some(instrument_id) = instrument_id {
2651 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2652 }
2653
2654 if let Some(strategy_id) = strategy_id {
2655 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2656 }
2657
2658 order_lists
2659 }
2660
2661 #[must_use]
2663 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2664 self.order_lists.contains_key(order_list_id)
2665 }
2666
2667 #[must_use]
2672 pub fn orders_for_exec_algorithm(
2673 &self,
2674 exec_algorithm_id: &ExecAlgorithmId,
2675 venue: Option<&Venue>,
2676 instrument_id: Option<&InstrumentId>,
2677 strategy_id: Option<&StrategyId>,
2678 side: Option<OrderSide>,
2679 ) -> Vec<&OrderAny> {
2680 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2681 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2682
2683 if let Some(query) = query
2684 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2685 {
2686 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2687 }
2688
2689 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2690 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2691 } else {
2692 Vec::new()
2693 }
2694 }
2695
2696 #[must_use]
2698 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2699 self.get_orders_for_ids(
2700 self.index
2701 .exec_spawn_orders
2702 .get(exec_spawn_id)
2703 .unwrap_or(&AHashSet::new()),
2704 None,
2705 )
2706 }
2707
2708 #[must_use]
2710 pub fn exec_spawn_total_quantity(
2711 &self,
2712 exec_spawn_id: &ClientOrderId,
2713 active_only: bool,
2714 ) -> Option<Quantity> {
2715 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2716
2717 let mut total_quantity: Option<Quantity> = None;
2718
2719 for spawn_order in exec_spawn_orders {
2720 if !active_only || !spawn_order.is_closed() {
2721 if let Some(mut total_quantity) = total_quantity {
2722 total_quantity += spawn_order.quantity();
2723 }
2724 } else {
2725 total_quantity = Some(spawn_order.quantity());
2726 }
2727 }
2728
2729 total_quantity
2730 }
2731
2732 #[must_use]
2734 pub fn exec_spawn_total_filled_qty(
2735 &self,
2736 exec_spawn_id: &ClientOrderId,
2737 active_only: bool,
2738 ) -> Option<Quantity> {
2739 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2740
2741 let mut total_quantity: Option<Quantity> = None;
2742
2743 for spawn_order in exec_spawn_orders {
2744 if !active_only || !spawn_order.is_closed() {
2745 if let Some(mut total_quantity) = total_quantity {
2746 total_quantity += spawn_order.filled_qty();
2747 }
2748 } else {
2749 total_quantity = Some(spawn_order.filled_qty());
2750 }
2751 }
2752
2753 total_quantity
2754 }
2755
2756 #[must_use]
2758 pub fn exec_spawn_total_leaves_qty(
2759 &self,
2760 exec_spawn_id: &ClientOrderId,
2761 active_only: bool,
2762 ) -> Option<Quantity> {
2763 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2764
2765 let mut total_quantity: Option<Quantity> = None;
2766
2767 for spawn_order in exec_spawn_orders {
2768 if !active_only || !spawn_order.is_closed() {
2769 if let Some(mut total_quantity) = total_quantity {
2770 total_quantity += spawn_order.leaves_qty();
2771 }
2772 } else {
2773 total_quantity = Some(spawn_order.leaves_qty());
2774 }
2775 }
2776
2777 total_quantity
2778 }
2779
2780 #[must_use]
2784 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2785 self.positions.get(position_id)
2786 }
2787
2788 #[must_use]
2790 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2791 self.index
2792 .order_position
2793 .get(client_order_id)
2794 .and_then(|position_id| self.positions.get(position_id))
2795 }
2796
2797 #[must_use]
2799 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2800 self.index.order_position.get(client_order_id)
2801 }
2802
2803 #[must_use]
2805 pub fn positions(
2806 &self,
2807 venue: Option<&Venue>,
2808 instrument_id: Option<&InstrumentId>,
2809 strategy_id: Option<&StrategyId>,
2810 side: Option<PositionSide>,
2811 ) -> Vec<&Position> {
2812 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2813 self.get_positions_for_ids(&position_ids, side)
2814 }
2815
2816 #[must_use]
2818 pub fn positions_open(
2819 &self,
2820 venue: Option<&Venue>,
2821 instrument_id: Option<&InstrumentId>,
2822 strategy_id: Option<&StrategyId>,
2823 side: Option<PositionSide>,
2824 ) -> Vec<&Position> {
2825 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2826 self.get_positions_for_ids(&position_ids, side)
2827 }
2828
2829 #[must_use]
2831 pub fn positions_closed(
2832 &self,
2833 venue: Option<&Venue>,
2834 instrument_id: Option<&InstrumentId>,
2835 strategy_id: Option<&StrategyId>,
2836 side: Option<PositionSide>,
2837 ) -> Vec<&Position> {
2838 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2839 self.get_positions_for_ids(&position_ids, side)
2840 }
2841
2842 #[must_use]
2844 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2845 self.index.positions.contains(position_id)
2846 }
2847
2848 #[must_use]
2850 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2851 self.index.positions_open.contains(position_id)
2852 }
2853
2854 #[must_use]
2856 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2857 self.index.positions_closed.contains(position_id)
2858 }
2859
2860 #[must_use]
2862 pub fn positions_open_count(
2863 &self,
2864 venue: Option<&Venue>,
2865 instrument_id: Option<&InstrumentId>,
2866 strategy_id: Option<&StrategyId>,
2867 side: Option<PositionSide>,
2868 ) -> usize {
2869 self.positions_open(venue, instrument_id, strategy_id, side)
2870 .len()
2871 }
2872
2873 #[must_use]
2875 pub fn positions_closed_count(
2876 &self,
2877 venue: Option<&Venue>,
2878 instrument_id: Option<&InstrumentId>,
2879 strategy_id: Option<&StrategyId>,
2880 side: Option<PositionSide>,
2881 ) -> usize {
2882 self.positions_closed(venue, instrument_id, strategy_id, side)
2883 .len()
2884 }
2885
2886 #[must_use]
2888 pub fn positions_total_count(
2889 &self,
2890 venue: Option<&Venue>,
2891 instrument_id: Option<&InstrumentId>,
2892 strategy_id: Option<&StrategyId>,
2893 side: Option<PositionSide>,
2894 ) -> usize {
2895 self.positions(venue, instrument_id, strategy_id, side)
2896 .len()
2897 }
2898
2899 #[must_use]
2903 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2904 self.index.order_strategy.get(client_order_id)
2905 }
2906
2907 #[must_use]
2909 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2910 self.index.position_strategy.get(position_id)
2911 }
2912
2913 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2921 check_valid_string_ascii(key, stringify!(key))?;
2922
2923 Ok(self.general.get(key))
2924 }
2925
2926 #[must_use]
2930 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2931 match price_type {
2932 PriceType::Bid => self
2933 .quotes
2934 .get(instrument_id)
2935 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2936 PriceType::Ask => self
2937 .quotes
2938 .get(instrument_id)
2939 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2940 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2941 quotes.front().map(|quote| {
2942 Price::new(
2943 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2944 quote.bid_price.precision + 1,
2945 )
2946 })
2947 }),
2948 PriceType::Last => self
2949 .trades
2950 .get(instrument_id)
2951 .and_then(|trades| trades.front().map(|trade| trade.price)),
2952 PriceType::Mark => self
2953 .mark_prices
2954 .get(instrument_id)
2955 .and_then(|marks| marks.front().map(|mark| mark.value)),
2956 }
2957 }
2958
2959 #[must_use]
2961 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2962 self.quotes
2963 .get(instrument_id)
2964 .map(|quotes| quotes.iter().copied().collect())
2965 }
2966
2967 #[must_use]
2969 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2970 self.trades
2971 .get(instrument_id)
2972 .map(|trades| trades.iter().copied().collect())
2973 }
2974
2975 #[must_use]
2977 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2978 self.mark_prices
2979 .get(instrument_id)
2980 .map(|mark_prices| mark_prices.iter().copied().collect())
2981 }
2982
2983 #[must_use]
2985 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2986 self.index_prices
2987 .get(instrument_id)
2988 .map(|index_prices| index_prices.iter().copied().collect())
2989 }
2990
2991 #[must_use]
2993 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2994 self.bars
2995 .get(bar_type)
2996 .map(|bars| bars.iter().copied().collect())
2997 }
2998
2999 #[must_use]
3001 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3002 self.books.get(instrument_id)
3003 }
3004
3005 #[must_use]
3007 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3008 self.books.get_mut(instrument_id)
3009 }
3010
3011 #[must_use]
3013 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3014 self.own_books.get(instrument_id)
3015 }
3016
3017 #[must_use]
3019 pub fn own_order_book_mut(
3020 &mut self,
3021 instrument_id: &InstrumentId,
3022 ) -> Option<&mut OwnOrderBook> {
3023 self.own_books.get_mut(instrument_id)
3024 }
3025
3026 #[must_use]
3028 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3029 self.quotes
3030 .get(instrument_id)
3031 .and_then(|quotes| quotes.front())
3032 }
3033
3034 #[must_use]
3036 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3037 self.trades
3038 .get(instrument_id)
3039 .and_then(|trades| trades.front())
3040 }
3041
3042 #[must_use]
3044 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3045 self.mark_prices
3046 .get(instrument_id)
3047 .and_then(|mark_prices| mark_prices.front())
3048 }
3049
3050 #[must_use]
3052 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3053 self.index_prices
3054 .get(instrument_id)
3055 .and_then(|index_prices| index_prices.front())
3056 }
3057
3058 #[must_use]
3060 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3061 self.funding_rates.get(instrument_id)
3062 }
3063
3064 #[must_use]
3066 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3067 self.bars.get(bar_type).and_then(|bars| bars.front())
3068 }
3069
3070 #[must_use]
3072 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3073 self.books
3074 .get(instrument_id)
3075 .map_or(0, |book| book.update_count) as usize
3076 }
3077
3078 #[must_use]
3080 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3081 self.quotes
3082 .get(instrument_id)
3083 .map_or(0, std::collections::VecDeque::len)
3084 }
3085
3086 #[must_use]
3088 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3089 self.trades
3090 .get(instrument_id)
3091 .map_or(0, std::collections::VecDeque::len)
3092 }
3093
3094 #[must_use]
3096 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3097 self.bars
3098 .get(bar_type)
3099 .map_or(0, std::collections::VecDeque::len)
3100 }
3101
3102 #[must_use]
3104 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3105 self.books.contains_key(instrument_id)
3106 }
3107
3108 #[must_use]
3110 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3111 self.quote_count(instrument_id) > 0
3112 }
3113
3114 #[must_use]
3116 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3117 self.trade_count(instrument_id) > 0
3118 }
3119
3120 #[must_use]
3122 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3123 self.bar_count(bar_type) > 0
3124 }
3125
3126 #[must_use]
3127 pub fn get_xrate(
3128 &self,
3129 venue: Venue,
3130 from_currency: Currency,
3131 to_currency: Currency,
3132 price_type: PriceType,
3133 ) -> Option<f64> {
3134 if from_currency == to_currency {
3135 return Some(1.0);
3138 }
3139
3140 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3141
3142 match get_exchange_rate(
3143 from_currency.code,
3144 to_currency.code,
3145 price_type,
3146 bid_quote,
3147 ask_quote,
3148 ) {
3149 Ok(rate) => rate,
3150 Err(e) => {
3151 log::error!("Failed to calculate xrate: {e}");
3152 None
3153 }
3154 }
3155 }
3156
3157 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3158 let mut bid_quotes = AHashMap::new();
3159 let mut ask_quotes = AHashMap::new();
3160
3161 for instrument_id in self.instruments.keys() {
3162 if instrument_id.venue != *venue {
3163 continue;
3164 }
3165
3166 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3167 if let Some(tick) = ticks.front() {
3168 (tick.bid_price, tick.ask_price)
3169 } else {
3170 continue; }
3172 } else {
3173 let bid_bar = self
3174 .bars
3175 .iter()
3176 .find(|(k, _)| {
3177 k.instrument_id() == *instrument_id
3178 && matches!(k.spec().price_type, PriceType::Bid)
3179 })
3180 .map(|(_, v)| v);
3181
3182 let ask_bar = self
3183 .bars
3184 .iter()
3185 .find(|(k, _)| {
3186 k.instrument_id() == *instrument_id
3187 && matches!(k.spec().price_type, PriceType::Ask)
3188 })
3189 .map(|(_, v)| v);
3190
3191 match (bid_bar, ask_bar) {
3192 (Some(bid), Some(ask)) => {
3193 match (bid.front(), ask.front()) {
3194 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3195 _ => {
3196 continue;
3198 }
3199 }
3200 }
3201 _ => continue,
3202 }
3203 };
3204
3205 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3206 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3207 }
3208
3209 (bid_quotes, ask_quotes)
3210 }
3211
3212 #[must_use]
3214 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3215 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3216 }
3217
3218 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3224 assert!(xrate > 0.0, "xrate was zero");
3225 self.mark_xrates.insert((from_currency, to_currency), xrate);
3226 self.mark_xrates
3227 .insert((to_currency, from_currency), 1.0 / xrate);
3228 }
3229
3230 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3232 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3233 }
3234
3235 pub fn clear_mark_xrates(&mut self) {
3237 self.mark_xrates.clear();
3238 }
3239
3240 #[must_use]
3244 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3245 self.instruments.get(instrument_id)
3246 }
3247
3248 #[must_use]
3250 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3251 match venue {
3252 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3253 None => self.instruments.keys().collect(),
3254 }
3255 }
3256
3257 #[must_use]
3259 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3260 self.instruments
3261 .values()
3262 .filter(|i| &i.id().venue == venue)
3263 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3264 .collect()
3265 }
3266
3267 #[must_use]
3269 pub fn bar_types(
3270 &self,
3271 instrument_id: Option<&InstrumentId>,
3272 price_type: Option<&PriceType>,
3273 aggregation_source: AggregationSource,
3274 ) -> Vec<&BarType> {
3275 let mut bar_types = self
3276 .bars
3277 .keys()
3278 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3279 .collect::<Vec<&BarType>>();
3280
3281 if let Some(instrument_id) = instrument_id {
3282 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3283 }
3284
3285 if let Some(price_type) = price_type {
3286 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3287 }
3288
3289 bar_types
3290 }
3291
3292 #[must_use]
3296 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3297 self.synthetics.get(instrument_id)
3298 }
3299
3300 #[must_use]
3302 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3303 self.synthetics.keys().collect()
3304 }
3305
3306 #[must_use]
3308 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3309 self.synthetics.values().collect()
3310 }
3311
3312 #[must_use]
3316 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3317 self.accounts.get(account_id)
3318 }
3319
3320 #[must_use]
3322 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3323 self.index
3324 .venue_account
3325 .get(venue)
3326 .and_then(|account_id| self.accounts.get(account_id))
3327 }
3328
3329 #[must_use]
3331 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3332 self.index.venue_account.get(venue)
3333 }
3334
3335 #[must_use]
3337 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3338 self.accounts
3339 .values()
3340 .filter(|account| &account.id() == account_id)
3341 .collect()
3342 }
3343
3344 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3352 if !order.has_price() {
3353 return;
3354 }
3355
3356 let instrument_id = order.instrument_id();
3357
3358 let own_book = self
3359 .own_books
3360 .entry(instrument_id)
3361 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3362
3363 let own_book_order = order.to_own_book_order();
3364
3365 if order.is_closed() {
3366 if let Err(e) = own_book.delete(own_book_order) {
3367 log::debug!(
3368 "Failed to delete order {} from own book: {e}",
3369 order.client_order_id(),
3370 );
3371 } else {
3372 log::debug!("Deleted order {} from own book", order.client_order_id());
3373 }
3374 } else {
3375 if let Err(e) = own_book.update(own_book_order) {
3377 log::debug!(
3378 "Failed to update order {} in own book: {e}; inserting instead",
3379 order.client_order_id(),
3380 );
3381 own_book.add(own_book_order);
3382 }
3383 log::debug!("Updated order {} in own book", order.client_order_id());
3384 }
3385 }
3386
3387 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3393 let order = match self.orders.get(client_order_id) {
3394 Some(order) => order,
3395 None => return,
3396 };
3397
3398 self.index.orders_open.remove(client_order_id);
3399 self.index.orders_pending_cancel.remove(client_order_id);
3400 self.index.orders_inflight.remove(client_order_id);
3401 self.index.orders_emulated.remove(client_order_id);
3402
3403 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3404 && order.has_price()
3405 {
3406 let own_book_order = order.to_own_book_order();
3407 if let Err(e) = own_book.delete(own_book_order) {
3408 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3409 } else {
3410 log::debug!("Force deleted {client_order_id} from own book");
3411 }
3412 }
3413
3414 self.index.orders_closed.insert(*client_order_id);
3415 }
3416
3417 pub fn audit_own_order_books(&mut self) {
3424 log::debug!("Starting own books audit");
3425 let start = std::time::Instant::now();
3426
3427 let valid_order_ids: AHashSet<ClientOrderId> = self
3430 .index
3431 .orders_open
3432 .union(&self.index.orders_inflight)
3433 .copied()
3434 .collect();
3435
3436 for own_book in self.own_books.values_mut() {
3437 own_book.audit_open_orders(&valid_order_ids);
3438 }
3439
3440 log::debug!("Completed own books audit in {:?}", start.elapsed());
3441 }
3442}