nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod quote;
23
24mod index;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30    collections::VecDeque,
31    fmt::Debug,
32    time::{SystemTime, UNIX_EPOCH},
33};
34
35use ahash::{AHashMap, AHashSet};
36use bytes::Bytes;
37pub use config::CacheConfig; // Re-export
38use database::{CacheDatabaseAdapter, CacheMap};
39use index::CacheIndex;
40use nautilus_core::{
41    UUID4, UnixNanos,
42    correctness::{
43        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
44        check_valid_string_ascii,
45    },
46    datetime::secs_to_nanos_unchecked,
47};
48use nautilus_model::{
49    accounts::{Account, AccountAny},
50    data::{
51        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52        TradeTick, YieldCurveData,
53    },
54    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55    identifiers::{
56        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58    },
59    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60    orderbook::{
61        OrderBook,
62        own::{OwnOrderBook, should_handle_own_book_order},
63    },
64    orders::{Order, OrderAny, OrderList},
65    position::Position,
66    types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
72/// A common in-memory `Cache` for market and execution related data.
73#[cfg_attr(
74    feature = "python",
75    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
76)]
77pub struct Cache {
78    config: CacheConfig,
79    index: CacheIndex,
80    database: Option<Box<dyn CacheDatabaseAdapter>>,
81    general: AHashMap<String, Bytes>,
82    currencies: AHashMap<Ustr, Currency>,
83    instruments: AHashMap<InstrumentId, InstrumentAny>,
84    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
85    books: AHashMap<InstrumentId, OrderBook>,
86    own_books: AHashMap<InstrumentId, OwnOrderBook>,
87    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
88    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
89    mark_xrates: AHashMap<(Currency, Currency), f64>,
90    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
91    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
92    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
93    bars: AHashMap<BarType, VecDeque<Bar>>,
94    greeks: AHashMap<InstrumentId, GreeksData>,
95    yield_curves: AHashMap<String, YieldCurveData>,
96    accounts: AHashMap<AccountId, AccountAny>,
97    orders: AHashMap<ClientOrderId, OrderAny>,
98    order_lists: AHashMap<OrderListId, OrderList>,
99    positions: AHashMap<PositionId, Position>,
100    position_snapshots: AHashMap<PositionId, Bytes>,
101    #[cfg(feature = "defi")]
102    pub(crate) defi: crate::defi::cache::DefiCache,
103}
104
105impl Debug for Cache {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct(stringify!(Cache))
108            .field("config", &self.config)
109            .field("index", &self.index)
110            .field("general", &self.general)
111            .field("currencies", &self.currencies)
112            .field("instruments", &self.instruments)
113            .field("synthetics", &self.synthetics)
114            .field("books", &self.books)
115            .field("own_books", &self.own_books)
116            .field("quotes", &self.quotes)
117            .field("trades", &self.trades)
118            .field("mark_xrates", &self.mark_xrates)
119            .field("mark_prices", &self.mark_prices)
120            .field("index_prices", &self.index_prices)
121            .field("funding_rates", &self.funding_rates)
122            .field("bars", &self.bars)
123            .field("greeks", &self.greeks)
124            .field("yield_curves", &self.yield_curves)
125            .field("accounts", &self.accounts)
126            .field("orders", &self.orders)
127            .field("order_lists", &self.order_lists)
128            .field("positions", &self.positions)
129            .field("position_snapshots", &self.position_snapshots)
130            .finish()
131    }
132}
133
134impl Default for Cache {
135    /// Creates a new default [`Cache`] instance.
136    fn default() -> Self {
137        Self::new(Some(CacheConfig::default()), None)
138    }
139}
140
141impl Cache {
142    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
143    #[must_use]
144    /// # Note
145    ///
146    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
147    pub fn new(
148        config: Option<CacheConfig>,
149        database: Option<Box<dyn CacheDatabaseAdapter>>,
150    ) -> Self {
151        Self {
152            config: config.unwrap_or_default(),
153            index: CacheIndex::default(),
154            database,
155            general: AHashMap::new(),
156            currencies: AHashMap::new(),
157            instruments: AHashMap::new(),
158            synthetics: AHashMap::new(),
159            books: AHashMap::new(),
160            own_books: AHashMap::new(),
161            quotes: AHashMap::new(),
162            trades: AHashMap::new(),
163            mark_xrates: AHashMap::new(),
164            mark_prices: AHashMap::new(),
165            index_prices: AHashMap::new(),
166            funding_rates: AHashMap::new(),
167            bars: AHashMap::new(),
168            greeks: AHashMap::new(),
169            yield_curves: AHashMap::new(),
170            accounts: AHashMap::new(),
171            orders: AHashMap::new(),
172            order_lists: AHashMap::new(),
173            positions: AHashMap::new(),
174            position_snapshots: AHashMap::new(),
175            #[cfg(feature = "defi")]
176            defi: crate::defi::cache::DefiCache::default(),
177        }
178    }
179
180    /// Returns the cache instances memory address.
181    #[must_use]
182    pub fn memory_address(&self) -> String {
183        format!("{:?}", std::ptr::from_ref(self))
184    }
185
186    // -- COMMANDS --------------------------------------------------------------------------------
187
188    /// Clears and reloads general entries from the database into the cache.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if loading general cache data fails.
193    pub fn cache_general(&mut self) -> anyhow::Result<()> {
194        self.general = match &mut self.database {
195            Some(db) => db.load()?,
196            None => AHashMap::new(),
197        };
198
199        log::info!(
200            "Cached {} general object(s) from database",
201            self.general.len()
202        );
203        Ok(())
204    }
205
206    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if loading all cache data fails.
211    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
212        let cache_map = match &self.database {
213            Some(db) => db.load_all().await?,
214            None => CacheMap::default(),
215        };
216
217        self.currencies = cache_map.currencies;
218        self.instruments = cache_map.instruments;
219        self.synthetics = cache_map.synthetics;
220        self.accounts = cache_map.accounts;
221        self.orders = cache_map.orders;
222        self.positions = cache_map.positions;
223        Ok(())
224    }
225
226    /// Clears and reloads the currency cache from the database.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if loading currencies cache fails.
231    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
232        self.currencies = match &mut self.database {
233            Some(db) => db.load_currencies().await?,
234            None => AHashMap::new(),
235        };
236
237        log::info!("Cached {} currencies from database", self.general.len());
238        Ok(())
239    }
240
241    /// Clears and reloads the instrument cache from the database.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if loading instruments cache fails.
246    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
247        self.instruments = match &mut self.database {
248            Some(db) => db.load_instruments().await?,
249            None => AHashMap::new(),
250        };
251
252        log::info!("Cached {} instruments from database", self.general.len());
253        Ok(())
254    }
255
256    /// Clears and reloads the synthetic instrument cache from the database.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if loading synthetic instruments cache fails.
261    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
262        self.synthetics = match &mut self.database {
263            Some(db) => db.load_synthetics().await?,
264            None => AHashMap::new(),
265        };
266
267        log::info!(
268            "Cached {} synthetic instruments from database",
269            self.general.len()
270        );
271        Ok(())
272    }
273
274    /// Clears and reloads the account cache from the database.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if loading accounts cache fails.
279    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
280        self.accounts = match &mut self.database {
281            Some(db) => db.load_accounts().await?,
282            None => AHashMap::new(),
283        };
284
285        log::info!(
286            "Cached {} synthetic instruments from database",
287            self.general.len()
288        );
289        Ok(())
290    }
291
292    /// Clears and reloads the order cache from the database.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if loading orders cache fails.
297    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
298        self.orders = match &mut self.database {
299            Some(db) => db.load_orders().await?,
300            None => AHashMap::new(),
301        };
302
303        log::info!("Cached {} orders from database", self.general.len());
304        Ok(())
305    }
306
307    /// Clears and reloads the position cache from the database.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if loading positions cache fails.
312    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
313        self.positions = match &mut self.database {
314            Some(db) => db.load_positions().await?,
315            None => AHashMap::new(),
316        };
317
318        log::info!("Cached {} positions from database", self.general.len());
319        Ok(())
320    }
321
322    /// Clears the current cache index and re-build.
323    pub fn build_index(&mut self) {
324        log::debug!("Building index");
325
326        // Index accounts
327        for account_id in self.accounts.keys() {
328            self.index
329                .venue_account
330                .insert(account_id.get_issuer(), *account_id);
331        }
332
333        // Index orders
334        for (client_order_id, order) in &self.orders {
335            let instrument_id = order.instrument_id();
336            let venue = instrument_id.venue;
337            let strategy_id = order.strategy_id();
338
339            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
340            self.index
341                .venue_orders
342                .entry(venue)
343                .or_default()
344                .insert(*client_order_id);
345
346            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
347            if let Some(venue_order_id) = order.venue_order_id() {
348                self.index
349                    .venue_order_ids
350                    .insert(venue_order_id, *client_order_id);
351            }
352
353            // 3: Build index.order_position -> {ClientOrderId, PositionId}
354            if let Some(position_id) = order.position_id() {
355                self.index
356                    .order_position
357                    .insert(*client_order_id, position_id);
358            }
359
360            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
361            self.index
362                .order_strategy
363                .insert(*client_order_id, order.strategy_id());
364
365            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
366            self.index
367                .instrument_orders
368                .entry(instrument_id)
369                .or_default()
370                .insert(*client_order_id);
371
372            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
373            self.index
374                .strategy_orders
375                .entry(strategy_id)
376                .or_default()
377                .insert(*client_order_id);
378
379            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
380            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
381                self.index
382                    .exec_algorithm_orders
383                    .entry(exec_algorithm_id)
384                    .or_default()
385                    .insert(*client_order_id);
386            }
387
388            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
389            if let Some(exec_spawn_id) = order.exec_spawn_id() {
390                self.index
391                    .exec_spawn_orders
392                    .entry(exec_spawn_id)
393                    .or_default()
394                    .insert(*client_order_id);
395            }
396
397            // 9: Build index.orders -> {ClientOrderId}
398            self.index.orders.insert(*client_order_id);
399
400            // 10: Build index.orders_open -> {ClientOrderId}
401            if order.is_open() {
402                self.index.orders_open.insert(*client_order_id);
403            }
404
405            // 11: Build index.orders_closed -> {ClientOrderId}
406            if order.is_closed() {
407                self.index.orders_closed.insert(*client_order_id);
408            }
409
410            // 12: Build index.orders_emulated -> {ClientOrderId}
411            if let Some(emulation_trigger) = order.emulation_trigger()
412                && emulation_trigger != TriggerType::NoTrigger
413                && !order.is_closed()
414            {
415                self.index.orders_emulated.insert(*client_order_id);
416            }
417
418            // 13: Build index.orders_inflight -> {ClientOrderId}
419            if order.is_inflight() {
420                self.index.orders_inflight.insert(*client_order_id);
421            }
422
423            // 14: Build index.strategies -> {StrategyId}
424            self.index.strategies.insert(strategy_id);
425
426            // 15: Build index.strategies -> {ExecAlgorithmId}
427            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
428                self.index.exec_algorithms.insert(exec_algorithm_id);
429            }
430        }
431
432        // Index positions
433        for (position_id, position) in &self.positions {
434            let instrument_id = position.instrument_id;
435            let venue = instrument_id.venue;
436            let strategy_id = position.strategy_id;
437
438            // 1: Build index.venue_positions -> {Venue, {PositionId}}
439            self.index
440                .venue_positions
441                .entry(venue)
442                .or_default()
443                .insert(*position_id);
444
445            // 2: Build index.position_strategy -> {PositionId, StrategyId}
446            self.index
447                .position_strategy
448                .insert(*position_id, position.strategy_id);
449
450            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
451            self.index
452                .position_orders
453                .entry(*position_id)
454                .or_default()
455                .extend(position.client_order_ids().into_iter());
456
457            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
458            self.index
459                .instrument_positions
460                .entry(instrument_id)
461                .or_default()
462                .insert(*position_id);
463
464            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
465            self.index
466                .strategy_positions
467                .entry(strategy_id)
468                .or_default()
469                .insert(*position_id);
470
471            // 6: Build index.positions -> {PositionId}
472            self.index.positions.insert(*position_id);
473
474            // 7: Build index.positions_open -> {PositionId}
475            if position.is_open() {
476                self.index.positions_open.insert(*position_id);
477            }
478
479            // 8: Build index.positions_closed -> {PositionId}
480            if position.is_closed() {
481                self.index.positions_closed.insert(*position_id);
482            }
483
484            // 9: Build index.strategies -> {StrategyId}
485            self.index.strategies.insert(strategy_id);
486        }
487    }
488
489    /// Returns whether the cache has a backing database.
490    #[must_use]
491    pub const fn has_backing(&self) -> bool {
492        self.config.database.is_some()
493    }
494
495    // Calculate the unrealized profit and loss (PnL) for `position`.
496    #[must_use]
497    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
498        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
499            quote
500        } else {
501            log::warn!(
502                "Cannot calculate unrealized PnL for {}, no quotes for {}",
503                position.id,
504                position.instrument_id
505            );
506            return None;
507        };
508
509        let last = match position.side {
510            PositionSide::Flat | PositionSide::NoPositionSide => {
511                return Some(Money::new(0.0, position.settlement_currency));
512            }
513            PositionSide::Long => quote.ask_price,
514            PositionSide::Short => quote.bid_price,
515        };
516
517        Some(position.unrealized_pnl(last))
518    }
519
520    /// Checks integrity of data within the cache.
521    ///
522    /// All data should be loaded from the database prior to this call.
523    /// If an error is found then a log error message will also be produced.
524    ///
525    /// # Panics
526    ///
527    /// Panics if failure calling system clock.
528    #[must_use]
529    pub fn check_integrity(&mut self) -> bool {
530        let mut error_count = 0;
531        let failure = "Integrity failure";
532
533        // Get current timestamp in microseconds
534        let timestamp_us = SystemTime::now()
535            .duration_since(UNIX_EPOCH)
536            .expect("Time went backwards")
537            .as_micros();
538
539        log::info!("Checking data integrity");
540
541        // Check object caches
542        for account_id in self.accounts.keys() {
543            if !self
544                .index
545                .venue_account
546                .contains_key(&account_id.get_issuer())
547            {
548                log::error!(
549                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
550                );
551                error_count += 1;
552            }
553        }
554
555        for (client_order_id, order) in &self.orders {
556            if !self.index.order_strategy.contains_key(client_order_id) {
557                log::error!(
558                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
559                );
560                error_count += 1;
561            }
562            if !self.index.orders.contains(client_order_id) {
563                log::error!(
564                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
565                );
566                error_count += 1;
567            }
568            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
569                log::error!(
570                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
571                );
572                error_count += 1;
573            }
574            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
575                log::error!(
576                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
577                );
578                error_count += 1;
579            }
580            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
581                log::error!(
582                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
583                );
584                error_count += 1;
585            }
586            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
587                if !self
588                    .index
589                    .exec_algorithm_orders
590                    .contains_key(&exec_algorithm_id)
591                {
592                    log::error!(
593                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
594                    );
595                    error_count += 1;
596                }
597                if order.exec_spawn_id().is_none()
598                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
599                {
600                    log::error!(
601                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
602                    );
603                    error_count += 1;
604                }
605            }
606        }
607
608        for (position_id, position) in &self.positions {
609            if !self.index.position_strategy.contains_key(position_id) {
610                log::error!(
611                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
612                );
613                error_count += 1;
614            }
615            if !self.index.position_orders.contains_key(position_id) {
616                log::error!(
617                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
618                );
619                error_count += 1;
620            }
621            if !self.index.positions.contains(position_id) {
622                log::error!(
623                    "{failure} in positions: {position_id} not found in `self.index.positions`",
624                );
625                error_count += 1;
626            }
627            if position.is_open() && !self.index.positions_open.contains(position_id) {
628                log::error!(
629                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
630                );
631                error_count += 1;
632            }
633            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
634                log::error!(
635                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
636                );
637                error_count += 1;
638            }
639        }
640
641        // Check indexes
642        for account_id in self.index.venue_account.values() {
643            if !self.accounts.contains_key(account_id) {
644                log::error!(
645                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
646                );
647                error_count += 1;
648            }
649        }
650
651        for client_order_id in self.index.venue_order_ids.values() {
652            if !self.orders.contains_key(client_order_id) {
653                log::error!(
654                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
655                );
656                error_count += 1;
657            }
658        }
659
660        for client_order_id in self.index.client_order_ids.keys() {
661            if !self.orders.contains_key(client_order_id) {
662                log::error!(
663                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
664                );
665                error_count += 1;
666            }
667        }
668
669        for client_order_id in self.index.order_position.keys() {
670            if !self.orders.contains_key(client_order_id) {
671                log::error!(
672                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
673                );
674                error_count += 1;
675            }
676        }
677
678        // Check indexes
679        for client_order_id in self.index.order_strategy.keys() {
680            if !self.orders.contains_key(client_order_id) {
681                log::error!(
682                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
683                );
684                error_count += 1;
685            }
686        }
687
688        for position_id in self.index.position_strategy.keys() {
689            if !self.positions.contains_key(position_id) {
690                log::error!(
691                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
692                );
693                error_count += 1;
694            }
695        }
696
697        for position_id in self.index.position_orders.keys() {
698            if !self.positions.contains_key(position_id) {
699                log::error!(
700                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
701                );
702                error_count += 1;
703            }
704        }
705
706        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
707            for client_order_id in client_order_ids {
708                if !self.orders.contains_key(client_order_id) {
709                    log::error!(
710                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
711                    );
712                    error_count += 1;
713                }
714            }
715        }
716
717        for instrument_id in self.index.instrument_positions.keys() {
718            if !self.index.instrument_orders.contains_key(instrument_id) {
719                log::error!(
720                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
721                );
722                error_count += 1;
723            }
724        }
725
726        for client_order_ids in self.index.strategy_orders.values() {
727            for client_order_id in client_order_ids {
728                if !self.orders.contains_key(client_order_id) {
729                    log::error!(
730                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
731                    );
732                    error_count += 1;
733                }
734            }
735        }
736
737        for position_ids in self.index.strategy_positions.values() {
738            for position_id in position_ids {
739                if !self.positions.contains_key(position_id) {
740                    log::error!(
741                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
742                    );
743                    error_count += 1;
744                }
745            }
746        }
747
748        for client_order_id in &self.index.orders {
749            if !self.orders.contains_key(client_order_id) {
750                log::error!(
751                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
752                );
753                error_count += 1;
754            }
755        }
756
757        for client_order_id in &self.index.orders_emulated {
758            if !self.orders.contains_key(client_order_id) {
759                log::error!(
760                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
761                );
762                error_count += 1;
763            }
764        }
765
766        for client_order_id in &self.index.orders_inflight {
767            if !self.orders.contains_key(client_order_id) {
768                log::error!(
769                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
770                );
771                error_count += 1;
772            }
773        }
774
775        for client_order_id in &self.index.orders_open {
776            if !self.orders.contains_key(client_order_id) {
777                log::error!(
778                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
779                );
780                error_count += 1;
781            }
782        }
783
784        for client_order_id in &self.index.orders_closed {
785            if !self.orders.contains_key(client_order_id) {
786                log::error!(
787                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
788                );
789                error_count += 1;
790            }
791        }
792
793        for position_id in &self.index.positions {
794            if !self.positions.contains_key(position_id) {
795                log::error!(
796                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
797                );
798                error_count += 1;
799            }
800        }
801
802        for position_id in &self.index.positions_open {
803            if !self.positions.contains_key(position_id) {
804                log::error!(
805                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
806                );
807                error_count += 1;
808            }
809        }
810
811        for position_id in &self.index.positions_closed {
812            if !self.positions.contains_key(position_id) {
813                log::error!(
814                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
815                );
816                error_count += 1;
817            }
818        }
819
820        for strategy_id in &self.index.strategies {
821            if !self.index.strategy_orders.contains_key(strategy_id) {
822                log::error!(
823                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
824                );
825                error_count += 1;
826            }
827        }
828
829        for exec_algorithm_id in &self.index.exec_algorithms {
830            if !self
831                .index
832                .exec_algorithm_orders
833                .contains_key(exec_algorithm_id)
834            {
835                log::error!(
836                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
837                );
838                error_count += 1;
839            }
840        }
841
842        let total_us = SystemTime::now()
843            .duration_since(UNIX_EPOCH)
844            .expect("Time went backwards")
845            .as_micros()
846            - timestamp_us;
847
848        if error_count == 0 {
849            log::info!("Integrity check passed in {total_us}μs");
850            true
851        } else {
852            log::error!(
853                "Integrity check failed with {error_count} error{} in {total_us}μs",
854                if error_count == 1 { "" } else { "s" },
855            );
856            false
857        }
858    }
859
860    /// Checks for any residual open state and log warnings if any are found.
861    ///
862    ///'Open state' is considered to be open orders and open positions.
863    #[must_use]
864    pub fn check_residuals(&self) -> bool {
865        log::debug!("Checking residuals");
866
867        let mut residuals = false;
868
869        // Check for any open orders
870        for order in self.orders_open(None, None, None, None) {
871            residuals = true;
872            log::warn!("Residual {order:?}");
873        }
874
875        // Check for any open positions
876        for position in self.positions_open(None, None, None, None) {
877            residuals = true;
878            log::warn!("Residual {position}");
879        }
880
881        residuals
882    }
883
884    /// Purges all closed orders from the cache that are older than `buffer_secs`.
885    ///
886    ///
887    /// Only orders that have been closed for at least this amount of time will be purged.
888    /// A value of 0 means purge all closed orders regardless of when they were closed.
889    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
890        log::debug!(
891            "Purging closed orders{}",
892            if buffer_secs > 0 {
893                format!(" with buffer_secs={buffer_secs}")
894            } else {
895                String::new()
896            }
897        );
898
899        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
900
901        'outer: for client_order_id in self.index.orders_closed.clone() {
902            if let Some(order) = self.orders.get(&client_order_id)
903                && order.is_closed()
904                && let Some(ts_closed) = order.ts_closed()
905                && ts_closed + buffer_ns <= ts_now
906            {
907                // Check any linked orders (contingency orders)
908                if let Some(linked_order_ids) = order.linked_order_ids() {
909                    for linked_order_id in linked_order_ids {
910                        if let Some(linked_order) = self.orders.get(linked_order_id)
911                            && linked_order.is_open()
912                        {
913                            // Do not purge if linked order still open
914                            continue 'outer;
915                        }
916                    }
917                }
918
919                self.purge_order(client_order_id);
920            }
921        }
922    }
923
924    /// Purges all closed positions from the cache that are older than `buffer_secs`.
925    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
926        log::debug!(
927            "Purging closed positions{}",
928            if buffer_secs > 0 {
929                format!(" with buffer_secs={buffer_secs}")
930            } else {
931                String::new()
932            }
933        );
934
935        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
936
937        for position_id in self.index.positions_closed.clone() {
938            if let Some(position) = self.positions.get(&position_id)
939                && position.is_closed()
940                && let Some(ts_closed) = position.ts_closed
941                && ts_closed + buffer_ns <= ts_now
942            {
943                self.purge_position(position_id);
944            }
945        }
946    }
947
948    /// Purges the order with the `client_order_id` from the cache (if found).
949    ///
950    /// For safety, an order is prevented from being purged if it's open.
951    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
952        // Check if order exists and is safe to purge before removing
953        let order = self.orders.get(&client_order_id).cloned();
954
955        // SAFETY: Prevent purging open orders
956        if let Some(ref ord) = order
957            && ord.is_open()
958        {
959            log::warn!("Order {client_order_id} found open when purging, skipping purge");
960            return;
961        }
962
963        // If order exists in cache, remove it and clean up order-specific indices
964        if let Some(ref ord) = order {
965            // Safe to purge
966            self.orders.remove(&client_order_id);
967
968            // Remove order from venue index
969            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
970            {
971                venue_orders.remove(&client_order_id);
972            }
973
974            // Remove venue order ID index if exists
975            if let Some(venue_order_id) = ord.venue_order_id() {
976                self.index.venue_order_ids.remove(&venue_order_id);
977            }
978
979            // Remove from instrument orders index
980            if let Some(instrument_orders) =
981                self.index.instrument_orders.get_mut(&ord.instrument_id())
982            {
983                instrument_orders.remove(&client_order_id);
984            }
985
986            // Remove from position orders index if associated with a position
987            if let Some(position_id) = ord.position_id()
988                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
989            {
990                position_orders.remove(&client_order_id);
991            }
992
993            // Remove from exec algorithm orders index if it has an exec algorithm
994            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
995                && let Some(exec_algorithm_orders) =
996                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
997            {
998                exec_algorithm_orders.remove(&client_order_id);
999            }
1000
1001            // Clean up strategy orders reverse index
1002            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1003                strategy_orders.remove(&client_order_id);
1004                if strategy_orders.is_empty() {
1005                    self.index.strategy_orders.remove(&ord.strategy_id());
1006                }
1007            }
1008
1009            // Clean up exec spawn reverse index (if this order is a spawned child)
1010            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1011                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1012            {
1013                spawn_orders.remove(&client_order_id);
1014                if spawn_orders.is_empty() {
1015                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1016                }
1017            }
1018
1019            log::info!("Purged order {client_order_id}");
1020        } else {
1021            log::warn!("Order {client_order_id} not found when purging");
1022        }
1023
1024        // Always clean up order indices (even if order was not in cache)
1025        self.index.order_position.remove(&client_order_id);
1026        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1027        self.index.order_client.remove(&client_order_id);
1028        self.index.client_order_ids.remove(&client_order_id);
1029
1030        // Clean up reverse index when order not in cache (using forward index)
1031        if let Some(strategy_id) = strategy_id
1032            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1033        {
1034            strategy_orders.remove(&client_order_id);
1035            if strategy_orders.is_empty() {
1036                self.index.strategy_orders.remove(&strategy_id);
1037            }
1038        }
1039
1040        // Remove spawn parent entry if this order was a spawn root
1041        self.index.exec_spawn_orders.remove(&client_order_id);
1042
1043        self.index.orders.remove(&client_order_id);
1044        self.index.orders_closed.remove(&client_order_id);
1045        self.index.orders_emulated.remove(&client_order_id);
1046        self.index.orders_inflight.remove(&client_order_id);
1047        self.index.orders_pending_cancel.remove(&client_order_id);
1048    }
1049
1050    /// Purges the position with the `position_id` from the cache (if found).
1051    ///
1052    /// For safety, a position is prevented from being purged if it's open.
1053    pub fn purge_position(&mut self, position_id: PositionId) {
1054        // Check if position exists and is safe to purge before removing
1055        let position = self.positions.get(&position_id).cloned();
1056
1057        // SAFETY: Prevent purging open positions
1058        if let Some(ref pos) = position
1059            && pos.is_open()
1060        {
1061            log::warn!("Position {position_id} found open when purging, skipping purge");
1062            return;
1063        }
1064
1065        // If position exists in cache, remove it and clean up position-specific indices
1066        if let Some(ref pos) = position {
1067            self.positions.remove(&position_id);
1068
1069            // Remove from venue positions index
1070            if let Some(venue_positions) =
1071                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1072            {
1073                venue_positions.remove(&position_id);
1074            }
1075
1076            // Remove from instrument positions index
1077            if let Some(instrument_positions) =
1078                self.index.instrument_positions.get_mut(&pos.instrument_id)
1079            {
1080                instrument_positions.remove(&position_id);
1081            }
1082
1083            // Remove from strategy positions index
1084            if let Some(strategy_positions) =
1085                self.index.strategy_positions.get_mut(&pos.strategy_id)
1086            {
1087                strategy_positions.remove(&position_id);
1088            }
1089
1090            // Remove position ID from orders that reference it
1091            for client_order_id in pos.client_order_ids() {
1092                self.index.order_position.remove(&client_order_id);
1093            }
1094
1095            log::info!("Purged position {position_id}");
1096        } else {
1097            log::warn!("Position {position_id} not found when purging");
1098        }
1099
1100        // Always clean up position indices (even if position not in cache)
1101        self.index.position_strategy.remove(&position_id);
1102        self.index.position_orders.remove(&position_id);
1103        self.index.positions.remove(&position_id);
1104        self.index.positions_open.remove(&position_id);
1105        self.index.positions_closed.remove(&position_id);
1106
1107        // Always clean up position snapshots (even if position not in cache)
1108        self.position_snapshots.remove(&position_id);
1109    }
1110
1111    /// Purges all account state events which are outside the lookback window.
1112    ///
1113    /// Only events which are outside the lookback window will be purged.
1114    /// A value of 0 means purge all account state events.
1115    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1116        log::debug!(
1117            "Purging account events{}",
1118            if lookback_secs > 0 {
1119                format!(" with lookback_secs={lookback_secs}")
1120            } else {
1121                String::new()
1122            }
1123        );
1124
1125        for account in self.accounts.values_mut() {
1126            let event_count = account.event_count();
1127            account.purge_account_events(ts_now, lookback_secs);
1128            let count_diff = event_count - account.event_count();
1129            if count_diff > 0 {
1130                log::info!(
1131                    "Purged {} event(s) from account {}",
1132                    count_diff,
1133                    account.id()
1134                );
1135            }
1136        }
1137    }
1138
1139    /// Clears the caches index.
1140    pub fn clear_index(&mut self) {
1141        self.index.clear();
1142        log::debug!("Cleared index");
1143    }
1144
1145    /// Resets the cache.
1146    ///
1147    /// All stateful fields are reset to their initial value.
1148    pub fn reset(&mut self) {
1149        log::debug!("Resetting cache");
1150
1151        self.general.clear();
1152        self.currencies.clear();
1153        self.instruments.clear();
1154        self.synthetics.clear();
1155        self.books.clear();
1156        self.own_books.clear();
1157        self.quotes.clear();
1158        self.trades.clear();
1159        self.mark_xrates.clear();
1160        self.mark_prices.clear();
1161        self.index_prices.clear();
1162        self.bars.clear();
1163        self.accounts.clear();
1164        self.orders.clear();
1165        self.order_lists.clear();
1166        self.positions.clear();
1167        self.position_snapshots.clear();
1168        self.greeks.clear();
1169        self.yield_curves.clear();
1170
1171        #[cfg(feature = "defi")]
1172        {
1173            self.defi.pools.clear();
1174            self.defi.pool_profilers.clear();
1175        }
1176
1177        self.clear_index();
1178
1179        log::info!("Reset cache");
1180    }
1181
1182    /// Dispose of the cache which will close any underlying database adapter.
1183    ///
1184    /// If closing the database connection fails, an error is logged.
1185    pub fn dispose(&mut self) {
1186        if let Some(database) = &mut self.database
1187            && let Err(e) = database.close()
1188        {
1189            log::error!("Failed to close database during dispose: {e}");
1190        }
1191    }
1192
1193    /// Flushes the caches database which permanently removes all persisted data.
1194    ///
1195    /// If flushing the database connection fails, an error is logged.
1196    pub fn flush_db(&mut self) {
1197        if let Some(database) = &mut self.database
1198            && let Err(e) = database.flush()
1199        {
1200            log::error!("Failed to flush database: {e}");
1201        }
1202    }
1203
1204    /// Adds a raw bytes `value` to the cache under the `key`.
1205    ///
1206    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Returns an error if persisting the entry to the backing database fails.
1211    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1212        check_valid_string_ascii(key, stringify!(key))?;
1213        check_predicate_false(value.is_empty(), stringify!(value))?;
1214
1215        log::debug!("Adding general {key}");
1216        self.general.insert(key.to_string(), value.clone());
1217
1218        if let Some(database) = &mut self.database {
1219            database.add(key.to_string(), value)?;
1220        }
1221        Ok(())
1222    }
1223
1224    /// Adds an `OrderBook` to the cache.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Returns an error if persisting the order book to the backing database fails.
1229    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1230        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1231
1232        if self.config.save_market_data
1233            && let Some(database) = &mut self.database
1234        {
1235            database.add_order_book(&book)?;
1236        }
1237
1238        self.books.insert(book.instrument_id, book);
1239        Ok(())
1240    }
1241
1242    /// Adds an `OwnOrderBook` to the cache.
1243    ///
1244    /// # Errors
1245    ///
1246    /// Returns an error if persisting the own order book fails.
1247    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1248        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1249
1250        self.own_books.insert(own_book.instrument_id, own_book);
1251        Ok(())
1252    }
1253
1254    /// Adds the `mark_price` update to the cache.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Returns an error if persisting the mark price to the backing database fails.
1259    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1260        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1261
1262        if self.config.save_market_data {
1263            // TODO: Placeholder and return Result for consistency
1264        }
1265
1266        let mark_prices_deque = self
1267            .mark_prices
1268            .entry(mark_price.instrument_id)
1269            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1270        mark_prices_deque.push_front(mark_price);
1271        Ok(())
1272    }
1273
1274    /// Adds the `index_price` update to the cache.
1275    ///
1276    /// # Errors
1277    ///
1278    /// Returns an error if persisting the index price to the backing database fails.
1279    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1280        log::debug!(
1281            "Adding `IndexPriceUpdate` for {}",
1282            index_price.instrument_id
1283        );
1284
1285        if self.config.save_market_data {
1286            // TODO: Placeholder and return Result for consistency
1287        }
1288
1289        let index_prices_deque = self
1290            .index_prices
1291            .entry(index_price.instrument_id)
1292            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1293        index_prices_deque.push_front(index_price);
1294        Ok(())
1295    }
1296
1297    /// Adds the `funding_rate` update to the cache.
1298    ///
1299    /// # Errors
1300    ///
1301    /// Returns an error if persisting the funding rate update to the backing database fails.
1302    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1303        log::debug!(
1304            "Adding `FundingRateUpdate` for {}",
1305            funding_rate.instrument_id
1306        );
1307
1308        if self.config.save_market_data {
1309            // TODO: Placeholder and return Result for consistency
1310        }
1311
1312        self.funding_rates
1313            .insert(funding_rate.instrument_id, funding_rate);
1314        Ok(())
1315    }
1316
1317    /// Adds the `quote` tick to the cache.
1318    ///
1319    /// # Errors
1320    ///
1321    /// Returns an error if persisting the quote tick to the backing database fails.
1322    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1323        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1324
1325        if self.config.save_market_data
1326            && let Some(database) = &mut self.database
1327        {
1328            database.add_quote(&quote)?;
1329        }
1330
1331        let quotes_deque = self
1332            .quotes
1333            .entry(quote.instrument_id)
1334            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1335        quotes_deque.push_front(quote);
1336        Ok(())
1337    }
1338
1339    /// Adds the `quotes` to the cache.
1340    ///
1341    /// # Errors
1342    ///
1343    /// Returns an error if persisting the quote ticks to the backing database fails.
1344    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1345        check_slice_not_empty(quotes, stringify!(quotes))?;
1346
1347        let instrument_id = quotes[0].instrument_id;
1348        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1349
1350        if self.config.save_market_data
1351            && let Some(database) = &mut self.database
1352        {
1353            for quote in quotes {
1354                database.add_quote(quote)?;
1355            }
1356        }
1357
1358        let quotes_deque = self
1359            .quotes
1360            .entry(instrument_id)
1361            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1362
1363        for quote in quotes {
1364            quotes_deque.push_front(*quote);
1365        }
1366        Ok(())
1367    }
1368
1369    /// Adds the `trade` tick to the cache.
1370    ///
1371    /// # Errors
1372    ///
1373    /// Returns an error if persisting the trade tick to the backing database fails.
1374    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1375        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1376
1377        if self.config.save_market_data
1378            && let Some(database) = &mut self.database
1379        {
1380            database.add_trade(&trade)?;
1381        }
1382
1383        let trades_deque = self
1384            .trades
1385            .entry(trade.instrument_id)
1386            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1387        trades_deque.push_front(trade);
1388        Ok(())
1389    }
1390
1391    /// Adds the give `trades` to the cache.
1392    ///
1393    /// # Errors
1394    ///
1395    /// Returns an error if persisting the trade ticks to the backing database fails.
1396    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1397        check_slice_not_empty(trades, stringify!(trades))?;
1398
1399        let instrument_id = trades[0].instrument_id;
1400        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1401
1402        if self.config.save_market_data
1403            && let Some(database) = &mut self.database
1404        {
1405            for trade in trades {
1406                database.add_trade(trade)?;
1407            }
1408        }
1409
1410        let trades_deque = self
1411            .trades
1412            .entry(instrument_id)
1413            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1414
1415        for trade in trades {
1416            trades_deque.push_front(*trade);
1417        }
1418        Ok(())
1419    }
1420
1421    /// Adds the `bar` to the cache.
1422    ///
1423    /// # Errors
1424    ///
1425    /// Returns an error if persisting the bar to the backing database fails.
1426    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1427        log::debug!("Adding `Bar` {}", bar.bar_type);
1428
1429        if self.config.save_market_data
1430            && let Some(database) = &mut self.database
1431        {
1432            database.add_bar(&bar)?;
1433        }
1434
1435        let bars = self
1436            .bars
1437            .entry(bar.bar_type)
1438            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1439        bars.push_front(bar);
1440        Ok(())
1441    }
1442
1443    /// Adds the `bars` to the cache.
1444    ///
1445    /// # Errors
1446    ///
1447    /// Returns an error if persisting the bars to the backing database fails.
1448    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1449        check_slice_not_empty(bars, stringify!(bars))?;
1450
1451        let bar_type = bars[0].bar_type;
1452        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1453
1454        if self.config.save_market_data
1455            && let Some(database) = &mut self.database
1456        {
1457            for bar in bars {
1458                database.add_bar(bar)?;
1459            }
1460        }
1461
1462        let bars_deque = self
1463            .bars
1464            .entry(bar_type)
1465            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1466
1467        for bar in bars {
1468            bars_deque.push_front(*bar);
1469        }
1470        Ok(())
1471    }
1472
1473    /// Adds the `greeks` data to the cache.
1474    ///
1475    /// # Errors
1476    ///
1477    /// Returns an error if persisting the greeks data to the backing database fails.
1478    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1479        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1480
1481        if self.config.save_market_data
1482            && let Some(_database) = &mut self.database
1483        {
1484            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1485        }
1486
1487        self.greeks.insert(greeks.instrument_id, greeks);
1488        Ok(())
1489    }
1490
1491    /// Gets the greeks data for the `instrument_id`.
1492    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1493        self.greeks.get(instrument_id).cloned()
1494    }
1495
1496    /// Adds the `yield_curve` data to the cache.
1497    ///
1498    /// # Errors
1499    ///
1500    /// Returns an error if persisting the yield curve data to the backing database fails.
1501    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1502        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1503
1504        if self.config.save_market_data
1505            && let Some(_database) = &mut self.database
1506        {
1507            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1508        }
1509
1510        self.yield_curves
1511            .insert(yield_curve.curve_name.clone(), yield_curve);
1512        Ok(())
1513    }
1514
1515    /// Gets the yield curve for the `key`.
1516    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1517        self.yield_curves.get(key).map(|curve| {
1518            let curve_clone = curve.clone();
1519            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1520                as Box<dyn Fn(f64) -> f64>
1521        })
1522    }
1523
1524    /// Adds the `currency` to the cache.
1525    ///
1526    /// # Errors
1527    ///
1528    /// Returns an error if persisting the currency to the backing database fails.
1529    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1530        log::debug!("Adding `Currency` {}", currency.code);
1531
1532        if let Some(database) = &mut self.database {
1533            database.add_currency(&currency)?;
1534        }
1535
1536        self.currencies.insert(currency.code, currency);
1537        Ok(())
1538    }
1539
1540    /// Adds the `instrument` to the cache.
1541    ///
1542    /// # Errors
1543    ///
1544    /// Returns an error if persisting the instrument to the backing database fails.
1545    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1546        log::debug!("Adding `Instrument` {}", instrument.id());
1547
1548        if let Some(database) = &mut self.database {
1549            database.add_instrument(&instrument)?;
1550        }
1551
1552        self.instruments.insert(instrument.id(), instrument);
1553        Ok(())
1554    }
1555
1556    /// Adds the `synthetic` instrument to the cache.
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1561    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1562        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1563
1564        if let Some(database) = &mut self.database {
1565            database.add_synthetic(&synthetic)?;
1566        }
1567
1568        self.synthetics.insert(synthetic.id, synthetic);
1569        Ok(())
1570    }
1571
1572    /// Adds the `account` to the cache.
1573    ///
1574    /// # Errors
1575    ///
1576    /// Returns an error if persisting the account to the backing database fails.
1577    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1578        log::debug!("Adding `Account` {}", account.id());
1579
1580        if let Some(database) = &mut self.database {
1581            database.add_account(&account)?;
1582        }
1583
1584        let account_id = account.id();
1585        self.accounts.insert(account_id, account);
1586        self.index
1587            .venue_account
1588            .insert(account_id.get_issuer(), account_id);
1589        Ok(())
1590    }
1591
1592    /// Indexes the `client_order_id` with the `venue_order_id`.
1593    ///
1594    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1595    ///
1596    /// # Errors
1597    ///
1598    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1599    pub fn add_venue_order_id(
1600        &mut self,
1601        client_order_id: &ClientOrderId,
1602        venue_order_id: &VenueOrderId,
1603        overwrite: bool,
1604    ) -> anyhow::Result<()> {
1605        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1606            && !overwrite
1607            && existing_venue_order_id != venue_order_id
1608        {
1609            anyhow::bail!(
1610                "Existing {existing_venue_order_id} for {client_order_id}
1611                    did not match the given {venue_order_id}.
1612                    If you are writing a test then try a different `venue_order_id`,
1613                    otherwise this is probably a bug."
1614            );
1615        }
1616
1617        self.index
1618            .client_order_ids
1619            .insert(*client_order_id, *venue_order_id);
1620        self.index
1621            .venue_order_ids
1622            .insert(*venue_order_id, *client_order_id);
1623
1624        Ok(())
1625    }
1626
1627    /// Adds the `order` to the cache indexed with any given identifiers.
1628    ///
1629    /// # Parameters
1630    ///
1631    /// `override_existing`: If the added order should 'override' any existing order and replace
1632    /// it in the cache. This is currently used for emulated orders which are
1633    /// being released and transformed into another type.
1634    ///
1635    /// # Errors
1636    ///
1637    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1638    pub fn add_order(
1639        &mut self,
1640        order: OrderAny,
1641        position_id: Option<PositionId>,
1642        client_id: Option<ClientId>,
1643        replace_existing: bool,
1644    ) -> anyhow::Result<()> {
1645        let instrument_id = order.instrument_id();
1646        let venue = instrument_id.venue;
1647        let client_order_id = order.client_order_id();
1648        let strategy_id = order.strategy_id();
1649        let exec_algorithm_id = order.exec_algorithm_id();
1650        let exec_spawn_id = order.exec_spawn_id();
1651
1652        if !replace_existing {
1653            check_key_not_in_map(
1654                &client_order_id,
1655                &self.orders,
1656                stringify!(client_order_id),
1657                stringify!(orders),
1658            )?;
1659        }
1660
1661        log::debug!("Adding {order:?}");
1662
1663        self.index.orders.insert(client_order_id);
1664        self.index
1665            .order_strategy
1666            .insert(client_order_id, strategy_id);
1667        self.index.strategies.insert(strategy_id);
1668
1669        // Update venue -> orders index
1670        self.index
1671            .venue_orders
1672            .entry(venue)
1673            .or_default()
1674            .insert(client_order_id);
1675
1676        // Update instrument -> orders index
1677        self.index
1678            .instrument_orders
1679            .entry(instrument_id)
1680            .or_default()
1681            .insert(client_order_id);
1682
1683        // Update strategy -> orders index
1684        self.index
1685            .strategy_orders
1686            .entry(strategy_id)
1687            .or_default()
1688            .insert(client_order_id);
1689
1690        // Update exec_algorithm -> orders index
1691        if let Some(exec_algorithm_id) = exec_algorithm_id {
1692            self.index.exec_algorithms.insert(exec_algorithm_id);
1693
1694            self.index
1695                .exec_algorithm_orders
1696                .entry(exec_algorithm_id)
1697                .or_default()
1698                .insert(client_order_id);
1699        }
1700
1701        // Update exec_spawn -> orders index
1702        if let Some(exec_spawn_id) = exec_spawn_id {
1703            self.index
1704                .exec_spawn_orders
1705                .entry(exec_spawn_id)
1706                .or_default()
1707                .insert(client_order_id);
1708        }
1709
1710        // Update emulation index
1711        match order.emulation_trigger() {
1712            Some(_) => {
1713                self.index.orders_emulated.remove(&client_order_id);
1714            }
1715            None => {
1716                self.index.orders_emulated.insert(client_order_id);
1717            }
1718        }
1719
1720        // Index position ID if provided
1721        if let Some(position_id) = position_id {
1722            self.add_position_id(
1723                &position_id,
1724                &order.instrument_id().venue,
1725                &client_order_id,
1726                &strategy_id,
1727            )?;
1728        }
1729
1730        // Index client ID if provided
1731        if let Some(client_id) = client_id {
1732            self.index.order_client.insert(client_order_id, client_id);
1733            log::debug!("Indexed {client_id:?}");
1734        }
1735
1736        if let Some(database) = &mut self.database {
1737            database.add_order(&order, client_id)?;
1738            // TODO: Implement
1739            // if self.config.snapshot_orders {
1740            //     database.snapshot_order_state(order)?;
1741            // }
1742        }
1743
1744        self.orders.insert(client_order_id, order);
1745
1746        Ok(())
1747    }
1748
1749    /// Indexes the `position_id` with the other given IDs.
1750    ///
1751    /// # Errors
1752    ///
1753    /// Returns an error if indexing position ID in the backing database fails.
1754    pub fn add_position_id(
1755        &mut self,
1756        position_id: &PositionId,
1757        venue: &Venue,
1758        client_order_id: &ClientOrderId,
1759        strategy_id: &StrategyId,
1760    ) -> anyhow::Result<()> {
1761        self.index
1762            .order_position
1763            .insert(*client_order_id, *position_id);
1764
1765        // Index: ClientOrderId -> PositionId
1766        if let Some(database) = &mut self.database {
1767            database.index_order_position(*client_order_id, *position_id)?;
1768        }
1769
1770        // Index: PositionId -> StrategyId
1771        self.index
1772            .position_strategy
1773            .insert(*position_id, *strategy_id);
1774
1775        // Index: PositionId -> set[ClientOrderId]
1776        self.index
1777            .position_orders
1778            .entry(*position_id)
1779            .or_default()
1780            .insert(*client_order_id);
1781
1782        // Index: StrategyId -> set[PositionId]
1783        self.index
1784            .strategy_positions
1785            .entry(*strategy_id)
1786            .or_default()
1787            .insert(*position_id);
1788
1789        // Index: Venue -> set[PositionId]
1790        self.index
1791            .venue_positions
1792            .entry(*venue)
1793            .or_default()
1794            .insert(*position_id);
1795
1796        Ok(())
1797    }
1798
1799    /// Adds the `position` to the cache.
1800    ///
1801    /// # Errors
1802    ///
1803    /// Returns an error if persisting the position to the backing database fails.
1804    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1805        self.positions.insert(position.id, position.clone());
1806        self.index.positions.insert(position.id);
1807        self.index.positions_open.insert(position.id);
1808        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
1809
1810        log::debug!("Adding {position}");
1811
1812        self.add_position_id(
1813            &position.id,
1814            &position.instrument_id.venue,
1815            &position.opening_order_id,
1816            &position.strategy_id,
1817        )?;
1818
1819        let venue = position.instrument_id.venue;
1820        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1821        venue_positions.insert(position.id);
1822
1823        // Index: InstrumentId -> AHashSet
1824        let instrument_id = position.instrument_id;
1825        let instrument_positions = self
1826            .index
1827            .instrument_positions
1828            .entry(instrument_id)
1829            .or_default();
1830        instrument_positions.insert(position.id);
1831
1832        if let Some(database) = &mut self.database {
1833            database.add_position(&position)?;
1834            // TODO: Implement position snapshots
1835            // if self.snapshot_positions {
1836            //     database.snapshot_position_state(
1837            //         position,
1838            //         position.ts_last,
1839            //         self.calculate_unrealized_pnl(&position),
1840            //     )?;
1841            // }
1842        }
1843
1844        Ok(())
1845    }
1846
1847    /// Updates the `account` in the cache.
1848    ///
1849    /// # Errors
1850    ///
1851    /// Returns an error if updating the account in the database fails.
1852    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1853        if let Some(database) = &mut self.database {
1854            database.update_account(&account)?;
1855        }
1856        Ok(())
1857    }
1858
1859    /// Updates the `order` in the cache.
1860    ///
1861    /// # Errors
1862    ///
1863    /// Returns an error if updating the order in the database fails.
1864    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1865        let client_order_id = order.client_order_id();
1866
1867        // Update venue order ID
1868        if let Some(venue_order_id) = order.venue_order_id() {
1869            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1870            // venues which use a cancel+replace update strategy.
1871            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1872                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1873                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1874            }
1875        }
1876
1877        // Update in-flight state
1878        if order.is_inflight() {
1879            self.index.orders_inflight.insert(client_order_id);
1880        } else {
1881            self.index.orders_inflight.remove(&client_order_id);
1882        }
1883
1884        // Update open/closed state
1885        if order.is_open() {
1886            self.index.orders_closed.remove(&client_order_id);
1887            self.index.orders_open.insert(client_order_id);
1888        } else if order.is_closed() {
1889            self.index.orders_open.remove(&client_order_id);
1890            self.index.orders_pending_cancel.remove(&client_order_id);
1891            self.index.orders_closed.insert(client_order_id);
1892        }
1893
1894        // Update emulation
1895        if let Some(emulation_trigger) = order.emulation_trigger() {
1896            match emulation_trigger {
1897                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1898                _ => self.index.orders_emulated.insert(client_order_id),
1899            };
1900        }
1901
1902        // Update own book
1903        if self.own_order_book(&order.instrument_id()).is_some()
1904            && should_handle_own_book_order(order)
1905        {
1906            self.update_own_order_book(order);
1907        }
1908
1909        if let Some(database) = &mut self.database {
1910            database.update_order(order.last_event())?;
1911            // TODO: Implement order snapshots
1912            // if self.snapshot_orders {
1913            //     database.snapshot_order_state(order)?;
1914            // }
1915        }
1916
1917        // update the order in the cache
1918        self.orders.insert(client_order_id, order.clone());
1919
1920        Ok(())
1921    }
1922
1923    /// Updates the `order` as pending cancel locally.
1924    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1925        self.index
1926            .orders_pending_cancel
1927            .insert(order.client_order_id());
1928    }
1929
1930    /// Updates the `position` in the cache.
1931    ///
1932    /// # Errors
1933    ///
1934    /// Returns an error if updating the position in the database fails.
1935    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1936        // Update open/closed state
1937
1938        if position.is_open() {
1939            self.index.positions_open.insert(position.id);
1940            self.index.positions_closed.remove(&position.id);
1941        } else {
1942            self.index.positions_closed.insert(position.id);
1943            self.index.positions_open.remove(&position.id);
1944        }
1945
1946        if let Some(database) = &mut self.database {
1947            database.update_position(position)?;
1948            // TODO: Implement order snapshots
1949            // if self.snapshot_orders {
1950            //     database.snapshot_order_state(order)?;
1951            // }
1952        }
1953
1954        self.positions.insert(position.id, position.clone());
1955
1956        Ok(())
1957    }
1958
1959    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
1960    /// serializing it, and storing it in the position snapshots.
1961    ///
1962    /// # Errors
1963    ///
1964    /// Returns an error if serializing or storing the position snapshot fails.
1965    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1966        let position_id = position.id;
1967
1968        let mut copied_position = position.clone();
1969        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1970        copied_position.id = PositionId::new(new_id);
1971
1972        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
1973        let position_serialized = serde_json::to_vec(&copied_position)?;
1974
1975        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1976        let new_snapshots = match snapshots {
1977            Some(existing_snapshots) => {
1978                let mut combined = existing_snapshots.to_vec();
1979                combined.extend(position_serialized);
1980                Bytes::from(combined)
1981            }
1982            None => Bytes::from(position_serialized),
1983        };
1984        self.position_snapshots.insert(position_id, new_snapshots);
1985
1986        log::debug!("Snapshot {copied_position}");
1987        Ok(())
1988    }
1989
1990    /// Creates a snapshot of the `position` state in the database.
1991    ///
1992    /// # Errors
1993    ///
1994    /// Returns an error if snapshotting the position state fails.
1995    pub fn snapshot_position_state(
1996        &mut self,
1997        position: &Position,
1998        // ts_snapshot: u64,
1999        // unrealized_pnl: Option<Money>,
2000        open_only: Option<bool>,
2001    ) -> anyhow::Result<()> {
2002        let open_only = open_only.unwrap_or(true);
2003
2004        if open_only && !position.is_open() {
2005            return Ok(());
2006        }
2007
2008        if let Some(database) = &mut self.database {
2009            database.snapshot_position_state(position).map_err(|e| {
2010                log::error!(
2011                    "Failed to snapshot position state for {}: {e:?}",
2012                    position.id
2013                );
2014                e
2015            })?;
2016        } else {
2017            log::warn!(
2018                "Cannot snapshot position state for {} (no database configured)",
2019                position.id
2020            );
2021        }
2022
2023        // Ok(())
2024        todo!()
2025    }
2026
2027    /// Gets the OMS type for the `position_id`.
2028    #[must_use]
2029    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2030        // Get OMS type from the index
2031        if self.index.position_strategy.contains_key(position_id) {
2032            // For now, we'll default to NETTING
2033            // TODO: Store and retrieve actual OMS type per position
2034            Some(OmsType::Netting)
2035        } else {
2036            None
2037        }
2038    }
2039
2040    /// Gets position snapshot bytes for the `position_id`.
2041    #[must_use]
2042    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2043        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2044    }
2045
2046    /// Gets position snapshot IDs for the `instrument_id`.
2047    #[must_use]
2048    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2049        // Get snapshot position IDs that match the instrument
2050        let mut result = AHashSet::new();
2051        for (position_id, _) in &self.position_snapshots {
2052            // Check if this position is for the requested instrument
2053            if let Some(position) = self.positions.get(position_id)
2054                && position.instrument_id == *instrument_id
2055            {
2056                result.insert(*position_id);
2057            }
2058        }
2059        result
2060    }
2061
2062    /// Snapshots the `order` state in the database.
2063    ///
2064    /// # Errors
2065    ///
2066    /// Returns an error if snapshotting the order state fails.
2067    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2068        let database = if let Some(database) = &self.database {
2069            database
2070        } else {
2071            log::warn!(
2072                "Cannot snapshot order state for {} (no database configured)",
2073                order.client_order_id()
2074            );
2075            return Ok(());
2076        };
2077
2078        database.snapshot_order_state(order)
2079    }
2080
2081    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2082
2083    fn build_order_query_filter_set(
2084        &self,
2085        venue: Option<&Venue>,
2086        instrument_id: Option<&InstrumentId>,
2087        strategy_id: Option<&StrategyId>,
2088    ) -> Option<AHashSet<ClientOrderId>> {
2089        let mut query: Option<AHashSet<ClientOrderId>> = None;
2090
2091        if let Some(venue) = venue {
2092            query = Some(
2093                self.index
2094                    .venue_orders
2095                    .get(venue)
2096                    .cloned()
2097                    .unwrap_or_default(),
2098            );
2099        }
2100
2101        if let Some(instrument_id) = instrument_id {
2102            let instrument_orders = self
2103                .index
2104                .instrument_orders
2105                .get(instrument_id)
2106                .cloned()
2107                .unwrap_or_default();
2108
2109            if let Some(existing_query) = &mut query {
2110                *existing_query = existing_query
2111                    .intersection(&instrument_orders)
2112                    .copied()
2113                    .collect();
2114            } else {
2115                query = Some(instrument_orders);
2116            }
2117        }
2118
2119        if let Some(strategy_id) = strategy_id {
2120            let strategy_orders = self
2121                .index
2122                .strategy_orders
2123                .get(strategy_id)
2124                .cloned()
2125                .unwrap_or_default();
2126
2127            if let Some(existing_query) = &mut query {
2128                *existing_query = existing_query
2129                    .intersection(&strategy_orders)
2130                    .copied()
2131                    .collect();
2132            } else {
2133                query = Some(strategy_orders);
2134            }
2135        }
2136
2137        query
2138    }
2139
2140    fn build_position_query_filter_set(
2141        &self,
2142        venue: Option<&Venue>,
2143        instrument_id: Option<&InstrumentId>,
2144        strategy_id: Option<&StrategyId>,
2145    ) -> Option<AHashSet<PositionId>> {
2146        let mut query: Option<AHashSet<PositionId>> = None;
2147
2148        if let Some(venue) = venue {
2149            query = Some(
2150                self.index
2151                    .venue_positions
2152                    .get(venue)
2153                    .cloned()
2154                    .unwrap_or_default(),
2155            );
2156        }
2157
2158        if let Some(instrument_id) = instrument_id {
2159            let instrument_positions = self
2160                .index
2161                .instrument_positions
2162                .get(instrument_id)
2163                .cloned()
2164                .unwrap_or_default();
2165
2166            if let Some(existing_query) = query {
2167                query = Some(
2168                    existing_query
2169                        .intersection(&instrument_positions)
2170                        .copied()
2171                        .collect(),
2172                );
2173            } else {
2174                query = Some(instrument_positions);
2175            }
2176        }
2177
2178        if let Some(strategy_id) = strategy_id {
2179            let strategy_positions = self
2180                .index
2181                .strategy_positions
2182                .get(strategy_id)
2183                .cloned()
2184                .unwrap_or_default();
2185
2186            if let Some(existing_query) = query {
2187                query = Some(
2188                    existing_query
2189                        .intersection(&strategy_positions)
2190                        .copied()
2191                        .collect(),
2192                );
2193            } else {
2194                query = Some(strategy_positions);
2195            }
2196        }
2197
2198        query
2199    }
2200
2201    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2202    ///
2203    /// # Panics
2204    ///
2205    /// Panics if any `client_order_id` in the set is not found in the cache.
2206    fn get_orders_for_ids(
2207        &self,
2208        client_order_ids: &AHashSet<ClientOrderId>,
2209        side: Option<OrderSide>,
2210    ) -> Vec<&OrderAny> {
2211        let side = side.unwrap_or(OrderSide::NoOrderSide);
2212        let mut orders = Vec::new();
2213
2214        for client_order_id in client_order_ids {
2215            let order = self
2216                .orders
2217                .get(client_order_id)
2218                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2219            if side == OrderSide::NoOrderSide || side == order.order_side() {
2220                orders.push(order);
2221            }
2222        }
2223
2224        orders
2225    }
2226
2227    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2228    ///
2229    /// # Panics
2230    ///
2231    /// Panics if any `position_id` in the set is not found in the cache.
2232    fn get_positions_for_ids(
2233        &self,
2234        position_ids: &AHashSet<PositionId>,
2235        side: Option<PositionSide>,
2236    ) -> Vec<&Position> {
2237        let side = side.unwrap_or(PositionSide::NoPositionSide);
2238        let mut positions = Vec::new();
2239
2240        for position_id in position_ids {
2241            let position = self
2242                .positions
2243                .get(position_id)
2244                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2245            if side == PositionSide::NoPositionSide || side == position.side {
2246                positions.push(position);
2247            }
2248        }
2249
2250        positions
2251    }
2252
2253    /// Returns the `ClientOrderId`s of all orders.
2254    #[must_use]
2255    pub fn client_order_ids(
2256        &self,
2257        venue: Option<&Venue>,
2258        instrument_id: Option<&InstrumentId>,
2259        strategy_id: Option<&StrategyId>,
2260    ) -> AHashSet<ClientOrderId> {
2261        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2262        match query {
2263            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2264            None => self.index.orders.clone(),
2265        }
2266    }
2267
2268    /// Returns the `ClientOrderId`s of all open orders.
2269    #[must_use]
2270    pub fn client_order_ids_open(
2271        &self,
2272        venue: Option<&Venue>,
2273        instrument_id: Option<&InstrumentId>,
2274        strategy_id: Option<&StrategyId>,
2275    ) -> AHashSet<ClientOrderId> {
2276        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2277        match query {
2278            Some(query) => self
2279                .index
2280                .orders_open
2281                .intersection(&query)
2282                .copied()
2283                .collect(),
2284            None => self.index.orders_open.clone(),
2285        }
2286    }
2287
2288    /// Returns the `ClientOrderId`s of all closed orders.
2289    #[must_use]
2290    pub fn client_order_ids_closed(
2291        &self,
2292        venue: Option<&Venue>,
2293        instrument_id: Option<&InstrumentId>,
2294        strategy_id: Option<&StrategyId>,
2295    ) -> AHashSet<ClientOrderId> {
2296        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2297        match query {
2298            Some(query) => self
2299                .index
2300                .orders_closed
2301                .intersection(&query)
2302                .copied()
2303                .collect(),
2304            None => self.index.orders_closed.clone(),
2305        }
2306    }
2307
2308    /// Returns the `ClientOrderId`s of all emulated orders.
2309    #[must_use]
2310    pub fn client_order_ids_emulated(
2311        &self,
2312        venue: Option<&Venue>,
2313        instrument_id: Option<&InstrumentId>,
2314        strategy_id: Option<&StrategyId>,
2315    ) -> AHashSet<ClientOrderId> {
2316        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2317        match query {
2318            Some(query) => self
2319                .index
2320                .orders_emulated
2321                .intersection(&query)
2322                .copied()
2323                .collect(),
2324            None => self.index.orders_emulated.clone(),
2325        }
2326    }
2327
2328    /// Returns the `ClientOrderId`s of all in-flight orders.
2329    #[must_use]
2330    pub fn client_order_ids_inflight(
2331        &self,
2332        venue: Option<&Venue>,
2333        instrument_id: Option<&InstrumentId>,
2334        strategy_id: Option<&StrategyId>,
2335    ) -> AHashSet<ClientOrderId> {
2336        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2337        match query {
2338            Some(query) => self
2339                .index
2340                .orders_inflight
2341                .intersection(&query)
2342                .copied()
2343                .collect(),
2344            None => self.index.orders_inflight.clone(),
2345        }
2346    }
2347
2348    /// Returns `PositionId`s of all positions.
2349    #[must_use]
2350    pub fn position_ids(
2351        &self,
2352        venue: Option<&Venue>,
2353        instrument_id: Option<&InstrumentId>,
2354        strategy_id: Option<&StrategyId>,
2355    ) -> AHashSet<PositionId> {
2356        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2357        match query {
2358            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2359            None => self.index.positions.clone(),
2360        }
2361    }
2362
2363    /// Returns the `PositionId`s of all open positions.
2364    #[must_use]
2365    pub fn position_open_ids(
2366        &self,
2367        venue: Option<&Venue>,
2368        instrument_id: Option<&InstrumentId>,
2369        strategy_id: Option<&StrategyId>,
2370    ) -> AHashSet<PositionId> {
2371        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2372        match query {
2373            Some(query) => self
2374                .index
2375                .positions_open
2376                .intersection(&query)
2377                .copied()
2378                .collect(),
2379            None => self.index.positions_open.clone(),
2380        }
2381    }
2382
2383    /// Returns the `PositionId`s of all closed positions.
2384    #[must_use]
2385    pub fn position_closed_ids(
2386        &self,
2387        venue: Option<&Venue>,
2388        instrument_id: Option<&InstrumentId>,
2389        strategy_id: Option<&StrategyId>,
2390    ) -> AHashSet<PositionId> {
2391        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2392        match query {
2393            Some(query) => self
2394                .index
2395                .positions_closed
2396                .intersection(&query)
2397                .copied()
2398                .collect(),
2399            None => self.index.positions_closed.clone(),
2400        }
2401    }
2402
2403    /// Returns the `ComponentId`s of all actors.
2404    #[must_use]
2405    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2406        self.index.actors.clone()
2407    }
2408
2409    /// Returns the `StrategyId`s of all strategies.
2410    #[must_use]
2411    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2412        self.index.strategies.clone()
2413    }
2414
2415    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2416    #[must_use]
2417    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2418        self.index.exec_algorithms.clone()
2419    }
2420
2421    // -- ORDER QUERIES ---------------------------------------------------------------------------
2422
2423    /// Gets a reference to the order with the `client_order_id` (if found).
2424    #[must_use]
2425    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2426        self.orders.get(client_order_id)
2427    }
2428
2429    /// Gets a reference to the order with the `client_order_id` (if found).
2430    #[must_use]
2431    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2432        self.orders.get_mut(client_order_id)
2433    }
2434
2435    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2436    #[must_use]
2437    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2438        self.index.venue_order_ids.get(venue_order_id)
2439    }
2440
2441    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2442    #[must_use]
2443    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2444        self.index.client_order_ids.get(client_order_id)
2445    }
2446
2447    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2448    #[must_use]
2449    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2450        self.index.order_client.get(client_order_id)
2451    }
2452
2453    /// Returns references to all orders matching the optional filter parameters.
2454    #[must_use]
2455    pub fn orders(
2456        &self,
2457        venue: Option<&Venue>,
2458        instrument_id: Option<&InstrumentId>,
2459        strategy_id: Option<&StrategyId>,
2460        side: Option<OrderSide>,
2461    ) -> Vec<&OrderAny> {
2462        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2463        self.get_orders_for_ids(&client_order_ids, side)
2464    }
2465
2466    /// Returns references to all open orders matching the optional filter parameters.
2467    #[must_use]
2468    pub fn orders_open(
2469        &self,
2470        venue: Option<&Venue>,
2471        instrument_id: Option<&InstrumentId>,
2472        strategy_id: Option<&StrategyId>,
2473        side: Option<OrderSide>,
2474    ) -> Vec<&OrderAny> {
2475        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2476        self.get_orders_for_ids(&client_order_ids, side)
2477    }
2478
2479    /// Returns references to all closed orders matching the optional filter parameters.
2480    #[must_use]
2481    pub fn orders_closed(
2482        &self,
2483        venue: Option<&Venue>,
2484        instrument_id: Option<&InstrumentId>,
2485        strategy_id: Option<&StrategyId>,
2486        side: Option<OrderSide>,
2487    ) -> Vec<&OrderAny> {
2488        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2489        self.get_orders_for_ids(&client_order_ids, side)
2490    }
2491
2492    /// Returns references to all emulated orders matching the optional filter parameters.
2493    #[must_use]
2494    pub fn orders_emulated(
2495        &self,
2496        venue: Option<&Venue>,
2497        instrument_id: Option<&InstrumentId>,
2498        strategy_id: Option<&StrategyId>,
2499        side: Option<OrderSide>,
2500    ) -> Vec<&OrderAny> {
2501        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2502        self.get_orders_for_ids(&client_order_ids, side)
2503    }
2504
2505    /// Returns references to all in-flight orders matching the optional filter parameters.
2506    #[must_use]
2507    pub fn orders_inflight(
2508        &self,
2509        venue: Option<&Venue>,
2510        instrument_id: Option<&InstrumentId>,
2511        strategy_id: Option<&StrategyId>,
2512        side: Option<OrderSide>,
2513    ) -> Vec<&OrderAny> {
2514        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2515        self.get_orders_for_ids(&client_order_ids, side)
2516    }
2517
2518    /// Returns references to all orders for the `position_id`.
2519    #[must_use]
2520    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2521        let client_order_ids = self.index.position_orders.get(position_id);
2522        match client_order_ids {
2523            Some(client_order_ids) => {
2524                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2525            }
2526            None => Vec::new(),
2527        }
2528    }
2529
2530    /// Returns whether an order with the `client_order_id` exists.
2531    #[must_use]
2532    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2533        self.index.orders.contains(client_order_id)
2534    }
2535
2536    /// Returns whether an order with the `client_order_id` is open.
2537    #[must_use]
2538    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2539        self.index.orders_open.contains(client_order_id)
2540    }
2541
2542    /// Returns whether an order with the `client_order_id` is closed.
2543    #[must_use]
2544    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2545        self.index.orders_closed.contains(client_order_id)
2546    }
2547
2548    /// Returns whether an order with the `client_order_id` is emulated.
2549    #[must_use]
2550    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2551        self.index.orders_emulated.contains(client_order_id)
2552    }
2553
2554    /// Returns whether an order with the `client_order_id` is in-flight.
2555    #[must_use]
2556    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2557        self.index.orders_inflight.contains(client_order_id)
2558    }
2559
2560    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2561    #[must_use]
2562    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2563        self.index.orders_pending_cancel.contains(client_order_id)
2564    }
2565
2566    /// Returns the count of all open orders.
2567    #[must_use]
2568    pub fn orders_open_count(
2569        &self,
2570        venue: Option<&Venue>,
2571        instrument_id: Option<&InstrumentId>,
2572        strategy_id: Option<&StrategyId>,
2573        side: Option<OrderSide>,
2574    ) -> usize {
2575        self.orders_open(venue, instrument_id, strategy_id, side)
2576            .len()
2577    }
2578
2579    /// Returns the count of all closed orders.
2580    #[must_use]
2581    pub fn orders_closed_count(
2582        &self,
2583        venue: Option<&Venue>,
2584        instrument_id: Option<&InstrumentId>,
2585        strategy_id: Option<&StrategyId>,
2586        side: Option<OrderSide>,
2587    ) -> usize {
2588        self.orders_closed(venue, instrument_id, strategy_id, side)
2589            .len()
2590    }
2591
2592    /// Returns the count of all emulated orders.
2593    #[must_use]
2594    pub fn orders_emulated_count(
2595        &self,
2596        venue: Option<&Venue>,
2597        instrument_id: Option<&InstrumentId>,
2598        strategy_id: Option<&StrategyId>,
2599        side: Option<OrderSide>,
2600    ) -> usize {
2601        self.orders_emulated(venue, instrument_id, strategy_id, side)
2602            .len()
2603    }
2604
2605    /// Returns the count of all in-flight orders.
2606    #[must_use]
2607    pub fn orders_inflight_count(
2608        &self,
2609        venue: Option<&Venue>,
2610        instrument_id: Option<&InstrumentId>,
2611        strategy_id: Option<&StrategyId>,
2612        side: Option<OrderSide>,
2613    ) -> usize {
2614        self.orders_inflight(venue, instrument_id, strategy_id, side)
2615            .len()
2616    }
2617
2618    /// Returns the count of all orders.
2619    #[must_use]
2620    pub fn orders_total_count(
2621        &self,
2622        venue: Option<&Venue>,
2623        instrument_id: Option<&InstrumentId>,
2624        strategy_id: Option<&StrategyId>,
2625        side: Option<OrderSide>,
2626    ) -> usize {
2627        self.orders(venue, instrument_id, strategy_id, side).len()
2628    }
2629
2630    /// Returns the order list for the `order_list_id`.
2631    #[must_use]
2632    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2633        self.order_lists.get(order_list_id)
2634    }
2635
2636    /// Returns all order lists matching the optional filter parameters.
2637    #[must_use]
2638    pub fn order_lists(
2639        &self,
2640        venue: Option<&Venue>,
2641        instrument_id: Option<&InstrumentId>,
2642        strategy_id: Option<&StrategyId>,
2643    ) -> Vec<&OrderList> {
2644        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2645
2646        if let Some(venue) = venue {
2647            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2648        }
2649
2650        if let Some(instrument_id) = instrument_id {
2651            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2652        }
2653
2654        if let Some(strategy_id) = strategy_id {
2655            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2656        }
2657
2658        order_lists
2659    }
2660
2661    /// Returns whether an order list with the `order_list_id` exists.
2662    #[must_use]
2663    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2664        self.order_lists.contains_key(order_list_id)
2665    }
2666
2667    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2668
2669    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2670    /// optional filter parameters.
2671    #[must_use]
2672    pub fn orders_for_exec_algorithm(
2673        &self,
2674        exec_algorithm_id: &ExecAlgorithmId,
2675        venue: Option<&Venue>,
2676        instrument_id: Option<&InstrumentId>,
2677        strategy_id: Option<&StrategyId>,
2678        side: Option<OrderSide>,
2679    ) -> Vec<&OrderAny> {
2680        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2681        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2682
2683        if let Some(query) = query
2684            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2685        {
2686            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2687        }
2688
2689        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2690            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2691        } else {
2692            Vec::new()
2693        }
2694    }
2695
2696    /// Returns references to all orders with the `exec_spawn_id`.
2697    #[must_use]
2698    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2699        self.get_orders_for_ids(
2700            self.index
2701                .exec_spawn_orders
2702                .get(exec_spawn_id)
2703                .unwrap_or(&AHashSet::new()),
2704            None,
2705        )
2706    }
2707
2708    /// Returns the total order quantity for the `exec_spawn_id`.
2709    #[must_use]
2710    pub fn exec_spawn_total_quantity(
2711        &self,
2712        exec_spawn_id: &ClientOrderId,
2713        active_only: bool,
2714    ) -> Option<Quantity> {
2715        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2716
2717        let mut total_quantity: Option<Quantity> = None;
2718
2719        for spawn_order in exec_spawn_orders {
2720            if !active_only || !spawn_order.is_closed() {
2721                if let Some(mut total_quantity) = total_quantity {
2722                    total_quantity += spawn_order.quantity();
2723                }
2724            } else {
2725                total_quantity = Some(spawn_order.quantity());
2726            }
2727        }
2728
2729        total_quantity
2730    }
2731
2732    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
2733    #[must_use]
2734    pub fn exec_spawn_total_filled_qty(
2735        &self,
2736        exec_spawn_id: &ClientOrderId,
2737        active_only: bool,
2738    ) -> Option<Quantity> {
2739        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2740
2741        let mut total_quantity: Option<Quantity> = None;
2742
2743        for spawn_order in exec_spawn_orders {
2744            if !active_only || !spawn_order.is_closed() {
2745                if let Some(mut total_quantity) = total_quantity {
2746                    total_quantity += spawn_order.filled_qty();
2747                }
2748            } else {
2749                total_quantity = Some(spawn_order.filled_qty());
2750            }
2751        }
2752
2753        total_quantity
2754    }
2755
2756    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
2757    #[must_use]
2758    pub fn exec_spawn_total_leaves_qty(
2759        &self,
2760        exec_spawn_id: &ClientOrderId,
2761        active_only: bool,
2762    ) -> Option<Quantity> {
2763        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2764
2765        let mut total_quantity: Option<Quantity> = None;
2766
2767        for spawn_order in exec_spawn_orders {
2768            if !active_only || !spawn_order.is_closed() {
2769                if let Some(mut total_quantity) = total_quantity {
2770                    total_quantity += spawn_order.leaves_qty();
2771                }
2772            } else {
2773                total_quantity = Some(spawn_order.leaves_qty());
2774            }
2775        }
2776
2777        total_quantity
2778    }
2779
2780    // -- POSITION QUERIES ------------------------------------------------------------------------
2781
2782    /// Returns a reference to the position with the `position_id` (if found).
2783    #[must_use]
2784    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2785        self.positions.get(position_id)
2786    }
2787
2788    /// Returns a reference to the position for the `client_order_id` (if found).
2789    #[must_use]
2790    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2791        self.index
2792            .order_position
2793            .get(client_order_id)
2794            .and_then(|position_id| self.positions.get(position_id))
2795    }
2796
2797    /// Returns a reference to the position ID for the `client_order_id` (if found).
2798    #[must_use]
2799    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2800        self.index.order_position.get(client_order_id)
2801    }
2802
2803    /// Returns a reference to all positions matching the optional filter parameters.
2804    #[must_use]
2805    pub fn positions(
2806        &self,
2807        venue: Option<&Venue>,
2808        instrument_id: Option<&InstrumentId>,
2809        strategy_id: Option<&StrategyId>,
2810        side: Option<PositionSide>,
2811    ) -> Vec<&Position> {
2812        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2813        self.get_positions_for_ids(&position_ids, side)
2814    }
2815
2816    /// Returns a reference to all open positions matching the optional filter parameters.
2817    #[must_use]
2818    pub fn positions_open(
2819        &self,
2820        venue: Option<&Venue>,
2821        instrument_id: Option<&InstrumentId>,
2822        strategy_id: Option<&StrategyId>,
2823        side: Option<PositionSide>,
2824    ) -> Vec<&Position> {
2825        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2826        self.get_positions_for_ids(&position_ids, side)
2827    }
2828
2829    /// Returns a reference to all closed positions matching the optional filter parameters.
2830    #[must_use]
2831    pub fn positions_closed(
2832        &self,
2833        venue: Option<&Venue>,
2834        instrument_id: Option<&InstrumentId>,
2835        strategy_id: Option<&StrategyId>,
2836        side: Option<PositionSide>,
2837    ) -> Vec<&Position> {
2838        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2839        self.get_positions_for_ids(&position_ids, side)
2840    }
2841
2842    /// Returns whether a position with the `position_id` exists.
2843    #[must_use]
2844    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2845        self.index.positions.contains(position_id)
2846    }
2847
2848    /// Returns whether a position with the `position_id` is open.
2849    #[must_use]
2850    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2851        self.index.positions_open.contains(position_id)
2852    }
2853
2854    /// Returns whether a position with the `position_id` is closed.
2855    #[must_use]
2856    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2857        self.index.positions_closed.contains(position_id)
2858    }
2859
2860    /// Returns the count of all open positions.
2861    #[must_use]
2862    pub fn positions_open_count(
2863        &self,
2864        venue: Option<&Venue>,
2865        instrument_id: Option<&InstrumentId>,
2866        strategy_id: Option<&StrategyId>,
2867        side: Option<PositionSide>,
2868    ) -> usize {
2869        self.positions_open(venue, instrument_id, strategy_id, side)
2870            .len()
2871    }
2872
2873    /// Returns the count of all closed positions.
2874    #[must_use]
2875    pub fn positions_closed_count(
2876        &self,
2877        venue: Option<&Venue>,
2878        instrument_id: Option<&InstrumentId>,
2879        strategy_id: Option<&StrategyId>,
2880        side: Option<PositionSide>,
2881    ) -> usize {
2882        self.positions_closed(venue, instrument_id, strategy_id, side)
2883            .len()
2884    }
2885
2886    /// Returns the count of all positions.
2887    #[must_use]
2888    pub fn positions_total_count(
2889        &self,
2890        venue: Option<&Venue>,
2891        instrument_id: Option<&InstrumentId>,
2892        strategy_id: Option<&StrategyId>,
2893        side: Option<PositionSide>,
2894    ) -> usize {
2895        self.positions(venue, instrument_id, strategy_id, side)
2896            .len()
2897    }
2898
2899    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2900
2901    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
2902    #[must_use]
2903    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2904        self.index.order_strategy.get(client_order_id)
2905    }
2906
2907    /// Gets a reference to the strategy ID for the `position_id` (if found).
2908    #[must_use]
2909    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2910        self.index.position_strategy.get(position_id)
2911    }
2912
2913    // -- GENERAL ---------------------------------------------------------------------------------
2914
2915    /// Gets a reference to the general value for the `key` (if found).
2916    ///
2917    /// # Errors
2918    ///
2919    /// Returns an error if the `key` is invalid.
2920    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2921        check_valid_string_ascii(key, stringify!(key))?;
2922
2923        Ok(self.general.get(key))
2924    }
2925
2926    // -- DATA QUERIES ----------------------------------------------------------------------------
2927
2928    /// Returns the price for the `instrument_id` and `price_type` (if found).
2929    #[must_use]
2930    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2931        match price_type {
2932            PriceType::Bid => self
2933                .quotes
2934                .get(instrument_id)
2935                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2936            PriceType::Ask => self
2937                .quotes
2938                .get(instrument_id)
2939                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2940            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2941                quotes.front().map(|quote| {
2942                    Price::new(
2943                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2944                        quote.bid_price.precision + 1,
2945                    )
2946                })
2947            }),
2948            PriceType::Last => self
2949                .trades
2950                .get(instrument_id)
2951                .and_then(|trades| trades.front().map(|trade| trade.price)),
2952            PriceType::Mark => self
2953                .mark_prices
2954                .get(instrument_id)
2955                .and_then(|marks| marks.front().map(|mark| mark.value)),
2956        }
2957    }
2958
2959    /// Gets all quotes for the `instrument_id`.
2960    #[must_use]
2961    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2962        self.quotes
2963            .get(instrument_id)
2964            .map(|quotes| quotes.iter().copied().collect())
2965    }
2966
2967    /// Gets all trades for the `instrument_id`.
2968    #[must_use]
2969    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2970        self.trades
2971            .get(instrument_id)
2972            .map(|trades| trades.iter().copied().collect())
2973    }
2974
2975    /// Gets all mark price updates for the `instrument_id`.
2976    #[must_use]
2977    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2978        self.mark_prices
2979            .get(instrument_id)
2980            .map(|mark_prices| mark_prices.iter().copied().collect())
2981    }
2982
2983    /// Gets all index price updates for the `instrument_id`.
2984    #[must_use]
2985    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2986        self.index_prices
2987            .get(instrument_id)
2988            .map(|index_prices| index_prices.iter().copied().collect())
2989    }
2990
2991    /// Gets all bars for the `bar_type`.
2992    #[must_use]
2993    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2994        self.bars
2995            .get(bar_type)
2996            .map(|bars| bars.iter().copied().collect())
2997    }
2998
2999    /// Gets a reference to the order book for the `instrument_id`.
3000    #[must_use]
3001    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3002        self.books.get(instrument_id)
3003    }
3004
3005    /// Gets a reference to the order book for the `instrument_id`.
3006    #[must_use]
3007    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3008        self.books.get_mut(instrument_id)
3009    }
3010
3011    /// Gets a reference to the own order book for the `instrument_id`.
3012    #[must_use]
3013    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3014        self.own_books.get(instrument_id)
3015    }
3016
3017    /// Gets a reference to the own order book for the `instrument_id`.
3018    #[must_use]
3019    pub fn own_order_book_mut(
3020        &mut self,
3021        instrument_id: &InstrumentId,
3022    ) -> Option<&mut OwnOrderBook> {
3023        self.own_books.get_mut(instrument_id)
3024    }
3025
3026    /// Gets a reference to the latest quote for the `instrument_id`.
3027    #[must_use]
3028    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3029        self.quotes
3030            .get(instrument_id)
3031            .and_then(|quotes| quotes.front())
3032    }
3033
3034    /// Gets a reference to the latest trade for the `instrument_id`.
3035    #[must_use]
3036    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3037        self.trades
3038            .get(instrument_id)
3039            .and_then(|trades| trades.front())
3040    }
3041
3042    /// Gets a reference to the latest mark price update for the `instrument_id`.
3043    #[must_use]
3044    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3045        self.mark_prices
3046            .get(instrument_id)
3047            .and_then(|mark_prices| mark_prices.front())
3048    }
3049
3050    /// Gets a reference to the latest index price update for the `instrument_id`.
3051    #[must_use]
3052    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3053        self.index_prices
3054            .get(instrument_id)
3055            .and_then(|index_prices| index_prices.front())
3056    }
3057
3058    /// Gets a reference to the funding rate update for the `instrument_id`.
3059    #[must_use]
3060    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3061        self.funding_rates.get(instrument_id)
3062    }
3063
3064    /// Gets a reference to the latest bar for the `bar_type`.
3065    #[must_use]
3066    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3067        self.bars.get(bar_type).and_then(|bars| bars.front())
3068    }
3069
3070    /// Gets the order book update count for the `instrument_id`.
3071    #[must_use]
3072    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3073        self.books
3074            .get(instrument_id)
3075            .map_or(0, |book| book.update_count) as usize
3076    }
3077
3078    /// Gets the quote tick count for the `instrument_id`.
3079    #[must_use]
3080    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3081        self.quotes
3082            .get(instrument_id)
3083            .map_or(0, std::collections::VecDeque::len)
3084    }
3085
3086    /// Gets the trade tick count for the `instrument_id`.
3087    #[must_use]
3088    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3089        self.trades
3090            .get(instrument_id)
3091            .map_or(0, std::collections::VecDeque::len)
3092    }
3093
3094    /// Gets the bar count for the `instrument_id`.
3095    #[must_use]
3096    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3097        self.bars
3098            .get(bar_type)
3099            .map_or(0, std::collections::VecDeque::len)
3100    }
3101
3102    /// Returns whether the cache contains an order book for the `instrument_id`.
3103    #[must_use]
3104    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3105        self.books.contains_key(instrument_id)
3106    }
3107
3108    /// Returns whether the cache contains quotes for the `instrument_id`.
3109    #[must_use]
3110    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3111        self.quote_count(instrument_id) > 0
3112    }
3113
3114    /// Returns whether the cache contains trades for the `instrument_id`.
3115    #[must_use]
3116    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3117        self.trade_count(instrument_id) > 0
3118    }
3119
3120    /// Returns whether the cache contains bars for the `bar_type`.
3121    #[must_use]
3122    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3123        self.bar_count(bar_type) > 0
3124    }
3125
3126    #[must_use]
3127    pub fn get_xrate(
3128        &self,
3129        venue: Venue,
3130        from_currency: Currency,
3131        to_currency: Currency,
3132        price_type: PriceType,
3133    ) -> Option<f64> {
3134        if from_currency == to_currency {
3135            // When the source and target currencies are identical,
3136            // no conversion is needed; return an exchange rate of 1.0.
3137            return Some(1.0);
3138        }
3139
3140        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3141
3142        match get_exchange_rate(
3143            from_currency.code,
3144            to_currency.code,
3145            price_type,
3146            bid_quote,
3147            ask_quote,
3148        ) {
3149            Ok(rate) => rate,
3150            Err(e) => {
3151                log::error!("Failed to calculate xrate: {e}");
3152                None
3153            }
3154        }
3155    }
3156
3157    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3158        let mut bid_quotes = AHashMap::new();
3159        let mut ask_quotes = AHashMap::new();
3160
3161        for instrument_id in self.instruments.keys() {
3162            if instrument_id.venue != *venue {
3163                continue;
3164            }
3165
3166            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3167                if let Some(tick) = ticks.front() {
3168                    (tick.bid_price, tick.ask_price)
3169                } else {
3170                    continue; // Empty ticks vector
3171                }
3172            } else {
3173                let bid_bar = self
3174                    .bars
3175                    .iter()
3176                    .find(|(k, _)| {
3177                        k.instrument_id() == *instrument_id
3178                            && matches!(k.spec().price_type, PriceType::Bid)
3179                    })
3180                    .map(|(_, v)| v);
3181
3182                let ask_bar = self
3183                    .bars
3184                    .iter()
3185                    .find(|(k, _)| {
3186                        k.instrument_id() == *instrument_id
3187                            && matches!(k.spec().price_type, PriceType::Ask)
3188                    })
3189                    .map(|(_, v)| v);
3190
3191                match (bid_bar, ask_bar) {
3192                    (Some(bid), Some(ask)) => {
3193                        match (bid.front(), ask.front()) {
3194                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3195                            _ => {
3196                                // Empty bar VecDeques
3197                                continue;
3198                            }
3199                        }
3200                    }
3201                    _ => continue,
3202                }
3203            };
3204
3205            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3206            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3207        }
3208
3209        (bid_quotes, ask_quotes)
3210    }
3211
3212    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3213    #[must_use]
3214    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3215        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3216    }
3217
3218    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3219    ///
3220    /// # Panics
3221    ///
3222    /// Panics if `xrate` is not positive.
3223    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3224        assert!(xrate > 0.0, "xrate was zero");
3225        self.mark_xrates.insert((from_currency, to_currency), xrate);
3226        self.mark_xrates
3227            .insert((to_currency, from_currency), 1.0 / xrate);
3228    }
3229
3230    /// Clears the mark exchange rate for the given currency pair.
3231    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3232        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3233    }
3234
3235    /// Clears all mark exchange rates.
3236    pub fn clear_mark_xrates(&mut self) {
3237        self.mark_xrates.clear();
3238    }
3239
3240    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3241
3242    /// Returns a reference to the instrument for the `instrument_id` (if found).
3243    #[must_use]
3244    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3245        self.instruments.get(instrument_id)
3246    }
3247
3248    /// Returns references to all instrument IDs for the `venue`.
3249    #[must_use]
3250    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3251        match venue {
3252            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3253            None => self.instruments.keys().collect(),
3254        }
3255    }
3256
3257    /// Returns references to all instruments for the `venue`.
3258    #[must_use]
3259    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3260        self.instruments
3261            .values()
3262            .filter(|i| &i.id().venue == venue)
3263            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3264            .collect()
3265    }
3266
3267    /// Returns references to all bar types contained in the cache.
3268    #[must_use]
3269    pub fn bar_types(
3270        &self,
3271        instrument_id: Option<&InstrumentId>,
3272        price_type: Option<&PriceType>,
3273        aggregation_source: AggregationSource,
3274    ) -> Vec<&BarType> {
3275        let mut bar_types = self
3276            .bars
3277            .keys()
3278            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3279            .collect::<Vec<&BarType>>();
3280
3281        if let Some(instrument_id) = instrument_id {
3282            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3283        }
3284
3285        if let Some(price_type) = price_type {
3286            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3287        }
3288
3289        bar_types
3290    }
3291
3292    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3293
3294    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3295    #[must_use]
3296    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3297        self.synthetics.get(instrument_id)
3298    }
3299
3300    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3301    #[must_use]
3302    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3303        self.synthetics.keys().collect()
3304    }
3305
3306    /// Returns references to all synthetic instruments contained in the cache.
3307    #[must_use]
3308    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3309        self.synthetics.values().collect()
3310    }
3311
3312    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3313
3314    /// Returns a reference to the account for the `account_id` (if found).
3315    #[must_use]
3316    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3317        self.accounts.get(account_id)
3318    }
3319
3320    /// Returns a reference to the account for the `venue` (if found).
3321    #[must_use]
3322    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3323        self.index
3324            .venue_account
3325            .get(venue)
3326            .and_then(|account_id| self.accounts.get(account_id))
3327    }
3328
3329    /// Returns a reference to the account ID for the `venue` (if found).
3330    #[must_use]
3331    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3332        self.index.venue_account.get(venue)
3333    }
3334
3335    /// Returns references to all accounts for the `account_id`.
3336    #[must_use]
3337    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3338        self.accounts
3339            .values()
3340            .filter(|account| &account.id() == account_id)
3341            .collect()
3342    }
3343
3344    /// Updates the own order book with an order.
3345    ///
3346    /// This method adds, updates, or removes an order from the own order book
3347    /// based on the order's current state.
3348    ///
3349    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3350    /// represented in own books.
3351    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3352        if !order.has_price() {
3353            return;
3354        }
3355
3356        let instrument_id = order.instrument_id();
3357
3358        let own_book = self
3359            .own_books
3360            .entry(instrument_id)
3361            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3362
3363        let own_book_order = order.to_own_book_order();
3364
3365        if order.is_closed() {
3366            if let Err(e) = own_book.delete(own_book_order) {
3367                log::debug!(
3368                    "Failed to delete order {} from own book: {e}",
3369                    order.client_order_id(),
3370                );
3371            } else {
3372                log::debug!("Deleted order {} from own book", order.client_order_id());
3373            }
3374        } else {
3375            // Add or update the order in the own book
3376            if let Err(e) = own_book.update(own_book_order) {
3377                log::debug!(
3378                    "Failed to update order {} in own book: {e}; inserting instead",
3379                    order.client_order_id(),
3380                );
3381                own_book.add(own_book_order);
3382            }
3383            log::debug!("Updated order {} in own book", order.client_order_id());
3384        }
3385    }
3386
3387    /// Force removal of an order from own order books and clean up all indexes.
3388    ///
3389    /// This method is used when order event application fails and we need to ensure
3390    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3391    /// Replicates the index cleanup that update_order performs for closed orders.
3392    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3393        let order = match self.orders.get(client_order_id) {
3394            Some(order) => order,
3395            None => return,
3396        };
3397
3398        self.index.orders_open.remove(client_order_id);
3399        self.index.orders_pending_cancel.remove(client_order_id);
3400        self.index.orders_inflight.remove(client_order_id);
3401        self.index.orders_emulated.remove(client_order_id);
3402
3403        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3404            && order.has_price()
3405        {
3406            let own_book_order = order.to_own_book_order();
3407            if let Err(e) = own_book.delete(own_book_order) {
3408                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3409            } else {
3410                log::debug!("Force deleted {client_order_id} from own book");
3411            }
3412        }
3413
3414        self.index.orders_closed.insert(*client_order_id);
3415    }
3416
3417    /// Audit all own order books against open and inflight order indexes.
3418    ///
3419    /// Ensures closed orders are removed from own order books. This includes both
3420    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3421    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3422    /// during venue latency windows.
3423    pub fn audit_own_order_books(&mut self) {
3424        log::debug!("Starting own books audit");
3425        let start = std::time::Instant::now();
3426
3427        // Build union of open and inflight orders for audit,
3428        // this prevents false positives for SUBMITTED orders during venue latency.
3429        let valid_order_ids: AHashSet<ClientOrderId> = self
3430            .index
3431            .orders_open
3432            .union(&self.index.orders_inflight)
3433            .copied()
3434            .collect();
3435
3436        for own_book in self.own_books.values_mut() {
3437            own_book.audit_open_orders(&valid_order_ids);
3438        }
3439
3440        log::debug!("Completed own books audit in {:?}", start.elapsed());
3441    }
3442}