Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31    collections::VecDeque,
32    fmt::{Debug, Display},
33    time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; // Re-export
39use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42    UUID4, UnixNanos,
43    correctness::{
44        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45        check_valid_string_ascii,
46    },
47    datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50    accounts::{Account, AccountAny},
51    data::{
52        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
53        TradeTick, YieldCurveData,
54    },
55    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
56    identifiers::{
57        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
58        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
59    },
60    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
61    orderbook::{
62        OrderBook,
63        own::{OwnOrderBook, should_handle_own_book_order},
64    },
65    orders::{Order, OrderAny, OrderList},
66    position::Position,
67    types::{Currency, Money, Price, Quantity},
68};
69use ustr::Ustr;
70
71use crate::xrate::get_exchange_rate;
72
73/// A common in-memory `Cache` for market and execution related data.
74#[cfg_attr(
75    feature = "python",
76    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
77)]
78pub struct Cache {
79    config: CacheConfig,
80    index: CacheIndex,
81    database: Option<Box<dyn CacheDatabaseAdapter>>,
82    general: AHashMap<String, Bytes>,
83    currencies: AHashMap<Ustr, Currency>,
84    instruments: AHashMap<InstrumentId, InstrumentAny>,
85    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
86    books: AHashMap<InstrumentId, OrderBook>,
87    own_books: AHashMap<InstrumentId, OwnOrderBook>,
88    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
89    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
90    mark_xrates: AHashMap<(Currency, Currency), f64>,
91    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
92    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
93    funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
94    bars: AHashMap<BarType, VecDeque<Bar>>,
95    greeks: AHashMap<InstrumentId, GreeksData>,
96    yield_curves: AHashMap<String, YieldCurveData>,
97    accounts: AHashMap<AccountId, AccountAny>,
98    orders: AHashMap<ClientOrderId, OrderAny>,
99    order_lists: AHashMap<OrderListId, OrderList>,
100    positions: AHashMap<PositionId, Position>,
101    position_snapshots: AHashMap<PositionId, Bytes>,
102    #[cfg(feature = "defi")]
103    pub(crate) defi: crate::defi::cache::DefiCache,
104}
105
106impl Debug for Cache {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct(stringify!(Cache))
109            .field("config", &self.config)
110            .field("index", &self.index)
111            .field("general", &self.general)
112            .field("currencies", &self.currencies)
113            .field("instruments", &self.instruments)
114            .field("synthetics", &self.synthetics)
115            .field("books", &self.books)
116            .field("own_books", &self.own_books)
117            .field("quotes", &self.quotes)
118            .field("trades", &self.trades)
119            .field("mark_xrates", &self.mark_xrates)
120            .field("mark_prices", &self.mark_prices)
121            .field("index_prices", &self.index_prices)
122            .field("funding_rates", &self.funding_rates)
123            .field("bars", &self.bars)
124            .field("greeks", &self.greeks)
125            .field("yield_curves", &self.yield_curves)
126            .field("accounts", &self.accounts)
127            .field("orders", &self.orders)
128            .field("order_lists", &self.order_lists)
129            .field("positions", &self.positions)
130            .field("position_snapshots", &self.position_snapshots)
131            .finish()
132    }
133}
134
135impl Default for Cache {
136    /// Creates a new default [`Cache`] instance.
137    fn default() -> Self {
138        Self::new(Some(CacheConfig::default()), None)
139    }
140}
141
142impl Cache {
143    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
144    #[must_use]
145    /// # Note
146    ///
147    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
148    pub fn new(
149        config: Option<CacheConfig>,
150        database: Option<Box<dyn CacheDatabaseAdapter>>,
151    ) -> Self {
152        Self {
153            config: config.unwrap_or_default(),
154            index: CacheIndex::default(),
155            database,
156            general: AHashMap::new(),
157            currencies: AHashMap::new(),
158            instruments: AHashMap::new(),
159            synthetics: AHashMap::new(),
160            books: AHashMap::new(),
161            own_books: AHashMap::new(),
162            quotes: AHashMap::new(),
163            trades: AHashMap::new(),
164            mark_xrates: AHashMap::new(),
165            mark_prices: AHashMap::new(),
166            index_prices: AHashMap::new(),
167            funding_rates: AHashMap::new(),
168            bars: AHashMap::new(),
169            greeks: AHashMap::new(),
170            yield_curves: AHashMap::new(),
171            accounts: AHashMap::new(),
172            orders: AHashMap::new(),
173            order_lists: AHashMap::new(),
174            positions: AHashMap::new(),
175            position_snapshots: AHashMap::new(),
176            #[cfg(feature = "defi")]
177            defi: crate::defi::cache::DefiCache::default(),
178        }
179    }
180
181    /// Returns the cache instances memory address.
182    #[must_use]
183    pub fn memory_address(&self) -> String {
184        format!("{:?}", std::ptr::from_ref(self))
185    }
186
187    /// Sets the cache database adapter for persistence.
188    ///
189    /// This allows setting or replacing the database adapter after cache construction.
190    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
191        let type_name = std::any::type_name_of_val(&*database);
192        log::info!("Cache database adapter set: {type_name}");
193        self.database = Some(database);
194    }
195
196    // -- COMMANDS --------------------------------------------------------------------------------
197
198    /// Clears and reloads general entries from the database into the cache.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if loading general cache data fails.
203    pub fn cache_general(&mut self) -> anyhow::Result<()> {
204        self.general = match &mut self.database {
205            Some(db) => db.load()?,
206            None => AHashMap::new(),
207        };
208
209        log::info!(
210            "Cached {} general object(s) from database",
211            self.general.len()
212        );
213        Ok(())
214    }
215
216    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if loading all cache data fails.
221    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
222        let cache_map = match &self.database {
223            Some(db) => db.load_all().await?,
224            None => CacheMap::default(),
225        };
226
227        self.currencies = cache_map.currencies;
228        self.instruments = cache_map.instruments;
229        self.synthetics = cache_map.synthetics;
230        self.accounts = cache_map.accounts;
231        self.orders = cache_map.orders;
232        self.positions = cache_map.positions;
233        Ok(())
234    }
235
236    /// Clears and reloads the currency cache from the database.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if loading currencies cache fails.
241    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
242        self.currencies = match &mut self.database {
243            Some(db) => db.load_currencies().await?,
244            None => AHashMap::new(),
245        };
246
247        log::info!("Cached {} currencies from database", self.general.len());
248        Ok(())
249    }
250
251    /// Clears and reloads the instrument cache from the database.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if loading instruments cache fails.
256    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
257        self.instruments = match &mut self.database {
258            Some(db) => db.load_instruments().await?,
259            None => AHashMap::new(),
260        };
261
262        log::info!("Cached {} instruments from database", self.general.len());
263        Ok(())
264    }
265
266    /// Clears and reloads the synthetic instrument cache from the database.
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if loading synthetic instruments cache fails.
271    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
272        self.synthetics = match &mut self.database {
273            Some(db) => db.load_synthetics().await?,
274            None => AHashMap::new(),
275        };
276
277        log::info!(
278            "Cached {} synthetic instruments from database",
279            self.general.len()
280        );
281        Ok(())
282    }
283
284    /// Clears and reloads the account cache from the database.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if loading accounts cache fails.
289    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
290        self.accounts = match &mut self.database {
291            Some(db) => db.load_accounts().await?,
292            None => AHashMap::new(),
293        };
294
295        log::info!(
296            "Cached {} synthetic instruments from database",
297            self.general.len()
298        );
299        Ok(())
300    }
301
302    /// Clears and reloads the order cache from the database.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if loading orders cache fails.
307    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
308        self.orders = match &mut self.database {
309            Some(db) => db.load_orders().await?,
310            None => AHashMap::new(),
311        };
312
313        log::info!("Cached {} orders from database", self.general.len());
314        Ok(())
315    }
316
317    /// Clears and reloads the position cache from the database.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if loading positions cache fails.
322    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
323        self.positions = match &mut self.database {
324            Some(db) => db.load_positions().await?,
325            None => AHashMap::new(),
326        };
327
328        log::info!("Cached {} positions from database", self.general.len());
329        Ok(())
330    }
331
332    /// Clears the current cache index and re-build.
333    pub fn build_index(&mut self) {
334        log::debug!("Building index");
335
336        // Index accounts
337        for account_id in self.accounts.keys() {
338            self.index
339                .venue_account
340                .insert(account_id.get_issuer(), *account_id);
341        }
342
343        // Index orders
344        for (client_order_id, order) in &self.orders {
345            let instrument_id = order.instrument_id();
346            let venue = instrument_id.venue;
347            let strategy_id = order.strategy_id();
348
349            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
350            self.index
351                .venue_orders
352                .entry(venue)
353                .or_default()
354                .insert(*client_order_id);
355
356            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
357            if let Some(venue_order_id) = order.venue_order_id() {
358                self.index
359                    .venue_order_ids
360                    .insert(venue_order_id, *client_order_id);
361            }
362
363            // 3: Build index.order_position -> {ClientOrderId, PositionId}
364            if let Some(position_id) = order.position_id() {
365                self.index
366                    .order_position
367                    .insert(*client_order_id, position_id);
368            }
369
370            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
371            self.index
372                .order_strategy
373                .insert(*client_order_id, order.strategy_id());
374
375            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
376            self.index
377                .instrument_orders
378                .entry(instrument_id)
379                .or_default()
380                .insert(*client_order_id);
381
382            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
383            self.index
384                .strategy_orders
385                .entry(strategy_id)
386                .or_default()
387                .insert(*client_order_id);
388
389            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
390            if let Some(account_id) = order.account_id() {
391                self.index
392                    .account_orders
393                    .entry(account_id)
394                    .or_default()
395                    .insert(*client_order_id);
396            }
397
398            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
399            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
400                self.index
401                    .exec_algorithm_orders
402                    .entry(exec_algorithm_id)
403                    .or_default()
404                    .insert(*client_order_id);
405            }
406
407            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
408            if let Some(exec_spawn_id) = order.exec_spawn_id() {
409                self.index
410                    .exec_spawn_orders
411                    .entry(exec_spawn_id)
412                    .or_default()
413                    .insert(*client_order_id);
414            }
415
416            // 9: Build index.orders -> {ClientOrderId}
417            self.index.orders.insert(*client_order_id);
418
419            // 10: Build index.orders_open -> {ClientOrderId}
420            if order.is_open() {
421                self.index.orders_open.insert(*client_order_id);
422            }
423
424            // 11: Build index.orders_closed -> {ClientOrderId}
425            if order.is_closed() {
426                self.index.orders_closed.insert(*client_order_id);
427            }
428
429            // 12: Build index.orders_emulated -> {ClientOrderId}
430            if let Some(emulation_trigger) = order.emulation_trigger()
431                && emulation_trigger != TriggerType::NoTrigger
432                && !order.is_closed()
433            {
434                self.index.orders_emulated.insert(*client_order_id);
435            }
436
437            // 13: Build index.orders_inflight -> {ClientOrderId}
438            if order.is_inflight() {
439                self.index.orders_inflight.insert(*client_order_id);
440            }
441
442            // 14: Build index.strategies -> {StrategyId}
443            self.index.strategies.insert(strategy_id);
444
445            // 15: Build index.strategies -> {ExecAlgorithmId}
446            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
447                self.index.exec_algorithms.insert(exec_algorithm_id);
448            }
449        }
450
451        // Index positions
452        for (position_id, position) in &self.positions {
453            let instrument_id = position.instrument_id;
454            let venue = instrument_id.venue;
455            let strategy_id = position.strategy_id;
456
457            // 1: Build index.venue_positions -> {Venue, {PositionId}}
458            self.index
459                .venue_positions
460                .entry(venue)
461                .or_default()
462                .insert(*position_id);
463
464            // 2: Build index.position_strategy -> {PositionId, StrategyId}
465            self.index
466                .position_strategy
467                .insert(*position_id, position.strategy_id);
468
469            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
470            self.index
471                .position_orders
472                .entry(*position_id)
473                .or_default()
474                .extend(position.client_order_ids().into_iter());
475
476            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
477            self.index
478                .instrument_positions
479                .entry(instrument_id)
480                .or_default()
481                .insert(*position_id);
482
483            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
484            self.index
485                .strategy_positions
486                .entry(strategy_id)
487                .or_default()
488                .insert(*position_id);
489
490            // 6: Build index.account_positions -> {AccountId, {PositionId}}
491            self.index
492                .account_positions
493                .entry(position.account_id)
494                .or_default()
495                .insert(*position_id);
496
497            // 7: Build index.positions -> {PositionId}
498            self.index.positions.insert(*position_id);
499
500            // 8: Build index.positions_open -> {PositionId}
501            if position.is_open() {
502                self.index.positions_open.insert(*position_id);
503            }
504
505            // 9: Build index.positions_closed -> {PositionId}
506            if position.is_closed() {
507                self.index.positions_closed.insert(*position_id);
508            }
509
510            // 10: Build index.strategies -> {StrategyId}
511            self.index.strategies.insert(strategy_id);
512        }
513    }
514
515    /// Returns whether the cache has a backing database.
516    #[must_use]
517    pub const fn has_backing(&self) -> bool {
518        self.config.database.is_some()
519    }
520
521    // Calculate the unrealized profit and loss (PnL) for `position`.
522    #[must_use]
523    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
524        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
525            quote
526        } else {
527            log::warn!(
528                "Cannot calculate unrealized PnL for {}, no quotes for {}",
529                position.id,
530                position.instrument_id
531            );
532            return None;
533        };
534
535        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
536        let last = match position.side {
537            PositionSide::Flat | PositionSide::NoPositionSide => {
538                return Some(Money::new(0.0, position.settlement_currency));
539            }
540            PositionSide::Long => quote.bid_price,
541            PositionSide::Short => quote.ask_price,
542        };
543
544        Some(position.unrealized_pnl(last))
545    }
546
547    /// Checks integrity of data within the cache.
548    ///
549    /// All data should be loaded from the database prior to this call.
550    /// If an error is found then a log error message will also be produced.
551    ///
552    /// # Panics
553    ///
554    /// Panics if failure calling system clock.
555    #[must_use]
556    pub fn check_integrity(&mut self) -> bool {
557        let mut error_count = 0;
558        let failure = "Integrity failure";
559
560        // Get current timestamp in microseconds
561        let timestamp_us = SystemTime::now()
562            .duration_since(UNIX_EPOCH)
563            .expect("Time went backwards")
564            .as_micros();
565
566        log::info!("Checking data integrity");
567
568        // Check object caches
569        for account_id in self.accounts.keys() {
570            if !self
571                .index
572                .venue_account
573                .contains_key(&account_id.get_issuer())
574            {
575                log::error!(
576                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
577                );
578                error_count += 1;
579            }
580        }
581
582        for (client_order_id, order) in &self.orders {
583            if !self.index.order_strategy.contains_key(client_order_id) {
584                log::error!(
585                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
586                );
587                error_count += 1;
588            }
589            if !self.index.orders.contains(client_order_id) {
590                log::error!(
591                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
592                );
593                error_count += 1;
594            }
595            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
596                log::error!(
597                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
598                );
599                error_count += 1;
600            }
601            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
602                log::error!(
603                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
604                );
605                error_count += 1;
606            }
607            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
608                log::error!(
609                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
610                );
611                error_count += 1;
612            }
613            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
614                if !self
615                    .index
616                    .exec_algorithm_orders
617                    .contains_key(&exec_algorithm_id)
618                {
619                    log::error!(
620                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
621                    );
622                    error_count += 1;
623                }
624                if order.exec_spawn_id().is_none()
625                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
626                {
627                    log::error!(
628                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
629                    );
630                    error_count += 1;
631                }
632            }
633        }
634
635        for (position_id, position) in &self.positions {
636            if !self.index.position_strategy.contains_key(position_id) {
637                log::error!(
638                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
639                );
640                error_count += 1;
641            }
642            if !self.index.position_orders.contains_key(position_id) {
643                log::error!(
644                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
645                );
646                error_count += 1;
647            }
648            if !self.index.positions.contains(position_id) {
649                log::error!(
650                    "{failure} in positions: {position_id} not found in `self.index.positions`",
651                );
652                error_count += 1;
653            }
654            if position.is_open() && !self.index.positions_open.contains(position_id) {
655                log::error!(
656                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
657                );
658                error_count += 1;
659            }
660            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
661                log::error!(
662                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
663                );
664                error_count += 1;
665            }
666        }
667
668        // Check indexes
669        for account_id in self.index.venue_account.values() {
670            if !self.accounts.contains_key(account_id) {
671                log::error!(
672                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
673                );
674                error_count += 1;
675            }
676        }
677
678        for client_order_id in self.index.venue_order_ids.values() {
679            if !self.orders.contains_key(client_order_id) {
680                log::error!(
681                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
682                );
683                error_count += 1;
684            }
685        }
686
687        for client_order_id in self.index.client_order_ids.keys() {
688            if !self.orders.contains_key(client_order_id) {
689                log::error!(
690                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
691                );
692                error_count += 1;
693            }
694        }
695
696        for client_order_id in self.index.order_position.keys() {
697            if !self.orders.contains_key(client_order_id) {
698                log::error!(
699                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
700                );
701                error_count += 1;
702            }
703        }
704
705        // Check indexes
706        for client_order_id in self.index.order_strategy.keys() {
707            if !self.orders.contains_key(client_order_id) {
708                log::error!(
709                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
710                );
711                error_count += 1;
712            }
713        }
714
715        for position_id in self.index.position_strategy.keys() {
716            if !self.positions.contains_key(position_id) {
717                log::error!(
718                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
719                );
720                error_count += 1;
721            }
722        }
723
724        for position_id in self.index.position_orders.keys() {
725            if !self.positions.contains_key(position_id) {
726                log::error!(
727                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
728                );
729                error_count += 1;
730            }
731        }
732
733        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
734            for client_order_id in client_order_ids {
735                if !self.orders.contains_key(client_order_id) {
736                    log::error!(
737                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
738                    );
739                    error_count += 1;
740                }
741            }
742        }
743
744        for instrument_id in self.index.instrument_positions.keys() {
745            if !self.index.instrument_orders.contains_key(instrument_id) {
746                log::error!(
747                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
748                );
749                error_count += 1;
750            }
751        }
752
753        for client_order_ids in self.index.strategy_orders.values() {
754            for client_order_id in client_order_ids {
755                if !self.orders.contains_key(client_order_id) {
756                    log::error!(
757                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
758                    );
759                    error_count += 1;
760                }
761            }
762        }
763
764        for position_ids in self.index.strategy_positions.values() {
765            for position_id in position_ids {
766                if !self.positions.contains_key(position_id) {
767                    log::error!(
768                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
769                    );
770                    error_count += 1;
771                }
772            }
773        }
774
775        for client_order_id in &self.index.orders {
776            if !self.orders.contains_key(client_order_id) {
777                log::error!(
778                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
779                );
780                error_count += 1;
781            }
782        }
783
784        for client_order_id in &self.index.orders_emulated {
785            if !self.orders.contains_key(client_order_id) {
786                log::error!(
787                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
788                );
789                error_count += 1;
790            }
791        }
792
793        for client_order_id in &self.index.orders_inflight {
794            if !self.orders.contains_key(client_order_id) {
795                log::error!(
796                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
797                );
798                error_count += 1;
799            }
800        }
801
802        for client_order_id in &self.index.orders_open {
803            if !self.orders.contains_key(client_order_id) {
804                log::error!(
805                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
806                );
807                error_count += 1;
808            }
809        }
810
811        for client_order_id in &self.index.orders_closed {
812            if !self.orders.contains_key(client_order_id) {
813                log::error!(
814                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
815                );
816                error_count += 1;
817            }
818        }
819
820        for position_id in &self.index.positions {
821            if !self.positions.contains_key(position_id) {
822                log::error!(
823                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
824                );
825                error_count += 1;
826            }
827        }
828
829        for position_id in &self.index.positions_open {
830            if !self.positions.contains_key(position_id) {
831                log::error!(
832                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
833                );
834                error_count += 1;
835            }
836        }
837
838        for position_id in &self.index.positions_closed {
839            if !self.positions.contains_key(position_id) {
840                log::error!(
841                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
842                );
843                error_count += 1;
844            }
845        }
846
847        for strategy_id in &self.index.strategies {
848            if !self.index.strategy_orders.contains_key(strategy_id) {
849                log::error!(
850                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
851                );
852                error_count += 1;
853            }
854        }
855
856        for exec_algorithm_id in &self.index.exec_algorithms {
857            if !self
858                .index
859                .exec_algorithm_orders
860                .contains_key(exec_algorithm_id)
861            {
862                log::error!(
863                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
864                );
865                error_count += 1;
866            }
867        }
868
869        let total_us = SystemTime::now()
870            .duration_since(UNIX_EPOCH)
871            .expect("Time went backwards")
872            .as_micros()
873            - timestamp_us;
874
875        if error_count == 0 {
876            log::info!("Integrity check passed in {total_us}μs");
877            true
878        } else {
879            log::error!(
880                "Integrity check failed with {error_count} error{} in {total_us}μs",
881                if error_count == 1 { "" } else { "s" },
882            );
883            false
884        }
885    }
886
887    /// Checks for any residual open state and log warnings if any are found.
888    ///
889    ///'Open state' is considered to be open orders and open positions.
890    #[must_use]
891    pub fn check_residuals(&self) -> bool {
892        log::debug!("Checking residuals");
893
894        let mut residuals = false;
895
896        // Check for any open orders
897        for order in self.orders_open(None, None, None, None, None) {
898            residuals = true;
899            log::warn!("Residual {order}");
900        }
901
902        // Check for any open positions
903        for position in self.positions_open(None, None, None, None, None) {
904            residuals = true;
905            log::warn!("Residual {position}");
906        }
907
908        residuals
909    }
910
911    /// Purges all closed orders from the cache that are older than `buffer_secs`.
912    ///
913    ///
914    /// Only orders that have been closed for at least this amount of time will be purged.
915    /// A value of 0 means purge all closed orders regardless of when they were closed.
916    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
917        log::debug!(
918            "Purging closed orders{}",
919            if buffer_secs > 0 {
920                format!(" with buffer_secs={buffer_secs}")
921            } else {
922                String::new()
923            }
924        );
925
926        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
927
928        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
929
930        'outer: for client_order_id in self.index.orders_closed.clone() {
931            if let Some(order) = self.orders.get(&client_order_id)
932                && order.is_closed()
933                && let Some(ts_closed) = order.ts_closed()
934                && ts_closed + buffer_ns <= ts_now
935            {
936                // Check any linked orders (contingency orders)
937                if let Some(linked_order_ids) = order.linked_order_ids() {
938                    for linked_order_id in linked_order_ids {
939                        if let Some(linked_order) = self.orders.get(linked_order_id)
940                            && linked_order.is_open()
941                        {
942                            // Do not purge if linked order still open
943                            continue 'outer;
944                        }
945                    }
946                }
947
948                if let Some(order_list_id) = order.order_list_id() {
949                    affected_order_list_ids.insert(order_list_id);
950                }
951
952                self.purge_order(client_order_id);
953            }
954        }
955
956        for order_list_id in affected_order_list_ids {
957            if let Some(order_list) = self.order_lists.get(&order_list_id) {
958                let all_purged = order_list
959                    .client_order_ids
960                    .iter()
961                    .all(|id| !self.orders.contains_key(id));
962                if all_purged {
963                    self.order_lists.remove(&order_list_id);
964                    log::info!("Purged {order_list_id}");
965                }
966            }
967        }
968    }
969
970    /// Purges all closed positions from the cache that are older than `buffer_secs`.
971    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
972        log::debug!(
973            "Purging closed positions{}",
974            if buffer_secs > 0 {
975                format!(" with buffer_secs={buffer_secs}")
976            } else {
977                String::new()
978            }
979        );
980
981        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
982
983        for position_id in self.index.positions_closed.clone() {
984            if let Some(position) = self.positions.get(&position_id)
985                && position.is_closed()
986                && let Some(ts_closed) = position.ts_closed
987                && ts_closed + buffer_ns <= ts_now
988            {
989                self.purge_position(position_id);
990            }
991        }
992    }
993
994    /// Purges the order with the `client_order_id` from the cache (if found).
995    ///
996    /// For safety, an order is prevented from being purged if it's open.
997    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
998        // Check if order exists and is safe to purge before removing
999        let order = self.orders.get(&client_order_id).cloned();
1000
1001        // SAFETY: Prevent purging open orders
1002        if let Some(ref ord) = order
1003            && ord.is_open()
1004        {
1005            log::warn!("Order {client_order_id} found open when purging, skipping purge");
1006            return;
1007        }
1008
1009        // If order exists in cache, remove it and clean up order-specific indices
1010        if let Some(ref ord) = order {
1011            // Safe to purge
1012            self.orders.remove(&client_order_id);
1013
1014            // Remove order from venue index
1015            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1016            {
1017                venue_orders.remove(&client_order_id);
1018            }
1019
1020            // Remove venue order ID index if exists
1021            if let Some(venue_order_id) = ord.venue_order_id() {
1022                self.index.venue_order_ids.remove(&venue_order_id);
1023            }
1024
1025            // Remove from instrument orders index
1026            if let Some(instrument_orders) =
1027                self.index.instrument_orders.get_mut(&ord.instrument_id())
1028            {
1029                instrument_orders.remove(&client_order_id);
1030            }
1031
1032            // Remove from position orders index if associated with a position
1033            if let Some(position_id) = ord.position_id()
1034                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1035            {
1036                position_orders.remove(&client_order_id);
1037            }
1038
1039            // Remove from exec algorithm orders index if it has an exec algorithm
1040            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1041                && let Some(exec_algorithm_orders) =
1042                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1043            {
1044                exec_algorithm_orders.remove(&client_order_id);
1045            }
1046
1047            // Clean up strategy orders reverse index
1048            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1049                strategy_orders.remove(&client_order_id);
1050                if strategy_orders.is_empty() {
1051                    self.index.strategy_orders.remove(&ord.strategy_id());
1052                }
1053            }
1054
1055            // Clean up account orders index
1056            if let Some(account_id) = ord.account_id()
1057                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1058            {
1059                account_orders.remove(&client_order_id);
1060                if account_orders.is_empty() {
1061                    self.index.account_orders.remove(&account_id);
1062                }
1063            }
1064
1065            // Clean up exec spawn reverse index (if this order is a spawned child)
1066            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1067                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1068            {
1069                spawn_orders.remove(&client_order_id);
1070                if spawn_orders.is_empty() {
1071                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1072                }
1073            }
1074
1075            log::info!("Purged order {client_order_id}");
1076        } else {
1077            log::warn!("Order {client_order_id} not found when purging");
1078        }
1079
1080        // Always clean up order indices (even if order was not in cache)
1081        self.index.order_position.remove(&client_order_id);
1082        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1083        self.index.order_client.remove(&client_order_id);
1084        self.index.client_order_ids.remove(&client_order_id);
1085
1086        // Clean up reverse index when order not in cache (using forward index)
1087        if let Some(strategy_id) = strategy_id
1088            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1089        {
1090            strategy_orders.remove(&client_order_id);
1091            if strategy_orders.is_empty() {
1092                self.index.strategy_orders.remove(&strategy_id);
1093            }
1094        }
1095
1096        // Remove spawn parent entry if this order was a spawn root
1097        self.index.exec_spawn_orders.remove(&client_order_id);
1098
1099        self.index.orders.remove(&client_order_id);
1100        self.index.orders_open.remove(&client_order_id);
1101        self.index.orders_closed.remove(&client_order_id);
1102        self.index.orders_emulated.remove(&client_order_id);
1103        self.index.orders_inflight.remove(&client_order_id);
1104        self.index.orders_pending_cancel.remove(&client_order_id);
1105    }
1106
1107    /// Purges the position with the `position_id` from the cache (if found).
1108    ///
1109    /// For safety, a position is prevented from being purged if it's open.
1110    pub fn purge_position(&mut self, position_id: PositionId) {
1111        // Check if position exists and is safe to purge before removing
1112        let position = self.positions.get(&position_id).cloned();
1113
1114        // SAFETY: Prevent purging open positions
1115        if let Some(ref pos) = position
1116            && pos.is_open()
1117        {
1118            log::warn!("Position {position_id} found open when purging, skipping purge");
1119            return;
1120        }
1121
1122        // If position exists in cache, remove it and clean up position-specific indices
1123        if let Some(ref pos) = position {
1124            self.positions.remove(&position_id);
1125
1126            // Remove from venue positions index
1127            if let Some(venue_positions) =
1128                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1129            {
1130                venue_positions.remove(&position_id);
1131            }
1132
1133            // Remove from instrument positions index
1134            if let Some(instrument_positions) =
1135                self.index.instrument_positions.get_mut(&pos.instrument_id)
1136            {
1137                instrument_positions.remove(&position_id);
1138            }
1139
1140            // Remove from strategy positions index
1141            if let Some(strategy_positions) =
1142                self.index.strategy_positions.get_mut(&pos.strategy_id)
1143            {
1144                strategy_positions.remove(&position_id);
1145            }
1146
1147            // Remove from account positions index
1148            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1149                account_positions.remove(&position_id);
1150                if account_positions.is_empty() {
1151                    self.index.account_positions.remove(&pos.account_id);
1152                }
1153            }
1154
1155            // Remove position ID from orders that reference it
1156            for client_order_id in pos.client_order_ids() {
1157                self.index.order_position.remove(&client_order_id);
1158            }
1159
1160            log::info!("Purged position {position_id}");
1161        } else {
1162            log::warn!("Position {position_id} not found when purging");
1163        }
1164
1165        // Always clean up position indices (even if position not in cache)
1166        self.index.position_strategy.remove(&position_id);
1167        self.index.position_orders.remove(&position_id);
1168        self.index.positions.remove(&position_id);
1169        self.index.positions_open.remove(&position_id);
1170        self.index.positions_closed.remove(&position_id);
1171
1172        // Always clean up position snapshots (even if position not in cache)
1173        self.position_snapshots.remove(&position_id);
1174    }
1175
1176    /// Purges all account state events which are outside the lookback window.
1177    ///
1178    /// Only events which are outside the lookback window will be purged.
1179    /// A value of 0 means purge all account state events.
1180    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1181        log::debug!(
1182            "Purging account events{}",
1183            if lookback_secs > 0 {
1184                format!(" with lookback_secs={lookback_secs}")
1185            } else {
1186                String::new()
1187            }
1188        );
1189
1190        for account in self.accounts.values_mut() {
1191            let event_count = account.event_count();
1192            account.purge_account_events(ts_now, lookback_secs);
1193            let count_diff = event_count - account.event_count();
1194            if count_diff > 0 {
1195                log::info!(
1196                    "Purged {} event(s) from account {}",
1197                    count_diff,
1198                    account.id()
1199                );
1200            }
1201        }
1202    }
1203
1204    /// Clears the caches index.
1205    pub fn clear_index(&mut self) {
1206        self.index.clear();
1207        log::debug!("Cleared index");
1208    }
1209
1210    /// Resets the cache.
1211    ///
1212    /// All stateful fields are reset to their initial value.
1213    pub fn reset(&mut self) {
1214        log::debug!("Resetting cache");
1215
1216        self.general.clear();
1217        self.currencies.clear();
1218        self.instruments.clear();
1219        self.synthetics.clear();
1220        self.books.clear();
1221        self.own_books.clear();
1222        self.quotes.clear();
1223        self.trades.clear();
1224        self.mark_xrates.clear();
1225        self.mark_prices.clear();
1226        self.index_prices.clear();
1227        self.funding_rates.clear();
1228        self.bars.clear();
1229        self.accounts.clear();
1230        self.orders.clear();
1231        self.order_lists.clear();
1232        self.positions.clear();
1233        self.position_snapshots.clear();
1234        self.greeks.clear();
1235        self.yield_curves.clear();
1236
1237        #[cfg(feature = "defi")]
1238        {
1239            self.defi.pools.clear();
1240            self.defi.pool_profilers.clear();
1241        }
1242
1243        self.clear_index();
1244
1245        log::info!("Reset cache");
1246    }
1247
1248    /// Dispose of the cache which will close any underlying database adapter.
1249    ///
1250    /// If closing the database connection fails, an error is logged.
1251    pub fn dispose(&mut self) {
1252        if let Some(database) = &mut self.database
1253            && let Err(e) = database.close()
1254        {
1255            log::error!("Failed to close database during dispose: {e}");
1256        }
1257    }
1258
1259    /// Flushes the caches database which permanently removes all persisted data.
1260    ///
1261    /// If flushing the database connection fails, an error is logged.
1262    pub fn flush_db(&mut self) {
1263        if let Some(database) = &mut self.database
1264            && let Err(e) = database.flush()
1265        {
1266            log::error!("Failed to flush database: {e}");
1267        }
1268    }
1269
1270    /// Adds a raw bytes `value` to the cache under the `key`.
1271    ///
1272    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1273    ///
1274    /// # Errors
1275    ///
1276    /// Returns an error if persisting the entry to the backing database fails.
1277    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1278        check_valid_string_ascii(key, stringify!(key))?;
1279        check_predicate_false(value.is_empty(), stringify!(value))?;
1280
1281        log::debug!("Adding general {key}");
1282        self.general.insert(key.to_string(), value.clone());
1283
1284        if let Some(database) = &mut self.database {
1285            database.add(key.to_string(), value)?;
1286        }
1287        Ok(())
1288    }
1289
1290    /// Adds an `OrderBook` to the cache.
1291    ///
1292    /// # Errors
1293    ///
1294    /// Returns an error if persisting the order book to the backing database fails.
1295    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1296        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1297
1298        if self.config.save_market_data
1299            && let Some(database) = &mut self.database
1300        {
1301            database.add_order_book(&book)?;
1302        }
1303
1304        self.books.insert(book.instrument_id, book);
1305        Ok(())
1306    }
1307
1308    /// Adds an `OwnOrderBook` to the cache.
1309    ///
1310    /// # Errors
1311    ///
1312    /// Returns an error if persisting the own order book fails.
1313    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1314        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1315
1316        self.own_books.insert(own_book.instrument_id, own_book);
1317        Ok(())
1318    }
1319
1320    /// Adds the `mark_price` update to the cache.
1321    ///
1322    /// # Errors
1323    ///
1324    /// Returns an error if persisting the mark price to the backing database fails.
1325    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1326        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1327
1328        if self.config.save_market_data {
1329            // TODO: Placeholder and return Result for consistency
1330        }
1331
1332        let mark_prices_deque = self
1333            .mark_prices
1334            .entry(mark_price.instrument_id)
1335            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1336        mark_prices_deque.push_front(mark_price);
1337        Ok(())
1338    }
1339
1340    /// Adds the `index_price` update to the cache.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if persisting the index price to the backing database fails.
1345    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1346        log::debug!(
1347            "Adding `IndexPriceUpdate` for {}",
1348            index_price.instrument_id
1349        );
1350
1351        if self.config.save_market_data {
1352            // TODO: Placeholder and return Result for consistency
1353        }
1354
1355        let index_prices_deque = self
1356            .index_prices
1357            .entry(index_price.instrument_id)
1358            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1359        index_prices_deque.push_front(index_price);
1360        Ok(())
1361    }
1362
1363    /// Adds the `funding_rate` update to the cache.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns an error if persisting the funding rate update to the backing database fails.
1368    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1369        log::debug!(
1370            "Adding `FundingRateUpdate` for {}",
1371            funding_rate.instrument_id
1372        );
1373
1374        if self.config.save_market_data {
1375            // TODO: Placeholder and return Result for consistency
1376        }
1377
1378        let funding_rates_deque = self
1379            .funding_rates
1380            .entry(funding_rate.instrument_id)
1381            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1382        funding_rates_deque.push_front(funding_rate);
1383        Ok(())
1384    }
1385
1386    /// Adds the given `funding rates` to the cache.
1387    ///
1388    /// # Errors
1389    ///
1390    /// Returns an error if persisting the trade ticks to the backing database fails.
1391    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1392        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1393
1394        let instrument_id = funding_rates[0].instrument_id;
1395        log::debug!(
1396            "Adding `FundingRateUpdate`[{}] {instrument_id}",
1397            funding_rates.len()
1398        );
1399
1400        if self.config.save_market_data
1401            && let Some(database) = &mut self.database
1402        {
1403            for funding_rate in funding_rates {
1404                database.add_funding_rate(funding_rate)?;
1405            }
1406        }
1407
1408        let funding_rate_deque = self
1409            .funding_rates
1410            .entry(instrument_id)
1411            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1412
1413        for funding_rate in funding_rates {
1414            funding_rate_deque.push_front(*funding_rate);
1415        }
1416        Ok(())
1417    }
1418
1419    /// Adds the `quote` tick to the cache.
1420    ///
1421    /// # Errors
1422    ///
1423    /// Returns an error if persisting the quote tick to the backing database fails.
1424    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1425        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1426
1427        if self.config.save_market_data
1428            && let Some(database) = &mut self.database
1429        {
1430            database.add_quote(&quote)?;
1431        }
1432
1433        let quotes_deque = self
1434            .quotes
1435            .entry(quote.instrument_id)
1436            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1437        quotes_deque.push_front(quote);
1438        Ok(())
1439    }
1440
1441    /// Adds the `quotes` to the cache.
1442    ///
1443    /// # Errors
1444    ///
1445    /// Returns an error if persisting the quote ticks to the backing database fails.
1446    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1447        check_slice_not_empty(quotes, stringify!(quotes))?;
1448
1449        let instrument_id = quotes[0].instrument_id;
1450        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1451
1452        if self.config.save_market_data
1453            && let Some(database) = &mut self.database
1454        {
1455            for quote in quotes {
1456                database.add_quote(quote)?;
1457            }
1458        }
1459
1460        let quotes_deque = self
1461            .quotes
1462            .entry(instrument_id)
1463            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1464
1465        for quote in quotes {
1466            quotes_deque.push_front(*quote);
1467        }
1468        Ok(())
1469    }
1470
1471    /// Adds the `trade` tick to the cache.
1472    ///
1473    /// # Errors
1474    ///
1475    /// Returns an error if persisting the trade tick to the backing database fails.
1476    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1477        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1478
1479        if self.config.save_market_data
1480            && let Some(database) = &mut self.database
1481        {
1482            database.add_trade(&trade)?;
1483        }
1484
1485        let trades_deque = self
1486            .trades
1487            .entry(trade.instrument_id)
1488            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1489        trades_deque.push_front(trade);
1490        Ok(())
1491    }
1492
1493    /// Adds the give `trades` to the cache.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if persisting the trade ticks to the backing database fails.
1498    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1499        check_slice_not_empty(trades, stringify!(trades))?;
1500
1501        let instrument_id = trades[0].instrument_id;
1502        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1503
1504        if self.config.save_market_data
1505            && let Some(database) = &mut self.database
1506        {
1507            for trade in trades {
1508                database.add_trade(trade)?;
1509            }
1510        }
1511
1512        let trades_deque = self
1513            .trades
1514            .entry(instrument_id)
1515            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1516
1517        for trade in trades {
1518            trades_deque.push_front(*trade);
1519        }
1520        Ok(())
1521    }
1522
1523    /// Adds the `bar` to the cache.
1524    ///
1525    /// # Errors
1526    ///
1527    /// Returns an error if persisting the bar to the backing database fails.
1528    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1529        log::debug!("Adding `Bar` {}", bar.bar_type);
1530
1531        if self.config.save_market_data
1532            && let Some(database) = &mut self.database
1533        {
1534            database.add_bar(&bar)?;
1535        }
1536
1537        let bars = self
1538            .bars
1539            .entry(bar.bar_type)
1540            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1541        bars.push_front(bar);
1542        Ok(())
1543    }
1544
1545    /// Adds the `bars` to the cache.
1546    ///
1547    /// # Errors
1548    ///
1549    /// Returns an error if persisting the bars to the backing database fails.
1550    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1551        check_slice_not_empty(bars, stringify!(bars))?;
1552
1553        let bar_type = bars[0].bar_type;
1554        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1555
1556        if self.config.save_market_data
1557            && let Some(database) = &mut self.database
1558        {
1559            for bar in bars {
1560                database.add_bar(bar)?;
1561            }
1562        }
1563
1564        let bars_deque = self
1565            .bars
1566            .entry(bar_type)
1567            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1568
1569        for bar in bars {
1570            bars_deque.push_front(*bar);
1571        }
1572        Ok(())
1573    }
1574
1575    /// Adds the `greeks` data to the cache.
1576    ///
1577    /// # Errors
1578    ///
1579    /// Returns an error if persisting the greeks data to the backing database fails.
1580    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1581        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1582
1583        if self.config.save_market_data
1584            && let Some(_database) = &mut self.database
1585        {
1586            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1587        }
1588
1589        self.greeks.insert(greeks.instrument_id, greeks);
1590        Ok(())
1591    }
1592
1593    /// Gets the greeks data for the `instrument_id`.
1594    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1595        self.greeks.get(instrument_id).cloned()
1596    }
1597
1598    /// Adds the `yield_curve` data to the cache.
1599    ///
1600    /// # Errors
1601    ///
1602    /// Returns an error if persisting the yield curve data to the backing database fails.
1603    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1604        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1605
1606        if self.config.save_market_data
1607            && let Some(_database) = &mut self.database
1608        {
1609            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1610        }
1611
1612        self.yield_curves
1613            .insert(yield_curve.curve_name.clone(), yield_curve);
1614        Ok(())
1615    }
1616
1617    /// Gets the yield curve for the `key`.
1618    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1619        self.yield_curves.get(key).map(|curve| {
1620            let curve_clone = curve.clone();
1621            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1622                as Box<dyn Fn(f64) -> f64>
1623        })
1624    }
1625
1626    /// Adds the `currency` to the cache.
1627    ///
1628    /// # Errors
1629    ///
1630    /// Returns an error if persisting the currency to the backing database fails.
1631    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1632        if self.currencies.contains_key(&currency.code) {
1633            return Ok(());
1634        }
1635        log::debug!("Adding `Currency` {}", currency.code);
1636
1637        if let Some(database) = &mut self.database {
1638            database.add_currency(&currency)?;
1639        }
1640
1641        self.currencies.insert(currency.code, currency);
1642        Ok(())
1643    }
1644
1645    /// Adds the `instrument` to the cache.
1646    ///
1647    /// # Errors
1648    ///
1649    /// Returns an error if persisting the instrument to the backing database fails.
1650    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1651        log::debug!("Adding `Instrument` {}", instrument.id());
1652
1653        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
1654        if let Some(base_currency) = instrument.base_currency() {
1655            self.add_currency(base_currency)?;
1656        }
1657        self.add_currency(instrument.quote_currency())?;
1658        self.add_currency(instrument.settlement_currency())?;
1659
1660        if let Some(database) = &mut self.database {
1661            database.add_instrument(&instrument)?;
1662        }
1663
1664        self.instruments.insert(instrument.id(), instrument);
1665        Ok(())
1666    }
1667
1668    /// Adds the `synthetic` instrument to the cache.
1669    ///
1670    /// # Errors
1671    ///
1672    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1673    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1674        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1675
1676        if let Some(database) = &mut self.database {
1677            database.add_synthetic(&synthetic)?;
1678        }
1679
1680        self.synthetics.insert(synthetic.id, synthetic);
1681        Ok(())
1682    }
1683
1684    /// Adds the `account` to the cache.
1685    ///
1686    /// # Errors
1687    ///
1688    /// Returns an error if persisting the account to the backing database fails.
1689    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1690        log::debug!("Adding `Account` {}", account.id());
1691
1692        if let Some(database) = &mut self.database {
1693            database.add_account(&account)?;
1694        }
1695
1696        let account_id = account.id();
1697        self.accounts.insert(account_id, account);
1698        self.index
1699            .venue_account
1700            .insert(account_id.get_issuer(), account_id);
1701        Ok(())
1702    }
1703
1704    /// Indexes the `client_order_id` with the `venue_order_id`.
1705    ///
1706    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1707    ///
1708    /// # Errors
1709    ///
1710    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1711    pub fn add_venue_order_id(
1712        &mut self,
1713        client_order_id: &ClientOrderId,
1714        venue_order_id: &VenueOrderId,
1715        overwrite: bool,
1716    ) -> anyhow::Result<()> {
1717        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1718            && !overwrite
1719            && existing_venue_order_id != venue_order_id
1720        {
1721            anyhow::bail!(
1722                "Existing {existing_venue_order_id} for {client_order_id}
1723                    did not match the given {venue_order_id}.
1724                    If you are writing a test then try a different `venue_order_id`,
1725                    otherwise this is probably a bug."
1726            );
1727        }
1728
1729        self.index
1730            .client_order_ids
1731            .insert(*client_order_id, *venue_order_id);
1732        self.index
1733            .venue_order_ids
1734            .insert(*venue_order_id, *client_order_id);
1735
1736        Ok(())
1737    }
1738
1739    /// Adds the `order` to the cache indexed with any given identifiers.
1740    ///
1741    /// # Parameters
1742    ///
1743    /// `override_existing`: If the added order should 'override' any existing order and replace
1744    /// it in the cache. This is currently used for emulated orders which are
1745    /// being released and transformed into another type.
1746    ///
1747    /// # Errors
1748    ///
1749    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1750    pub fn add_order(
1751        &mut self,
1752        order: OrderAny,
1753        position_id: Option<PositionId>,
1754        client_id: Option<ClientId>,
1755        replace_existing: bool,
1756    ) -> anyhow::Result<()> {
1757        let instrument_id = order.instrument_id();
1758        let venue = instrument_id.venue;
1759        let client_order_id = order.client_order_id();
1760        let strategy_id = order.strategy_id();
1761        let exec_algorithm_id = order.exec_algorithm_id();
1762        let exec_spawn_id = order.exec_spawn_id();
1763
1764        if !replace_existing {
1765            check_key_not_in_map(
1766                &client_order_id,
1767                &self.orders,
1768                stringify!(client_order_id),
1769                stringify!(orders),
1770            )?;
1771        }
1772
1773        log::debug!("Adding {order:?}");
1774
1775        self.index.orders.insert(client_order_id);
1776        self.index
1777            .order_strategy
1778            .insert(client_order_id, strategy_id);
1779        self.index.strategies.insert(strategy_id);
1780
1781        // Update venue -> orders index
1782        self.index
1783            .venue_orders
1784            .entry(venue)
1785            .or_default()
1786            .insert(client_order_id);
1787
1788        // Update instrument -> orders index
1789        self.index
1790            .instrument_orders
1791            .entry(instrument_id)
1792            .or_default()
1793            .insert(client_order_id);
1794
1795        // Update strategy -> orders index
1796        self.index
1797            .strategy_orders
1798            .entry(strategy_id)
1799            .or_default()
1800            .insert(client_order_id);
1801
1802        // Update account -> orders index (if account_id known at creation)
1803        if let Some(account_id) = order.account_id() {
1804            self.index
1805                .account_orders
1806                .entry(account_id)
1807                .or_default()
1808                .insert(client_order_id);
1809        }
1810
1811        // Update exec_algorithm -> orders index
1812        if let Some(exec_algorithm_id) = exec_algorithm_id {
1813            self.index.exec_algorithms.insert(exec_algorithm_id);
1814
1815            self.index
1816                .exec_algorithm_orders
1817                .entry(exec_algorithm_id)
1818                .or_default()
1819                .insert(client_order_id);
1820        }
1821
1822        // Update exec_spawn -> orders index
1823        if let Some(exec_spawn_id) = exec_spawn_id {
1824            self.index
1825                .exec_spawn_orders
1826                .entry(exec_spawn_id)
1827                .or_default()
1828                .insert(client_order_id);
1829        }
1830
1831        // Update emulation index
1832        if let Some(emulation_trigger) = order.emulation_trigger()
1833            && emulation_trigger != TriggerType::NoTrigger
1834        {
1835            self.index.orders_emulated.insert(client_order_id);
1836        }
1837
1838        // Index position ID if provided
1839        if let Some(position_id) = position_id {
1840            self.add_position_id(
1841                &position_id,
1842                &order.instrument_id().venue,
1843                &client_order_id,
1844                &strategy_id,
1845            )?;
1846        }
1847
1848        // Index client ID if provided
1849        if let Some(client_id) = client_id {
1850            self.index.order_client.insert(client_order_id, client_id);
1851            log::debug!("Indexed {client_id:?}");
1852        }
1853
1854        if let Some(database) = &mut self.database {
1855            database.add_order(&order, client_id)?;
1856            // TODO: Implement
1857            // if self.config.snapshot_orders {
1858            //     database.snapshot_order_state(order)?;
1859            // }
1860        }
1861
1862        self.orders.insert(client_order_id, order);
1863
1864        Ok(())
1865    }
1866
1867    /// Adds the `order_list` to the cache.
1868    ///
1869    /// # Errors
1870    ///
1871    /// Returns an error if the order list ID is already contained in the cache.
1872    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1873        let order_list_id = order_list.id;
1874        check_key_not_in_map(
1875            &order_list_id,
1876            &self.order_lists,
1877            stringify!(order_list_id),
1878            stringify!(order_lists),
1879        )?;
1880
1881        log::debug!("Adding {order_list:?}");
1882        self.order_lists.insert(order_list_id, order_list);
1883        Ok(())
1884    }
1885
1886    /// Indexes the `position_id` with the other given IDs.
1887    ///
1888    /// # Errors
1889    ///
1890    /// Returns an error if indexing position ID in the backing database fails.
1891    pub fn add_position_id(
1892        &mut self,
1893        position_id: &PositionId,
1894        venue: &Venue,
1895        client_order_id: &ClientOrderId,
1896        strategy_id: &StrategyId,
1897    ) -> anyhow::Result<()> {
1898        self.index
1899            .order_position
1900            .insert(*client_order_id, *position_id);
1901
1902        // Index: ClientOrderId -> PositionId
1903        if let Some(database) = &mut self.database {
1904            database.index_order_position(*client_order_id, *position_id)?;
1905        }
1906
1907        // Index: PositionId -> StrategyId
1908        self.index
1909            .position_strategy
1910            .insert(*position_id, *strategy_id);
1911
1912        // Index: PositionId -> set[ClientOrderId]
1913        self.index
1914            .position_orders
1915            .entry(*position_id)
1916            .or_default()
1917            .insert(*client_order_id);
1918
1919        // Index: StrategyId -> set[PositionId]
1920        self.index
1921            .strategy_positions
1922            .entry(*strategy_id)
1923            .or_default()
1924            .insert(*position_id);
1925
1926        // Index: Venue -> set[PositionId]
1927        self.index
1928            .venue_positions
1929            .entry(*venue)
1930            .or_default()
1931            .insert(*position_id);
1932
1933        Ok(())
1934    }
1935
1936    /// Adds the `position` to the cache.
1937    ///
1938    /// # Errors
1939    ///
1940    /// Returns an error if persisting the position to the backing database fails.
1941    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1942        self.positions.insert(position.id, position.clone());
1943        self.index.positions.insert(position.id);
1944        self.index.positions_open.insert(position.id);
1945        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
1946
1947        log::debug!("Adding {position}");
1948
1949        self.add_position_id(
1950            &position.id,
1951            &position.instrument_id.venue,
1952            &position.opening_order_id,
1953            &position.strategy_id,
1954        )?;
1955
1956        let venue = position.instrument_id.venue;
1957        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1958        venue_positions.insert(position.id);
1959
1960        // Index: InstrumentId -> AHashSet
1961        let instrument_id = position.instrument_id;
1962        let instrument_positions = self
1963            .index
1964            .instrument_positions
1965            .entry(instrument_id)
1966            .or_default();
1967        instrument_positions.insert(position.id);
1968
1969        // Index: AccountId -> AHashSet<PositionId>
1970        self.index
1971            .account_positions
1972            .entry(position.account_id)
1973            .or_default()
1974            .insert(position.id);
1975
1976        if let Some(database) = &mut self.database {
1977            database.add_position(&position)?;
1978            // TODO: Implement position snapshots
1979            // if self.snapshot_positions {
1980            //     database.snapshot_position_state(
1981            //         position,
1982            //         position.ts_last,
1983            //         self.calculate_unrealized_pnl(&position),
1984            //     )?;
1985            // }
1986        }
1987
1988        Ok(())
1989    }
1990
1991    /// Updates the `account` in the cache.
1992    ///
1993    /// # Errors
1994    ///
1995    /// Returns an error if updating the account in the database fails.
1996    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1997        let account_id = account.id();
1998        self.accounts.insert(account_id, account.clone());
1999
2000        if let Some(database) = &mut self.database {
2001            database.update_account(&account)?;
2002        }
2003        Ok(())
2004    }
2005
2006    /// Updates the `order` in the cache.
2007    ///
2008    /// # Errors
2009    ///
2010    /// Returns an error if updating the order in the database fails.
2011    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2012        let client_order_id = order.client_order_id();
2013
2014        // Update venue order ID
2015        if let Some(venue_order_id) = order.venue_order_id() {
2016            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
2017            // venues which use a cancel+replace update strategy.
2018            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2019                // TODO: If the last event was `OrderUpdated` then overwrite should be true
2020                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2021            }
2022        }
2023
2024        // Update in-flight state
2025        if order.is_inflight() {
2026            self.index.orders_inflight.insert(client_order_id);
2027        } else {
2028            self.index.orders_inflight.remove(&client_order_id);
2029        }
2030
2031        // Update open/closed state
2032        if order.is_open() {
2033            self.index.orders_closed.remove(&client_order_id);
2034            self.index.orders_open.insert(client_order_id);
2035        } else if order.is_closed() {
2036            self.index.orders_open.remove(&client_order_id);
2037            self.index.orders_pending_cancel.remove(&client_order_id);
2038            self.index.orders_closed.insert(client_order_id);
2039        }
2040
2041        // Update emulation index
2042        if let Some(emulation_trigger) = order.emulation_trigger()
2043            && emulation_trigger != TriggerType::NoTrigger
2044            && !order.is_closed()
2045        {
2046            self.index.orders_emulated.insert(client_order_id);
2047        } else {
2048            self.index.orders_emulated.remove(&client_order_id);
2049        }
2050
2051        // Update account orders index when account_id becomes available
2052        if let Some(account_id) = order.account_id() {
2053            self.index
2054                .account_orders
2055                .entry(account_id)
2056                .or_default()
2057                .insert(client_order_id);
2058        }
2059
2060        // Update own book
2061        if self.own_order_book(&order.instrument_id()).is_some()
2062            && should_handle_own_book_order(order)
2063        {
2064            self.update_own_order_book(order);
2065        }
2066
2067        if let Some(database) = &mut self.database {
2068            database.update_order(order.last_event())?;
2069            // TODO: Implement order snapshots
2070            // if self.snapshot_orders {
2071            //     database.snapshot_order_state(order)?;
2072            // }
2073        }
2074
2075        // update the order in the cache
2076        self.orders.insert(client_order_id, order.clone());
2077
2078        Ok(())
2079    }
2080
2081    /// Updates the `order` as pending cancel locally.
2082    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2083        self.index
2084            .orders_pending_cancel
2085            .insert(order.client_order_id());
2086    }
2087
2088    /// Updates the `position` in the cache.
2089    ///
2090    /// # Errors
2091    ///
2092    /// Returns an error if updating the position in the database fails.
2093    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2094        // Update open/closed state
2095
2096        if position.is_open() {
2097            self.index.positions_open.insert(position.id);
2098            self.index.positions_closed.remove(&position.id);
2099        } else {
2100            self.index.positions_closed.insert(position.id);
2101            self.index.positions_open.remove(&position.id);
2102        }
2103
2104        if let Some(database) = &mut self.database {
2105            database.update_position(position)?;
2106            // TODO: Implement order snapshots
2107            // if self.snapshot_orders {
2108            //     database.snapshot_order_state(order)?;
2109            // }
2110        }
2111
2112        self.positions.insert(position.id, position.clone());
2113
2114        Ok(())
2115    }
2116
2117    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
2118    /// serializing it, and storing it in the position snapshots.
2119    ///
2120    /// # Errors
2121    ///
2122    /// Returns an error if serializing or storing the position snapshot fails.
2123    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2124        let position_id = position.id;
2125
2126        let mut copied_position = position.clone();
2127        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2128        copied_position.id = PositionId::new(new_id);
2129
2130        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
2131        let position_serialized = serde_json::to_vec(&copied_position)?;
2132
2133        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
2134        let new_snapshots = match snapshots {
2135            Some(existing_snapshots) => {
2136                let mut combined = existing_snapshots.to_vec();
2137                combined.extend(position_serialized);
2138                Bytes::from(combined)
2139            }
2140            None => Bytes::from(position_serialized),
2141        };
2142        self.position_snapshots.insert(position_id, new_snapshots);
2143
2144        log::debug!("Snapshot {copied_position}");
2145        Ok(())
2146    }
2147
2148    /// Creates a snapshot of the `position` state in the database.
2149    ///
2150    /// # Errors
2151    ///
2152    /// Returns an error if snapshotting the position state fails.
2153    pub fn snapshot_position_state(
2154        &mut self,
2155        position: &Position,
2156        // ts_snapshot: u64,
2157        // unrealized_pnl: Option<Money>,
2158        open_only: Option<bool>,
2159    ) -> anyhow::Result<()> {
2160        let open_only = open_only.unwrap_or(true);
2161
2162        if open_only && !position.is_open() {
2163            return Ok(());
2164        }
2165
2166        if let Some(database) = &mut self.database {
2167            database.snapshot_position_state(position).map_err(|e| {
2168                log::error!(
2169                    "Failed to snapshot position state for {}: {e:?}",
2170                    position.id
2171                );
2172                e
2173            })?;
2174        } else {
2175            log::warn!(
2176                "Cannot snapshot position state for {} (no database configured)",
2177                position.id
2178            );
2179        }
2180
2181        // Ok(())
2182        todo!()
2183    }
2184
2185    /// Gets the OMS type for the `position_id`.
2186    #[must_use]
2187    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2188        // Get OMS type from the index
2189        if self.index.position_strategy.contains_key(position_id) {
2190            // For now, we'll default to NETTING
2191            // TODO: Store and retrieve actual OMS type per position
2192            Some(OmsType::Netting)
2193        } else {
2194            None
2195        }
2196    }
2197
2198    /// Gets position snapshot bytes for the `position_id`.
2199    #[must_use]
2200    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2201        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2202    }
2203
2204    /// Gets position snapshot IDs for the `instrument_id`.
2205    #[must_use]
2206    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2207        // Get snapshot position IDs that match the instrument
2208        let mut result = AHashSet::new();
2209        for (position_id, _) in &self.position_snapshots {
2210            // Check if this position is for the requested instrument
2211            if let Some(position) = self.positions.get(position_id)
2212                && position.instrument_id == *instrument_id
2213            {
2214                result.insert(*position_id);
2215            }
2216        }
2217        result
2218    }
2219
2220    /// Snapshots the `order` state in the database.
2221    ///
2222    /// # Errors
2223    ///
2224    /// Returns an error if snapshotting the order state fails.
2225    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2226        let database = if let Some(database) = &self.database {
2227            database
2228        } else {
2229            log::warn!(
2230                "Cannot snapshot order state for {} (no database configured)",
2231                order.client_order_id()
2232            );
2233            return Ok(());
2234        };
2235
2236        database.snapshot_order_state(order)
2237    }
2238
2239    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2240
2241    fn build_order_query_filter_set(
2242        &self,
2243        venue: Option<&Venue>,
2244        instrument_id: Option<&InstrumentId>,
2245        strategy_id: Option<&StrategyId>,
2246        account_id: Option<&AccountId>,
2247    ) -> Option<AHashSet<ClientOrderId>> {
2248        let mut query: Option<AHashSet<ClientOrderId>> = None;
2249
2250        if let Some(venue) = venue {
2251            query = Some(
2252                self.index
2253                    .venue_orders
2254                    .get(venue)
2255                    .cloned()
2256                    .unwrap_or_default(),
2257            );
2258        }
2259
2260        if let Some(instrument_id) = instrument_id {
2261            let instrument_orders = self
2262                .index
2263                .instrument_orders
2264                .get(instrument_id)
2265                .cloned()
2266                .unwrap_or_default();
2267
2268            if let Some(existing_query) = &mut query {
2269                *existing_query = existing_query
2270                    .intersection(&instrument_orders)
2271                    .copied()
2272                    .collect();
2273            } else {
2274                query = Some(instrument_orders);
2275            }
2276        }
2277
2278        if let Some(strategy_id) = strategy_id {
2279            let strategy_orders = self
2280                .index
2281                .strategy_orders
2282                .get(strategy_id)
2283                .cloned()
2284                .unwrap_or_default();
2285
2286            if let Some(existing_query) = &mut query {
2287                *existing_query = existing_query
2288                    .intersection(&strategy_orders)
2289                    .copied()
2290                    .collect();
2291            } else {
2292                query = Some(strategy_orders);
2293            }
2294        }
2295
2296        if let Some(account_id) = account_id {
2297            let account_orders = self
2298                .index
2299                .account_orders
2300                .get(account_id)
2301                .cloned()
2302                .unwrap_or_default();
2303
2304            if let Some(existing_query) = &mut query {
2305                *existing_query = existing_query
2306                    .intersection(&account_orders)
2307                    .copied()
2308                    .collect();
2309            } else {
2310                query = Some(account_orders);
2311            }
2312        }
2313
2314        query
2315    }
2316
2317    fn build_position_query_filter_set(
2318        &self,
2319        venue: Option<&Venue>,
2320        instrument_id: Option<&InstrumentId>,
2321        strategy_id: Option<&StrategyId>,
2322        account_id: Option<&AccountId>,
2323    ) -> Option<AHashSet<PositionId>> {
2324        let mut query: Option<AHashSet<PositionId>> = None;
2325
2326        if let Some(venue) = venue {
2327            query = Some(
2328                self.index
2329                    .venue_positions
2330                    .get(venue)
2331                    .cloned()
2332                    .unwrap_or_default(),
2333            );
2334        }
2335
2336        if let Some(instrument_id) = instrument_id {
2337            let instrument_positions = self
2338                .index
2339                .instrument_positions
2340                .get(instrument_id)
2341                .cloned()
2342                .unwrap_or_default();
2343
2344            if let Some(existing_query) = query {
2345                query = Some(
2346                    existing_query
2347                        .intersection(&instrument_positions)
2348                        .copied()
2349                        .collect(),
2350                );
2351            } else {
2352                query = Some(instrument_positions);
2353            }
2354        }
2355
2356        if let Some(strategy_id) = strategy_id {
2357            let strategy_positions = self
2358                .index
2359                .strategy_positions
2360                .get(strategy_id)
2361                .cloned()
2362                .unwrap_or_default();
2363
2364            if let Some(existing_query) = query {
2365                query = Some(
2366                    existing_query
2367                        .intersection(&strategy_positions)
2368                        .copied()
2369                        .collect(),
2370                );
2371            } else {
2372                query = Some(strategy_positions);
2373            }
2374        }
2375
2376        if let Some(account_id) = account_id {
2377            let account_positions = self
2378                .index
2379                .account_positions
2380                .get(account_id)
2381                .cloned()
2382                .unwrap_or_default();
2383
2384            if let Some(existing_query) = query {
2385                query = Some(
2386                    existing_query
2387                        .intersection(&account_positions)
2388                        .copied()
2389                        .collect(),
2390                );
2391            } else {
2392                query = Some(account_positions);
2393            }
2394        }
2395
2396        query
2397    }
2398
2399    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2400    ///
2401    /// # Panics
2402    ///
2403    /// Panics if any `client_order_id` in the set is not found in the cache.
2404    fn get_orders_for_ids(
2405        &self,
2406        client_order_ids: &AHashSet<ClientOrderId>,
2407        side: Option<OrderSide>,
2408    ) -> Vec<&OrderAny> {
2409        let side = side.unwrap_or(OrderSide::NoOrderSide);
2410        let mut orders = Vec::new();
2411
2412        for client_order_id in client_order_ids {
2413            let order = self
2414                .orders
2415                .get(client_order_id)
2416                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2417            if side == OrderSide::NoOrderSide || side == order.order_side() {
2418                orders.push(order);
2419            }
2420        }
2421
2422        orders
2423    }
2424
2425    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2426    ///
2427    /// # Panics
2428    ///
2429    /// Panics if any `position_id` in the set is not found in the cache.
2430    fn get_positions_for_ids(
2431        &self,
2432        position_ids: &AHashSet<PositionId>,
2433        side: Option<PositionSide>,
2434    ) -> Vec<&Position> {
2435        let side = side.unwrap_or(PositionSide::NoPositionSide);
2436        let mut positions = Vec::new();
2437
2438        for position_id in position_ids {
2439            let position = self
2440                .positions
2441                .get(position_id)
2442                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2443            if side == PositionSide::NoPositionSide || side == position.side {
2444                positions.push(position);
2445            }
2446        }
2447
2448        positions
2449    }
2450
2451    /// Returns the `ClientOrderId`s of all orders.
2452    #[must_use]
2453    pub fn client_order_ids(
2454        &self,
2455        venue: Option<&Venue>,
2456        instrument_id: Option<&InstrumentId>,
2457        strategy_id: Option<&StrategyId>,
2458        account_id: Option<&AccountId>,
2459    ) -> AHashSet<ClientOrderId> {
2460        let query =
2461            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2462        match query {
2463            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2464            None => self.index.orders.clone(),
2465        }
2466    }
2467
2468    /// Returns the `ClientOrderId`s of all open orders.
2469    #[must_use]
2470    pub fn client_order_ids_open(
2471        &self,
2472        venue: Option<&Venue>,
2473        instrument_id: Option<&InstrumentId>,
2474        strategy_id: Option<&StrategyId>,
2475        account_id: Option<&AccountId>,
2476    ) -> AHashSet<ClientOrderId> {
2477        let query =
2478            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2479        match query {
2480            Some(query) => self
2481                .index
2482                .orders_open
2483                .intersection(&query)
2484                .copied()
2485                .collect(),
2486            None => self.index.orders_open.clone(),
2487        }
2488    }
2489
2490    /// Returns the `ClientOrderId`s of all closed orders.
2491    #[must_use]
2492    pub fn client_order_ids_closed(
2493        &self,
2494        venue: Option<&Venue>,
2495        instrument_id: Option<&InstrumentId>,
2496        strategy_id: Option<&StrategyId>,
2497        account_id: Option<&AccountId>,
2498    ) -> AHashSet<ClientOrderId> {
2499        let query =
2500            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2501        match query {
2502            Some(query) => self
2503                .index
2504                .orders_closed
2505                .intersection(&query)
2506                .copied()
2507                .collect(),
2508            None => self.index.orders_closed.clone(),
2509        }
2510    }
2511
2512    /// Returns the `ClientOrderId`s of all emulated orders.
2513    #[must_use]
2514    pub fn client_order_ids_emulated(
2515        &self,
2516        venue: Option<&Venue>,
2517        instrument_id: Option<&InstrumentId>,
2518        strategy_id: Option<&StrategyId>,
2519        account_id: Option<&AccountId>,
2520    ) -> AHashSet<ClientOrderId> {
2521        let query =
2522            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2523        match query {
2524            Some(query) => self
2525                .index
2526                .orders_emulated
2527                .intersection(&query)
2528                .copied()
2529                .collect(),
2530            None => self.index.orders_emulated.clone(),
2531        }
2532    }
2533
2534    /// Returns the `ClientOrderId`s of all in-flight orders.
2535    #[must_use]
2536    pub fn client_order_ids_inflight(
2537        &self,
2538        venue: Option<&Venue>,
2539        instrument_id: Option<&InstrumentId>,
2540        strategy_id: Option<&StrategyId>,
2541        account_id: Option<&AccountId>,
2542    ) -> AHashSet<ClientOrderId> {
2543        let query =
2544            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2545        match query {
2546            Some(query) => self
2547                .index
2548                .orders_inflight
2549                .intersection(&query)
2550                .copied()
2551                .collect(),
2552            None => self.index.orders_inflight.clone(),
2553        }
2554    }
2555
2556    /// Returns `PositionId`s of all positions.
2557    #[must_use]
2558    pub fn position_ids(
2559        &self,
2560        venue: Option<&Venue>,
2561        instrument_id: Option<&InstrumentId>,
2562        strategy_id: Option<&StrategyId>,
2563        account_id: Option<&AccountId>,
2564    ) -> AHashSet<PositionId> {
2565        let query =
2566            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2567        match query {
2568            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2569            None => self.index.positions.clone(),
2570        }
2571    }
2572
2573    /// Returns the `PositionId`s of all open positions.
2574    #[must_use]
2575    pub fn position_open_ids(
2576        &self,
2577        venue: Option<&Venue>,
2578        instrument_id: Option<&InstrumentId>,
2579        strategy_id: Option<&StrategyId>,
2580        account_id: Option<&AccountId>,
2581    ) -> AHashSet<PositionId> {
2582        let query =
2583            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2584        match query {
2585            Some(query) => self
2586                .index
2587                .positions_open
2588                .intersection(&query)
2589                .copied()
2590                .collect(),
2591            None => self.index.positions_open.clone(),
2592        }
2593    }
2594
2595    /// Returns the `PositionId`s of all closed positions.
2596    #[must_use]
2597    pub fn position_closed_ids(
2598        &self,
2599        venue: Option<&Venue>,
2600        instrument_id: Option<&InstrumentId>,
2601        strategy_id: Option<&StrategyId>,
2602        account_id: Option<&AccountId>,
2603    ) -> AHashSet<PositionId> {
2604        let query =
2605            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2606        match query {
2607            Some(query) => self
2608                .index
2609                .positions_closed
2610                .intersection(&query)
2611                .copied()
2612                .collect(),
2613            None => self.index.positions_closed.clone(),
2614        }
2615    }
2616
2617    /// Returns the `ComponentId`s of all actors.
2618    #[must_use]
2619    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2620        self.index.actors.clone()
2621    }
2622
2623    /// Returns the `StrategyId`s of all strategies.
2624    #[must_use]
2625    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2626        self.index.strategies.clone()
2627    }
2628
2629    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2630    #[must_use]
2631    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2632        self.index.exec_algorithms.clone()
2633    }
2634
2635    // -- ORDER QUERIES ---------------------------------------------------------------------------
2636
2637    /// Gets a reference to the order with the `client_order_id` (if found).
2638    #[must_use]
2639    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2640        self.orders.get(client_order_id)
2641    }
2642
2643    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
2644    #[must_use]
2645    pub fn orders_for_ids(
2646        &self,
2647        client_order_ids: &[ClientOrderId],
2648        context: &dyn Display,
2649    ) -> Vec<OrderAny> {
2650        let mut orders = Vec::with_capacity(client_order_ids.len());
2651        for id in client_order_ids {
2652            match self.orders.get(id) {
2653                Some(order) => orders.push(order.clone()),
2654                None => log::error!("Order {id} not found in cache for {context}"),
2655            }
2656        }
2657        orders
2658    }
2659
2660    /// Gets a reference to the order with the `client_order_id` (if found).
2661    #[must_use]
2662    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2663        self.orders.get_mut(client_order_id)
2664    }
2665
2666    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2667    #[must_use]
2668    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2669        self.index.venue_order_ids.get(venue_order_id)
2670    }
2671
2672    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2673    #[must_use]
2674    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2675        self.index.client_order_ids.get(client_order_id)
2676    }
2677
2678    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2679    #[must_use]
2680    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2681        self.index.order_client.get(client_order_id)
2682    }
2683
2684    /// Returns references to all orders matching the optional filter parameters.
2685    #[must_use]
2686    pub fn orders(
2687        &self,
2688        venue: Option<&Venue>,
2689        instrument_id: Option<&InstrumentId>,
2690        strategy_id: Option<&StrategyId>,
2691        account_id: Option<&AccountId>,
2692        side: Option<OrderSide>,
2693    ) -> Vec<&OrderAny> {
2694        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2695        self.get_orders_for_ids(&client_order_ids, side)
2696    }
2697
2698    /// Returns references to all open orders matching the optional filter parameters.
2699    #[must_use]
2700    pub fn orders_open(
2701        &self,
2702        venue: Option<&Venue>,
2703        instrument_id: Option<&InstrumentId>,
2704        strategy_id: Option<&StrategyId>,
2705        account_id: Option<&AccountId>,
2706        side: Option<OrderSide>,
2707    ) -> Vec<&OrderAny> {
2708        let client_order_ids =
2709            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
2710        self.get_orders_for_ids(&client_order_ids, side)
2711    }
2712
2713    /// Returns references to all closed orders matching the optional filter parameters.
2714    #[must_use]
2715    pub fn orders_closed(
2716        &self,
2717        venue: Option<&Venue>,
2718        instrument_id: Option<&InstrumentId>,
2719        strategy_id: Option<&StrategyId>,
2720        account_id: Option<&AccountId>,
2721        side: Option<OrderSide>,
2722    ) -> Vec<&OrderAny> {
2723        let client_order_ids =
2724            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
2725        self.get_orders_for_ids(&client_order_ids, side)
2726    }
2727
2728    /// Returns references to all emulated orders matching the optional filter parameters.
2729    #[must_use]
2730    pub fn orders_emulated(
2731        &self,
2732        venue: Option<&Venue>,
2733        instrument_id: Option<&InstrumentId>,
2734        strategy_id: Option<&StrategyId>,
2735        account_id: Option<&AccountId>,
2736        side: Option<OrderSide>,
2737    ) -> Vec<&OrderAny> {
2738        let client_order_ids =
2739            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
2740        self.get_orders_for_ids(&client_order_ids, side)
2741    }
2742
2743    /// Returns references to all in-flight orders matching the optional filter parameters.
2744    #[must_use]
2745    pub fn orders_inflight(
2746        &self,
2747        venue: Option<&Venue>,
2748        instrument_id: Option<&InstrumentId>,
2749        strategy_id: Option<&StrategyId>,
2750        account_id: Option<&AccountId>,
2751        side: Option<OrderSide>,
2752    ) -> Vec<&OrderAny> {
2753        let client_order_ids =
2754            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
2755        self.get_orders_for_ids(&client_order_ids, side)
2756    }
2757
2758    /// Returns references to all orders for the `position_id`.
2759    #[must_use]
2760    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2761        let client_order_ids = self.index.position_orders.get(position_id);
2762        match client_order_ids {
2763            Some(client_order_ids) => {
2764                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2765            }
2766            None => Vec::new(),
2767        }
2768    }
2769
2770    /// Returns whether an order with the `client_order_id` exists.
2771    #[must_use]
2772    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2773        self.index.orders.contains(client_order_id)
2774    }
2775
2776    /// Returns whether an order with the `client_order_id` is open.
2777    #[must_use]
2778    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2779        self.index.orders_open.contains(client_order_id)
2780    }
2781
2782    /// Returns whether an order with the `client_order_id` is closed.
2783    #[must_use]
2784    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2785        self.index.orders_closed.contains(client_order_id)
2786    }
2787
2788    /// Returns whether an order with the `client_order_id` is emulated.
2789    #[must_use]
2790    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2791        self.index.orders_emulated.contains(client_order_id)
2792    }
2793
2794    /// Returns whether an order with the `client_order_id` is in-flight.
2795    #[must_use]
2796    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2797        self.index.orders_inflight.contains(client_order_id)
2798    }
2799
2800    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2801    #[must_use]
2802    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2803        self.index.orders_pending_cancel.contains(client_order_id)
2804    }
2805
2806    /// Returns the count of all open orders.
2807    #[must_use]
2808    pub fn orders_open_count(
2809        &self,
2810        venue: Option<&Venue>,
2811        instrument_id: Option<&InstrumentId>,
2812        strategy_id: Option<&StrategyId>,
2813        account_id: Option<&AccountId>,
2814        side: Option<OrderSide>,
2815    ) -> usize {
2816        self.orders_open(venue, instrument_id, strategy_id, account_id, side)
2817            .len()
2818    }
2819
2820    /// Returns the count of all closed orders.
2821    #[must_use]
2822    pub fn orders_closed_count(
2823        &self,
2824        venue: Option<&Venue>,
2825        instrument_id: Option<&InstrumentId>,
2826        strategy_id: Option<&StrategyId>,
2827        account_id: Option<&AccountId>,
2828        side: Option<OrderSide>,
2829    ) -> usize {
2830        self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
2831            .len()
2832    }
2833
2834    /// Returns the count of all emulated orders.
2835    #[must_use]
2836    pub fn orders_emulated_count(
2837        &self,
2838        venue: Option<&Venue>,
2839        instrument_id: Option<&InstrumentId>,
2840        strategy_id: Option<&StrategyId>,
2841        account_id: Option<&AccountId>,
2842        side: Option<OrderSide>,
2843    ) -> usize {
2844        self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
2845            .len()
2846    }
2847
2848    /// Returns the count of all in-flight orders.
2849    #[must_use]
2850    pub fn orders_inflight_count(
2851        &self,
2852        venue: Option<&Venue>,
2853        instrument_id: Option<&InstrumentId>,
2854        strategy_id: Option<&StrategyId>,
2855        account_id: Option<&AccountId>,
2856        side: Option<OrderSide>,
2857    ) -> usize {
2858        self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
2859            .len()
2860    }
2861
2862    /// Returns the count of all orders.
2863    #[must_use]
2864    pub fn orders_total_count(
2865        &self,
2866        venue: Option<&Venue>,
2867        instrument_id: Option<&InstrumentId>,
2868        strategy_id: Option<&StrategyId>,
2869        account_id: Option<&AccountId>,
2870        side: Option<OrderSide>,
2871    ) -> usize {
2872        self.orders(venue, instrument_id, strategy_id, account_id, side)
2873            .len()
2874    }
2875
2876    /// Returns the order list for the `order_list_id`.
2877    #[must_use]
2878    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2879        self.order_lists.get(order_list_id)
2880    }
2881
2882    /// Returns all order lists matching the optional filter parameters.
2883    #[must_use]
2884    pub fn order_lists(
2885        &self,
2886        venue: Option<&Venue>,
2887        instrument_id: Option<&InstrumentId>,
2888        strategy_id: Option<&StrategyId>,
2889        account_id: Option<&AccountId>,
2890    ) -> Vec<&OrderList> {
2891        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2892
2893        if let Some(venue) = venue {
2894            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2895        }
2896
2897        if let Some(instrument_id) = instrument_id {
2898            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2899        }
2900
2901        if let Some(strategy_id) = strategy_id {
2902            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2903        }
2904
2905        if let Some(account_id) = account_id {
2906            order_lists.retain(|ol| {
2907                ol.client_order_ids.iter().any(|client_order_id| {
2908                    self.orders
2909                        .get(client_order_id)
2910                        .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
2911                })
2912            });
2913        }
2914
2915        order_lists
2916    }
2917
2918    /// Returns whether an order list with the `order_list_id` exists.
2919    #[must_use]
2920    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2921        self.order_lists.contains_key(order_list_id)
2922    }
2923
2924    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2925
2926    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2927    /// optional filter parameters.
2928    #[must_use]
2929    pub fn orders_for_exec_algorithm(
2930        &self,
2931        exec_algorithm_id: &ExecAlgorithmId,
2932        venue: Option<&Venue>,
2933        instrument_id: Option<&InstrumentId>,
2934        strategy_id: Option<&StrategyId>,
2935        account_id: Option<&AccountId>,
2936        side: Option<OrderSide>,
2937    ) -> Vec<&OrderAny> {
2938        let query =
2939            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2940        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2941
2942        if let Some(query) = query
2943            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2944        {
2945            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2946        }
2947
2948        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2949            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2950        } else {
2951            Vec::new()
2952        }
2953    }
2954
2955    /// Returns references to all orders with the `exec_spawn_id`.
2956    #[must_use]
2957    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2958        self.get_orders_for_ids(
2959            self.index
2960                .exec_spawn_orders
2961                .get(exec_spawn_id)
2962                .unwrap_or(&AHashSet::new()),
2963            None,
2964        )
2965    }
2966
2967    /// Returns the total order quantity for the `exec_spawn_id`.
2968    #[must_use]
2969    pub fn exec_spawn_total_quantity(
2970        &self,
2971        exec_spawn_id: &ClientOrderId,
2972        active_only: bool,
2973    ) -> Option<Quantity> {
2974        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2975
2976        let mut total_quantity: Option<Quantity> = None;
2977
2978        for spawn_order in exec_spawn_orders {
2979            if active_only && spawn_order.is_closed() {
2980                continue;
2981            }
2982
2983            match total_quantity.as_mut() {
2984                Some(total) => *total = *total + spawn_order.quantity(),
2985                None => total_quantity = Some(spawn_order.quantity()),
2986            }
2987        }
2988
2989        total_quantity
2990    }
2991
2992    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
2993    #[must_use]
2994    pub fn exec_spawn_total_filled_qty(
2995        &self,
2996        exec_spawn_id: &ClientOrderId,
2997        active_only: bool,
2998    ) -> Option<Quantity> {
2999        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3000
3001        let mut total_quantity: Option<Quantity> = None;
3002
3003        for spawn_order in exec_spawn_orders {
3004            if active_only && spawn_order.is_closed() {
3005                continue;
3006            }
3007
3008            match total_quantity.as_mut() {
3009                Some(total) => *total = *total + spawn_order.filled_qty(),
3010                None => total_quantity = Some(spawn_order.filled_qty()),
3011            }
3012        }
3013
3014        total_quantity
3015    }
3016
3017    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
3018    #[must_use]
3019    pub fn exec_spawn_total_leaves_qty(
3020        &self,
3021        exec_spawn_id: &ClientOrderId,
3022        active_only: bool,
3023    ) -> Option<Quantity> {
3024        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3025
3026        let mut total_quantity: Option<Quantity> = None;
3027
3028        for spawn_order in exec_spawn_orders {
3029            if active_only && spawn_order.is_closed() {
3030                continue;
3031            }
3032
3033            match total_quantity.as_mut() {
3034                Some(total) => *total = *total + spawn_order.leaves_qty(),
3035                None => total_quantity = Some(spawn_order.leaves_qty()),
3036            }
3037        }
3038
3039        total_quantity
3040    }
3041
3042    // -- POSITION QUERIES ------------------------------------------------------------------------
3043
3044    /// Returns a reference to the position with the `position_id` (if found).
3045    #[must_use]
3046    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3047        self.positions.get(position_id)
3048    }
3049
3050    /// Returns a reference to the position for the `client_order_id` (if found).
3051    #[must_use]
3052    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3053        self.index
3054            .order_position
3055            .get(client_order_id)
3056            .and_then(|position_id| self.positions.get(position_id))
3057    }
3058
3059    /// Returns a reference to the position ID for the `client_order_id` (if found).
3060    #[must_use]
3061    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3062        self.index.order_position.get(client_order_id)
3063    }
3064
3065    /// Returns a reference to all positions matching the optional filter parameters.
3066    #[must_use]
3067    pub fn positions(
3068        &self,
3069        venue: Option<&Venue>,
3070        instrument_id: Option<&InstrumentId>,
3071        strategy_id: Option<&StrategyId>,
3072        account_id: Option<&AccountId>,
3073        side: Option<PositionSide>,
3074    ) -> Vec<&Position> {
3075        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3076        self.get_positions_for_ids(&position_ids, side)
3077    }
3078
3079    /// Returns a reference to all open positions matching the optional filter parameters.
3080    #[must_use]
3081    pub fn positions_open(
3082        &self,
3083        venue: Option<&Venue>,
3084        instrument_id: Option<&InstrumentId>,
3085        strategy_id: Option<&StrategyId>,
3086        account_id: Option<&AccountId>,
3087        side: Option<PositionSide>,
3088    ) -> Vec<&Position> {
3089        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3090        self.get_positions_for_ids(&position_ids, side)
3091    }
3092
3093    /// Returns a reference to all closed positions matching the optional filter parameters.
3094    #[must_use]
3095    pub fn positions_closed(
3096        &self,
3097        venue: Option<&Venue>,
3098        instrument_id: Option<&InstrumentId>,
3099        strategy_id: Option<&StrategyId>,
3100        account_id: Option<&AccountId>,
3101        side: Option<PositionSide>,
3102    ) -> Vec<&Position> {
3103        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3104        self.get_positions_for_ids(&position_ids, side)
3105    }
3106
3107    /// Returns whether a position with the `position_id` exists.
3108    #[must_use]
3109    pub fn position_exists(&self, position_id: &PositionId) -> bool {
3110        self.index.positions.contains(position_id)
3111    }
3112
3113    /// Returns whether a position with the `position_id` is open.
3114    #[must_use]
3115    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3116        self.index.positions_open.contains(position_id)
3117    }
3118
3119    /// Returns whether a position with the `position_id` is closed.
3120    #[must_use]
3121    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3122        self.index.positions_closed.contains(position_id)
3123    }
3124
3125    /// Returns the count of all open positions.
3126    #[must_use]
3127    pub fn positions_open_count(
3128        &self,
3129        venue: Option<&Venue>,
3130        instrument_id: Option<&InstrumentId>,
3131        strategy_id: Option<&StrategyId>,
3132        account_id: Option<&AccountId>,
3133        side: Option<PositionSide>,
3134    ) -> usize {
3135        self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3136            .len()
3137    }
3138
3139    /// Returns the count of all closed positions.
3140    #[must_use]
3141    pub fn positions_closed_count(
3142        &self,
3143        venue: Option<&Venue>,
3144        instrument_id: Option<&InstrumentId>,
3145        strategy_id: Option<&StrategyId>,
3146        account_id: Option<&AccountId>,
3147        side: Option<PositionSide>,
3148    ) -> usize {
3149        self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3150            .len()
3151    }
3152
3153    /// Returns the count of all positions.
3154    #[must_use]
3155    pub fn positions_total_count(
3156        &self,
3157        venue: Option<&Venue>,
3158        instrument_id: Option<&InstrumentId>,
3159        strategy_id: Option<&StrategyId>,
3160        account_id: Option<&AccountId>,
3161        side: Option<PositionSide>,
3162    ) -> usize {
3163        self.positions(venue, instrument_id, strategy_id, account_id, side)
3164            .len()
3165    }
3166
3167    // -- STRATEGY QUERIES ------------------------------------------------------------------------
3168
3169    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
3170    #[must_use]
3171    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3172        self.index.order_strategy.get(client_order_id)
3173    }
3174
3175    /// Gets a reference to the strategy ID for the `position_id` (if found).
3176    #[must_use]
3177    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3178        self.index.position_strategy.get(position_id)
3179    }
3180
3181    // -- GENERAL ---------------------------------------------------------------------------------
3182
3183    /// Gets a reference to the general value for the `key` (if found).
3184    ///
3185    /// # Errors
3186    ///
3187    /// Returns an error if the `key` is invalid.
3188    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3189        check_valid_string_ascii(key, stringify!(key))?;
3190
3191        Ok(self.general.get(key))
3192    }
3193
3194    // -- DATA QUERIES ----------------------------------------------------------------------------
3195
3196    /// Returns the price for the `instrument_id` and `price_type` (if found).
3197    #[must_use]
3198    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3199        match price_type {
3200            PriceType::Bid => self
3201                .quotes
3202                .get(instrument_id)
3203                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3204            PriceType::Ask => self
3205                .quotes
3206                .get(instrument_id)
3207                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3208            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3209                quotes.front().map(|quote| {
3210                    Price::new(
3211                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3212                        quote.bid_price.precision + 1,
3213                    )
3214                })
3215            }),
3216            PriceType::Last => self
3217                .trades
3218                .get(instrument_id)
3219                .and_then(|trades| trades.front().map(|trade| trade.price)),
3220            PriceType::Mark => self
3221                .mark_prices
3222                .get(instrument_id)
3223                .and_then(|marks| marks.front().map(|mark| mark.value)),
3224        }
3225    }
3226
3227    /// Gets all quotes for the `instrument_id`.
3228    #[must_use]
3229    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3230        self.quotes
3231            .get(instrument_id)
3232            .map(|quotes| quotes.iter().copied().collect())
3233    }
3234
3235    /// Gets all trades for the `instrument_id`.
3236    #[must_use]
3237    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3238        self.trades
3239            .get(instrument_id)
3240            .map(|trades| trades.iter().copied().collect())
3241    }
3242
3243    /// Gets all mark price updates for the `instrument_id`.
3244    #[must_use]
3245    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3246        self.mark_prices
3247            .get(instrument_id)
3248            .map(|mark_prices| mark_prices.iter().copied().collect())
3249    }
3250
3251    /// Gets all index price updates for the `instrument_id`.
3252    #[must_use]
3253    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3254        self.index_prices
3255            .get(instrument_id)
3256            .map(|index_prices| index_prices.iter().copied().collect())
3257    }
3258
3259    /// Gets all funding rate updates for the `instrument_id`.
3260    #[must_use]
3261    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3262        self.funding_rates
3263            .get(instrument_id)
3264            .map(|funding_rates| funding_rates.iter().copied().collect())
3265    }
3266
3267    /// Gets all bars for the `bar_type`.
3268    #[must_use]
3269    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3270        self.bars
3271            .get(bar_type)
3272            .map(|bars| bars.iter().copied().collect())
3273    }
3274
3275    /// Gets a reference to the order book for the `instrument_id`.
3276    #[must_use]
3277    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3278        self.books.get(instrument_id)
3279    }
3280
3281    /// Gets a reference to the order book for the `instrument_id`.
3282    #[must_use]
3283    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3284        self.books.get_mut(instrument_id)
3285    }
3286
3287    /// Gets a reference to the own order book for the `instrument_id`.
3288    #[must_use]
3289    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3290        self.own_books.get(instrument_id)
3291    }
3292
3293    /// Gets a reference to the own order book for the `instrument_id`.
3294    #[must_use]
3295    pub fn own_order_book_mut(
3296        &mut self,
3297        instrument_id: &InstrumentId,
3298    ) -> Option<&mut OwnOrderBook> {
3299        self.own_books.get_mut(instrument_id)
3300    }
3301
3302    /// Gets a reference to the latest quote for the `instrument_id`.
3303    #[must_use]
3304    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3305        self.quotes
3306            .get(instrument_id)
3307            .and_then(|quotes| quotes.front())
3308    }
3309
3310    /// Gets a reference to the latest trade for the `instrument_id`.
3311    #[must_use]
3312    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3313        self.trades
3314            .get(instrument_id)
3315            .and_then(|trades| trades.front())
3316    }
3317
3318    /// Gets a reference to the latest mark price update for the `instrument_id`.
3319    #[must_use]
3320    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3321        self.mark_prices
3322            .get(instrument_id)
3323            .and_then(|mark_prices| mark_prices.front())
3324    }
3325
3326    /// Gets a reference to the latest index price update for the `instrument_id`.
3327    #[must_use]
3328    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3329        self.index_prices
3330            .get(instrument_id)
3331            .and_then(|index_prices| index_prices.front())
3332    }
3333
3334    /// Gets a reference to the latest funding rate update for the `instrument_id`.
3335    #[must_use]
3336    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3337        self.funding_rates
3338            .get(instrument_id)
3339            .and_then(|funding_rates| funding_rates.front())
3340    }
3341
3342    /// Gets a reference to the latest bar for the `bar_type`.
3343    #[must_use]
3344    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3345        self.bars.get(bar_type).and_then(|bars| bars.front())
3346    }
3347
3348    /// Gets the order book update count for the `instrument_id`.
3349    #[must_use]
3350    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3351        self.books
3352            .get(instrument_id)
3353            .map_or(0, |book| book.update_count) as usize
3354    }
3355
3356    /// Gets the quote tick count for the `instrument_id`.
3357    #[must_use]
3358    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3359        self.quotes
3360            .get(instrument_id)
3361            .map_or(0, std::collections::VecDeque::len)
3362    }
3363
3364    /// Gets the trade tick count for the `instrument_id`.
3365    #[must_use]
3366    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3367        self.trades
3368            .get(instrument_id)
3369            .map_or(0, std::collections::VecDeque::len)
3370    }
3371
3372    /// Gets the bar count for the `instrument_id`.
3373    #[must_use]
3374    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3375        self.bars
3376            .get(bar_type)
3377            .map_or(0, std::collections::VecDeque::len)
3378    }
3379
3380    /// Returns whether the cache contains an order book for the `instrument_id`.
3381    #[must_use]
3382    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3383        self.books.contains_key(instrument_id)
3384    }
3385
3386    /// Returns whether the cache contains quotes for the `instrument_id`.
3387    #[must_use]
3388    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3389        self.quote_count(instrument_id) > 0
3390    }
3391
3392    /// Returns whether the cache contains trades for the `instrument_id`.
3393    #[must_use]
3394    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3395        self.trade_count(instrument_id) > 0
3396    }
3397
3398    /// Returns whether the cache contains bars for the `bar_type`.
3399    #[must_use]
3400    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3401        self.bar_count(bar_type) > 0
3402    }
3403
3404    #[must_use]
3405    pub fn get_xrate(
3406        &self,
3407        venue: Venue,
3408        from_currency: Currency,
3409        to_currency: Currency,
3410        price_type: PriceType,
3411    ) -> Option<f64> {
3412        if from_currency == to_currency {
3413            // When the source and target currencies are identical,
3414            // no conversion is needed; return an exchange rate of 1.0.
3415            return Some(1.0);
3416        }
3417
3418        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3419
3420        match get_exchange_rate(
3421            from_currency.code,
3422            to_currency.code,
3423            price_type,
3424            bid_quote,
3425            ask_quote,
3426        ) {
3427            Ok(rate) => rate,
3428            Err(e) => {
3429                log::error!("Failed to calculate xrate: {e}");
3430                None
3431            }
3432        }
3433    }
3434
3435    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3436        let mut bid_quotes = AHashMap::new();
3437        let mut ask_quotes = AHashMap::new();
3438
3439        for instrument_id in self.instruments.keys() {
3440            if instrument_id.venue != *venue {
3441                continue;
3442            }
3443
3444            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3445                if let Some(tick) = ticks.front() {
3446                    (tick.bid_price, tick.ask_price)
3447                } else {
3448                    continue; // Empty ticks vector
3449                }
3450            } else {
3451                let bid_bar = self
3452                    .bars
3453                    .iter()
3454                    .find(|(k, _)| {
3455                        k.instrument_id() == *instrument_id
3456                            && matches!(k.spec().price_type, PriceType::Bid)
3457                    })
3458                    .map(|(_, v)| v);
3459
3460                let ask_bar = self
3461                    .bars
3462                    .iter()
3463                    .find(|(k, _)| {
3464                        k.instrument_id() == *instrument_id
3465                            && matches!(k.spec().price_type, PriceType::Ask)
3466                    })
3467                    .map(|(_, v)| v);
3468
3469                match (bid_bar, ask_bar) {
3470                    (Some(bid), Some(ask)) => {
3471                        match (bid.front(), ask.front()) {
3472                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3473                            _ => {
3474                                // Empty bar VecDeques
3475                                continue;
3476                            }
3477                        }
3478                    }
3479                    _ => continue,
3480                }
3481            };
3482
3483            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3484            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3485        }
3486
3487        (bid_quotes, ask_quotes)
3488    }
3489
3490    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3491    #[must_use]
3492    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3493        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3494    }
3495
3496    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3497    ///
3498    /// # Panics
3499    ///
3500    /// Panics if `xrate` is not positive.
3501    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3502        assert!(xrate > 0.0, "xrate was zero");
3503        self.mark_xrates.insert((from_currency, to_currency), xrate);
3504        self.mark_xrates
3505            .insert((to_currency, from_currency), 1.0 / xrate);
3506    }
3507
3508    /// Clears the mark exchange rate for the given currency pair.
3509    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3510        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3511    }
3512
3513    /// Clears all mark exchange rates.
3514    pub fn clear_mark_xrates(&mut self) {
3515        self.mark_xrates.clear();
3516    }
3517
3518    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3519
3520    /// Returns a reference to the instrument for the `instrument_id` (if found).
3521    #[must_use]
3522    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3523        self.instruments.get(instrument_id)
3524    }
3525
3526    /// Returns references to all instrument IDs for the `venue`.
3527    #[must_use]
3528    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3529        match venue {
3530            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3531            None => self.instruments.keys().collect(),
3532        }
3533    }
3534
3535    /// Returns references to all instruments for the `venue`.
3536    #[must_use]
3537    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3538        self.instruments
3539            .values()
3540            .filter(|i| &i.id().venue == venue)
3541            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3542            .collect()
3543    }
3544
3545    /// Returns references to all bar types contained in the cache.
3546    #[must_use]
3547    pub fn bar_types(
3548        &self,
3549        instrument_id: Option<&InstrumentId>,
3550        price_type: Option<&PriceType>,
3551        aggregation_source: AggregationSource,
3552    ) -> Vec<&BarType> {
3553        let mut bar_types = self
3554            .bars
3555            .keys()
3556            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3557            .collect::<Vec<&BarType>>();
3558
3559        if let Some(instrument_id) = instrument_id {
3560            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3561        }
3562
3563        if let Some(price_type) = price_type {
3564            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3565        }
3566
3567        bar_types
3568    }
3569
3570    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3571
3572    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3573    #[must_use]
3574    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3575        self.synthetics.get(instrument_id)
3576    }
3577
3578    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3579    #[must_use]
3580    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3581        self.synthetics.keys().collect()
3582    }
3583
3584    /// Returns references to all synthetic instruments contained in the cache.
3585    #[must_use]
3586    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3587        self.synthetics.values().collect()
3588    }
3589
3590    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3591
3592    /// Returns a reference to the account for the `account_id` (if found).
3593    #[must_use]
3594    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3595        self.accounts.get(account_id)
3596    }
3597
3598    /// Returns a reference to the account for the `venue` (if found).
3599    #[must_use]
3600    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3601        self.index
3602            .venue_account
3603            .get(venue)
3604            .and_then(|account_id| self.accounts.get(account_id))
3605    }
3606
3607    /// Returns a reference to the account ID for the `venue` (if found).
3608    #[must_use]
3609    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3610        self.index.venue_account.get(venue)
3611    }
3612
3613    /// Returns references to all accounts for the `account_id`.
3614    #[must_use]
3615    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3616        self.accounts
3617            .values()
3618            .filter(|account| &account.id() == account_id)
3619            .collect()
3620    }
3621
3622    /// Updates the own order book with an order.
3623    ///
3624    /// This method adds, updates, or removes an order from the own order book
3625    /// based on the order's current state.
3626    ///
3627    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3628    /// represented in own books.
3629    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3630        if !order.has_price() {
3631            return;
3632        }
3633
3634        let instrument_id = order.instrument_id();
3635
3636        let own_book = self
3637            .own_books
3638            .entry(instrument_id)
3639            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3640
3641        let own_book_order = order.to_own_book_order();
3642
3643        if order.is_closed() {
3644            if let Err(e) = own_book.delete(own_book_order) {
3645                log::debug!(
3646                    "Failed to delete order {} from own book: {e}",
3647                    order.client_order_id(),
3648                );
3649            } else {
3650                log::debug!("Deleted order {} from own book", order.client_order_id());
3651            }
3652        } else {
3653            // Add or update the order in the own book
3654            if let Err(e) = own_book.update(own_book_order) {
3655                log::debug!(
3656                    "Failed to update order {} in own book: {e}; inserting instead",
3657                    order.client_order_id(),
3658                );
3659                own_book.add(own_book_order);
3660            }
3661            log::debug!("Updated order {} in own book", order.client_order_id());
3662        }
3663    }
3664
3665    /// Force removal of an order from own order books and clean up all indexes.
3666    ///
3667    /// This method is used when order event application fails and we need to ensure
3668    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3669    /// Replicates the index cleanup that update_order performs for closed orders.
3670    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3671        let order = match self.orders.get(client_order_id) {
3672            Some(order) => order,
3673            None => return,
3674        };
3675
3676        self.index.orders_open.remove(client_order_id);
3677        self.index.orders_pending_cancel.remove(client_order_id);
3678        self.index.orders_inflight.remove(client_order_id);
3679        self.index.orders_emulated.remove(client_order_id);
3680
3681        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3682            && order.has_price()
3683        {
3684            let own_book_order = order.to_own_book_order();
3685            if let Err(e) = own_book.delete(own_book_order) {
3686                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3687            } else {
3688                log::debug!("Force deleted {client_order_id} from own book");
3689            }
3690        }
3691
3692        self.index.orders_closed.insert(*client_order_id);
3693    }
3694
3695    /// Audit all own order books against open and inflight order indexes.
3696    ///
3697    /// Ensures closed orders are removed from own order books. This includes both
3698    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3699    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3700    /// during venue latency windows.
3701    pub fn audit_own_order_books(&mut self) {
3702        log::debug!("Starting own books audit");
3703        let start = std::time::Instant::now();
3704
3705        // Build union of open and inflight orders for audit,
3706        // this prevents false positives for SUBMITTED orders during venue latency.
3707        let valid_order_ids: AHashSet<ClientOrderId> = self
3708            .index
3709            .orders_open
3710            .union(&self.index.orders_inflight)
3711            .copied()
3712            .collect();
3713
3714        for own_book in self.own_books.values_mut() {
3715            own_book.audit_open_orders(&valid_order_ids);
3716        }
3717
3718        log::debug!("Completed own books audit in {:?}", start.elapsed());
3719    }
3720}