1pub mod config;
21pub mod database;
22
23mod index;
24
25#[cfg(test)]
26mod tests;
27
28use std::{
29 collections::{HashSet, VecDeque},
30 fmt::Debug,
31 time::{SystemTime, UNIX_EPOCH},
32};
33
34use ahash::{AHashMap, AHashSet};
35use bytes::Bytes;
36pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
38use index::CacheIndex;
39use nautilus_core::{
40 UUID4, UnixNanos,
41 correctness::{
42 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
43 check_valid_string_ascii,
44 },
45 datetime::secs_to_nanos,
46};
47use nautilus_model::{
48 accounts::{Account, AccountAny},
49 data::{
50 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
51 TradeTick, YieldCurveData,
52 },
53 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
54 identifiers::{
55 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
56 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
57 },
58 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
59 orderbook::{
60 OrderBook,
61 own::{OwnOrderBook, should_handle_own_book_order},
62 },
63 orders::{Order, OrderAny, OrderList},
64 position::Position,
65 types::{Currency, Money, Price, Quantity},
66};
67use ustr::Ustr;
68
69use crate::xrate::get_exchange_rate;
70
71#[cfg_attr(
73 feature = "python",
74 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
75)]
76pub struct Cache {
77 config: CacheConfig,
78 index: CacheIndex,
79 database: Option<Box<dyn CacheDatabaseAdapter>>,
80 general: AHashMap<String, Bytes>,
81 currencies: AHashMap<Ustr, Currency>,
82 instruments: AHashMap<InstrumentId, InstrumentAny>,
83 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
84 books: AHashMap<InstrumentId, OrderBook>,
85 own_books: AHashMap<InstrumentId, OwnOrderBook>,
86 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
87 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
88 mark_xrates: AHashMap<(Currency, Currency), f64>,
89 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
90 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
91 funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
92 bars: AHashMap<BarType, VecDeque<Bar>>,
93 greeks: AHashMap<InstrumentId, GreeksData>,
94 yield_curves: AHashMap<String, YieldCurveData>,
95 accounts: AHashMap<AccountId, AccountAny>,
96 orders: AHashMap<ClientOrderId, OrderAny>,
97 order_lists: AHashMap<OrderListId, OrderList>,
98 positions: AHashMap<PositionId, Position>,
99 position_snapshots: AHashMap<PositionId, Bytes>,
100 #[cfg(feature = "defi")]
101 pub(crate) defi: crate::defi::cache::DefiCache,
102}
103
104impl Debug for Cache {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct(stringify!(Cache))
107 .field("config", &self.config)
108 .field("index", &self.index)
109 .field("general", &self.general)
110 .field("currencies", &self.currencies)
111 .field("instruments", &self.instruments)
112 .field("synthetics", &self.synthetics)
113 .field("books", &self.books)
114 .field("own_books", &self.own_books)
115 .field("quotes", &self.quotes)
116 .field("trades", &self.trades)
117 .field("mark_xrates", &self.mark_xrates)
118 .field("mark_prices", &self.mark_prices)
119 .field("index_prices", &self.index_prices)
120 .field("funding_rates", &self.funding_rates)
121 .field("bars", &self.bars)
122 .field("greeks", &self.greeks)
123 .field("yield_curves", &self.yield_curves)
124 .field("accounts", &self.accounts)
125 .field("orders", &self.orders)
126 .field("order_lists", &self.order_lists)
127 .field("positions", &self.positions)
128 .field("position_snapshots", &self.position_snapshots)
129 .finish()
130 }
131}
132
133impl Default for Cache {
134 fn default() -> Self {
136 Self::new(Some(CacheConfig::default()), None)
137 }
138}
139
140impl Cache {
141 #[must_use]
143 pub fn new(
147 config: Option<CacheConfig>,
148 database: Option<Box<dyn CacheDatabaseAdapter>>,
149 ) -> Self {
150 Self {
151 config: config.unwrap_or_default(),
152 index: CacheIndex::default(),
153 database,
154 general: AHashMap::new(),
155 currencies: AHashMap::new(),
156 instruments: AHashMap::new(),
157 synthetics: AHashMap::new(),
158 books: AHashMap::new(),
159 own_books: AHashMap::new(),
160 quotes: AHashMap::new(),
161 trades: AHashMap::new(),
162 mark_xrates: AHashMap::new(),
163 mark_prices: AHashMap::new(),
164 index_prices: AHashMap::new(),
165 funding_rates: AHashMap::new(),
166 bars: AHashMap::new(),
167 greeks: AHashMap::new(),
168 yield_curves: AHashMap::new(),
169 accounts: AHashMap::new(),
170 orders: AHashMap::new(),
171 order_lists: AHashMap::new(),
172 positions: AHashMap::new(),
173 position_snapshots: AHashMap::new(),
174 #[cfg(feature = "defi")]
175 defi: crate::defi::cache::DefiCache::default(),
176 }
177 }
178
179 #[must_use]
181 pub fn memory_address(&self) -> String {
182 format!("{:?}", std::ptr::from_ref(self))
183 }
184
185 pub fn cache_general(&mut self) -> anyhow::Result<()> {
193 self.general = match &mut self.database {
194 Some(db) => db.load()?,
195 None => AHashMap::new(),
196 };
197
198 log::info!(
199 "Cached {} general object(s) from database",
200 self.general.len()
201 );
202 Ok(())
203 }
204
205 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
211 let cache_map = match &self.database {
212 Some(db) => db.load_all().await?,
213 None => CacheMap::default(),
214 };
215
216 self.currencies = cache_map.currencies;
217 self.instruments = cache_map.instruments;
218 self.synthetics = cache_map.synthetics;
219 self.accounts = cache_map.accounts;
220 self.orders = cache_map.orders;
221 self.positions = cache_map.positions;
222 Ok(())
223 }
224
225 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
231 self.currencies = match &mut self.database {
232 Some(db) => db.load_currencies().await?,
233 None => AHashMap::new(),
234 };
235
236 log::info!("Cached {} currencies from database", self.general.len());
237 Ok(())
238 }
239
240 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
246 self.instruments = match &mut self.database {
247 Some(db) => db.load_instruments().await?,
248 None => AHashMap::new(),
249 };
250
251 log::info!("Cached {} instruments from database", self.general.len());
252 Ok(())
253 }
254
255 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
261 self.synthetics = match &mut self.database {
262 Some(db) => db.load_synthetics().await?,
263 None => AHashMap::new(),
264 };
265
266 log::info!(
267 "Cached {} synthetic instruments from database",
268 self.general.len()
269 );
270 Ok(())
271 }
272
273 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
279 self.accounts = match &mut self.database {
280 Some(db) => db.load_accounts().await?,
281 None => AHashMap::new(),
282 };
283
284 log::info!(
285 "Cached {} synthetic instruments from database",
286 self.general.len()
287 );
288 Ok(())
289 }
290
291 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
297 self.orders = match &mut self.database {
298 Some(db) => db.load_orders().await?,
299 None => AHashMap::new(),
300 };
301
302 log::info!("Cached {} orders from database", self.general.len());
303 Ok(())
304 }
305
306 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
312 self.positions = match &mut self.database {
313 Some(db) => db.load_positions().await?,
314 None => AHashMap::new(),
315 };
316
317 log::info!("Cached {} positions from database", self.general.len());
318 Ok(())
319 }
320
321 pub fn build_index(&mut self) {
323 log::debug!("Building index");
324
325 for account_id in self.accounts.keys() {
327 self.index
328 .venue_account
329 .insert(account_id.get_issuer(), *account_id);
330 }
331
332 for (client_order_id, order) in &self.orders {
334 let instrument_id = order.instrument_id();
335 let venue = instrument_id.venue;
336 let strategy_id = order.strategy_id();
337
338 self.index
340 .venue_orders
341 .entry(venue)
342 .or_default()
343 .insert(*client_order_id);
344
345 if let Some(venue_order_id) = order.venue_order_id() {
347 self.index
348 .venue_order_ids
349 .insert(venue_order_id, *client_order_id);
350 }
351
352 if let Some(position_id) = order.position_id() {
354 self.index
355 .order_position
356 .insert(*client_order_id, position_id);
357 }
358
359 self.index
361 .order_strategy
362 .insert(*client_order_id, order.strategy_id());
363
364 self.index
366 .instrument_orders
367 .entry(instrument_id)
368 .or_default()
369 .insert(*client_order_id);
370
371 self.index
373 .strategy_orders
374 .entry(strategy_id)
375 .or_default()
376 .insert(*client_order_id);
377
378 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
380 self.index
381 .exec_algorithm_orders
382 .entry(exec_algorithm_id)
383 .or_default()
384 .insert(*client_order_id);
385 }
386
387 if let Some(exec_spawn_id) = order.exec_spawn_id() {
389 self.index
390 .exec_spawn_orders
391 .entry(exec_spawn_id)
392 .or_default()
393 .insert(*client_order_id);
394 }
395
396 self.index.orders.insert(*client_order_id);
398
399 if order.is_open() {
401 self.index.orders_open.insert(*client_order_id);
402 }
403
404 if order.is_closed() {
406 self.index.orders_closed.insert(*client_order_id);
407 }
408
409 if let Some(emulation_trigger) = order.emulation_trigger()
411 && emulation_trigger != TriggerType::NoTrigger
412 && !order.is_closed()
413 {
414 self.index.orders_emulated.insert(*client_order_id);
415 }
416
417 if order.is_inflight() {
419 self.index.orders_inflight.insert(*client_order_id);
420 }
421
422 self.index.strategies.insert(strategy_id);
424
425 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
427 self.index.exec_algorithms.insert(exec_algorithm_id);
428 }
429 }
430
431 for (position_id, position) in &self.positions {
433 let instrument_id = position.instrument_id;
434 let venue = instrument_id.venue;
435 let strategy_id = position.strategy_id;
436
437 self.index
439 .venue_positions
440 .entry(venue)
441 .or_default()
442 .insert(*position_id);
443
444 self.index
446 .position_strategy
447 .insert(*position_id, position.strategy_id);
448
449 self.index
451 .position_orders
452 .entry(*position_id)
453 .or_default()
454 .extend(position.client_order_ids().into_iter());
455
456 self.index
458 .instrument_positions
459 .entry(instrument_id)
460 .or_default()
461 .insert(*position_id);
462
463 self.index
465 .strategy_positions
466 .entry(strategy_id)
467 .or_default()
468 .insert(*position_id);
469
470 self.index.positions.insert(*position_id);
472
473 if position.is_open() {
475 self.index.positions_open.insert(*position_id);
476 }
477
478 if position.is_closed() {
480 self.index.positions_closed.insert(*position_id);
481 }
482
483 self.index.strategies.insert(strategy_id);
485 }
486 }
487
488 #[must_use]
490 pub const fn has_backing(&self) -> bool {
491 self.config.database.is_some()
492 }
493
494 #[must_use]
496 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
497 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
498 quote
499 } else {
500 log::warn!(
501 "Cannot calculate unrealized PnL for {}, no quotes for {}",
502 position.id,
503 position.instrument_id
504 );
505 return None;
506 };
507
508 let last = match position.side {
509 PositionSide::Flat | PositionSide::NoPositionSide => {
510 return Some(Money::new(0.0, position.settlement_currency));
511 }
512 PositionSide::Long => quote.ask_price,
513 PositionSide::Short => quote.bid_price,
514 };
515
516 Some(position.unrealized_pnl(last))
517 }
518
519 #[must_use]
528 pub fn check_integrity(&mut self) -> bool {
529 let mut error_count = 0;
530 let failure = "Integrity failure";
531
532 let timestamp_us = SystemTime::now()
534 .duration_since(UNIX_EPOCH)
535 .expect("Time went backwards")
536 .as_micros();
537
538 log::info!("Checking data integrity");
539
540 for account_id in self.accounts.keys() {
542 if !self
543 .index
544 .venue_account
545 .contains_key(&account_id.get_issuer())
546 {
547 log::error!(
548 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
549 );
550 error_count += 1;
551 }
552 }
553
554 for (client_order_id, order) in &self.orders {
555 if !self.index.order_strategy.contains_key(client_order_id) {
556 log::error!(
557 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
558 );
559 error_count += 1;
560 }
561 if !self.index.orders.contains(client_order_id) {
562 log::error!(
563 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
564 );
565 error_count += 1;
566 }
567 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
568 log::error!(
569 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
570 );
571 error_count += 1;
572 }
573 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
574 log::error!(
575 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
576 );
577 error_count += 1;
578 }
579 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
580 log::error!(
581 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
582 );
583 error_count += 1;
584 }
585 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
586 if !self
587 .index
588 .exec_algorithm_orders
589 .contains_key(&exec_algorithm_id)
590 {
591 log::error!(
592 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
593 );
594 error_count += 1;
595 }
596 if order.exec_spawn_id().is_none()
597 && !self.index.exec_spawn_orders.contains_key(client_order_id)
598 {
599 log::error!(
600 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
601 );
602 error_count += 1;
603 }
604 }
605 }
606
607 for (position_id, position) in &self.positions {
608 if !self.index.position_strategy.contains_key(position_id) {
609 log::error!(
610 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
611 );
612 error_count += 1;
613 }
614 if !self.index.position_orders.contains_key(position_id) {
615 log::error!(
616 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
617 );
618 error_count += 1;
619 }
620 if !self.index.positions.contains(position_id) {
621 log::error!(
622 "{failure} in positions: {position_id} not found in `self.index.positions`",
623 );
624 error_count += 1;
625 }
626 if position.is_open() && !self.index.positions_open.contains(position_id) {
627 log::error!(
628 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
629 );
630 error_count += 1;
631 }
632 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
633 log::error!(
634 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
635 );
636 error_count += 1;
637 }
638 }
639
640 for account_id in self.index.venue_account.values() {
642 if !self.accounts.contains_key(account_id) {
643 log::error!(
644 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
645 );
646 error_count += 1;
647 }
648 }
649
650 for client_order_id in self.index.venue_order_ids.values() {
651 if !self.orders.contains_key(client_order_id) {
652 log::error!(
653 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
654 );
655 error_count += 1;
656 }
657 }
658
659 for client_order_id in self.index.client_order_ids.keys() {
660 if !self.orders.contains_key(client_order_id) {
661 log::error!(
662 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
663 );
664 error_count += 1;
665 }
666 }
667
668 for client_order_id in self.index.order_position.keys() {
669 if !self.orders.contains_key(client_order_id) {
670 log::error!(
671 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
672 );
673 error_count += 1;
674 }
675 }
676
677 for client_order_id in self.index.order_strategy.keys() {
679 if !self.orders.contains_key(client_order_id) {
680 log::error!(
681 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
682 );
683 error_count += 1;
684 }
685 }
686
687 for position_id in self.index.position_strategy.keys() {
688 if !self.positions.contains_key(position_id) {
689 log::error!(
690 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
691 );
692 error_count += 1;
693 }
694 }
695
696 for position_id in self.index.position_orders.keys() {
697 if !self.positions.contains_key(position_id) {
698 log::error!(
699 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
700 );
701 error_count += 1;
702 }
703 }
704
705 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
706 for client_order_id in client_order_ids {
707 if !self.orders.contains_key(client_order_id) {
708 log::error!(
709 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
710 );
711 error_count += 1;
712 }
713 }
714 }
715
716 for instrument_id in self.index.instrument_positions.keys() {
717 if !self.index.instrument_orders.contains_key(instrument_id) {
718 log::error!(
719 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
720 );
721 error_count += 1;
722 }
723 }
724
725 for client_order_ids in self.index.strategy_orders.values() {
726 for client_order_id in client_order_ids {
727 if !self.orders.contains_key(client_order_id) {
728 log::error!(
729 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
730 );
731 error_count += 1;
732 }
733 }
734 }
735
736 for position_ids in self.index.strategy_positions.values() {
737 for position_id in position_ids {
738 if !self.positions.contains_key(position_id) {
739 log::error!(
740 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
741 );
742 error_count += 1;
743 }
744 }
745 }
746
747 for client_order_id in &self.index.orders {
748 if !self.orders.contains_key(client_order_id) {
749 log::error!(
750 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
751 );
752 error_count += 1;
753 }
754 }
755
756 for client_order_id in &self.index.orders_emulated {
757 if !self.orders.contains_key(client_order_id) {
758 log::error!(
759 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
760 );
761 error_count += 1;
762 }
763 }
764
765 for client_order_id in &self.index.orders_inflight {
766 if !self.orders.contains_key(client_order_id) {
767 log::error!(
768 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
769 );
770 error_count += 1;
771 }
772 }
773
774 for client_order_id in &self.index.orders_open {
775 if !self.orders.contains_key(client_order_id) {
776 log::error!(
777 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
778 );
779 error_count += 1;
780 }
781 }
782
783 for client_order_id in &self.index.orders_closed {
784 if !self.orders.contains_key(client_order_id) {
785 log::error!(
786 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
787 );
788 error_count += 1;
789 }
790 }
791
792 for position_id in &self.index.positions {
793 if !self.positions.contains_key(position_id) {
794 log::error!(
795 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
796 );
797 error_count += 1;
798 }
799 }
800
801 for position_id in &self.index.positions_open {
802 if !self.positions.contains_key(position_id) {
803 log::error!(
804 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
805 );
806 error_count += 1;
807 }
808 }
809
810 for position_id in &self.index.positions_closed {
811 if !self.positions.contains_key(position_id) {
812 log::error!(
813 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
814 );
815 error_count += 1;
816 }
817 }
818
819 for strategy_id in &self.index.strategies {
820 if !self.index.strategy_orders.contains_key(strategy_id) {
821 log::error!(
822 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
823 );
824 error_count += 1;
825 }
826 }
827
828 for exec_algorithm_id in &self.index.exec_algorithms {
829 if !self
830 .index
831 .exec_algorithm_orders
832 .contains_key(exec_algorithm_id)
833 {
834 log::error!(
835 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
836 );
837 error_count += 1;
838 }
839 }
840
841 let total_us = SystemTime::now()
842 .duration_since(UNIX_EPOCH)
843 .expect("Time went backwards")
844 .as_micros()
845 - timestamp_us;
846
847 if error_count == 0 {
848 log::info!("Integrity check passed in {total_us}μs");
849 true
850 } else {
851 log::error!(
852 "Integrity check failed with {error_count} error{} in {total_us}μs",
853 if error_count == 1 { "" } else { "s" },
854 );
855 false
856 }
857 }
858
859 #[must_use]
863 pub fn check_residuals(&self) -> bool {
864 log::debug!("Checking residuals");
865
866 let mut residuals = false;
867
868 for order in self.orders_open(None, None, None, None) {
870 residuals = true;
871 log::warn!("Residual {order:?}");
872 }
873
874 for position in self.positions_open(None, None, None, None) {
876 residuals = true;
877 log::warn!("Residual {position}");
878 }
879
880 residuals
881 }
882
883 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
889 log::debug!(
890 "Purging closed orders{}",
891 if buffer_secs > 0 {
892 format!(" with buffer_secs={buffer_secs}")
893 } else {
894 String::new()
895 }
896 );
897
898 let buffer_ns = secs_to_nanos(buffer_secs as f64);
899
900 'outer: for client_order_id in self.index.orders_closed.clone() {
901 if let Some(order) = self.orders.get(&client_order_id)
902 && order.is_closed()
903 && let Some(ts_closed) = order.ts_closed()
904 && ts_closed + buffer_ns <= ts_now
905 {
906 if let Some(linked_order_ids) = order.linked_order_ids() {
908 for linked_order_id in linked_order_ids {
909 if let Some(linked_order) = self.orders.get(linked_order_id)
910 && linked_order.is_open()
911 {
912 continue 'outer;
914 }
915 }
916 }
917
918 self.purge_order(client_order_id);
919 }
920 }
921 }
922
923 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
925 log::debug!(
926 "Purging closed positions{}",
927 if buffer_secs > 0 {
928 format!(" with buffer_secs={buffer_secs}")
929 } else {
930 String::new()
931 }
932 );
933
934 let buffer_ns = secs_to_nanos(buffer_secs as f64);
935
936 for position_id in self.index.positions_closed.clone() {
937 if let Some(position) = self.positions.get(&position_id)
938 && position.is_closed()
939 && let Some(ts_closed) = position.ts_closed
940 && ts_closed + buffer_ns <= ts_now
941 {
942 self.purge_position(position_id);
943 }
944 }
945 }
946
947 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
951 let order = self.orders.get(&client_order_id).cloned();
953
954 if let Some(ref ord) = order
956 && ord.is_open()
957 {
958 log::warn!("Order {client_order_id} found open when purging, skipping purge");
959 return;
960 }
961
962 if let Some(ref ord) = order {
964 self.orders.remove(&client_order_id);
966
967 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
969 {
970 venue_orders.remove(&client_order_id);
971 }
972
973 if let Some(venue_order_id) = ord.venue_order_id() {
975 self.index.venue_order_ids.remove(&venue_order_id);
976 }
977
978 if let Some(instrument_orders) =
980 self.index.instrument_orders.get_mut(&ord.instrument_id())
981 {
982 instrument_orders.remove(&client_order_id);
983 }
984
985 if let Some(position_id) = ord.position_id()
987 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
988 {
989 position_orders.remove(&client_order_id);
990 }
991
992 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
994 && let Some(exec_algorithm_orders) =
995 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
996 {
997 exec_algorithm_orders.remove(&client_order_id);
998 }
999
1000 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1002 strategy_orders.remove(&client_order_id);
1003 if strategy_orders.is_empty() {
1004 self.index.strategy_orders.remove(&ord.strategy_id());
1005 }
1006 }
1007
1008 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1010 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1011 {
1012 spawn_orders.remove(&client_order_id);
1013 if spawn_orders.is_empty() {
1014 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1015 }
1016 }
1017
1018 log::info!("Purged order {client_order_id}");
1019 } else {
1020 log::warn!("Order {client_order_id} not found when purging");
1021 }
1022
1023 self.index.order_position.remove(&client_order_id);
1025 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1026 self.index.order_client.remove(&client_order_id);
1027 self.index.client_order_ids.remove(&client_order_id);
1028
1029 if let Some(strategy_id) = strategy_id
1031 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1032 {
1033 strategy_orders.remove(&client_order_id);
1034 if strategy_orders.is_empty() {
1035 self.index.strategy_orders.remove(&strategy_id);
1036 }
1037 }
1038
1039 self.index.exec_spawn_orders.remove(&client_order_id);
1041
1042 self.index.orders.remove(&client_order_id);
1043 self.index.orders_closed.remove(&client_order_id);
1044 self.index.orders_emulated.remove(&client_order_id);
1045 self.index.orders_inflight.remove(&client_order_id);
1046 self.index.orders_pending_cancel.remove(&client_order_id);
1047 }
1048
1049 pub fn purge_position(&mut self, position_id: PositionId) {
1053 let position = self.positions.get(&position_id).cloned();
1055
1056 if let Some(ref pos) = position
1058 && pos.is_open()
1059 {
1060 log::warn!("Position {position_id} found open when purging, skipping purge");
1061 return;
1062 }
1063
1064 if let Some(ref pos) = position {
1066 self.positions.remove(&position_id);
1067
1068 if let Some(venue_positions) =
1070 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1071 {
1072 venue_positions.remove(&position_id);
1073 }
1074
1075 if let Some(instrument_positions) =
1077 self.index.instrument_positions.get_mut(&pos.instrument_id)
1078 {
1079 instrument_positions.remove(&position_id);
1080 }
1081
1082 if let Some(strategy_positions) =
1084 self.index.strategy_positions.get_mut(&pos.strategy_id)
1085 {
1086 strategy_positions.remove(&position_id);
1087 }
1088
1089 for client_order_id in pos.client_order_ids() {
1091 self.index.order_position.remove(&client_order_id);
1092 }
1093
1094 log::info!("Purged position {position_id}");
1095 } else {
1096 log::warn!("Position {position_id} not found when purging");
1097 }
1098
1099 self.index.position_strategy.remove(&position_id);
1101 self.index.position_orders.remove(&position_id);
1102 self.index.positions.remove(&position_id);
1103 self.index.positions_open.remove(&position_id);
1104 self.index.positions_closed.remove(&position_id);
1105
1106 self.position_snapshots.remove(&position_id);
1108 }
1109
1110 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1115 log::debug!(
1116 "Purging account events{}",
1117 if lookback_secs > 0 {
1118 format!(" with lookback_secs={lookback_secs}")
1119 } else {
1120 String::new()
1121 }
1122 );
1123
1124 for account in self.accounts.values_mut() {
1125 let event_count = account.event_count();
1126 account.purge_account_events(ts_now, lookback_secs);
1127 let count_diff = event_count - account.event_count();
1128 if count_diff > 0 {
1129 log::info!(
1130 "Purged {} event(s) from account {}",
1131 count_diff,
1132 account.id()
1133 );
1134 }
1135 }
1136 }
1137
1138 pub fn clear_index(&mut self) {
1140 self.index.clear();
1141 log::debug!("Cleared index");
1142 }
1143
1144 pub fn reset(&mut self) {
1148 log::debug!("Resetting cache");
1149
1150 self.general.clear();
1151 self.currencies.clear();
1152 self.instruments.clear();
1153 self.synthetics.clear();
1154 self.books.clear();
1155 self.own_books.clear();
1156 self.quotes.clear();
1157 self.trades.clear();
1158 self.mark_xrates.clear();
1159 self.mark_prices.clear();
1160 self.index_prices.clear();
1161 self.bars.clear();
1162 self.accounts.clear();
1163 self.orders.clear();
1164 self.order_lists.clear();
1165 self.positions.clear();
1166 self.position_snapshots.clear();
1167 self.greeks.clear();
1168 self.yield_curves.clear();
1169
1170 #[cfg(feature = "defi")]
1171 {
1172 self.defi.pools.clear();
1173 self.defi.pool_profilers.clear();
1174 }
1175
1176 self.clear_index();
1177
1178 log::info!("Reset cache");
1179 }
1180
1181 pub fn dispose(&mut self) {
1187 if let Some(database) = &mut self.database {
1188 database.close().expect("Failed to close database");
1189 }
1190 }
1191
1192 pub fn flush_db(&mut self) {
1198 if let Some(database) = &mut self.database {
1199 database.flush().expect("Failed to flush database");
1200 }
1201 }
1202
1203 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1211 check_valid_string_ascii(key, stringify!(key))?;
1212 check_predicate_false(value.is_empty(), stringify!(value))?;
1213
1214 log::debug!("Adding general {key}");
1215 self.general.insert(key.to_string(), value.clone());
1216
1217 if let Some(database) = &mut self.database {
1218 database.add(key.to_string(), value)?;
1219 }
1220 Ok(())
1221 }
1222
1223 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1229 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1230
1231 if self.config.save_market_data
1232 && let Some(database) = &mut self.database
1233 {
1234 database.add_order_book(&book)?;
1235 }
1236
1237 self.books.insert(book.instrument_id, book);
1238 Ok(())
1239 }
1240
1241 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1247 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1248
1249 self.own_books.insert(own_book.instrument_id, own_book);
1250 Ok(())
1251 }
1252
1253 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1259 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1260
1261 if self.config.save_market_data {
1262 }
1264
1265 let mark_prices_deque = self
1266 .mark_prices
1267 .entry(mark_price.instrument_id)
1268 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1269 mark_prices_deque.push_front(mark_price);
1270 Ok(())
1271 }
1272
1273 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1279 log::debug!(
1280 "Adding `IndexPriceUpdate` for {}",
1281 index_price.instrument_id
1282 );
1283
1284 if self.config.save_market_data {
1285 }
1287
1288 let index_prices_deque = self
1289 .index_prices
1290 .entry(index_price.instrument_id)
1291 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1292 index_prices_deque.push_front(index_price);
1293 Ok(())
1294 }
1295
1296 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1302 log::debug!(
1303 "Adding `FundingRateUpdate` for {}",
1304 funding_rate.instrument_id
1305 );
1306
1307 if self.config.save_market_data {
1308 }
1310
1311 self.funding_rates
1312 .insert(funding_rate.instrument_id, funding_rate);
1313 Ok(())
1314 }
1315
1316 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1322 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1323
1324 if self.config.save_market_data
1325 && let Some(database) = &mut self.database
1326 {
1327 database.add_quote("e)?;
1328 }
1329
1330 let quotes_deque = self
1331 .quotes
1332 .entry(quote.instrument_id)
1333 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1334 quotes_deque.push_front(quote);
1335 Ok(())
1336 }
1337
1338 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1344 check_slice_not_empty(quotes, stringify!(quotes))?;
1345
1346 let instrument_id = quotes[0].instrument_id;
1347 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1348
1349 if self.config.save_market_data
1350 && let Some(database) = &mut self.database
1351 {
1352 for quote in quotes {
1353 database.add_quote(quote)?;
1354 }
1355 }
1356
1357 let quotes_deque = self
1358 .quotes
1359 .entry(instrument_id)
1360 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1361
1362 for quote in quotes {
1363 quotes_deque.push_front(*quote);
1364 }
1365 Ok(())
1366 }
1367
1368 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1374 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1375
1376 if self.config.save_market_data
1377 && let Some(database) = &mut self.database
1378 {
1379 database.add_trade(&trade)?;
1380 }
1381
1382 let trades_deque = self
1383 .trades
1384 .entry(trade.instrument_id)
1385 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1386 trades_deque.push_front(trade);
1387 Ok(())
1388 }
1389
1390 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1396 check_slice_not_empty(trades, stringify!(trades))?;
1397
1398 let instrument_id = trades[0].instrument_id;
1399 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1400
1401 if self.config.save_market_data
1402 && let Some(database) = &mut self.database
1403 {
1404 for trade in trades {
1405 database.add_trade(trade)?;
1406 }
1407 }
1408
1409 let trades_deque = self
1410 .trades
1411 .entry(instrument_id)
1412 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1413
1414 for trade in trades {
1415 trades_deque.push_front(*trade);
1416 }
1417 Ok(())
1418 }
1419
1420 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1426 log::debug!("Adding `Bar` {}", bar.bar_type);
1427
1428 if self.config.save_market_data
1429 && let Some(database) = &mut self.database
1430 {
1431 database.add_bar(&bar)?;
1432 }
1433
1434 let bars = self
1435 .bars
1436 .entry(bar.bar_type)
1437 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1438 bars.push_front(bar);
1439 Ok(())
1440 }
1441
1442 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1448 check_slice_not_empty(bars, stringify!(bars))?;
1449
1450 let bar_type = bars[0].bar_type;
1451 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1452
1453 if self.config.save_market_data
1454 && let Some(database) = &mut self.database
1455 {
1456 for bar in bars {
1457 database.add_bar(bar)?;
1458 }
1459 }
1460
1461 let bars_deque = self
1462 .bars
1463 .entry(bar_type)
1464 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1465
1466 for bar in bars {
1467 bars_deque.push_front(*bar);
1468 }
1469 Ok(())
1470 }
1471
1472 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1478 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1479
1480 if self.config.save_market_data
1481 && let Some(_database) = &mut self.database
1482 {
1483 }
1485
1486 self.greeks.insert(greeks.instrument_id, greeks);
1487 Ok(())
1488 }
1489
/// Returns a clone of the greeks data stored for `instrument_id`, or `None` if none is
/// stored.
pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
    self.greeks.get(instrument_id).cloned()
}
1494
1495 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1501 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1502
1503 if self.config.save_market_data
1504 && let Some(_database) = &mut self.database
1505 {
1506 }
1508
1509 self.yield_curves
1510 .insert(yield_curve.curve_name.clone(), yield_curve);
1511 Ok(())
1512 }
1513
1514 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1516 self.yield_curves.get(key).map(|curve| {
1517 let curve_clone = curve.clone();
1518 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1519 as Box<dyn Fn(f64) -> f64>
1520 })
1521 }
1522
1523 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1529 log::debug!("Adding `Currency` {}", currency.code);
1530
1531 if let Some(database) = &mut self.database {
1532 database.add_currency(¤cy)?;
1533 }
1534
1535 self.currencies.insert(currency.code, currency);
1536 Ok(())
1537 }
1538
1539 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1545 log::debug!("Adding `Instrument` {}", instrument.id());
1546
1547 if let Some(database) = &mut self.database {
1548 database.add_instrument(&instrument)?;
1549 }
1550
1551 self.instruments.insert(instrument.id(), instrument);
1552 Ok(())
1553 }
1554
1555 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1561 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1562
1563 if let Some(database) = &mut self.database {
1564 database.add_synthetic(&synthetic)?;
1565 }
1566
1567 self.synthetics.insert(synthetic.id, synthetic);
1568 Ok(())
1569 }
1570
1571 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1577 log::debug!("Adding `Account` {}", account.id());
1578
1579 if let Some(database) = &mut self.database {
1580 database.add_account(&account)?;
1581 }
1582
1583 let account_id = account.id();
1584 self.accounts.insert(account_id, account);
1585 self.index
1586 .venue_account
1587 .insert(account_id.get_issuer(), account_id);
1588 Ok(())
1589 }
1590
1591 pub fn add_venue_order_id(
1599 &mut self,
1600 client_order_id: &ClientOrderId,
1601 venue_order_id: &VenueOrderId,
1602 overwrite: bool,
1603 ) -> anyhow::Result<()> {
1604 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1605 && !overwrite
1606 && existing_venue_order_id != venue_order_id
1607 {
1608 anyhow::bail!(
1609 "Existing {existing_venue_order_id} for {client_order_id}
1610 did not match the given {venue_order_id}.
1611 If you are writing a test then try a different `venue_order_id`,
1612 otherwise this is probably a bug."
1613 );
1614 }
1615
1616 self.index
1617 .client_order_ids
1618 .insert(*client_order_id, *venue_order_id);
1619 self.index
1620 .venue_order_ids
1621 .insert(*venue_order_id, *client_order_id);
1622
1623 Ok(())
1624 }
1625
1626 pub fn add_order(
1638 &mut self,
1639 order: OrderAny,
1640 position_id: Option<PositionId>,
1641 client_id: Option<ClientId>,
1642 replace_existing: bool,
1643 ) -> anyhow::Result<()> {
1644 let instrument_id = order.instrument_id();
1645 let venue = instrument_id.venue;
1646 let client_order_id = order.client_order_id();
1647 let strategy_id = order.strategy_id();
1648 let exec_algorithm_id = order.exec_algorithm_id();
1649 let exec_spawn_id = order.exec_spawn_id();
1650
1651 if !replace_existing {
1652 check_key_not_in_map(
1653 &client_order_id,
1654 &self.orders,
1655 stringify!(client_order_id),
1656 stringify!(orders),
1657 )?;
1658 }
1659
1660 log::debug!("Adding {order:?}");
1661
1662 self.index.orders.insert(client_order_id);
1663 self.index
1664 .order_strategy
1665 .insert(client_order_id, strategy_id);
1666 self.index.strategies.insert(strategy_id);
1667
1668 self.index
1670 .venue_orders
1671 .entry(venue)
1672 .or_default()
1673 .insert(client_order_id);
1674
1675 self.index
1677 .instrument_orders
1678 .entry(instrument_id)
1679 .or_default()
1680 .insert(client_order_id);
1681
1682 self.index
1684 .strategy_orders
1685 .entry(strategy_id)
1686 .or_default()
1687 .insert(client_order_id);
1688
1689 if let Some(exec_algorithm_id) = exec_algorithm_id {
1691 self.index.exec_algorithms.insert(exec_algorithm_id);
1692
1693 self.index
1694 .exec_algorithm_orders
1695 .entry(exec_algorithm_id)
1696 .or_default()
1697 .insert(client_order_id);
1698 }
1699
1700 if let Some(exec_spawn_id) = exec_spawn_id {
1702 self.index
1703 .exec_spawn_orders
1704 .entry(exec_spawn_id)
1705 .or_default()
1706 .insert(client_order_id);
1707 }
1708
1709 match order.emulation_trigger() {
1711 Some(_) => {
1712 self.index.orders_emulated.remove(&client_order_id);
1713 }
1714 None => {
1715 self.index.orders_emulated.insert(client_order_id);
1716 }
1717 }
1718
1719 if let Some(position_id) = position_id {
1721 self.add_position_id(
1722 &position_id,
1723 &order.instrument_id().venue,
1724 &client_order_id,
1725 &strategy_id,
1726 )?;
1727 }
1728
1729 if let Some(client_id) = client_id {
1731 self.index.order_client.insert(client_order_id, client_id);
1732 log::debug!("Indexed {client_id:?}");
1733 }
1734
1735 if let Some(database) = &mut self.database {
1736 database.add_order(&order, client_id)?;
1737 }
1742
1743 self.orders.insert(client_order_id, order);
1744
1745 Ok(())
1746 }
1747
1748 pub fn add_position_id(
1754 &mut self,
1755 position_id: &PositionId,
1756 venue: &Venue,
1757 client_order_id: &ClientOrderId,
1758 strategy_id: &StrategyId,
1759 ) -> anyhow::Result<()> {
1760 self.index
1761 .order_position
1762 .insert(*client_order_id, *position_id);
1763
1764 if let Some(database) = &mut self.database {
1766 database.index_order_position(*client_order_id, *position_id)?;
1767 }
1768
1769 self.index
1771 .position_strategy
1772 .insert(*position_id, *strategy_id);
1773
1774 self.index
1776 .position_orders
1777 .entry(*position_id)
1778 .or_default()
1779 .insert(*client_order_id);
1780
1781 self.index
1783 .strategy_positions
1784 .entry(*strategy_id)
1785 .or_default()
1786 .insert(*position_id);
1787
1788 self.index
1790 .venue_positions
1791 .entry(*venue)
1792 .or_default()
1793 .insert(*position_id);
1794
1795 Ok(())
1796 }
1797
1798 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1804 self.positions.insert(position.id, position.clone());
1805 self.index.positions.insert(position.id);
1806 self.index.positions_open.insert(position.id);
1807
1808 log::debug!("Adding {position}");
1809
1810 self.add_position_id(
1811 &position.id,
1812 &position.instrument_id.venue,
1813 &position.opening_order_id,
1814 &position.strategy_id,
1815 )?;
1816
1817 let venue = position.instrument_id.venue;
1818 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1819 venue_positions.insert(position.id);
1820
1821 let instrument_id = position.instrument_id;
1823 let instrument_positions = self
1824 .index
1825 .instrument_positions
1826 .entry(instrument_id)
1827 .or_default();
1828 instrument_positions.insert(position.id);
1829
1830 if let Some(database) = &mut self.database {
1831 database.add_position(&position)?;
1832 }
1841
1842 Ok(())
1843 }
1844
1845 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1851 if let Some(database) = &mut self.database {
1852 database.update_account(&account)?;
1853 }
1854 Ok(())
1855 }
1856
1857 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1863 let client_order_id = order.client_order_id();
1864
1865 if let Some(venue_order_id) = order.venue_order_id() {
1867 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1870 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1872 }
1873 }
1874
1875 if order.is_inflight() {
1877 self.index.orders_inflight.insert(client_order_id);
1878 } else {
1879 self.index.orders_inflight.remove(&client_order_id);
1880 }
1881
1882 if order.is_open() {
1884 self.index.orders_closed.remove(&client_order_id);
1885 self.index.orders_open.insert(client_order_id);
1886 } else if order.is_closed() {
1887 self.index.orders_open.remove(&client_order_id);
1888 self.index.orders_pending_cancel.remove(&client_order_id);
1889 self.index.orders_closed.insert(client_order_id);
1890 }
1891
1892 if let Some(emulation_trigger) = order.emulation_trigger() {
1894 match emulation_trigger {
1895 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1896 _ => self.index.orders_emulated.insert(client_order_id),
1897 };
1898 }
1899
1900 if self.own_order_book(&order.instrument_id()).is_some()
1902 && should_handle_own_book_order(order)
1903 {
1904 self.update_own_order_book(order);
1905 }
1906
1907 if let Some(database) = &mut self.database {
1908 database.update_order(order.last_event())?;
1909 }
1914
1915 self.orders.insert(client_order_id, order.clone());
1917
1918 Ok(())
1919 }
1920
/// Adds the client order ID of `order` to the pending-cancel index.
///
/// Only the in-memory index is touched; nothing is written to the database here.
pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
    self.index
        .orders_pending_cancel
        .insert(order.client_order_id());
}
1927
1928 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1934 if position.is_open() {
1937 self.index.positions_open.insert(position.id);
1938 self.index.positions_closed.remove(&position.id);
1939 } else {
1940 self.index.positions_closed.insert(position.id);
1941 self.index.positions_open.remove(&position.id);
1942 }
1943
1944 if let Some(database) = &mut self.database {
1945 database.update_position(position)?;
1946 }
1951
1952 self.positions.insert(position.id, position.clone());
1953
1954 Ok(())
1955 }
1956
1957 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1964 let position_id = position.id;
1965
1966 let mut copied_position = position.clone();
1967 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1968 copied_position.id = PositionId::new(new_id);
1969
1970 let position_serialized = serde_json::to_vec(&copied_position)?;
1972
1973 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1974 let new_snapshots = match snapshots {
1975 Some(existing_snapshots) => {
1976 let mut combined = existing_snapshots.to_vec();
1977 combined.extend(position_serialized);
1978 Bytes::from(combined)
1979 }
1980 None => Bytes::from(position_serialized),
1981 };
1982 self.position_snapshots.insert(position_id, new_snapshots);
1983
1984 log::debug!("Snapshot {copied_position}");
1985 Ok(())
1986 }
1987
1988 pub fn snapshot_position_state(
1994 &mut self,
1995 position: &Position,
1996 open_only: Option<bool>,
1999 ) -> anyhow::Result<()> {
2000 let open_only = open_only.unwrap_or(true);
2001
2002 if open_only && !position.is_open() {
2003 return Ok(());
2004 }
2005
2006 if let Some(database) = &mut self.database {
2007 database.snapshot_position_state(position).map_err(|e| {
2008 log::error!(
2009 "Failed to snapshot position state for {}: {e:?}",
2010 position.id
2011 );
2012 e
2013 })?;
2014 } else {
2015 log::warn!(
2016 "Cannot snapshot position state for {} (no database configured)",
2017 position.id
2018 );
2019 }
2020
2021 todo!()
2023 }
2024
2025 #[must_use]
2027 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2028 if self.index.position_strategy.contains_key(position_id) {
2030 Some(OmsType::Netting)
2033 } else {
2034 None
2035 }
2036 }
2037
/// Returns a copy of the snapshot bytes stored for `position_id`, or `None` if no
/// snapshot is stored for it.
#[must_use]
pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
    self.position_snapshots.get(position_id).map(|b| b.to_vec())
}
2043
2044 #[must_use]
2046 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> HashSet<PositionId> {
2047 let mut result = HashSet::new();
2049 for (position_id, _) in &self.position_snapshots {
2050 if let Some(position) = self.positions.get(position_id)
2052 && position.instrument_id == *instrument_id
2053 {
2054 result.insert(*position_id);
2055 }
2056 }
2057 result
2058 }
2059
2060 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2066 let database = if let Some(database) = &self.database {
2067 database
2068 } else {
2069 log::warn!(
2070 "Cannot snapshot order state for {} (no database configured)",
2071 order.client_order_id()
2072 );
2073 return Ok(());
2074 };
2075
2076 database.snapshot_order_state(order)
2077 }
2078
2079 fn build_order_query_filter_set(
2082 &self,
2083 venue: Option<&Venue>,
2084 instrument_id: Option<&InstrumentId>,
2085 strategy_id: Option<&StrategyId>,
2086 ) -> Option<AHashSet<ClientOrderId>> {
2087 let mut query: Option<AHashSet<ClientOrderId>> = None;
2088
2089 if let Some(venue) = venue {
2090 query = Some(
2091 self.index
2092 .venue_orders
2093 .get(venue)
2094 .cloned()
2095 .unwrap_or_default(),
2096 );
2097 }
2098
2099 if let Some(instrument_id) = instrument_id {
2100 let instrument_orders = self
2101 .index
2102 .instrument_orders
2103 .get(instrument_id)
2104 .cloned()
2105 .unwrap_or_default();
2106
2107 if let Some(existing_query) = &mut query {
2108 *existing_query = existing_query
2109 .intersection(&instrument_orders)
2110 .copied()
2111 .collect();
2112 } else {
2113 query = Some(instrument_orders);
2114 }
2115 }
2116
2117 if let Some(strategy_id) = strategy_id {
2118 let strategy_orders = self
2119 .index
2120 .strategy_orders
2121 .get(strategy_id)
2122 .cloned()
2123 .unwrap_or_default();
2124
2125 if let Some(existing_query) = &mut query {
2126 *existing_query = existing_query
2127 .intersection(&strategy_orders)
2128 .copied()
2129 .collect();
2130 } else {
2131 query = Some(strategy_orders);
2132 }
2133 }
2134
2135 query
2136 }
2137
2138 fn build_position_query_filter_set(
2139 &self,
2140 venue: Option<&Venue>,
2141 instrument_id: Option<&InstrumentId>,
2142 strategy_id: Option<&StrategyId>,
2143 ) -> Option<AHashSet<PositionId>> {
2144 let mut query: Option<AHashSet<PositionId>> = None;
2145
2146 if let Some(venue) = venue {
2147 query = Some(
2148 self.index
2149 .venue_positions
2150 .get(venue)
2151 .cloned()
2152 .unwrap_or_default(),
2153 );
2154 }
2155
2156 if let Some(instrument_id) = instrument_id {
2157 let instrument_positions = self
2158 .index
2159 .instrument_positions
2160 .get(instrument_id)
2161 .cloned()
2162 .unwrap_or_default();
2163
2164 if let Some(existing_query) = query {
2165 query = Some(
2166 existing_query
2167 .intersection(&instrument_positions)
2168 .copied()
2169 .collect(),
2170 );
2171 } else {
2172 query = Some(instrument_positions);
2173 }
2174 }
2175
2176 if let Some(strategy_id) = strategy_id {
2177 let strategy_positions = self
2178 .index
2179 .strategy_positions
2180 .get(strategy_id)
2181 .cloned()
2182 .unwrap_or_default();
2183
2184 if let Some(existing_query) = query {
2185 query = Some(
2186 existing_query
2187 .intersection(&strategy_positions)
2188 .copied()
2189 .collect(),
2190 );
2191 } else {
2192 query = Some(strategy_positions);
2193 }
2194 }
2195
2196 query
2197 }
2198
2199 fn get_orders_for_ids(
2205 &self,
2206 client_order_ids: &AHashSet<ClientOrderId>,
2207 side: Option<OrderSide>,
2208 ) -> Vec<&OrderAny> {
2209 let side = side.unwrap_or(OrderSide::NoOrderSide);
2210 let mut orders = Vec::new();
2211
2212 for client_order_id in client_order_ids {
2213 let order = self
2214 .orders
2215 .get(client_order_id)
2216 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2217 if side == OrderSide::NoOrderSide || side == order.order_side() {
2218 orders.push(order);
2219 }
2220 }
2221
2222 orders
2223 }
2224
2225 fn get_positions_for_ids(
2231 &self,
2232 position_ids: &AHashSet<PositionId>,
2233 side: Option<PositionSide>,
2234 ) -> Vec<&Position> {
2235 let side = side.unwrap_or(PositionSide::NoPositionSide);
2236 let mut positions = Vec::new();
2237
2238 for position_id in position_ids {
2239 let position = self
2240 .positions
2241 .get(position_id)
2242 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2243 if side == PositionSide::NoPositionSide || side == position.side {
2244 positions.push(position);
2245 }
2246 }
2247
2248 positions
2249 }
2250
2251 #[must_use]
2253 pub fn client_order_ids(
2254 &self,
2255 venue: Option<&Venue>,
2256 instrument_id: Option<&InstrumentId>,
2257 strategy_id: Option<&StrategyId>,
2258 ) -> AHashSet<ClientOrderId> {
2259 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2260 match query {
2261 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2262 None => self.index.orders.clone(),
2263 }
2264 }
2265
2266 #[must_use]
2268 pub fn client_order_ids_open(
2269 &self,
2270 venue: Option<&Venue>,
2271 instrument_id: Option<&InstrumentId>,
2272 strategy_id: Option<&StrategyId>,
2273 ) -> AHashSet<ClientOrderId> {
2274 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2275 match query {
2276 Some(query) => self
2277 .index
2278 .orders_open
2279 .intersection(&query)
2280 .copied()
2281 .collect(),
2282 None => self.index.orders_open.clone(),
2283 }
2284 }
2285
2286 #[must_use]
2288 pub fn client_order_ids_closed(
2289 &self,
2290 venue: Option<&Venue>,
2291 instrument_id: Option<&InstrumentId>,
2292 strategy_id: Option<&StrategyId>,
2293 ) -> AHashSet<ClientOrderId> {
2294 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2295 match query {
2296 Some(query) => self
2297 .index
2298 .orders_closed
2299 .intersection(&query)
2300 .copied()
2301 .collect(),
2302 None => self.index.orders_closed.clone(),
2303 }
2304 }
2305
2306 #[must_use]
2308 pub fn client_order_ids_emulated(
2309 &self,
2310 venue: Option<&Venue>,
2311 instrument_id: Option<&InstrumentId>,
2312 strategy_id: Option<&StrategyId>,
2313 ) -> AHashSet<ClientOrderId> {
2314 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2315 match query {
2316 Some(query) => self
2317 .index
2318 .orders_emulated
2319 .intersection(&query)
2320 .copied()
2321 .collect(),
2322 None => self.index.orders_emulated.clone(),
2323 }
2324 }
2325
2326 #[must_use]
2328 pub fn client_order_ids_inflight(
2329 &self,
2330 venue: Option<&Venue>,
2331 instrument_id: Option<&InstrumentId>,
2332 strategy_id: Option<&StrategyId>,
2333 ) -> AHashSet<ClientOrderId> {
2334 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2335 match query {
2336 Some(query) => self
2337 .index
2338 .orders_inflight
2339 .intersection(&query)
2340 .copied()
2341 .collect(),
2342 None => self.index.orders_inflight.clone(),
2343 }
2344 }
2345
2346 #[must_use]
2348 pub fn position_ids(
2349 &self,
2350 venue: Option<&Venue>,
2351 instrument_id: Option<&InstrumentId>,
2352 strategy_id: Option<&StrategyId>,
2353 ) -> AHashSet<PositionId> {
2354 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2355 match query {
2356 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2357 None => self.index.positions.clone(),
2358 }
2359 }
2360
2361 #[must_use]
2363 pub fn position_open_ids(
2364 &self,
2365 venue: Option<&Venue>,
2366 instrument_id: Option<&InstrumentId>,
2367 strategy_id: Option<&StrategyId>,
2368 ) -> AHashSet<PositionId> {
2369 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2370 match query {
2371 Some(query) => self
2372 .index
2373 .positions_open
2374 .intersection(&query)
2375 .copied()
2376 .collect(),
2377 None => self.index.positions_open.clone(),
2378 }
2379 }
2380
2381 #[must_use]
2383 pub fn position_closed_ids(
2384 &self,
2385 venue: Option<&Venue>,
2386 instrument_id: Option<&InstrumentId>,
2387 strategy_id: Option<&StrategyId>,
2388 ) -> AHashSet<PositionId> {
2389 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2390 match query {
2391 Some(query) => self
2392 .index
2393 .positions_closed
2394 .intersection(&query)
2395 .copied()
2396 .collect(),
2397 None => self.index.positions_closed.clone(),
2398 }
2399 }
2400
/// Returns a clone of the set of actor component IDs held in the index.
#[must_use]
pub fn actor_ids(&self) -> AHashSet<ComponentId> {
    self.index.actors.clone()
}
2406
/// Returns a clone of the set of strategy IDs held in the index.
#[must_use]
pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
    self.index.strategies.clone()
}
2412
/// Returns a clone of the set of execution algorithm IDs held in the index.
#[must_use]
pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
    self.index.exec_algorithms.clone()
}
2418
/// Returns a reference to the cached order for `client_order_id`, or `None` if the
/// cache holds no such order.
#[must_use]
pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
    self.orders.get(client_order_id)
}
2426
/// Returns a mutable reference to the cached order for `client_order_id`, or `None` if
/// the cache holds no such order.
///
/// Changes made through the reference do not update any index; that is done by
/// `update_order`.
#[must_use]
pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
    self.orders.get_mut(client_order_id)
}
2432
/// Returns the client order ID indexed for `venue_order_id`, or `None` if the venue
/// order ID is not indexed.
#[must_use]
pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
    self.index.venue_order_ids.get(venue_order_id)
}
2438
/// Returns the venue order ID indexed for `client_order_id`, or `None` if the client
/// order ID is not indexed.
#[must_use]
pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
    self.index.client_order_ids.get(client_order_id)
}
2444
/// Returns the client ID indexed for `client_order_id`, or `None` if no client was
/// recorded for that order.
#[must_use]
pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
    self.index.order_client.get(client_order_id)
}
2450
2451 #[must_use]
2453 pub fn orders(
2454 &self,
2455 venue: Option<&Venue>,
2456 instrument_id: Option<&InstrumentId>,
2457 strategy_id: Option<&StrategyId>,
2458 side: Option<OrderSide>,
2459 ) -> Vec<&OrderAny> {
2460 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2461 self.get_orders_for_ids(&client_order_ids, side)
2462 }
2463
2464 #[must_use]
2466 pub fn orders_open(
2467 &self,
2468 venue: Option<&Venue>,
2469 instrument_id: Option<&InstrumentId>,
2470 strategy_id: Option<&StrategyId>,
2471 side: Option<OrderSide>,
2472 ) -> Vec<&OrderAny> {
2473 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2474 self.get_orders_for_ids(&client_order_ids, side)
2475 }
2476
2477 #[must_use]
2479 pub fn orders_closed(
2480 &self,
2481 venue: Option<&Venue>,
2482 instrument_id: Option<&InstrumentId>,
2483 strategy_id: Option<&StrategyId>,
2484 side: Option<OrderSide>,
2485 ) -> Vec<&OrderAny> {
2486 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2487 self.get_orders_for_ids(&client_order_ids, side)
2488 }
2489
2490 #[must_use]
2492 pub fn orders_emulated(
2493 &self,
2494 venue: Option<&Venue>,
2495 instrument_id: Option<&InstrumentId>,
2496 strategy_id: Option<&StrategyId>,
2497 side: Option<OrderSide>,
2498 ) -> Vec<&OrderAny> {
2499 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2500 self.get_orders_for_ids(&client_order_ids, side)
2501 }
2502
2503 #[must_use]
2505 pub fn orders_inflight(
2506 &self,
2507 venue: Option<&Venue>,
2508 instrument_id: Option<&InstrumentId>,
2509 strategy_id: Option<&StrategyId>,
2510 side: Option<OrderSide>,
2511 ) -> Vec<&OrderAny> {
2512 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2513 self.get_orders_for_ids(&client_order_ids, side)
2514 }
2515
2516 #[must_use]
2518 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2519 let client_order_ids = self.index.position_orders.get(position_id);
2520 match client_order_ids {
2521 Some(client_order_ids) => {
2522 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2523 }
2524 None => Vec::new(),
2525 }
2526 }
2527
/// Returns whether `client_order_id` is present in the order ID index.
#[must_use]
pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
    self.index.orders.contains(client_order_id)
}
2533
/// Returns whether `client_order_id` is present in the open-orders index.
#[must_use]
pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
    self.index.orders_open.contains(client_order_id)
}
2539
/// Returns whether `client_order_id` is present in the closed-orders index.
#[must_use]
pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
    self.index.orders_closed.contains(client_order_id)
}
2545
/// Returns whether `client_order_id` is present in the emulated-orders index.
#[must_use]
pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
    self.index.orders_emulated.contains(client_order_id)
}
2551
/// Returns whether `client_order_id` is present in the in-flight-orders index.
#[must_use]
pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
    self.index.orders_inflight.contains(client_order_id)
}
2557
/// Returns whether `client_order_id` is present in the pending-cancel index.
#[must_use]
pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
    self.index.orders_pending_cancel.contains(client_order_id)
}
2563
/// Returns the number of orders that `orders_open` yields for the same filters.
///
/// The matching orders are collected first and then counted.
#[must_use]
pub fn orders_open_count(
    &self,
    venue: Option<&Venue>,
    instrument_id: Option<&InstrumentId>,
    strategy_id: Option<&StrategyId>,
    side: Option<OrderSide>,
) -> usize {
    self.orders_open(venue, instrument_id, strategy_id, side)
        .len()
}
2576
/// Returns the number of orders that `orders_closed` yields for the same filters.
///
/// The matching orders are collected first and then counted.
#[must_use]
pub fn orders_closed_count(
    &self,
    venue: Option<&Venue>,
    instrument_id: Option<&InstrumentId>,
    strategy_id: Option<&StrategyId>,
    side: Option<OrderSide>,
) -> usize {
    self.orders_closed(venue, instrument_id, strategy_id, side)
        .len()
}
2589
/// Returns the number of orders that `orders_emulated` yields for the same filters.
///
/// The matching orders are collected first and then counted.
#[must_use]
pub fn orders_emulated_count(
    &self,
    venue: Option<&Venue>,
    instrument_id: Option<&InstrumentId>,
    strategy_id: Option<&StrategyId>,
    side: Option<OrderSide>,
) -> usize {
    self.orders_emulated(venue, instrument_id, strategy_id, side)
        .len()
}
2602
/// Returns the number of orders that `orders_inflight` yields for the same filters.
///
/// The matching orders are collected first and then counted.
#[must_use]
pub fn orders_inflight_count(
    &self,
    venue: Option<&Venue>,
    instrument_id: Option<&InstrumentId>,
    strategy_id: Option<&StrategyId>,
    side: Option<OrderSide>,
) -> usize {
    self.orders_inflight(venue, instrument_id, strategy_id, side)
        .len()
}
2615
/// Returns the number of orders that `orders` yields for the same filters.
///
/// The matching orders are collected first and then counted.
#[must_use]
pub fn orders_total_count(
    &self,
    venue: Option<&Venue>,
    instrument_id: Option<&InstrumentId>,
    strategy_id: Option<&StrategyId>,
    side: Option<OrderSide>,
) -> usize {
    self.orders(venue, instrument_id, strategy_id, side).len()
}
2627
/// Returns a reference to the cached order list for `order_list_id`, or `None` if the
/// cache holds no such order list.
#[must_use]
pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
    self.order_lists.get(order_list_id)
}
2633
2634 #[must_use]
2636 pub fn order_lists(
2637 &self,
2638 venue: Option<&Venue>,
2639 instrument_id: Option<&InstrumentId>,
2640 strategy_id: Option<&StrategyId>,
2641 ) -> Vec<&OrderList> {
2642 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2643
2644 if let Some(venue) = venue {
2645 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2646 }
2647
2648 if let Some(instrument_id) = instrument_id {
2649 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2650 }
2651
2652 if let Some(strategy_id) = strategy_id {
2653 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2654 }
2655
2656 order_lists
2657 }
2658
/// Returns whether an order list is cached under `order_list_id`.
#[must_use]
pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
    self.order_lists.contains_key(order_list_id)
}
2664
2665 #[must_use]
2670 pub fn orders_for_exec_algorithm(
2671 &self,
2672 exec_algorithm_id: &ExecAlgorithmId,
2673 venue: Option<&Venue>,
2674 instrument_id: Option<&InstrumentId>,
2675 strategy_id: Option<&StrategyId>,
2676 side: Option<OrderSide>,
2677 ) -> Vec<&OrderAny> {
2678 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2679 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2680
2681 if let Some(query) = query
2682 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2683 {
2684 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2685 }
2686
2687 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2688 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2689 } else {
2690 Vec::new()
2691 }
2692 }
2693
2694 #[must_use]
2696 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2697 self.get_orders_for_ids(
2698 self.index
2699 .exec_spawn_orders
2700 .get(exec_spawn_id)
2701 .unwrap_or(&AHashSet::new()),
2702 None,
2703 )
2704 }
2705
2706 #[must_use]
2708 pub fn exec_spawn_total_quantity(
2709 &self,
2710 exec_spawn_id: &ClientOrderId,
2711 active_only: bool,
2712 ) -> Option<Quantity> {
2713 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2714
2715 let mut total_quantity: Option<Quantity> = None;
2716
2717 for spawn_order in exec_spawn_orders {
2718 if !active_only || !spawn_order.is_closed() {
2719 if let Some(mut total_quantity) = total_quantity {
2720 total_quantity += spawn_order.quantity();
2721 }
2722 } else {
2723 total_quantity = Some(spawn_order.quantity());
2724 }
2725 }
2726
2727 total_quantity
2728 }
2729
2730 #[must_use]
2732 pub fn exec_spawn_total_filled_qty(
2733 &self,
2734 exec_spawn_id: &ClientOrderId,
2735 active_only: bool,
2736 ) -> Option<Quantity> {
2737 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2738
2739 let mut total_quantity: Option<Quantity> = None;
2740
2741 for spawn_order in exec_spawn_orders {
2742 if !active_only || !spawn_order.is_closed() {
2743 if let Some(mut total_quantity) = total_quantity {
2744 total_quantity += spawn_order.filled_qty();
2745 }
2746 } else {
2747 total_quantity = Some(spawn_order.filled_qty());
2748 }
2749 }
2750
2751 total_quantity
2752 }
2753
2754 #[must_use]
2756 pub fn exec_spawn_total_leaves_qty(
2757 &self,
2758 exec_spawn_id: &ClientOrderId,
2759 active_only: bool,
2760 ) -> Option<Quantity> {
2761 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2762
2763 let mut total_quantity: Option<Quantity> = None;
2764
2765 for spawn_order in exec_spawn_orders {
2766 if !active_only || !spawn_order.is_closed() {
2767 if let Some(mut total_quantity) = total_quantity {
2768 total_quantity += spawn_order.leaves_qty();
2769 }
2770 } else {
2771 total_quantity = Some(spawn_order.leaves_qty());
2772 }
2773 }
2774
2775 total_quantity
2776 }
2777
    /// Returns a reference to the position stored under `position_id`, if any.
    #[must_use]
    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
        self.positions.get(position_id)
    }
2785
2786 #[must_use]
2788 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2789 self.index
2790 .order_position
2791 .get(client_order_id)
2792 .and_then(|position_id| self.positions.get(position_id))
2793 }
2794
    /// Returns a reference to the position ID indexed for `client_order_id`, if any.
    #[must_use]
    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
        self.index.order_position.get(client_order_id)
    }
2800
2801 #[must_use]
2803 pub fn positions(
2804 &self,
2805 venue: Option<&Venue>,
2806 instrument_id: Option<&InstrumentId>,
2807 strategy_id: Option<&StrategyId>,
2808 side: Option<PositionSide>,
2809 ) -> Vec<&Position> {
2810 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2811 self.get_positions_for_ids(&position_ids, side)
2812 }
2813
2814 #[must_use]
2816 pub fn positions_open(
2817 &self,
2818 venue: Option<&Venue>,
2819 instrument_id: Option<&InstrumentId>,
2820 strategy_id: Option<&StrategyId>,
2821 side: Option<PositionSide>,
2822 ) -> Vec<&Position> {
2823 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2824 self.get_positions_for_ids(&position_ids, side)
2825 }
2826
2827 #[must_use]
2829 pub fn positions_closed(
2830 &self,
2831 venue: Option<&Venue>,
2832 instrument_id: Option<&InstrumentId>,
2833 strategy_id: Option<&StrategyId>,
2834 side: Option<PositionSide>,
2835 ) -> Vec<&Position> {
2836 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2837 self.get_positions_for_ids(&position_ids, side)
2838 }
2839
    /// Returns whether `position_id` is present in the index of all positions.
    #[must_use]
    pub fn position_exists(&self, position_id: &PositionId) -> bool {
        self.index.positions.contains(position_id)
    }
2845
    /// Returns whether `position_id` is present in the index of open positions.
    #[must_use]
    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
        self.index.positions_open.contains(position_id)
    }
2851
    /// Returns whether `position_id` is present in the index of closed positions.
    #[must_use]
    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
        self.index.positions_closed.contains(position_id)
    }
2857
2858 #[must_use]
2860 pub fn positions_open_count(
2861 &self,
2862 venue: Option<&Venue>,
2863 instrument_id: Option<&InstrumentId>,
2864 strategy_id: Option<&StrategyId>,
2865 side: Option<PositionSide>,
2866 ) -> usize {
2867 self.positions_open(venue, instrument_id, strategy_id, side)
2868 .len()
2869 }
2870
2871 #[must_use]
2873 pub fn positions_closed_count(
2874 &self,
2875 venue: Option<&Venue>,
2876 instrument_id: Option<&InstrumentId>,
2877 strategy_id: Option<&StrategyId>,
2878 side: Option<PositionSide>,
2879 ) -> usize {
2880 self.positions_closed(venue, instrument_id, strategy_id, side)
2881 .len()
2882 }
2883
2884 #[must_use]
2886 pub fn positions_total_count(
2887 &self,
2888 venue: Option<&Venue>,
2889 instrument_id: Option<&InstrumentId>,
2890 strategy_id: Option<&StrategyId>,
2891 side: Option<PositionSide>,
2892 ) -> usize {
2893 self.positions(venue, instrument_id, strategy_id, side)
2894 .len()
2895 }
2896
    /// Returns a reference to the strategy ID indexed for `client_order_id`, if any.
    #[must_use]
    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
        self.index.order_strategy.get(client_order_id)
    }
2904
    /// Returns a reference to the strategy ID indexed for `position_id`, if any.
    #[must_use]
    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
        self.index.position_strategy.get(position_id)
    }
2910
    /// Returns the bytes stored in the general cache under `key`, if any.
    ///
    /// # Errors
    ///
    /// Returns an error if `check_valid_string_ascii` rejects `key`.
    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
        // Validate the key before touching the map; `stringify!(key)` passes the literal "key"
        check_valid_string_ascii(key, stringify!(key))?;

        Ok(self.general.get(key))
    }
2923
2924 #[must_use]
2928 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2929 match price_type {
2930 PriceType::Bid => self
2931 .quotes
2932 .get(instrument_id)
2933 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2934 PriceType::Ask => self
2935 .quotes
2936 .get(instrument_id)
2937 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2938 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2939 quotes.front().map(|quote| {
2940 Price::new(
2941 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2942 quote.bid_price.precision + 1,
2943 )
2944 })
2945 }),
2946 PriceType::Last => self
2947 .trades
2948 .get(instrument_id)
2949 .and_then(|trades| trades.front().map(|trade| trade.price)),
2950 PriceType::Mark => self
2951 .mark_prices
2952 .get(instrument_id)
2953 .and_then(|marks| marks.front().map(|mark| mark.value)),
2954 }
2955 }
2956
2957 #[must_use]
2959 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2960 self.quotes
2961 .get(instrument_id)
2962 .map(|quotes| quotes.iter().copied().collect())
2963 }
2964
2965 #[must_use]
2967 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2968 self.trades
2969 .get(instrument_id)
2970 .map(|trades| trades.iter().copied().collect())
2971 }
2972
2973 #[must_use]
2975 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2976 self.mark_prices
2977 .get(instrument_id)
2978 .map(|mark_prices| mark_prices.iter().copied().collect())
2979 }
2980
2981 #[must_use]
2983 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2984 self.index_prices
2985 .get(instrument_id)
2986 .map(|index_prices| index_prices.iter().copied().collect())
2987 }
2988
2989 #[must_use]
2991 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2992 self.bars
2993 .get(bar_type)
2994 .map(|bars| bars.iter().copied().collect())
2995 }
2996
    /// Returns a reference to the order book stored for `instrument_id`, if any.
    #[must_use]
    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
        self.books.get(instrument_id)
    }
3002
    /// Returns a mutable reference to the order book stored for `instrument_id`, if any.
    #[must_use]
    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
        self.books.get_mut(instrument_id)
    }
3008
    /// Returns a reference to the own order book stored for `instrument_id`, if any.
    #[must_use]
    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
        self.own_books.get(instrument_id)
    }
3014
    /// Returns a mutable reference to the own order book stored for `instrument_id`, if any.
    #[must_use]
    pub fn own_order_book_mut(
        &mut self,
        instrument_id: &InstrumentId,
    ) -> Option<&mut OwnOrderBook> {
        self.own_books.get_mut(instrument_id)
    }
3023
3024 #[must_use]
3026 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3027 self.quotes
3028 .get(instrument_id)
3029 .and_then(|quotes| quotes.front())
3030 }
3031
3032 #[must_use]
3034 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3035 self.trades
3036 .get(instrument_id)
3037 .and_then(|trades| trades.front())
3038 }
3039
3040 #[must_use]
3042 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3043 self.mark_prices
3044 .get(instrument_id)
3045 .and_then(|mark_prices| mark_prices.front())
3046 }
3047
3048 #[must_use]
3050 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3051 self.index_prices
3052 .get(instrument_id)
3053 .and_then(|index_prices| index_prices.front())
3054 }
3055
    /// Returns a reference to the funding rate update stored for `instrument_id`, if any.
    #[must_use]
    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
        self.funding_rates.get(instrument_id)
    }
3061
3062 #[must_use]
3064 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3065 self.bars.get(bar_type).and_then(|bars| bars.front())
3066 }
3067
3068 #[must_use]
3070 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3071 self.books
3072 .get(instrument_id)
3073 .map_or(0, |book| book.update_count) as usize
3074 }
3075
3076 #[must_use]
3078 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3079 self.quotes
3080 .get(instrument_id)
3081 .map_or(0, std::collections::VecDeque::len)
3082 }
3083
3084 #[must_use]
3086 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3087 self.trades
3088 .get(instrument_id)
3089 .map_or(0, std::collections::VecDeque::len)
3090 }
3091
3092 #[must_use]
3094 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3095 self.bars
3096 .get(bar_type)
3097 .map_or(0, std::collections::VecDeque::len)
3098 }
3099
    /// Returns whether an order book is stored for `instrument_id`.
    #[must_use]
    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
        self.books.contains_key(instrument_id)
    }
3105
    /// Returns whether at least one quote is cached for `instrument_id`.
    #[must_use]
    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
        self.quote_count(instrument_id) > 0
    }
3111
    /// Returns whether at least one trade is cached for `instrument_id`.
    #[must_use]
    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
        self.trade_count(instrument_id) > 0
    }
3117
    /// Returns whether at least one bar is cached for `bar_type`.
    #[must_use]
    pub fn has_bars(&self, bar_type: &BarType) -> bool {
        self.bar_count(bar_type) > 0
    }
3123
3124 #[must_use]
3125 pub fn get_xrate(
3126 &self,
3127 venue: Venue,
3128 from_currency: Currency,
3129 to_currency: Currency,
3130 price_type: PriceType,
3131 ) -> Option<f64> {
3132 if from_currency == to_currency {
3133 return Some(1.0);
3136 }
3137
3138 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3139
3140 match get_exchange_rate(
3141 from_currency.code,
3142 to_currency.code,
3143 price_type,
3144 bid_quote,
3145 ask_quote,
3146 ) {
3147 Ok(rate) => rate,
3148 Err(e) => {
3149 log::error!("Failed to calculate xrate: {e}");
3150 None
3151 }
3152 }
3153 }
3154
3155 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3156 let mut bid_quotes = AHashMap::new();
3157 let mut ask_quotes = AHashMap::new();
3158
3159 for instrument_id in self.instruments.keys() {
3160 if instrument_id.venue != *venue {
3161 continue;
3162 }
3163
3164 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3165 if let Some(tick) = ticks.front() {
3166 (tick.bid_price, tick.ask_price)
3167 } else {
3168 continue; }
3170 } else {
3171 let bid_bar = self
3172 .bars
3173 .iter()
3174 .find(|(k, _)| {
3175 k.instrument_id() == *instrument_id
3176 && matches!(k.spec().price_type, PriceType::Bid)
3177 })
3178 .map(|(_, v)| v);
3179
3180 let ask_bar = self
3181 .bars
3182 .iter()
3183 .find(|(k, _)| {
3184 k.instrument_id() == *instrument_id
3185 && matches!(k.spec().price_type, PriceType::Ask)
3186 })
3187 .map(|(_, v)| v);
3188
3189 match (bid_bar, ask_bar) {
3190 (Some(bid), Some(ask)) => {
3191 let bid_price = bid.front().unwrap().close;
3192 let ask_price = ask.front().unwrap().close;
3193
3194 (bid_price, ask_price)
3195 }
3196 _ => continue,
3197 }
3198 };
3199
3200 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3201 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3202 }
3203
3204 (bid_quotes, ask_quotes)
3205 }
3206
    /// Returns the mark exchange rate stored under the `(from_currency, to_currency)` key,
    /// if any.
    #[must_use]
    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
        self.mark_xrates.get(&(from_currency, to_currency)).copied()
    }
3212
3213 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3219 assert!(xrate > 0.0, "xrate was zero");
3220 self.mark_xrates.insert((from_currency, to_currency), xrate);
3221 self.mark_xrates
3222 .insert((to_currency, from_currency), 1.0 / xrate);
3223 }
3224
    /// Removes the mark exchange rate stored under the `(from_currency, to_currency)` key.
    ///
    /// Only this exact key is removed; an entry stored under the reversed pair is untouched.
    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
        // The removed value (if any) is intentionally discarded
        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
    }
3229
    /// Removes every stored mark exchange rate.
    pub fn clear_mark_xrates(&mut self) {
        self.mark_xrates.clear();
    }
3234
    /// Returns a reference to the instrument stored under `instrument_id`, if any.
    #[must_use]
    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
        self.instruments.get(instrument_id)
    }
3242
3243 #[must_use]
3245 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3246 match venue {
3247 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3248 None => self.instruments.keys().collect(),
3249 }
3250 }
3251
3252 #[must_use]
3254 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3255 self.instruments
3256 .values()
3257 .filter(|i| &i.id().venue == venue)
3258 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3259 .collect()
3260 }
3261
3262 #[must_use]
3264 pub fn bar_types(
3265 &self,
3266 instrument_id: Option<&InstrumentId>,
3267 price_type: Option<&PriceType>,
3268 aggregation_source: AggregationSource,
3269 ) -> Vec<&BarType> {
3270 let mut bar_types = self
3271 .bars
3272 .keys()
3273 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3274 .collect::<Vec<&BarType>>();
3275
3276 if let Some(instrument_id) = instrument_id {
3277 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3278 }
3279
3280 if let Some(price_type) = price_type {
3281 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3282 }
3283
3284 bar_types
3285 }
3286
    /// Returns a reference to the synthetic instrument stored under `instrument_id`, if any.
    #[must_use]
    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
        self.synthetics.get(instrument_id)
    }
3294
    /// Returns references to the IDs of all cached synthetic instruments.
    #[must_use]
    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
        self.synthetics.keys().collect()
    }
3300
    /// Returns references to all cached synthetic instruments.
    #[must_use]
    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
        self.synthetics.values().collect()
    }
3306
    /// Returns a reference to the account stored under `account_id`, if any.
    #[must_use]
    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
        self.accounts.get(account_id)
    }
3314
3315 #[must_use]
3317 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3318 self.index
3319 .venue_account
3320 .get(venue)
3321 .and_then(|account_id| self.accounts.get(account_id))
3322 }
3323
    /// Returns a reference to the account ID indexed for `venue`, if any.
    #[must_use]
    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
        self.index.venue_account.get(venue)
    }
3329
    /// Returns references to all cached accounts whose `id()` equals `account_id`.
    #[must_use]
    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
        self.accounts
            .values()
            .filter(|account| &account.id() == account_id)
            .collect()
    }
3338
3339 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3347 if !order.has_price() {
3348 return;
3349 }
3350
3351 let instrument_id = order.instrument_id();
3352
3353 let own_book = self
3354 .own_books
3355 .entry(instrument_id)
3356 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3357
3358 let own_book_order = order.to_own_book_order();
3359
3360 if order.is_closed() {
3361 if let Err(e) = own_book.delete(own_book_order) {
3362 log::debug!(
3363 "Failed to delete order {} from own book: {e}",
3364 order.client_order_id(),
3365 );
3366 } else {
3367 log::debug!("Deleted order {} from own book", order.client_order_id());
3368 }
3369 } else {
3370 if let Err(e) = own_book.update(own_book_order) {
3372 log::debug!(
3373 "Failed to update order {} in own book: {e}; inserting instead",
3374 order.client_order_id(),
3375 );
3376 own_book.add(own_book_order);
3377 }
3378 log::debug!("Updated order {} in own book", order.client_order_id());
3379 }
3380 }
3381
3382 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3388 let order = match self.orders.get(client_order_id) {
3389 Some(order) => order,
3390 None => return,
3391 };
3392
3393 self.index.orders_open.remove(client_order_id);
3394 self.index.orders_pending_cancel.remove(client_order_id);
3395 self.index.orders_inflight.remove(client_order_id);
3396 self.index.orders_emulated.remove(client_order_id);
3397
3398 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3399 && order.has_price()
3400 {
3401 let own_book_order = order.to_own_book_order();
3402 if let Err(e) = own_book.delete(own_book_order) {
3403 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3404 } else {
3405 log::debug!("Force deleted {client_order_id} from own book");
3406 }
3407 }
3408
3409 self.index.orders_closed.insert(*client_order_id);
3410 }
3411
3412 pub fn audit_own_order_books(&mut self) {
3419 log::debug!("Starting own books audit");
3420 let start = std::time::Instant::now();
3421
3422 let valid_order_ids: HashSet<ClientOrderId> = self
3425 .index
3426 .orders_open
3427 .union(&self.index.orders_inflight)
3428 .copied()
3429 .collect();
3430
3431 for own_book in self.own_books.values_mut() {
3432 own_book.audit_open_orders(&valid_order_ids);
3433 }
3434
3435 log::debug!("Completed own books audit in {:?}", start.elapsed());
3436 }
3437}