1pub mod config;
21pub mod database;
22
23mod index;
24
25#[cfg(test)]
26mod tests;
27
28use std::{
29 collections::{HashSet, VecDeque},
30 fmt::Debug,
31 time::{SystemTime, UNIX_EPOCH},
32};
33
34use ahash::{AHashMap, AHashSet};
35use bytes::Bytes;
36pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
38use index::CacheIndex;
39use nautilus_core::{
40 UUID4, UnixNanos,
41 correctness::{
42 check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
43 },
44 datetime::secs_to_nanos,
45};
46#[cfg(feature = "defi")]
47use nautilus_model::defi::Pool;
48use nautilus_model::{
49 accounts::{Account, AccountAny},
50 data::{
51 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52 TradeTick, YieldCurveData,
53 },
54 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55 identifiers::{
56 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58 },
59 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60 orderbook::{
61 OrderBook,
62 own::{OwnOrderBook, should_handle_own_book_order},
63 },
64 orders::{Order, OrderAny, OrderList},
65 position::Position,
66 types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
/// In-memory cache of trading state, optionally paired with a database adapter.
///
/// Market data, instruments, accounts, orders and positions are held in hash maps,
/// alongside a `CacheIndex` that the cache builds, checks and purges for lookups.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    /// Cache configuration (read for buffer capacities and the market data persistence flag).
    config: CacheConfig,
    /// Secondary index over the cached accounts, orders and positions.
    index: CacheIndex,
    /// Optional database adapter used for loading and persisting cached objects.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General-purpose byte values keyed by string.
    general: AHashMap<String, Bytes>,
    /// Currencies keyed by `Ustr`.
    currencies: AHashMap<Ustr, Currency>,
    /// Instruments keyed by instrument ID.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    /// Synthetic instruments keyed by instrument ID.
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    /// Order books keyed by instrument ID.
    books: AHashMap<InstrumentId, OrderBook>,
    /// Own order books keyed by instrument ID.
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    /// Quote ticks per instrument (new quotes are pushed to the front).
    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
    /// Trade ticks per instrument (new trades are pushed to the front).
    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
    /// Exchange rates keyed by a currency pair.
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    /// Mark price updates per instrument (new updates are pushed to the front).
    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    /// Index price updates per instrument (new updates are pushed to the front).
    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    /// Latest funding rate update per instrument.
    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
    /// Bars per bar type (new bars are pushed to the front).
    bars: AHashMap<BarType, VecDeque<Bar>>,
    /// Greeks data keyed by instrument ID.
    greeks: AHashMap<InstrumentId, GreeksData>,
    /// Yield curve data keyed by string.
    yield_curves: AHashMap<String, YieldCurveData>,
    /// Accounts keyed by account ID.
    accounts: AHashMap<AccountId, AccountAny>,
    /// Orders keyed by client order ID.
    orders: AHashMap<ClientOrderId, OrderAny>,
    /// Order lists keyed by order list ID.
    order_lists: AHashMap<OrderListId, OrderList>,
    /// Positions keyed by position ID.
    positions: AHashMap<PositionId, Position>,
    /// Position snapshots as raw bytes, keyed by position ID.
    position_snapshots: AHashMap<PositionId, Bytes>,
    /// Pools keyed by instrument ID (only with the `defi` feature).
    #[cfg(feature = "defi")]
    pools: AHashMap<InstrumentId, Pool>,
}
104
105impl Debug for Cache {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct(stringify!(Cache))
108 .field("config", &self.config)
109 .field("index", &self.index)
110 .field("general", &self.general)
111 .field("currencies", &self.currencies)
112 .field("instruments", &self.instruments)
113 .field("synthetics", &self.synthetics)
114 .field("books", &self.books)
115 .field("own_books", &self.own_books)
116 .field("quotes", &self.quotes)
117 .field("trades", &self.trades)
118 .field("mark_xrates", &self.mark_xrates)
119 .field("mark_prices", &self.mark_prices)
120 .field("index_prices", &self.index_prices)
121 .field("funding_rates", &self.funding_rates)
122 .field("bars", &self.bars)
123 .field("greeks", &self.greeks)
124 .field("yield_curves", &self.yield_curves)
125 .field("accounts", &self.accounts)
126 .field("orders", &self.orders)
127 .field("order_lists", &self.order_lists)
128 .field("positions", &self.positions)
129 .field("position_snapshots", &self.position_snapshots)
130 .finish()
131 }
132}
133
134impl Default for Cache {
135 fn default() -> Self {
137 Self::new(Some(CacheConfig::default()), None)
138 }
139}
140
141impl Cache {
142 #[must_use]
144 pub fn new(
148 config: Option<CacheConfig>,
149 database: Option<Box<dyn CacheDatabaseAdapter>>,
150 ) -> Self {
151 Self {
152 config: config.unwrap_or_default(),
153 index: CacheIndex::default(),
154 database,
155 general: AHashMap::new(),
156 currencies: AHashMap::new(),
157 instruments: AHashMap::new(),
158 synthetics: AHashMap::new(),
159 books: AHashMap::new(),
160 own_books: AHashMap::new(),
161 quotes: AHashMap::new(),
162 trades: AHashMap::new(),
163 mark_xrates: AHashMap::new(),
164 mark_prices: AHashMap::new(),
165 index_prices: AHashMap::new(),
166 funding_rates: AHashMap::new(),
167 bars: AHashMap::new(),
168 greeks: AHashMap::new(),
169 yield_curves: AHashMap::new(),
170 accounts: AHashMap::new(),
171 orders: AHashMap::new(),
172 order_lists: AHashMap::new(),
173 positions: AHashMap::new(),
174 position_snapshots: AHashMap::new(),
175 #[cfg(feature = "defi")]
176 pools: AHashMap::new(),
177 }
178 }
179
180 #[must_use]
182 pub fn memory_address(&self) -> String {
183 format!("{:?}", std::ptr::from_ref(self))
184 }
185
186 pub fn cache_general(&mut self) -> anyhow::Result<()> {
194 self.general = match &mut self.database {
195 Some(db) => db.load()?,
196 None => AHashMap::new(),
197 };
198
199 log::info!(
200 "Cached {} general object(s) from database",
201 self.general.len()
202 );
203 Ok(())
204 }
205
206 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
212 let cache_map = match &self.database {
213 Some(db) => db.load_all().await?,
214 None => CacheMap::default(),
215 };
216
217 self.currencies = cache_map.currencies;
218 self.instruments = cache_map.instruments;
219 self.synthetics = cache_map.synthetics;
220 self.accounts = cache_map.accounts;
221 self.orders = cache_map.orders;
222 self.positions = cache_map.positions;
223 Ok(())
224 }
225
226 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
232 self.currencies = match &mut self.database {
233 Some(db) => db.load_currencies().await?,
234 None => AHashMap::new(),
235 };
236
237 log::info!("Cached {} currencies from database", self.general.len());
238 Ok(())
239 }
240
241 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
247 self.instruments = match &mut self.database {
248 Some(db) => db.load_instruments().await?,
249 None => AHashMap::new(),
250 };
251
252 log::info!("Cached {} instruments from database", self.general.len());
253 Ok(())
254 }
255
256 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
262 self.synthetics = match &mut self.database {
263 Some(db) => db.load_synthetics().await?,
264 None => AHashMap::new(),
265 };
266
267 log::info!(
268 "Cached {} synthetic instruments from database",
269 self.general.len()
270 );
271 Ok(())
272 }
273
274 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
280 self.accounts = match &mut self.database {
281 Some(db) => db.load_accounts().await?,
282 None => AHashMap::new(),
283 };
284
285 log::info!(
286 "Cached {} synthetic instruments from database",
287 self.general.len()
288 );
289 Ok(())
290 }
291
292 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
298 self.orders = match &mut self.database {
299 Some(db) => db.load_orders().await?,
300 None => AHashMap::new(),
301 };
302
303 log::info!("Cached {} orders from database", self.general.len());
304 Ok(())
305 }
306
307 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
313 self.positions = match &mut self.database {
314 Some(db) => db.load_positions().await?,
315 None => AHashMap::new(),
316 };
317
318 log::info!("Cached {} positions from database", self.general.len());
319 Ok(())
320 }
321
322 pub fn build_index(&mut self) {
324 log::debug!("Building index");
325
326 for account_id in self.accounts.keys() {
328 self.index
329 .venue_account
330 .insert(account_id.get_issuer(), *account_id);
331 }
332
333 for (client_order_id, order) in &self.orders {
335 let instrument_id = order.instrument_id();
336 let venue = instrument_id.venue;
337 let strategy_id = order.strategy_id();
338
339 self.index
341 .venue_orders
342 .entry(venue)
343 .or_default()
344 .insert(*client_order_id);
345
346 if let Some(venue_order_id) = order.venue_order_id() {
348 self.index
349 .venue_order_ids
350 .insert(venue_order_id, *client_order_id);
351 }
352
353 if let Some(position_id) = order.position_id() {
355 self.index
356 .order_position
357 .insert(*client_order_id, position_id);
358 }
359
360 self.index
362 .order_strategy
363 .insert(*client_order_id, order.strategy_id());
364
365 self.index
367 .instrument_orders
368 .entry(instrument_id)
369 .or_default()
370 .insert(*client_order_id);
371
372 self.index
374 .strategy_orders
375 .entry(strategy_id)
376 .or_default()
377 .insert(*client_order_id);
378
379 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
381 self.index
382 .exec_algorithm_orders
383 .entry(exec_algorithm_id)
384 .or_default()
385 .insert(*client_order_id);
386 }
387
388 if let Some(exec_spawn_id) = order.exec_spawn_id() {
390 self.index
391 .exec_spawn_orders
392 .entry(exec_spawn_id)
393 .or_default()
394 .insert(*client_order_id);
395 }
396
397 self.index.orders.insert(*client_order_id);
399
400 if order.is_open() {
402 self.index.orders_open.insert(*client_order_id);
403 }
404
405 if order.is_closed() {
407 self.index.orders_closed.insert(*client_order_id);
408 }
409
410 if let Some(emulation_trigger) = order.emulation_trigger()
412 && emulation_trigger != TriggerType::NoTrigger
413 && !order.is_closed()
414 {
415 self.index.orders_emulated.insert(*client_order_id);
416 }
417
418 if order.is_inflight() {
420 self.index.orders_inflight.insert(*client_order_id);
421 }
422
423 self.index.strategies.insert(strategy_id);
425
426 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
428 self.index.exec_algorithms.insert(exec_algorithm_id);
429 }
430 }
431
432 for (position_id, position) in &self.positions {
434 let instrument_id = position.instrument_id;
435 let venue = instrument_id.venue;
436 let strategy_id = position.strategy_id;
437
438 self.index
440 .venue_positions
441 .entry(venue)
442 .or_default()
443 .insert(*position_id);
444
445 self.index
447 .position_strategy
448 .insert(*position_id, position.strategy_id);
449
450 self.index
452 .position_orders
453 .entry(*position_id)
454 .or_default()
455 .extend(position.client_order_ids().into_iter());
456
457 self.index
459 .instrument_positions
460 .entry(instrument_id)
461 .or_default()
462 .insert(*position_id);
463
464 self.index
466 .strategy_positions
467 .entry(strategy_id)
468 .or_default()
469 .insert(*position_id);
470
471 self.index.positions.insert(*position_id);
473
474 if position.is_open() {
476 self.index.positions_open.insert(*position_id);
477 }
478
479 if position.is_closed() {
481 self.index.positions_closed.insert(*position_id);
482 }
483
484 self.index.strategies.insert(strategy_id);
486 }
487 }
488
489 #[must_use]
491 pub const fn has_backing(&self) -> bool {
492 self.config.database.is_some()
493 }
494
495 #[must_use]
497 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
498 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
499 quote
500 } else {
501 log::warn!(
502 "Cannot calculate unrealized PnL for {}, no quotes for {}",
503 position.id,
504 position.instrument_id
505 );
506 return None;
507 };
508
509 let last = match position.side {
510 PositionSide::Flat | PositionSide::NoPositionSide => {
511 return Some(Money::new(0.0, position.settlement_currency));
512 }
513 PositionSide::Long => quote.ask_price,
514 PositionSide::Short => quote.bid_price,
515 };
516
517 Some(position.unrealized_pnl(last))
518 }
519
520 #[must_use]
529 pub fn check_integrity(&mut self) -> bool {
530 let mut error_count = 0;
531 let failure = "Integrity failure";
532
533 let timestamp_us = SystemTime::now()
535 .duration_since(UNIX_EPOCH)
536 .expect("Time went backwards")
537 .as_micros();
538
539 log::info!("Checking data integrity");
540
541 for account_id in self.accounts.keys() {
543 if !self
544 .index
545 .venue_account
546 .contains_key(&account_id.get_issuer())
547 {
548 log::error!(
549 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
550 );
551 error_count += 1;
552 }
553 }
554
555 for (client_order_id, order) in &self.orders {
556 if !self.index.order_strategy.contains_key(client_order_id) {
557 log::error!(
558 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
559 );
560 error_count += 1;
561 }
562 if !self.index.orders.contains(client_order_id) {
563 log::error!(
564 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
565 );
566 error_count += 1;
567 }
568 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
569 log::error!(
570 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
571 );
572 error_count += 1;
573 }
574 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
575 log::error!(
576 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
577 );
578 error_count += 1;
579 }
580 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
581 log::error!(
582 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
583 );
584 error_count += 1;
585 }
586 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
587 if !self
588 .index
589 .exec_algorithm_orders
590 .contains_key(&exec_algorithm_id)
591 {
592 log::error!(
593 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
594 );
595 error_count += 1;
596 }
597 if order.exec_spawn_id().is_none()
598 && !self.index.exec_spawn_orders.contains_key(client_order_id)
599 {
600 log::error!(
601 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
602 );
603 error_count += 1;
604 }
605 }
606 }
607
608 for (position_id, position) in &self.positions {
609 if !self.index.position_strategy.contains_key(position_id) {
610 log::error!(
611 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
612 );
613 error_count += 1;
614 }
615 if !self.index.position_orders.contains_key(position_id) {
616 log::error!(
617 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
618 );
619 error_count += 1;
620 }
621 if !self.index.positions.contains(position_id) {
622 log::error!(
623 "{failure} in positions: {position_id} not found in `self.index.positions`",
624 );
625 error_count += 1;
626 }
627 if position.is_open() && !self.index.positions_open.contains(position_id) {
628 log::error!(
629 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
630 );
631 error_count += 1;
632 }
633 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
634 log::error!(
635 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
636 );
637 error_count += 1;
638 }
639 }
640
641 for account_id in self.index.venue_account.values() {
643 if !self.accounts.contains_key(account_id) {
644 log::error!(
645 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
646 );
647 error_count += 1;
648 }
649 }
650
651 for client_order_id in self.index.venue_order_ids.values() {
652 if !self.orders.contains_key(client_order_id) {
653 log::error!(
654 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
655 );
656 error_count += 1;
657 }
658 }
659
660 for client_order_id in self.index.client_order_ids.keys() {
661 if !self.orders.contains_key(client_order_id) {
662 log::error!(
663 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
664 );
665 error_count += 1;
666 }
667 }
668
669 for client_order_id in self.index.order_position.keys() {
670 if !self.orders.contains_key(client_order_id) {
671 log::error!(
672 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
673 );
674 error_count += 1;
675 }
676 }
677
678 for client_order_id in self.index.order_strategy.keys() {
680 if !self.orders.contains_key(client_order_id) {
681 log::error!(
682 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
683 );
684 error_count += 1;
685 }
686 }
687
688 for position_id in self.index.position_strategy.keys() {
689 if !self.positions.contains_key(position_id) {
690 log::error!(
691 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
692 );
693 error_count += 1;
694 }
695 }
696
697 for position_id in self.index.position_orders.keys() {
698 if !self.positions.contains_key(position_id) {
699 log::error!(
700 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
701 );
702 error_count += 1;
703 }
704 }
705
706 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
707 for client_order_id in client_order_ids {
708 if !self.orders.contains_key(client_order_id) {
709 log::error!(
710 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
711 );
712 error_count += 1;
713 }
714 }
715 }
716
717 for instrument_id in self.index.instrument_positions.keys() {
718 if !self.index.instrument_orders.contains_key(instrument_id) {
719 log::error!(
720 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
721 );
722 error_count += 1;
723 }
724 }
725
726 for client_order_ids in self.index.strategy_orders.values() {
727 for client_order_id in client_order_ids {
728 if !self.orders.contains_key(client_order_id) {
729 log::error!(
730 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
731 );
732 error_count += 1;
733 }
734 }
735 }
736
737 for position_ids in self.index.strategy_positions.values() {
738 for position_id in position_ids {
739 if !self.positions.contains_key(position_id) {
740 log::error!(
741 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
742 );
743 error_count += 1;
744 }
745 }
746 }
747
748 for client_order_id in &self.index.orders {
749 if !self.orders.contains_key(client_order_id) {
750 log::error!(
751 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
752 );
753 error_count += 1;
754 }
755 }
756
757 for client_order_id in &self.index.orders_emulated {
758 if !self.orders.contains_key(client_order_id) {
759 log::error!(
760 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
761 );
762 error_count += 1;
763 }
764 }
765
766 for client_order_id in &self.index.orders_inflight {
767 if !self.orders.contains_key(client_order_id) {
768 log::error!(
769 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
770 );
771 error_count += 1;
772 }
773 }
774
775 for client_order_id in &self.index.orders_open {
776 if !self.orders.contains_key(client_order_id) {
777 log::error!(
778 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
779 );
780 error_count += 1;
781 }
782 }
783
784 for client_order_id in &self.index.orders_closed {
785 if !self.orders.contains_key(client_order_id) {
786 log::error!(
787 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
788 );
789 error_count += 1;
790 }
791 }
792
793 for position_id in &self.index.positions {
794 if !self.positions.contains_key(position_id) {
795 log::error!(
796 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
797 );
798 error_count += 1;
799 }
800 }
801
802 for position_id in &self.index.positions_open {
803 if !self.positions.contains_key(position_id) {
804 log::error!(
805 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
806 );
807 error_count += 1;
808 }
809 }
810
811 for position_id in &self.index.positions_closed {
812 if !self.positions.contains_key(position_id) {
813 log::error!(
814 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
815 );
816 error_count += 1;
817 }
818 }
819
820 for strategy_id in &self.index.strategies {
821 if !self.index.strategy_orders.contains_key(strategy_id) {
822 log::error!(
823 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
824 );
825 error_count += 1;
826 }
827 }
828
829 for exec_algorithm_id in &self.index.exec_algorithms {
830 if !self
831 .index
832 .exec_algorithm_orders
833 .contains_key(exec_algorithm_id)
834 {
835 log::error!(
836 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
837 );
838 error_count += 1;
839 }
840 }
841
842 let total_us = SystemTime::now()
843 .duration_since(UNIX_EPOCH)
844 .expect("Time went backwards")
845 .as_micros()
846 - timestamp_us;
847
848 if error_count == 0 {
849 log::info!("Integrity check passed in {total_us}μs");
850 true
851 } else {
852 log::error!(
853 "Integrity check failed with {error_count} error{} in {total_us}μs",
854 if error_count == 1 { "" } else { "s" },
855 );
856 false
857 }
858 }
859
860 #[must_use]
864 pub fn check_residuals(&self) -> bool {
865 log::debug!("Checking residuals");
866
867 let mut residuals = false;
868
869 for order in self.orders_open(None, None, None, None) {
871 residuals = true;
872 log::warn!("Residual {order:?}");
873 }
874
875 for position in self.positions_open(None, None, None, None) {
877 residuals = true;
878 log::warn!("Residual {position}");
879 }
880
881 residuals
882 }
883
884 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
890 log::debug!(
891 "Purging closed orders{}",
892 if buffer_secs > 0 {
893 format!(" with buffer_secs={buffer_secs}")
894 } else {
895 String::new()
896 }
897 );
898
899 let buffer_ns = secs_to_nanos(buffer_secs as f64);
900
901 'outer: for client_order_id in self.index.orders_closed.clone() {
902 if let Some(order) = self.orders.get(&client_order_id)
903 && let Some(ts_closed) = order.ts_closed()
904 && ts_closed + buffer_ns <= ts_now
905 {
906 if let Some(linked_order_ids) = order.linked_order_ids() {
908 for linked_order_id in linked_order_ids {
909 if let Some(linked_order) = self.orders.get(linked_order_id)
910 && linked_order.is_open()
911 {
912 continue 'outer;
914 }
915 }
916 }
917
918 self.purge_order(client_order_id);
919 }
920 }
921 }
922
923 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
925 log::debug!(
926 "Purging closed positions{}",
927 if buffer_secs > 0 {
928 format!(" with buffer_secs={buffer_secs}")
929 } else {
930 String::new()
931 }
932 );
933
934 let buffer_ns = secs_to_nanos(buffer_secs as f64);
935
936 for position_id in self.index.positions_closed.clone() {
937 if let Some(position) = self.positions.get(&position_id)
938 && let Some(ts_closed) = position.ts_closed
939 && ts_closed + buffer_ns <= ts_now
940 {
941 self.purge_position(position_id);
942 }
943 }
944 }
945
946 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
950 if let Some(position_id) = self.index.order_position.get(&client_order_id)
952 && let Some(position) = self.positions.get_mut(position_id)
953 {
954 position.purge_events_for_order(client_order_id);
955 }
956
957 if let Some(order) = self.orders.remove(&client_order_id) {
958 if let Some(venue_orders) = self
960 .index
961 .venue_orders
962 .get_mut(&order.instrument_id().venue)
963 {
964 venue_orders.remove(&client_order_id);
965 }
966
967 if let Some(venue_order_id) = order.venue_order_id() {
969 self.index.venue_order_ids.remove(&venue_order_id);
970 }
971
972 if let Some(instrument_orders) =
974 self.index.instrument_orders.get_mut(&order.instrument_id())
975 {
976 instrument_orders.remove(&client_order_id);
977 }
978
979 if let Some(position_id) = order.position_id()
981 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
982 {
983 position_orders.remove(&client_order_id);
984 }
985
986 if let Some(exec_algorithm_id) = order.exec_algorithm_id()
988 && let Some(exec_algorithm_orders) =
989 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
990 {
991 exec_algorithm_orders.remove(&client_order_id);
992 }
993
994 log::info!("Purged order {client_order_id}");
995 } else {
996 log::warn!("Order {client_order_id} not found when purging");
997 }
998
999 self.index.order_position.remove(&client_order_id);
1001 self.index.order_strategy.remove(&client_order_id);
1002 self.index.order_client.remove(&client_order_id);
1003 self.index.client_order_ids.remove(&client_order_id);
1004 self.index.exec_spawn_orders.remove(&client_order_id);
1005 self.index.orders.remove(&client_order_id);
1006 self.index.orders_closed.remove(&client_order_id);
1007 self.index.orders_emulated.remove(&client_order_id);
1008 self.index.orders_inflight.remove(&client_order_id);
1009 self.index.orders_pending_cancel.remove(&client_order_id);
1010 }
1011
1012 pub fn purge_position(&mut self, position_id: PositionId) {
1014 if let Some(position) = self.positions.remove(&position_id) {
1015 if let Some(venue_positions) = self
1017 .index
1018 .venue_positions
1019 .get_mut(&position.instrument_id.venue)
1020 {
1021 venue_positions.remove(&position_id);
1022 }
1023
1024 if let Some(instrument_positions) = self
1026 .index
1027 .instrument_positions
1028 .get_mut(&position.instrument_id)
1029 {
1030 instrument_positions.remove(&position_id);
1031 }
1032
1033 if let Some(strategy_positions) =
1035 self.index.strategy_positions.get_mut(&position.strategy_id)
1036 {
1037 strategy_positions.remove(&position_id);
1038 }
1039
1040 for client_order_id in position.client_order_ids() {
1042 self.index.order_position.remove(&client_order_id);
1043 }
1044
1045 log::info!("Purged position {position_id}");
1046 } else {
1047 log::warn!("Position {position_id} not found when purging");
1048 }
1049
1050 self.index.position_strategy.remove(&position_id);
1052 self.index.position_orders.remove(&position_id);
1053 self.index.positions.remove(&position_id);
1054 self.index.positions_open.remove(&position_id);
1055 self.index.positions_closed.remove(&position_id);
1056 }
1057
1058 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1063 log::debug!(
1064 "Purging account events{}",
1065 if lookback_secs > 0 {
1066 format!(" with lookback_secs={lookback_secs}")
1067 } else {
1068 String::new()
1069 }
1070 );
1071
1072 for account in self.accounts.values_mut() {
1073 let event_count = account.event_count();
1074 account.purge_account_events(ts_now, lookback_secs);
1075 let count_diff = event_count - account.event_count();
1076 if count_diff > 0 {
1077 log::info!(
1078 "Purged {} event(s) from account {}",
1079 count_diff,
1080 account.id()
1081 );
1082 }
1083 }
1084 }
1085
1086 pub fn clear_index(&mut self) {
1088 self.index.clear();
1089 log::debug!("Cleared index");
1090 }
1091
1092 pub fn reset(&mut self) {
1096 log::debug!("Resetting cache");
1097
1098 self.general.clear();
1099 self.currencies.clear();
1100 self.instruments.clear();
1101 self.synthetics.clear();
1102 self.books.clear();
1103 self.own_books.clear();
1104 self.quotes.clear();
1105 self.trades.clear();
1106 self.mark_xrates.clear();
1107 self.mark_prices.clear();
1108 self.index_prices.clear();
1109 self.bars.clear();
1110 self.accounts.clear();
1111 self.orders.clear();
1112 self.order_lists.clear();
1113 self.positions.clear();
1114 self.position_snapshots.clear();
1115 self.greeks.clear();
1116 self.yield_curves.clear();
1117
1118 #[cfg(feature = "defi")]
1119 self.pools.clear();
1120
1121 self.clear_index();
1122
1123 log::info!("Reset cache");
1124 }
1125
1126 pub fn dispose(&mut self) {
1132 if let Some(database) = &mut self.database {
1133 database.close().expect("Failed to close database");
1134 }
1135 }
1136
1137 pub fn flush_db(&mut self) {
1143 if let Some(database) = &mut self.database {
1144 database.flush().expect("Failed to flush database");
1145 }
1146 }
1147
1148 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1156 check_valid_string(key, stringify!(key))?;
1157 check_predicate_false(value.is_empty(), stringify!(value))?;
1158
1159 log::debug!("Adding general {key}");
1160 self.general.insert(key.to_string(), value.clone());
1161
1162 if let Some(database) = &mut self.database {
1163 database.add(key.to_string(), value)?;
1164 }
1165 Ok(())
1166 }
1167
1168 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1174 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1175
1176 if self.config.save_market_data
1177 && let Some(database) = &mut self.database
1178 {
1179 database.add_order_book(&book)?;
1180 }
1181
1182 self.books.insert(book.instrument_id, book);
1183 Ok(())
1184 }
1185
1186 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1192 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1193
1194 self.own_books.insert(own_book.instrument_id, own_book);
1195 Ok(())
1196 }
1197
1198 #[cfg(feature = "defi")]
1204 pub fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
1205 log::debug!("Adding `Pool` {}", pool.instrument_id);
1206
1207 self.pools.insert(pool.instrument_id, pool);
1208 Ok(())
1209 }
1210
1211 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1217 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1218
1219 if self.config.save_market_data {
1220 }
1222
1223 let mark_prices_deque = self
1224 .mark_prices
1225 .entry(mark_price.instrument_id)
1226 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1227 mark_prices_deque.push_front(mark_price);
1228 Ok(())
1229 }
1230
1231 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1237 log::debug!(
1238 "Adding `IndexPriceUpdate` for {}",
1239 index_price.instrument_id
1240 );
1241
1242 if self.config.save_market_data {
1243 }
1245
1246 let index_prices_deque = self
1247 .index_prices
1248 .entry(index_price.instrument_id)
1249 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1250 index_prices_deque.push_front(index_price);
1251 Ok(())
1252 }
1253
1254 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1260 log::debug!(
1261 "Adding `FundingRateUpdate` for {}",
1262 funding_rate.instrument_id
1263 );
1264
1265 if self.config.save_market_data {
1266 }
1268
1269 self.funding_rates
1270 .insert(funding_rate.instrument_id, funding_rate);
1271 Ok(())
1272 }
1273
1274 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1280 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1281
1282 if self.config.save_market_data
1283 && let Some(database) = &mut self.database
1284 {
1285 database.add_quote("e)?;
1286 }
1287
1288 let quotes_deque = self
1289 .quotes
1290 .entry(quote.instrument_id)
1291 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1292 quotes_deque.push_front(quote);
1293 Ok(())
1294 }
1295
1296 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1302 check_slice_not_empty(quotes, stringify!(quotes))?;
1303
1304 let instrument_id = quotes[0].instrument_id;
1305 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1306
1307 if self.config.save_market_data
1308 && let Some(database) = &mut self.database
1309 {
1310 for quote in quotes {
1311 database.add_quote(quote)?;
1312 }
1313 }
1314
1315 let quotes_deque = self
1316 .quotes
1317 .entry(instrument_id)
1318 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1319
1320 for quote in quotes {
1321 quotes_deque.push_front(*quote);
1322 }
1323 Ok(())
1324 }
1325
1326 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1332 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1333
1334 if self.config.save_market_data
1335 && let Some(database) = &mut self.database
1336 {
1337 database.add_trade(&trade)?;
1338 }
1339
1340 let trades_deque = self
1341 .trades
1342 .entry(trade.instrument_id)
1343 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1344 trades_deque.push_front(trade);
1345 Ok(())
1346 }
1347
1348 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1354 check_slice_not_empty(trades, stringify!(trades))?;
1355
1356 let instrument_id = trades[0].instrument_id;
1357 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1358
1359 if self.config.save_market_data
1360 && let Some(database) = &mut self.database
1361 {
1362 for trade in trades {
1363 database.add_trade(trade)?;
1364 }
1365 }
1366
1367 let trades_deque = self
1368 .trades
1369 .entry(instrument_id)
1370 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1371
1372 for trade in trades {
1373 trades_deque.push_front(*trade);
1374 }
1375 Ok(())
1376 }
1377
1378 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1384 log::debug!("Adding `Bar` {}", bar.bar_type);
1385
1386 if self.config.save_market_data
1387 && let Some(database) = &mut self.database
1388 {
1389 database.add_bar(&bar)?;
1390 }
1391
1392 let bars = self
1393 .bars
1394 .entry(bar.bar_type)
1395 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1396 bars.push_front(bar);
1397 Ok(())
1398 }
1399
1400 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1406 check_slice_not_empty(bars, stringify!(bars))?;
1407
1408 let bar_type = bars[0].bar_type;
1409 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1410
1411 if self.config.save_market_data
1412 && let Some(database) = &mut self.database
1413 {
1414 for bar in bars {
1415 database.add_bar(bar)?;
1416 }
1417 }
1418
1419 let bars_deque = self
1420 .bars
1421 .entry(bar_type)
1422 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1423
1424 for bar in bars {
1425 bars_deque.push_front(*bar);
1426 }
1427 Ok(())
1428 }
1429
1430 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1436 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1437
1438 if self.config.save_market_data
1439 && let Some(_database) = &mut self.database
1440 {
1441 }
1443
1444 self.greeks.insert(greeks.instrument_id, greeks);
1445 Ok(())
1446 }
1447
1448 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1450 self.greeks.get(instrument_id).cloned()
1451 }
1452
1453 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1459 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1460
1461 if self.config.save_market_data
1462 && let Some(_database) = &mut self.database
1463 {
1464 }
1466
1467 self.yield_curves
1468 .insert(yield_curve.curve_name.clone(), yield_curve);
1469 Ok(())
1470 }
1471
1472 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1474 self.yield_curves.get(key).map(|curve| {
1475 let curve_clone = curve.clone();
1476 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1477 as Box<dyn Fn(f64) -> f64>
1478 })
1479 }
1480
1481 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1487 log::debug!("Adding `Currency` {}", currency.code);
1488
1489 if let Some(database) = &mut self.database {
1490 database.add_currency(¤cy)?;
1491 }
1492
1493 self.currencies.insert(currency.code, currency);
1494 Ok(())
1495 }
1496
1497 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1503 log::debug!("Adding `Instrument` {}", instrument.id());
1504
1505 if let Some(database) = &mut self.database {
1506 database.add_instrument(&instrument)?;
1507 }
1508
1509 self.instruments.insert(instrument.id(), instrument);
1510 Ok(())
1511 }
1512
1513 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1519 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1520
1521 if let Some(database) = &mut self.database {
1522 database.add_synthetic(&synthetic)?;
1523 }
1524
1525 self.synthetics.insert(synthetic.id, synthetic);
1526 Ok(())
1527 }
1528
1529 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1535 log::debug!("Adding `Account` {}", account.id());
1536
1537 if let Some(database) = &mut self.database {
1538 database.add_account(&account)?;
1539 }
1540
1541 let account_id = account.id();
1542 self.accounts.insert(account_id, account);
1543 self.index
1544 .venue_account
1545 .insert(account_id.get_issuer(), account_id);
1546 Ok(())
1547 }
1548
1549 pub fn add_venue_order_id(
1557 &mut self,
1558 client_order_id: &ClientOrderId,
1559 venue_order_id: &VenueOrderId,
1560 overwrite: bool,
1561 ) -> anyhow::Result<()> {
1562 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1563 && !overwrite
1564 && existing_venue_order_id != venue_order_id
1565 {
1566 anyhow::bail!(
1567 "Existing {existing_venue_order_id} for {client_order_id}
1568 did not match the given {venue_order_id}.
1569 If you are writing a test then try a different `venue_order_id`,
1570 otherwise this is probably a bug."
1571 );
1572 }
1573
1574 self.index
1575 .client_order_ids
1576 .insert(*client_order_id, *venue_order_id);
1577 self.index
1578 .venue_order_ids
1579 .insert(*venue_order_id, *client_order_id);
1580
1581 Ok(())
1582 }
1583
1584 pub fn add_order(
1596 &mut self,
1597 order: OrderAny,
1598 position_id: Option<PositionId>,
1599 client_id: Option<ClientId>,
1600 replace_existing: bool,
1601 ) -> anyhow::Result<()> {
1602 let instrument_id = order.instrument_id();
1603 let venue = instrument_id.venue;
1604 let client_order_id = order.client_order_id();
1605 let strategy_id = order.strategy_id();
1606 let exec_algorithm_id = order.exec_algorithm_id();
1607 let exec_spawn_id = order.exec_spawn_id();
1608
1609 if !replace_existing {
1610 check_key_not_in_map(
1611 &client_order_id,
1612 &self.orders,
1613 stringify!(client_order_id),
1614 stringify!(orders),
1615 )?;
1616 }
1617
1618 log::debug!("Adding {order:?}");
1619
1620 self.index.orders.insert(client_order_id);
1621 self.index
1622 .order_strategy
1623 .insert(client_order_id, strategy_id);
1624 self.index.strategies.insert(strategy_id);
1625
1626 self.index
1628 .venue_orders
1629 .entry(venue)
1630 .or_default()
1631 .insert(client_order_id);
1632
1633 self.index
1635 .instrument_orders
1636 .entry(instrument_id)
1637 .or_default()
1638 .insert(client_order_id);
1639
1640 self.index
1642 .strategy_orders
1643 .entry(strategy_id)
1644 .or_default()
1645 .insert(client_order_id);
1646
1647 if let (Some(exec_algorithm_id), Some(exec_spawn_id)) = (exec_algorithm_id, exec_spawn_id) {
1650 self.index.exec_algorithms.insert(exec_algorithm_id);
1651
1652 self.index
1653 .exec_algorithm_orders
1654 .entry(exec_algorithm_id)
1655 .or_default()
1656 .insert(client_order_id);
1657
1658 self.index
1659 .exec_spawn_orders
1660 .entry(exec_spawn_id)
1661 .or_default()
1662 .insert(client_order_id);
1663 }
1664
1665 match order.emulation_trigger() {
1667 Some(_) => {
1668 self.index.orders_emulated.remove(&client_order_id);
1669 }
1670 None => {
1671 self.index.orders_emulated.insert(client_order_id);
1672 }
1673 }
1674
1675 if let Some(position_id) = position_id {
1677 self.add_position_id(
1678 &position_id,
1679 &order.instrument_id().venue,
1680 &client_order_id,
1681 &strategy_id,
1682 )?;
1683 }
1684
1685 if let Some(client_id) = client_id {
1687 self.index.order_client.insert(client_order_id, client_id);
1688 log::debug!("Indexed {client_id:?}");
1689 }
1690
1691 if let Some(database) = &mut self.database {
1692 database.add_order(&order, client_id)?;
1693 }
1698
1699 self.orders.insert(client_order_id, order);
1700
1701 Ok(())
1702 }
1703
1704 pub fn add_position_id(
1710 &mut self,
1711 position_id: &PositionId,
1712 venue: &Venue,
1713 client_order_id: &ClientOrderId,
1714 strategy_id: &StrategyId,
1715 ) -> anyhow::Result<()> {
1716 self.index
1717 .order_position
1718 .insert(*client_order_id, *position_id);
1719
1720 if let Some(database) = &mut self.database {
1722 database.index_order_position(*client_order_id, *position_id)?;
1723 }
1724
1725 self.index
1727 .position_strategy
1728 .insert(*position_id, *strategy_id);
1729
1730 self.index
1732 .position_orders
1733 .entry(*position_id)
1734 .or_default()
1735 .insert(*client_order_id);
1736
1737 self.index
1739 .strategy_positions
1740 .entry(*strategy_id)
1741 .or_default()
1742 .insert(*position_id);
1743
1744 self.index
1746 .venue_positions
1747 .entry(*venue)
1748 .or_default()
1749 .insert(*position_id);
1750
1751 Ok(())
1752 }
1753
1754 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1760 self.positions.insert(position.id, position.clone());
1761 self.index.positions.insert(position.id);
1762 self.index.positions_open.insert(position.id);
1763
1764 log::debug!("Adding {position}");
1765
1766 self.add_position_id(
1767 &position.id,
1768 &position.instrument_id.venue,
1769 &position.opening_order_id,
1770 &position.strategy_id,
1771 )?;
1772
1773 let venue = position.instrument_id.venue;
1774 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1775 venue_positions.insert(position.id);
1776
1777 let instrument_id = position.instrument_id;
1779 let instrument_positions = self
1780 .index
1781 .instrument_positions
1782 .entry(instrument_id)
1783 .or_default();
1784 instrument_positions.insert(position.id);
1785
1786 if let Some(database) = &mut self.database {
1787 database.add_position(&position)?;
1788 }
1797
1798 Ok(())
1799 }
1800
1801 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1807 if let Some(database) = &mut self.database {
1808 database.update_account(&account)?;
1809 }
1810 Ok(())
1811 }
1812
1813 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1819 let client_order_id = order.client_order_id();
1820
1821 if let Some(venue_order_id) = order.venue_order_id() {
1823 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1826 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1828 }
1829 }
1830
1831 if order.is_inflight() {
1833 self.index.orders_inflight.insert(client_order_id);
1834 } else {
1835 self.index.orders_inflight.remove(&client_order_id);
1836 }
1837
1838 if order.is_open() {
1840 self.index.orders_closed.remove(&client_order_id);
1841 self.index.orders_open.insert(client_order_id);
1842 } else if order.is_closed() {
1843 self.index.orders_open.remove(&client_order_id);
1844 self.index.orders_pending_cancel.remove(&client_order_id);
1845 self.index.orders_closed.insert(client_order_id);
1846 }
1847
1848 if let Some(emulation_trigger) = order.emulation_trigger() {
1850 match emulation_trigger {
1851 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1852 _ => self.index.orders_emulated.insert(client_order_id),
1853 };
1854 }
1855
1856 if self.own_order_book(&order.instrument_id()).is_some()
1858 && should_handle_own_book_order(order)
1859 {
1860 self.update_own_order_book(order);
1861 }
1862
1863 if let Some(database) = &mut self.database {
1864 database.update_order(order.last_event())?;
1865 }
1870
1871 self.orders.insert(client_order_id, order.clone());
1873
1874 Ok(())
1875 }
1876
1877 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1879 self.index
1880 .orders_pending_cancel
1881 .insert(order.client_order_id());
1882 }
1883
1884 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1890 if position.is_open() {
1893 self.index.positions_open.insert(position.id);
1894 self.index.positions_closed.remove(&position.id);
1895 } else {
1896 self.index.positions_closed.insert(position.id);
1897 self.index.positions_open.remove(&position.id);
1898 }
1899
1900 if let Some(database) = &mut self.database {
1901 database.update_position(position)?;
1902 }
1907
1908 self.positions.insert(position.id, position.clone());
1909
1910 Ok(())
1911 }
1912
1913 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1920 let position_id = position.id;
1921
1922 let mut copied_position = position.clone();
1923 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1924 copied_position.id = PositionId::new(new_id);
1925
1926 let position_serialized = serde_json::to_vec(&copied_position)?;
1928
1929 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1930 let new_snapshots = match snapshots {
1931 Some(existing_snapshots) => {
1932 let mut combined = existing_snapshots.to_vec();
1933 combined.extend(position_serialized);
1934 Bytes::from(combined)
1935 }
1936 None => Bytes::from(position_serialized),
1937 };
1938 self.position_snapshots.insert(position_id, new_snapshots);
1939
1940 log::debug!("Snapshot {copied_position}");
1941 Ok(())
1942 }
1943
1944 pub fn snapshot_position_state(
1950 &mut self,
1951 position: &Position,
1952 open_only: Option<bool>,
1955 ) -> anyhow::Result<()> {
1956 let open_only = open_only.unwrap_or(true);
1957
1958 if open_only && !position.is_open() {
1959 return Ok(());
1960 }
1961
1962 if let Some(database) = &mut self.database {
1963 database.snapshot_position_state(position).map_err(|e| {
1964 log::error!(
1965 "Failed to snapshot position state for {}: {e:?}",
1966 position.id
1967 );
1968 e
1969 })?;
1970 } else {
1971 log::warn!(
1972 "Cannot snapshot position state for {} (no database configured)",
1973 position.id
1974 );
1975 }
1976
1977 todo!()
1979 }
1980
1981 #[must_use]
1983 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
1984 if self.index.position_strategy.contains_key(position_id) {
1986 Some(OmsType::Netting)
1989 } else {
1990 None
1991 }
1992 }
1993
1994 #[must_use]
1996 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
1997 self.position_snapshots.get(position_id).map(|b| b.to_vec())
1998 }
1999
2000 #[must_use]
2002 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> HashSet<PositionId> {
2003 let mut result = HashSet::new();
2005 for (position_id, _) in &self.position_snapshots {
2006 if let Some(position) = self.positions.get(position_id)
2008 && position.instrument_id == *instrument_id
2009 {
2010 result.insert(*position_id);
2011 }
2012 }
2013 result
2014 }
2015
2016 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2022 let database = if let Some(database) = &self.database {
2023 database
2024 } else {
2025 log::warn!(
2026 "Cannot snapshot order state for {} (no database configured)",
2027 order.client_order_id()
2028 );
2029 return Ok(());
2030 };
2031
2032 database.snapshot_order_state(order)
2033 }
2034
2035 fn build_order_query_filter_set(
2038 &self,
2039 venue: Option<&Venue>,
2040 instrument_id: Option<&InstrumentId>,
2041 strategy_id: Option<&StrategyId>,
2042 ) -> Option<AHashSet<ClientOrderId>> {
2043 let mut query: Option<AHashSet<ClientOrderId>> = None;
2044
2045 if let Some(venue) = venue {
2046 query = Some(
2047 self.index
2048 .venue_orders
2049 .get(venue)
2050 .cloned()
2051 .unwrap_or_default(),
2052 );
2053 }
2054
2055 if let Some(instrument_id) = instrument_id {
2056 let instrument_orders = self
2057 .index
2058 .instrument_orders
2059 .get(instrument_id)
2060 .cloned()
2061 .unwrap_or_default();
2062
2063 if let Some(existing_query) = &mut query {
2064 *existing_query = existing_query
2065 .intersection(&instrument_orders)
2066 .copied()
2067 .collect();
2068 } else {
2069 query = Some(instrument_orders);
2070 }
2071 }
2072
2073 if let Some(strategy_id) = strategy_id {
2074 let strategy_orders = self
2075 .index
2076 .strategy_orders
2077 .get(strategy_id)
2078 .cloned()
2079 .unwrap_or_default();
2080
2081 if let Some(existing_query) = &mut query {
2082 *existing_query = existing_query
2083 .intersection(&strategy_orders)
2084 .copied()
2085 .collect();
2086 } else {
2087 query = Some(strategy_orders);
2088 }
2089 }
2090
2091 query
2092 }
2093
2094 fn build_position_query_filter_set(
2095 &self,
2096 venue: Option<&Venue>,
2097 instrument_id: Option<&InstrumentId>,
2098 strategy_id: Option<&StrategyId>,
2099 ) -> Option<AHashSet<PositionId>> {
2100 let mut query: Option<AHashSet<PositionId>> = None;
2101
2102 if let Some(venue) = venue {
2103 query = Some(
2104 self.index
2105 .venue_positions
2106 .get(venue)
2107 .cloned()
2108 .unwrap_or_default(),
2109 );
2110 }
2111
2112 if let Some(instrument_id) = instrument_id {
2113 let instrument_positions = self
2114 .index
2115 .instrument_positions
2116 .get(instrument_id)
2117 .cloned()
2118 .unwrap_or_default();
2119
2120 if let Some(existing_query) = query {
2121 query = Some(
2122 existing_query
2123 .intersection(&instrument_positions)
2124 .copied()
2125 .collect(),
2126 );
2127 } else {
2128 query = Some(instrument_positions);
2129 }
2130 }
2131
2132 if let Some(strategy_id) = strategy_id {
2133 let strategy_positions = self
2134 .index
2135 .strategy_positions
2136 .get(strategy_id)
2137 .cloned()
2138 .unwrap_or_default();
2139
2140 if let Some(existing_query) = query {
2141 query = Some(
2142 existing_query
2143 .intersection(&strategy_positions)
2144 .copied()
2145 .collect(),
2146 );
2147 } else {
2148 query = Some(strategy_positions);
2149 }
2150 }
2151
2152 query
2153 }
2154
2155 fn get_orders_for_ids(
2161 &self,
2162 client_order_ids: &AHashSet<ClientOrderId>,
2163 side: Option<OrderSide>,
2164 ) -> Vec<&OrderAny> {
2165 let side = side.unwrap_or(OrderSide::NoOrderSide);
2166 let mut orders = Vec::new();
2167
2168 for client_order_id in client_order_ids {
2169 let order = self
2170 .orders
2171 .get(client_order_id)
2172 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2173 if side == OrderSide::NoOrderSide || side == order.order_side() {
2174 orders.push(order);
2175 }
2176 }
2177
2178 orders
2179 }
2180
2181 fn get_positions_for_ids(
2187 &self,
2188 position_ids: &AHashSet<PositionId>,
2189 side: Option<PositionSide>,
2190 ) -> Vec<&Position> {
2191 let side = side.unwrap_or(PositionSide::NoPositionSide);
2192 let mut positions = Vec::new();
2193
2194 for position_id in position_ids {
2195 let position = self
2196 .positions
2197 .get(position_id)
2198 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2199 if side == PositionSide::NoPositionSide || side == position.side {
2200 positions.push(position);
2201 }
2202 }
2203
2204 positions
2205 }
2206
2207 #[must_use]
2209 pub fn client_order_ids(
2210 &self,
2211 venue: Option<&Venue>,
2212 instrument_id: Option<&InstrumentId>,
2213 strategy_id: Option<&StrategyId>,
2214 ) -> AHashSet<ClientOrderId> {
2215 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2216 match query {
2217 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2218 None => self.index.orders.clone(),
2219 }
2220 }
2221
2222 #[must_use]
2224 pub fn client_order_ids_open(
2225 &self,
2226 venue: Option<&Venue>,
2227 instrument_id: Option<&InstrumentId>,
2228 strategy_id: Option<&StrategyId>,
2229 ) -> AHashSet<ClientOrderId> {
2230 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2231 match query {
2232 Some(query) => self
2233 .index
2234 .orders_open
2235 .intersection(&query)
2236 .copied()
2237 .collect(),
2238 None => self.index.orders_open.clone(),
2239 }
2240 }
2241
2242 #[must_use]
2244 pub fn client_order_ids_closed(
2245 &self,
2246 venue: Option<&Venue>,
2247 instrument_id: Option<&InstrumentId>,
2248 strategy_id: Option<&StrategyId>,
2249 ) -> AHashSet<ClientOrderId> {
2250 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2251 match query {
2252 Some(query) => self
2253 .index
2254 .orders_closed
2255 .intersection(&query)
2256 .copied()
2257 .collect(),
2258 None => self.index.orders_closed.clone(),
2259 }
2260 }
2261
2262 #[must_use]
2264 pub fn client_order_ids_emulated(
2265 &self,
2266 venue: Option<&Venue>,
2267 instrument_id: Option<&InstrumentId>,
2268 strategy_id: Option<&StrategyId>,
2269 ) -> AHashSet<ClientOrderId> {
2270 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2271 match query {
2272 Some(query) => self
2273 .index
2274 .orders_emulated
2275 .intersection(&query)
2276 .copied()
2277 .collect(),
2278 None => self.index.orders_emulated.clone(),
2279 }
2280 }
2281
2282 #[must_use]
2284 pub fn client_order_ids_inflight(
2285 &self,
2286 venue: Option<&Venue>,
2287 instrument_id: Option<&InstrumentId>,
2288 strategy_id: Option<&StrategyId>,
2289 ) -> AHashSet<ClientOrderId> {
2290 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2291 match query {
2292 Some(query) => self
2293 .index
2294 .orders_inflight
2295 .intersection(&query)
2296 .copied()
2297 .collect(),
2298 None => self.index.orders_inflight.clone(),
2299 }
2300 }
2301
2302 #[must_use]
2304 pub fn position_ids(
2305 &self,
2306 venue: Option<&Venue>,
2307 instrument_id: Option<&InstrumentId>,
2308 strategy_id: Option<&StrategyId>,
2309 ) -> AHashSet<PositionId> {
2310 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2311 match query {
2312 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2313 None => self.index.positions.clone(),
2314 }
2315 }
2316
2317 #[must_use]
2319 pub fn position_open_ids(
2320 &self,
2321 venue: Option<&Venue>,
2322 instrument_id: Option<&InstrumentId>,
2323 strategy_id: Option<&StrategyId>,
2324 ) -> AHashSet<PositionId> {
2325 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2326 match query {
2327 Some(query) => self
2328 .index
2329 .positions_open
2330 .intersection(&query)
2331 .copied()
2332 .collect(),
2333 None => self.index.positions_open.clone(),
2334 }
2335 }
2336
2337 #[must_use]
2339 pub fn position_closed_ids(
2340 &self,
2341 venue: Option<&Venue>,
2342 instrument_id: Option<&InstrumentId>,
2343 strategy_id: Option<&StrategyId>,
2344 ) -> AHashSet<PositionId> {
2345 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2346 match query {
2347 Some(query) => self
2348 .index
2349 .positions_closed
2350 .intersection(&query)
2351 .copied()
2352 .collect(),
2353 None => self.index.positions_closed.clone(),
2354 }
2355 }
2356
2357 #[must_use]
2359 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2360 self.index.actors.clone()
2361 }
2362
2363 #[must_use]
2365 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2366 self.index.strategies.clone()
2367 }
2368
2369 #[must_use]
2371 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2372 self.index.exec_algorithms.clone()
2373 }
2374
2375 #[must_use]
2379 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2380 self.orders.get(client_order_id)
2381 }
2382
2383 #[must_use]
2385 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2386 self.orders.get_mut(client_order_id)
2387 }
2388
2389 #[must_use]
2391 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2392 self.index.venue_order_ids.get(venue_order_id)
2393 }
2394
2395 #[must_use]
2397 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2398 self.index.client_order_ids.get(client_order_id)
2399 }
2400
2401 #[must_use]
2403 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2404 self.index.order_client.get(client_order_id)
2405 }
2406
2407 #[must_use]
2409 pub fn orders(
2410 &self,
2411 venue: Option<&Venue>,
2412 instrument_id: Option<&InstrumentId>,
2413 strategy_id: Option<&StrategyId>,
2414 side: Option<OrderSide>,
2415 ) -> Vec<&OrderAny> {
2416 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2417 self.get_orders_for_ids(&client_order_ids, side)
2418 }
2419
2420 #[must_use]
2422 pub fn orders_open(
2423 &self,
2424 venue: Option<&Venue>,
2425 instrument_id: Option<&InstrumentId>,
2426 strategy_id: Option<&StrategyId>,
2427 side: Option<OrderSide>,
2428 ) -> Vec<&OrderAny> {
2429 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2430 self.get_orders_for_ids(&client_order_ids, side)
2431 }
2432
2433 #[must_use]
2435 pub fn orders_closed(
2436 &self,
2437 venue: Option<&Venue>,
2438 instrument_id: Option<&InstrumentId>,
2439 strategy_id: Option<&StrategyId>,
2440 side: Option<OrderSide>,
2441 ) -> Vec<&OrderAny> {
2442 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2443 self.get_orders_for_ids(&client_order_ids, side)
2444 }
2445
2446 #[must_use]
2448 pub fn orders_emulated(
2449 &self,
2450 venue: Option<&Venue>,
2451 instrument_id: Option<&InstrumentId>,
2452 strategy_id: Option<&StrategyId>,
2453 side: Option<OrderSide>,
2454 ) -> Vec<&OrderAny> {
2455 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2456 self.get_orders_for_ids(&client_order_ids, side)
2457 }
2458
2459 #[must_use]
2461 pub fn orders_inflight(
2462 &self,
2463 venue: Option<&Venue>,
2464 instrument_id: Option<&InstrumentId>,
2465 strategy_id: Option<&StrategyId>,
2466 side: Option<OrderSide>,
2467 ) -> Vec<&OrderAny> {
2468 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2469 self.get_orders_for_ids(&client_order_ids, side)
2470 }
2471
2472 #[must_use]
2474 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2475 let client_order_ids = self.index.position_orders.get(position_id);
2476 match client_order_ids {
2477 Some(client_order_ids) => {
2478 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2479 }
2480 None => Vec::new(),
2481 }
2482 }
2483
2484 #[must_use]
2486 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2487 self.index.orders.contains(client_order_id)
2488 }
2489
2490 #[must_use]
2492 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2493 self.index.orders_open.contains(client_order_id)
2494 }
2495
2496 #[must_use]
2498 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2499 self.index.orders_closed.contains(client_order_id)
2500 }
2501
2502 #[must_use]
2504 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2505 self.index.orders_emulated.contains(client_order_id)
2506 }
2507
2508 #[must_use]
2510 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2511 self.index.orders_inflight.contains(client_order_id)
2512 }
2513
2514 #[must_use]
2516 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2517 self.index.orders_pending_cancel.contains(client_order_id)
2518 }
2519
2520 #[must_use]
2522 pub fn orders_open_count(
2523 &self,
2524 venue: Option<&Venue>,
2525 instrument_id: Option<&InstrumentId>,
2526 strategy_id: Option<&StrategyId>,
2527 side: Option<OrderSide>,
2528 ) -> usize {
2529 self.orders_open(venue, instrument_id, strategy_id, side)
2530 .len()
2531 }
2532
2533 #[must_use]
2535 pub fn orders_closed_count(
2536 &self,
2537 venue: Option<&Venue>,
2538 instrument_id: Option<&InstrumentId>,
2539 strategy_id: Option<&StrategyId>,
2540 side: Option<OrderSide>,
2541 ) -> usize {
2542 self.orders_closed(venue, instrument_id, strategy_id, side)
2543 .len()
2544 }
2545
2546 #[must_use]
2548 pub fn orders_emulated_count(
2549 &self,
2550 venue: Option<&Venue>,
2551 instrument_id: Option<&InstrumentId>,
2552 strategy_id: Option<&StrategyId>,
2553 side: Option<OrderSide>,
2554 ) -> usize {
2555 self.orders_emulated(venue, instrument_id, strategy_id, side)
2556 .len()
2557 }
2558
2559 #[must_use]
2561 pub fn orders_inflight_count(
2562 &self,
2563 venue: Option<&Venue>,
2564 instrument_id: Option<&InstrumentId>,
2565 strategy_id: Option<&StrategyId>,
2566 side: Option<OrderSide>,
2567 ) -> usize {
2568 self.orders_inflight(venue, instrument_id, strategy_id, side)
2569 .len()
2570 }
2571
2572 #[must_use]
2574 pub fn orders_total_count(
2575 &self,
2576 venue: Option<&Venue>,
2577 instrument_id: Option<&InstrumentId>,
2578 strategy_id: Option<&StrategyId>,
2579 side: Option<OrderSide>,
2580 ) -> usize {
2581 self.orders(venue, instrument_id, strategy_id, side).len()
2582 }
2583
2584 #[must_use]
2586 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2587 self.order_lists.get(order_list_id)
2588 }
2589
2590 #[must_use]
2592 pub fn order_lists(
2593 &self,
2594 venue: Option<&Venue>,
2595 instrument_id: Option<&InstrumentId>,
2596 strategy_id: Option<&StrategyId>,
2597 ) -> Vec<&OrderList> {
2598 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2599
2600 if let Some(venue) = venue {
2601 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2602 }
2603
2604 if let Some(instrument_id) = instrument_id {
2605 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2606 }
2607
2608 if let Some(strategy_id) = strategy_id {
2609 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2610 }
2611
2612 order_lists
2613 }
2614
2615 #[must_use]
2617 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2618 self.order_lists.contains_key(order_list_id)
2619 }
2620
2621 #[must_use]
2626 pub fn orders_for_exec_algorithm(
2627 &self,
2628 exec_algorithm_id: &ExecAlgorithmId,
2629 venue: Option<&Venue>,
2630 instrument_id: Option<&InstrumentId>,
2631 strategy_id: Option<&StrategyId>,
2632 side: Option<OrderSide>,
2633 ) -> Vec<&OrderAny> {
2634 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2635 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2636
2637 if let Some(query) = query
2638 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2639 {
2640 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2641 }
2642
2643 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2644 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2645 } else {
2646 Vec::new()
2647 }
2648 }
2649
2650 #[must_use]
2652 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2653 self.get_orders_for_ids(
2654 self.index
2655 .exec_spawn_orders
2656 .get(exec_spawn_id)
2657 .unwrap_or(&AHashSet::new()),
2658 None,
2659 )
2660 }
2661
2662 #[must_use]
2664 pub fn exec_spawn_total_quantity(
2665 &self,
2666 exec_spawn_id: &ClientOrderId,
2667 active_only: bool,
2668 ) -> Option<Quantity> {
2669 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2670
2671 let mut total_quantity: Option<Quantity> = None;
2672
2673 for spawn_order in exec_spawn_orders {
2674 if !active_only || !spawn_order.is_closed() {
2675 if let Some(mut total_quantity) = total_quantity {
2676 total_quantity += spawn_order.quantity();
2677 }
2678 } else {
2679 total_quantity = Some(spawn_order.quantity());
2680 }
2681 }
2682
2683 total_quantity
2684 }
2685
2686 #[must_use]
2688 pub fn exec_spawn_total_filled_qty(
2689 &self,
2690 exec_spawn_id: &ClientOrderId,
2691 active_only: bool,
2692 ) -> Option<Quantity> {
2693 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2694
2695 let mut total_quantity: Option<Quantity> = None;
2696
2697 for spawn_order in exec_spawn_orders {
2698 if !active_only || !spawn_order.is_closed() {
2699 if let Some(mut total_quantity) = total_quantity {
2700 total_quantity += spawn_order.filled_qty();
2701 }
2702 } else {
2703 total_quantity = Some(spawn_order.filled_qty());
2704 }
2705 }
2706
2707 total_quantity
2708 }
2709
2710 #[must_use]
2712 pub fn exec_spawn_total_leaves_qty(
2713 &self,
2714 exec_spawn_id: &ClientOrderId,
2715 active_only: bool,
2716 ) -> Option<Quantity> {
2717 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2718
2719 let mut total_quantity: Option<Quantity> = None;
2720
2721 for spawn_order in exec_spawn_orders {
2722 if !active_only || !spawn_order.is_closed() {
2723 if let Some(mut total_quantity) = total_quantity {
2724 total_quantity += spawn_order.leaves_qty();
2725 }
2726 } else {
2727 total_quantity = Some(spawn_order.leaves_qty());
2728 }
2729 }
2730
2731 total_quantity
2732 }
2733
2734 #[must_use]
2738 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2739 self.positions.get(position_id)
2740 }
2741
2742 #[must_use]
2744 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2745 self.index
2746 .order_position
2747 .get(client_order_id)
2748 .and_then(|position_id| self.positions.get(position_id))
2749 }
2750
2751 #[must_use]
2753 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2754 self.index.order_position.get(client_order_id)
2755 }
2756
2757 #[must_use]
2759 pub fn positions(
2760 &self,
2761 venue: Option<&Venue>,
2762 instrument_id: Option<&InstrumentId>,
2763 strategy_id: Option<&StrategyId>,
2764 side: Option<PositionSide>,
2765 ) -> Vec<&Position> {
2766 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2767 self.get_positions_for_ids(&position_ids, side)
2768 }
2769
2770 #[must_use]
2772 pub fn positions_open(
2773 &self,
2774 venue: Option<&Venue>,
2775 instrument_id: Option<&InstrumentId>,
2776 strategy_id: Option<&StrategyId>,
2777 side: Option<PositionSide>,
2778 ) -> Vec<&Position> {
2779 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2780 self.get_positions_for_ids(&position_ids, side)
2781 }
2782
2783 #[must_use]
2785 pub fn positions_closed(
2786 &self,
2787 venue: Option<&Venue>,
2788 instrument_id: Option<&InstrumentId>,
2789 strategy_id: Option<&StrategyId>,
2790 side: Option<PositionSide>,
2791 ) -> Vec<&Position> {
2792 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2793 self.get_positions_for_ids(&position_ids, side)
2794 }
2795
2796 #[must_use]
2798 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2799 self.index.positions.contains(position_id)
2800 }
2801
2802 #[must_use]
2804 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2805 self.index.positions_open.contains(position_id)
2806 }
2807
2808 #[must_use]
2810 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2811 self.index.positions_closed.contains(position_id)
2812 }
2813
2814 #[must_use]
2816 pub fn positions_open_count(
2817 &self,
2818 venue: Option<&Venue>,
2819 instrument_id: Option<&InstrumentId>,
2820 strategy_id: Option<&StrategyId>,
2821 side: Option<PositionSide>,
2822 ) -> usize {
2823 self.positions_open(venue, instrument_id, strategy_id, side)
2824 .len()
2825 }
2826
2827 #[must_use]
2829 pub fn positions_closed_count(
2830 &self,
2831 venue: Option<&Venue>,
2832 instrument_id: Option<&InstrumentId>,
2833 strategy_id: Option<&StrategyId>,
2834 side: Option<PositionSide>,
2835 ) -> usize {
2836 self.positions_closed(venue, instrument_id, strategy_id, side)
2837 .len()
2838 }
2839
2840 #[must_use]
2842 pub fn positions_total_count(
2843 &self,
2844 venue: Option<&Venue>,
2845 instrument_id: Option<&InstrumentId>,
2846 strategy_id: Option<&StrategyId>,
2847 side: Option<PositionSide>,
2848 ) -> usize {
2849 self.positions(venue, instrument_id, strategy_id, side)
2850 .len()
2851 }
2852
2853 #[must_use]
2857 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2858 self.index.order_strategy.get(client_order_id)
2859 }
2860
2861 #[must_use]
2863 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2864 self.index.position_strategy.get(position_id)
2865 }
2866
2867 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2875 check_valid_string(key, stringify!(key))?;
2876
2877 Ok(self.general.get(key))
2878 }
2879
2880 #[must_use]
2884 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2885 match price_type {
2886 PriceType::Bid => self
2887 .quotes
2888 .get(instrument_id)
2889 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2890 PriceType::Ask => self
2891 .quotes
2892 .get(instrument_id)
2893 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2894 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2895 quotes.front().map(|quote| {
2896 Price::new(
2897 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2898 quote.bid_price.precision + 1,
2899 )
2900 })
2901 }),
2902 PriceType::Last => self
2903 .trades
2904 .get(instrument_id)
2905 .and_then(|trades| trades.front().map(|trade| trade.price)),
2906 PriceType::Mark => self
2907 .mark_prices
2908 .get(instrument_id)
2909 .and_then(|marks| marks.front().map(|mark| mark.value)),
2910 }
2911 }
2912
2913 #[must_use]
2915 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2916 self.quotes
2917 .get(instrument_id)
2918 .map(|quotes| quotes.iter().copied().collect())
2919 }
2920
2921 #[must_use]
2923 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2924 self.trades
2925 .get(instrument_id)
2926 .map(|trades| trades.iter().copied().collect())
2927 }
2928
2929 #[must_use]
2931 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2932 self.mark_prices
2933 .get(instrument_id)
2934 .map(|mark_prices| mark_prices.iter().copied().collect())
2935 }
2936
2937 #[must_use]
2939 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2940 self.index_prices
2941 .get(instrument_id)
2942 .map(|index_prices| index_prices.iter().copied().collect())
2943 }
2944
2945 #[must_use]
2947 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2948 self.bars
2949 .get(bar_type)
2950 .map(|bars| bars.iter().copied().collect())
2951 }
2952
2953 #[must_use]
2955 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2956 self.books.get(instrument_id)
2957 }
2958
2959 #[must_use]
2961 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2962 self.books.get_mut(instrument_id)
2963 }
2964
2965 #[must_use]
2967 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2968 self.own_books.get(instrument_id)
2969 }
2970
2971 #[must_use]
2973 pub fn own_order_book_mut(
2974 &mut self,
2975 instrument_id: &InstrumentId,
2976 ) -> Option<&mut OwnOrderBook> {
2977 self.own_books.get_mut(instrument_id)
2978 }
2979
2980 #[cfg(feature = "defi")]
2982 #[must_use]
2983 pub fn pool(&self, instrument_id: &InstrumentId) -> Option<&Pool> {
2984 self.pools.get(instrument_id)
2985 }
2986
2987 #[cfg(feature = "defi")]
2989 #[must_use]
2990 pub fn pool_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut Pool> {
2991 self.pools.get_mut(instrument_id)
2992 }
2993
2994 #[must_use]
2996 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2997 self.quotes
2998 .get(instrument_id)
2999 .and_then(|quotes| quotes.front())
3000 }
3001
3002 #[must_use]
3004 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3005 self.trades
3006 .get(instrument_id)
3007 .and_then(|trades| trades.front())
3008 }
3009
3010 #[must_use]
3012 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3013 self.mark_prices
3014 .get(instrument_id)
3015 .and_then(|mark_prices| mark_prices.front())
3016 }
3017
3018 #[must_use]
3020 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3021 self.index_prices
3022 .get(instrument_id)
3023 .and_then(|index_prices| index_prices.front())
3024 }
3025
3026 #[must_use]
3028 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3029 self.funding_rates.get(instrument_id)
3030 }
3031
3032 #[must_use]
3034 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3035 self.bars.get(bar_type).and_then(|bars| bars.front())
3036 }
3037
3038 #[must_use]
3040 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3041 self.books
3042 .get(instrument_id)
3043 .map_or(0, |book| book.update_count) as usize
3044 }
3045
3046 #[must_use]
3048 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3049 self.quotes
3050 .get(instrument_id)
3051 .map_or(0, std::collections::VecDeque::len)
3052 }
3053
3054 #[must_use]
3056 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3057 self.trades
3058 .get(instrument_id)
3059 .map_or(0, std::collections::VecDeque::len)
3060 }
3061
3062 #[must_use]
3064 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3065 self.bars
3066 .get(bar_type)
3067 .map_or(0, std::collections::VecDeque::len)
3068 }
3069
3070 #[must_use]
3072 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3073 self.books.contains_key(instrument_id)
3074 }
3075
3076 #[must_use]
3078 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3079 self.quote_count(instrument_id) > 0
3080 }
3081
3082 #[must_use]
3084 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3085 self.trade_count(instrument_id) > 0
3086 }
3087
3088 #[must_use]
3090 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3091 self.bar_count(bar_type) > 0
3092 }
3093
3094 #[must_use]
3095 pub fn get_xrate(
3096 &self,
3097 venue: Venue,
3098 from_currency: Currency,
3099 to_currency: Currency,
3100 price_type: PriceType,
3101 ) -> Option<f64> {
3102 if from_currency == to_currency {
3103 return Some(1.0);
3106 }
3107
3108 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3109
3110 match get_exchange_rate(
3111 from_currency.code,
3112 to_currency.code,
3113 price_type,
3114 bid_quote,
3115 ask_quote,
3116 ) {
3117 Ok(rate) => rate,
3118 Err(e) => {
3119 log::error!("Failed to calculate xrate: {e}");
3120 None
3121 }
3122 }
3123 }
3124
3125 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3126 let mut bid_quotes = AHashMap::new();
3127 let mut ask_quotes = AHashMap::new();
3128
3129 for instrument_id in self.instruments.keys() {
3130 if instrument_id.venue != *venue {
3131 continue;
3132 }
3133
3134 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3135 if let Some(tick) = ticks.front() {
3136 (tick.bid_price, tick.ask_price)
3137 } else {
3138 continue; }
3140 } else {
3141 let bid_bar = self
3142 .bars
3143 .iter()
3144 .find(|(k, _)| {
3145 k.instrument_id() == *instrument_id
3146 && matches!(k.spec().price_type, PriceType::Bid)
3147 })
3148 .map(|(_, v)| v);
3149
3150 let ask_bar = self
3151 .bars
3152 .iter()
3153 .find(|(k, _)| {
3154 k.instrument_id() == *instrument_id
3155 && matches!(k.spec().price_type, PriceType::Ask)
3156 })
3157 .map(|(_, v)| v);
3158
3159 match (bid_bar, ask_bar) {
3160 (Some(bid), Some(ask)) => {
3161 let bid_price = bid.front().unwrap().close;
3162 let ask_price = ask.front().unwrap().close;
3163
3164 (bid_price, ask_price)
3165 }
3166 _ => continue,
3167 }
3168 };
3169
3170 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3171 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3172 }
3173
3174 (bid_quotes, ask_quotes)
3175 }
3176
3177 #[must_use]
3179 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3180 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3181 }
3182
3183 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3189 assert!(xrate > 0.0, "xrate was zero");
3190 self.mark_xrates.insert((from_currency, to_currency), xrate);
3191 self.mark_xrates
3192 .insert((to_currency, from_currency), 1.0 / xrate);
3193 }
3194
3195 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3197 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3198 }
3199
3200 pub fn clear_mark_xrates(&mut self) {
3202 self.mark_xrates.clear();
3203 }
3204
3205 #[must_use]
3209 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3210 self.instruments.get(instrument_id)
3211 }
3212
3213 #[must_use]
3215 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3216 match venue {
3217 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3218 None => self.instruments.keys().collect(),
3219 }
3220 }
3221
3222 #[must_use]
3224 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3225 self.instruments
3226 .values()
3227 .filter(|i| &i.id().venue == venue)
3228 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3229 .collect()
3230 }
3231
3232 #[must_use]
3234 pub fn bar_types(
3235 &self,
3236 instrument_id: Option<&InstrumentId>,
3237 price_type: Option<&PriceType>,
3238 aggregation_source: AggregationSource,
3239 ) -> Vec<&BarType> {
3240 let mut bar_types = self
3241 .bars
3242 .keys()
3243 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3244 .collect::<Vec<&BarType>>();
3245
3246 if let Some(instrument_id) = instrument_id {
3247 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3248 }
3249
3250 if let Some(price_type) = price_type {
3251 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3252 }
3253
3254 bar_types
3255 }
3256
3257 #[must_use]
3261 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3262 self.synthetics.get(instrument_id)
3263 }
3264
3265 #[must_use]
3267 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3268 self.synthetics.keys().collect()
3269 }
3270
3271 #[must_use]
3273 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3274 self.synthetics.values().collect()
3275 }
3276
3277 #[must_use]
3281 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3282 self.accounts.get(account_id)
3283 }
3284
3285 #[must_use]
3287 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3288 self.index
3289 .venue_account
3290 .get(venue)
3291 .and_then(|account_id| self.accounts.get(account_id))
3292 }
3293
3294 #[must_use]
3296 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3297 self.index.venue_account.get(venue)
3298 }
3299
3300 #[must_use]
3302 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3303 self.accounts
3304 .values()
3305 .filter(|account| &account.id() == account_id)
3306 .collect()
3307 }
3308
3309 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3314 let instrument_id = order.instrument_id();
3315
3316 let own_book = self
3318 .own_books
3319 .entry(instrument_id)
3320 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3321
3322 let own_book_order = order.to_own_book_order();
3324
3325 if order.is_closed() {
3326 if let Err(e) = own_book.delete(own_book_order) {
3328 log::debug!(
3329 "Failed to delete order {} from own book: {}",
3330 order.client_order_id(),
3331 e
3332 );
3333 } else {
3334 log::debug!("Deleted order {} from own book", order.client_order_id());
3335 }
3336 } else {
3337 own_book.update(own_book_order).unwrap_or_else(|e| {
3339 log::debug!(
3340 "Failed to update order {} in own book: {}",
3341 order.client_order_id(),
3342 e
3343 );
3344 });
3345 log::debug!("Updated order {} in own book", order.client_order_id());
3346 }
3347 }
3348}