nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22
23mod index;
24
25#[cfg(test)]
26mod tests;
27
28use std::{
29    collections::{HashSet, VecDeque},
30    fmt::Debug,
31    time::{SystemTime, UNIX_EPOCH},
32};
33
34use ahash::{AHashMap, AHashSet};
35use bytes::Bytes;
36pub use config::CacheConfig; // Re-export
37use database::{CacheDatabaseAdapter, CacheMap};
38use index::CacheIndex;
39use nautilus_core::{
40    UUID4, UnixNanos,
41    correctness::{
42        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
43        check_valid_string_ascii,
44    },
45    datetime::secs_to_nanos,
46};
47use nautilus_model::{
48    accounts::{Account, AccountAny},
49    data::{
50        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
51        TradeTick, YieldCurveData,
52    },
53    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
54    identifiers::{
55        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
56        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
57    },
58    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
59    orderbook::{
60        OrderBook,
61        own::{OwnOrderBook, should_handle_own_book_order},
62    },
63    orders::{Order, OrderAny, OrderList},
64    position::Position,
65    types::{Currency, Money, Price, Quantity},
66};
67use ustr::Ustr;
68
69use crate::xrate::get_exchange_rate;
70
71/// A common in-memory `Cache` for market and execution related data.
72#[cfg_attr(
73    feature = "python",
74    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
75)]
76pub struct Cache {
77    config: CacheConfig,
78    index: CacheIndex,
79    database: Option<Box<dyn CacheDatabaseAdapter>>,
80    general: AHashMap<String, Bytes>,
81    currencies: AHashMap<Ustr, Currency>,
82    instruments: AHashMap<InstrumentId, InstrumentAny>,
83    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
84    books: AHashMap<InstrumentId, OrderBook>,
85    own_books: AHashMap<InstrumentId, OwnOrderBook>,
86    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
87    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
88    mark_xrates: AHashMap<(Currency, Currency), f64>,
89    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
90    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
91    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
92    bars: AHashMap<BarType, VecDeque<Bar>>,
93    greeks: AHashMap<InstrumentId, GreeksData>,
94    yield_curves: AHashMap<String, YieldCurveData>,
95    accounts: AHashMap<AccountId, AccountAny>,
96    orders: AHashMap<ClientOrderId, OrderAny>,
97    order_lists: AHashMap<OrderListId, OrderList>,
98    positions: AHashMap<PositionId, Position>,
99    position_snapshots: AHashMap<PositionId, Bytes>,
100    #[cfg(feature = "defi")]
101    pub(crate) defi: crate::defi::cache::DefiCache,
102}
103
104impl Debug for Cache {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct(stringify!(Cache))
107            .field("config", &self.config)
108            .field("index", &self.index)
109            .field("general", &self.general)
110            .field("currencies", &self.currencies)
111            .field("instruments", &self.instruments)
112            .field("synthetics", &self.synthetics)
113            .field("books", &self.books)
114            .field("own_books", &self.own_books)
115            .field("quotes", &self.quotes)
116            .field("trades", &self.trades)
117            .field("mark_xrates", &self.mark_xrates)
118            .field("mark_prices", &self.mark_prices)
119            .field("index_prices", &self.index_prices)
120            .field("funding_rates", &self.funding_rates)
121            .field("bars", &self.bars)
122            .field("greeks", &self.greeks)
123            .field("yield_curves", &self.yield_curves)
124            .field("accounts", &self.accounts)
125            .field("orders", &self.orders)
126            .field("order_lists", &self.order_lists)
127            .field("positions", &self.positions)
128            .field("position_snapshots", &self.position_snapshots)
129            .finish()
130    }
131}
132
133impl Default for Cache {
134    /// Creates a new default [`Cache`] instance.
135    fn default() -> Self {
136        Self::new(Some(CacheConfig::default()), None)
137    }
138}
139
140impl Cache {
141    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
142    #[must_use]
143    /// # Note
144    ///
145    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
146    pub fn new(
147        config: Option<CacheConfig>,
148        database: Option<Box<dyn CacheDatabaseAdapter>>,
149    ) -> Self {
150        Self {
151            config: config.unwrap_or_default(),
152            index: CacheIndex::default(),
153            database,
154            general: AHashMap::new(),
155            currencies: AHashMap::new(),
156            instruments: AHashMap::new(),
157            synthetics: AHashMap::new(),
158            books: AHashMap::new(),
159            own_books: AHashMap::new(),
160            quotes: AHashMap::new(),
161            trades: AHashMap::new(),
162            mark_xrates: AHashMap::new(),
163            mark_prices: AHashMap::new(),
164            index_prices: AHashMap::new(),
165            funding_rates: AHashMap::new(),
166            bars: AHashMap::new(),
167            greeks: AHashMap::new(),
168            yield_curves: AHashMap::new(),
169            accounts: AHashMap::new(),
170            orders: AHashMap::new(),
171            order_lists: AHashMap::new(),
172            positions: AHashMap::new(),
173            position_snapshots: AHashMap::new(),
174            #[cfg(feature = "defi")]
175            defi: crate::defi::cache::DefiCache::default(),
176        }
177    }
178
179    /// Returns the cache instances memory address.
180    #[must_use]
181    pub fn memory_address(&self) -> String {
182        format!("{:?}", std::ptr::from_ref(self))
183    }
184
185    // -- COMMANDS --------------------------------------------------------------------------------
186
187    /// Clears and reloads general entries from the database into the cache.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if loading general cache data fails.
192    pub fn cache_general(&mut self) -> anyhow::Result<()> {
193        self.general = match &mut self.database {
194            Some(db) => db.load()?,
195            None => AHashMap::new(),
196        };
197
198        log::info!(
199            "Cached {} general object(s) from database",
200            self.general.len()
201        );
202        Ok(())
203    }
204
205    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if loading all cache data fails.
210    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
211        let cache_map = match &self.database {
212            Some(db) => db.load_all().await?,
213            None => CacheMap::default(),
214        };
215
216        self.currencies = cache_map.currencies;
217        self.instruments = cache_map.instruments;
218        self.synthetics = cache_map.synthetics;
219        self.accounts = cache_map.accounts;
220        self.orders = cache_map.orders;
221        self.positions = cache_map.positions;
222        Ok(())
223    }
224
225    /// Clears and reloads the currency cache from the database.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if loading currencies cache fails.
230    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
231        self.currencies = match &mut self.database {
232            Some(db) => db.load_currencies().await?,
233            None => AHashMap::new(),
234        };
235
236        log::info!("Cached {} currencies from database", self.general.len());
237        Ok(())
238    }
239
240    /// Clears and reloads the instrument cache from the database.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if loading instruments cache fails.
245    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
246        self.instruments = match &mut self.database {
247            Some(db) => db.load_instruments().await?,
248            None => AHashMap::new(),
249        };
250
251        log::info!("Cached {} instruments from database", self.general.len());
252        Ok(())
253    }
254
255    /// Clears and reloads the synthetic instrument cache from the database.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if loading synthetic instruments cache fails.
260    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
261        self.synthetics = match &mut self.database {
262            Some(db) => db.load_synthetics().await?,
263            None => AHashMap::new(),
264        };
265
266        log::info!(
267            "Cached {} synthetic instruments from database",
268            self.general.len()
269        );
270        Ok(())
271    }
272
273    /// Clears and reloads the account cache from the database.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if loading accounts cache fails.
278    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
279        self.accounts = match &mut self.database {
280            Some(db) => db.load_accounts().await?,
281            None => AHashMap::new(),
282        };
283
284        log::info!(
285            "Cached {} synthetic instruments from database",
286            self.general.len()
287        );
288        Ok(())
289    }
290
291    /// Clears and reloads the order cache from the database.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if loading orders cache fails.
296    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
297        self.orders = match &mut self.database {
298            Some(db) => db.load_orders().await?,
299            None => AHashMap::new(),
300        };
301
302        log::info!("Cached {} orders from database", self.general.len());
303        Ok(())
304    }
305
306    /// Clears and reloads the position cache from the database.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if loading positions cache fails.
311    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
312        self.positions = match &mut self.database {
313            Some(db) => db.load_positions().await?,
314            None => AHashMap::new(),
315        };
316
317        log::info!("Cached {} positions from database", self.general.len());
318        Ok(())
319    }
320
321    /// Clears the current cache index and re-build.
322    pub fn build_index(&mut self) {
323        log::debug!("Building index");
324
325        // Index accounts
326        for account_id in self.accounts.keys() {
327            self.index
328                .venue_account
329                .insert(account_id.get_issuer(), *account_id);
330        }
331
332        // Index orders
333        for (client_order_id, order) in &self.orders {
334            let instrument_id = order.instrument_id();
335            let venue = instrument_id.venue;
336            let strategy_id = order.strategy_id();
337
338            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
339            self.index
340                .venue_orders
341                .entry(venue)
342                .or_default()
343                .insert(*client_order_id);
344
345            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
346            if let Some(venue_order_id) = order.venue_order_id() {
347                self.index
348                    .venue_order_ids
349                    .insert(venue_order_id, *client_order_id);
350            }
351
352            // 3: Build index.order_position -> {ClientOrderId, PositionId}
353            if let Some(position_id) = order.position_id() {
354                self.index
355                    .order_position
356                    .insert(*client_order_id, position_id);
357            }
358
359            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
360            self.index
361                .order_strategy
362                .insert(*client_order_id, order.strategy_id());
363
364            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
365            self.index
366                .instrument_orders
367                .entry(instrument_id)
368                .or_default()
369                .insert(*client_order_id);
370
371            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
372            self.index
373                .strategy_orders
374                .entry(strategy_id)
375                .or_default()
376                .insert(*client_order_id);
377
378            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
379            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
380                self.index
381                    .exec_algorithm_orders
382                    .entry(exec_algorithm_id)
383                    .or_default()
384                    .insert(*client_order_id);
385            }
386
387            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
388            if let Some(exec_spawn_id) = order.exec_spawn_id() {
389                self.index
390                    .exec_spawn_orders
391                    .entry(exec_spawn_id)
392                    .or_default()
393                    .insert(*client_order_id);
394            }
395
396            // 9: Build index.orders -> {ClientOrderId}
397            self.index.orders.insert(*client_order_id);
398
399            // 10: Build index.orders_open -> {ClientOrderId}
400            if order.is_open() {
401                self.index.orders_open.insert(*client_order_id);
402            }
403
404            // 11: Build index.orders_closed -> {ClientOrderId}
405            if order.is_closed() {
406                self.index.orders_closed.insert(*client_order_id);
407            }
408
409            // 12: Build index.orders_emulated -> {ClientOrderId}
410            if let Some(emulation_trigger) = order.emulation_trigger()
411                && emulation_trigger != TriggerType::NoTrigger
412                && !order.is_closed()
413            {
414                self.index.orders_emulated.insert(*client_order_id);
415            }
416
417            // 13: Build index.orders_inflight -> {ClientOrderId}
418            if order.is_inflight() {
419                self.index.orders_inflight.insert(*client_order_id);
420            }
421
422            // 14: Build index.strategies -> {StrategyId}
423            self.index.strategies.insert(strategy_id);
424
425            // 15: Build index.strategies -> {ExecAlgorithmId}
426            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
427                self.index.exec_algorithms.insert(exec_algorithm_id);
428            }
429        }
430
431        // Index positions
432        for (position_id, position) in &self.positions {
433            let instrument_id = position.instrument_id;
434            let venue = instrument_id.venue;
435            let strategy_id = position.strategy_id;
436
437            // 1: Build index.venue_positions -> {Venue, {PositionId}}
438            self.index
439                .venue_positions
440                .entry(venue)
441                .or_default()
442                .insert(*position_id);
443
444            // 2: Build index.position_strategy -> {PositionId, StrategyId}
445            self.index
446                .position_strategy
447                .insert(*position_id, position.strategy_id);
448
449            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
450            self.index
451                .position_orders
452                .entry(*position_id)
453                .or_default()
454                .extend(position.client_order_ids().into_iter());
455
456            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
457            self.index
458                .instrument_positions
459                .entry(instrument_id)
460                .or_default()
461                .insert(*position_id);
462
463            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
464            self.index
465                .strategy_positions
466                .entry(strategy_id)
467                .or_default()
468                .insert(*position_id);
469
470            // 6: Build index.positions -> {PositionId}
471            self.index.positions.insert(*position_id);
472
473            // 7: Build index.positions_open -> {PositionId}
474            if position.is_open() {
475                self.index.positions_open.insert(*position_id);
476            }
477
478            // 8: Build index.positions_closed -> {PositionId}
479            if position.is_closed() {
480                self.index.positions_closed.insert(*position_id);
481            }
482
483            // 9: Build index.strategies -> {StrategyId}
484            self.index.strategies.insert(strategy_id);
485        }
486    }
487
488    /// Returns whether the cache has a backing database.
489    #[must_use]
490    pub const fn has_backing(&self) -> bool {
491        self.config.database.is_some()
492    }
493
494    // Calculate the unrealized profit and loss (PnL) for `position`.
495    #[must_use]
496    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
497        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
498            quote
499        } else {
500            log::warn!(
501                "Cannot calculate unrealized PnL for {}, no quotes for {}",
502                position.id,
503                position.instrument_id
504            );
505            return None;
506        };
507
508        let last = match position.side {
509            PositionSide::Flat | PositionSide::NoPositionSide => {
510                return Some(Money::new(0.0, position.settlement_currency));
511            }
512            PositionSide::Long => quote.ask_price,
513            PositionSide::Short => quote.bid_price,
514        };
515
516        Some(position.unrealized_pnl(last))
517    }
518
519    /// Checks integrity of data within the cache.
520    ///
521    /// All data should be loaded from the database prior to this call.
522    /// If an error is found then a log error message will also be produced.
523    ///
524    /// # Panics
525    ///
526    /// Panics if failure calling system clock.
527    #[must_use]
528    pub fn check_integrity(&mut self) -> bool {
529        let mut error_count = 0;
530        let failure = "Integrity failure";
531
532        // Get current timestamp in microseconds
533        let timestamp_us = SystemTime::now()
534            .duration_since(UNIX_EPOCH)
535            .expect("Time went backwards")
536            .as_micros();
537
538        log::info!("Checking data integrity");
539
540        // Check object caches
541        for account_id in self.accounts.keys() {
542            if !self
543                .index
544                .venue_account
545                .contains_key(&account_id.get_issuer())
546            {
547                log::error!(
548                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
549                );
550                error_count += 1;
551            }
552        }
553
554        for (client_order_id, order) in &self.orders {
555            if !self.index.order_strategy.contains_key(client_order_id) {
556                log::error!(
557                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
558                );
559                error_count += 1;
560            }
561            if !self.index.orders.contains(client_order_id) {
562                log::error!(
563                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
564                );
565                error_count += 1;
566            }
567            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
568                log::error!(
569                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
570                );
571                error_count += 1;
572            }
573            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
574                log::error!(
575                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
576                );
577                error_count += 1;
578            }
579            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
580                log::error!(
581                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
582                );
583                error_count += 1;
584            }
585            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
586                if !self
587                    .index
588                    .exec_algorithm_orders
589                    .contains_key(&exec_algorithm_id)
590                {
591                    log::error!(
592                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
593                    );
594                    error_count += 1;
595                }
596                if order.exec_spawn_id().is_none()
597                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
598                {
599                    log::error!(
600                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
601                    );
602                    error_count += 1;
603                }
604            }
605        }
606
607        for (position_id, position) in &self.positions {
608            if !self.index.position_strategy.contains_key(position_id) {
609                log::error!(
610                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
611                );
612                error_count += 1;
613            }
614            if !self.index.position_orders.contains_key(position_id) {
615                log::error!(
616                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
617                );
618                error_count += 1;
619            }
620            if !self.index.positions.contains(position_id) {
621                log::error!(
622                    "{failure} in positions: {position_id} not found in `self.index.positions`",
623                );
624                error_count += 1;
625            }
626            if position.is_open() && !self.index.positions_open.contains(position_id) {
627                log::error!(
628                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
629                );
630                error_count += 1;
631            }
632            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
633                log::error!(
634                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
635                );
636                error_count += 1;
637            }
638        }
639
640        // Check indexes
641        for account_id in self.index.venue_account.values() {
642            if !self.accounts.contains_key(account_id) {
643                log::error!(
644                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
645                );
646                error_count += 1;
647            }
648        }
649
650        for client_order_id in self.index.venue_order_ids.values() {
651            if !self.orders.contains_key(client_order_id) {
652                log::error!(
653                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
654                );
655                error_count += 1;
656            }
657        }
658
659        for client_order_id in self.index.client_order_ids.keys() {
660            if !self.orders.contains_key(client_order_id) {
661                log::error!(
662                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
663                );
664                error_count += 1;
665            }
666        }
667
668        for client_order_id in self.index.order_position.keys() {
669            if !self.orders.contains_key(client_order_id) {
670                log::error!(
671                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
672                );
673                error_count += 1;
674            }
675        }
676
677        // Check indexes
678        for client_order_id in self.index.order_strategy.keys() {
679            if !self.orders.contains_key(client_order_id) {
680                log::error!(
681                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
682                );
683                error_count += 1;
684            }
685        }
686
687        for position_id in self.index.position_strategy.keys() {
688            if !self.positions.contains_key(position_id) {
689                log::error!(
690                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
691                );
692                error_count += 1;
693            }
694        }
695
696        for position_id in self.index.position_orders.keys() {
697            if !self.positions.contains_key(position_id) {
698                log::error!(
699                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
700                );
701                error_count += 1;
702            }
703        }
704
705        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
706            for client_order_id in client_order_ids {
707                if !self.orders.contains_key(client_order_id) {
708                    log::error!(
709                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
710                    );
711                    error_count += 1;
712                }
713            }
714        }
715
716        for instrument_id in self.index.instrument_positions.keys() {
717            if !self.index.instrument_orders.contains_key(instrument_id) {
718                log::error!(
719                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
720                );
721                error_count += 1;
722            }
723        }
724
725        for client_order_ids in self.index.strategy_orders.values() {
726            for client_order_id in client_order_ids {
727                if !self.orders.contains_key(client_order_id) {
728                    log::error!(
729                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
730                    );
731                    error_count += 1;
732                }
733            }
734        }
735
736        for position_ids in self.index.strategy_positions.values() {
737            for position_id in position_ids {
738                if !self.positions.contains_key(position_id) {
739                    log::error!(
740                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
741                    );
742                    error_count += 1;
743                }
744            }
745        }
746
747        for client_order_id in &self.index.orders {
748            if !self.orders.contains_key(client_order_id) {
749                log::error!(
750                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
751                );
752                error_count += 1;
753            }
754        }
755
756        for client_order_id in &self.index.orders_emulated {
757            if !self.orders.contains_key(client_order_id) {
758                log::error!(
759                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
760                );
761                error_count += 1;
762            }
763        }
764
765        for client_order_id in &self.index.orders_inflight {
766            if !self.orders.contains_key(client_order_id) {
767                log::error!(
768                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
769                );
770                error_count += 1;
771            }
772        }
773
774        for client_order_id in &self.index.orders_open {
775            if !self.orders.contains_key(client_order_id) {
776                log::error!(
777                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
778                );
779                error_count += 1;
780            }
781        }
782
783        for client_order_id in &self.index.orders_closed {
784            if !self.orders.contains_key(client_order_id) {
785                log::error!(
786                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
787                );
788                error_count += 1;
789            }
790        }
791
792        for position_id in &self.index.positions {
793            if !self.positions.contains_key(position_id) {
794                log::error!(
795                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
796                );
797                error_count += 1;
798            }
799        }
800
801        for position_id in &self.index.positions_open {
802            if !self.positions.contains_key(position_id) {
803                log::error!(
804                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
805                );
806                error_count += 1;
807            }
808        }
809
810        for position_id in &self.index.positions_closed {
811            if !self.positions.contains_key(position_id) {
812                log::error!(
813                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
814                );
815                error_count += 1;
816            }
817        }
818
819        for strategy_id in &self.index.strategies {
820            if !self.index.strategy_orders.contains_key(strategy_id) {
821                log::error!(
822                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
823                );
824                error_count += 1;
825            }
826        }
827
828        for exec_algorithm_id in &self.index.exec_algorithms {
829            if !self
830                .index
831                .exec_algorithm_orders
832                .contains_key(exec_algorithm_id)
833            {
834                log::error!(
835                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
836                );
837                error_count += 1;
838            }
839        }
840
841        let total_us = SystemTime::now()
842            .duration_since(UNIX_EPOCH)
843            .expect("Time went backwards")
844            .as_micros()
845            - timestamp_us;
846
847        if error_count == 0 {
848            log::info!("Integrity check passed in {total_us}μs");
849            true
850        } else {
851            log::error!(
852                "Integrity check failed with {error_count} error{} in {total_us}μs",
853                if error_count == 1 { "" } else { "s" },
854            );
855            false
856        }
857    }
858
859    /// Checks for any residual open state and log warnings if any are found.
860    ///
861    ///'Open state' is considered to be open orders and open positions.
862    #[must_use]
863    pub fn check_residuals(&self) -> bool {
864        log::debug!("Checking residuals");
865
866        let mut residuals = false;
867
868        // Check for any open orders
869        for order in self.orders_open(None, None, None, None) {
870            residuals = true;
871            log::warn!("Residual {order:?}");
872        }
873
874        // Check for any open positions
875        for position in self.positions_open(None, None, None, None) {
876            residuals = true;
877            log::warn!("Residual {position}");
878        }
879
880        residuals
881    }
882
883    /// Purges all closed orders from the cache that are older than `buffer_secs`.
884    ///
885    ///
886    /// Only orders that have been closed for at least this amount of time will be purged.
887    /// A value of 0 means purge all closed orders regardless of when they were closed.
888    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
889        log::debug!(
890            "Purging closed orders{}",
891            if buffer_secs > 0 {
892                format!(" with buffer_secs={buffer_secs}")
893            } else {
894                String::new()
895            }
896        );
897
898        let buffer_ns = secs_to_nanos(buffer_secs as f64);
899
900        'outer: for client_order_id in self.index.orders_closed.clone() {
901            if let Some(order) = self.orders.get(&client_order_id)
902                && order.is_closed()
903                && let Some(ts_closed) = order.ts_closed()
904                && ts_closed + buffer_ns <= ts_now
905            {
906                // Check any linked orders (contingency orders)
907                if let Some(linked_order_ids) = order.linked_order_ids() {
908                    for linked_order_id in linked_order_ids {
909                        if let Some(linked_order) = self.orders.get(linked_order_id)
910                            && linked_order.is_open()
911                        {
912                            // Do not purge if linked order still open
913                            continue 'outer;
914                        }
915                    }
916                }
917
918                self.purge_order(client_order_id);
919            }
920        }
921    }
922
923    /// Purges all closed positions from the cache that are older than `buffer_secs`.
924    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
925        log::debug!(
926            "Purging closed positions{}",
927            if buffer_secs > 0 {
928                format!(" with buffer_secs={buffer_secs}")
929            } else {
930                String::new()
931            }
932        );
933
934        let buffer_ns = secs_to_nanos(buffer_secs as f64);
935
936        for position_id in self.index.positions_closed.clone() {
937            if let Some(position) = self.positions.get(&position_id)
938                && position.is_closed()
939                && let Some(ts_closed) = position.ts_closed
940                && ts_closed + buffer_ns <= ts_now
941            {
942                self.purge_position(position_id);
943            }
944        }
945    }
946
947    /// Purges the order with the `client_order_id` from the cache (if found).
948    ///
949    /// For safety, an order is prevented from being purged if it's open.
950    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
951        // Check if order exists and is safe to purge before removing
952        let order = self.orders.get(&client_order_id).cloned();
953
954        // SAFETY: Prevent purging open orders
955        if let Some(ref ord) = order
956            && ord.is_open()
957        {
958            log::warn!("Order {client_order_id} found open when purging, skipping purge");
959            return;
960        }
961
962        // If order exists in cache, remove it and clean up order-specific indices
963        if let Some(ref ord) = order {
964            // Safe to purge
965            self.orders.remove(&client_order_id);
966
967            // Remove order from venue index
968            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
969            {
970                venue_orders.remove(&client_order_id);
971            }
972
973            // Remove venue order ID index if exists
974            if let Some(venue_order_id) = ord.venue_order_id() {
975                self.index.venue_order_ids.remove(&venue_order_id);
976            }
977
978            // Remove from instrument orders index
979            if let Some(instrument_orders) =
980                self.index.instrument_orders.get_mut(&ord.instrument_id())
981            {
982                instrument_orders.remove(&client_order_id);
983            }
984
985            // Remove from position orders index if associated with a position
986            if let Some(position_id) = ord.position_id()
987                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
988            {
989                position_orders.remove(&client_order_id);
990            }
991
992            // Remove from exec algorithm orders index if it has an exec algorithm
993            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
994                && let Some(exec_algorithm_orders) =
995                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
996            {
997                exec_algorithm_orders.remove(&client_order_id);
998            }
999
1000            // Clean up strategy orders reverse index
1001            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1002                strategy_orders.remove(&client_order_id);
1003                if strategy_orders.is_empty() {
1004                    self.index.strategy_orders.remove(&ord.strategy_id());
1005                }
1006            }
1007
1008            // Clean up exec spawn reverse index (if this order is a spawned child)
1009            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1010                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1011            {
1012                spawn_orders.remove(&client_order_id);
1013                if spawn_orders.is_empty() {
1014                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1015                }
1016            }
1017
1018            log::info!("Purged order {client_order_id}");
1019        } else {
1020            log::warn!("Order {client_order_id} not found when purging");
1021        }
1022
1023        // Always clean up order indices (even if order was not in cache)
1024        self.index.order_position.remove(&client_order_id);
1025        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1026        self.index.order_client.remove(&client_order_id);
1027        self.index.client_order_ids.remove(&client_order_id);
1028
1029        // Clean up reverse index when order not in cache (using forward index)
1030        if let Some(strategy_id) = strategy_id
1031            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1032        {
1033            strategy_orders.remove(&client_order_id);
1034            if strategy_orders.is_empty() {
1035                self.index.strategy_orders.remove(&strategy_id);
1036            }
1037        }
1038
1039        // Remove spawn parent entry if this order was a spawn root
1040        self.index.exec_spawn_orders.remove(&client_order_id);
1041
1042        self.index.orders.remove(&client_order_id);
1043        self.index.orders_closed.remove(&client_order_id);
1044        self.index.orders_emulated.remove(&client_order_id);
1045        self.index.orders_inflight.remove(&client_order_id);
1046        self.index.orders_pending_cancel.remove(&client_order_id);
1047    }
1048
1049    /// Purges the position with the `position_id` from the cache (if found).
1050    ///
1051    /// For safety, a position is prevented from being purged if it's open.
1052    pub fn purge_position(&mut self, position_id: PositionId) {
1053        // Check if position exists and is safe to purge before removing
1054        let position = self.positions.get(&position_id).cloned();
1055
1056        // SAFETY: Prevent purging open positions
1057        if let Some(ref pos) = position
1058            && pos.is_open()
1059        {
1060            log::warn!("Position {position_id} found open when purging, skipping purge");
1061            return;
1062        }
1063
1064        // If position exists in cache, remove it and clean up position-specific indices
1065        if let Some(ref pos) = position {
1066            self.positions.remove(&position_id);
1067
1068            // Remove from venue positions index
1069            if let Some(venue_positions) =
1070                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1071            {
1072                venue_positions.remove(&position_id);
1073            }
1074
1075            // Remove from instrument positions index
1076            if let Some(instrument_positions) =
1077                self.index.instrument_positions.get_mut(&pos.instrument_id)
1078            {
1079                instrument_positions.remove(&position_id);
1080            }
1081
1082            // Remove from strategy positions index
1083            if let Some(strategy_positions) =
1084                self.index.strategy_positions.get_mut(&pos.strategy_id)
1085            {
1086                strategy_positions.remove(&position_id);
1087            }
1088
1089            // Remove position ID from orders that reference it
1090            for client_order_id in pos.client_order_ids() {
1091                self.index.order_position.remove(&client_order_id);
1092            }
1093
1094            log::info!("Purged position {position_id}");
1095        } else {
1096            log::warn!("Position {position_id} not found when purging");
1097        }
1098
1099        // Always clean up position indices (even if position not in cache)
1100        self.index.position_strategy.remove(&position_id);
1101        self.index.position_orders.remove(&position_id);
1102        self.index.positions.remove(&position_id);
1103        self.index.positions_open.remove(&position_id);
1104        self.index.positions_closed.remove(&position_id);
1105
1106        // Always clean up position snapshots (even if position not in cache)
1107        self.position_snapshots.remove(&position_id);
1108    }
1109
1110    /// Purges all account state events which are outside the lookback window.
1111    ///
1112    /// Only events which are outside the lookback window will be purged.
1113    /// A value of 0 means purge all account state events.
1114    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1115        log::debug!(
1116            "Purging account events{}",
1117            if lookback_secs > 0 {
1118                format!(" with lookback_secs={lookback_secs}")
1119            } else {
1120                String::new()
1121            }
1122        );
1123
1124        for account in self.accounts.values_mut() {
1125            let event_count = account.event_count();
1126            account.purge_account_events(ts_now, lookback_secs);
1127            let count_diff = event_count - account.event_count();
1128            if count_diff > 0 {
1129                log::info!(
1130                    "Purged {} event(s) from account {}",
1131                    count_diff,
1132                    account.id()
1133                );
1134            }
1135        }
1136    }
1137
1138    /// Clears the caches index.
1139    pub fn clear_index(&mut self) {
1140        self.index.clear();
1141        log::debug!("Cleared index");
1142    }
1143
1144    /// Resets the cache.
1145    ///
1146    /// All stateful fields are reset to their initial value.
1147    pub fn reset(&mut self) {
1148        log::debug!("Resetting cache");
1149
1150        self.general.clear();
1151        self.currencies.clear();
1152        self.instruments.clear();
1153        self.synthetics.clear();
1154        self.books.clear();
1155        self.own_books.clear();
1156        self.quotes.clear();
1157        self.trades.clear();
1158        self.mark_xrates.clear();
1159        self.mark_prices.clear();
1160        self.index_prices.clear();
1161        self.bars.clear();
1162        self.accounts.clear();
1163        self.orders.clear();
1164        self.order_lists.clear();
1165        self.positions.clear();
1166        self.position_snapshots.clear();
1167        self.greeks.clear();
1168        self.yield_curves.clear();
1169
1170        #[cfg(feature = "defi")]
1171        {
1172            self.defi.pools.clear();
1173            self.defi.pool_profilers.clear();
1174        }
1175
1176        self.clear_index();
1177
1178        log::info!("Reset cache");
1179    }
1180
1181    /// Dispose of the cache which will close any underlying database adapter.
1182    ///
1183    /// # Panics
1184    ///
1185    /// Panics if closing the database connection fails.
1186    pub fn dispose(&mut self) {
1187        if let Some(database) = &mut self.database {
1188            database.close().expect("Failed to close database");
1189        }
1190    }
1191
1192    /// Flushes the caches database which permanently removes all persisted data.
1193    ///
1194    /// # Panics
1195    ///
1196    /// Panics if flushing the database connection fails.
1197    pub fn flush_db(&mut self) {
1198        if let Some(database) = &mut self.database {
1199            database.flush().expect("Failed to flush database");
1200        }
1201    }
1202
1203    /// Adds a raw bytes `value` to the cache under the `key`.
1204    ///
1205    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1206    ///
1207    /// # Errors
1208    ///
1209    /// Returns an error if persisting the entry to the backing database fails.
1210    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1211        check_valid_string_ascii(key, stringify!(key))?;
1212        check_predicate_false(value.is_empty(), stringify!(value))?;
1213
1214        log::debug!("Adding general {key}");
1215        self.general.insert(key.to_string(), value.clone());
1216
1217        if let Some(database) = &mut self.database {
1218            database.add(key.to_string(), value)?;
1219        }
1220        Ok(())
1221    }
1222
1223    /// Adds an `OrderBook` to the cache.
1224    ///
1225    /// # Errors
1226    ///
1227    /// Returns an error if persisting the order book to the backing database fails.
1228    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1229        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1230
1231        if self.config.save_market_data
1232            && let Some(database) = &mut self.database
1233        {
1234            database.add_order_book(&book)?;
1235        }
1236
1237        self.books.insert(book.instrument_id, book);
1238        Ok(())
1239    }
1240
1241    /// Adds an `OwnOrderBook` to the cache.
1242    ///
1243    /// # Errors
1244    ///
1245    /// Returns an error if persisting the own order book fails.
1246    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1247        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1248
1249        self.own_books.insert(own_book.instrument_id, own_book);
1250        Ok(())
1251    }
1252
1253    /// Adds the `mark_price` update to the cache.
1254    ///
1255    /// # Errors
1256    ///
1257    /// Returns an error if persisting the mark price to the backing database fails.
1258    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1259        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1260
1261        if self.config.save_market_data {
1262            // TODO: Placeholder and return Result for consistency
1263        }
1264
1265        let mark_prices_deque = self
1266            .mark_prices
1267            .entry(mark_price.instrument_id)
1268            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1269        mark_prices_deque.push_front(mark_price);
1270        Ok(())
1271    }
1272
1273    /// Adds the `index_price` update to the cache.
1274    ///
1275    /// # Errors
1276    ///
1277    /// Returns an error if persisting the index price to the backing database fails.
1278    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1279        log::debug!(
1280            "Adding `IndexPriceUpdate` for {}",
1281            index_price.instrument_id
1282        );
1283
1284        if self.config.save_market_data {
1285            // TODO: Placeholder and return Result for consistency
1286        }
1287
1288        let index_prices_deque = self
1289            .index_prices
1290            .entry(index_price.instrument_id)
1291            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1292        index_prices_deque.push_front(index_price);
1293        Ok(())
1294    }
1295
1296    /// Adds the `funding_rate` update to the cache.
1297    ///
1298    /// # Errors
1299    ///
1300    /// Returns an error if persisting the funding rate update to the backing database fails.
1301    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1302        log::debug!(
1303            "Adding `FundingRateUpdate` for {}",
1304            funding_rate.instrument_id
1305        );
1306
1307        if self.config.save_market_data {
1308            // TODO: Placeholder and return Result for consistency
1309        }
1310
1311        self.funding_rates
1312            .insert(funding_rate.instrument_id, funding_rate);
1313        Ok(())
1314    }
1315
1316    /// Adds the `quote` tick to the cache.
1317    ///
1318    /// # Errors
1319    ///
1320    /// Returns an error if persisting the quote tick to the backing database fails.
1321    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1322        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1323
1324        if self.config.save_market_data
1325            && let Some(database) = &mut self.database
1326        {
1327            database.add_quote(&quote)?;
1328        }
1329
1330        let quotes_deque = self
1331            .quotes
1332            .entry(quote.instrument_id)
1333            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1334        quotes_deque.push_front(quote);
1335        Ok(())
1336    }
1337
1338    /// Adds the `quotes` to the cache.
1339    ///
1340    /// # Errors
1341    ///
1342    /// Returns an error if persisting the quote ticks to the backing database fails.
1343    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1344        check_slice_not_empty(quotes, stringify!(quotes))?;
1345
1346        let instrument_id = quotes[0].instrument_id;
1347        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1348
1349        if self.config.save_market_data
1350            && let Some(database) = &mut self.database
1351        {
1352            for quote in quotes {
1353                database.add_quote(quote)?;
1354            }
1355        }
1356
1357        let quotes_deque = self
1358            .quotes
1359            .entry(instrument_id)
1360            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1361
1362        for quote in quotes {
1363            quotes_deque.push_front(*quote);
1364        }
1365        Ok(())
1366    }
1367
1368    /// Adds the `trade` tick to the cache.
1369    ///
1370    /// # Errors
1371    ///
1372    /// Returns an error if persisting the trade tick to the backing database fails.
1373    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1374        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1375
1376        if self.config.save_market_data
1377            && let Some(database) = &mut self.database
1378        {
1379            database.add_trade(&trade)?;
1380        }
1381
1382        let trades_deque = self
1383            .trades
1384            .entry(trade.instrument_id)
1385            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1386        trades_deque.push_front(trade);
1387        Ok(())
1388    }
1389
1390    /// Adds the give `trades` to the cache.
1391    ///
1392    /// # Errors
1393    ///
1394    /// Returns an error if persisting the trade ticks to the backing database fails.
1395    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1396        check_slice_not_empty(trades, stringify!(trades))?;
1397
1398        let instrument_id = trades[0].instrument_id;
1399        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1400
1401        if self.config.save_market_data
1402            && let Some(database) = &mut self.database
1403        {
1404            for trade in trades {
1405                database.add_trade(trade)?;
1406            }
1407        }
1408
1409        let trades_deque = self
1410            .trades
1411            .entry(instrument_id)
1412            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1413
1414        for trade in trades {
1415            trades_deque.push_front(*trade);
1416        }
1417        Ok(())
1418    }
1419
1420    /// Adds the `bar` to the cache.
1421    ///
1422    /// # Errors
1423    ///
1424    /// Returns an error if persisting the bar to the backing database fails.
1425    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1426        log::debug!("Adding `Bar` {}", bar.bar_type);
1427
1428        if self.config.save_market_data
1429            && let Some(database) = &mut self.database
1430        {
1431            database.add_bar(&bar)?;
1432        }
1433
1434        let bars = self
1435            .bars
1436            .entry(bar.bar_type)
1437            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1438        bars.push_front(bar);
1439        Ok(())
1440    }
1441
1442    /// Adds the `bars` to the cache.
1443    ///
1444    /// # Errors
1445    ///
1446    /// Returns an error if persisting the bars to the backing database fails.
1447    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1448        check_slice_not_empty(bars, stringify!(bars))?;
1449
1450        let bar_type = bars[0].bar_type;
1451        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1452
1453        if self.config.save_market_data
1454            && let Some(database) = &mut self.database
1455        {
1456            for bar in bars {
1457                database.add_bar(bar)?;
1458            }
1459        }
1460
1461        let bars_deque = self
1462            .bars
1463            .entry(bar_type)
1464            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1465
1466        for bar in bars {
1467            bars_deque.push_front(*bar);
1468        }
1469        Ok(())
1470    }
1471
1472    /// Adds the `greeks` data to the cache.
1473    ///
1474    /// # Errors
1475    ///
1476    /// Returns an error if persisting the greeks data to the backing database fails.
1477    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1478        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1479
1480        if self.config.save_market_data
1481            && let Some(_database) = &mut self.database
1482        {
1483            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1484        }
1485
1486        self.greeks.insert(greeks.instrument_id, greeks);
1487        Ok(())
1488    }
1489
1490    /// Gets the greeks data for the `instrument_id`.
1491    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1492        self.greeks.get(instrument_id).cloned()
1493    }
1494
1495    /// Adds the `yield_curve` data to the cache.
1496    ///
1497    /// # Errors
1498    ///
1499    /// Returns an error if persisting the yield curve data to the backing database fails.
1500    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1501        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1502
1503        if self.config.save_market_data
1504            && let Some(_database) = &mut self.database
1505        {
1506            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1507        }
1508
1509        self.yield_curves
1510            .insert(yield_curve.curve_name.clone(), yield_curve);
1511        Ok(())
1512    }
1513
1514    /// Gets the yield curve for the `key`.
1515    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1516        self.yield_curves.get(key).map(|curve| {
1517            let curve_clone = curve.clone();
1518            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1519                as Box<dyn Fn(f64) -> f64>
1520        })
1521    }
1522
1523    /// Adds the `currency` to the cache.
1524    ///
1525    /// # Errors
1526    ///
1527    /// Returns an error if persisting the currency to the backing database fails.
1528    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1529        log::debug!("Adding `Currency` {}", currency.code);
1530
1531        if let Some(database) = &mut self.database {
1532            database.add_currency(&currency)?;
1533        }
1534
1535        self.currencies.insert(currency.code, currency);
1536        Ok(())
1537    }
1538
1539    /// Adds the `instrument` to the cache.
1540    ///
1541    /// # Errors
1542    ///
1543    /// Returns an error if persisting the instrument to the backing database fails.
1544    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1545        log::debug!("Adding `Instrument` {}", instrument.id());
1546
1547        if let Some(database) = &mut self.database {
1548            database.add_instrument(&instrument)?;
1549        }
1550
1551        self.instruments.insert(instrument.id(), instrument);
1552        Ok(())
1553    }
1554
1555    /// Adds the `synthetic` instrument to the cache.
1556    ///
1557    /// # Errors
1558    ///
1559    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1560    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1561        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1562
1563        if let Some(database) = &mut self.database {
1564            database.add_synthetic(&synthetic)?;
1565        }
1566
1567        self.synthetics.insert(synthetic.id, synthetic);
1568        Ok(())
1569    }
1570
1571    /// Adds the `account` to the cache.
1572    ///
1573    /// # Errors
1574    ///
1575    /// Returns an error if persisting the account to the backing database fails.
1576    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1577        log::debug!("Adding `Account` {}", account.id());
1578
1579        if let Some(database) = &mut self.database {
1580            database.add_account(&account)?;
1581        }
1582
1583        let account_id = account.id();
1584        self.accounts.insert(account_id, account);
1585        self.index
1586            .venue_account
1587            .insert(account_id.get_issuer(), account_id);
1588        Ok(())
1589    }
1590
1591    /// Indexes the `client_order_id` with the `venue_order_id`.
1592    ///
1593    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1594    ///
1595    /// # Errors
1596    ///
1597    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1598    pub fn add_venue_order_id(
1599        &mut self,
1600        client_order_id: &ClientOrderId,
1601        venue_order_id: &VenueOrderId,
1602        overwrite: bool,
1603    ) -> anyhow::Result<()> {
1604        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1605            && !overwrite
1606            && existing_venue_order_id != venue_order_id
1607        {
1608            anyhow::bail!(
1609                "Existing {existing_venue_order_id} for {client_order_id}
1610                    did not match the given {venue_order_id}.
1611                    If you are writing a test then try a different `venue_order_id`,
1612                    otherwise this is probably a bug."
1613            );
1614        }
1615
1616        self.index
1617            .client_order_ids
1618            .insert(*client_order_id, *venue_order_id);
1619        self.index
1620            .venue_order_ids
1621            .insert(*venue_order_id, *client_order_id);
1622
1623        Ok(())
1624    }
1625
1626    /// Adds the `order` to the cache indexed with any given identifiers.
1627    ///
1628    /// # Parameters
1629    ///
1630    /// `override_existing`: If the added order should 'override' any existing order and replace
1631    /// it in the cache. This is currently used for emulated orders which are
1632    /// being released and transformed into another type.
1633    ///
1634    /// # Errors
1635    ///
1636    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1637    pub fn add_order(
1638        &mut self,
1639        order: OrderAny,
1640        position_id: Option<PositionId>,
1641        client_id: Option<ClientId>,
1642        replace_existing: bool,
1643    ) -> anyhow::Result<()> {
1644        let instrument_id = order.instrument_id();
1645        let venue = instrument_id.venue;
1646        let client_order_id = order.client_order_id();
1647        let strategy_id = order.strategy_id();
1648        let exec_algorithm_id = order.exec_algorithm_id();
1649        let exec_spawn_id = order.exec_spawn_id();
1650
1651        if !replace_existing {
1652            check_key_not_in_map(
1653                &client_order_id,
1654                &self.orders,
1655                stringify!(client_order_id),
1656                stringify!(orders),
1657            )?;
1658        }
1659
1660        log::debug!("Adding {order:?}");
1661
1662        self.index.orders.insert(client_order_id);
1663        self.index
1664            .order_strategy
1665            .insert(client_order_id, strategy_id);
1666        self.index.strategies.insert(strategy_id);
1667
1668        // Update venue -> orders index
1669        self.index
1670            .venue_orders
1671            .entry(venue)
1672            .or_default()
1673            .insert(client_order_id);
1674
1675        // Update instrument -> orders index
1676        self.index
1677            .instrument_orders
1678            .entry(instrument_id)
1679            .or_default()
1680            .insert(client_order_id);
1681
1682        // Update strategy -> orders index
1683        self.index
1684            .strategy_orders
1685            .entry(strategy_id)
1686            .or_default()
1687            .insert(client_order_id);
1688
1689        // Update exec_algorithm -> orders index
1690        if let Some(exec_algorithm_id) = exec_algorithm_id {
1691            self.index.exec_algorithms.insert(exec_algorithm_id);
1692
1693            self.index
1694                .exec_algorithm_orders
1695                .entry(exec_algorithm_id)
1696                .or_default()
1697                .insert(client_order_id);
1698        }
1699
1700        // Update exec_spawn -> orders index
1701        if let Some(exec_spawn_id) = exec_spawn_id {
1702            self.index
1703                .exec_spawn_orders
1704                .entry(exec_spawn_id)
1705                .or_default()
1706                .insert(client_order_id);
1707        }
1708
1709        // Update emulation index
1710        match order.emulation_trigger() {
1711            Some(_) => {
1712                self.index.orders_emulated.remove(&client_order_id);
1713            }
1714            None => {
1715                self.index.orders_emulated.insert(client_order_id);
1716            }
1717        }
1718
1719        // Index position ID if provided
1720        if let Some(position_id) = position_id {
1721            self.add_position_id(
1722                &position_id,
1723                &order.instrument_id().venue,
1724                &client_order_id,
1725                &strategy_id,
1726            )?;
1727        }
1728
1729        // Index client ID if provided
1730        if let Some(client_id) = client_id {
1731            self.index.order_client.insert(client_order_id, client_id);
1732            log::debug!("Indexed {client_id:?}");
1733        }
1734
1735        if let Some(database) = &mut self.database {
1736            database.add_order(&order, client_id)?;
1737            // TODO: Implement
1738            // if self.config.snapshot_orders {
1739            //     database.snapshot_order_state(order)?;
1740            // }
1741        }
1742
1743        self.orders.insert(client_order_id, order);
1744
1745        Ok(())
1746    }
1747
1748    /// Indexes the `position_id` with the other given IDs.
1749    ///
1750    /// # Errors
1751    ///
1752    /// Returns an error if indexing position ID in the backing database fails.
1753    pub fn add_position_id(
1754        &mut self,
1755        position_id: &PositionId,
1756        venue: &Venue,
1757        client_order_id: &ClientOrderId,
1758        strategy_id: &StrategyId,
1759    ) -> anyhow::Result<()> {
1760        self.index
1761            .order_position
1762            .insert(*client_order_id, *position_id);
1763
1764        // Index: ClientOrderId -> PositionId
1765        if let Some(database) = &mut self.database {
1766            database.index_order_position(*client_order_id, *position_id)?;
1767        }
1768
1769        // Index: PositionId -> StrategyId
1770        self.index
1771            .position_strategy
1772            .insert(*position_id, *strategy_id);
1773
1774        // Index: PositionId -> set[ClientOrderId]
1775        self.index
1776            .position_orders
1777            .entry(*position_id)
1778            .or_default()
1779            .insert(*client_order_id);
1780
1781        // Index: StrategyId -> set[PositionId]
1782        self.index
1783            .strategy_positions
1784            .entry(*strategy_id)
1785            .or_default()
1786            .insert(*position_id);
1787
1788        // Index: Venue -> set[PositionId]
1789        self.index
1790            .venue_positions
1791            .entry(*venue)
1792            .or_default()
1793            .insert(*position_id);
1794
1795        Ok(())
1796    }
1797
1798    /// Adds the `position` to the cache.
1799    ///
1800    /// # Errors
1801    ///
1802    /// Returns an error if persisting the position to the backing database fails.
1803    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1804        self.positions.insert(position.id, position.clone());
1805        self.index.positions.insert(position.id);
1806        self.index.positions_open.insert(position.id);
1807
1808        log::debug!("Adding {position}");
1809
1810        self.add_position_id(
1811            &position.id,
1812            &position.instrument_id.venue,
1813            &position.opening_order_id,
1814            &position.strategy_id,
1815        )?;
1816
1817        let venue = position.instrument_id.venue;
1818        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1819        venue_positions.insert(position.id);
1820
1821        // Index: InstrumentId -> AHashSet
1822        let instrument_id = position.instrument_id;
1823        let instrument_positions = self
1824            .index
1825            .instrument_positions
1826            .entry(instrument_id)
1827            .or_default();
1828        instrument_positions.insert(position.id);
1829
1830        if let Some(database) = &mut self.database {
1831            database.add_position(&position)?;
1832            // TODO: Implement position snapshots
1833            // if self.snapshot_positions {
1834            //     database.snapshot_position_state(
1835            //         position,
1836            //         position.ts_last,
1837            //         self.calculate_unrealized_pnl(&position),
1838            //     )?;
1839            // }
1840        }
1841
1842        Ok(())
1843    }
1844
1845    /// Updates the `account` in the cache.
1846    ///
1847    /// # Errors
1848    ///
1849    /// Returns an error if updating the account in the database fails.
1850    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1851        if let Some(database) = &mut self.database {
1852            database.update_account(&account)?;
1853        }
1854        Ok(())
1855    }
1856
1857    /// Updates the `order` in the cache.
1858    ///
1859    /// # Errors
1860    ///
1861    /// Returns an error if updating the order in the database fails.
1862    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1863        let client_order_id = order.client_order_id();
1864
1865        // Update venue order ID
1866        if let Some(venue_order_id) = order.venue_order_id() {
1867            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1868            // venues which use a cancel+replace update strategy.
1869            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1870                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1871                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1872            }
1873        }
1874
1875        // Update in-flight state
1876        if order.is_inflight() {
1877            self.index.orders_inflight.insert(client_order_id);
1878        } else {
1879            self.index.orders_inflight.remove(&client_order_id);
1880        }
1881
1882        // Update open/closed state
1883        if order.is_open() {
1884            self.index.orders_closed.remove(&client_order_id);
1885            self.index.orders_open.insert(client_order_id);
1886        } else if order.is_closed() {
1887            self.index.orders_open.remove(&client_order_id);
1888            self.index.orders_pending_cancel.remove(&client_order_id);
1889            self.index.orders_closed.insert(client_order_id);
1890        }
1891
1892        // Update emulation
1893        if let Some(emulation_trigger) = order.emulation_trigger() {
1894            match emulation_trigger {
1895                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1896                _ => self.index.orders_emulated.insert(client_order_id),
1897            };
1898        }
1899
1900        // Update own book
1901        if self.own_order_book(&order.instrument_id()).is_some()
1902            && should_handle_own_book_order(order)
1903        {
1904            self.update_own_order_book(order);
1905        }
1906
1907        if let Some(database) = &mut self.database {
1908            database.update_order(order.last_event())?;
1909            // TODO: Implement order snapshots
1910            // if self.snapshot_orders {
1911            //     database.snapshot_order_state(order)?;
1912            // }
1913        }
1914
1915        // update the order in the cache
1916        self.orders.insert(client_order_id, order.clone());
1917
1918        Ok(())
1919    }
1920
1921    /// Updates the `order` as pending cancel locally.
1922    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1923        self.index
1924            .orders_pending_cancel
1925            .insert(order.client_order_id());
1926    }
1927
1928    /// Updates the `position` in the cache.
1929    ///
1930    /// # Errors
1931    ///
1932    /// Returns an error if updating the position in the database fails.
1933    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1934        // Update open/closed state
1935
1936        if position.is_open() {
1937            self.index.positions_open.insert(position.id);
1938            self.index.positions_closed.remove(&position.id);
1939        } else {
1940            self.index.positions_closed.insert(position.id);
1941            self.index.positions_open.remove(&position.id);
1942        }
1943
1944        if let Some(database) = &mut self.database {
1945            database.update_position(position)?;
1946            // TODO: Implement order snapshots
1947            // if self.snapshot_orders {
1948            //     database.snapshot_order_state(order)?;
1949            // }
1950        }
1951
1952        self.positions.insert(position.id, position.clone());
1953
1954        Ok(())
1955    }
1956
1957    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
1958    /// serializing it, and storing it in the position snapshots.
1959    ///
1960    /// # Errors
1961    ///
1962    /// Returns an error if serializing or storing the position snapshot fails.
1963    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1964        let position_id = position.id;
1965
1966        let mut copied_position = position.clone();
1967        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1968        copied_position.id = PositionId::new(new_id);
1969
1970        // Serialize the position (TODO: temporily just to JSON to remove a dependency)
1971        let position_serialized = serde_json::to_vec(&copied_position)?;
1972
1973        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1974        let new_snapshots = match snapshots {
1975            Some(existing_snapshots) => {
1976                let mut combined = existing_snapshots.to_vec();
1977                combined.extend(position_serialized);
1978                Bytes::from(combined)
1979            }
1980            None => Bytes::from(position_serialized),
1981        };
1982        self.position_snapshots.insert(position_id, new_snapshots);
1983
1984        log::debug!("Snapshot {copied_position}");
1985        Ok(())
1986    }
1987
1988    /// Creates a snapshot of the `position` state in the database.
1989    ///
1990    /// # Errors
1991    ///
1992    /// Returns an error if snapshotting the position state fails.
1993    pub fn snapshot_position_state(
1994        &mut self,
1995        position: &Position,
1996        // ts_snapshot: u64,
1997        // unrealized_pnl: Option<Money>,
1998        open_only: Option<bool>,
1999    ) -> anyhow::Result<()> {
2000        let open_only = open_only.unwrap_or(true);
2001
2002        if open_only && !position.is_open() {
2003            return Ok(());
2004        }
2005
2006        if let Some(database) = &mut self.database {
2007            database.snapshot_position_state(position).map_err(|e| {
2008                log::error!(
2009                    "Failed to snapshot position state for {}: {e:?}",
2010                    position.id
2011                );
2012                e
2013            })?;
2014        } else {
2015            log::warn!(
2016                "Cannot snapshot position state for {} (no database configured)",
2017                position.id
2018            );
2019        }
2020
2021        // Ok(())
2022        todo!()
2023    }
2024
2025    /// Gets the OMS type for the `position_id`.
2026    #[must_use]
2027    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2028        // Get OMS type from the index
2029        if self.index.position_strategy.contains_key(position_id) {
2030            // For now, we'll default to NETTING
2031            // TODO: Store and retrieve actual OMS type per position
2032            Some(OmsType::Netting)
2033        } else {
2034            None
2035        }
2036    }
2037
2038    /// Gets position snapshot bytes for the `position_id`.
2039    #[must_use]
2040    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2041        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2042    }
2043
2044    /// Gets position snapshot IDs for the `instrument_id`.
2045    #[must_use]
2046    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> HashSet<PositionId> {
2047        // Get snapshot position IDs that match the instrument
2048        let mut result = HashSet::new();
2049        for (position_id, _) in &self.position_snapshots {
2050            // Check if this position is for the requested instrument
2051            if let Some(position) = self.positions.get(position_id)
2052                && position.instrument_id == *instrument_id
2053            {
2054                result.insert(*position_id);
2055            }
2056        }
2057        result
2058    }
2059
2060    /// Snapshots the `order` state in the database.
2061    ///
2062    /// # Errors
2063    ///
2064    /// Returns an error if snapshotting the order state fails.
2065    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2066        let database = if let Some(database) = &self.database {
2067            database
2068        } else {
2069            log::warn!(
2070                "Cannot snapshot order state for {} (no database configured)",
2071                order.client_order_id()
2072            );
2073            return Ok(());
2074        };
2075
2076        database.snapshot_order_state(order)
2077    }
2078
2079    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2080
2081    fn build_order_query_filter_set(
2082        &self,
2083        venue: Option<&Venue>,
2084        instrument_id: Option<&InstrumentId>,
2085        strategy_id: Option<&StrategyId>,
2086    ) -> Option<AHashSet<ClientOrderId>> {
2087        let mut query: Option<AHashSet<ClientOrderId>> = None;
2088
2089        if let Some(venue) = venue {
2090            query = Some(
2091                self.index
2092                    .venue_orders
2093                    .get(venue)
2094                    .cloned()
2095                    .unwrap_or_default(),
2096            );
2097        }
2098
2099        if let Some(instrument_id) = instrument_id {
2100            let instrument_orders = self
2101                .index
2102                .instrument_orders
2103                .get(instrument_id)
2104                .cloned()
2105                .unwrap_or_default();
2106
2107            if let Some(existing_query) = &mut query {
2108                *existing_query = existing_query
2109                    .intersection(&instrument_orders)
2110                    .copied()
2111                    .collect();
2112            } else {
2113                query = Some(instrument_orders);
2114            }
2115        }
2116
2117        if let Some(strategy_id) = strategy_id {
2118            let strategy_orders = self
2119                .index
2120                .strategy_orders
2121                .get(strategy_id)
2122                .cloned()
2123                .unwrap_or_default();
2124
2125            if let Some(existing_query) = &mut query {
2126                *existing_query = existing_query
2127                    .intersection(&strategy_orders)
2128                    .copied()
2129                    .collect();
2130            } else {
2131                query = Some(strategy_orders);
2132            }
2133        }
2134
2135        query
2136    }
2137
2138    fn build_position_query_filter_set(
2139        &self,
2140        venue: Option<&Venue>,
2141        instrument_id: Option<&InstrumentId>,
2142        strategy_id: Option<&StrategyId>,
2143    ) -> Option<AHashSet<PositionId>> {
2144        let mut query: Option<AHashSet<PositionId>> = None;
2145
2146        if let Some(venue) = venue {
2147            query = Some(
2148                self.index
2149                    .venue_positions
2150                    .get(venue)
2151                    .cloned()
2152                    .unwrap_or_default(),
2153            );
2154        }
2155
2156        if let Some(instrument_id) = instrument_id {
2157            let instrument_positions = self
2158                .index
2159                .instrument_positions
2160                .get(instrument_id)
2161                .cloned()
2162                .unwrap_or_default();
2163
2164            if let Some(existing_query) = query {
2165                query = Some(
2166                    existing_query
2167                        .intersection(&instrument_positions)
2168                        .copied()
2169                        .collect(),
2170                );
2171            } else {
2172                query = Some(instrument_positions);
2173            }
2174        }
2175
2176        if let Some(strategy_id) = strategy_id {
2177            let strategy_positions = self
2178                .index
2179                .strategy_positions
2180                .get(strategy_id)
2181                .cloned()
2182                .unwrap_or_default();
2183
2184            if let Some(existing_query) = query {
2185                query = Some(
2186                    existing_query
2187                        .intersection(&strategy_positions)
2188                        .copied()
2189                        .collect(),
2190                );
2191            } else {
2192                query = Some(strategy_positions);
2193            }
2194        }
2195
2196        query
2197    }
2198
2199    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2200    ///
2201    /// # Panics
2202    ///
2203    /// Panics if any `client_order_id` in the set is not found in the cache.
2204    fn get_orders_for_ids(
2205        &self,
2206        client_order_ids: &AHashSet<ClientOrderId>,
2207        side: Option<OrderSide>,
2208    ) -> Vec<&OrderAny> {
2209        let side = side.unwrap_or(OrderSide::NoOrderSide);
2210        let mut orders = Vec::new();
2211
2212        for client_order_id in client_order_ids {
2213            let order = self
2214                .orders
2215                .get(client_order_id)
2216                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2217            if side == OrderSide::NoOrderSide || side == order.order_side() {
2218                orders.push(order);
2219            }
2220        }
2221
2222        orders
2223    }
2224
2225    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2226    ///
2227    /// # Panics
2228    ///
2229    /// Panics if any `position_id` in the set is not found in the cache.
2230    fn get_positions_for_ids(
2231        &self,
2232        position_ids: &AHashSet<PositionId>,
2233        side: Option<PositionSide>,
2234    ) -> Vec<&Position> {
2235        let side = side.unwrap_or(PositionSide::NoPositionSide);
2236        let mut positions = Vec::new();
2237
2238        for position_id in position_ids {
2239            let position = self
2240                .positions
2241                .get(position_id)
2242                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2243            if side == PositionSide::NoPositionSide || side == position.side {
2244                positions.push(position);
2245            }
2246        }
2247
2248        positions
2249    }
2250
2251    /// Returns the `ClientOrderId`s of all orders.
2252    #[must_use]
2253    pub fn client_order_ids(
2254        &self,
2255        venue: Option<&Venue>,
2256        instrument_id: Option<&InstrumentId>,
2257        strategy_id: Option<&StrategyId>,
2258    ) -> AHashSet<ClientOrderId> {
2259        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2260        match query {
2261            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2262            None => self.index.orders.clone(),
2263        }
2264    }
2265
2266    /// Returns the `ClientOrderId`s of all open orders.
2267    #[must_use]
2268    pub fn client_order_ids_open(
2269        &self,
2270        venue: Option<&Venue>,
2271        instrument_id: Option<&InstrumentId>,
2272        strategy_id: Option<&StrategyId>,
2273    ) -> AHashSet<ClientOrderId> {
2274        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2275        match query {
2276            Some(query) => self
2277                .index
2278                .orders_open
2279                .intersection(&query)
2280                .copied()
2281                .collect(),
2282            None => self.index.orders_open.clone(),
2283        }
2284    }
2285
2286    /// Returns the `ClientOrderId`s of all closed orders.
2287    #[must_use]
2288    pub fn client_order_ids_closed(
2289        &self,
2290        venue: Option<&Venue>,
2291        instrument_id: Option<&InstrumentId>,
2292        strategy_id: Option<&StrategyId>,
2293    ) -> AHashSet<ClientOrderId> {
2294        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2295        match query {
2296            Some(query) => self
2297                .index
2298                .orders_closed
2299                .intersection(&query)
2300                .copied()
2301                .collect(),
2302            None => self.index.orders_closed.clone(),
2303        }
2304    }
2305
2306    /// Returns the `ClientOrderId`s of all emulated orders.
2307    #[must_use]
2308    pub fn client_order_ids_emulated(
2309        &self,
2310        venue: Option<&Venue>,
2311        instrument_id: Option<&InstrumentId>,
2312        strategy_id: Option<&StrategyId>,
2313    ) -> AHashSet<ClientOrderId> {
2314        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2315        match query {
2316            Some(query) => self
2317                .index
2318                .orders_emulated
2319                .intersection(&query)
2320                .copied()
2321                .collect(),
2322            None => self.index.orders_emulated.clone(),
2323        }
2324    }
2325
2326    /// Returns the `ClientOrderId`s of all in-flight orders.
2327    #[must_use]
2328    pub fn client_order_ids_inflight(
2329        &self,
2330        venue: Option<&Venue>,
2331        instrument_id: Option<&InstrumentId>,
2332        strategy_id: Option<&StrategyId>,
2333    ) -> AHashSet<ClientOrderId> {
2334        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2335        match query {
2336            Some(query) => self
2337                .index
2338                .orders_inflight
2339                .intersection(&query)
2340                .copied()
2341                .collect(),
2342            None => self.index.orders_inflight.clone(),
2343        }
2344    }
2345
2346    /// Returns `PositionId`s of all positions.
2347    #[must_use]
2348    pub fn position_ids(
2349        &self,
2350        venue: Option<&Venue>,
2351        instrument_id: Option<&InstrumentId>,
2352        strategy_id: Option<&StrategyId>,
2353    ) -> AHashSet<PositionId> {
2354        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2355        match query {
2356            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2357            None => self.index.positions.clone(),
2358        }
2359    }
2360
2361    /// Returns the `PositionId`s of all open positions.
2362    #[must_use]
2363    pub fn position_open_ids(
2364        &self,
2365        venue: Option<&Venue>,
2366        instrument_id: Option<&InstrumentId>,
2367        strategy_id: Option<&StrategyId>,
2368    ) -> AHashSet<PositionId> {
2369        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2370        match query {
2371            Some(query) => self
2372                .index
2373                .positions_open
2374                .intersection(&query)
2375                .copied()
2376                .collect(),
2377            None => self.index.positions_open.clone(),
2378        }
2379    }
2380
2381    /// Returns the `PositionId`s of all closed positions.
2382    #[must_use]
2383    pub fn position_closed_ids(
2384        &self,
2385        venue: Option<&Venue>,
2386        instrument_id: Option<&InstrumentId>,
2387        strategy_id: Option<&StrategyId>,
2388    ) -> AHashSet<PositionId> {
2389        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2390        match query {
2391            Some(query) => self
2392                .index
2393                .positions_closed
2394                .intersection(&query)
2395                .copied()
2396                .collect(),
2397            None => self.index.positions_closed.clone(),
2398        }
2399    }
2400
2401    /// Returns the `ComponentId`s of all actors.
2402    #[must_use]
2403    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2404        self.index.actors.clone()
2405    }
2406
2407    /// Returns the `StrategyId`s of all strategies.
2408    #[must_use]
2409    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2410        self.index.strategies.clone()
2411    }
2412
2413    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2414    #[must_use]
2415    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2416        self.index.exec_algorithms.clone()
2417    }
2418
2419    // -- ORDER QUERIES ---------------------------------------------------------------------------
2420
2421    /// Gets a reference to the order with the `client_order_id` (if found).
2422    #[must_use]
2423    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2424        self.orders.get(client_order_id)
2425    }
2426
2427    /// Gets a reference to the order with the `client_order_id` (if found).
2428    #[must_use]
2429    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2430        self.orders.get_mut(client_order_id)
2431    }
2432
2433    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2434    #[must_use]
2435    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2436        self.index.venue_order_ids.get(venue_order_id)
2437    }
2438
2439    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2440    #[must_use]
2441    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2442        self.index.client_order_ids.get(client_order_id)
2443    }
2444
2445    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2446    #[must_use]
2447    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2448        self.index.order_client.get(client_order_id)
2449    }
2450
2451    /// Returns references to all orders matching the optional filter parameters.
2452    #[must_use]
2453    pub fn orders(
2454        &self,
2455        venue: Option<&Venue>,
2456        instrument_id: Option<&InstrumentId>,
2457        strategy_id: Option<&StrategyId>,
2458        side: Option<OrderSide>,
2459    ) -> Vec<&OrderAny> {
2460        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2461        self.get_orders_for_ids(&client_order_ids, side)
2462    }
2463
2464    /// Returns references to all open orders matching the optional filter parameters.
2465    #[must_use]
2466    pub fn orders_open(
2467        &self,
2468        venue: Option<&Venue>,
2469        instrument_id: Option<&InstrumentId>,
2470        strategy_id: Option<&StrategyId>,
2471        side: Option<OrderSide>,
2472    ) -> Vec<&OrderAny> {
2473        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2474        self.get_orders_for_ids(&client_order_ids, side)
2475    }
2476
2477    /// Returns references to all closed orders matching the optional filter parameters.
2478    #[must_use]
2479    pub fn orders_closed(
2480        &self,
2481        venue: Option<&Venue>,
2482        instrument_id: Option<&InstrumentId>,
2483        strategy_id: Option<&StrategyId>,
2484        side: Option<OrderSide>,
2485    ) -> Vec<&OrderAny> {
2486        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2487        self.get_orders_for_ids(&client_order_ids, side)
2488    }
2489
2490    /// Returns references to all emulated orders matching the optional filter parameters.
2491    #[must_use]
2492    pub fn orders_emulated(
2493        &self,
2494        venue: Option<&Venue>,
2495        instrument_id: Option<&InstrumentId>,
2496        strategy_id: Option<&StrategyId>,
2497        side: Option<OrderSide>,
2498    ) -> Vec<&OrderAny> {
2499        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2500        self.get_orders_for_ids(&client_order_ids, side)
2501    }
2502
2503    /// Returns references to all in-flight orders matching the optional filter parameters.
2504    #[must_use]
2505    pub fn orders_inflight(
2506        &self,
2507        venue: Option<&Venue>,
2508        instrument_id: Option<&InstrumentId>,
2509        strategy_id: Option<&StrategyId>,
2510        side: Option<OrderSide>,
2511    ) -> Vec<&OrderAny> {
2512        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2513        self.get_orders_for_ids(&client_order_ids, side)
2514    }
2515
2516    /// Returns references to all orders for the `position_id`.
2517    #[must_use]
2518    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2519        let client_order_ids = self.index.position_orders.get(position_id);
2520        match client_order_ids {
2521            Some(client_order_ids) => {
2522                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2523            }
2524            None => Vec::new(),
2525        }
2526    }
2527
2528    /// Returns whether an order with the `client_order_id` exists.
2529    #[must_use]
2530    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2531        self.index.orders.contains(client_order_id)
2532    }
2533
2534    /// Returns whether an order with the `client_order_id` is open.
2535    #[must_use]
2536    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2537        self.index.orders_open.contains(client_order_id)
2538    }
2539
2540    /// Returns whether an order with the `client_order_id` is closed.
2541    #[must_use]
2542    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2543        self.index.orders_closed.contains(client_order_id)
2544    }
2545
2546    /// Returns whether an order with the `client_order_id` is emulated.
2547    #[must_use]
2548    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2549        self.index.orders_emulated.contains(client_order_id)
2550    }
2551
2552    /// Returns whether an order with the `client_order_id` is in-flight.
2553    #[must_use]
2554    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2555        self.index.orders_inflight.contains(client_order_id)
2556    }
2557
2558    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2559    #[must_use]
2560    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2561        self.index.orders_pending_cancel.contains(client_order_id)
2562    }
2563
2564    /// Returns the count of all open orders.
2565    #[must_use]
2566    pub fn orders_open_count(
2567        &self,
2568        venue: Option<&Venue>,
2569        instrument_id: Option<&InstrumentId>,
2570        strategy_id: Option<&StrategyId>,
2571        side: Option<OrderSide>,
2572    ) -> usize {
2573        self.orders_open(venue, instrument_id, strategy_id, side)
2574            .len()
2575    }
2576
2577    /// Returns the count of all closed orders.
2578    #[must_use]
2579    pub fn orders_closed_count(
2580        &self,
2581        venue: Option<&Venue>,
2582        instrument_id: Option<&InstrumentId>,
2583        strategy_id: Option<&StrategyId>,
2584        side: Option<OrderSide>,
2585    ) -> usize {
2586        self.orders_closed(venue, instrument_id, strategy_id, side)
2587            .len()
2588    }
2589
2590    /// Returns the count of all emulated orders.
2591    #[must_use]
2592    pub fn orders_emulated_count(
2593        &self,
2594        venue: Option<&Venue>,
2595        instrument_id: Option<&InstrumentId>,
2596        strategy_id: Option<&StrategyId>,
2597        side: Option<OrderSide>,
2598    ) -> usize {
2599        self.orders_emulated(venue, instrument_id, strategy_id, side)
2600            .len()
2601    }
2602
2603    /// Returns the count of all in-flight orders.
2604    #[must_use]
2605    pub fn orders_inflight_count(
2606        &self,
2607        venue: Option<&Venue>,
2608        instrument_id: Option<&InstrumentId>,
2609        strategy_id: Option<&StrategyId>,
2610        side: Option<OrderSide>,
2611    ) -> usize {
2612        self.orders_inflight(venue, instrument_id, strategy_id, side)
2613            .len()
2614    }
2615
2616    /// Returns the count of all orders.
2617    #[must_use]
2618    pub fn orders_total_count(
2619        &self,
2620        venue: Option<&Venue>,
2621        instrument_id: Option<&InstrumentId>,
2622        strategy_id: Option<&StrategyId>,
2623        side: Option<OrderSide>,
2624    ) -> usize {
2625        self.orders(venue, instrument_id, strategy_id, side).len()
2626    }
2627
2628    /// Returns the order list for the `order_list_id`.
2629    #[must_use]
2630    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2631        self.order_lists.get(order_list_id)
2632    }
2633
2634    /// Returns all order lists matching the optional filter parameters.
2635    #[must_use]
2636    pub fn order_lists(
2637        &self,
2638        venue: Option<&Venue>,
2639        instrument_id: Option<&InstrumentId>,
2640        strategy_id: Option<&StrategyId>,
2641    ) -> Vec<&OrderList> {
2642        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2643
2644        if let Some(venue) = venue {
2645            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2646        }
2647
2648        if let Some(instrument_id) = instrument_id {
2649            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2650        }
2651
2652        if let Some(strategy_id) = strategy_id {
2653            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2654        }
2655
2656        order_lists
2657    }
2658
2659    /// Returns whether an order list with the `order_list_id` exists.
2660    #[must_use]
2661    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2662        self.order_lists.contains_key(order_list_id)
2663    }
2664
2665    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2666
2667    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2668    /// optional filter parameters.
2669    #[must_use]
2670    pub fn orders_for_exec_algorithm(
2671        &self,
2672        exec_algorithm_id: &ExecAlgorithmId,
2673        venue: Option<&Venue>,
2674        instrument_id: Option<&InstrumentId>,
2675        strategy_id: Option<&StrategyId>,
2676        side: Option<OrderSide>,
2677    ) -> Vec<&OrderAny> {
2678        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2679        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2680
2681        if let Some(query) = query
2682            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2683        {
2684            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2685        }
2686
2687        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2688            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2689        } else {
2690            Vec::new()
2691        }
2692    }
2693
2694    /// Returns references to all orders with the `exec_spawn_id`.
2695    #[must_use]
2696    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2697        self.get_orders_for_ids(
2698            self.index
2699                .exec_spawn_orders
2700                .get(exec_spawn_id)
2701                .unwrap_or(&AHashSet::new()),
2702            None,
2703        )
2704    }
2705
2706    /// Returns the total order quantity for the `exec_spawn_id`.
2707    #[must_use]
2708    pub fn exec_spawn_total_quantity(
2709        &self,
2710        exec_spawn_id: &ClientOrderId,
2711        active_only: bool,
2712    ) -> Option<Quantity> {
2713        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2714
2715        let mut total_quantity: Option<Quantity> = None;
2716
2717        for spawn_order in exec_spawn_orders {
2718            if !active_only || !spawn_order.is_closed() {
2719                if let Some(mut total_quantity) = total_quantity {
2720                    total_quantity += spawn_order.quantity();
2721                }
2722            } else {
2723                total_quantity = Some(spawn_order.quantity());
2724            }
2725        }
2726
2727        total_quantity
2728    }
2729
2730    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
2731    #[must_use]
2732    pub fn exec_spawn_total_filled_qty(
2733        &self,
2734        exec_spawn_id: &ClientOrderId,
2735        active_only: bool,
2736    ) -> Option<Quantity> {
2737        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2738
2739        let mut total_quantity: Option<Quantity> = None;
2740
2741        for spawn_order in exec_spawn_orders {
2742            if !active_only || !spawn_order.is_closed() {
2743                if let Some(mut total_quantity) = total_quantity {
2744                    total_quantity += spawn_order.filled_qty();
2745                }
2746            } else {
2747                total_quantity = Some(spawn_order.filled_qty());
2748            }
2749        }
2750
2751        total_quantity
2752    }
2753
2754    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
2755    #[must_use]
2756    pub fn exec_spawn_total_leaves_qty(
2757        &self,
2758        exec_spawn_id: &ClientOrderId,
2759        active_only: bool,
2760    ) -> Option<Quantity> {
2761        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2762
2763        let mut total_quantity: Option<Quantity> = None;
2764
2765        for spawn_order in exec_spawn_orders {
2766            if !active_only || !spawn_order.is_closed() {
2767                if let Some(mut total_quantity) = total_quantity {
2768                    total_quantity += spawn_order.leaves_qty();
2769                }
2770            } else {
2771                total_quantity = Some(spawn_order.leaves_qty());
2772            }
2773        }
2774
2775        total_quantity
2776    }
2777
2778    // -- POSITION QUERIES ------------------------------------------------------------------------
2779
2780    /// Returns a reference to the position with the `position_id` (if found).
2781    #[must_use]
2782    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2783        self.positions.get(position_id)
2784    }
2785
2786    /// Returns a reference to the position for the `client_order_id` (if found).
2787    #[must_use]
2788    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2789        self.index
2790            .order_position
2791            .get(client_order_id)
2792            .and_then(|position_id| self.positions.get(position_id))
2793    }
2794
2795    /// Returns a reference to the position ID for the `client_order_id` (if found).
2796    #[must_use]
2797    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2798        self.index.order_position.get(client_order_id)
2799    }
2800
2801    /// Returns a reference to all positions matching the optional filter parameters.
2802    #[must_use]
2803    pub fn positions(
2804        &self,
2805        venue: Option<&Venue>,
2806        instrument_id: Option<&InstrumentId>,
2807        strategy_id: Option<&StrategyId>,
2808        side: Option<PositionSide>,
2809    ) -> Vec<&Position> {
2810        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2811        self.get_positions_for_ids(&position_ids, side)
2812    }
2813
2814    /// Returns a reference to all open positions matching the optional filter parameters.
2815    #[must_use]
2816    pub fn positions_open(
2817        &self,
2818        venue: Option<&Venue>,
2819        instrument_id: Option<&InstrumentId>,
2820        strategy_id: Option<&StrategyId>,
2821        side: Option<PositionSide>,
2822    ) -> Vec<&Position> {
2823        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2824        self.get_positions_for_ids(&position_ids, side)
2825    }
2826
2827    /// Returns a reference to all closed positions matching the optional filter parameters.
2828    #[must_use]
2829    pub fn positions_closed(
2830        &self,
2831        venue: Option<&Venue>,
2832        instrument_id: Option<&InstrumentId>,
2833        strategy_id: Option<&StrategyId>,
2834        side: Option<PositionSide>,
2835    ) -> Vec<&Position> {
2836        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2837        self.get_positions_for_ids(&position_ids, side)
2838    }
2839
2840    /// Returns whether a position with the `position_id` exists.
2841    #[must_use]
2842    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2843        self.index.positions.contains(position_id)
2844    }
2845
2846    /// Returns whether a position with the `position_id` is open.
2847    #[must_use]
2848    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2849        self.index.positions_open.contains(position_id)
2850    }
2851
2852    /// Returns whether a position with the `position_id` is closed.
2853    #[must_use]
2854    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2855        self.index.positions_closed.contains(position_id)
2856    }
2857
2858    /// Returns the count of all open positions.
2859    #[must_use]
2860    pub fn positions_open_count(
2861        &self,
2862        venue: Option<&Venue>,
2863        instrument_id: Option<&InstrumentId>,
2864        strategy_id: Option<&StrategyId>,
2865        side: Option<PositionSide>,
2866    ) -> usize {
2867        self.positions_open(venue, instrument_id, strategy_id, side)
2868            .len()
2869    }
2870
2871    /// Returns the count of all closed positions.
2872    #[must_use]
2873    pub fn positions_closed_count(
2874        &self,
2875        venue: Option<&Venue>,
2876        instrument_id: Option<&InstrumentId>,
2877        strategy_id: Option<&StrategyId>,
2878        side: Option<PositionSide>,
2879    ) -> usize {
2880        self.positions_closed(venue, instrument_id, strategy_id, side)
2881            .len()
2882    }
2883
2884    /// Returns the count of all positions.
2885    #[must_use]
2886    pub fn positions_total_count(
2887        &self,
2888        venue: Option<&Venue>,
2889        instrument_id: Option<&InstrumentId>,
2890        strategy_id: Option<&StrategyId>,
2891        side: Option<PositionSide>,
2892    ) -> usize {
2893        self.positions(venue, instrument_id, strategy_id, side)
2894            .len()
2895    }
2896
2897    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2898
2899    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
2900    #[must_use]
2901    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2902        self.index.order_strategy.get(client_order_id)
2903    }
2904
2905    /// Gets a reference to the strategy ID for the `position_id` (if found).
2906    #[must_use]
2907    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2908        self.index.position_strategy.get(position_id)
2909    }
2910
2911    // -- GENERAL ---------------------------------------------------------------------------------
2912
2913    /// Gets a reference to the general value for the `key` (if found).
2914    ///
2915    /// # Errors
2916    ///
2917    /// Returns an error if the `key` is invalid.
2918    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2919        check_valid_string_ascii(key, stringify!(key))?;
2920
2921        Ok(self.general.get(key))
2922    }
2923
2924    // -- DATA QUERIES ----------------------------------------------------------------------------
2925
2926    /// Returns the price for the `instrument_id` and `price_type` (if found).
2927    #[must_use]
2928    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2929        match price_type {
2930            PriceType::Bid => self
2931                .quotes
2932                .get(instrument_id)
2933                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2934            PriceType::Ask => self
2935                .quotes
2936                .get(instrument_id)
2937                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2938            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2939                quotes.front().map(|quote| {
2940                    Price::new(
2941                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2942                        quote.bid_price.precision + 1,
2943                    )
2944                })
2945            }),
2946            PriceType::Last => self
2947                .trades
2948                .get(instrument_id)
2949                .and_then(|trades| trades.front().map(|trade| trade.price)),
2950            PriceType::Mark => self
2951                .mark_prices
2952                .get(instrument_id)
2953                .and_then(|marks| marks.front().map(|mark| mark.value)),
2954        }
2955    }
2956
2957    /// Gets all quotes for the `instrument_id`.
2958    #[must_use]
2959    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2960        self.quotes
2961            .get(instrument_id)
2962            .map(|quotes| quotes.iter().copied().collect())
2963    }
2964
2965    /// Gets all trades for the `instrument_id`.
2966    #[must_use]
2967    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2968        self.trades
2969            .get(instrument_id)
2970            .map(|trades| trades.iter().copied().collect())
2971    }
2972
2973    /// Gets all mark price updates for the `instrument_id`.
2974    #[must_use]
2975    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2976        self.mark_prices
2977            .get(instrument_id)
2978            .map(|mark_prices| mark_prices.iter().copied().collect())
2979    }
2980
2981    /// Gets all index price updates for the `instrument_id`.
2982    #[must_use]
2983    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2984        self.index_prices
2985            .get(instrument_id)
2986            .map(|index_prices| index_prices.iter().copied().collect())
2987    }
2988
2989    /// Gets all bars for the `bar_type`.
2990    #[must_use]
2991    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2992        self.bars
2993            .get(bar_type)
2994            .map(|bars| bars.iter().copied().collect())
2995    }
2996
2997    /// Gets a reference to the order book for the `instrument_id`.
2998    #[must_use]
2999    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3000        self.books.get(instrument_id)
3001    }
3002
3003    /// Gets a reference to the order book for the `instrument_id`.
3004    #[must_use]
3005    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3006        self.books.get_mut(instrument_id)
3007    }
3008
3009    /// Gets a reference to the own order book for the `instrument_id`.
3010    #[must_use]
3011    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3012        self.own_books.get(instrument_id)
3013    }
3014
3015    /// Gets a reference to the own order book for the `instrument_id`.
3016    #[must_use]
3017    pub fn own_order_book_mut(
3018        &mut self,
3019        instrument_id: &InstrumentId,
3020    ) -> Option<&mut OwnOrderBook> {
3021        self.own_books.get_mut(instrument_id)
3022    }
3023
3024    /// Gets a reference to the latest quote for the `instrument_id`.
3025    #[must_use]
3026    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3027        self.quotes
3028            .get(instrument_id)
3029            .and_then(|quotes| quotes.front())
3030    }
3031
3032    /// Gets a reference to the latest trade for the `instrument_id`.
3033    #[must_use]
3034    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3035        self.trades
3036            .get(instrument_id)
3037            .and_then(|trades| trades.front())
3038    }
3039
3040    /// Gets a referenece to the latest mark price update for the `instrument_id`.
3041    #[must_use]
3042    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3043        self.mark_prices
3044            .get(instrument_id)
3045            .and_then(|mark_prices| mark_prices.front())
3046    }
3047
3048    /// Gets a referenece to the latest index price update for the `instrument_id`.
3049    #[must_use]
3050    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3051        self.index_prices
3052            .get(instrument_id)
3053            .and_then(|index_prices| index_prices.front())
3054    }
3055
3056    /// Gets a reference to the funding rate update for the `instrument_id`.
3057    #[must_use]
3058    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3059        self.funding_rates.get(instrument_id)
3060    }
3061
3062    /// Gets a reference to the latest bar for the `bar_type`.
3063    #[must_use]
3064    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3065        self.bars.get(bar_type).and_then(|bars| bars.front())
3066    }
3067
3068    /// Gets the order book update count for the `instrument_id`.
3069    #[must_use]
3070    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3071        self.books
3072            .get(instrument_id)
3073            .map_or(0, |book| book.update_count) as usize
3074    }
3075
3076    /// Gets the quote tick count for the `instrument_id`.
3077    #[must_use]
3078    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3079        self.quotes
3080            .get(instrument_id)
3081            .map_or(0, std::collections::VecDeque::len)
3082    }
3083
3084    /// Gets the trade tick count for the `instrument_id`.
3085    #[must_use]
3086    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3087        self.trades
3088            .get(instrument_id)
3089            .map_or(0, std::collections::VecDeque::len)
3090    }
3091
3092    /// Gets the bar count for the `instrument_id`.
3093    #[must_use]
3094    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3095        self.bars
3096            .get(bar_type)
3097            .map_or(0, std::collections::VecDeque::len)
3098    }
3099
3100    /// Returns whether the cache contains an order book for the `instrument_id`.
3101    #[must_use]
3102    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3103        self.books.contains_key(instrument_id)
3104    }
3105
3106    /// Returns whether the cache contains quotes for the `instrument_id`.
3107    #[must_use]
3108    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3109        self.quote_count(instrument_id) > 0
3110    }
3111
3112    /// Returns whether the cache contains trades for the `instrument_id`.
3113    #[must_use]
3114    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3115        self.trade_count(instrument_id) > 0
3116    }
3117
3118    /// Returns whether the cache contains bars for the `bar_type`.
3119    #[must_use]
3120    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3121        self.bar_count(bar_type) > 0
3122    }
3123
3124    #[must_use]
3125    pub fn get_xrate(
3126        &self,
3127        venue: Venue,
3128        from_currency: Currency,
3129        to_currency: Currency,
3130        price_type: PriceType,
3131    ) -> Option<f64> {
3132        if from_currency == to_currency {
3133            // When the source and target currencies are identical,
3134            // no conversion is needed; return an exchange rate of 1.0.
3135            return Some(1.0);
3136        }
3137
3138        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3139
3140        match get_exchange_rate(
3141            from_currency.code,
3142            to_currency.code,
3143            price_type,
3144            bid_quote,
3145            ask_quote,
3146        ) {
3147            Ok(rate) => rate,
3148            Err(e) => {
3149                log::error!("Failed to calculate xrate: {e}");
3150                None
3151            }
3152        }
3153    }
3154
3155    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3156        let mut bid_quotes = AHashMap::new();
3157        let mut ask_quotes = AHashMap::new();
3158
3159        for instrument_id in self.instruments.keys() {
3160            if instrument_id.venue != *venue {
3161                continue;
3162            }
3163
3164            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3165                if let Some(tick) = ticks.front() {
3166                    (tick.bid_price, tick.ask_price)
3167                } else {
3168                    continue; // Empty ticks vector
3169                }
3170            } else {
3171                let bid_bar = self
3172                    .bars
3173                    .iter()
3174                    .find(|(k, _)| {
3175                        k.instrument_id() == *instrument_id
3176                            && matches!(k.spec().price_type, PriceType::Bid)
3177                    })
3178                    .map(|(_, v)| v);
3179
3180                let ask_bar = self
3181                    .bars
3182                    .iter()
3183                    .find(|(k, _)| {
3184                        k.instrument_id() == *instrument_id
3185                            && matches!(k.spec().price_type, PriceType::Ask)
3186                    })
3187                    .map(|(_, v)| v);
3188
3189                match (bid_bar, ask_bar) {
3190                    (Some(bid), Some(ask)) => {
3191                        let bid_price = bid.front().unwrap().close;
3192                        let ask_price = ask.front().unwrap().close;
3193
3194                        (bid_price, ask_price)
3195                    }
3196                    _ => continue,
3197                }
3198            };
3199
3200            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3201            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3202        }
3203
3204        (bid_quotes, ask_quotes)
3205    }
3206
3207    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3208    #[must_use]
3209    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3210        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3211    }
3212
3213    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3214    ///
3215    /// # Panics
3216    ///
3217    /// Panics if `xrate` is not positive.
3218    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3219        assert!(xrate > 0.0, "xrate was zero");
3220        self.mark_xrates.insert((from_currency, to_currency), xrate);
3221        self.mark_xrates
3222            .insert((to_currency, from_currency), 1.0 / xrate);
3223    }
3224
3225    /// Clears the mark exchange rate for the given currency pair.
3226    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3227        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3228    }
3229
3230    /// Clears all mark exchange rates.
3231    pub fn clear_mark_xrates(&mut self) {
3232        self.mark_xrates.clear();
3233    }
3234
3235    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3236
3237    /// Returns a reference to the instrument for the `instrument_id` (if found).
3238    #[must_use]
3239    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3240        self.instruments.get(instrument_id)
3241    }
3242
3243    /// Returns references to all instrument IDs for the `venue`.
3244    #[must_use]
3245    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3246        match venue {
3247            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3248            None => self.instruments.keys().collect(),
3249        }
3250    }
3251
3252    /// Returns references to all instruments for the `venue`.
3253    #[must_use]
3254    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3255        self.instruments
3256            .values()
3257            .filter(|i| &i.id().venue == venue)
3258            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3259            .collect()
3260    }
3261
3262    /// Returns references to all bar types contained in the cache.
3263    #[must_use]
3264    pub fn bar_types(
3265        &self,
3266        instrument_id: Option<&InstrumentId>,
3267        price_type: Option<&PriceType>,
3268        aggregation_source: AggregationSource,
3269    ) -> Vec<&BarType> {
3270        let mut bar_types = self
3271            .bars
3272            .keys()
3273            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3274            .collect::<Vec<&BarType>>();
3275
3276        if let Some(instrument_id) = instrument_id {
3277            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3278        }
3279
3280        if let Some(price_type) = price_type {
3281            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3282        }
3283
3284        bar_types
3285    }
3286
3287    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3288
3289    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3290    #[must_use]
3291    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3292        self.synthetics.get(instrument_id)
3293    }
3294
3295    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3296    #[must_use]
3297    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3298        self.synthetics.keys().collect()
3299    }
3300
3301    /// Returns references to all synthetic instruments contained in the cache.
3302    #[must_use]
3303    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3304        self.synthetics.values().collect()
3305    }
3306
3307    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3308
3309    /// Returns a reference to the account for the `account_id` (if found).
3310    #[must_use]
3311    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3312        self.accounts.get(account_id)
3313    }
3314
3315    /// Returns a reference to the account for the `venue` (if found).
3316    #[must_use]
3317    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3318        self.index
3319            .venue_account
3320            .get(venue)
3321            .and_then(|account_id| self.accounts.get(account_id))
3322    }
3323
3324    /// Returns a reference to the account ID for the `venue` (if found).
3325    #[must_use]
3326    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3327        self.index.venue_account.get(venue)
3328    }
3329
3330    /// Returns references to all accounts for the `account_id`.
3331    #[must_use]
3332    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3333        self.accounts
3334            .values()
3335            .filter(|account| &account.id() == account_id)
3336            .collect()
3337    }
3338
3339    /// Updates the own order book with an order.
3340    ///
3341    /// This method adds, updates, or removes an order from the own order book
3342    /// based on the order's current state.
3343    ///
3344    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3345    /// represented in own books.
3346    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3347        if !order.has_price() {
3348            return;
3349        }
3350
3351        let instrument_id = order.instrument_id();
3352
3353        let own_book = self
3354            .own_books
3355            .entry(instrument_id)
3356            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3357
3358        let own_book_order = order.to_own_book_order();
3359
3360        if order.is_closed() {
3361            if let Err(e) = own_book.delete(own_book_order) {
3362                log::debug!(
3363                    "Failed to delete order {} from own book: {e}",
3364                    order.client_order_id(),
3365                );
3366            } else {
3367                log::debug!("Deleted order {} from own book", order.client_order_id());
3368            }
3369        } else {
3370            // Add or update the order in the own book
3371            if let Err(e) = own_book.update(own_book_order) {
3372                log::debug!(
3373                    "Failed to update order {} in own book: {e}; inserting instead",
3374                    order.client_order_id(),
3375                );
3376                own_book.add(own_book_order);
3377            }
3378            log::debug!("Updated order {} in own book", order.client_order_id());
3379        }
3380    }
3381
3382    /// Force removal of an order from own order books and clean up all indexes.
3383    ///
3384    /// This method is used when order event application fails and we need to ensure
3385    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3386    /// Replicates the index cleanup that update_order performs for closed orders.
3387    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3388        let order = match self.orders.get(client_order_id) {
3389            Some(order) => order,
3390            None => return,
3391        };
3392
3393        self.index.orders_open.remove(client_order_id);
3394        self.index.orders_pending_cancel.remove(client_order_id);
3395        self.index.orders_inflight.remove(client_order_id);
3396        self.index.orders_emulated.remove(client_order_id);
3397
3398        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3399            && order.has_price()
3400        {
3401            let own_book_order = order.to_own_book_order();
3402            if let Err(e) = own_book.delete(own_book_order) {
3403                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3404            } else {
3405                log::debug!("Force deleted {client_order_id} from own book");
3406            }
3407        }
3408
3409        self.index.orders_closed.insert(*client_order_id);
3410    }
3411
3412    /// Audit all own order books against open and inflight order indexes.
3413    ///
3414    /// Ensures closed orders are removed from own order books. This includes both
3415    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3416    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3417    /// during venue latency windows.
3418    pub fn audit_own_order_books(&mut self) {
3419        log::debug!("Starting own books audit");
3420        let start = std::time::Instant::now();
3421
3422        // Build union of open and inflight orders for audit,
3423        // this prevents false positives for SUBMITTED orders during venue latency.
3424        let valid_order_ids: HashSet<ClientOrderId> = self
3425            .index
3426            .orders_open
3427            .union(&self.index.orders_inflight)
3428            .copied()
3429            .collect();
3430
3431        for own_book in self.own_books.values_mut() {
3432            own_book.audit_open_orders(&valid_order_ids);
3433        }
3434
3435        log::debug!("Completed own books audit in {:?}", start.elapsed());
3436    }
3437}