nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod quote;
23
24mod index;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30    collections::VecDeque,
31    fmt::Debug,
32    time::{SystemTime, UNIX_EPOCH},
33};
34
35use ahash::{AHashMap, AHashSet};
36use bytes::Bytes;
37pub use config::CacheConfig; // Re-export
38use database::{CacheDatabaseAdapter, CacheMap};
39use index::CacheIndex;
40use nautilus_core::{
41    UUID4, UnixNanos,
42    correctness::{
43        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
44        check_valid_string_ascii,
45    },
46    datetime::secs_to_nanos_unchecked,
47};
48use nautilus_model::{
49    accounts::{Account, AccountAny},
50    data::{
51        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52        TradeTick, YieldCurveData,
53    },
54    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55    identifiers::{
56        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58    },
59    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60    orderbook::{
61        OrderBook,
62        own::{OwnOrderBook, should_handle_own_book_order},
63    },
64    orders::{Order, OrderAny, OrderList},
65    position::Position,
66    types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
72/// A common in-memory `Cache` for market and execution related data.
73#[cfg_attr(
74    feature = "python",
75    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
76)]
77pub struct Cache {
78    config: CacheConfig,
79    index: CacheIndex,
80    database: Option<Box<dyn CacheDatabaseAdapter>>,
81    general: AHashMap<String, Bytes>,
82    currencies: AHashMap<Ustr, Currency>,
83    instruments: AHashMap<InstrumentId, InstrumentAny>,
84    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
85    books: AHashMap<InstrumentId, OrderBook>,
86    own_books: AHashMap<InstrumentId, OwnOrderBook>,
87    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
88    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
89    mark_xrates: AHashMap<(Currency, Currency), f64>,
90    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
91    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
92    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
93    bars: AHashMap<BarType, VecDeque<Bar>>,
94    greeks: AHashMap<InstrumentId, GreeksData>,
95    yield_curves: AHashMap<String, YieldCurveData>,
96    accounts: AHashMap<AccountId, AccountAny>,
97    orders: AHashMap<ClientOrderId, OrderAny>,
98    order_lists: AHashMap<OrderListId, OrderList>,
99    positions: AHashMap<PositionId, Position>,
100    position_snapshots: AHashMap<PositionId, Bytes>,
101    #[cfg(feature = "defi")]
102    pub(crate) defi: crate::defi::cache::DefiCache,
103}
104
105impl Debug for Cache {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct(stringify!(Cache))
108            .field("config", &self.config)
109            .field("index", &self.index)
110            .field("general", &self.general)
111            .field("currencies", &self.currencies)
112            .field("instruments", &self.instruments)
113            .field("synthetics", &self.synthetics)
114            .field("books", &self.books)
115            .field("own_books", &self.own_books)
116            .field("quotes", &self.quotes)
117            .field("trades", &self.trades)
118            .field("mark_xrates", &self.mark_xrates)
119            .field("mark_prices", &self.mark_prices)
120            .field("index_prices", &self.index_prices)
121            .field("funding_rates", &self.funding_rates)
122            .field("bars", &self.bars)
123            .field("greeks", &self.greeks)
124            .field("yield_curves", &self.yield_curves)
125            .field("accounts", &self.accounts)
126            .field("orders", &self.orders)
127            .field("order_lists", &self.order_lists)
128            .field("positions", &self.positions)
129            .field("position_snapshots", &self.position_snapshots)
130            .finish()
131    }
132}
133
134impl Default for Cache {
135    /// Creates a new default [`Cache`] instance.
136    fn default() -> Self {
137        Self::new(Some(CacheConfig::default()), None)
138    }
139}
140
141impl Cache {
142    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
143    #[must_use]
144    /// # Note
145    ///
146    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
147    pub fn new(
148        config: Option<CacheConfig>,
149        database: Option<Box<dyn CacheDatabaseAdapter>>,
150    ) -> Self {
151        Self {
152            config: config.unwrap_or_default(),
153            index: CacheIndex::default(),
154            database,
155            general: AHashMap::new(),
156            currencies: AHashMap::new(),
157            instruments: AHashMap::new(),
158            synthetics: AHashMap::new(),
159            books: AHashMap::new(),
160            own_books: AHashMap::new(),
161            quotes: AHashMap::new(),
162            trades: AHashMap::new(),
163            mark_xrates: AHashMap::new(),
164            mark_prices: AHashMap::new(),
165            index_prices: AHashMap::new(),
166            funding_rates: AHashMap::new(),
167            bars: AHashMap::new(),
168            greeks: AHashMap::new(),
169            yield_curves: AHashMap::new(),
170            accounts: AHashMap::new(),
171            orders: AHashMap::new(),
172            order_lists: AHashMap::new(),
173            positions: AHashMap::new(),
174            position_snapshots: AHashMap::new(),
175            #[cfg(feature = "defi")]
176            defi: crate::defi::cache::DefiCache::default(),
177        }
178    }
179
180    /// Returns the cache instances memory address.
181    #[must_use]
182    pub fn memory_address(&self) -> String {
183        format!("{:?}", std::ptr::from_ref(self))
184    }
185
186    /// Sets the cache database adapter for persistence.
187    ///
188    /// This allows setting or replacing the database adapter after cache construction.
189    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
190        let type_name = std::any::type_name_of_val(&*database);
191        log::info!("Cache database adapter set: {type_name}");
192        self.database = Some(database);
193    }
194
195    // -- COMMANDS --------------------------------------------------------------------------------
196
197    /// Clears and reloads general entries from the database into the cache.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if loading general cache data fails.
202    pub fn cache_general(&mut self) -> anyhow::Result<()> {
203        self.general = match &mut self.database {
204            Some(db) => db.load()?,
205            None => AHashMap::new(),
206        };
207
208        log::info!(
209            "Cached {} general object(s) from database",
210            self.general.len()
211        );
212        Ok(())
213    }
214
215    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if loading all cache data fails.
220    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
221        let cache_map = match &self.database {
222            Some(db) => db.load_all().await?,
223            None => CacheMap::default(),
224        };
225
226        self.currencies = cache_map.currencies;
227        self.instruments = cache_map.instruments;
228        self.synthetics = cache_map.synthetics;
229        self.accounts = cache_map.accounts;
230        self.orders = cache_map.orders;
231        self.positions = cache_map.positions;
232        Ok(())
233    }
234
235    /// Clears and reloads the currency cache from the database.
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if loading currencies cache fails.
240    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
241        self.currencies = match &mut self.database {
242            Some(db) => db.load_currencies().await?,
243            None => AHashMap::new(),
244        };
245
246        log::info!("Cached {} currencies from database", self.general.len());
247        Ok(())
248    }
249
250    /// Clears and reloads the instrument cache from the database.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if loading instruments cache fails.
255    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
256        self.instruments = match &mut self.database {
257            Some(db) => db.load_instruments().await?,
258            None => AHashMap::new(),
259        };
260
261        log::info!("Cached {} instruments from database", self.general.len());
262        Ok(())
263    }
264
265    /// Clears and reloads the synthetic instrument cache from the database.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if loading synthetic instruments cache fails.
270    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
271        self.synthetics = match &mut self.database {
272            Some(db) => db.load_synthetics().await?,
273            None => AHashMap::new(),
274        };
275
276        log::info!(
277            "Cached {} synthetic instruments from database",
278            self.general.len()
279        );
280        Ok(())
281    }
282
283    /// Clears and reloads the account cache from the database.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if loading accounts cache fails.
288    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
289        self.accounts = match &mut self.database {
290            Some(db) => db.load_accounts().await?,
291            None => AHashMap::new(),
292        };
293
294        log::info!(
295            "Cached {} synthetic instruments from database",
296            self.general.len()
297        );
298        Ok(())
299    }
300
301    /// Clears and reloads the order cache from the database.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if loading orders cache fails.
306    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
307        self.orders = match &mut self.database {
308            Some(db) => db.load_orders().await?,
309            None => AHashMap::new(),
310        };
311
312        log::info!("Cached {} orders from database", self.general.len());
313        Ok(())
314    }
315
316    /// Clears and reloads the position cache from the database.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if loading positions cache fails.
321    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
322        self.positions = match &mut self.database {
323            Some(db) => db.load_positions().await?,
324            None => AHashMap::new(),
325        };
326
327        log::info!("Cached {} positions from database", self.general.len());
328        Ok(())
329    }
330
331    /// Clears the current cache index and re-build.
332    pub fn build_index(&mut self) {
333        log::debug!("Building index");
334
335        // Index accounts
336        for account_id in self.accounts.keys() {
337            self.index
338                .venue_account
339                .insert(account_id.get_issuer(), *account_id);
340        }
341
342        // Index orders
343        for (client_order_id, order) in &self.orders {
344            let instrument_id = order.instrument_id();
345            let venue = instrument_id.venue;
346            let strategy_id = order.strategy_id();
347
348            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
349            self.index
350                .venue_orders
351                .entry(venue)
352                .or_default()
353                .insert(*client_order_id);
354
355            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
356            if let Some(venue_order_id) = order.venue_order_id() {
357                self.index
358                    .venue_order_ids
359                    .insert(venue_order_id, *client_order_id);
360            }
361
362            // 3: Build index.order_position -> {ClientOrderId, PositionId}
363            if let Some(position_id) = order.position_id() {
364                self.index
365                    .order_position
366                    .insert(*client_order_id, position_id);
367            }
368
369            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
370            self.index
371                .order_strategy
372                .insert(*client_order_id, order.strategy_id());
373
374            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
375            self.index
376                .instrument_orders
377                .entry(instrument_id)
378                .or_default()
379                .insert(*client_order_id);
380
381            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
382            self.index
383                .strategy_orders
384                .entry(strategy_id)
385                .or_default()
386                .insert(*client_order_id);
387
388            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
389            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
390                self.index
391                    .exec_algorithm_orders
392                    .entry(exec_algorithm_id)
393                    .or_default()
394                    .insert(*client_order_id);
395            }
396
397            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
398            if let Some(exec_spawn_id) = order.exec_spawn_id() {
399                self.index
400                    .exec_spawn_orders
401                    .entry(exec_spawn_id)
402                    .or_default()
403                    .insert(*client_order_id);
404            }
405
406            // 9: Build index.orders -> {ClientOrderId}
407            self.index.orders.insert(*client_order_id);
408
409            // 10: Build index.orders_open -> {ClientOrderId}
410            if order.is_open() {
411                self.index.orders_open.insert(*client_order_id);
412            }
413
414            // 11: Build index.orders_closed -> {ClientOrderId}
415            if order.is_closed() {
416                self.index.orders_closed.insert(*client_order_id);
417            }
418
419            // 12: Build index.orders_emulated -> {ClientOrderId}
420            if let Some(emulation_trigger) = order.emulation_trigger()
421                && emulation_trigger != TriggerType::NoTrigger
422                && !order.is_closed()
423            {
424                self.index.orders_emulated.insert(*client_order_id);
425            }
426
427            // 13: Build index.orders_inflight -> {ClientOrderId}
428            if order.is_inflight() {
429                self.index.orders_inflight.insert(*client_order_id);
430            }
431
432            // 14: Build index.strategies -> {StrategyId}
433            self.index.strategies.insert(strategy_id);
434
435            // 15: Build index.strategies -> {ExecAlgorithmId}
436            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
437                self.index.exec_algorithms.insert(exec_algorithm_id);
438            }
439        }
440
441        // Index positions
442        for (position_id, position) in &self.positions {
443            let instrument_id = position.instrument_id;
444            let venue = instrument_id.venue;
445            let strategy_id = position.strategy_id;
446
447            // 1: Build index.venue_positions -> {Venue, {PositionId}}
448            self.index
449                .venue_positions
450                .entry(venue)
451                .or_default()
452                .insert(*position_id);
453
454            // 2: Build index.position_strategy -> {PositionId, StrategyId}
455            self.index
456                .position_strategy
457                .insert(*position_id, position.strategy_id);
458
459            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
460            self.index
461                .position_orders
462                .entry(*position_id)
463                .or_default()
464                .extend(position.client_order_ids().into_iter());
465
466            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
467            self.index
468                .instrument_positions
469                .entry(instrument_id)
470                .or_default()
471                .insert(*position_id);
472
473            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
474            self.index
475                .strategy_positions
476                .entry(strategy_id)
477                .or_default()
478                .insert(*position_id);
479
480            // 6: Build index.positions -> {PositionId}
481            self.index.positions.insert(*position_id);
482
483            // 7: Build index.positions_open -> {PositionId}
484            if position.is_open() {
485                self.index.positions_open.insert(*position_id);
486            }
487
488            // 8: Build index.positions_closed -> {PositionId}
489            if position.is_closed() {
490                self.index.positions_closed.insert(*position_id);
491            }
492
493            // 9: Build index.strategies -> {StrategyId}
494            self.index.strategies.insert(strategy_id);
495        }
496    }
497
498    /// Returns whether the cache has a backing database.
499    #[must_use]
500    pub const fn has_backing(&self) -> bool {
501        self.config.database.is_some()
502    }
503
504    // Calculate the unrealized profit and loss (PnL) for `position`.
505    #[must_use]
506    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
507        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
508            quote
509        } else {
510            log::warn!(
511                "Cannot calculate unrealized PnL for {}, no quotes for {}",
512                position.id,
513                position.instrument_id
514            );
515            return None;
516        };
517
518        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
519        let last = match position.side {
520            PositionSide::Flat | PositionSide::NoPositionSide => {
521                return Some(Money::new(0.0, position.settlement_currency));
522            }
523            PositionSide::Long => quote.bid_price,
524            PositionSide::Short => quote.ask_price,
525        };
526
527        Some(position.unrealized_pnl(last))
528    }
529
530    /// Checks integrity of data within the cache.
531    ///
532    /// All data should be loaded from the database prior to this call.
533    /// If an error is found then a log error message will also be produced.
534    ///
535    /// # Panics
536    ///
537    /// Panics if failure calling system clock.
538    #[must_use]
539    pub fn check_integrity(&mut self) -> bool {
540        let mut error_count = 0;
541        let failure = "Integrity failure";
542
543        // Get current timestamp in microseconds
544        let timestamp_us = SystemTime::now()
545            .duration_since(UNIX_EPOCH)
546            .expect("Time went backwards")
547            .as_micros();
548
549        log::info!("Checking data integrity");
550
551        // Check object caches
552        for account_id in self.accounts.keys() {
553            if !self
554                .index
555                .venue_account
556                .contains_key(&account_id.get_issuer())
557            {
558                log::error!(
559                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
560                );
561                error_count += 1;
562            }
563        }
564
565        for (client_order_id, order) in &self.orders {
566            if !self.index.order_strategy.contains_key(client_order_id) {
567                log::error!(
568                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
569                );
570                error_count += 1;
571            }
572            if !self.index.orders.contains(client_order_id) {
573                log::error!(
574                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
575                );
576                error_count += 1;
577            }
578            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
579                log::error!(
580                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
581                );
582                error_count += 1;
583            }
584            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
585                log::error!(
586                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
587                );
588                error_count += 1;
589            }
590            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
591                log::error!(
592                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
593                );
594                error_count += 1;
595            }
596            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
597                if !self
598                    .index
599                    .exec_algorithm_orders
600                    .contains_key(&exec_algorithm_id)
601                {
602                    log::error!(
603                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
604                    );
605                    error_count += 1;
606                }
607                if order.exec_spawn_id().is_none()
608                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
609                {
610                    log::error!(
611                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
612                    );
613                    error_count += 1;
614                }
615            }
616        }
617
618        for (position_id, position) in &self.positions {
619            if !self.index.position_strategy.contains_key(position_id) {
620                log::error!(
621                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
622                );
623                error_count += 1;
624            }
625            if !self.index.position_orders.contains_key(position_id) {
626                log::error!(
627                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
628                );
629                error_count += 1;
630            }
631            if !self.index.positions.contains(position_id) {
632                log::error!(
633                    "{failure} in positions: {position_id} not found in `self.index.positions`",
634                );
635                error_count += 1;
636            }
637            if position.is_open() && !self.index.positions_open.contains(position_id) {
638                log::error!(
639                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
640                );
641                error_count += 1;
642            }
643            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
644                log::error!(
645                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
646                );
647                error_count += 1;
648            }
649        }
650
651        // Check indexes
652        for account_id in self.index.venue_account.values() {
653            if !self.accounts.contains_key(account_id) {
654                log::error!(
655                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
656                );
657                error_count += 1;
658            }
659        }
660
661        for client_order_id in self.index.venue_order_ids.values() {
662            if !self.orders.contains_key(client_order_id) {
663                log::error!(
664                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
665                );
666                error_count += 1;
667            }
668        }
669
670        for client_order_id in self.index.client_order_ids.keys() {
671            if !self.orders.contains_key(client_order_id) {
672                log::error!(
673                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
674                );
675                error_count += 1;
676            }
677        }
678
679        for client_order_id in self.index.order_position.keys() {
680            if !self.orders.contains_key(client_order_id) {
681                log::error!(
682                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
683                );
684                error_count += 1;
685            }
686        }
687
688        // Check indexes
689        for client_order_id in self.index.order_strategy.keys() {
690            if !self.orders.contains_key(client_order_id) {
691                log::error!(
692                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
693                );
694                error_count += 1;
695            }
696        }
697
698        for position_id in self.index.position_strategy.keys() {
699            if !self.positions.contains_key(position_id) {
700                log::error!(
701                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
702                );
703                error_count += 1;
704            }
705        }
706
707        for position_id in self.index.position_orders.keys() {
708            if !self.positions.contains_key(position_id) {
709                log::error!(
710                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
711                );
712                error_count += 1;
713            }
714        }
715
716        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
717            for client_order_id in client_order_ids {
718                if !self.orders.contains_key(client_order_id) {
719                    log::error!(
720                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
721                    );
722                    error_count += 1;
723                }
724            }
725        }
726
727        for instrument_id in self.index.instrument_positions.keys() {
728            if !self.index.instrument_orders.contains_key(instrument_id) {
729                log::error!(
730                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
731                );
732                error_count += 1;
733            }
734        }
735
736        for client_order_ids in self.index.strategy_orders.values() {
737            for client_order_id in client_order_ids {
738                if !self.orders.contains_key(client_order_id) {
739                    log::error!(
740                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
741                    );
742                    error_count += 1;
743                }
744            }
745        }
746
747        for position_ids in self.index.strategy_positions.values() {
748            for position_id in position_ids {
749                if !self.positions.contains_key(position_id) {
750                    log::error!(
751                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
752                    );
753                    error_count += 1;
754                }
755            }
756        }
757
758        for client_order_id in &self.index.orders {
759            if !self.orders.contains_key(client_order_id) {
760                log::error!(
761                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
762                );
763                error_count += 1;
764            }
765        }
766
767        for client_order_id in &self.index.orders_emulated {
768            if !self.orders.contains_key(client_order_id) {
769                log::error!(
770                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
771                );
772                error_count += 1;
773            }
774        }
775
776        for client_order_id in &self.index.orders_inflight {
777            if !self.orders.contains_key(client_order_id) {
778                log::error!(
779                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
780                );
781                error_count += 1;
782            }
783        }
784
785        for client_order_id in &self.index.orders_open {
786            if !self.orders.contains_key(client_order_id) {
787                log::error!(
788                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
789                );
790                error_count += 1;
791            }
792        }
793
794        for client_order_id in &self.index.orders_closed {
795            if !self.orders.contains_key(client_order_id) {
796                log::error!(
797                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
798                );
799                error_count += 1;
800            }
801        }
802
803        for position_id in &self.index.positions {
804            if !self.positions.contains_key(position_id) {
805                log::error!(
806                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
807                );
808                error_count += 1;
809            }
810        }
811
812        for position_id in &self.index.positions_open {
813            if !self.positions.contains_key(position_id) {
814                log::error!(
815                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
816                );
817                error_count += 1;
818            }
819        }
820
821        for position_id in &self.index.positions_closed {
822            if !self.positions.contains_key(position_id) {
823                log::error!(
824                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
825                );
826                error_count += 1;
827            }
828        }
829
830        for strategy_id in &self.index.strategies {
831            if !self.index.strategy_orders.contains_key(strategy_id) {
832                log::error!(
833                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
834                );
835                error_count += 1;
836            }
837        }
838
839        for exec_algorithm_id in &self.index.exec_algorithms {
840            if !self
841                .index
842                .exec_algorithm_orders
843                .contains_key(exec_algorithm_id)
844            {
845                log::error!(
846                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
847                );
848                error_count += 1;
849            }
850        }
851
852        let total_us = SystemTime::now()
853            .duration_since(UNIX_EPOCH)
854            .expect("Time went backwards")
855            .as_micros()
856            - timestamp_us;
857
858        if error_count == 0 {
859            log::info!("Integrity check passed in {total_us}μs");
860            true
861        } else {
862            log::error!(
863                "Integrity check failed with {error_count} error{} in {total_us}μs",
864                if error_count == 1 { "" } else { "s" },
865            );
866            false
867        }
868    }
869
870    /// Checks for any residual open state and log warnings if any are found.
871    ///
872    ///'Open state' is considered to be open orders and open positions.
873    #[must_use]
874    pub fn check_residuals(&self) -> bool {
875        log::debug!("Checking residuals");
876
877        let mut residuals = false;
878
879        // Check for any open orders
880        for order in self.orders_open(None, None, None, None) {
881            residuals = true;
882            log::warn!("Residual {order}");
883        }
884
885        // Check for any open positions
886        for position in self.positions_open(None, None, None, None) {
887            residuals = true;
888            log::warn!("Residual {position}");
889        }
890
891        residuals
892    }
893
894    /// Purges all closed orders from the cache that are older than `buffer_secs`.
895    ///
896    ///
897    /// Only orders that have been closed for at least this amount of time will be purged.
898    /// A value of 0 means purge all closed orders regardless of when they were closed.
899    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
900        log::debug!(
901            "Purging closed orders{}",
902            if buffer_secs > 0 {
903                format!(" with buffer_secs={buffer_secs}")
904            } else {
905                String::new()
906            }
907        );
908
909        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
910
911        'outer: for client_order_id in self.index.orders_closed.clone() {
912            if let Some(order) = self.orders.get(&client_order_id)
913                && order.is_closed()
914                && let Some(ts_closed) = order.ts_closed()
915                && ts_closed + buffer_ns <= ts_now
916            {
917                // Check any linked orders (contingency orders)
918                if let Some(linked_order_ids) = order.linked_order_ids() {
919                    for linked_order_id in linked_order_ids {
920                        if let Some(linked_order) = self.orders.get(linked_order_id)
921                            && linked_order.is_open()
922                        {
923                            // Do not purge if linked order still open
924                            continue 'outer;
925                        }
926                    }
927                }
928
929                self.purge_order(client_order_id);
930            }
931        }
932    }
933
934    /// Purges all closed positions from the cache that are older than `buffer_secs`.
935    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
936        log::debug!(
937            "Purging closed positions{}",
938            if buffer_secs > 0 {
939                format!(" with buffer_secs={buffer_secs}")
940            } else {
941                String::new()
942            }
943        );
944
945        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
946
947        for position_id in self.index.positions_closed.clone() {
948            if let Some(position) = self.positions.get(&position_id)
949                && position.is_closed()
950                && let Some(ts_closed) = position.ts_closed
951                && ts_closed + buffer_ns <= ts_now
952            {
953                self.purge_position(position_id);
954            }
955        }
956    }
957
958    /// Purges the order with the `client_order_id` from the cache (if found).
959    ///
960    /// For safety, an order is prevented from being purged if it's open.
961    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
962        // Check if order exists and is safe to purge before removing
963        let order = self.orders.get(&client_order_id).cloned();
964
965        // SAFETY: Prevent purging open orders
966        if let Some(ref ord) = order
967            && ord.is_open()
968        {
969            log::warn!("Order {client_order_id} found open when purging, skipping purge");
970            return;
971        }
972
973        // If order exists in cache, remove it and clean up order-specific indices
974        if let Some(ref ord) = order {
975            // Safe to purge
976            self.orders.remove(&client_order_id);
977
978            // Remove order from venue index
979            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
980            {
981                venue_orders.remove(&client_order_id);
982            }
983
984            // Remove venue order ID index if exists
985            if let Some(venue_order_id) = ord.venue_order_id() {
986                self.index.venue_order_ids.remove(&venue_order_id);
987            }
988
989            // Remove from instrument orders index
990            if let Some(instrument_orders) =
991                self.index.instrument_orders.get_mut(&ord.instrument_id())
992            {
993                instrument_orders.remove(&client_order_id);
994            }
995
996            // Remove from position orders index if associated with a position
997            if let Some(position_id) = ord.position_id()
998                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
999            {
1000                position_orders.remove(&client_order_id);
1001            }
1002
1003            // Remove from exec algorithm orders index if it has an exec algorithm
1004            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1005                && let Some(exec_algorithm_orders) =
1006                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1007            {
1008                exec_algorithm_orders.remove(&client_order_id);
1009            }
1010
1011            // Clean up strategy orders reverse index
1012            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1013                strategy_orders.remove(&client_order_id);
1014                if strategy_orders.is_empty() {
1015                    self.index.strategy_orders.remove(&ord.strategy_id());
1016                }
1017            }
1018
1019            // Clean up exec spawn reverse index (if this order is a spawned child)
1020            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1021                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1022            {
1023                spawn_orders.remove(&client_order_id);
1024                if spawn_orders.is_empty() {
1025                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1026                }
1027            }
1028
1029            log::info!("Purged order {client_order_id}");
1030        } else {
1031            log::warn!("Order {client_order_id} not found when purging");
1032        }
1033
1034        // Always clean up order indices (even if order was not in cache)
1035        self.index.order_position.remove(&client_order_id);
1036        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1037        self.index.order_client.remove(&client_order_id);
1038        self.index.client_order_ids.remove(&client_order_id);
1039
1040        // Clean up reverse index when order not in cache (using forward index)
1041        if let Some(strategy_id) = strategy_id
1042            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1043        {
1044            strategy_orders.remove(&client_order_id);
1045            if strategy_orders.is_empty() {
1046                self.index.strategy_orders.remove(&strategy_id);
1047            }
1048        }
1049
1050        // Remove spawn parent entry if this order was a spawn root
1051        self.index.exec_spawn_orders.remove(&client_order_id);
1052
1053        self.index.orders.remove(&client_order_id);
1054        self.index.orders_closed.remove(&client_order_id);
1055        self.index.orders_emulated.remove(&client_order_id);
1056        self.index.orders_inflight.remove(&client_order_id);
1057        self.index.orders_pending_cancel.remove(&client_order_id);
1058    }
1059
1060    /// Purges the position with the `position_id` from the cache (if found).
1061    ///
1062    /// For safety, a position is prevented from being purged if it's open.
1063    pub fn purge_position(&mut self, position_id: PositionId) {
1064        // Check if position exists and is safe to purge before removing
1065        let position = self.positions.get(&position_id).cloned();
1066
1067        // SAFETY: Prevent purging open positions
1068        if let Some(ref pos) = position
1069            && pos.is_open()
1070        {
1071            log::warn!("Position {position_id} found open when purging, skipping purge");
1072            return;
1073        }
1074
1075        // If position exists in cache, remove it and clean up position-specific indices
1076        if let Some(ref pos) = position {
1077            self.positions.remove(&position_id);
1078
1079            // Remove from venue positions index
1080            if let Some(venue_positions) =
1081                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1082            {
1083                venue_positions.remove(&position_id);
1084            }
1085
1086            // Remove from instrument positions index
1087            if let Some(instrument_positions) =
1088                self.index.instrument_positions.get_mut(&pos.instrument_id)
1089            {
1090                instrument_positions.remove(&position_id);
1091            }
1092
1093            // Remove from strategy positions index
1094            if let Some(strategy_positions) =
1095                self.index.strategy_positions.get_mut(&pos.strategy_id)
1096            {
1097                strategy_positions.remove(&position_id);
1098            }
1099
1100            // Remove position ID from orders that reference it
1101            for client_order_id in pos.client_order_ids() {
1102                self.index.order_position.remove(&client_order_id);
1103            }
1104
1105            log::info!("Purged position {position_id}");
1106        } else {
1107            log::warn!("Position {position_id} not found when purging");
1108        }
1109
1110        // Always clean up position indices (even if position not in cache)
1111        self.index.position_strategy.remove(&position_id);
1112        self.index.position_orders.remove(&position_id);
1113        self.index.positions.remove(&position_id);
1114        self.index.positions_open.remove(&position_id);
1115        self.index.positions_closed.remove(&position_id);
1116
1117        // Always clean up position snapshots (even if position not in cache)
1118        self.position_snapshots.remove(&position_id);
1119    }
1120
1121    /// Purges all account state events which are outside the lookback window.
1122    ///
1123    /// Only events which are outside the lookback window will be purged.
1124    /// A value of 0 means purge all account state events.
1125    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1126        log::debug!(
1127            "Purging account events{}",
1128            if lookback_secs > 0 {
1129                format!(" with lookback_secs={lookback_secs}")
1130            } else {
1131                String::new()
1132            }
1133        );
1134
1135        for account in self.accounts.values_mut() {
1136            let event_count = account.event_count();
1137            account.purge_account_events(ts_now, lookback_secs);
1138            let count_diff = event_count - account.event_count();
1139            if count_diff > 0 {
1140                log::info!(
1141                    "Purged {} event(s) from account {}",
1142                    count_diff,
1143                    account.id()
1144                );
1145            }
1146        }
1147    }
1148
1149    /// Clears the caches index.
1150    pub fn clear_index(&mut self) {
1151        self.index.clear();
1152        log::debug!("Cleared index");
1153    }
1154
1155    /// Resets the cache.
1156    ///
1157    /// All stateful fields are reset to their initial value.
1158    pub fn reset(&mut self) {
1159        log::debug!("Resetting cache");
1160
1161        self.general.clear();
1162        self.currencies.clear();
1163        self.instruments.clear();
1164        self.synthetics.clear();
1165        self.books.clear();
1166        self.own_books.clear();
1167        self.quotes.clear();
1168        self.trades.clear();
1169        self.mark_xrates.clear();
1170        self.mark_prices.clear();
1171        self.index_prices.clear();
1172        self.bars.clear();
1173        self.accounts.clear();
1174        self.orders.clear();
1175        self.order_lists.clear();
1176        self.positions.clear();
1177        self.position_snapshots.clear();
1178        self.greeks.clear();
1179        self.yield_curves.clear();
1180
1181        #[cfg(feature = "defi")]
1182        {
1183            self.defi.pools.clear();
1184            self.defi.pool_profilers.clear();
1185        }
1186
1187        self.clear_index();
1188
1189        log::info!("Reset cache");
1190    }
1191
1192    /// Dispose of the cache which will close any underlying database adapter.
1193    ///
1194    /// If closing the database connection fails, an error is logged.
1195    pub fn dispose(&mut self) {
1196        if let Some(database) = &mut self.database
1197            && let Err(e) = database.close()
1198        {
1199            log::error!("Failed to close database during dispose: {e}");
1200        }
1201    }
1202
1203    /// Flushes the caches database which permanently removes all persisted data.
1204    ///
1205    /// If flushing the database connection fails, an error is logged.
1206    pub fn flush_db(&mut self) {
1207        if let Some(database) = &mut self.database
1208            && let Err(e) = database.flush()
1209        {
1210            log::error!("Failed to flush database: {e}");
1211        }
1212    }
1213
1214    /// Adds a raw bytes `value` to the cache under the `key`.
1215    ///
1216    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1217    ///
1218    /// # Errors
1219    ///
1220    /// Returns an error if persisting the entry to the backing database fails.
1221    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1222        check_valid_string_ascii(key, stringify!(key))?;
1223        check_predicate_false(value.is_empty(), stringify!(value))?;
1224
1225        log::debug!("Adding general {key}");
1226        self.general.insert(key.to_string(), value.clone());
1227
1228        if let Some(database) = &mut self.database {
1229            database.add(key.to_string(), value)?;
1230        }
1231        Ok(())
1232    }
1233
1234    /// Adds an `OrderBook` to the cache.
1235    ///
1236    /// # Errors
1237    ///
1238    /// Returns an error if persisting the order book to the backing database fails.
1239    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1240        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1241
1242        if self.config.save_market_data
1243            && let Some(database) = &mut self.database
1244        {
1245            database.add_order_book(&book)?;
1246        }
1247
1248        self.books.insert(book.instrument_id, book);
1249        Ok(())
1250    }
1251
1252    /// Adds an `OwnOrderBook` to the cache.
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns an error if persisting the own order book fails.
1257    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1258        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1259
1260        self.own_books.insert(own_book.instrument_id, own_book);
1261        Ok(())
1262    }
1263
1264    /// Adds the `mark_price` update to the cache.
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns an error if persisting the mark price to the backing database fails.
1269    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1270        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1271
1272        if self.config.save_market_data {
1273            // TODO: Placeholder and return Result for consistency
1274        }
1275
1276        let mark_prices_deque = self
1277            .mark_prices
1278            .entry(mark_price.instrument_id)
1279            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1280        mark_prices_deque.push_front(mark_price);
1281        Ok(())
1282    }
1283
1284    /// Adds the `index_price` update to the cache.
1285    ///
1286    /// # Errors
1287    ///
1288    /// Returns an error if persisting the index price to the backing database fails.
1289    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1290        log::debug!(
1291            "Adding `IndexPriceUpdate` for {}",
1292            index_price.instrument_id
1293        );
1294
1295        if self.config.save_market_data {
1296            // TODO: Placeholder and return Result for consistency
1297        }
1298
1299        let index_prices_deque = self
1300            .index_prices
1301            .entry(index_price.instrument_id)
1302            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1303        index_prices_deque.push_front(index_price);
1304        Ok(())
1305    }
1306
1307    /// Adds the `funding_rate` update to the cache.
1308    ///
1309    /// # Errors
1310    ///
1311    /// Returns an error if persisting the funding rate update to the backing database fails.
1312    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1313        log::debug!(
1314            "Adding `FundingRateUpdate` for {}",
1315            funding_rate.instrument_id
1316        );
1317
1318        if self.config.save_market_data {
1319            // TODO: Placeholder and return Result for consistency
1320        }
1321
1322        self.funding_rates
1323            .insert(funding_rate.instrument_id, funding_rate);
1324        Ok(())
1325    }
1326
1327    /// Adds the `quote` tick to the cache.
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns an error if persisting the quote tick to the backing database fails.
1332    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1333        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1334
1335        if self.config.save_market_data
1336            && let Some(database) = &mut self.database
1337        {
1338            database.add_quote(&quote)?;
1339        }
1340
1341        let quotes_deque = self
1342            .quotes
1343            .entry(quote.instrument_id)
1344            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1345        quotes_deque.push_front(quote);
1346        Ok(())
1347    }
1348
1349    /// Adds the `quotes` to the cache.
1350    ///
1351    /// # Errors
1352    ///
1353    /// Returns an error if persisting the quote ticks to the backing database fails.
1354    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1355        check_slice_not_empty(quotes, stringify!(quotes))?;
1356
1357        let instrument_id = quotes[0].instrument_id;
1358        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1359
1360        if self.config.save_market_data
1361            && let Some(database) = &mut self.database
1362        {
1363            for quote in quotes {
1364                database.add_quote(quote)?;
1365            }
1366        }
1367
1368        let quotes_deque = self
1369            .quotes
1370            .entry(instrument_id)
1371            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1372
1373        for quote in quotes {
1374            quotes_deque.push_front(*quote);
1375        }
1376        Ok(())
1377    }
1378
1379    /// Adds the `trade` tick to the cache.
1380    ///
1381    /// # Errors
1382    ///
1383    /// Returns an error if persisting the trade tick to the backing database fails.
1384    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1385        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1386
1387        if self.config.save_market_data
1388            && let Some(database) = &mut self.database
1389        {
1390            database.add_trade(&trade)?;
1391        }
1392
1393        let trades_deque = self
1394            .trades
1395            .entry(trade.instrument_id)
1396            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1397        trades_deque.push_front(trade);
1398        Ok(())
1399    }
1400
1401    /// Adds the give `trades` to the cache.
1402    ///
1403    /// # Errors
1404    ///
1405    /// Returns an error if persisting the trade ticks to the backing database fails.
1406    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1407        check_slice_not_empty(trades, stringify!(trades))?;
1408
1409        let instrument_id = trades[0].instrument_id;
1410        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1411
1412        if self.config.save_market_data
1413            && let Some(database) = &mut self.database
1414        {
1415            for trade in trades {
1416                database.add_trade(trade)?;
1417            }
1418        }
1419
1420        let trades_deque = self
1421            .trades
1422            .entry(instrument_id)
1423            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1424
1425        for trade in trades {
1426            trades_deque.push_front(*trade);
1427        }
1428        Ok(())
1429    }
1430
1431    /// Adds the `bar` to the cache.
1432    ///
1433    /// # Errors
1434    ///
1435    /// Returns an error if persisting the bar to the backing database fails.
1436    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1437        log::debug!("Adding `Bar` {}", bar.bar_type);
1438
1439        if self.config.save_market_data
1440            && let Some(database) = &mut self.database
1441        {
1442            database.add_bar(&bar)?;
1443        }
1444
1445        let bars = self
1446            .bars
1447            .entry(bar.bar_type)
1448            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1449        bars.push_front(bar);
1450        Ok(())
1451    }
1452
1453    /// Adds the `bars` to the cache.
1454    ///
1455    /// # Errors
1456    ///
1457    /// Returns an error if persisting the bars to the backing database fails.
1458    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1459        check_slice_not_empty(bars, stringify!(bars))?;
1460
1461        let bar_type = bars[0].bar_type;
1462        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1463
1464        if self.config.save_market_data
1465            && let Some(database) = &mut self.database
1466        {
1467            for bar in bars {
1468                database.add_bar(bar)?;
1469            }
1470        }
1471
1472        let bars_deque = self
1473            .bars
1474            .entry(bar_type)
1475            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1476
1477        for bar in bars {
1478            bars_deque.push_front(*bar);
1479        }
1480        Ok(())
1481    }
1482
1483    /// Adds the `greeks` data to the cache.
1484    ///
1485    /// # Errors
1486    ///
1487    /// Returns an error if persisting the greeks data to the backing database fails.
1488    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1489        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1490
1491        if self.config.save_market_data
1492            && let Some(_database) = &mut self.database
1493        {
1494            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1495        }
1496
1497        self.greeks.insert(greeks.instrument_id, greeks);
1498        Ok(())
1499    }
1500
1501    /// Gets the greeks data for the `instrument_id`.
1502    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1503        self.greeks.get(instrument_id).cloned()
1504    }
1505
1506    /// Adds the `yield_curve` data to the cache.
1507    ///
1508    /// # Errors
1509    ///
1510    /// Returns an error if persisting the yield curve data to the backing database fails.
1511    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1512        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1513
1514        if self.config.save_market_data
1515            && let Some(_database) = &mut self.database
1516        {
1517            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1518        }
1519
1520        self.yield_curves
1521            .insert(yield_curve.curve_name.clone(), yield_curve);
1522        Ok(())
1523    }
1524
1525    /// Gets the yield curve for the `key`.
1526    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1527        self.yield_curves.get(key).map(|curve| {
1528            let curve_clone = curve.clone();
1529            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1530                as Box<dyn Fn(f64) -> f64>
1531        })
1532    }
1533
1534    /// Adds the `currency` to the cache.
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns an error if persisting the currency to the backing database fails.
1539    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1540        if self.currencies.contains_key(&currency.code) {
1541            return Ok(());
1542        }
1543        log::debug!("Adding `Currency` {}", currency.code);
1544
1545        if let Some(database) = &mut self.database {
1546            database.add_currency(&currency)?;
1547        }
1548
1549        self.currencies.insert(currency.code, currency);
1550        Ok(())
1551    }
1552
1553    /// Adds the `instrument` to the cache.
1554    ///
1555    /// # Errors
1556    ///
1557    /// Returns an error if persisting the instrument to the backing database fails.
1558    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1559        log::debug!("Adding `Instrument` {}", instrument.id());
1560
1561        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
1562        if let Some(base_currency) = instrument.base_currency() {
1563            self.add_currency(base_currency)?;
1564        }
1565        self.add_currency(instrument.quote_currency())?;
1566        self.add_currency(instrument.settlement_currency())?;
1567
1568        if let Some(database) = &mut self.database {
1569            database.add_instrument(&instrument)?;
1570        }
1571
1572        self.instruments.insert(instrument.id(), instrument);
1573        Ok(())
1574    }
1575
1576    /// Adds the `synthetic` instrument to the cache.
1577    ///
1578    /// # Errors
1579    ///
1580    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1581    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1582        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1583
1584        if let Some(database) = &mut self.database {
1585            database.add_synthetic(&synthetic)?;
1586        }
1587
1588        self.synthetics.insert(synthetic.id, synthetic);
1589        Ok(())
1590    }
1591
1592    /// Adds the `account` to the cache.
1593    ///
1594    /// # Errors
1595    ///
1596    /// Returns an error if persisting the account to the backing database fails.
1597    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1598        log::debug!("Adding `Account` {}", account.id());
1599
1600        if let Some(database) = &mut self.database {
1601            database.add_account(&account)?;
1602        }
1603
1604        let account_id = account.id();
1605        self.accounts.insert(account_id, account);
1606        self.index
1607            .venue_account
1608            .insert(account_id.get_issuer(), account_id);
1609        Ok(())
1610    }
1611
1612    /// Indexes the `client_order_id` with the `venue_order_id`.
1613    ///
1614    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1615    ///
1616    /// # Errors
1617    ///
1618    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1619    pub fn add_venue_order_id(
1620        &mut self,
1621        client_order_id: &ClientOrderId,
1622        venue_order_id: &VenueOrderId,
1623        overwrite: bool,
1624    ) -> anyhow::Result<()> {
1625        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1626            && !overwrite
1627            && existing_venue_order_id != venue_order_id
1628        {
1629            anyhow::bail!(
1630                "Existing {existing_venue_order_id} for {client_order_id}
1631                    did not match the given {venue_order_id}.
1632                    If you are writing a test then try a different `venue_order_id`,
1633                    otherwise this is probably a bug."
1634            );
1635        }
1636
1637        self.index
1638            .client_order_ids
1639            .insert(*client_order_id, *venue_order_id);
1640        self.index
1641            .venue_order_ids
1642            .insert(*venue_order_id, *client_order_id);
1643
1644        Ok(())
1645    }
1646
1647    /// Adds the `order` to the cache indexed with any given identifiers.
1648    ///
1649    /// # Parameters
1650    ///
1651    /// `override_existing`: If the added order should 'override' any existing order and replace
1652    /// it in the cache. This is currently used for emulated orders which are
1653    /// being released and transformed into another type.
1654    ///
1655    /// # Errors
1656    ///
1657    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1658    pub fn add_order(
1659        &mut self,
1660        order: OrderAny,
1661        position_id: Option<PositionId>,
1662        client_id: Option<ClientId>,
1663        replace_existing: bool,
1664    ) -> anyhow::Result<()> {
1665        let instrument_id = order.instrument_id();
1666        let venue = instrument_id.venue;
1667        let client_order_id = order.client_order_id();
1668        let strategy_id = order.strategy_id();
1669        let exec_algorithm_id = order.exec_algorithm_id();
1670        let exec_spawn_id = order.exec_spawn_id();
1671
1672        if !replace_existing {
1673            check_key_not_in_map(
1674                &client_order_id,
1675                &self.orders,
1676                stringify!(client_order_id),
1677                stringify!(orders),
1678            )?;
1679        }
1680
1681        log::debug!("Adding {order:?}");
1682
1683        self.index.orders.insert(client_order_id);
1684        self.index
1685            .order_strategy
1686            .insert(client_order_id, strategy_id);
1687        self.index.strategies.insert(strategy_id);
1688
1689        // Update venue -> orders index
1690        self.index
1691            .venue_orders
1692            .entry(venue)
1693            .or_default()
1694            .insert(client_order_id);
1695
1696        // Update instrument -> orders index
1697        self.index
1698            .instrument_orders
1699            .entry(instrument_id)
1700            .or_default()
1701            .insert(client_order_id);
1702
1703        // Update strategy -> orders index
1704        self.index
1705            .strategy_orders
1706            .entry(strategy_id)
1707            .or_default()
1708            .insert(client_order_id);
1709
1710        // Update exec_algorithm -> orders index
1711        if let Some(exec_algorithm_id) = exec_algorithm_id {
1712            self.index.exec_algorithms.insert(exec_algorithm_id);
1713
1714            self.index
1715                .exec_algorithm_orders
1716                .entry(exec_algorithm_id)
1717                .or_default()
1718                .insert(client_order_id);
1719        }
1720
1721        // Update exec_spawn -> orders index
1722        if let Some(exec_spawn_id) = exec_spawn_id {
1723            self.index
1724                .exec_spawn_orders
1725                .entry(exec_spawn_id)
1726                .or_default()
1727                .insert(client_order_id);
1728        }
1729
1730        // Update emulation index
1731        match order.emulation_trigger() {
1732            Some(_) => {
1733                self.index.orders_emulated.remove(&client_order_id);
1734            }
1735            None => {
1736                self.index.orders_emulated.insert(client_order_id);
1737            }
1738        }
1739
1740        // Index position ID if provided
1741        if let Some(position_id) = position_id {
1742            self.add_position_id(
1743                &position_id,
1744                &order.instrument_id().venue,
1745                &client_order_id,
1746                &strategy_id,
1747            )?;
1748        }
1749
1750        // Index client ID if provided
1751        if let Some(client_id) = client_id {
1752            self.index.order_client.insert(client_order_id, client_id);
1753            log::debug!("Indexed {client_id:?}");
1754        }
1755
1756        if let Some(database) = &mut self.database {
1757            database.add_order(&order, client_id)?;
1758            // TODO: Implement
1759            // if self.config.snapshot_orders {
1760            //     database.snapshot_order_state(order)?;
1761            // }
1762        }
1763
1764        self.orders.insert(client_order_id, order);
1765
1766        Ok(())
1767    }
1768
1769    /// Indexes the `position_id` with the other given IDs.
1770    ///
1771    /// # Errors
1772    ///
1773    /// Returns an error if indexing position ID in the backing database fails.
1774    pub fn add_position_id(
1775        &mut self,
1776        position_id: &PositionId,
1777        venue: &Venue,
1778        client_order_id: &ClientOrderId,
1779        strategy_id: &StrategyId,
1780    ) -> anyhow::Result<()> {
1781        self.index
1782            .order_position
1783            .insert(*client_order_id, *position_id);
1784
1785        // Index: ClientOrderId -> PositionId
1786        if let Some(database) = &mut self.database {
1787            database.index_order_position(*client_order_id, *position_id)?;
1788        }
1789
1790        // Index: PositionId -> StrategyId
1791        self.index
1792            .position_strategy
1793            .insert(*position_id, *strategy_id);
1794
1795        // Index: PositionId -> set[ClientOrderId]
1796        self.index
1797            .position_orders
1798            .entry(*position_id)
1799            .or_default()
1800            .insert(*client_order_id);
1801
1802        // Index: StrategyId -> set[PositionId]
1803        self.index
1804            .strategy_positions
1805            .entry(*strategy_id)
1806            .or_default()
1807            .insert(*position_id);
1808
1809        // Index: Venue -> set[PositionId]
1810        self.index
1811            .venue_positions
1812            .entry(*venue)
1813            .or_default()
1814            .insert(*position_id);
1815
1816        Ok(())
1817    }
1818
1819    /// Adds the `position` to the cache.
1820    ///
1821    /// # Errors
1822    ///
1823    /// Returns an error if persisting the position to the backing database fails.
1824    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1825        self.positions.insert(position.id, position.clone());
1826        self.index.positions.insert(position.id);
1827        self.index.positions_open.insert(position.id);
1828        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
1829
1830        log::debug!("Adding {position}");
1831
1832        self.add_position_id(
1833            &position.id,
1834            &position.instrument_id.venue,
1835            &position.opening_order_id,
1836            &position.strategy_id,
1837        )?;
1838
1839        let venue = position.instrument_id.venue;
1840        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1841        venue_positions.insert(position.id);
1842
1843        // Index: InstrumentId -> AHashSet
1844        let instrument_id = position.instrument_id;
1845        let instrument_positions = self
1846            .index
1847            .instrument_positions
1848            .entry(instrument_id)
1849            .or_default();
1850        instrument_positions.insert(position.id);
1851
1852        if let Some(database) = &mut self.database {
1853            database.add_position(&position)?;
1854            // TODO: Implement position snapshots
1855            // if self.snapshot_positions {
1856            //     database.snapshot_position_state(
1857            //         position,
1858            //         position.ts_last,
1859            //         self.calculate_unrealized_pnl(&position),
1860            //     )?;
1861            // }
1862        }
1863
1864        Ok(())
1865    }
1866
1867    /// Updates the `account` in the cache.
1868    ///
1869    /// # Errors
1870    ///
1871    /// Returns an error if updating the account in the database fails.
1872    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1873        if let Some(database) = &mut self.database {
1874            database.update_account(&account)?;
1875        }
1876        Ok(())
1877    }
1878
1879    /// Updates the `order` in the cache.
1880    ///
1881    /// # Errors
1882    ///
1883    /// Returns an error if updating the order in the database fails.
1884    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1885        let client_order_id = order.client_order_id();
1886
1887        // Update venue order ID
1888        if let Some(venue_order_id) = order.venue_order_id() {
1889            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1890            // venues which use a cancel+replace update strategy.
1891            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1892                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1893                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1894            }
1895        }
1896
1897        // Update in-flight state
1898        if order.is_inflight() {
1899            self.index.orders_inflight.insert(client_order_id);
1900        } else {
1901            self.index.orders_inflight.remove(&client_order_id);
1902        }
1903
1904        // Update open/closed state
1905        if order.is_open() {
1906            self.index.orders_closed.remove(&client_order_id);
1907            self.index.orders_open.insert(client_order_id);
1908        } else if order.is_closed() {
1909            self.index.orders_open.remove(&client_order_id);
1910            self.index.orders_pending_cancel.remove(&client_order_id);
1911            self.index.orders_closed.insert(client_order_id);
1912        }
1913
1914        // Update emulation
1915        if let Some(emulation_trigger) = order.emulation_trigger() {
1916            match emulation_trigger {
1917                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1918                _ => self.index.orders_emulated.insert(client_order_id),
1919            };
1920        }
1921
1922        // Update own book
1923        if self.own_order_book(&order.instrument_id()).is_some()
1924            && should_handle_own_book_order(order)
1925        {
1926            self.update_own_order_book(order);
1927        }
1928
1929        if let Some(database) = &mut self.database {
1930            database.update_order(order.last_event())?;
1931            // TODO: Implement order snapshots
1932            // if self.snapshot_orders {
1933            //     database.snapshot_order_state(order)?;
1934            // }
1935        }
1936
1937        // update the order in the cache
1938        self.orders.insert(client_order_id, order.clone());
1939
1940        Ok(())
1941    }
1942
1943    /// Updates the `order` as pending cancel locally.
1944    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1945        self.index
1946            .orders_pending_cancel
1947            .insert(order.client_order_id());
1948    }
1949
1950    /// Updates the `position` in the cache.
1951    ///
1952    /// # Errors
1953    ///
1954    /// Returns an error if updating the position in the database fails.
1955    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1956        // Update open/closed state
1957
1958        if position.is_open() {
1959            self.index.positions_open.insert(position.id);
1960            self.index.positions_closed.remove(&position.id);
1961        } else {
1962            self.index.positions_closed.insert(position.id);
1963            self.index.positions_open.remove(&position.id);
1964        }
1965
1966        if let Some(database) = &mut self.database {
1967            database.update_position(position)?;
1968            // TODO: Implement order snapshots
1969            // if self.snapshot_orders {
1970            //     database.snapshot_order_state(order)?;
1971            // }
1972        }
1973
1974        self.positions.insert(position.id, position.clone());
1975
1976        Ok(())
1977    }
1978
1979    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
1980    /// serializing it, and storing it in the position snapshots.
1981    ///
1982    /// # Errors
1983    ///
1984    /// Returns an error if serializing or storing the position snapshot fails.
1985    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1986        let position_id = position.id;
1987
1988        let mut copied_position = position.clone();
1989        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1990        copied_position.id = PositionId::new(new_id);
1991
1992        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
1993        let position_serialized = serde_json::to_vec(&copied_position)?;
1994
1995        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1996        let new_snapshots = match snapshots {
1997            Some(existing_snapshots) => {
1998                let mut combined = existing_snapshots.to_vec();
1999                combined.extend(position_serialized);
2000                Bytes::from(combined)
2001            }
2002            None => Bytes::from(position_serialized),
2003        };
2004        self.position_snapshots.insert(position_id, new_snapshots);
2005
2006        log::debug!("Snapshot {copied_position}");
2007        Ok(())
2008    }
2009
2010    /// Creates a snapshot of the `position` state in the database.
2011    ///
2012    /// # Errors
2013    ///
2014    /// Returns an error if snapshotting the position state fails.
2015    pub fn snapshot_position_state(
2016        &mut self,
2017        position: &Position,
2018        // ts_snapshot: u64,
2019        // unrealized_pnl: Option<Money>,
2020        open_only: Option<bool>,
2021    ) -> anyhow::Result<()> {
2022        let open_only = open_only.unwrap_or(true);
2023
2024        if open_only && !position.is_open() {
2025            return Ok(());
2026        }
2027
2028        if let Some(database) = &mut self.database {
2029            database.snapshot_position_state(position).map_err(|e| {
2030                log::error!(
2031                    "Failed to snapshot position state for {}: {e:?}",
2032                    position.id
2033                );
2034                e
2035            })?;
2036        } else {
2037            log::warn!(
2038                "Cannot snapshot position state for {} (no database configured)",
2039                position.id
2040            );
2041        }
2042
2043        // Ok(())
2044        todo!()
2045    }
2046
2047    /// Gets the OMS type for the `position_id`.
2048    #[must_use]
2049    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2050        // Get OMS type from the index
2051        if self.index.position_strategy.contains_key(position_id) {
2052            // For now, we'll default to NETTING
2053            // TODO: Store and retrieve actual OMS type per position
2054            Some(OmsType::Netting)
2055        } else {
2056            None
2057        }
2058    }
2059
2060    /// Gets position snapshot bytes for the `position_id`.
2061    #[must_use]
2062    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2063        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2064    }
2065
2066    /// Gets position snapshot IDs for the `instrument_id`.
2067    #[must_use]
2068    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2069        // Get snapshot position IDs that match the instrument
2070        let mut result = AHashSet::new();
2071        for (position_id, _) in &self.position_snapshots {
2072            // Check if this position is for the requested instrument
2073            if let Some(position) = self.positions.get(position_id)
2074                && position.instrument_id == *instrument_id
2075            {
2076                result.insert(*position_id);
2077            }
2078        }
2079        result
2080    }
2081
2082    /// Snapshots the `order` state in the database.
2083    ///
2084    /// # Errors
2085    ///
2086    /// Returns an error if snapshotting the order state fails.
2087    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2088        let database = if let Some(database) = &self.database {
2089            database
2090        } else {
2091            log::warn!(
2092                "Cannot snapshot order state for {} (no database configured)",
2093                order.client_order_id()
2094            );
2095            return Ok(());
2096        };
2097
2098        database.snapshot_order_state(order)
2099    }
2100
2101    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2102
2103    fn build_order_query_filter_set(
2104        &self,
2105        venue: Option<&Venue>,
2106        instrument_id: Option<&InstrumentId>,
2107        strategy_id: Option<&StrategyId>,
2108    ) -> Option<AHashSet<ClientOrderId>> {
2109        let mut query: Option<AHashSet<ClientOrderId>> = None;
2110
2111        if let Some(venue) = venue {
2112            query = Some(
2113                self.index
2114                    .venue_orders
2115                    .get(venue)
2116                    .cloned()
2117                    .unwrap_or_default(),
2118            );
2119        }
2120
2121        if let Some(instrument_id) = instrument_id {
2122            let instrument_orders = self
2123                .index
2124                .instrument_orders
2125                .get(instrument_id)
2126                .cloned()
2127                .unwrap_or_default();
2128
2129            if let Some(existing_query) = &mut query {
2130                *existing_query = existing_query
2131                    .intersection(&instrument_orders)
2132                    .copied()
2133                    .collect();
2134            } else {
2135                query = Some(instrument_orders);
2136            }
2137        }
2138
2139        if let Some(strategy_id) = strategy_id {
2140            let strategy_orders = self
2141                .index
2142                .strategy_orders
2143                .get(strategy_id)
2144                .cloned()
2145                .unwrap_or_default();
2146
2147            if let Some(existing_query) = &mut query {
2148                *existing_query = existing_query
2149                    .intersection(&strategy_orders)
2150                    .copied()
2151                    .collect();
2152            } else {
2153                query = Some(strategy_orders);
2154            }
2155        }
2156
2157        query
2158    }
2159
2160    fn build_position_query_filter_set(
2161        &self,
2162        venue: Option<&Venue>,
2163        instrument_id: Option<&InstrumentId>,
2164        strategy_id: Option<&StrategyId>,
2165    ) -> Option<AHashSet<PositionId>> {
2166        let mut query: Option<AHashSet<PositionId>> = None;
2167
2168        if let Some(venue) = venue {
2169            query = Some(
2170                self.index
2171                    .venue_positions
2172                    .get(venue)
2173                    .cloned()
2174                    .unwrap_or_default(),
2175            );
2176        }
2177
2178        if let Some(instrument_id) = instrument_id {
2179            let instrument_positions = self
2180                .index
2181                .instrument_positions
2182                .get(instrument_id)
2183                .cloned()
2184                .unwrap_or_default();
2185
2186            if let Some(existing_query) = query {
2187                query = Some(
2188                    existing_query
2189                        .intersection(&instrument_positions)
2190                        .copied()
2191                        .collect(),
2192                );
2193            } else {
2194                query = Some(instrument_positions);
2195            }
2196        }
2197
2198        if let Some(strategy_id) = strategy_id {
2199            let strategy_positions = self
2200                .index
2201                .strategy_positions
2202                .get(strategy_id)
2203                .cloned()
2204                .unwrap_or_default();
2205
2206            if let Some(existing_query) = query {
2207                query = Some(
2208                    existing_query
2209                        .intersection(&strategy_positions)
2210                        .copied()
2211                        .collect(),
2212                );
2213            } else {
2214                query = Some(strategy_positions);
2215            }
2216        }
2217
2218        query
2219    }
2220
2221    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2222    ///
2223    /// # Panics
2224    ///
2225    /// Panics if any `client_order_id` in the set is not found in the cache.
2226    fn get_orders_for_ids(
2227        &self,
2228        client_order_ids: &AHashSet<ClientOrderId>,
2229        side: Option<OrderSide>,
2230    ) -> Vec<&OrderAny> {
2231        let side = side.unwrap_or(OrderSide::NoOrderSide);
2232        let mut orders = Vec::new();
2233
2234        for client_order_id in client_order_ids {
2235            let order = self
2236                .orders
2237                .get(client_order_id)
2238                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2239            if side == OrderSide::NoOrderSide || side == order.order_side() {
2240                orders.push(order);
2241            }
2242        }
2243
2244        orders
2245    }
2246
2247    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2248    ///
2249    /// # Panics
2250    ///
2251    /// Panics if any `position_id` in the set is not found in the cache.
2252    fn get_positions_for_ids(
2253        &self,
2254        position_ids: &AHashSet<PositionId>,
2255        side: Option<PositionSide>,
2256    ) -> Vec<&Position> {
2257        let side = side.unwrap_or(PositionSide::NoPositionSide);
2258        let mut positions = Vec::new();
2259
2260        for position_id in position_ids {
2261            let position = self
2262                .positions
2263                .get(position_id)
2264                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2265            if side == PositionSide::NoPositionSide || side == position.side {
2266                positions.push(position);
2267            }
2268        }
2269
2270        positions
2271    }
2272
2273    /// Returns the `ClientOrderId`s of all orders.
2274    #[must_use]
2275    pub fn client_order_ids(
2276        &self,
2277        venue: Option<&Venue>,
2278        instrument_id: Option<&InstrumentId>,
2279        strategy_id: Option<&StrategyId>,
2280    ) -> AHashSet<ClientOrderId> {
2281        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2282        match query {
2283            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2284            None => self.index.orders.clone(),
2285        }
2286    }
2287
2288    /// Returns the `ClientOrderId`s of all open orders.
2289    #[must_use]
2290    pub fn client_order_ids_open(
2291        &self,
2292        venue: Option<&Venue>,
2293        instrument_id: Option<&InstrumentId>,
2294        strategy_id: Option<&StrategyId>,
2295    ) -> AHashSet<ClientOrderId> {
2296        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2297        match query {
2298            Some(query) => self
2299                .index
2300                .orders_open
2301                .intersection(&query)
2302                .copied()
2303                .collect(),
2304            None => self.index.orders_open.clone(),
2305        }
2306    }
2307
2308    /// Returns the `ClientOrderId`s of all closed orders.
2309    #[must_use]
2310    pub fn client_order_ids_closed(
2311        &self,
2312        venue: Option<&Venue>,
2313        instrument_id: Option<&InstrumentId>,
2314        strategy_id: Option<&StrategyId>,
2315    ) -> AHashSet<ClientOrderId> {
2316        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2317        match query {
2318            Some(query) => self
2319                .index
2320                .orders_closed
2321                .intersection(&query)
2322                .copied()
2323                .collect(),
2324            None => self.index.orders_closed.clone(),
2325        }
2326    }
2327
2328    /// Returns the `ClientOrderId`s of all emulated orders.
2329    #[must_use]
2330    pub fn client_order_ids_emulated(
2331        &self,
2332        venue: Option<&Venue>,
2333        instrument_id: Option<&InstrumentId>,
2334        strategy_id: Option<&StrategyId>,
2335    ) -> AHashSet<ClientOrderId> {
2336        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2337        match query {
2338            Some(query) => self
2339                .index
2340                .orders_emulated
2341                .intersection(&query)
2342                .copied()
2343                .collect(),
2344            None => self.index.orders_emulated.clone(),
2345        }
2346    }
2347
2348    /// Returns the `ClientOrderId`s of all in-flight orders.
2349    #[must_use]
2350    pub fn client_order_ids_inflight(
2351        &self,
2352        venue: Option<&Venue>,
2353        instrument_id: Option<&InstrumentId>,
2354        strategy_id: Option<&StrategyId>,
2355    ) -> AHashSet<ClientOrderId> {
2356        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2357        match query {
2358            Some(query) => self
2359                .index
2360                .orders_inflight
2361                .intersection(&query)
2362                .copied()
2363                .collect(),
2364            None => self.index.orders_inflight.clone(),
2365        }
2366    }
2367
2368    /// Returns `PositionId`s of all positions.
2369    #[must_use]
2370    pub fn position_ids(
2371        &self,
2372        venue: Option<&Venue>,
2373        instrument_id: Option<&InstrumentId>,
2374        strategy_id: Option<&StrategyId>,
2375    ) -> AHashSet<PositionId> {
2376        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2377        match query {
2378            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2379            None => self.index.positions.clone(),
2380        }
2381    }
2382
2383    /// Returns the `PositionId`s of all open positions.
2384    #[must_use]
2385    pub fn position_open_ids(
2386        &self,
2387        venue: Option<&Venue>,
2388        instrument_id: Option<&InstrumentId>,
2389        strategy_id: Option<&StrategyId>,
2390    ) -> AHashSet<PositionId> {
2391        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2392        match query {
2393            Some(query) => self
2394                .index
2395                .positions_open
2396                .intersection(&query)
2397                .copied()
2398                .collect(),
2399            None => self.index.positions_open.clone(),
2400        }
2401    }
2402
2403    /// Returns the `PositionId`s of all closed positions.
2404    #[must_use]
2405    pub fn position_closed_ids(
2406        &self,
2407        venue: Option<&Venue>,
2408        instrument_id: Option<&InstrumentId>,
2409        strategy_id: Option<&StrategyId>,
2410    ) -> AHashSet<PositionId> {
2411        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2412        match query {
2413            Some(query) => self
2414                .index
2415                .positions_closed
2416                .intersection(&query)
2417                .copied()
2418                .collect(),
2419            None => self.index.positions_closed.clone(),
2420        }
2421    }
2422
2423    /// Returns the `ComponentId`s of all actors.
2424    #[must_use]
2425    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2426        self.index.actors.clone()
2427    }
2428
2429    /// Returns the `StrategyId`s of all strategies.
2430    #[must_use]
2431    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2432        self.index.strategies.clone()
2433    }
2434
2435    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2436    #[must_use]
2437    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2438        self.index.exec_algorithms.clone()
2439    }
2440
2441    // -- ORDER QUERIES ---------------------------------------------------------------------------
2442
2443    /// Gets a reference to the order with the `client_order_id` (if found).
2444    #[must_use]
2445    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2446        self.orders.get(client_order_id)
2447    }
2448
2449    /// Gets a reference to the order with the `client_order_id` (if found).
2450    #[must_use]
2451    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2452        self.orders.get_mut(client_order_id)
2453    }
2454
2455    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2456    #[must_use]
2457    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2458        self.index.venue_order_ids.get(venue_order_id)
2459    }
2460
2461    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2462    #[must_use]
2463    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2464        self.index.client_order_ids.get(client_order_id)
2465    }
2466
2467    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2468    #[must_use]
2469    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2470        self.index.order_client.get(client_order_id)
2471    }
2472
2473    /// Returns references to all orders matching the optional filter parameters.
2474    #[must_use]
2475    pub fn orders(
2476        &self,
2477        venue: Option<&Venue>,
2478        instrument_id: Option<&InstrumentId>,
2479        strategy_id: Option<&StrategyId>,
2480        side: Option<OrderSide>,
2481    ) -> Vec<&OrderAny> {
2482        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2483        self.get_orders_for_ids(&client_order_ids, side)
2484    }
2485
2486    /// Returns references to all open orders matching the optional filter parameters.
2487    #[must_use]
2488    pub fn orders_open(
2489        &self,
2490        venue: Option<&Venue>,
2491        instrument_id: Option<&InstrumentId>,
2492        strategy_id: Option<&StrategyId>,
2493        side: Option<OrderSide>,
2494    ) -> Vec<&OrderAny> {
2495        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2496        self.get_orders_for_ids(&client_order_ids, side)
2497    }
2498
2499    /// Returns references to all closed orders matching the optional filter parameters.
2500    #[must_use]
2501    pub fn orders_closed(
2502        &self,
2503        venue: Option<&Venue>,
2504        instrument_id: Option<&InstrumentId>,
2505        strategy_id: Option<&StrategyId>,
2506        side: Option<OrderSide>,
2507    ) -> Vec<&OrderAny> {
2508        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2509        self.get_orders_for_ids(&client_order_ids, side)
2510    }
2511
2512    /// Returns references to all emulated orders matching the optional filter parameters.
2513    #[must_use]
2514    pub fn orders_emulated(
2515        &self,
2516        venue: Option<&Venue>,
2517        instrument_id: Option<&InstrumentId>,
2518        strategy_id: Option<&StrategyId>,
2519        side: Option<OrderSide>,
2520    ) -> Vec<&OrderAny> {
2521        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2522        self.get_orders_for_ids(&client_order_ids, side)
2523    }
2524
2525    /// Returns references to all in-flight orders matching the optional filter parameters.
2526    #[must_use]
2527    pub fn orders_inflight(
2528        &self,
2529        venue: Option<&Venue>,
2530        instrument_id: Option<&InstrumentId>,
2531        strategy_id: Option<&StrategyId>,
2532        side: Option<OrderSide>,
2533    ) -> Vec<&OrderAny> {
2534        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2535        self.get_orders_for_ids(&client_order_ids, side)
2536    }
2537
2538    /// Returns references to all orders for the `position_id`.
2539    #[must_use]
2540    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2541        let client_order_ids = self.index.position_orders.get(position_id);
2542        match client_order_ids {
2543            Some(client_order_ids) => {
2544                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2545            }
2546            None => Vec::new(),
2547        }
2548    }
2549
2550    /// Returns whether an order with the `client_order_id` exists.
2551    #[must_use]
2552    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2553        self.index.orders.contains(client_order_id)
2554    }
2555
2556    /// Returns whether an order with the `client_order_id` is open.
2557    #[must_use]
2558    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2559        self.index.orders_open.contains(client_order_id)
2560    }
2561
2562    /// Returns whether an order with the `client_order_id` is closed.
2563    #[must_use]
2564    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2565        self.index.orders_closed.contains(client_order_id)
2566    }
2567
2568    /// Returns whether an order with the `client_order_id` is emulated.
2569    #[must_use]
2570    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2571        self.index.orders_emulated.contains(client_order_id)
2572    }
2573
2574    /// Returns whether an order with the `client_order_id` is in-flight.
2575    #[must_use]
2576    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2577        self.index.orders_inflight.contains(client_order_id)
2578    }
2579
2580    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2581    #[must_use]
2582    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2583        self.index.orders_pending_cancel.contains(client_order_id)
2584    }
2585
2586    /// Returns the count of all open orders.
2587    #[must_use]
2588    pub fn orders_open_count(
2589        &self,
2590        venue: Option<&Venue>,
2591        instrument_id: Option<&InstrumentId>,
2592        strategy_id: Option<&StrategyId>,
2593        side: Option<OrderSide>,
2594    ) -> usize {
2595        self.orders_open(venue, instrument_id, strategy_id, side)
2596            .len()
2597    }
2598
2599    /// Returns the count of all closed orders.
2600    #[must_use]
2601    pub fn orders_closed_count(
2602        &self,
2603        venue: Option<&Venue>,
2604        instrument_id: Option<&InstrumentId>,
2605        strategy_id: Option<&StrategyId>,
2606        side: Option<OrderSide>,
2607    ) -> usize {
2608        self.orders_closed(venue, instrument_id, strategy_id, side)
2609            .len()
2610    }
2611
2612    /// Returns the count of all emulated orders.
2613    #[must_use]
2614    pub fn orders_emulated_count(
2615        &self,
2616        venue: Option<&Venue>,
2617        instrument_id: Option<&InstrumentId>,
2618        strategy_id: Option<&StrategyId>,
2619        side: Option<OrderSide>,
2620    ) -> usize {
2621        self.orders_emulated(venue, instrument_id, strategy_id, side)
2622            .len()
2623    }
2624
2625    /// Returns the count of all in-flight orders.
2626    #[must_use]
2627    pub fn orders_inflight_count(
2628        &self,
2629        venue: Option<&Venue>,
2630        instrument_id: Option<&InstrumentId>,
2631        strategy_id: Option<&StrategyId>,
2632        side: Option<OrderSide>,
2633    ) -> usize {
2634        self.orders_inflight(venue, instrument_id, strategy_id, side)
2635            .len()
2636    }
2637
2638    /// Returns the count of all orders.
2639    #[must_use]
2640    pub fn orders_total_count(
2641        &self,
2642        venue: Option<&Venue>,
2643        instrument_id: Option<&InstrumentId>,
2644        strategy_id: Option<&StrategyId>,
2645        side: Option<OrderSide>,
2646    ) -> usize {
2647        self.orders(venue, instrument_id, strategy_id, side).len()
2648    }
2649
2650    /// Returns the order list for the `order_list_id`.
2651    #[must_use]
2652    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2653        self.order_lists.get(order_list_id)
2654    }
2655
2656    /// Returns all order lists matching the optional filter parameters.
2657    #[must_use]
2658    pub fn order_lists(
2659        &self,
2660        venue: Option<&Venue>,
2661        instrument_id: Option<&InstrumentId>,
2662        strategy_id: Option<&StrategyId>,
2663    ) -> Vec<&OrderList> {
2664        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2665
2666        if let Some(venue) = venue {
2667            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2668        }
2669
2670        if let Some(instrument_id) = instrument_id {
2671            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2672        }
2673
2674        if let Some(strategy_id) = strategy_id {
2675            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2676        }
2677
2678        order_lists
2679    }
2680
2681    /// Returns whether an order list with the `order_list_id` exists.
2682    #[must_use]
2683    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2684        self.order_lists.contains_key(order_list_id)
2685    }
2686
2687    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2688
2689    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2690    /// optional filter parameters.
2691    #[must_use]
2692    pub fn orders_for_exec_algorithm(
2693        &self,
2694        exec_algorithm_id: &ExecAlgorithmId,
2695        venue: Option<&Venue>,
2696        instrument_id: Option<&InstrumentId>,
2697        strategy_id: Option<&StrategyId>,
2698        side: Option<OrderSide>,
2699    ) -> Vec<&OrderAny> {
2700        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2701        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2702
2703        if let Some(query) = query
2704            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2705        {
2706            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2707        }
2708
2709        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2710            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2711        } else {
2712            Vec::new()
2713        }
2714    }
2715
2716    /// Returns references to all orders with the `exec_spawn_id`.
2717    #[must_use]
2718    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2719        self.get_orders_for_ids(
2720            self.index
2721                .exec_spawn_orders
2722                .get(exec_spawn_id)
2723                .unwrap_or(&AHashSet::new()),
2724            None,
2725        )
2726    }
2727
2728    /// Returns the total order quantity for the `exec_spawn_id`.
2729    #[must_use]
2730    pub fn exec_spawn_total_quantity(
2731        &self,
2732        exec_spawn_id: &ClientOrderId,
2733        active_only: bool,
2734    ) -> Option<Quantity> {
2735        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2736
2737        let mut total_quantity: Option<Quantity> = None;
2738
2739        for spawn_order in exec_spawn_orders {
2740            if active_only && spawn_order.is_closed() {
2741                continue;
2742            }
2743
2744            match total_quantity.as_mut() {
2745                Some(total) => *total += spawn_order.quantity(),
2746                None => total_quantity = Some(spawn_order.quantity()),
2747            }
2748        }
2749
2750        total_quantity
2751    }
2752
2753    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
2754    #[must_use]
2755    pub fn exec_spawn_total_filled_qty(
2756        &self,
2757        exec_spawn_id: &ClientOrderId,
2758        active_only: bool,
2759    ) -> Option<Quantity> {
2760        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2761
2762        let mut total_quantity: Option<Quantity> = None;
2763
2764        for spawn_order in exec_spawn_orders {
2765            if active_only && spawn_order.is_closed() {
2766                continue;
2767            }
2768
2769            match total_quantity.as_mut() {
2770                Some(total) => *total += spawn_order.filled_qty(),
2771                None => total_quantity = Some(spawn_order.filled_qty()),
2772            }
2773        }
2774
2775        total_quantity
2776    }
2777
2778    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
2779    #[must_use]
2780    pub fn exec_spawn_total_leaves_qty(
2781        &self,
2782        exec_spawn_id: &ClientOrderId,
2783        active_only: bool,
2784    ) -> Option<Quantity> {
2785        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2786
2787        let mut total_quantity: Option<Quantity> = None;
2788
2789        for spawn_order in exec_spawn_orders {
2790            if active_only && spawn_order.is_closed() {
2791                continue;
2792            }
2793
2794            match total_quantity.as_mut() {
2795                Some(total) => *total += spawn_order.leaves_qty(),
2796                None => total_quantity = Some(spawn_order.leaves_qty()),
2797            }
2798        }
2799
2800        total_quantity
2801    }
2802
2803    // -- POSITION QUERIES ------------------------------------------------------------------------
2804
2805    /// Returns a reference to the position with the `position_id` (if found).
2806    #[must_use]
2807    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2808        self.positions.get(position_id)
2809    }
2810
2811    /// Returns a reference to the position for the `client_order_id` (if found).
2812    #[must_use]
2813    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2814        self.index
2815            .order_position
2816            .get(client_order_id)
2817            .and_then(|position_id| self.positions.get(position_id))
2818    }
2819
2820    /// Returns a reference to the position ID for the `client_order_id` (if found).
2821    #[must_use]
2822    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2823        self.index.order_position.get(client_order_id)
2824    }
2825
2826    /// Returns a reference to all positions matching the optional filter parameters.
2827    #[must_use]
2828    pub fn positions(
2829        &self,
2830        venue: Option<&Venue>,
2831        instrument_id: Option<&InstrumentId>,
2832        strategy_id: Option<&StrategyId>,
2833        side: Option<PositionSide>,
2834    ) -> Vec<&Position> {
2835        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2836        self.get_positions_for_ids(&position_ids, side)
2837    }
2838
2839    /// Returns a reference to all open positions matching the optional filter parameters.
2840    #[must_use]
2841    pub fn positions_open(
2842        &self,
2843        venue: Option<&Venue>,
2844        instrument_id: Option<&InstrumentId>,
2845        strategy_id: Option<&StrategyId>,
2846        side: Option<PositionSide>,
2847    ) -> Vec<&Position> {
2848        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2849        self.get_positions_for_ids(&position_ids, side)
2850    }
2851
2852    /// Returns a reference to all closed positions matching the optional filter parameters.
2853    #[must_use]
2854    pub fn positions_closed(
2855        &self,
2856        venue: Option<&Venue>,
2857        instrument_id: Option<&InstrumentId>,
2858        strategy_id: Option<&StrategyId>,
2859        side: Option<PositionSide>,
2860    ) -> Vec<&Position> {
2861        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2862        self.get_positions_for_ids(&position_ids, side)
2863    }
2864
2865    /// Returns whether a position with the `position_id` exists.
2866    #[must_use]
2867    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2868        self.index.positions.contains(position_id)
2869    }
2870
2871    /// Returns whether a position with the `position_id` is open.
2872    #[must_use]
2873    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2874        self.index.positions_open.contains(position_id)
2875    }
2876
2877    /// Returns whether a position with the `position_id` is closed.
2878    #[must_use]
2879    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2880        self.index.positions_closed.contains(position_id)
2881    }
2882
2883    /// Returns the count of all open positions.
2884    #[must_use]
2885    pub fn positions_open_count(
2886        &self,
2887        venue: Option<&Venue>,
2888        instrument_id: Option<&InstrumentId>,
2889        strategy_id: Option<&StrategyId>,
2890        side: Option<PositionSide>,
2891    ) -> usize {
2892        self.positions_open(venue, instrument_id, strategy_id, side)
2893            .len()
2894    }
2895
2896    /// Returns the count of all closed positions.
2897    #[must_use]
2898    pub fn positions_closed_count(
2899        &self,
2900        venue: Option<&Venue>,
2901        instrument_id: Option<&InstrumentId>,
2902        strategy_id: Option<&StrategyId>,
2903        side: Option<PositionSide>,
2904    ) -> usize {
2905        self.positions_closed(venue, instrument_id, strategy_id, side)
2906            .len()
2907    }
2908
2909    /// Returns the count of all positions.
2910    #[must_use]
2911    pub fn positions_total_count(
2912        &self,
2913        venue: Option<&Venue>,
2914        instrument_id: Option<&InstrumentId>,
2915        strategy_id: Option<&StrategyId>,
2916        side: Option<PositionSide>,
2917    ) -> usize {
2918        self.positions(venue, instrument_id, strategy_id, side)
2919            .len()
2920    }
2921
2922    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2923
2924    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
2925    #[must_use]
2926    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2927        self.index.order_strategy.get(client_order_id)
2928    }
2929
2930    /// Gets a reference to the strategy ID for the `position_id` (if found).
2931    #[must_use]
2932    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2933        self.index.position_strategy.get(position_id)
2934    }
2935
2936    // -- GENERAL ---------------------------------------------------------------------------------
2937
2938    /// Gets a reference to the general value for the `key` (if found).
2939    ///
2940    /// # Errors
2941    ///
2942    /// Returns an error if the `key` is invalid.
2943    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2944        check_valid_string_ascii(key, stringify!(key))?;
2945
2946        Ok(self.general.get(key))
2947    }
2948
2949    // -- DATA QUERIES ----------------------------------------------------------------------------
2950
2951    /// Returns the price for the `instrument_id` and `price_type` (if found).
2952    #[must_use]
2953    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2954        match price_type {
2955            PriceType::Bid => self
2956                .quotes
2957                .get(instrument_id)
2958                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2959            PriceType::Ask => self
2960                .quotes
2961                .get(instrument_id)
2962                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2963            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2964                quotes.front().map(|quote| {
2965                    Price::new(
2966                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2967                        quote.bid_price.precision + 1,
2968                    )
2969                })
2970            }),
2971            PriceType::Last => self
2972                .trades
2973                .get(instrument_id)
2974                .and_then(|trades| trades.front().map(|trade| trade.price)),
2975            PriceType::Mark => self
2976                .mark_prices
2977                .get(instrument_id)
2978                .and_then(|marks| marks.front().map(|mark| mark.value)),
2979        }
2980    }
2981
2982    /// Gets all quotes for the `instrument_id`.
2983    #[must_use]
2984    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2985        self.quotes
2986            .get(instrument_id)
2987            .map(|quotes| quotes.iter().copied().collect())
2988    }
2989
2990    /// Gets all trades for the `instrument_id`.
2991    #[must_use]
2992    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2993        self.trades
2994            .get(instrument_id)
2995            .map(|trades| trades.iter().copied().collect())
2996    }
2997
2998    /// Gets all mark price updates for the `instrument_id`.
2999    #[must_use]
3000    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3001        self.mark_prices
3002            .get(instrument_id)
3003            .map(|mark_prices| mark_prices.iter().copied().collect())
3004    }
3005
3006    /// Gets all index price updates for the `instrument_id`.
3007    #[must_use]
3008    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3009        self.index_prices
3010            .get(instrument_id)
3011            .map(|index_prices| index_prices.iter().copied().collect())
3012    }
3013
3014    /// Gets all bars for the `bar_type`.
3015    #[must_use]
3016    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3017        self.bars
3018            .get(bar_type)
3019            .map(|bars| bars.iter().copied().collect())
3020    }
3021
3022    /// Gets a reference to the order book for the `instrument_id`.
3023    #[must_use]
3024    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3025        self.books.get(instrument_id)
3026    }
3027
3028    /// Gets a reference to the order book for the `instrument_id`.
3029    #[must_use]
3030    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3031        self.books.get_mut(instrument_id)
3032    }
3033
3034    /// Gets a reference to the own order book for the `instrument_id`.
3035    #[must_use]
3036    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3037        self.own_books.get(instrument_id)
3038    }
3039
3040    /// Gets a reference to the own order book for the `instrument_id`.
3041    #[must_use]
3042    pub fn own_order_book_mut(
3043        &mut self,
3044        instrument_id: &InstrumentId,
3045    ) -> Option<&mut OwnOrderBook> {
3046        self.own_books.get_mut(instrument_id)
3047    }
3048
3049    /// Gets a reference to the latest quote for the `instrument_id`.
3050    #[must_use]
3051    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3052        self.quotes
3053            .get(instrument_id)
3054            .and_then(|quotes| quotes.front())
3055    }
3056
3057    /// Gets a reference to the latest trade for the `instrument_id`.
3058    #[must_use]
3059    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3060        self.trades
3061            .get(instrument_id)
3062            .and_then(|trades| trades.front())
3063    }
3064
3065    /// Gets a reference to the latest mark price update for the `instrument_id`.
3066    #[must_use]
3067    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3068        self.mark_prices
3069            .get(instrument_id)
3070            .and_then(|mark_prices| mark_prices.front())
3071    }
3072
3073    /// Gets a reference to the latest index price update for the `instrument_id`.
3074    #[must_use]
3075    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3076        self.index_prices
3077            .get(instrument_id)
3078            .and_then(|index_prices| index_prices.front())
3079    }
3080
3081    /// Gets a reference to the funding rate update for the `instrument_id`.
3082    #[must_use]
3083    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3084        self.funding_rates.get(instrument_id)
3085    }
3086
3087    /// Gets a reference to the latest bar for the `bar_type`.
3088    #[must_use]
3089    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3090        self.bars.get(bar_type).and_then(|bars| bars.front())
3091    }
3092
3093    /// Gets the order book update count for the `instrument_id`.
3094    #[must_use]
3095    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3096        self.books
3097            .get(instrument_id)
3098            .map_or(0, |book| book.update_count) as usize
3099    }
3100
3101    /// Gets the quote tick count for the `instrument_id`.
3102    #[must_use]
3103    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3104        self.quotes
3105            .get(instrument_id)
3106            .map_or(0, std::collections::VecDeque::len)
3107    }
3108
3109    /// Gets the trade tick count for the `instrument_id`.
3110    #[must_use]
3111    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3112        self.trades
3113            .get(instrument_id)
3114            .map_or(0, std::collections::VecDeque::len)
3115    }
3116
3117    /// Gets the bar count for the `instrument_id`.
3118    #[must_use]
3119    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3120        self.bars
3121            .get(bar_type)
3122            .map_or(0, std::collections::VecDeque::len)
3123    }
3124
3125    /// Returns whether the cache contains an order book for the `instrument_id`.
3126    #[must_use]
3127    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3128        self.books.contains_key(instrument_id)
3129    }
3130
3131    /// Returns whether the cache contains quotes for the `instrument_id`.
3132    #[must_use]
3133    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3134        self.quote_count(instrument_id) > 0
3135    }
3136
3137    /// Returns whether the cache contains trades for the `instrument_id`.
3138    #[must_use]
3139    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3140        self.trade_count(instrument_id) > 0
3141    }
3142
3143    /// Returns whether the cache contains bars for the `bar_type`.
3144    #[must_use]
3145    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3146        self.bar_count(bar_type) > 0
3147    }
3148
3149    #[must_use]
3150    pub fn get_xrate(
3151        &self,
3152        venue: Venue,
3153        from_currency: Currency,
3154        to_currency: Currency,
3155        price_type: PriceType,
3156    ) -> Option<f64> {
3157        if from_currency == to_currency {
3158            // When the source and target currencies are identical,
3159            // no conversion is needed; return an exchange rate of 1.0.
3160            return Some(1.0);
3161        }
3162
3163        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3164
3165        match get_exchange_rate(
3166            from_currency.code,
3167            to_currency.code,
3168            price_type,
3169            bid_quote,
3170            ask_quote,
3171        ) {
3172            Ok(rate) => rate,
3173            Err(e) => {
3174                log::error!("Failed to calculate xrate: {e}");
3175                None
3176            }
3177        }
3178    }
3179
3180    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3181        let mut bid_quotes = AHashMap::new();
3182        let mut ask_quotes = AHashMap::new();
3183
3184        for instrument_id in self.instruments.keys() {
3185            if instrument_id.venue != *venue {
3186                continue;
3187            }
3188
3189            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3190                if let Some(tick) = ticks.front() {
3191                    (tick.bid_price, tick.ask_price)
3192                } else {
3193                    continue; // Empty ticks vector
3194                }
3195            } else {
3196                let bid_bar = self
3197                    .bars
3198                    .iter()
3199                    .find(|(k, _)| {
3200                        k.instrument_id() == *instrument_id
3201                            && matches!(k.spec().price_type, PriceType::Bid)
3202                    })
3203                    .map(|(_, v)| v);
3204
3205                let ask_bar = self
3206                    .bars
3207                    .iter()
3208                    .find(|(k, _)| {
3209                        k.instrument_id() == *instrument_id
3210                            && matches!(k.spec().price_type, PriceType::Ask)
3211                    })
3212                    .map(|(_, v)| v);
3213
3214                match (bid_bar, ask_bar) {
3215                    (Some(bid), Some(ask)) => {
3216                        match (bid.front(), ask.front()) {
3217                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3218                            _ => {
3219                                // Empty bar VecDeques
3220                                continue;
3221                            }
3222                        }
3223                    }
3224                    _ => continue,
3225                }
3226            };
3227
3228            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3229            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3230        }
3231
3232        (bid_quotes, ask_quotes)
3233    }
3234
3235    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3236    #[must_use]
3237    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3238        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3239    }
3240
3241    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3242    ///
3243    /// # Panics
3244    ///
3245    /// Panics if `xrate` is not positive.
3246    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3247        assert!(xrate > 0.0, "xrate was zero");
3248        self.mark_xrates.insert((from_currency, to_currency), xrate);
3249        self.mark_xrates
3250            .insert((to_currency, from_currency), 1.0 / xrate);
3251    }
3252
3253    /// Clears the mark exchange rate for the given currency pair.
3254    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3255        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3256    }
3257
3258    /// Clears all mark exchange rates.
3259    pub fn clear_mark_xrates(&mut self) {
3260        self.mark_xrates.clear();
3261    }
3262
3263    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3264
3265    /// Returns a reference to the instrument for the `instrument_id` (if found).
3266    #[must_use]
3267    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3268        self.instruments.get(instrument_id)
3269    }
3270
3271    /// Returns references to all instrument IDs for the `venue`.
3272    #[must_use]
3273    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3274        match venue {
3275            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3276            None => self.instruments.keys().collect(),
3277        }
3278    }
3279
3280    /// Returns references to all instruments for the `venue`.
3281    #[must_use]
3282    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3283        self.instruments
3284            .values()
3285            .filter(|i| &i.id().venue == venue)
3286            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3287            .collect()
3288    }
3289
3290    /// Returns references to all bar types contained in the cache.
3291    #[must_use]
3292    pub fn bar_types(
3293        &self,
3294        instrument_id: Option<&InstrumentId>,
3295        price_type: Option<&PriceType>,
3296        aggregation_source: AggregationSource,
3297    ) -> Vec<&BarType> {
3298        let mut bar_types = self
3299            .bars
3300            .keys()
3301            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3302            .collect::<Vec<&BarType>>();
3303
3304        if let Some(instrument_id) = instrument_id {
3305            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3306        }
3307
3308        if let Some(price_type) = price_type {
3309            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3310        }
3311
3312        bar_types
3313    }
3314
3315    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3316
3317    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3318    #[must_use]
3319    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3320        self.synthetics.get(instrument_id)
3321    }
3322
3323    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3324    #[must_use]
3325    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3326        self.synthetics.keys().collect()
3327    }
3328
3329    /// Returns references to all synthetic instruments contained in the cache.
3330    #[must_use]
3331    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3332        self.synthetics.values().collect()
3333    }
3334
3335    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3336
3337    /// Returns a reference to the account for the `account_id` (if found).
3338    #[must_use]
3339    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3340        self.accounts.get(account_id)
3341    }
3342
3343    /// Returns a reference to the account for the `venue` (if found).
3344    #[must_use]
3345    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3346        self.index
3347            .venue_account
3348            .get(venue)
3349            .and_then(|account_id| self.accounts.get(account_id))
3350    }
3351
3352    /// Returns a reference to the account ID for the `venue` (if found).
3353    #[must_use]
3354    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3355        self.index.venue_account.get(venue)
3356    }
3357
3358    /// Returns references to all accounts for the `account_id`.
3359    #[must_use]
3360    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3361        self.accounts
3362            .values()
3363            .filter(|account| &account.id() == account_id)
3364            .collect()
3365    }
3366
3367    /// Updates the own order book with an order.
3368    ///
3369    /// This method adds, updates, or removes an order from the own order book
3370    /// based on the order's current state.
3371    ///
3372    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3373    /// represented in own books.
3374    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3375        if !order.has_price() {
3376            return;
3377        }
3378
3379        let instrument_id = order.instrument_id();
3380
3381        let own_book = self
3382            .own_books
3383            .entry(instrument_id)
3384            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3385
3386        let own_book_order = order.to_own_book_order();
3387
3388        if order.is_closed() {
3389            if let Err(e) = own_book.delete(own_book_order) {
3390                log::debug!(
3391                    "Failed to delete order {} from own book: {e}",
3392                    order.client_order_id(),
3393                );
3394            } else {
3395                log::debug!("Deleted order {} from own book", order.client_order_id());
3396            }
3397        } else {
3398            // Add or update the order in the own book
3399            if let Err(e) = own_book.update(own_book_order) {
3400                log::debug!(
3401                    "Failed to update order {} in own book: {e}; inserting instead",
3402                    order.client_order_id(),
3403                );
3404                own_book.add(own_book_order);
3405            }
3406            log::debug!("Updated order {} in own book", order.client_order_id());
3407        }
3408    }
3409
3410    /// Force removal of an order from own order books and clean up all indexes.
3411    ///
3412    /// This method is used when order event application fails and we need to ensure
3413    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3414    /// Replicates the index cleanup that update_order performs for closed orders.
3415    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3416        let order = match self.orders.get(client_order_id) {
3417            Some(order) => order,
3418            None => return,
3419        };
3420
3421        self.index.orders_open.remove(client_order_id);
3422        self.index.orders_pending_cancel.remove(client_order_id);
3423        self.index.orders_inflight.remove(client_order_id);
3424        self.index.orders_emulated.remove(client_order_id);
3425
3426        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3427            && order.has_price()
3428        {
3429            let own_book_order = order.to_own_book_order();
3430            if let Err(e) = own_book.delete(own_book_order) {
3431                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3432            } else {
3433                log::debug!("Force deleted {client_order_id} from own book");
3434            }
3435        }
3436
3437        self.index.orders_closed.insert(*client_order_id);
3438    }
3439
3440    /// Audit all own order books against open and inflight order indexes.
3441    ///
3442    /// Ensures closed orders are removed from own order books. This includes both
3443    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3444    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3445    /// during venue latency windows.
3446    pub fn audit_own_order_books(&mut self) {
3447        log::debug!("Starting own books audit");
3448        let start = std::time::Instant::now();
3449
3450        // Build union of open and inflight orders for audit,
3451        // this prevents false positives for SUBMITTED orders during venue latency.
3452        let valid_order_ids: AHashSet<ClientOrderId> = self
3453            .index
3454            .orders_open
3455            .union(&self.index.orders_inflight)
3456            .copied()
3457            .collect();
3458
3459        for own_book in self.own_books.values_mut() {
3460            own_book.audit_open_orders(&valid_order_ids);
3461        }
3462
3463        log::debug!("Completed own books audit in {:?}", start.elapsed());
3464    }
3465}