1pub mod config;
21pub mod database;
22pub mod quote;
23
24mod index;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30 collections::VecDeque,
31 fmt::Debug,
32 time::{SystemTime, UNIX_EPOCH},
33};
34
35use ahash::{AHashMap, AHashSet};
36use bytes::Bytes;
37pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
39use index::CacheIndex;
40use nautilus_core::{
41 UUID4, UnixNanos,
42 correctness::{
43 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
44 check_valid_string_ascii,
45 },
46 datetime::secs_to_nanos_unchecked,
47};
48use nautilus_model::{
49 accounts::{Account, AccountAny},
50 data::{
51 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52 TradeTick, YieldCurveData,
53 },
54 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55 identifiers::{
56 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58 },
59 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60 orderbook::{
61 OrderBook,
62 own::{OwnOrderBook, should_handle_own_book_order},
63 },
64 orders::{Order, OrderAny, OrderList},
65 position::Position,
66 types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
72#[cfg_attr(
74 feature = "python",
75 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
76)]
77pub struct Cache {
78 config: CacheConfig,
79 index: CacheIndex,
80 database: Option<Box<dyn CacheDatabaseAdapter>>,
81 general: AHashMap<String, Bytes>,
82 currencies: AHashMap<Ustr, Currency>,
83 instruments: AHashMap<InstrumentId, InstrumentAny>,
84 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
85 books: AHashMap<InstrumentId, OrderBook>,
86 own_books: AHashMap<InstrumentId, OwnOrderBook>,
87 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
88 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
89 mark_xrates: AHashMap<(Currency, Currency), f64>,
90 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
91 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
92 funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
93 bars: AHashMap<BarType, VecDeque<Bar>>,
94 greeks: AHashMap<InstrumentId, GreeksData>,
95 yield_curves: AHashMap<String, YieldCurveData>,
96 accounts: AHashMap<AccountId, AccountAny>,
97 orders: AHashMap<ClientOrderId, OrderAny>,
98 order_lists: AHashMap<OrderListId, OrderList>,
99 positions: AHashMap<PositionId, Position>,
100 position_snapshots: AHashMap<PositionId, Bytes>,
101 #[cfg(feature = "defi")]
102 pub(crate) defi: crate::defi::cache::DefiCache,
103}
104
105impl Debug for Cache {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct(stringify!(Cache))
108 .field("config", &self.config)
109 .field("index", &self.index)
110 .field("general", &self.general)
111 .field("currencies", &self.currencies)
112 .field("instruments", &self.instruments)
113 .field("synthetics", &self.synthetics)
114 .field("books", &self.books)
115 .field("own_books", &self.own_books)
116 .field("quotes", &self.quotes)
117 .field("trades", &self.trades)
118 .field("mark_xrates", &self.mark_xrates)
119 .field("mark_prices", &self.mark_prices)
120 .field("index_prices", &self.index_prices)
121 .field("funding_rates", &self.funding_rates)
122 .field("bars", &self.bars)
123 .field("greeks", &self.greeks)
124 .field("yield_curves", &self.yield_curves)
125 .field("accounts", &self.accounts)
126 .field("orders", &self.orders)
127 .field("order_lists", &self.order_lists)
128 .field("positions", &self.positions)
129 .field("position_snapshots", &self.position_snapshots)
130 .finish()
131 }
132}
133
134impl Default for Cache {
135 fn default() -> Self {
137 Self::new(Some(CacheConfig::default()), None)
138 }
139}
140
141impl Cache {
142 #[must_use]
144 pub fn new(
148 config: Option<CacheConfig>,
149 database: Option<Box<dyn CacheDatabaseAdapter>>,
150 ) -> Self {
151 Self {
152 config: config.unwrap_or_default(),
153 index: CacheIndex::default(),
154 database,
155 general: AHashMap::new(),
156 currencies: AHashMap::new(),
157 instruments: AHashMap::new(),
158 synthetics: AHashMap::new(),
159 books: AHashMap::new(),
160 own_books: AHashMap::new(),
161 quotes: AHashMap::new(),
162 trades: AHashMap::new(),
163 mark_xrates: AHashMap::new(),
164 mark_prices: AHashMap::new(),
165 index_prices: AHashMap::new(),
166 funding_rates: AHashMap::new(),
167 bars: AHashMap::new(),
168 greeks: AHashMap::new(),
169 yield_curves: AHashMap::new(),
170 accounts: AHashMap::new(),
171 orders: AHashMap::new(),
172 order_lists: AHashMap::new(),
173 positions: AHashMap::new(),
174 position_snapshots: AHashMap::new(),
175 #[cfg(feature = "defi")]
176 defi: crate::defi::cache::DefiCache::default(),
177 }
178 }
179
180 #[must_use]
182 pub fn memory_address(&self) -> String {
183 format!("{:?}", std::ptr::from_ref(self))
184 }
185
186 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
190 let type_name = std::any::type_name_of_val(&*database);
191 log::info!("Cache database adapter set: {type_name}");
192 self.database = Some(database);
193 }
194
195 pub fn cache_general(&mut self) -> anyhow::Result<()> {
203 self.general = match &mut self.database {
204 Some(db) => db.load()?,
205 None => AHashMap::new(),
206 };
207
208 log::info!(
209 "Cached {} general object(s) from database",
210 self.general.len()
211 );
212 Ok(())
213 }
214
215 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
221 let cache_map = match &self.database {
222 Some(db) => db.load_all().await?,
223 None => CacheMap::default(),
224 };
225
226 self.currencies = cache_map.currencies;
227 self.instruments = cache_map.instruments;
228 self.synthetics = cache_map.synthetics;
229 self.accounts = cache_map.accounts;
230 self.orders = cache_map.orders;
231 self.positions = cache_map.positions;
232 Ok(())
233 }
234
235 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
241 self.currencies = match &mut self.database {
242 Some(db) => db.load_currencies().await?,
243 None => AHashMap::new(),
244 };
245
246 log::info!("Cached {} currencies from database", self.general.len());
247 Ok(())
248 }
249
250 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
256 self.instruments = match &mut self.database {
257 Some(db) => db.load_instruments().await?,
258 None => AHashMap::new(),
259 };
260
261 log::info!("Cached {} instruments from database", self.general.len());
262 Ok(())
263 }
264
265 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
271 self.synthetics = match &mut self.database {
272 Some(db) => db.load_synthetics().await?,
273 None => AHashMap::new(),
274 };
275
276 log::info!(
277 "Cached {} synthetic instruments from database",
278 self.general.len()
279 );
280 Ok(())
281 }
282
283 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
289 self.accounts = match &mut self.database {
290 Some(db) => db.load_accounts().await?,
291 None => AHashMap::new(),
292 };
293
294 log::info!(
295 "Cached {} synthetic instruments from database",
296 self.general.len()
297 );
298 Ok(())
299 }
300
301 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
307 self.orders = match &mut self.database {
308 Some(db) => db.load_orders().await?,
309 None => AHashMap::new(),
310 };
311
312 log::info!("Cached {} orders from database", self.general.len());
313 Ok(())
314 }
315
316 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
322 self.positions = match &mut self.database {
323 Some(db) => db.load_positions().await?,
324 None => AHashMap::new(),
325 };
326
327 log::info!("Cached {} positions from database", self.general.len());
328 Ok(())
329 }
330
331 pub fn build_index(&mut self) {
333 log::debug!("Building index");
334
335 for account_id in self.accounts.keys() {
337 self.index
338 .venue_account
339 .insert(account_id.get_issuer(), *account_id);
340 }
341
342 for (client_order_id, order) in &self.orders {
344 let instrument_id = order.instrument_id();
345 let venue = instrument_id.venue;
346 let strategy_id = order.strategy_id();
347
348 self.index
350 .venue_orders
351 .entry(venue)
352 .or_default()
353 .insert(*client_order_id);
354
355 if let Some(venue_order_id) = order.venue_order_id() {
357 self.index
358 .venue_order_ids
359 .insert(venue_order_id, *client_order_id);
360 }
361
362 if let Some(position_id) = order.position_id() {
364 self.index
365 .order_position
366 .insert(*client_order_id, position_id);
367 }
368
369 self.index
371 .order_strategy
372 .insert(*client_order_id, order.strategy_id());
373
374 self.index
376 .instrument_orders
377 .entry(instrument_id)
378 .or_default()
379 .insert(*client_order_id);
380
381 self.index
383 .strategy_orders
384 .entry(strategy_id)
385 .or_default()
386 .insert(*client_order_id);
387
388 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
390 self.index
391 .exec_algorithm_orders
392 .entry(exec_algorithm_id)
393 .or_default()
394 .insert(*client_order_id);
395 }
396
397 if let Some(exec_spawn_id) = order.exec_spawn_id() {
399 self.index
400 .exec_spawn_orders
401 .entry(exec_spawn_id)
402 .or_default()
403 .insert(*client_order_id);
404 }
405
406 self.index.orders.insert(*client_order_id);
408
409 if order.is_open() {
411 self.index.orders_open.insert(*client_order_id);
412 }
413
414 if order.is_closed() {
416 self.index.orders_closed.insert(*client_order_id);
417 }
418
419 if let Some(emulation_trigger) = order.emulation_trigger()
421 && emulation_trigger != TriggerType::NoTrigger
422 && !order.is_closed()
423 {
424 self.index.orders_emulated.insert(*client_order_id);
425 }
426
427 if order.is_inflight() {
429 self.index.orders_inflight.insert(*client_order_id);
430 }
431
432 self.index.strategies.insert(strategy_id);
434
435 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
437 self.index.exec_algorithms.insert(exec_algorithm_id);
438 }
439 }
440
441 for (position_id, position) in &self.positions {
443 let instrument_id = position.instrument_id;
444 let venue = instrument_id.venue;
445 let strategy_id = position.strategy_id;
446
447 self.index
449 .venue_positions
450 .entry(venue)
451 .or_default()
452 .insert(*position_id);
453
454 self.index
456 .position_strategy
457 .insert(*position_id, position.strategy_id);
458
459 self.index
461 .position_orders
462 .entry(*position_id)
463 .or_default()
464 .extend(position.client_order_ids().into_iter());
465
466 self.index
468 .instrument_positions
469 .entry(instrument_id)
470 .or_default()
471 .insert(*position_id);
472
473 self.index
475 .strategy_positions
476 .entry(strategy_id)
477 .or_default()
478 .insert(*position_id);
479
480 self.index.positions.insert(*position_id);
482
483 if position.is_open() {
485 self.index.positions_open.insert(*position_id);
486 }
487
488 if position.is_closed() {
490 self.index.positions_closed.insert(*position_id);
491 }
492
493 self.index.strategies.insert(strategy_id);
495 }
496 }
497
498 #[must_use]
500 pub const fn has_backing(&self) -> bool {
501 self.config.database.is_some()
502 }
503
504 #[must_use]
506 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
507 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
508 quote
509 } else {
510 log::warn!(
511 "Cannot calculate unrealized PnL for {}, no quotes for {}",
512 position.id,
513 position.instrument_id
514 );
515 return None;
516 };
517
518 let last = match position.side {
520 PositionSide::Flat | PositionSide::NoPositionSide => {
521 return Some(Money::new(0.0, position.settlement_currency));
522 }
523 PositionSide::Long => quote.bid_price,
524 PositionSide::Short => quote.ask_price,
525 };
526
527 Some(position.unrealized_pnl(last))
528 }
529
530 #[must_use]
539 pub fn check_integrity(&mut self) -> bool {
540 let mut error_count = 0;
541 let failure = "Integrity failure";
542
543 let timestamp_us = SystemTime::now()
545 .duration_since(UNIX_EPOCH)
546 .expect("Time went backwards")
547 .as_micros();
548
549 log::info!("Checking data integrity");
550
551 for account_id in self.accounts.keys() {
553 if !self
554 .index
555 .venue_account
556 .contains_key(&account_id.get_issuer())
557 {
558 log::error!(
559 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
560 );
561 error_count += 1;
562 }
563 }
564
565 for (client_order_id, order) in &self.orders {
566 if !self.index.order_strategy.contains_key(client_order_id) {
567 log::error!(
568 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
569 );
570 error_count += 1;
571 }
572 if !self.index.orders.contains(client_order_id) {
573 log::error!(
574 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
575 );
576 error_count += 1;
577 }
578 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
579 log::error!(
580 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
581 );
582 error_count += 1;
583 }
584 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
585 log::error!(
586 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
587 );
588 error_count += 1;
589 }
590 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
591 log::error!(
592 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
593 );
594 error_count += 1;
595 }
596 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
597 if !self
598 .index
599 .exec_algorithm_orders
600 .contains_key(&exec_algorithm_id)
601 {
602 log::error!(
603 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
604 );
605 error_count += 1;
606 }
607 if order.exec_spawn_id().is_none()
608 && !self.index.exec_spawn_orders.contains_key(client_order_id)
609 {
610 log::error!(
611 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
612 );
613 error_count += 1;
614 }
615 }
616 }
617
618 for (position_id, position) in &self.positions {
619 if !self.index.position_strategy.contains_key(position_id) {
620 log::error!(
621 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
622 );
623 error_count += 1;
624 }
625 if !self.index.position_orders.contains_key(position_id) {
626 log::error!(
627 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
628 );
629 error_count += 1;
630 }
631 if !self.index.positions.contains(position_id) {
632 log::error!(
633 "{failure} in positions: {position_id} not found in `self.index.positions`",
634 );
635 error_count += 1;
636 }
637 if position.is_open() && !self.index.positions_open.contains(position_id) {
638 log::error!(
639 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
640 );
641 error_count += 1;
642 }
643 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
644 log::error!(
645 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
646 );
647 error_count += 1;
648 }
649 }
650
651 for account_id in self.index.venue_account.values() {
653 if !self.accounts.contains_key(account_id) {
654 log::error!(
655 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
656 );
657 error_count += 1;
658 }
659 }
660
661 for client_order_id in self.index.venue_order_ids.values() {
662 if !self.orders.contains_key(client_order_id) {
663 log::error!(
664 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
665 );
666 error_count += 1;
667 }
668 }
669
670 for client_order_id in self.index.client_order_ids.keys() {
671 if !self.orders.contains_key(client_order_id) {
672 log::error!(
673 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
674 );
675 error_count += 1;
676 }
677 }
678
679 for client_order_id in self.index.order_position.keys() {
680 if !self.orders.contains_key(client_order_id) {
681 log::error!(
682 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
683 );
684 error_count += 1;
685 }
686 }
687
688 for client_order_id in self.index.order_strategy.keys() {
690 if !self.orders.contains_key(client_order_id) {
691 log::error!(
692 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
693 );
694 error_count += 1;
695 }
696 }
697
698 for position_id in self.index.position_strategy.keys() {
699 if !self.positions.contains_key(position_id) {
700 log::error!(
701 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
702 );
703 error_count += 1;
704 }
705 }
706
707 for position_id in self.index.position_orders.keys() {
708 if !self.positions.contains_key(position_id) {
709 log::error!(
710 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
711 );
712 error_count += 1;
713 }
714 }
715
716 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
717 for client_order_id in client_order_ids {
718 if !self.orders.contains_key(client_order_id) {
719 log::error!(
720 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
721 );
722 error_count += 1;
723 }
724 }
725 }
726
727 for instrument_id in self.index.instrument_positions.keys() {
728 if !self.index.instrument_orders.contains_key(instrument_id) {
729 log::error!(
730 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
731 );
732 error_count += 1;
733 }
734 }
735
736 for client_order_ids in self.index.strategy_orders.values() {
737 for client_order_id in client_order_ids {
738 if !self.orders.contains_key(client_order_id) {
739 log::error!(
740 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
741 );
742 error_count += 1;
743 }
744 }
745 }
746
747 for position_ids in self.index.strategy_positions.values() {
748 for position_id in position_ids {
749 if !self.positions.contains_key(position_id) {
750 log::error!(
751 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
752 );
753 error_count += 1;
754 }
755 }
756 }
757
758 for client_order_id in &self.index.orders {
759 if !self.orders.contains_key(client_order_id) {
760 log::error!(
761 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
762 );
763 error_count += 1;
764 }
765 }
766
767 for client_order_id in &self.index.orders_emulated {
768 if !self.orders.contains_key(client_order_id) {
769 log::error!(
770 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
771 );
772 error_count += 1;
773 }
774 }
775
776 for client_order_id in &self.index.orders_inflight {
777 if !self.orders.contains_key(client_order_id) {
778 log::error!(
779 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
780 );
781 error_count += 1;
782 }
783 }
784
785 for client_order_id in &self.index.orders_open {
786 if !self.orders.contains_key(client_order_id) {
787 log::error!(
788 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
789 );
790 error_count += 1;
791 }
792 }
793
794 for client_order_id in &self.index.orders_closed {
795 if !self.orders.contains_key(client_order_id) {
796 log::error!(
797 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
798 );
799 error_count += 1;
800 }
801 }
802
803 for position_id in &self.index.positions {
804 if !self.positions.contains_key(position_id) {
805 log::error!(
806 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
807 );
808 error_count += 1;
809 }
810 }
811
812 for position_id in &self.index.positions_open {
813 if !self.positions.contains_key(position_id) {
814 log::error!(
815 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
816 );
817 error_count += 1;
818 }
819 }
820
821 for position_id in &self.index.positions_closed {
822 if !self.positions.contains_key(position_id) {
823 log::error!(
824 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
825 );
826 error_count += 1;
827 }
828 }
829
830 for strategy_id in &self.index.strategies {
831 if !self.index.strategy_orders.contains_key(strategy_id) {
832 log::error!(
833 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
834 );
835 error_count += 1;
836 }
837 }
838
839 for exec_algorithm_id in &self.index.exec_algorithms {
840 if !self
841 .index
842 .exec_algorithm_orders
843 .contains_key(exec_algorithm_id)
844 {
845 log::error!(
846 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
847 );
848 error_count += 1;
849 }
850 }
851
852 let total_us = SystemTime::now()
853 .duration_since(UNIX_EPOCH)
854 .expect("Time went backwards")
855 .as_micros()
856 - timestamp_us;
857
858 if error_count == 0 {
859 log::info!("Integrity check passed in {total_us}μs");
860 true
861 } else {
862 log::error!(
863 "Integrity check failed with {error_count} error{} in {total_us}μs",
864 if error_count == 1 { "" } else { "s" },
865 );
866 false
867 }
868 }
869
870 #[must_use]
874 pub fn check_residuals(&self) -> bool {
875 log::debug!("Checking residuals");
876
877 let mut residuals = false;
878
879 for order in self.orders_open(None, None, None, None) {
881 residuals = true;
882 log::warn!("Residual {order}");
883 }
884
885 for position in self.positions_open(None, None, None, None) {
887 residuals = true;
888 log::warn!("Residual {position}");
889 }
890
891 residuals
892 }
893
894 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
900 log::debug!(
901 "Purging closed orders{}",
902 if buffer_secs > 0 {
903 format!(" with buffer_secs={buffer_secs}")
904 } else {
905 String::new()
906 }
907 );
908
909 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
910
911 'outer: for client_order_id in self.index.orders_closed.clone() {
912 if let Some(order) = self.orders.get(&client_order_id)
913 && order.is_closed()
914 && let Some(ts_closed) = order.ts_closed()
915 && ts_closed + buffer_ns <= ts_now
916 {
917 if let Some(linked_order_ids) = order.linked_order_ids() {
919 for linked_order_id in linked_order_ids {
920 if let Some(linked_order) = self.orders.get(linked_order_id)
921 && linked_order.is_open()
922 {
923 continue 'outer;
925 }
926 }
927 }
928
929 self.purge_order(client_order_id);
930 }
931 }
932 }
933
934 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
936 log::debug!(
937 "Purging closed positions{}",
938 if buffer_secs > 0 {
939 format!(" with buffer_secs={buffer_secs}")
940 } else {
941 String::new()
942 }
943 );
944
945 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
946
947 for position_id in self.index.positions_closed.clone() {
948 if let Some(position) = self.positions.get(&position_id)
949 && position.is_closed()
950 && let Some(ts_closed) = position.ts_closed
951 && ts_closed + buffer_ns <= ts_now
952 {
953 self.purge_position(position_id);
954 }
955 }
956 }
957
958 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
962 let order = self.orders.get(&client_order_id).cloned();
964
965 if let Some(ref ord) = order
967 && ord.is_open()
968 {
969 log::warn!("Order {client_order_id} found open when purging, skipping purge");
970 return;
971 }
972
973 if let Some(ref ord) = order {
975 self.orders.remove(&client_order_id);
977
978 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
980 {
981 venue_orders.remove(&client_order_id);
982 }
983
984 if let Some(venue_order_id) = ord.venue_order_id() {
986 self.index.venue_order_ids.remove(&venue_order_id);
987 }
988
989 if let Some(instrument_orders) =
991 self.index.instrument_orders.get_mut(&ord.instrument_id())
992 {
993 instrument_orders.remove(&client_order_id);
994 }
995
996 if let Some(position_id) = ord.position_id()
998 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
999 {
1000 position_orders.remove(&client_order_id);
1001 }
1002
1003 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1005 && let Some(exec_algorithm_orders) =
1006 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1007 {
1008 exec_algorithm_orders.remove(&client_order_id);
1009 }
1010
1011 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1013 strategy_orders.remove(&client_order_id);
1014 if strategy_orders.is_empty() {
1015 self.index.strategy_orders.remove(&ord.strategy_id());
1016 }
1017 }
1018
1019 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1021 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1022 {
1023 spawn_orders.remove(&client_order_id);
1024 if spawn_orders.is_empty() {
1025 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1026 }
1027 }
1028
1029 log::info!("Purged order {client_order_id}");
1030 } else {
1031 log::warn!("Order {client_order_id} not found when purging");
1032 }
1033
1034 self.index.order_position.remove(&client_order_id);
1036 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1037 self.index.order_client.remove(&client_order_id);
1038 self.index.client_order_ids.remove(&client_order_id);
1039
1040 if let Some(strategy_id) = strategy_id
1042 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1043 {
1044 strategy_orders.remove(&client_order_id);
1045 if strategy_orders.is_empty() {
1046 self.index.strategy_orders.remove(&strategy_id);
1047 }
1048 }
1049
1050 self.index.exec_spawn_orders.remove(&client_order_id);
1052
1053 self.index.orders.remove(&client_order_id);
1054 self.index.orders_closed.remove(&client_order_id);
1055 self.index.orders_emulated.remove(&client_order_id);
1056 self.index.orders_inflight.remove(&client_order_id);
1057 self.index.orders_pending_cancel.remove(&client_order_id);
1058 }
1059
1060 pub fn purge_position(&mut self, position_id: PositionId) {
1064 let position = self.positions.get(&position_id).cloned();
1066
1067 if let Some(ref pos) = position
1069 && pos.is_open()
1070 {
1071 log::warn!("Position {position_id} found open when purging, skipping purge");
1072 return;
1073 }
1074
1075 if let Some(ref pos) = position {
1077 self.positions.remove(&position_id);
1078
1079 if let Some(venue_positions) =
1081 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1082 {
1083 venue_positions.remove(&position_id);
1084 }
1085
1086 if let Some(instrument_positions) =
1088 self.index.instrument_positions.get_mut(&pos.instrument_id)
1089 {
1090 instrument_positions.remove(&position_id);
1091 }
1092
1093 if let Some(strategy_positions) =
1095 self.index.strategy_positions.get_mut(&pos.strategy_id)
1096 {
1097 strategy_positions.remove(&position_id);
1098 }
1099
1100 for client_order_id in pos.client_order_ids() {
1102 self.index.order_position.remove(&client_order_id);
1103 }
1104
1105 log::info!("Purged position {position_id}");
1106 } else {
1107 log::warn!("Position {position_id} not found when purging");
1108 }
1109
1110 self.index.position_strategy.remove(&position_id);
1112 self.index.position_orders.remove(&position_id);
1113 self.index.positions.remove(&position_id);
1114 self.index.positions_open.remove(&position_id);
1115 self.index.positions_closed.remove(&position_id);
1116
1117 self.position_snapshots.remove(&position_id);
1119 }
1120
1121 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1126 log::debug!(
1127 "Purging account events{}",
1128 if lookback_secs > 0 {
1129 format!(" with lookback_secs={lookback_secs}")
1130 } else {
1131 String::new()
1132 }
1133 );
1134
1135 for account in self.accounts.values_mut() {
1136 let event_count = account.event_count();
1137 account.purge_account_events(ts_now, lookback_secs);
1138 let count_diff = event_count - account.event_count();
1139 if count_diff > 0 {
1140 log::info!(
1141 "Purged {} event(s) from account {}",
1142 count_diff,
1143 account.id()
1144 );
1145 }
1146 }
1147 }
1148
1149 pub fn clear_index(&mut self) {
1151 self.index.clear();
1152 log::debug!("Cleared index");
1153 }
1154
1155 pub fn reset(&mut self) {
1159 log::debug!("Resetting cache");
1160
1161 self.general.clear();
1162 self.currencies.clear();
1163 self.instruments.clear();
1164 self.synthetics.clear();
1165 self.books.clear();
1166 self.own_books.clear();
1167 self.quotes.clear();
1168 self.trades.clear();
1169 self.mark_xrates.clear();
1170 self.mark_prices.clear();
1171 self.index_prices.clear();
1172 self.bars.clear();
1173 self.accounts.clear();
1174 self.orders.clear();
1175 self.order_lists.clear();
1176 self.positions.clear();
1177 self.position_snapshots.clear();
1178 self.greeks.clear();
1179 self.yield_curves.clear();
1180
1181 #[cfg(feature = "defi")]
1182 {
1183 self.defi.pools.clear();
1184 self.defi.pool_profilers.clear();
1185 }
1186
1187 self.clear_index();
1188
1189 log::info!("Reset cache");
1190 }
1191
1192 pub fn dispose(&mut self) {
1196 if let Some(database) = &mut self.database
1197 && let Err(e) = database.close()
1198 {
1199 log::error!("Failed to close database during dispose: {e}");
1200 }
1201 }
1202
1203 pub fn flush_db(&mut self) {
1207 if let Some(database) = &mut self.database
1208 && let Err(e) = database.flush()
1209 {
1210 log::error!("Failed to flush database: {e}");
1211 }
1212 }
1213
1214 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1222 check_valid_string_ascii(key, stringify!(key))?;
1223 check_predicate_false(value.is_empty(), stringify!(value))?;
1224
1225 log::debug!("Adding general {key}");
1226 self.general.insert(key.to_string(), value.clone());
1227
1228 if let Some(database) = &mut self.database {
1229 database.add(key.to_string(), value)?;
1230 }
1231 Ok(())
1232 }
1233
1234 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1240 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1241
1242 if self.config.save_market_data
1243 && let Some(database) = &mut self.database
1244 {
1245 database.add_order_book(&book)?;
1246 }
1247
1248 self.books.insert(book.instrument_id, book);
1249 Ok(())
1250 }
1251
1252 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1258 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1259
1260 self.own_books.insert(own_book.instrument_id, own_book);
1261 Ok(())
1262 }
1263
1264 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1270 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1271
1272 if self.config.save_market_data {
1273 }
1275
1276 let mark_prices_deque = self
1277 .mark_prices
1278 .entry(mark_price.instrument_id)
1279 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1280 mark_prices_deque.push_front(mark_price);
1281 Ok(())
1282 }
1283
1284 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1290 log::debug!(
1291 "Adding `IndexPriceUpdate` for {}",
1292 index_price.instrument_id
1293 );
1294
1295 if self.config.save_market_data {
1296 }
1298
1299 let index_prices_deque = self
1300 .index_prices
1301 .entry(index_price.instrument_id)
1302 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1303 index_prices_deque.push_front(index_price);
1304 Ok(())
1305 }
1306
1307 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1313 log::debug!(
1314 "Adding `FundingRateUpdate` for {}",
1315 funding_rate.instrument_id
1316 );
1317
1318 if self.config.save_market_data {
1319 }
1321
1322 self.funding_rates
1323 .insert(funding_rate.instrument_id, funding_rate);
1324 Ok(())
1325 }
1326
1327 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1333 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1334
1335 if self.config.save_market_data
1336 && let Some(database) = &mut self.database
1337 {
1338 database.add_quote("e)?;
1339 }
1340
1341 let quotes_deque = self
1342 .quotes
1343 .entry(quote.instrument_id)
1344 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1345 quotes_deque.push_front(quote);
1346 Ok(())
1347 }
1348
1349 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1355 check_slice_not_empty(quotes, stringify!(quotes))?;
1356
1357 let instrument_id = quotes[0].instrument_id;
1358 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1359
1360 if self.config.save_market_data
1361 && let Some(database) = &mut self.database
1362 {
1363 for quote in quotes {
1364 database.add_quote(quote)?;
1365 }
1366 }
1367
1368 let quotes_deque = self
1369 .quotes
1370 .entry(instrument_id)
1371 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1372
1373 for quote in quotes {
1374 quotes_deque.push_front(*quote);
1375 }
1376 Ok(())
1377 }
1378
1379 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1385 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1386
1387 if self.config.save_market_data
1388 && let Some(database) = &mut self.database
1389 {
1390 database.add_trade(&trade)?;
1391 }
1392
1393 let trades_deque = self
1394 .trades
1395 .entry(trade.instrument_id)
1396 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1397 trades_deque.push_front(trade);
1398 Ok(())
1399 }
1400
1401 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1407 check_slice_not_empty(trades, stringify!(trades))?;
1408
1409 let instrument_id = trades[0].instrument_id;
1410 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1411
1412 if self.config.save_market_data
1413 && let Some(database) = &mut self.database
1414 {
1415 for trade in trades {
1416 database.add_trade(trade)?;
1417 }
1418 }
1419
1420 let trades_deque = self
1421 .trades
1422 .entry(instrument_id)
1423 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1424
1425 for trade in trades {
1426 trades_deque.push_front(*trade);
1427 }
1428 Ok(())
1429 }
1430
1431 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1437 log::debug!("Adding `Bar` {}", bar.bar_type);
1438
1439 if self.config.save_market_data
1440 && let Some(database) = &mut self.database
1441 {
1442 database.add_bar(&bar)?;
1443 }
1444
1445 let bars = self
1446 .bars
1447 .entry(bar.bar_type)
1448 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1449 bars.push_front(bar);
1450 Ok(())
1451 }
1452
1453 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1459 check_slice_not_empty(bars, stringify!(bars))?;
1460
1461 let bar_type = bars[0].bar_type;
1462 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1463
1464 if self.config.save_market_data
1465 && let Some(database) = &mut self.database
1466 {
1467 for bar in bars {
1468 database.add_bar(bar)?;
1469 }
1470 }
1471
1472 let bars_deque = self
1473 .bars
1474 .entry(bar_type)
1475 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1476
1477 for bar in bars {
1478 bars_deque.push_front(*bar);
1479 }
1480 Ok(())
1481 }
1482
1483 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1489 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1490
1491 if self.config.save_market_data
1492 && let Some(_database) = &mut self.database
1493 {
1494 }
1496
1497 self.greeks.insert(greeks.instrument_id, greeks);
1498 Ok(())
1499 }
1500
1501 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1503 self.greeks.get(instrument_id).cloned()
1504 }
1505
1506 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1512 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1513
1514 if self.config.save_market_data
1515 && let Some(_database) = &mut self.database
1516 {
1517 }
1519
1520 self.yield_curves
1521 .insert(yield_curve.curve_name.clone(), yield_curve);
1522 Ok(())
1523 }
1524
1525 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1527 self.yield_curves.get(key).map(|curve| {
1528 let curve_clone = curve.clone();
1529 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1530 as Box<dyn Fn(f64) -> f64>
1531 })
1532 }
1533
1534 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1540 if self.currencies.contains_key(¤cy.code) {
1541 return Ok(());
1542 }
1543 log::debug!("Adding `Currency` {}", currency.code);
1544
1545 if let Some(database) = &mut self.database {
1546 database.add_currency(¤cy)?;
1547 }
1548
1549 self.currencies.insert(currency.code, currency);
1550 Ok(())
1551 }
1552
1553 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1559 log::debug!("Adding `Instrument` {}", instrument.id());
1560
1561 if let Some(base_currency) = instrument.base_currency() {
1563 self.add_currency(base_currency)?;
1564 }
1565 self.add_currency(instrument.quote_currency())?;
1566 self.add_currency(instrument.settlement_currency())?;
1567
1568 if let Some(database) = &mut self.database {
1569 database.add_instrument(&instrument)?;
1570 }
1571
1572 self.instruments.insert(instrument.id(), instrument);
1573 Ok(())
1574 }
1575
1576 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1582 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1583
1584 if let Some(database) = &mut self.database {
1585 database.add_synthetic(&synthetic)?;
1586 }
1587
1588 self.synthetics.insert(synthetic.id, synthetic);
1589 Ok(())
1590 }
1591
1592 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1598 log::debug!("Adding `Account` {}", account.id());
1599
1600 if let Some(database) = &mut self.database {
1601 database.add_account(&account)?;
1602 }
1603
1604 let account_id = account.id();
1605 self.accounts.insert(account_id, account);
1606 self.index
1607 .venue_account
1608 .insert(account_id.get_issuer(), account_id);
1609 Ok(())
1610 }
1611
1612 pub fn add_venue_order_id(
1620 &mut self,
1621 client_order_id: &ClientOrderId,
1622 venue_order_id: &VenueOrderId,
1623 overwrite: bool,
1624 ) -> anyhow::Result<()> {
1625 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1626 && !overwrite
1627 && existing_venue_order_id != venue_order_id
1628 {
1629 anyhow::bail!(
1630 "Existing {existing_venue_order_id} for {client_order_id}
1631 did not match the given {venue_order_id}.
1632 If you are writing a test then try a different `venue_order_id`,
1633 otherwise this is probably a bug."
1634 );
1635 }
1636
1637 self.index
1638 .client_order_ids
1639 .insert(*client_order_id, *venue_order_id);
1640 self.index
1641 .venue_order_ids
1642 .insert(*venue_order_id, *client_order_id);
1643
1644 Ok(())
1645 }
1646
1647 pub fn add_order(
1659 &mut self,
1660 order: OrderAny,
1661 position_id: Option<PositionId>,
1662 client_id: Option<ClientId>,
1663 replace_existing: bool,
1664 ) -> anyhow::Result<()> {
1665 let instrument_id = order.instrument_id();
1666 let venue = instrument_id.venue;
1667 let client_order_id = order.client_order_id();
1668 let strategy_id = order.strategy_id();
1669 let exec_algorithm_id = order.exec_algorithm_id();
1670 let exec_spawn_id = order.exec_spawn_id();
1671
1672 if !replace_existing {
1673 check_key_not_in_map(
1674 &client_order_id,
1675 &self.orders,
1676 stringify!(client_order_id),
1677 stringify!(orders),
1678 )?;
1679 }
1680
1681 log::debug!("Adding {order:?}");
1682
1683 self.index.orders.insert(client_order_id);
1684 self.index
1685 .order_strategy
1686 .insert(client_order_id, strategy_id);
1687 self.index.strategies.insert(strategy_id);
1688
1689 self.index
1691 .venue_orders
1692 .entry(venue)
1693 .or_default()
1694 .insert(client_order_id);
1695
1696 self.index
1698 .instrument_orders
1699 .entry(instrument_id)
1700 .or_default()
1701 .insert(client_order_id);
1702
1703 self.index
1705 .strategy_orders
1706 .entry(strategy_id)
1707 .or_default()
1708 .insert(client_order_id);
1709
1710 if let Some(exec_algorithm_id) = exec_algorithm_id {
1712 self.index.exec_algorithms.insert(exec_algorithm_id);
1713
1714 self.index
1715 .exec_algorithm_orders
1716 .entry(exec_algorithm_id)
1717 .or_default()
1718 .insert(client_order_id);
1719 }
1720
1721 if let Some(exec_spawn_id) = exec_spawn_id {
1723 self.index
1724 .exec_spawn_orders
1725 .entry(exec_spawn_id)
1726 .or_default()
1727 .insert(client_order_id);
1728 }
1729
1730 match order.emulation_trigger() {
1732 Some(_) => {
1733 self.index.orders_emulated.remove(&client_order_id);
1734 }
1735 None => {
1736 self.index.orders_emulated.insert(client_order_id);
1737 }
1738 }
1739
1740 if let Some(position_id) = position_id {
1742 self.add_position_id(
1743 &position_id,
1744 &order.instrument_id().venue,
1745 &client_order_id,
1746 &strategy_id,
1747 )?;
1748 }
1749
1750 if let Some(client_id) = client_id {
1752 self.index.order_client.insert(client_order_id, client_id);
1753 log::debug!("Indexed {client_id:?}");
1754 }
1755
1756 if let Some(database) = &mut self.database {
1757 database.add_order(&order, client_id)?;
1758 }
1763
1764 self.orders.insert(client_order_id, order);
1765
1766 Ok(())
1767 }
1768
1769 pub fn add_position_id(
1775 &mut self,
1776 position_id: &PositionId,
1777 venue: &Venue,
1778 client_order_id: &ClientOrderId,
1779 strategy_id: &StrategyId,
1780 ) -> anyhow::Result<()> {
1781 self.index
1782 .order_position
1783 .insert(*client_order_id, *position_id);
1784
1785 if let Some(database) = &mut self.database {
1787 database.index_order_position(*client_order_id, *position_id)?;
1788 }
1789
1790 self.index
1792 .position_strategy
1793 .insert(*position_id, *strategy_id);
1794
1795 self.index
1797 .position_orders
1798 .entry(*position_id)
1799 .or_default()
1800 .insert(*client_order_id);
1801
1802 self.index
1804 .strategy_positions
1805 .entry(*strategy_id)
1806 .or_default()
1807 .insert(*position_id);
1808
1809 self.index
1811 .venue_positions
1812 .entry(*venue)
1813 .or_default()
1814 .insert(*position_id);
1815
1816 Ok(())
1817 }
1818
1819 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1825 self.positions.insert(position.id, position.clone());
1826 self.index.positions.insert(position.id);
1827 self.index.positions_open.insert(position.id);
1828 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
1831
1832 self.add_position_id(
1833 &position.id,
1834 &position.instrument_id.venue,
1835 &position.opening_order_id,
1836 &position.strategy_id,
1837 )?;
1838
1839 let venue = position.instrument_id.venue;
1840 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1841 venue_positions.insert(position.id);
1842
1843 let instrument_id = position.instrument_id;
1845 let instrument_positions = self
1846 .index
1847 .instrument_positions
1848 .entry(instrument_id)
1849 .or_default();
1850 instrument_positions.insert(position.id);
1851
1852 if let Some(database) = &mut self.database {
1853 database.add_position(&position)?;
1854 }
1863
1864 Ok(())
1865 }
1866
1867 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1873 if let Some(database) = &mut self.database {
1874 database.update_account(&account)?;
1875 }
1876 Ok(())
1877 }
1878
1879 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1885 let client_order_id = order.client_order_id();
1886
1887 if let Some(venue_order_id) = order.venue_order_id() {
1889 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1892 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1894 }
1895 }
1896
1897 if order.is_inflight() {
1899 self.index.orders_inflight.insert(client_order_id);
1900 } else {
1901 self.index.orders_inflight.remove(&client_order_id);
1902 }
1903
1904 if order.is_open() {
1906 self.index.orders_closed.remove(&client_order_id);
1907 self.index.orders_open.insert(client_order_id);
1908 } else if order.is_closed() {
1909 self.index.orders_open.remove(&client_order_id);
1910 self.index.orders_pending_cancel.remove(&client_order_id);
1911 self.index.orders_closed.insert(client_order_id);
1912 }
1913
1914 if let Some(emulation_trigger) = order.emulation_trigger() {
1916 match emulation_trigger {
1917 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1918 _ => self.index.orders_emulated.insert(client_order_id),
1919 };
1920 }
1921
1922 if self.own_order_book(&order.instrument_id()).is_some()
1924 && should_handle_own_book_order(order)
1925 {
1926 self.update_own_order_book(order);
1927 }
1928
1929 if let Some(database) = &mut self.database {
1930 database.update_order(order.last_event())?;
1931 }
1936
1937 self.orders.insert(client_order_id, order.clone());
1939
1940 Ok(())
1941 }
1942
1943 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1945 self.index
1946 .orders_pending_cancel
1947 .insert(order.client_order_id());
1948 }
1949
1950 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1956 if position.is_open() {
1959 self.index.positions_open.insert(position.id);
1960 self.index.positions_closed.remove(&position.id);
1961 } else {
1962 self.index.positions_closed.insert(position.id);
1963 self.index.positions_open.remove(&position.id);
1964 }
1965
1966 if let Some(database) = &mut self.database {
1967 database.update_position(position)?;
1968 }
1973
1974 self.positions.insert(position.id, position.clone());
1975
1976 Ok(())
1977 }
1978
1979 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1986 let position_id = position.id;
1987
1988 let mut copied_position = position.clone();
1989 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1990 copied_position.id = PositionId::new(new_id);
1991
1992 let position_serialized = serde_json::to_vec(&copied_position)?;
1994
1995 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1996 let new_snapshots = match snapshots {
1997 Some(existing_snapshots) => {
1998 let mut combined = existing_snapshots.to_vec();
1999 combined.extend(position_serialized);
2000 Bytes::from(combined)
2001 }
2002 None => Bytes::from(position_serialized),
2003 };
2004 self.position_snapshots.insert(position_id, new_snapshots);
2005
2006 log::debug!("Snapshot {copied_position}");
2007 Ok(())
2008 }
2009
2010 pub fn snapshot_position_state(
2016 &mut self,
2017 position: &Position,
2018 open_only: Option<bool>,
2021 ) -> anyhow::Result<()> {
2022 let open_only = open_only.unwrap_or(true);
2023
2024 if open_only && !position.is_open() {
2025 return Ok(());
2026 }
2027
2028 if let Some(database) = &mut self.database {
2029 database.snapshot_position_state(position).map_err(|e| {
2030 log::error!(
2031 "Failed to snapshot position state for {}: {e:?}",
2032 position.id
2033 );
2034 e
2035 })?;
2036 } else {
2037 log::warn!(
2038 "Cannot snapshot position state for {} (no database configured)",
2039 position.id
2040 );
2041 }
2042
2043 todo!()
2045 }
2046
2047 #[must_use]
2049 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2050 if self.index.position_strategy.contains_key(position_id) {
2052 Some(OmsType::Netting)
2055 } else {
2056 None
2057 }
2058 }
2059
2060 #[must_use]
2062 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2063 self.position_snapshots.get(position_id).map(|b| b.to_vec())
2064 }
2065
2066 #[must_use]
2068 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2069 let mut result = AHashSet::new();
2071 for (position_id, _) in &self.position_snapshots {
2072 if let Some(position) = self.positions.get(position_id)
2074 && position.instrument_id == *instrument_id
2075 {
2076 result.insert(*position_id);
2077 }
2078 }
2079 result
2080 }
2081
2082 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2088 let database = if let Some(database) = &self.database {
2089 database
2090 } else {
2091 log::warn!(
2092 "Cannot snapshot order state for {} (no database configured)",
2093 order.client_order_id()
2094 );
2095 return Ok(());
2096 };
2097
2098 database.snapshot_order_state(order)
2099 }
2100
2101 fn build_order_query_filter_set(
2104 &self,
2105 venue: Option<&Venue>,
2106 instrument_id: Option<&InstrumentId>,
2107 strategy_id: Option<&StrategyId>,
2108 ) -> Option<AHashSet<ClientOrderId>> {
2109 let mut query: Option<AHashSet<ClientOrderId>> = None;
2110
2111 if let Some(venue) = venue {
2112 query = Some(
2113 self.index
2114 .venue_orders
2115 .get(venue)
2116 .cloned()
2117 .unwrap_or_default(),
2118 );
2119 }
2120
2121 if let Some(instrument_id) = instrument_id {
2122 let instrument_orders = self
2123 .index
2124 .instrument_orders
2125 .get(instrument_id)
2126 .cloned()
2127 .unwrap_or_default();
2128
2129 if let Some(existing_query) = &mut query {
2130 *existing_query = existing_query
2131 .intersection(&instrument_orders)
2132 .copied()
2133 .collect();
2134 } else {
2135 query = Some(instrument_orders);
2136 }
2137 }
2138
2139 if let Some(strategy_id) = strategy_id {
2140 let strategy_orders = self
2141 .index
2142 .strategy_orders
2143 .get(strategy_id)
2144 .cloned()
2145 .unwrap_or_default();
2146
2147 if let Some(existing_query) = &mut query {
2148 *existing_query = existing_query
2149 .intersection(&strategy_orders)
2150 .copied()
2151 .collect();
2152 } else {
2153 query = Some(strategy_orders);
2154 }
2155 }
2156
2157 query
2158 }
2159
2160 fn build_position_query_filter_set(
2161 &self,
2162 venue: Option<&Venue>,
2163 instrument_id: Option<&InstrumentId>,
2164 strategy_id: Option<&StrategyId>,
2165 ) -> Option<AHashSet<PositionId>> {
2166 let mut query: Option<AHashSet<PositionId>> = None;
2167
2168 if let Some(venue) = venue {
2169 query = Some(
2170 self.index
2171 .venue_positions
2172 .get(venue)
2173 .cloned()
2174 .unwrap_or_default(),
2175 );
2176 }
2177
2178 if let Some(instrument_id) = instrument_id {
2179 let instrument_positions = self
2180 .index
2181 .instrument_positions
2182 .get(instrument_id)
2183 .cloned()
2184 .unwrap_or_default();
2185
2186 if let Some(existing_query) = query {
2187 query = Some(
2188 existing_query
2189 .intersection(&instrument_positions)
2190 .copied()
2191 .collect(),
2192 );
2193 } else {
2194 query = Some(instrument_positions);
2195 }
2196 }
2197
2198 if let Some(strategy_id) = strategy_id {
2199 let strategy_positions = self
2200 .index
2201 .strategy_positions
2202 .get(strategy_id)
2203 .cloned()
2204 .unwrap_or_default();
2205
2206 if let Some(existing_query) = query {
2207 query = Some(
2208 existing_query
2209 .intersection(&strategy_positions)
2210 .copied()
2211 .collect(),
2212 );
2213 } else {
2214 query = Some(strategy_positions);
2215 }
2216 }
2217
2218 query
2219 }
2220
2221 fn get_orders_for_ids(
2227 &self,
2228 client_order_ids: &AHashSet<ClientOrderId>,
2229 side: Option<OrderSide>,
2230 ) -> Vec<&OrderAny> {
2231 let side = side.unwrap_or(OrderSide::NoOrderSide);
2232 let mut orders = Vec::new();
2233
2234 for client_order_id in client_order_ids {
2235 let order = self
2236 .orders
2237 .get(client_order_id)
2238 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2239 if side == OrderSide::NoOrderSide || side == order.order_side() {
2240 orders.push(order);
2241 }
2242 }
2243
2244 orders
2245 }
2246
2247 fn get_positions_for_ids(
2253 &self,
2254 position_ids: &AHashSet<PositionId>,
2255 side: Option<PositionSide>,
2256 ) -> Vec<&Position> {
2257 let side = side.unwrap_or(PositionSide::NoPositionSide);
2258 let mut positions = Vec::new();
2259
2260 for position_id in position_ids {
2261 let position = self
2262 .positions
2263 .get(position_id)
2264 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2265 if side == PositionSide::NoPositionSide || side == position.side {
2266 positions.push(position);
2267 }
2268 }
2269
2270 positions
2271 }
2272
2273 #[must_use]
2275 pub fn client_order_ids(
2276 &self,
2277 venue: Option<&Venue>,
2278 instrument_id: Option<&InstrumentId>,
2279 strategy_id: Option<&StrategyId>,
2280 ) -> AHashSet<ClientOrderId> {
2281 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2282 match query {
2283 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2284 None => self.index.orders.clone(),
2285 }
2286 }
2287
2288 #[must_use]
2290 pub fn client_order_ids_open(
2291 &self,
2292 venue: Option<&Venue>,
2293 instrument_id: Option<&InstrumentId>,
2294 strategy_id: Option<&StrategyId>,
2295 ) -> AHashSet<ClientOrderId> {
2296 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2297 match query {
2298 Some(query) => self
2299 .index
2300 .orders_open
2301 .intersection(&query)
2302 .copied()
2303 .collect(),
2304 None => self.index.orders_open.clone(),
2305 }
2306 }
2307
2308 #[must_use]
2310 pub fn client_order_ids_closed(
2311 &self,
2312 venue: Option<&Venue>,
2313 instrument_id: Option<&InstrumentId>,
2314 strategy_id: Option<&StrategyId>,
2315 ) -> AHashSet<ClientOrderId> {
2316 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2317 match query {
2318 Some(query) => self
2319 .index
2320 .orders_closed
2321 .intersection(&query)
2322 .copied()
2323 .collect(),
2324 None => self.index.orders_closed.clone(),
2325 }
2326 }
2327
2328 #[must_use]
2330 pub fn client_order_ids_emulated(
2331 &self,
2332 venue: Option<&Venue>,
2333 instrument_id: Option<&InstrumentId>,
2334 strategy_id: Option<&StrategyId>,
2335 ) -> AHashSet<ClientOrderId> {
2336 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2337 match query {
2338 Some(query) => self
2339 .index
2340 .orders_emulated
2341 .intersection(&query)
2342 .copied()
2343 .collect(),
2344 None => self.index.orders_emulated.clone(),
2345 }
2346 }
2347
2348 #[must_use]
2350 pub fn client_order_ids_inflight(
2351 &self,
2352 venue: Option<&Venue>,
2353 instrument_id: Option<&InstrumentId>,
2354 strategy_id: Option<&StrategyId>,
2355 ) -> AHashSet<ClientOrderId> {
2356 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2357 match query {
2358 Some(query) => self
2359 .index
2360 .orders_inflight
2361 .intersection(&query)
2362 .copied()
2363 .collect(),
2364 None => self.index.orders_inflight.clone(),
2365 }
2366 }
2367
2368 #[must_use]
2370 pub fn position_ids(
2371 &self,
2372 venue: Option<&Venue>,
2373 instrument_id: Option<&InstrumentId>,
2374 strategy_id: Option<&StrategyId>,
2375 ) -> AHashSet<PositionId> {
2376 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2377 match query {
2378 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2379 None => self.index.positions.clone(),
2380 }
2381 }
2382
2383 #[must_use]
2385 pub fn position_open_ids(
2386 &self,
2387 venue: Option<&Venue>,
2388 instrument_id: Option<&InstrumentId>,
2389 strategy_id: Option<&StrategyId>,
2390 ) -> AHashSet<PositionId> {
2391 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2392 match query {
2393 Some(query) => self
2394 .index
2395 .positions_open
2396 .intersection(&query)
2397 .copied()
2398 .collect(),
2399 None => self.index.positions_open.clone(),
2400 }
2401 }
2402
2403 #[must_use]
2405 pub fn position_closed_ids(
2406 &self,
2407 venue: Option<&Venue>,
2408 instrument_id: Option<&InstrumentId>,
2409 strategy_id: Option<&StrategyId>,
2410 ) -> AHashSet<PositionId> {
2411 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2412 match query {
2413 Some(query) => self
2414 .index
2415 .positions_closed
2416 .intersection(&query)
2417 .copied()
2418 .collect(),
2419 None => self.index.positions_closed.clone(),
2420 }
2421 }
2422
2423 #[must_use]
2425 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2426 self.index.actors.clone()
2427 }
2428
2429 #[must_use]
2431 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2432 self.index.strategies.clone()
2433 }
2434
2435 #[must_use]
2437 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2438 self.index.exec_algorithms.clone()
2439 }
2440
2441 #[must_use]
2445 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2446 self.orders.get(client_order_id)
2447 }
2448
2449 #[must_use]
2451 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2452 self.orders.get_mut(client_order_id)
2453 }
2454
2455 #[must_use]
2457 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2458 self.index.venue_order_ids.get(venue_order_id)
2459 }
2460
2461 #[must_use]
2463 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2464 self.index.client_order_ids.get(client_order_id)
2465 }
2466
2467 #[must_use]
2469 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2470 self.index.order_client.get(client_order_id)
2471 }
2472
2473 #[must_use]
2475 pub fn orders(
2476 &self,
2477 venue: Option<&Venue>,
2478 instrument_id: Option<&InstrumentId>,
2479 strategy_id: Option<&StrategyId>,
2480 side: Option<OrderSide>,
2481 ) -> Vec<&OrderAny> {
2482 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2483 self.get_orders_for_ids(&client_order_ids, side)
2484 }
2485
2486 #[must_use]
2488 pub fn orders_open(
2489 &self,
2490 venue: Option<&Venue>,
2491 instrument_id: Option<&InstrumentId>,
2492 strategy_id: Option<&StrategyId>,
2493 side: Option<OrderSide>,
2494 ) -> Vec<&OrderAny> {
2495 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2496 self.get_orders_for_ids(&client_order_ids, side)
2497 }
2498
2499 #[must_use]
2501 pub fn orders_closed(
2502 &self,
2503 venue: Option<&Venue>,
2504 instrument_id: Option<&InstrumentId>,
2505 strategy_id: Option<&StrategyId>,
2506 side: Option<OrderSide>,
2507 ) -> Vec<&OrderAny> {
2508 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2509 self.get_orders_for_ids(&client_order_ids, side)
2510 }
2511
2512 #[must_use]
2514 pub fn orders_emulated(
2515 &self,
2516 venue: Option<&Venue>,
2517 instrument_id: Option<&InstrumentId>,
2518 strategy_id: Option<&StrategyId>,
2519 side: Option<OrderSide>,
2520 ) -> Vec<&OrderAny> {
2521 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2522 self.get_orders_for_ids(&client_order_ids, side)
2523 }
2524
2525 #[must_use]
2527 pub fn orders_inflight(
2528 &self,
2529 venue: Option<&Venue>,
2530 instrument_id: Option<&InstrumentId>,
2531 strategy_id: Option<&StrategyId>,
2532 side: Option<OrderSide>,
2533 ) -> Vec<&OrderAny> {
2534 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2535 self.get_orders_for_ids(&client_order_ids, side)
2536 }
2537
2538 #[must_use]
2540 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2541 let client_order_ids = self.index.position_orders.get(position_id);
2542 match client_order_ids {
2543 Some(client_order_ids) => {
2544 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2545 }
2546 None => Vec::new(),
2547 }
2548 }
2549
2550 #[must_use]
2552 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2553 self.index.orders.contains(client_order_id)
2554 }
2555
2556 #[must_use]
2558 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2559 self.index.orders_open.contains(client_order_id)
2560 }
2561
2562 #[must_use]
2564 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2565 self.index.orders_closed.contains(client_order_id)
2566 }
2567
2568 #[must_use]
2570 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2571 self.index.orders_emulated.contains(client_order_id)
2572 }
2573
2574 #[must_use]
2576 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2577 self.index.orders_inflight.contains(client_order_id)
2578 }
2579
2580 #[must_use]
2582 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2583 self.index.orders_pending_cancel.contains(client_order_id)
2584 }
2585
2586 #[must_use]
2588 pub fn orders_open_count(
2589 &self,
2590 venue: Option<&Venue>,
2591 instrument_id: Option<&InstrumentId>,
2592 strategy_id: Option<&StrategyId>,
2593 side: Option<OrderSide>,
2594 ) -> usize {
2595 self.orders_open(venue, instrument_id, strategy_id, side)
2596 .len()
2597 }
2598
2599 #[must_use]
2601 pub fn orders_closed_count(
2602 &self,
2603 venue: Option<&Venue>,
2604 instrument_id: Option<&InstrumentId>,
2605 strategy_id: Option<&StrategyId>,
2606 side: Option<OrderSide>,
2607 ) -> usize {
2608 self.orders_closed(venue, instrument_id, strategy_id, side)
2609 .len()
2610 }
2611
2612 #[must_use]
2614 pub fn orders_emulated_count(
2615 &self,
2616 venue: Option<&Venue>,
2617 instrument_id: Option<&InstrumentId>,
2618 strategy_id: Option<&StrategyId>,
2619 side: Option<OrderSide>,
2620 ) -> usize {
2621 self.orders_emulated(venue, instrument_id, strategy_id, side)
2622 .len()
2623 }
2624
2625 #[must_use]
2627 pub fn orders_inflight_count(
2628 &self,
2629 venue: Option<&Venue>,
2630 instrument_id: Option<&InstrumentId>,
2631 strategy_id: Option<&StrategyId>,
2632 side: Option<OrderSide>,
2633 ) -> usize {
2634 self.orders_inflight(venue, instrument_id, strategy_id, side)
2635 .len()
2636 }
2637
2638 #[must_use]
2640 pub fn orders_total_count(
2641 &self,
2642 venue: Option<&Venue>,
2643 instrument_id: Option<&InstrumentId>,
2644 strategy_id: Option<&StrategyId>,
2645 side: Option<OrderSide>,
2646 ) -> usize {
2647 self.orders(venue, instrument_id, strategy_id, side).len()
2648 }
2649
2650 #[must_use]
2652 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2653 self.order_lists.get(order_list_id)
2654 }
2655
2656 #[must_use]
2658 pub fn order_lists(
2659 &self,
2660 venue: Option<&Venue>,
2661 instrument_id: Option<&InstrumentId>,
2662 strategy_id: Option<&StrategyId>,
2663 ) -> Vec<&OrderList> {
2664 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2665
2666 if let Some(venue) = venue {
2667 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2668 }
2669
2670 if let Some(instrument_id) = instrument_id {
2671 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2672 }
2673
2674 if let Some(strategy_id) = strategy_id {
2675 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2676 }
2677
2678 order_lists
2679 }
2680
2681 #[must_use]
2683 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2684 self.order_lists.contains_key(order_list_id)
2685 }
2686
2687 #[must_use]
2692 pub fn orders_for_exec_algorithm(
2693 &self,
2694 exec_algorithm_id: &ExecAlgorithmId,
2695 venue: Option<&Venue>,
2696 instrument_id: Option<&InstrumentId>,
2697 strategy_id: Option<&StrategyId>,
2698 side: Option<OrderSide>,
2699 ) -> Vec<&OrderAny> {
2700 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2701 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2702
2703 if let Some(query) = query
2704 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2705 {
2706 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2707 }
2708
2709 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2710 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2711 } else {
2712 Vec::new()
2713 }
2714 }
2715
2716 #[must_use]
2718 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2719 self.get_orders_for_ids(
2720 self.index
2721 .exec_spawn_orders
2722 .get(exec_spawn_id)
2723 .unwrap_or(&AHashSet::new()),
2724 None,
2725 )
2726 }
2727
2728 #[must_use]
2730 pub fn exec_spawn_total_quantity(
2731 &self,
2732 exec_spawn_id: &ClientOrderId,
2733 active_only: bool,
2734 ) -> Option<Quantity> {
2735 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2736
2737 let mut total_quantity: Option<Quantity> = None;
2738
2739 for spawn_order in exec_spawn_orders {
2740 if active_only && spawn_order.is_closed() {
2741 continue;
2742 }
2743
2744 match total_quantity.as_mut() {
2745 Some(total) => *total += spawn_order.quantity(),
2746 None => total_quantity = Some(spawn_order.quantity()),
2747 }
2748 }
2749
2750 total_quantity
2751 }
2752
2753 #[must_use]
2755 pub fn exec_spawn_total_filled_qty(
2756 &self,
2757 exec_spawn_id: &ClientOrderId,
2758 active_only: bool,
2759 ) -> Option<Quantity> {
2760 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2761
2762 let mut total_quantity: Option<Quantity> = None;
2763
2764 for spawn_order in exec_spawn_orders {
2765 if active_only && spawn_order.is_closed() {
2766 continue;
2767 }
2768
2769 match total_quantity.as_mut() {
2770 Some(total) => *total += spawn_order.filled_qty(),
2771 None => total_quantity = Some(spawn_order.filled_qty()),
2772 }
2773 }
2774
2775 total_quantity
2776 }
2777
2778 #[must_use]
2780 pub fn exec_spawn_total_leaves_qty(
2781 &self,
2782 exec_spawn_id: &ClientOrderId,
2783 active_only: bool,
2784 ) -> Option<Quantity> {
2785 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2786
2787 let mut total_quantity: Option<Quantity> = None;
2788
2789 for spawn_order in exec_spawn_orders {
2790 if active_only && spawn_order.is_closed() {
2791 continue;
2792 }
2793
2794 match total_quantity.as_mut() {
2795 Some(total) => *total += spawn_order.leaves_qty(),
2796 None => total_quantity = Some(spawn_order.leaves_qty()),
2797 }
2798 }
2799
2800 total_quantity
2801 }
2802
2803 #[must_use]
2807 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2808 self.positions.get(position_id)
2809 }
2810
2811 #[must_use]
2813 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2814 self.index
2815 .order_position
2816 .get(client_order_id)
2817 .and_then(|position_id| self.positions.get(position_id))
2818 }
2819
2820 #[must_use]
2822 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2823 self.index.order_position.get(client_order_id)
2824 }
2825
2826 #[must_use]
2828 pub fn positions(
2829 &self,
2830 venue: Option<&Venue>,
2831 instrument_id: Option<&InstrumentId>,
2832 strategy_id: Option<&StrategyId>,
2833 side: Option<PositionSide>,
2834 ) -> Vec<&Position> {
2835 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2836 self.get_positions_for_ids(&position_ids, side)
2837 }
2838
2839 #[must_use]
2841 pub fn positions_open(
2842 &self,
2843 venue: Option<&Venue>,
2844 instrument_id: Option<&InstrumentId>,
2845 strategy_id: Option<&StrategyId>,
2846 side: Option<PositionSide>,
2847 ) -> Vec<&Position> {
2848 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2849 self.get_positions_for_ids(&position_ids, side)
2850 }
2851
2852 #[must_use]
2854 pub fn positions_closed(
2855 &self,
2856 venue: Option<&Venue>,
2857 instrument_id: Option<&InstrumentId>,
2858 strategy_id: Option<&StrategyId>,
2859 side: Option<PositionSide>,
2860 ) -> Vec<&Position> {
2861 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2862 self.get_positions_for_ids(&position_ids, side)
2863 }
2864
2865 #[must_use]
2867 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2868 self.index.positions.contains(position_id)
2869 }
2870
2871 #[must_use]
2873 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2874 self.index.positions_open.contains(position_id)
2875 }
2876
2877 #[must_use]
2879 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2880 self.index.positions_closed.contains(position_id)
2881 }
2882
2883 #[must_use]
2885 pub fn positions_open_count(
2886 &self,
2887 venue: Option<&Venue>,
2888 instrument_id: Option<&InstrumentId>,
2889 strategy_id: Option<&StrategyId>,
2890 side: Option<PositionSide>,
2891 ) -> usize {
2892 self.positions_open(venue, instrument_id, strategy_id, side)
2893 .len()
2894 }
2895
2896 #[must_use]
2898 pub fn positions_closed_count(
2899 &self,
2900 venue: Option<&Venue>,
2901 instrument_id: Option<&InstrumentId>,
2902 strategy_id: Option<&StrategyId>,
2903 side: Option<PositionSide>,
2904 ) -> usize {
2905 self.positions_closed(venue, instrument_id, strategy_id, side)
2906 .len()
2907 }
2908
2909 #[must_use]
2911 pub fn positions_total_count(
2912 &self,
2913 venue: Option<&Venue>,
2914 instrument_id: Option<&InstrumentId>,
2915 strategy_id: Option<&StrategyId>,
2916 side: Option<PositionSide>,
2917 ) -> usize {
2918 self.positions(venue, instrument_id, strategy_id, side)
2919 .len()
2920 }
2921
2922 #[must_use]
2926 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2927 self.index.order_strategy.get(client_order_id)
2928 }
2929
2930 #[must_use]
2932 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2933 self.index.position_strategy.get(position_id)
2934 }
2935
2936 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2944 check_valid_string_ascii(key, stringify!(key))?;
2945
2946 Ok(self.general.get(key))
2947 }
2948
2949 #[must_use]
2953 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2954 match price_type {
2955 PriceType::Bid => self
2956 .quotes
2957 .get(instrument_id)
2958 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2959 PriceType::Ask => self
2960 .quotes
2961 .get(instrument_id)
2962 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2963 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2964 quotes.front().map(|quote| {
2965 Price::new(
2966 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2967 quote.bid_price.precision + 1,
2968 )
2969 })
2970 }),
2971 PriceType::Last => self
2972 .trades
2973 .get(instrument_id)
2974 .and_then(|trades| trades.front().map(|trade| trade.price)),
2975 PriceType::Mark => self
2976 .mark_prices
2977 .get(instrument_id)
2978 .and_then(|marks| marks.front().map(|mark| mark.value)),
2979 }
2980 }
2981
2982 #[must_use]
2984 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2985 self.quotes
2986 .get(instrument_id)
2987 .map(|quotes| quotes.iter().copied().collect())
2988 }
2989
2990 #[must_use]
2992 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2993 self.trades
2994 .get(instrument_id)
2995 .map(|trades| trades.iter().copied().collect())
2996 }
2997
2998 #[must_use]
3000 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3001 self.mark_prices
3002 .get(instrument_id)
3003 .map(|mark_prices| mark_prices.iter().copied().collect())
3004 }
3005
3006 #[must_use]
3008 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3009 self.index_prices
3010 .get(instrument_id)
3011 .map(|index_prices| index_prices.iter().copied().collect())
3012 }
3013
3014 #[must_use]
3016 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3017 self.bars
3018 .get(bar_type)
3019 .map(|bars| bars.iter().copied().collect())
3020 }
3021
3022 #[must_use]
3024 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3025 self.books.get(instrument_id)
3026 }
3027
3028 #[must_use]
3030 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3031 self.books.get_mut(instrument_id)
3032 }
3033
3034 #[must_use]
3036 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3037 self.own_books.get(instrument_id)
3038 }
3039
3040 #[must_use]
3042 pub fn own_order_book_mut(
3043 &mut self,
3044 instrument_id: &InstrumentId,
3045 ) -> Option<&mut OwnOrderBook> {
3046 self.own_books.get_mut(instrument_id)
3047 }
3048
3049 #[must_use]
3051 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3052 self.quotes
3053 .get(instrument_id)
3054 .and_then(|quotes| quotes.front())
3055 }
3056
3057 #[must_use]
3059 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3060 self.trades
3061 .get(instrument_id)
3062 .and_then(|trades| trades.front())
3063 }
3064
3065 #[must_use]
3067 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3068 self.mark_prices
3069 .get(instrument_id)
3070 .and_then(|mark_prices| mark_prices.front())
3071 }
3072
3073 #[must_use]
3075 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3076 self.index_prices
3077 .get(instrument_id)
3078 .and_then(|index_prices| index_prices.front())
3079 }
3080
3081 #[must_use]
3083 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3084 self.funding_rates.get(instrument_id)
3085 }
3086
3087 #[must_use]
3089 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3090 self.bars.get(bar_type).and_then(|bars| bars.front())
3091 }
3092
3093 #[must_use]
3095 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3096 self.books
3097 .get(instrument_id)
3098 .map_or(0, |book| book.update_count) as usize
3099 }
3100
3101 #[must_use]
3103 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3104 self.quotes
3105 .get(instrument_id)
3106 .map_or(0, std::collections::VecDeque::len)
3107 }
3108
3109 #[must_use]
3111 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3112 self.trades
3113 .get(instrument_id)
3114 .map_or(0, std::collections::VecDeque::len)
3115 }
3116
3117 #[must_use]
3119 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3120 self.bars
3121 .get(bar_type)
3122 .map_or(0, std::collections::VecDeque::len)
3123 }
3124
3125 #[must_use]
3127 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3128 self.books.contains_key(instrument_id)
3129 }
3130
3131 #[must_use]
3133 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3134 self.quote_count(instrument_id) > 0
3135 }
3136
3137 #[must_use]
3139 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3140 self.trade_count(instrument_id) > 0
3141 }
3142
3143 #[must_use]
3145 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3146 self.bar_count(bar_type) > 0
3147 }
3148
3149 #[must_use]
3150 pub fn get_xrate(
3151 &self,
3152 venue: Venue,
3153 from_currency: Currency,
3154 to_currency: Currency,
3155 price_type: PriceType,
3156 ) -> Option<f64> {
3157 if from_currency == to_currency {
3158 return Some(1.0);
3161 }
3162
3163 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3164
3165 match get_exchange_rate(
3166 from_currency.code,
3167 to_currency.code,
3168 price_type,
3169 bid_quote,
3170 ask_quote,
3171 ) {
3172 Ok(rate) => rate,
3173 Err(e) => {
3174 log::error!("Failed to calculate xrate: {e}");
3175 None
3176 }
3177 }
3178 }
3179
3180 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3181 let mut bid_quotes = AHashMap::new();
3182 let mut ask_quotes = AHashMap::new();
3183
3184 for instrument_id in self.instruments.keys() {
3185 if instrument_id.venue != *venue {
3186 continue;
3187 }
3188
3189 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3190 if let Some(tick) = ticks.front() {
3191 (tick.bid_price, tick.ask_price)
3192 } else {
3193 continue; }
3195 } else {
3196 let bid_bar = self
3197 .bars
3198 .iter()
3199 .find(|(k, _)| {
3200 k.instrument_id() == *instrument_id
3201 && matches!(k.spec().price_type, PriceType::Bid)
3202 })
3203 .map(|(_, v)| v);
3204
3205 let ask_bar = self
3206 .bars
3207 .iter()
3208 .find(|(k, _)| {
3209 k.instrument_id() == *instrument_id
3210 && matches!(k.spec().price_type, PriceType::Ask)
3211 })
3212 .map(|(_, v)| v);
3213
3214 match (bid_bar, ask_bar) {
3215 (Some(bid), Some(ask)) => {
3216 match (bid.front(), ask.front()) {
3217 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3218 _ => {
3219 continue;
3221 }
3222 }
3223 }
3224 _ => continue,
3225 }
3226 };
3227
3228 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3229 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3230 }
3231
3232 (bid_quotes, ask_quotes)
3233 }
3234
3235 #[must_use]
3237 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3238 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3239 }
3240
3241 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3247 assert!(xrate > 0.0, "xrate was zero");
3248 self.mark_xrates.insert((from_currency, to_currency), xrate);
3249 self.mark_xrates
3250 .insert((to_currency, from_currency), 1.0 / xrate);
3251 }
3252
3253 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3255 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3256 }
3257
3258 pub fn clear_mark_xrates(&mut self) {
3260 self.mark_xrates.clear();
3261 }
3262
3263 #[must_use]
3267 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3268 self.instruments.get(instrument_id)
3269 }
3270
3271 #[must_use]
3273 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3274 match venue {
3275 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3276 None => self.instruments.keys().collect(),
3277 }
3278 }
3279
3280 #[must_use]
3282 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3283 self.instruments
3284 .values()
3285 .filter(|i| &i.id().venue == venue)
3286 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3287 .collect()
3288 }
3289
3290 #[must_use]
3292 pub fn bar_types(
3293 &self,
3294 instrument_id: Option<&InstrumentId>,
3295 price_type: Option<&PriceType>,
3296 aggregation_source: AggregationSource,
3297 ) -> Vec<&BarType> {
3298 let mut bar_types = self
3299 .bars
3300 .keys()
3301 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3302 .collect::<Vec<&BarType>>();
3303
3304 if let Some(instrument_id) = instrument_id {
3305 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3306 }
3307
3308 if let Some(price_type) = price_type {
3309 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3310 }
3311
3312 bar_types
3313 }
3314
3315 #[must_use]
3319 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3320 self.synthetics.get(instrument_id)
3321 }
3322
3323 #[must_use]
3325 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3326 self.synthetics.keys().collect()
3327 }
3328
3329 #[must_use]
3331 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3332 self.synthetics.values().collect()
3333 }
3334
3335 #[must_use]
3339 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3340 self.accounts.get(account_id)
3341 }
3342
3343 #[must_use]
3345 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3346 self.index
3347 .venue_account
3348 .get(venue)
3349 .and_then(|account_id| self.accounts.get(account_id))
3350 }
3351
3352 #[must_use]
3354 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3355 self.index.venue_account.get(venue)
3356 }
3357
3358 #[must_use]
3360 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3361 self.accounts
3362 .values()
3363 .filter(|account| &account.id() == account_id)
3364 .collect()
3365 }
3366
3367 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3375 if !order.has_price() {
3376 return;
3377 }
3378
3379 let instrument_id = order.instrument_id();
3380
3381 let own_book = self
3382 .own_books
3383 .entry(instrument_id)
3384 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3385
3386 let own_book_order = order.to_own_book_order();
3387
3388 if order.is_closed() {
3389 if let Err(e) = own_book.delete(own_book_order) {
3390 log::debug!(
3391 "Failed to delete order {} from own book: {e}",
3392 order.client_order_id(),
3393 );
3394 } else {
3395 log::debug!("Deleted order {} from own book", order.client_order_id());
3396 }
3397 } else {
3398 if let Err(e) = own_book.update(own_book_order) {
3400 log::debug!(
3401 "Failed to update order {} in own book: {e}; inserting instead",
3402 order.client_order_id(),
3403 );
3404 own_book.add(own_book_order);
3405 }
3406 log::debug!("Updated order {} in own book", order.client_order_id());
3407 }
3408 }
3409
3410 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3416 let order = match self.orders.get(client_order_id) {
3417 Some(order) => order,
3418 None => return,
3419 };
3420
3421 self.index.orders_open.remove(client_order_id);
3422 self.index.orders_pending_cancel.remove(client_order_id);
3423 self.index.orders_inflight.remove(client_order_id);
3424 self.index.orders_emulated.remove(client_order_id);
3425
3426 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3427 && order.has_price()
3428 {
3429 let own_book_order = order.to_own_book_order();
3430 if let Err(e) = own_book.delete(own_book_order) {
3431 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3432 } else {
3433 log::debug!("Force deleted {client_order_id} from own book");
3434 }
3435 }
3436
3437 self.index.orders_closed.insert(*client_order_id);
3438 }
3439
3440 pub fn audit_own_order_books(&mut self) {
3447 log::debug!("Starting own books audit");
3448 let start = std::time::Instant::now();
3449
3450 let valid_order_ids: AHashSet<ClientOrderId> = self
3453 .index
3454 .orders_open
3455 .union(&self.index.orders_inflight)
3456 .copied()
3457 .collect();
3458
3459 for own_book in self.own_books.values_mut() {
3460 own_book.audit_open_orders(&valid_order_ids);
3461 }
3462
3463 log::debug!("Completed own books audit in {:?}", start.elapsed());
3464 }
3465}