nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22
23mod index;
24
25#[cfg(test)]
26mod tests;
27
28use std::{
29    collections::{HashSet, VecDeque},
30    fmt::Debug,
31    time::{SystemTime, UNIX_EPOCH},
32};
33
34use ahash::{AHashMap, AHashSet};
35use bytes::Bytes;
36pub use config::CacheConfig; // Re-export
37use database::{CacheDatabaseAdapter, CacheMap};
38use index::CacheIndex;
39use nautilus_core::{
40    UUID4, UnixNanos,
41    correctness::{
42        check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
43    },
44    datetime::secs_to_nanos,
45};
46#[cfg(feature = "defi")]
47use nautilus_model::defi::Pool;
48use nautilus_model::{
49    accounts::{Account, AccountAny},
50    data::{
51        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52        TradeTick, YieldCurveData,
53    },
54    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55    identifiers::{
56        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58    },
59    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60    orderbook::{
61        OrderBook,
62        own::{OwnOrderBook, should_handle_own_book_order},
63    },
64    orders::{Order, OrderAny, OrderList},
65    position::Position,
66    types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
/// A common in-memory `Cache` for market and execution related data.
///
/// Holds the configuration, an index over the cached objects, an optional database
/// adapter, and one map per kind of cached object.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    /// Cache configuration (falls back to `CacheConfig::default()` in `new`).
    config: CacheConfig,
    /// Lookup index over accounts, orders and positions, populated by `build_index`.
    index: CacheIndex,
    /// Optional database adapter the `cache_*` methods load from.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General entries as raw bytes keyed by string, loaded by `cache_general`.
    general: AHashMap<String, Bytes>,
    /// Currencies keyed by `Ustr`.
    currencies: AHashMap<Ustr, Currency>,
    /// Instruments keyed by instrument ID.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    /// Synthetic instruments keyed by instrument ID.
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    /// Order books keyed by instrument ID.
    books: AHashMap<InstrumentId, OrderBook>,
    /// Own order books keyed by instrument ID.
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    /// Quote ticks per instrument, held in a deque.
    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
    /// Trade ticks per instrument, held in a deque.
    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
    /// Exchange rates keyed by a currency pair.
    // NOTE(review): presumably (from, to) mark rates — confirm the pair ordering.
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    /// Mark price updates per instrument, held in a deque.
    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    /// Index price updates per instrument, held in a deque.
    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    /// A single funding rate update per instrument.
    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
    /// Bars per bar type, held in a deque.
    bars: AHashMap<BarType, VecDeque<Bar>>,
    /// Greeks data keyed by instrument ID.
    greeks: AHashMap<InstrumentId, GreeksData>,
    /// Yield curve data keyed by string.
    yield_curves: AHashMap<String, YieldCurveData>,
    /// Accounts keyed by account ID.
    accounts: AHashMap<AccountId, AccountAny>,
    /// Orders keyed by client order ID.
    orders: AHashMap<ClientOrderId, OrderAny>,
    /// Order lists keyed by order list ID.
    order_lists: AHashMap<OrderListId, OrderList>,
    /// Positions keyed by position ID.
    positions: AHashMap<PositionId, Position>,
    /// Position snapshots as raw bytes keyed by position ID.
    position_snapshots: AHashMap<PositionId, Bytes>,
    /// Pools keyed by instrument ID (only with the `defi` feature).
    #[cfg(feature = "defi")]
    pools: AHashMap<InstrumentId, Pool>,
}
104
105impl Debug for Cache {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct(stringify!(Cache))
108            .field("config", &self.config)
109            .field("index", &self.index)
110            .field("general", &self.general)
111            .field("currencies", &self.currencies)
112            .field("instruments", &self.instruments)
113            .field("synthetics", &self.synthetics)
114            .field("books", &self.books)
115            .field("own_books", &self.own_books)
116            .field("quotes", &self.quotes)
117            .field("trades", &self.trades)
118            .field("mark_xrates", &self.mark_xrates)
119            .field("mark_prices", &self.mark_prices)
120            .field("index_prices", &self.index_prices)
121            .field("funding_rates", &self.funding_rates)
122            .field("bars", &self.bars)
123            .field("greeks", &self.greeks)
124            .field("yield_curves", &self.yield_curves)
125            .field("accounts", &self.accounts)
126            .field("orders", &self.orders)
127            .field("order_lists", &self.order_lists)
128            .field("positions", &self.positions)
129            .field("position_snapshots", &self.position_snapshots)
130            .finish()
131    }
132}
133
134impl Default for Cache {
135    /// Creates a new default [`Cache`] instance.
136    fn default() -> Self {
137        Self::new(Some(CacheConfig::default()), None)
138    }
139}
140
141impl Cache {
142    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
143    #[must_use]
144    /// # Note
145    ///
146    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
147    pub fn new(
148        config: Option<CacheConfig>,
149        database: Option<Box<dyn CacheDatabaseAdapter>>,
150    ) -> Self {
151        Self {
152            config: config.unwrap_or_default(),
153            index: CacheIndex::default(),
154            database,
155            general: AHashMap::new(),
156            currencies: AHashMap::new(),
157            instruments: AHashMap::new(),
158            synthetics: AHashMap::new(),
159            books: AHashMap::new(),
160            own_books: AHashMap::new(),
161            quotes: AHashMap::new(),
162            trades: AHashMap::new(),
163            mark_xrates: AHashMap::new(),
164            mark_prices: AHashMap::new(),
165            index_prices: AHashMap::new(),
166            funding_rates: AHashMap::new(),
167            bars: AHashMap::new(),
168            greeks: AHashMap::new(),
169            yield_curves: AHashMap::new(),
170            accounts: AHashMap::new(),
171            orders: AHashMap::new(),
172            order_lists: AHashMap::new(),
173            positions: AHashMap::new(),
174            position_snapshots: AHashMap::new(),
175            #[cfg(feature = "defi")]
176            pools: AHashMap::new(),
177        }
178    }
179
180    /// Returns the cache instances memory address.
181    #[must_use]
182    pub fn memory_address(&self) -> String {
183        format!("{:?}", std::ptr::from_ref(self))
184    }
185
186    // -- COMMANDS --------------------------------------------------------------------------------
187
188    /// Clears and reloads general entries from the database into the cache.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if loading general cache data fails.
193    pub fn cache_general(&mut self) -> anyhow::Result<()> {
194        self.general = match &mut self.database {
195            Some(db) => db.load()?,
196            None => AHashMap::new(),
197        };
198
199        log::info!(
200            "Cached {} general object(s) from database",
201            self.general.len()
202        );
203        Ok(())
204    }
205
206    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if loading all cache data fails.
211    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
212        let cache_map = match &self.database {
213            Some(db) => db.load_all().await?,
214            None => CacheMap::default(),
215        };
216
217        self.currencies = cache_map.currencies;
218        self.instruments = cache_map.instruments;
219        self.synthetics = cache_map.synthetics;
220        self.accounts = cache_map.accounts;
221        self.orders = cache_map.orders;
222        self.positions = cache_map.positions;
223        Ok(())
224    }
225
226    /// Clears and reloads the currency cache from the database.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if loading currencies cache fails.
231    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
232        self.currencies = match &mut self.database {
233            Some(db) => db.load_currencies().await?,
234            None => AHashMap::new(),
235        };
236
237        log::info!("Cached {} currencies from database", self.general.len());
238        Ok(())
239    }
240
241    /// Clears and reloads the instrument cache from the database.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if loading instruments cache fails.
246    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
247        self.instruments = match &mut self.database {
248            Some(db) => db.load_instruments().await?,
249            None => AHashMap::new(),
250        };
251
252        log::info!("Cached {} instruments from database", self.general.len());
253        Ok(())
254    }
255
256    /// Clears and reloads the synthetic instrument cache from the database.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if loading synthetic instruments cache fails.
261    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
262        self.synthetics = match &mut self.database {
263            Some(db) => db.load_synthetics().await?,
264            None => AHashMap::new(),
265        };
266
267        log::info!(
268            "Cached {} synthetic instruments from database",
269            self.general.len()
270        );
271        Ok(())
272    }
273
274    /// Clears and reloads the account cache from the database.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if loading accounts cache fails.
279    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
280        self.accounts = match &mut self.database {
281            Some(db) => db.load_accounts().await?,
282            None => AHashMap::new(),
283        };
284
285        log::info!(
286            "Cached {} synthetic instruments from database",
287            self.general.len()
288        );
289        Ok(())
290    }
291
292    /// Clears and reloads the order cache from the database.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if loading orders cache fails.
297    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
298        self.orders = match &mut self.database {
299            Some(db) => db.load_orders().await?,
300            None => AHashMap::new(),
301        };
302
303        log::info!("Cached {} orders from database", self.general.len());
304        Ok(())
305    }
306
307    /// Clears and reloads the position cache from the database.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if loading positions cache fails.
312    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
313        self.positions = match &mut self.database {
314            Some(db) => db.load_positions().await?,
315            None => AHashMap::new(),
316        };
317
318        log::info!("Cached {} positions from database", self.general.len());
319        Ok(())
320    }
321
322    /// Clears the current cache index and re-build.
323    pub fn build_index(&mut self) {
324        log::debug!("Building index");
325
326        // Index accounts
327        for account_id in self.accounts.keys() {
328            self.index
329                .venue_account
330                .insert(account_id.get_issuer(), *account_id);
331        }
332
333        // Index orders
334        for (client_order_id, order) in &self.orders {
335            let instrument_id = order.instrument_id();
336            let venue = instrument_id.venue;
337            let strategy_id = order.strategy_id();
338
339            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
340            self.index
341                .venue_orders
342                .entry(venue)
343                .or_default()
344                .insert(*client_order_id);
345
346            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
347            if let Some(venue_order_id) = order.venue_order_id() {
348                self.index
349                    .venue_order_ids
350                    .insert(venue_order_id, *client_order_id);
351            }
352
353            // 3: Build index.order_position -> {ClientOrderId, PositionId}
354            if let Some(position_id) = order.position_id() {
355                self.index
356                    .order_position
357                    .insert(*client_order_id, position_id);
358            }
359
360            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
361            self.index
362                .order_strategy
363                .insert(*client_order_id, order.strategy_id());
364
365            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
366            self.index
367                .instrument_orders
368                .entry(instrument_id)
369                .or_default()
370                .insert(*client_order_id);
371
372            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
373            self.index
374                .strategy_orders
375                .entry(strategy_id)
376                .or_default()
377                .insert(*client_order_id);
378
379            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
380            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
381                self.index
382                    .exec_algorithm_orders
383                    .entry(exec_algorithm_id)
384                    .or_default()
385                    .insert(*client_order_id);
386            }
387
388            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
389            if let Some(exec_spawn_id) = order.exec_spawn_id() {
390                self.index
391                    .exec_spawn_orders
392                    .entry(exec_spawn_id)
393                    .or_default()
394                    .insert(*client_order_id);
395            }
396
397            // 9: Build index.orders -> {ClientOrderId}
398            self.index.orders.insert(*client_order_id);
399
400            // 10: Build index.orders_open -> {ClientOrderId}
401            if order.is_open() {
402                self.index.orders_open.insert(*client_order_id);
403            }
404
405            // 11: Build index.orders_closed -> {ClientOrderId}
406            if order.is_closed() {
407                self.index.orders_closed.insert(*client_order_id);
408            }
409
410            // 12: Build index.orders_emulated -> {ClientOrderId}
411            if let Some(emulation_trigger) = order.emulation_trigger()
412                && emulation_trigger != TriggerType::NoTrigger
413                && !order.is_closed()
414            {
415                self.index.orders_emulated.insert(*client_order_id);
416            }
417
418            // 13: Build index.orders_inflight -> {ClientOrderId}
419            if order.is_inflight() {
420                self.index.orders_inflight.insert(*client_order_id);
421            }
422
423            // 14: Build index.strategies -> {StrategyId}
424            self.index.strategies.insert(strategy_id);
425
426            // 15: Build index.strategies -> {ExecAlgorithmId}
427            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
428                self.index.exec_algorithms.insert(exec_algorithm_id);
429            }
430        }
431
432        // Index positions
433        for (position_id, position) in &self.positions {
434            let instrument_id = position.instrument_id;
435            let venue = instrument_id.venue;
436            let strategy_id = position.strategy_id;
437
438            // 1: Build index.venue_positions -> {Venue, {PositionId}}
439            self.index
440                .venue_positions
441                .entry(venue)
442                .or_default()
443                .insert(*position_id);
444
445            // 2: Build index.position_strategy -> {PositionId, StrategyId}
446            self.index
447                .position_strategy
448                .insert(*position_id, position.strategy_id);
449
450            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
451            self.index
452                .position_orders
453                .entry(*position_id)
454                .or_default()
455                .extend(position.client_order_ids().into_iter());
456
457            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
458            self.index
459                .instrument_positions
460                .entry(instrument_id)
461                .or_default()
462                .insert(*position_id);
463
464            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
465            self.index
466                .strategy_positions
467                .entry(strategy_id)
468                .or_default()
469                .insert(*position_id);
470
471            // 6: Build index.positions -> {PositionId}
472            self.index.positions.insert(*position_id);
473
474            // 7: Build index.positions_open -> {PositionId}
475            if position.is_open() {
476                self.index.positions_open.insert(*position_id);
477            }
478
479            // 8: Build index.positions_closed -> {PositionId}
480            if position.is_closed() {
481                self.index.positions_closed.insert(*position_id);
482            }
483
484            // 9: Build index.strategies -> {StrategyId}
485            self.index.strategies.insert(strategy_id);
486        }
487    }
488
489    /// Returns whether the cache has a backing database.
490    #[must_use]
491    pub const fn has_backing(&self) -> bool {
492        self.config.database.is_some()
493    }
494
495    // Calculate the unrealized profit and loss (PnL) for `position`.
496    #[must_use]
497    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
498        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
499            quote
500        } else {
501            log::warn!(
502                "Cannot calculate unrealized PnL for {}, no quotes for {}",
503                position.id,
504                position.instrument_id
505            );
506            return None;
507        };
508
509        let last = match position.side {
510            PositionSide::Flat | PositionSide::NoPositionSide => {
511                return Some(Money::new(0.0, position.settlement_currency));
512            }
513            PositionSide::Long => quote.ask_price,
514            PositionSide::Short => quote.bid_price,
515        };
516
517        Some(position.unrealized_pnl(last))
518    }
519
520    /// Checks integrity of data within the cache.
521    ///
522    /// All data should be loaded from the database prior to this call.
523    /// If an error is found then a log error message will also be produced.
524    ///
525    /// # Panics
526    ///
527    /// Panics if failure calling system clock.
528    #[must_use]
529    pub fn check_integrity(&mut self) -> bool {
530        let mut error_count = 0;
531        let failure = "Integrity failure";
532
533        // Get current timestamp in microseconds
534        let timestamp_us = SystemTime::now()
535            .duration_since(UNIX_EPOCH)
536            .expect("Time went backwards")
537            .as_micros();
538
539        log::info!("Checking data integrity");
540
541        // Check object caches
542        for account_id in self.accounts.keys() {
543            if !self
544                .index
545                .venue_account
546                .contains_key(&account_id.get_issuer())
547            {
548                log::error!(
549                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
550                );
551                error_count += 1;
552            }
553        }
554
555        for (client_order_id, order) in &self.orders {
556            if !self.index.order_strategy.contains_key(client_order_id) {
557                log::error!(
558                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
559                );
560                error_count += 1;
561            }
562            if !self.index.orders.contains(client_order_id) {
563                log::error!(
564                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
565                );
566                error_count += 1;
567            }
568            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
569                log::error!(
570                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
571                );
572                error_count += 1;
573            }
574            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
575                log::error!(
576                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
577                );
578                error_count += 1;
579            }
580            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
581                log::error!(
582                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
583                );
584                error_count += 1;
585            }
586            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
587                if !self
588                    .index
589                    .exec_algorithm_orders
590                    .contains_key(&exec_algorithm_id)
591                {
592                    log::error!(
593                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
594                    );
595                    error_count += 1;
596                }
597                if order.exec_spawn_id().is_none()
598                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
599                {
600                    log::error!(
601                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
602                    );
603                    error_count += 1;
604                }
605            }
606        }
607
608        for (position_id, position) in &self.positions {
609            if !self.index.position_strategy.contains_key(position_id) {
610                log::error!(
611                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
612                );
613                error_count += 1;
614            }
615            if !self.index.position_orders.contains_key(position_id) {
616                log::error!(
617                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
618                );
619                error_count += 1;
620            }
621            if !self.index.positions.contains(position_id) {
622                log::error!(
623                    "{failure} in positions: {position_id} not found in `self.index.positions`",
624                );
625                error_count += 1;
626            }
627            if position.is_open() && !self.index.positions_open.contains(position_id) {
628                log::error!(
629                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
630                );
631                error_count += 1;
632            }
633            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
634                log::error!(
635                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
636                );
637                error_count += 1;
638            }
639        }
640
641        // Check indexes
642        for account_id in self.index.venue_account.values() {
643            if !self.accounts.contains_key(account_id) {
644                log::error!(
645                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
646                );
647                error_count += 1;
648            }
649        }
650
651        for client_order_id in self.index.venue_order_ids.values() {
652            if !self.orders.contains_key(client_order_id) {
653                log::error!(
654                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
655                );
656                error_count += 1;
657            }
658        }
659
660        for client_order_id in self.index.client_order_ids.keys() {
661            if !self.orders.contains_key(client_order_id) {
662                log::error!(
663                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
664                );
665                error_count += 1;
666            }
667        }
668
669        for client_order_id in self.index.order_position.keys() {
670            if !self.orders.contains_key(client_order_id) {
671                log::error!(
672                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
673                );
674                error_count += 1;
675            }
676        }
677
678        // Check indexes
679        for client_order_id in self.index.order_strategy.keys() {
680            if !self.orders.contains_key(client_order_id) {
681                log::error!(
682                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
683                );
684                error_count += 1;
685            }
686        }
687
688        for position_id in self.index.position_strategy.keys() {
689            if !self.positions.contains_key(position_id) {
690                log::error!(
691                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
692                );
693                error_count += 1;
694            }
695        }
696
697        for position_id in self.index.position_orders.keys() {
698            if !self.positions.contains_key(position_id) {
699                log::error!(
700                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
701                );
702                error_count += 1;
703            }
704        }
705
706        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
707            for client_order_id in client_order_ids {
708                if !self.orders.contains_key(client_order_id) {
709                    log::error!(
710                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
711                    );
712                    error_count += 1;
713                }
714            }
715        }
716
717        for instrument_id in self.index.instrument_positions.keys() {
718            if !self.index.instrument_orders.contains_key(instrument_id) {
719                log::error!(
720                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
721                );
722                error_count += 1;
723            }
724        }
725
726        for client_order_ids in self.index.strategy_orders.values() {
727            for client_order_id in client_order_ids {
728                if !self.orders.contains_key(client_order_id) {
729                    log::error!(
730                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
731                    );
732                    error_count += 1;
733                }
734            }
735        }
736
737        for position_ids in self.index.strategy_positions.values() {
738            for position_id in position_ids {
739                if !self.positions.contains_key(position_id) {
740                    log::error!(
741                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
742                    );
743                    error_count += 1;
744                }
745            }
746        }
747
748        for client_order_id in &self.index.orders {
749            if !self.orders.contains_key(client_order_id) {
750                log::error!(
751                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
752                );
753                error_count += 1;
754            }
755        }
756
757        for client_order_id in &self.index.orders_emulated {
758            if !self.orders.contains_key(client_order_id) {
759                log::error!(
760                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
761                );
762                error_count += 1;
763            }
764        }
765
766        for client_order_id in &self.index.orders_inflight {
767            if !self.orders.contains_key(client_order_id) {
768                log::error!(
769                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
770                );
771                error_count += 1;
772            }
773        }
774
775        for client_order_id in &self.index.orders_open {
776            if !self.orders.contains_key(client_order_id) {
777                log::error!(
778                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
779                );
780                error_count += 1;
781            }
782        }
783
784        for client_order_id in &self.index.orders_closed {
785            if !self.orders.contains_key(client_order_id) {
786                log::error!(
787                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
788                );
789                error_count += 1;
790            }
791        }
792
793        for position_id in &self.index.positions {
794            if !self.positions.contains_key(position_id) {
795                log::error!(
796                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
797                );
798                error_count += 1;
799            }
800        }
801
802        for position_id in &self.index.positions_open {
803            if !self.positions.contains_key(position_id) {
804                log::error!(
805                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
806                );
807                error_count += 1;
808            }
809        }
810
811        for position_id in &self.index.positions_closed {
812            if !self.positions.contains_key(position_id) {
813                log::error!(
814                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
815                );
816                error_count += 1;
817            }
818        }
819
820        for strategy_id in &self.index.strategies {
821            if !self.index.strategy_orders.contains_key(strategy_id) {
822                log::error!(
823                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
824                );
825                error_count += 1;
826            }
827        }
828
829        for exec_algorithm_id in &self.index.exec_algorithms {
830            if !self
831                .index
832                .exec_algorithm_orders
833                .contains_key(exec_algorithm_id)
834            {
835                log::error!(
836                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
837                );
838                error_count += 1;
839            }
840        }
841
842        let total_us = SystemTime::now()
843            .duration_since(UNIX_EPOCH)
844            .expect("Time went backwards")
845            .as_micros()
846            - timestamp_us;
847
848        if error_count == 0 {
849            log::info!("Integrity check passed in {total_us}μs");
850            true
851        } else {
852            log::error!(
853                "Integrity check failed with {error_count} error{} in {total_us}μs",
854                if error_count == 1 { "" } else { "s" },
855            );
856            false
857        }
858    }
859
860    /// Checks for any residual open state and log warnings if any are found.
861    ///
862    ///'Open state' is considered to be open orders and open positions.
863    #[must_use]
864    pub fn check_residuals(&self) -> bool {
865        log::debug!("Checking residuals");
866
867        let mut residuals = false;
868
869        // Check for any open orders
870        for order in self.orders_open(None, None, None, None) {
871            residuals = true;
872            log::warn!("Residual {order:?}");
873        }
874
875        // Check for any open positions
876        for position in self.positions_open(None, None, None, None) {
877            residuals = true;
878            log::warn!("Residual {position}");
879        }
880
881        residuals
882    }
883
884    /// Purges all closed orders from the cache that are older than `buffer_secs`.
885    ///
886    ///
887    /// Only orders that have been closed for at least this amount of time will be purged.
888    /// A value of 0 means purge all closed orders regardless of when they were closed.
889    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
890        log::debug!(
891            "Purging closed orders{}",
892            if buffer_secs > 0 {
893                format!(" with buffer_secs={buffer_secs}")
894            } else {
895                String::new()
896            }
897        );
898
899        let buffer_ns = secs_to_nanos(buffer_secs as f64);
900
901        'outer: for client_order_id in self.index.orders_closed.clone() {
902            if let Some(order) = self.orders.get(&client_order_id)
903                && let Some(ts_closed) = order.ts_closed()
904                && ts_closed + buffer_ns <= ts_now
905            {
906                // Check any linked orders (contingency orders)
907                if let Some(linked_order_ids) = order.linked_order_ids() {
908                    for linked_order_id in linked_order_ids {
909                        if let Some(linked_order) = self.orders.get(linked_order_id)
910                            && linked_order.is_open()
911                        {
912                            // Do not purge if linked order still open
913                            continue 'outer;
914                        }
915                    }
916                }
917
918                self.purge_order(client_order_id);
919            }
920        }
921    }
922
923    /// Purges all closed positions from the cache that are older than `buffer_secs`.
924    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
925        log::debug!(
926            "Purging closed positions{}",
927            if buffer_secs > 0 {
928                format!(" with buffer_secs={buffer_secs}")
929            } else {
930                String::new()
931            }
932        );
933
934        let buffer_ns = secs_to_nanos(buffer_secs as f64);
935
936        for position_id in self.index.positions_closed.clone() {
937            if let Some(position) = self.positions.get(&position_id)
938                && let Some(ts_closed) = position.ts_closed
939                && ts_closed + buffer_ns <= ts_now
940            {
941                self.purge_position(position_id);
942            }
943        }
944    }
945
946    /// Purges the order with the `client_order_id` from the cache (if found).
947    ///
948    /// All `OrderFilled` events for the order will also be purged from any associated position.
949    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
950        // Purge events from associated position if exists
951        if let Some(position_id) = self.index.order_position.get(&client_order_id)
952            && let Some(position) = self.positions.get_mut(position_id)
953        {
954            position.purge_events_for_order(client_order_id);
955        }
956
957        if let Some(order) = self.orders.remove(&client_order_id) {
958            // Remove order from venue index
959            if let Some(venue_orders) = self
960                .index
961                .venue_orders
962                .get_mut(&order.instrument_id().venue)
963            {
964                venue_orders.remove(&client_order_id);
965            }
966
967            // Remove venue order ID index if exists
968            if let Some(venue_order_id) = order.venue_order_id() {
969                self.index.venue_order_ids.remove(&venue_order_id);
970            }
971
972            // Remove from instrument orders index
973            if let Some(instrument_orders) =
974                self.index.instrument_orders.get_mut(&order.instrument_id())
975            {
976                instrument_orders.remove(&client_order_id);
977            }
978
979            // Remove from position orders index if associated with a position
980            if let Some(position_id) = order.position_id()
981                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
982            {
983                position_orders.remove(&client_order_id);
984            }
985
986            // Remove from exec algorithm orders index if it has an exec algorithm
987            if let Some(exec_algorithm_id) = order.exec_algorithm_id()
988                && let Some(exec_algorithm_orders) =
989                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
990            {
991                exec_algorithm_orders.remove(&client_order_id);
992            }
993
994            log::info!("Purged order {client_order_id}");
995        } else {
996            log::warn!("Order {client_order_id} not found when purging");
997        }
998
999        // Remove from all other index collections regardless of whether order was found
1000        self.index.order_position.remove(&client_order_id);
1001        self.index.order_strategy.remove(&client_order_id);
1002        self.index.order_client.remove(&client_order_id);
1003        self.index.client_order_ids.remove(&client_order_id);
1004        self.index.exec_spawn_orders.remove(&client_order_id);
1005        self.index.orders.remove(&client_order_id);
1006        self.index.orders_closed.remove(&client_order_id);
1007        self.index.orders_emulated.remove(&client_order_id);
1008        self.index.orders_inflight.remove(&client_order_id);
1009        self.index.orders_pending_cancel.remove(&client_order_id);
1010    }
1011
1012    /// Purges the position with the `position_id` from the cache (if found).
1013    pub fn purge_position(&mut self, position_id: PositionId) {
1014        if let Some(position) = self.positions.remove(&position_id) {
1015            // Remove from venue positions index
1016            if let Some(venue_positions) = self
1017                .index
1018                .venue_positions
1019                .get_mut(&position.instrument_id.venue)
1020            {
1021                venue_positions.remove(&position_id);
1022            }
1023
1024            // Remove from instrument positions index
1025            if let Some(instrument_positions) = self
1026                .index
1027                .instrument_positions
1028                .get_mut(&position.instrument_id)
1029            {
1030                instrument_positions.remove(&position_id);
1031            }
1032
1033            // Remove from strategy positions index
1034            if let Some(strategy_positions) =
1035                self.index.strategy_positions.get_mut(&position.strategy_id)
1036            {
1037                strategy_positions.remove(&position_id);
1038            }
1039
1040            // Remove position ID from orders that reference it
1041            for client_order_id in position.client_order_ids() {
1042                self.index.order_position.remove(&client_order_id);
1043            }
1044
1045            log::info!("Purged position {position_id}");
1046        } else {
1047            log::warn!("Position {position_id} not found when purging");
1048        }
1049
1050        // Remove from all other index collections regardless of whether position was found
1051        self.index.position_strategy.remove(&position_id);
1052        self.index.position_orders.remove(&position_id);
1053        self.index.positions.remove(&position_id);
1054        self.index.positions_open.remove(&position_id);
1055        self.index.positions_closed.remove(&position_id);
1056    }
1057
1058    /// Purges all account state events which are outside the lookback window.
1059    ///
1060    /// Only events which are outside the lookback window will be purged.
1061    /// A value of 0 means purge all account state events.
1062    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1063        log::debug!(
1064            "Purging account events{}",
1065            if lookback_secs > 0 {
1066                format!(" with lookback_secs={lookback_secs}")
1067            } else {
1068                String::new()
1069            }
1070        );
1071
1072        for account in self.accounts.values_mut() {
1073            let event_count = account.event_count();
1074            account.purge_account_events(ts_now, lookback_secs);
1075            let count_diff = event_count - account.event_count();
1076            if count_diff > 0 {
1077                log::info!(
1078                    "Purged {} event(s) from account {}",
1079                    count_diff,
1080                    account.id()
1081                );
1082            }
1083        }
1084    }
1085
1086    /// Clears the caches index.
1087    pub fn clear_index(&mut self) {
1088        self.index.clear();
1089        log::debug!("Cleared index");
1090    }
1091
1092    /// Resets the cache.
1093    ///
1094    /// All stateful fields are reset to their initial value.
1095    pub fn reset(&mut self) {
1096        log::debug!("Resetting cache");
1097
1098        self.general.clear();
1099        self.currencies.clear();
1100        self.instruments.clear();
1101        self.synthetics.clear();
1102        self.books.clear();
1103        self.own_books.clear();
1104        self.quotes.clear();
1105        self.trades.clear();
1106        self.mark_xrates.clear();
1107        self.mark_prices.clear();
1108        self.index_prices.clear();
1109        self.bars.clear();
1110        self.accounts.clear();
1111        self.orders.clear();
1112        self.order_lists.clear();
1113        self.positions.clear();
1114        self.position_snapshots.clear();
1115        self.greeks.clear();
1116        self.yield_curves.clear();
1117
1118        #[cfg(feature = "defi")]
1119        self.pools.clear();
1120
1121        self.clear_index();
1122
1123        log::info!("Reset cache");
1124    }
1125
1126    /// Dispose of the cache which will close any underlying database adapter.
1127    ///
1128    /// # Panics
1129    ///
1130    /// Panics if closing the database connection fails.
1131    pub fn dispose(&mut self) {
1132        if let Some(database) = &mut self.database {
1133            database.close().expect("Failed to close database");
1134        }
1135    }
1136
1137    /// Flushes the caches database which permanently removes all persisted data.
1138    ///
1139    /// # Panics
1140    ///
1141    /// Panics if flushing the database connection fails.
1142    pub fn flush_db(&mut self) {
1143        if let Some(database) = &mut self.database {
1144            database.flush().expect("Failed to flush database");
1145        }
1146    }
1147
1148    /// Adds a raw bytes `value` to the cache under the `key`.
1149    ///
1150    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1151    ///
1152    /// # Errors
1153    ///
1154    /// Returns an error if persisting the entry to the backing database fails.
1155    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1156        check_valid_string(key, stringify!(key))?;
1157        check_predicate_false(value.is_empty(), stringify!(value))?;
1158
1159        log::debug!("Adding general {key}");
1160        self.general.insert(key.to_string(), value.clone());
1161
1162        if let Some(database) = &mut self.database {
1163            database.add(key.to_string(), value)?;
1164        }
1165        Ok(())
1166    }
1167
1168    /// Adds an `OrderBook` to the cache.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if persisting the order book to the backing database fails.
1173    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1174        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1175
1176        if self.config.save_market_data
1177            && let Some(database) = &mut self.database
1178        {
1179            database.add_order_book(&book)?;
1180        }
1181
1182        self.books.insert(book.instrument_id, book);
1183        Ok(())
1184    }
1185
1186    /// Adds an `OwnOrderBook` to the cache.
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns an error if persisting the own order book fails.
1191    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1192        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1193
1194        self.own_books.insert(own_book.instrument_id, own_book);
1195        Ok(())
1196    }
1197
1198    /// Adds a `Pool` to the cache.
1199    ///
1200    /// # Errors
1201    ///
1202    /// This function currently does not return errors but follows the same pattern as other add methods for consistency.
1203    #[cfg(feature = "defi")]
1204    pub fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
1205        log::debug!("Adding `Pool` {}", pool.instrument_id);
1206
1207        self.pools.insert(pool.instrument_id, pool);
1208        Ok(())
1209    }
1210
1211    /// Adds the `mark_price` update to the cache.
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if persisting the mark price to the backing database fails.
1216    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1217        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1218
1219        if self.config.save_market_data {
1220            // TODO: Placeholder and return Result for consistency
1221        }
1222
1223        let mark_prices_deque = self
1224            .mark_prices
1225            .entry(mark_price.instrument_id)
1226            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1227        mark_prices_deque.push_front(mark_price);
1228        Ok(())
1229    }
1230
1231    /// Adds the `index_price` update to the cache.
1232    ///
1233    /// # Errors
1234    ///
1235    /// Returns an error if persisting the index price to the backing database fails.
1236    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1237        log::debug!(
1238            "Adding `IndexPriceUpdate` for {}",
1239            index_price.instrument_id
1240        );
1241
1242        if self.config.save_market_data {
1243            // TODO: Placeholder and return Result for consistency
1244        }
1245
1246        let index_prices_deque = self
1247            .index_prices
1248            .entry(index_price.instrument_id)
1249            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1250        index_prices_deque.push_front(index_price);
1251        Ok(())
1252    }
1253
1254    /// Adds the `funding_rate` update to the cache.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Returns an error if persisting the funding rate update to the backing database fails.
1259    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1260        log::debug!(
1261            "Adding `FundingRateUpdate` for {}",
1262            funding_rate.instrument_id
1263        );
1264
1265        if self.config.save_market_data {
1266            // TODO: Placeholder and return Result for consistency
1267        }
1268
1269        self.funding_rates
1270            .insert(funding_rate.instrument_id, funding_rate);
1271        Ok(())
1272    }
1273
1274    /// Adds the `quote` tick to the cache.
1275    ///
1276    /// # Errors
1277    ///
1278    /// Returns an error if persisting the quote tick to the backing database fails.
1279    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1280        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1281
1282        if self.config.save_market_data
1283            && let Some(database) = &mut self.database
1284        {
1285            database.add_quote(&quote)?;
1286        }
1287
1288        let quotes_deque = self
1289            .quotes
1290            .entry(quote.instrument_id)
1291            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1292        quotes_deque.push_front(quote);
1293        Ok(())
1294    }
1295
1296    /// Adds the `quotes` to the cache.
1297    ///
1298    /// # Errors
1299    ///
1300    /// Returns an error if persisting the quote ticks to the backing database fails.
1301    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1302        check_slice_not_empty(quotes, stringify!(quotes))?;
1303
1304        let instrument_id = quotes[0].instrument_id;
1305        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1306
1307        if self.config.save_market_data
1308            && let Some(database) = &mut self.database
1309        {
1310            for quote in quotes {
1311                database.add_quote(quote)?;
1312            }
1313        }
1314
1315        let quotes_deque = self
1316            .quotes
1317            .entry(instrument_id)
1318            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1319
1320        for quote in quotes {
1321            quotes_deque.push_front(*quote);
1322        }
1323        Ok(())
1324    }
1325
1326    /// Adds the `trade` tick to the cache.
1327    ///
1328    /// # Errors
1329    ///
1330    /// Returns an error if persisting the trade tick to the backing database fails.
1331    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1332        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1333
1334        if self.config.save_market_data
1335            && let Some(database) = &mut self.database
1336        {
1337            database.add_trade(&trade)?;
1338        }
1339
1340        let trades_deque = self
1341            .trades
1342            .entry(trade.instrument_id)
1343            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1344        trades_deque.push_front(trade);
1345        Ok(())
1346    }
1347
1348    /// Adds the give `trades` to the cache.
1349    ///
1350    /// # Errors
1351    ///
1352    /// Returns an error if persisting the trade ticks to the backing database fails.
1353    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1354        check_slice_not_empty(trades, stringify!(trades))?;
1355
1356        let instrument_id = trades[0].instrument_id;
1357        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1358
1359        if self.config.save_market_data
1360            && let Some(database) = &mut self.database
1361        {
1362            for trade in trades {
1363                database.add_trade(trade)?;
1364            }
1365        }
1366
1367        let trades_deque = self
1368            .trades
1369            .entry(instrument_id)
1370            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1371
1372        for trade in trades {
1373            trades_deque.push_front(*trade);
1374        }
1375        Ok(())
1376    }
1377
1378    /// Adds the `bar` to the cache.
1379    ///
1380    /// # Errors
1381    ///
1382    /// Returns an error if persisting the bar to the backing database fails.
1383    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1384        log::debug!("Adding `Bar` {}", bar.bar_type);
1385
1386        if self.config.save_market_data
1387            && let Some(database) = &mut self.database
1388        {
1389            database.add_bar(&bar)?;
1390        }
1391
1392        let bars = self
1393            .bars
1394            .entry(bar.bar_type)
1395            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1396        bars.push_front(bar);
1397        Ok(())
1398    }
1399
1400    /// Adds the `bars` to the cache.
1401    ///
1402    /// # Errors
1403    ///
1404    /// Returns an error if persisting the bars to the backing database fails.
1405    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1406        check_slice_not_empty(bars, stringify!(bars))?;
1407
1408        let bar_type = bars[0].bar_type;
1409        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1410
1411        if self.config.save_market_data
1412            && let Some(database) = &mut self.database
1413        {
1414            for bar in bars {
1415                database.add_bar(bar)?;
1416            }
1417        }
1418
1419        let bars_deque = self
1420            .bars
1421            .entry(bar_type)
1422            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1423
1424        for bar in bars {
1425            bars_deque.push_front(*bar);
1426        }
1427        Ok(())
1428    }
1429
1430    /// Adds the `greeks` data to the cache.
1431    ///
1432    /// # Errors
1433    ///
1434    /// Returns an error if persisting the greeks data to the backing database fails.
1435    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1436        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1437
1438        if self.config.save_market_data
1439            && let Some(_database) = &mut self.database
1440        {
1441            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1442        }
1443
1444        self.greeks.insert(greeks.instrument_id, greeks);
1445        Ok(())
1446    }
1447
1448    /// Gets the greeks data for the `instrument_id`.
1449    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1450        self.greeks.get(instrument_id).cloned()
1451    }
1452
1453    /// Adds the `yield_curve` data to the cache.
1454    ///
1455    /// # Errors
1456    ///
1457    /// Returns an error if persisting the yield curve data to the backing database fails.
1458    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1459        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1460
1461        if self.config.save_market_data
1462            && let Some(_database) = &mut self.database
1463        {
1464            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1465        }
1466
1467        self.yield_curves
1468            .insert(yield_curve.curve_name.clone(), yield_curve);
1469        Ok(())
1470    }
1471
1472    /// Gets the yield curve for the `key`.
1473    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1474        self.yield_curves.get(key).map(|curve| {
1475            let curve_clone = curve.clone();
1476            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1477                as Box<dyn Fn(f64) -> f64>
1478        })
1479    }
1480
1481    /// Adds the `currency` to the cache.
1482    ///
1483    /// # Errors
1484    ///
1485    /// Returns an error if persisting the currency to the backing database fails.
1486    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1487        log::debug!("Adding `Currency` {}", currency.code);
1488
1489        if let Some(database) = &mut self.database {
1490            database.add_currency(&currency)?;
1491        }
1492
1493        self.currencies.insert(currency.code, currency);
1494        Ok(())
1495    }
1496
1497    /// Adds the `instrument` to the cache.
1498    ///
1499    /// # Errors
1500    ///
1501    /// Returns an error if persisting the instrument to the backing database fails.
1502    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1503        log::debug!("Adding `Instrument` {}", instrument.id());
1504
1505        if let Some(database) = &mut self.database {
1506            database.add_instrument(&instrument)?;
1507        }
1508
1509        self.instruments.insert(instrument.id(), instrument);
1510        Ok(())
1511    }
1512
1513    /// Adds the `synthetic` instrument to the cache.
1514    ///
1515    /// # Errors
1516    ///
1517    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1518    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1519        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1520
1521        if let Some(database) = &mut self.database {
1522            database.add_synthetic(&synthetic)?;
1523        }
1524
1525        self.synthetics.insert(synthetic.id, synthetic);
1526        Ok(())
1527    }
1528
1529    /// Adds the `account` to the cache.
1530    ///
1531    /// # Errors
1532    ///
1533    /// Returns an error if persisting the account to the backing database fails.
1534    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1535        log::debug!("Adding `Account` {}", account.id());
1536
1537        if let Some(database) = &mut self.database {
1538            database.add_account(&account)?;
1539        }
1540
1541        let account_id = account.id();
1542        self.accounts.insert(account_id, account);
1543        self.index
1544            .venue_account
1545            .insert(account_id.get_issuer(), account_id);
1546        Ok(())
1547    }
1548
1549    /// Indexes the `client_order_id` with the `venue_order_id`.
1550    ///
1551    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1552    ///
1553    /// # Errors
1554    ///
1555    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1556    pub fn add_venue_order_id(
1557        &mut self,
1558        client_order_id: &ClientOrderId,
1559        venue_order_id: &VenueOrderId,
1560        overwrite: bool,
1561    ) -> anyhow::Result<()> {
1562        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1563            && !overwrite
1564            && existing_venue_order_id != venue_order_id
1565        {
1566            anyhow::bail!(
1567                "Existing {existing_venue_order_id} for {client_order_id}
1568                    did not match the given {venue_order_id}.
1569                    If you are writing a test then try a different `venue_order_id`,
1570                    otherwise this is probably a bug."
1571            );
1572        }
1573
1574        self.index
1575            .client_order_ids
1576            .insert(*client_order_id, *venue_order_id);
1577        self.index
1578            .venue_order_ids
1579            .insert(*venue_order_id, *client_order_id);
1580
1581        Ok(())
1582    }
1583
1584    /// Adds the `order` to the cache indexed with any given identifiers.
1585    ///
1586    /// # Parameters
1587    ///
1588    /// `override_existing`: If the added order should 'override' any existing order and replace
1589    /// it in the cache. This is currently used for emulated orders which are
1590    /// being released and transformed into another type.
1591    ///
1592    /// # Errors
1593    ///
1594    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1595    pub fn add_order(
1596        &mut self,
1597        order: OrderAny,
1598        position_id: Option<PositionId>,
1599        client_id: Option<ClientId>,
1600        replace_existing: bool,
1601    ) -> anyhow::Result<()> {
1602        let instrument_id = order.instrument_id();
1603        let venue = instrument_id.venue;
1604        let client_order_id = order.client_order_id();
1605        let strategy_id = order.strategy_id();
1606        let exec_algorithm_id = order.exec_algorithm_id();
1607        let exec_spawn_id = order.exec_spawn_id();
1608
1609        if !replace_existing {
1610            check_key_not_in_map(
1611                &client_order_id,
1612                &self.orders,
1613                stringify!(client_order_id),
1614                stringify!(orders),
1615            )?;
1616        }
1617
1618        log::debug!("Adding {order:?}");
1619
1620        self.index.orders.insert(client_order_id);
1621        self.index
1622            .order_strategy
1623            .insert(client_order_id, strategy_id);
1624        self.index.strategies.insert(strategy_id);
1625
1626        // Update venue -> orders index
1627        self.index
1628            .venue_orders
1629            .entry(venue)
1630            .or_default()
1631            .insert(client_order_id);
1632
1633        // Update instrument -> orders index
1634        self.index
1635            .instrument_orders
1636            .entry(instrument_id)
1637            .or_default()
1638            .insert(client_order_id);
1639
1640        // Update strategy -> orders index
1641        self.index
1642            .strategy_orders
1643            .entry(strategy_id)
1644            .or_default()
1645            .insert(client_order_id);
1646
1647        // Update exec_algorithm -> orders index
1648        // Update exec_algorithm -> orders index
1649        if let (Some(exec_algorithm_id), Some(exec_spawn_id)) = (exec_algorithm_id, exec_spawn_id) {
1650            self.index.exec_algorithms.insert(exec_algorithm_id);
1651
1652            self.index
1653                .exec_algorithm_orders
1654                .entry(exec_algorithm_id)
1655                .or_default()
1656                .insert(client_order_id);
1657
1658            self.index
1659                .exec_spawn_orders
1660                .entry(exec_spawn_id)
1661                .or_default()
1662                .insert(client_order_id);
1663        }
1664
1665        // Update emulation index
1666        match order.emulation_trigger() {
1667            Some(_) => {
1668                self.index.orders_emulated.remove(&client_order_id);
1669            }
1670            None => {
1671                self.index.orders_emulated.insert(client_order_id);
1672            }
1673        }
1674
1675        // Index position ID if provided
1676        if let Some(position_id) = position_id {
1677            self.add_position_id(
1678                &position_id,
1679                &order.instrument_id().venue,
1680                &client_order_id,
1681                &strategy_id,
1682            )?;
1683        }
1684
1685        // Index client ID if provided
1686        if let Some(client_id) = client_id {
1687            self.index.order_client.insert(client_order_id, client_id);
1688            log::debug!("Indexed {client_id:?}");
1689        }
1690
1691        if let Some(database) = &mut self.database {
1692            database.add_order(&order, client_id)?;
1693            // TODO: Implement
1694            // if self.config.snapshot_orders {
1695            //     database.snapshot_order_state(order)?;
1696            // }
1697        }
1698
1699        self.orders.insert(client_order_id, order);
1700
1701        Ok(())
1702    }
1703
1704    /// Indexes the `position_id` with the other given IDs.
1705    ///
1706    /// # Errors
1707    ///
1708    /// Returns an error if indexing position ID in the backing database fails.
1709    pub fn add_position_id(
1710        &mut self,
1711        position_id: &PositionId,
1712        venue: &Venue,
1713        client_order_id: &ClientOrderId,
1714        strategy_id: &StrategyId,
1715    ) -> anyhow::Result<()> {
1716        self.index
1717            .order_position
1718            .insert(*client_order_id, *position_id);
1719
1720        // Index: ClientOrderId -> PositionId
1721        if let Some(database) = &mut self.database {
1722            database.index_order_position(*client_order_id, *position_id)?;
1723        }
1724
1725        // Index: PositionId -> StrategyId
1726        self.index
1727            .position_strategy
1728            .insert(*position_id, *strategy_id);
1729
1730        // Index: PositionId -> set[ClientOrderId]
1731        self.index
1732            .position_orders
1733            .entry(*position_id)
1734            .or_default()
1735            .insert(*client_order_id);
1736
1737        // Index: StrategyId -> set[PositionId]
1738        self.index
1739            .strategy_positions
1740            .entry(*strategy_id)
1741            .or_default()
1742            .insert(*position_id);
1743
1744        // Index: Venue -> set[PositionId]
1745        self.index
1746            .venue_positions
1747            .entry(*venue)
1748            .or_default()
1749            .insert(*position_id);
1750
1751        Ok(())
1752    }
1753
1754    /// Adds the `position` to the cache.
1755    ///
1756    /// # Errors
1757    ///
1758    /// Returns an error if persisting the position to the backing database fails.
1759    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1760        self.positions.insert(position.id, position.clone());
1761        self.index.positions.insert(position.id);
1762        self.index.positions_open.insert(position.id);
1763
1764        log::debug!("Adding {position}");
1765
1766        self.add_position_id(
1767            &position.id,
1768            &position.instrument_id.venue,
1769            &position.opening_order_id,
1770            &position.strategy_id,
1771        )?;
1772
1773        let venue = position.instrument_id.venue;
1774        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1775        venue_positions.insert(position.id);
1776
1777        // Index: InstrumentId -> AHashSet
1778        let instrument_id = position.instrument_id;
1779        let instrument_positions = self
1780            .index
1781            .instrument_positions
1782            .entry(instrument_id)
1783            .or_default();
1784        instrument_positions.insert(position.id);
1785
1786        if let Some(database) = &mut self.database {
1787            database.add_position(&position)?;
1788            // TODO: Implement position snapshots
1789            // if self.snapshot_positions {
1790            //     database.snapshot_position_state(
1791            //         position,
1792            //         position.ts_last,
1793            //         self.calculate_unrealized_pnl(&position),
1794            //     )?;
1795            // }
1796        }
1797
1798        Ok(())
1799    }
1800
1801    /// Updates the `account` in the cache.
1802    ///
1803    /// # Errors
1804    ///
1805    /// Returns an error if updating the account in the database fails.
1806    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1807        if let Some(database) = &mut self.database {
1808            database.update_account(&account)?;
1809        }
1810        Ok(())
1811    }
1812
1813    /// Updates the `order` in the cache.
1814    ///
1815    /// # Errors
1816    ///
1817    /// Returns an error if updating the order in the database fails.
1818    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1819        let client_order_id = order.client_order_id();
1820
1821        // Update venue order ID
1822        if let Some(venue_order_id) = order.venue_order_id() {
1823            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1824            // venues which use a cancel+replace update strategy.
1825            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1826                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1827                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1828            }
1829        }
1830
1831        // Update in-flight state
1832        if order.is_inflight() {
1833            self.index.orders_inflight.insert(client_order_id);
1834        } else {
1835            self.index.orders_inflight.remove(&client_order_id);
1836        }
1837
1838        // Update open/closed state
1839        if order.is_open() {
1840            self.index.orders_closed.remove(&client_order_id);
1841            self.index.orders_open.insert(client_order_id);
1842        } else if order.is_closed() {
1843            self.index.orders_open.remove(&client_order_id);
1844            self.index.orders_pending_cancel.remove(&client_order_id);
1845            self.index.orders_closed.insert(client_order_id);
1846        }
1847
1848        // Update emulation
1849        if let Some(emulation_trigger) = order.emulation_trigger() {
1850            match emulation_trigger {
1851                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1852                _ => self.index.orders_emulated.insert(client_order_id),
1853            };
1854        }
1855
1856        // Update own book
1857        if self.own_order_book(&order.instrument_id()).is_some()
1858            && should_handle_own_book_order(order)
1859        {
1860            self.update_own_order_book(order);
1861        }
1862
1863        if let Some(database) = &mut self.database {
1864            database.update_order(order.last_event())?;
1865            // TODO: Implement order snapshots
1866            // if self.snapshot_orders {
1867            //     database.snapshot_order_state(order)?;
1868            // }
1869        }
1870
1871        // update the order in the cache
1872        self.orders.insert(client_order_id, order.clone());
1873
1874        Ok(())
1875    }
1876
1877    /// Updates the `order` as pending cancel locally.
1878    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1879        self.index
1880            .orders_pending_cancel
1881            .insert(order.client_order_id());
1882    }
1883
1884    /// Updates the `position` in the cache.
1885    ///
1886    /// # Errors
1887    ///
1888    /// Returns an error if updating the position in the database fails.
1889    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1890        // Update open/closed state
1891
1892        if position.is_open() {
1893            self.index.positions_open.insert(position.id);
1894            self.index.positions_closed.remove(&position.id);
1895        } else {
1896            self.index.positions_closed.insert(position.id);
1897            self.index.positions_open.remove(&position.id);
1898        }
1899
1900        if let Some(database) = &mut self.database {
1901            database.update_position(position)?;
1902            // TODO: Implement order snapshots
1903            // if self.snapshot_orders {
1904            //     database.snapshot_order_state(order)?;
1905            // }
1906        }
1907
1908        self.positions.insert(position.id, position.clone());
1909
1910        Ok(())
1911    }
1912
1913    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
1914    /// serializing it, and storing it in the position snapshots.
1915    ///
1916    /// # Errors
1917    ///
1918    /// Returns an error if serializing or storing the position snapshot fails.
1919    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1920        let position_id = position.id;
1921
1922        let mut copied_position = position.clone();
1923        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1924        copied_position.id = PositionId::new(new_id);
1925
1926        // Serialize the position (TODO: temporily just to JSON to remove a dependency)
1927        let position_serialized = serde_json::to_vec(&copied_position)?;
1928
1929        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1930        let new_snapshots = match snapshots {
1931            Some(existing_snapshots) => {
1932                let mut combined = existing_snapshots.to_vec();
1933                combined.extend(position_serialized);
1934                Bytes::from(combined)
1935            }
1936            None => Bytes::from(position_serialized),
1937        };
1938        self.position_snapshots.insert(position_id, new_snapshots);
1939
1940        log::debug!("Snapshot {copied_position}");
1941        Ok(())
1942    }
1943
1944    /// Creates a snapshot of the `position` state in the database.
1945    ///
1946    /// # Errors
1947    ///
1948    /// Returns an error if snapshotting the position state fails.
1949    pub fn snapshot_position_state(
1950        &mut self,
1951        position: &Position,
1952        // ts_snapshot: u64,
1953        // unrealized_pnl: Option<Money>,
1954        open_only: Option<bool>,
1955    ) -> anyhow::Result<()> {
1956        let open_only = open_only.unwrap_or(true);
1957
1958        if open_only && !position.is_open() {
1959            return Ok(());
1960        }
1961
1962        if let Some(database) = &mut self.database {
1963            database.snapshot_position_state(position).map_err(|e| {
1964                log::error!(
1965                    "Failed to snapshot position state for {}: {e:?}",
1966                    position.id
1967                );
1968                e
1969            })?;
1970        } else {
1971            log::warn!(
1972                "Cannot snapshot position state for {} (no database configured)",
1973                position.id
1974            );
1975        }
1976
1977        // Ok(())
1978        todo!()
1979    }
1980
1981    /// Gets the OMS type for the `position_id`.
1982    #[must_use]
1983    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
1984        // Get OMS type from the index
1985        if self.index.position_strategy.contains_key(position_id) {
1986            // For now, we'll default to NETTING
1987            // TODO: Store and retrieve actual OMS type per position
1988            Some(OmsType::Netting)
1989        } else {
1990            None
1991        }
1992    }
1993
1994    /// Gets position snapshot bytes for the `position_id`.
1995    #[must_use]
1996    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
1997        self.position_snapshots.get(position_id).map(|b| b.to_vec())
1998    }
1999
2000    /// Gets position snapshot IDs for the `instrument_id`.
2001    #[must_use]
2002    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> HashSet<PositionId> {
2003        // Get snapshot position IDs that match the instrument
2004        let mut result = HashSet::new();
2005        for (position_id, _) in &self.position_snapshots {
2006            // Check if this position is for the requested instrument
2007            if let Some(position) = self.positions.get(position_id)
2008                && position.instrument_id == *instrument_id
2009            {
2010                result.insert(*position_id);
2011            }
2012        }
2013        result
2014    }
2015
2016    /// Snapshots the `order` state in the database.
2017    ///
2018    /// # Errors
2019    ///
2020    /// Returns an error if snapshotting the order state fails.
2021    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2022        let database = if let Some(database) = &self.database {
2023            database
2024        } else {
2025            log::warn!(
2026                "Cannot snapshot order state for {} (no database configured)",
2027                order.client_order_id()
2028            );
2029            return Ok(());
2030        };
2031
2032        database.snapshot_order_state(order)
2033    }
2034
2035    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2036
2037    fn build_order_query_filter_set(
2038        &self,
2039        venue: Option<&Venue>,
2040        instrument_id: Option<&InstrumentId>,
2041        strategy_id: Option<&StrategyId>,
2042    ) -> Option<AHashSet<ClientOrderId>> {
2043        let mut query: Option<AHashSet<ClientOrderId>> = None;
2044
2045        if let Some(venue) = venue {
2046            query = Some(
2047                self.index
2048                    .venue_orders
2049                    .get(venue)
2050                    .cloned()
2051                    .unwrap_or_default(),
2052            );
2053        }
2054
2055        if let Some(instrument_id) = instrument_id {
2056            let instrument_orders = self
2057                .index
2058                .instrument_orders
2059                .get(instrument_id)
2060                .cloned()
2061                .unwrap_or_default();
2062
2063            if let Some(existing_query) = &mut query {
2064                *existing_query = existing_query
2065                    .intersection(&instrument_orders)
2066                    .copied()
2067                    .collect();
2068            } else {
2069                query = Some(instrument_orders);
2070            }
2071        }
2072
2073        if let Some(strategy_id) = strategy_id {
2074            let strategy_orders = self
2075                .index
2076                .strategy_orders
2077                .get(strategy_id)
2078                .cloned()
2079                .unwrap_or_default();
2080
2081            if let Some(existing_query) = &mut query {
2082                *existing_query = existing_query
2083                    .intersection(&strategy_orders)
2084                    .copied()
2085                    .collect();
2086            } else {
2087                query = Some(strategy_orders);
2088            }
2089        }
2090
2091        query
2092    }
2093
2094    fn build_position_query_filter_set(
2095        &self,
2096        venue: Option<&Venue>,
2097        instrument_id: Option<&InstrumentId>,
2098        strategy_id: Option<&StrategyId>,
2099    ) -> Option<AHashSet<PositionId>> {
2100        let mut query: Option<AHashSet<PositionId>> = None;
2101
2102        if let Some(venue) = venue {
2103            query = Some(
2104                self.index
2105                    .venue_positions
2106                    .get(venue)
2107                    .cloned()
2108                    .unwrap_or_default(),
2109            );
2110        }
2111
2112        if let Some(instrument_id) = instrument_id {
2113            let instrument_positions = self
2114                .index
2115                .instrument_positions
2116                .get(instrument_id)
2117                .cloned()
2118                .unwrap_or_default();
2119
2120            if let Some(existing_query) = query {
2121                query = Some(
2122                    existing_query
2123                        .intersection(&instrument_positions)
2124                        .copied()
2125                        .collect(),
2126                );
2127            } else {
2128                query = Some(instrument_positions);
2129            }
2130        }
2131
2132        if let Some(strategy_id) = strategy_id {
2133            let strategy_positions = self
2134                .index
2135                .strategy_positions
2136                .get(strategy_id)
2137                .cloned()
2138                .unwrap_or_default();
2139
2140            if let Some(existing_query) = query {
2141                query = Some(
2142                    existing_query
2143                        .intersection(&strategy_positions)
2144                        .copied()
2145                        .collect(),
2146                );
2147            } else {
2148                query = Some(strategy_positions);
2149            }
2150        }
2151
2152        query
2153    }
2154
2155    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2156    ///
2157    /// # Panics
2158    ///
2159    /// Panics if any `client_order_id` in the set is not found in the cache.
2160    fn get_orders_for_ids(
2161        &self,
2162        client_order_ids: &AHashSet<ClientOrderId>,
2163        side: Option<OrderSide>,
2164    ) -> Vec<&OrderAny> {
2165        let side = side.unwrap_or(OrderSide::NoOrderSide);
2166        let mut orders = Vec::new();
2167
2168        for client_order_id in client_order_ids {
2169            let order = self
2170                .orders
2171                .get(client_order_id)
2172                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2173            if side == OrderSide::NoOrderSide || side == order.order_side() {
2174                orders.push(order);
2175            }
2176        }
2177
2178        orders
2179    }
2180
2181    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2182    ///
2183    /// # Panics
2184    ///
2185    /// Panics if any `position_id` in the set is not found in the cache.
2186    fn get_positions_for_ids(
2187        &self,
2188        position_ids: &AHashSet<PositionId>,
2189        side: Option<PositionSide>,
2190    ) -> Vec<&Position> {
2191        let side = side.unwrap_or(PositionSide::NoPositionSide);
2192        let mut positions = Vec::new();
2193
2194        for position_id in position_ids {
2195            let position = self
2196                .positions
2197                .get(position_id)
2198                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2199            if side == PositionSide::NoPositionSide || side == position.side {
2200                positions.push(position);
2201            }
2202        }
2203
2204        positions
2205    }
2206
2207    /// Returns the `ClientOrderId`s of all orders.
2208    #[must_use]
2209    pub fn client_order_ids(
2210        &self,
2211        venue: Option<&Venue>,
2212        instrument_id: Option<&InstrumentId>,
2213        strategy_id: Option<&StrategyId>,
2214    ) -> AHashSet<ClientOrderId> {
2215        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2216        match query {
2217            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2218            None => self.index.orders.clone(),
2219        }
2220    }
2221
2222    /// Returns the `ClientOrderId`s of all open orders.
2223    #[must_use]
2224    pub fn client_order_ids_open(
2225        &self,
2226        venue: Option<&Venue>,
2227        instrument_id: Option<&InstrumentId>,
2228        strategy_id: Option<&StrategyId>,
2229    ) -> AHashSet<ClientOrderId> {
2230        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2231        match query {
2232            Some(query) => self
2233                .index
2234                .orders_open
2235                .intersection(&query)
2236                .copied()
2237                .collect(),
2238            None => self.index.orders_open.clone(),
2239        }
2240    }
2241
2242    /// Returns the `ClientOrderId`s of all closed orders.
2243    #[must_use]
2244    pub fn client_order_ids_closed(
2245        &self,
2246        venue: Option<&Venue>,
2247        instrument_id: Option<&InstrumentId>,
2248        strategy_id: Option<&StrategyId>,
2249    ) -> AHashSet<ClientOrderId> {
2250        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2251        match query {
2252            Some(query) => self
2253                .index
2254                .orders_closed
2255                .intersection(&query)
2256                .copied()
2257                .collect(),
2258            None => self.index.orders_closed.clone(),
2259        }
2260    }
2261
2262    /// Returns the `ClientOrderId`s of all emulated orders.
2263    #[must_use]
2264    pub fn client_order_ids_emulated(
2265        &self,
2266        venue: Option<&Venue>,
2267        instrument_id: Option<&InstrumentId>,
2268        strategy_id: Option<&StrategyId>,
2269    ) -> AHashSet<ClientOrderId> {
2270        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2271        match query {
2272            Some(query) => self
2273                .index
2274                .orders_emulated
2275                .intersection(&query)
2276                .copied()
2277                .collect(),
2278            None => self.index.orders_emulated.clone(),
2279        }
2280    }
2281
2282    /// Returns the `ClientOrderId`s of all in-flight orders.
2283    #[must_use]
2284    pub fn client_order_ids_inflight(
2285        &self,
2286        venue: Option<&Venue>,
2287        instrument_id: Option<&InstrumentId>,
2288        strategy_id: Option<&StrategyId>,
2289    ) -> AHashSet<ClientOrderId> {
2290        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2291        match query {
2292            Some(query) => self
2293                .index
2294                .orders_inflight
2295                .intersection(&query)
2296                .copied()
2297                .collect(),
2298            None => self.index.orders_inflight.clone(),
2299        }
2300    }
2301
2302    /// Returns `PositionId`s of all positions.
2303    #[must_use]
2304    pub fn position_ids(
2305        &self,
2306        venue: Option<&Venue>,
2307        instrument_id: Option<&InstrumentId>,
2308        strategy_id: Option<&StrategyId>,
2309    ) -> AHashSet<PositionId> {
2310        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2311        match query {
2312            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2313            None => self.index.positions.clone(),
2314        }
2315    }
2316
2317    /// Returns the `PositionId`s of all open positions.
2318    #[must_use]
2319    pub fn position_open_ids(
2320        &self,
2321        venue: Option<&Venue>,
2322        instrument_id: Option<&InstrumentId>,
2323        strategy_id: Option<&StrategyId>,
2324    ) -> AHashSet<PositionId> {
2325        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2326        match query {
2327            Some(query) => self
2328                .index
2329                .positions_open
2330                .intersection(&query)
2331                .copied()
2332                .collect(),
2333            None => self.index.positions_open.clone(),
2334        }
2335    }
2336
2337    /// Returns the `PositionId`s of all closed positions.
2338    #[must_use]
2339    pub fn position_closed_ids(
2340        &self,
2341        venue: Option<&Venue>,
2342        instrument_id: Option<&InstrumentId>,
2343        strategy_id: Option<&StrategyId>,
2344    ) -> AHashSet<PositionId> {
2345        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2346        match query {
2347            Some(query) => self
2348                .index
2349                .positions_closed
2350                .intersection(&query)
2351                .copied()
2352                .collect(),
2353            None => self.index.positions_closed.clone(),
2354        }
2355    }
2356
2357    /// Returns the `ComponentId`s of all actors.
2358    #[must_use]
2359    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2360        self.index.actors.clone()
2361    }
2362
2363    /// Returns the `StrategyId`s of all strategies.
2364    #[must_use]
2365    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2366        self.index.strategies.clone()
2367    }
2368
2369    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2370    #[must_use]
2371    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2372        self.index.exec_algorithms.clone()
2373    }
2374
2375    // -- ORDER QUERIES ---------------------------------------------------------------------------
2376
2377    /// Gets a reference to the order with the `client_order_id` (if found).
2378    #[must_use]
2379    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2380        self.orders.get(client_order_id)
2381    }
2382
2383    /// Gets a reference to the order with the `client_order_id` (if found).
2384    #[must_use]
2385    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2386        self.orders.get_mut(client_order_id)
2387    }
2388
2389    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2390    #[must_use]
2391    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2392        self.index.venue_order_ids.get(venue_order_id)
2393    }
2394
2395    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2396    #[must_use]
2397    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2398        self.index.client_order_ids.get(client_order_id)
2399    }
2400
2401    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2402    #[must_use]
2403    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2404        self.index.order_client.get(client_order_id)
2405    }
2406
2407    /// Returns references to all orders matching the optional filter parameters.
2408    #[must_use]
2409    pub fn orders(
2410        &self,
2411        venue: Option<&Venue>,
2412        instrument_id: Option<&InstrumentId>,
2413        strategy_id: Option<&StrategyId>,
2414        side: Option<OrderSide>,
2415    ) -> Vec<&OrderAny> {
2416        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2417        self.get_orders_for_ids(&client_order_ids, side)
2418    }
2419
2420    /// Returns references to all open orders matching the optional filter parameters.
2421    #[must_use]
2422    pub fn orders_open(
2423        &self,
2424        venue: Option<&Venue>,
2425        instrument_id: Option<&InstrumentId>,
2426        strategy_id: Option<&StrategyId>,
2427        side: Option<OrderSide>,
2428    ) -> Vec<&OrderAny> {
2429        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2430        self.get_orders_for_ids(&client_order_ids, side)
2431    }
2432
2433    /// Returns references to all closed orders matching the optional filter parameters.
2434    #[must_use]
2435    pub fn orders_closed(
2436        &self,
2437        venue: Option<&Venue>,
2438        instrument_id: Option<&InstrumentId>,
2439        strategy_id: Option<&StrategyId>,
2440        side: Option<OrderSide>,
2441    ) -> Vec<&OrderAny> {
2442        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2443        self.get_orders_for_ids(&client_order_ids, side)
2444    }
2445
2446    /// Returns references to all emulated orders matching the optional filter parameters.
2447    #[must_use]
2448    pub fn orders_emulated(
2449        &self,
2450        venue: Option<&Venue>,
2451        instrument_id: Option<&InstrumentId>,
2452        strategy_id: Option<&StrategyId>,
2453        side: Option<OrderSide>,
2454    ) -> Vec<&OrderAny> {
2455        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2456        self.get_orders_for_ids(&client_order_ids, side)
2457    }
2458
2459    /// Returns references to all in-flight orders matching the optional filter parameters.
2460    #[must_use]
2461    pub fn orders_inflight(
2462        &self,
2463        venue: Option<&Venue>,
2464        instrument_id: Option<&InstrumentId>,
2465        strategy_id: Option<&StrategyId>,
2466        side: Option<OrderSide>,
2467    ) -> Vec<&OrderAny> {
2468        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2469        self.get_orders_for_ids(&client_order_ids, side)
2470    }
2471
2472    /// Returns references to all orders for the `position_id`.
2473    #[must_use]
2474    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2475        let client_order_ids = self.index.position_orders.get(position_id);
2476        match client_order_ids {
2477            Some(client_order_ids) => {
2478                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2479            }
2480            None => Vec::new(),
2481        }
2482    }
2483
2484    /// Returns whether an order with the `client_order_id` exists.
2485    #[must_use]
2486    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2487        self.index.orders.contains(client_order_id)
2488    }
2489
2490    /// Returns whether an order with the `client_order_id` is open.
2491    #[must_use]
2492    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2493        self.index.orders_open.contains(client_order_id)
2494    }
2495
2496    /// Returns whether an order with the `client_order_id` is closed.
2497    #[must_use]
2498    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2499        self.index.orders_closed.contains(client_order_id)
2500    }
2501
2502    /// Returns whether an order with the `client_order_id` is emulated.
2503    #[must_use]
2504    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2505        self.index.orders_emulated.contains(client_order_id)
2506    }
2507
2508    /// Returns whether an order with the `client_order_id` is in-flight.
2509    #[must_use]
2510    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2511        self.index.orders_inflight.contains(client_order_id)
2512    }
2513
2514    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2515    #[must_use]
2516    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2517        self.index.orders_pending_cancel.contains(client_order_id)
2518    }
2519
2520    /// Returns the count of all open orders.
2521    #[must_use]
2522    pub fn orders_open_count(
2523        &self,
2524        venue: Option<&Venue>,
2525        instrument_id: Option<&InstrumentId>,
2526        strategy_id: Option<&StrategyId>,
2527        side: Option<OrderSide>,
2528    ) -> usize {
2529        self.orders_open(venue, instrument_id, strategy_id, side)
2530            .len()
2531    }
2532
2533    /// Returns the count of all closed orders.
2534    #[must_use]
2535    pub fn orders_closed_count(
2536        &self,
2537        venue: Option<&Venue>,
2538        instrument_id: Option<&InstrumentId>,
2539        strategy_id: Option<&StrategyId>,
2540        side: Option<OrderSide>,
2541    ) -> usize {
2542        self.orders_closed(venue, instrument_id, strategy_id, side)
2543            .len()
2544    }
2545
2546    /// Returns the count of all emulated orders.
2547    #[must_use]
2548    pub fn orders_emulated_count(
2549        &self,
2550        venue: Option<&Venue>,
2551        instrument_id: Option<&InstrumentId>,
2552        strategy_id: Option<&StrategyId>,
2553        side: Option<OrderSide>,
2554    ) -> usize {
2555        self.orders_emulated(venue, instrument_id, strategy_id, side)
2556            .len()
2557    }
2558
2559    /// Returns the count of all in-flight orders.
2560    #[must_use]
2561    pub fn orders_inflight_count(
2562        &self,
2563        venue: Option<&Venue>,
2564        instrument_id: Option<&InstrumentId>,
2565        strategy_id: Option<&StrategyId>,
2566        side: Option<OrderSide>,
2567    ) -> usize {
2568        self.orders_inflight(venue, instrument_id, strategy_id, side)
2569            .len()
2570    }
2571
2572    /// Returns the count of all orders.
2573    #[must_use]
2574    pub fn orders_total_count(
2575        &self,
2576        venue: Option<&Venue>,
2577        instrument_id: Option<&InstrumentId>,
2578        strategy_id: Option<&StrategyId>,
2579        side: Option<OrderSide>,
2580    ) -> usize {
2581        self.orders(venue, instrument_id, strategy_id, side).len()
2582    }
2583
2584    /// Returns the order list for the `order_list_id`.
2585    #[must_use]
2586    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2587        self.order_lists.get(order_list_id)
2588    }
2589
2590    /// Returns all order lists matching the optional filter parameters.
2591    #[must_use]
2592    pub fn order_lists(
2593        &self,
2594        venue: Option<&Venue>,
2595        instrument_id: Option<&InstrumentId>,
2596        strategy_id: Option<&StrategyId>,
2597    ) -> Vec<&OrderList> {
2598        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2599
2600        if let Some(venue) = venue {
2601            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2602        }
2603
2604        if let Some(instrument_id) = instrument_id {
2605            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2606        }
2607
2608        if let Some(strategy_id) = strategy_id {
2609            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2610        }
2611
2612        order_lists
2613    }
2614
2615    /// Returns whether an order list with the `order_list_id` exists.
2616    #[must_use]
2617    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2618        self.order_lists.contains_key(order_list_id)
2619    }
2620
2621    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2622
2623    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2624    /// optional filter parameters.
2625    #[must_use]
2626    pub fn orders_for_exec_algorithm(
2627        &self,
2628        exec_algorithm_id: &ExecAlgorithmId,
2629        venue: Option<&Venue>,
2630        instrument_id: Option<&InstrumentId>,
2631        strategy_id: Option<&StrategyId>,
2632        side: Option<OrderSide>,
2633    ) -> Vec<&OrderAny> {
2634        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2635        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2636
2637        if let Some(query) = query
2638            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2639        {
2640            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2641        }
2642
2643        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2644            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2645        } else {
2646            Vec::new()
2647        }
2648    }
2649
2650    /// Returns references to all orders with the `exec_spawn_id`.
2651    #[must_use]
2652    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2653        self.get_orders_for_ids(
2654            self.index
2655                .exec_spawn_orders
2656                .get(exec_spawn_id)
2657                .unwrap_or(&AHashSet::new()),
2658            None,
2659        )
2660    }
2661
2662    /// Returns the total order quantity for the `exec_spawn_id`.
2663    #[must_use]
2664    pub fn exec_spawn_total_quantity(
2665        &self,
2666        exec_spawn_id: &ClientOrderId,
2667        active_only: bool,
2668    ) -> Option<Quantity> {
2669        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2670
2671        let mut total_quantity: Option<Quantity> = None;
2672
2673        for spawn_order in exec_spawn_orders {
2674            if !active_only || !spawn_order.is_closed() {
2675                if let Some(mut total_quantity) = total_quantity {
2676                    total_quantity += spawn_order.quantity();
2677                }
2678            } else {
2679                total_quantity = Some(spawn_order.quantity());
2680            }
2681        }
2682
2683        total_quantity
2684    }
2685
2686    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
2687    #[must_use]
2688    pub fn exec_spawn_total_filled_qty(
2689        &self,
2690        exec_spawn_id: &ClientOrderId,
2691        active_only: bool,
2692    ) -> Option<Quantity> {
2693        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2694
2695        let mut total_quantity: Option<Quantity> = None;
2696
2697        for spawn_order in exec_spawn_orders {
2698            if !active_only || !spawn_order.is_closed() {
2699                if let Some(mut total_quantity) = total_quantity {
2700                    total_quantity += spawn_order.filled_qty();
2701                }
2702            } else {
2703                total_quantity = Some(spawn_order.filled_qty());
2704            }
2705        }
2706
2707        total_quantity
2708    }
2709
2710    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
2711    #[must_use]
2712    pub fn exec_spawn_total_leaves_qty(
2713        &self,
2714        exec_spawn_id: &ClientOrderId,
2715        active_only: bool,
2716    ) -> Option<Quantity> {
2717        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2718
2719        let mut total_quantity: Option<Quantity> = None;
2720
2721        for spawn_order in exec_spawn_orders {
2722            if !active_only || !spawn_order.is_closed() {
2723                if let Some(mut total_quantity) = total_quantity {
2724                    total_quantity += spawn_order.leaves_qty();
2725                }
2726            } else {
2727                total_quantity = Some(spawn_order.leaves_qty());
2728            }
2729        }
2730
2731        total_quantity
2732    }
2733
2734    // -- POSITION QUERIES ------------------------------------------------------------------------
2735
2736    /// Returns a reference to the position with the `position_id` (if found).
2737    #[must_use]
2738    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2739        self.positions.get(position_id)
2740    }
2741
2742    /// Returns a reference to the position for the `client_order_id` (if found).
2743    #[must_use]
2744    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2745        self.index
2746            .order_position
2747            .get(client_order_id)
2748            .and_then(|position_id| self.positions.get(position_id))
2749    }
2750
2751    /// Returns a reference to the position ID for the `client_order_id` (if found).
2752    #[must_use]
2753    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2754        self.index.order_position.get(client_order_id)
2755    }
2756
2757    /// Returns a reference to all positions matching the optional filter parameters.
2758    #[must_use]
2759    pub fn positions(
2760        &self,
2761        venue: Option<&Venue>,
2762        instrument_id: Option<&InstrumentId>,
2763        strategy_id: Option<&StrategyId>,
2764        side: Option<PositionSide>,
2765    ) -> Vec<&Position> {
2766        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2767        self.get_positions_for_ids(&position_ids, side)
2768    }
2769
2770    /// Returns a reference to all open positions matching the optional filter parameters.
2771    #[must_use]
2772    pub fn positions_open(
2773        &self,
2774        venue: Option<&Venue>,
2775        instrument_id: Option<&InstrumentId>,
2776        strategy_id: Option<&StrategyId>,
2777        side: Option<PositionSide>,
2778    ) -> Vec<&Position> {
2779        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2780        self.get_positions_for_ids(&position_ids, side)
2781    }
2782
2783    /// Returns a reference to all closed positions matching the optional filter parameters.
2784    #[must_use]
2785    pub fn positions_closed(
2786        &self,
2787        venue: Option<&Venue>,
2788        instrument_id: Option<&InstrumentId>,
2789        strategy_id: Option<&StrategyId>,
2790        side: Option<PositionSide>,
2791    ) -> Vec<&Position> {
2792        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2793        self.get_positions_for_ids(&position_ids, side)
2794    }
2795
2796    /// Returns whether a position with the `position_id` exists.
2797    #[must_use]
2798    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2799        self.index.positions.contains(position_id)
2800    }
2801
2802    /// Returns whether a position with the `position_id` is open.
2803    #[must_use]
2804    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2805        self.index.positions_open.contains(position_id)
2806    }
2807
2808    /// Returns whether a position with the `position_id` is closed.
2809    #[must_use]
2810    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2811        self.index.positions_closed.contains(position_id)
2812    }
2813
2814    /// Returns the count of all open positions.
2815    #[must_use]
2816    pub fn positions_open_count(
2817        &self,
2818        venue: Option<&Venue>,
2819        instrument_id: Option<&InstrumentId>,
2820        strategy_id: Option<&StrategyId>,
2821        side: Option<PositionSide>,
2822    ) -> usize {
2823        self.positions_open(venue, instrument_id, strategy_id, side)
2824            .len()
2825    }
2826
2827    /// Returns the count of all closed positions.
2828    #[must_use]
2829    pub fn positions_closed_count(
2830        &self,
2831        venue: Option<&Venue>,
2832        instrument_id: Option<&InstrumentId>,
2833        strategy_id: Option<&StrategyId>,
2834        side: Option<PositionSide>,
2835    ) -> usize {
2836        self.positions_closed(venue, instrument_id, strategy_id, side)
2837            .len()
2838    }
2839
2840    /// Returns the count of all positions.
2841    #[must_use]
2842    pub fn positions_total_count(
2843        &self,
2844        venue: Option<&Venue>,
2845        instrument_id: Option<&InstrumentId>,
2846        strategy_id: Option<&StrategyId>,
2847        side: Option<PositionSide>,
2848    ) -> usize {
2849        self.positions(venue, instrument_id, strategy_id, side)
2850            .len()
2851    }
2852
2853    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2854
2855    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
2856    #[must_use]
2857    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2858        self.index.order_strategy.get(client_order_id)
2859    }
2860
2861    /// Gets a reference to the strategy ID for the `position_id` (if found).
2862    #[must_use]
2863    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2864        self.index.position_strategy.get(position_id)
2865    }
2866
2867    // -- GENERAL ---------------------------------------------------------------------------------
2868
2869    /// Gets a reference to the general value for the `key` (if found).
2870    ///
2871    /// # Errors
2872    ///
2873    /// Returns an error if the `key` is invalid.
2874    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2875        check_valid_string(key, stringify!(key))?;
2876
2877        Ok(self.general.get(key))
2878    }
2879
2880    // -- DATA QUERIES ----------------------------------------------------------------------------
2881
2882    /// Returns the price for the `instrument_id` and `price_type` (if found).
2883    #[must_use]
2884    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2885        match price_type {
2886            PriceType::Bid => self
2887                .quotes
2888                .get(instrument_id)
2889                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2890            PriceType::Ask => self
2891                .quotes
2892                .get(instrument_id)
2893                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2894            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2895                quotes.front().map(|quote| {
2896                    Price::new(
2897                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2898                        quote.bid_price.precision + 1,
2899                    )
2900                })
2901            }),
2902            PriceType::Last => self
2903                .trades
2904                .get(instrument_id)
2905                .and_then(|trades| trades.front().map(|trade| trade.price)),
2906            PriceType::Mark => self
2907                .mark_prices
2908                .get(instrument_id)
2909                .and_then(|marks| marks.front().map(|mark| mark.value)),
2910        }
2911    }
2912
2913    /// Gets all quotes for the `instrument_id`.
2914    #[must_use]
2915    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2916        self.quotes
2917            .get(instrument_id)
2918            .map(|quotes| quotes.iter().copied().collect())
2919    }
2920
2921    /// Gets all trades for the `instrument_id`.
2922    #[must_use]
2923    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2924        self.trades
2925            .get(instrument_id)
2926            .map(|trades| trades.iter().copied().collect())
2927    }
2928
2929    /// Gets all mark price updates for the `instrument_id`.
2930    #[must_use]
2931    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2932        self.mark_prices
2933            .get(instrument_id)
2934            .map(|mark_prices| mark_prices.iter().copied().collect())
2935    }
2936
2937    /// Gets all index price updates for the `instrument_id`.
2938    #[must_use]
2939    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2940        self.index_prices
2941            .get(instrument_id)
2942            .map(|index_prices| index_prices.iter().copied().collect())
2943    }
2944
2945    /// Gets all bars for the `bar_type`.
2946    #[must_use]
2947    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2948        self.bars
2949            .get(bar_type)
2950            .map(|bars| bars.iter().copied().collect())
2951    }
2952
2953    /// Gets a reference to the order book for the `instrument_id`.
2954    #[must_use]
2955    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2956        self.books.get(instrument_id)
2957    }
2958
2959    /// Gets a reference to the order book for the `instrument_id`.
2960    #[must_use]
2961    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2962        self.books.get_mut(instrument_id)
2963    }
2964
2965    /// Gets a reference to the own order book for the `instrument_id`.
2966    #[must_use]
2967    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2968        self.own_books.get(instrument_id)
2969    }
2970
2971    /// Gets a reference to the own order book for the `instrument_id`.
2972    #[must_use]
2973    pub fn own_order_book_mut(
2974        &mut self,
2975        instrument_id: &InstrumentId,
2976    ) -> Option<&mut OwnOrderBook> {
2977        self.own_books.get_mut(instrument_id)
2978    }
2979
2980    /// Gets a reference to the pool for the `instrument_id`.
2981    #[cfg(feature = "defi")]
2982    #[must_use]
2983    pub fn pool(&self, instrument_id: &InstrumentId) -> Option<&Pool> {
2984        self.pools.get(instrument_id)
2985    }
2986
2987    /// Gets a mutable reference to the pool for the `instrument_id`.
2988    #[cfg(feature = "defi")]
2989    #[must_use]
2990    pub fn pool_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut Pool> {
2991        self.pools.get_mut(instrument_id)
2992    }
2993
2994    /// Gets a reference to the latest quote for the `instrument_id`.
2995    #[must_use]
2996    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2997        self.quotes
2998            .get(instrument_id)
2999            .and_then(|quotes| quotes.front())
3000    }
3001
3002    /// Gets a reference to the latest trade for the `instrument_id`.
3003    #[must_use]
3004    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3005        self.trades
3006            .get(instrument_id)
3007            .and_then(|trades| trades.front())
3008    }
3009
3010    /// Gets a referenece to the latest mark price update for the `instrument_id`.
3011    #[must_use]
3012    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3013        self.mark_prices
3014            .get(instrument_id)
3015            .and_then(|mark_prices| mark_prices.front())
3016    }
3017
3018    /// Gets a referenece to the latest index price update for the `instrument_id`.
3019    #[must_use]
3020    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3021        self.index_prices
3022            .get(instrument_id)
3023            .and_then(|index_prices| index_prices.front())
3024    }
3025
3026    /// Gets a reference to the funding rate update for the `instrument_id`.
3027    #[must_use]
3028    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3029        self.funding_rates.get(instrument_id)
3030    }
3031
3032    /// Gets a reference to the latest bar for the `bar_type`.
3033    #[must_use]
3034    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3035        self.bars.get(bar_type).and_then(|bars| bars.front())
3036    }
3037
3038    /// Gets the order book update count for the `instrument_id`.
3039    #[must_use]
3040    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3041        self.books
3042            .get(instrument_id)
3043            .map_or(0, |book| book.update_count) as usize
3044    }
3045
3046    /// Gets the quote tick count for the `instrument_id`.
3047    #[must_use]
3048    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3049        self.quotes
3050            .get(instrument_id)
3051            .map_or(0, std::collections::VecDeque::len)
3052    }
3053
3054    /// Gets the trade tick count for the `instrument_id`.
3055    #[must_use]
3056    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3057        self.trades
3058            .get(instrument_id)
3059            .map_or(0, std::collections::VecDeque::len)
3060    }
3061
3062    /// Gets the bar count for the `instrument_id`.
3063    #[must_use]
3064    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3065        self.bars
3066            .get(bar_type)
3067            .map_or(0, std::collections::VecDeque::len)
3068    }
3069
3070    /// Returns whether the cache contains an order book for the `instrument_id`.
3071    #[must_use]
3072    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3073        self.books.contains_key(instrument_id)
3074    }
3075
3076    /// Returns whether the cache contains quotes for the `instrument_id`.
3077    #[must_use]
3078    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3079        self.quote_count(instrument_id) > 0
3080    }
3081
3082    /// Returns whether the cache contains trades for the `instrument_id`.
3083    #[must_use]
3084    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3085        self.trade_count(instrument_id) > 0
3086    }
3087
3088    /// Returns whether the cache contains bars for the `bar_type`.
3089    #[must_use]
3090    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3091        self.bar_count(bar_type) > 0
3092    }
3093
3094    #[must_use]
3095    pub fn get_xrate(
3096        &self,
3097        venue: Venue,
3098        from_currency: Currency,
3099        to_currency: Currency,
3100        price_type: PriceType,
3101    ) -> Option<f64> {
3102        if from_currency == to_currency {
3103            // When the source and target currencies are identical,
3104            // no conversion is needed; return an exchange rate of 1.0.
3105            return Some(1.0);
3106        }
3107
3108        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3109
3110        match get_exchange_rate(
3111            from_currency.code,
3112            to_currency.code,
3113            price_type,
3114            bid_quote,
3115            ask_quote,
3116        ) {
3117            Ok(rate) => rate,
3118            Err(e) => {
3119                log::error!("Failed to calculate xrate: {e}");
3120                None
3121            }
3122        }
3123    }
3124
3125    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3126        let mut bid_quotes = AHashMap::new();
3127        let mut ask_quotes = AHashMap::new();
3128
3129        for instrument_id in self.instruments.keys() {
3130            if instrument_id.venue != *venue {
3131                continue;
3132            }
3133
3134            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3135                if let Some(tick) = ticks.front() {
3136                    (tick.bid_price, tick.ask_price)
3137                } else {
3138                    continue; // Empty ticks vector
3139                }
3140            } else {
3141                let bid_bar = self
3142                    .bars
3143                    .iter()
3144                    .find(|(k, _)| {
3145                        k.instrument_id() == *instrument_id
3146                            && matches!(k.spec().price_type, PriceType::Bid)
3147                    })
3148                    .map(|(_, v)| v);
3149
3150                let ask_bar = self
3151                    .bars
3152                    .iter()
3153                    .find(|(k, _)| {
3154                        k.instrument_id() == *instrument_id
3155                            && matches!(k.spec().price_type, PriceType::Ask)
3156                    })
3157                    .map(|(_, v)| v);
3158
3159                match (bid_bar, ask_bar) {
3160                    (Some(bid), Some(ask)) => {
3161                        let bid_price = bid.front().unwrap().close;
3162                        let ask_price = ask.front().unwrap().close;
3163
3164                        (bid_price, ask_price)
3165                    }
3166                    _ => continue,
3167                }
3168            };
3169
3170            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3171            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3172        }
3173
3174        (bid_quotes, ask_quotes)
3175    }
3176
3177    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3178    #[must_use]
3179    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3180        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3181    }
3182
3183    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3184    ///
3185    /// # Panics
3186    ///
3187    /// Panics if `xrate` is not positive.
3188    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3189        assert!(xrate > 0.0, "xrate was zero");
3190        self.mark_xrates.insert((from_currency, to_currency), xrate);
3191        self.mark_xrates
3192            .insert((to_currency, from_currency), 1.0 / xrate);
3193    }
3194
3195    /// Clears the mark exchange rate for the given currency pair.
3196    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3197        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3198    }
3199
3200    /// Clears all mark exchange rates.
3201    pub fn clear_mark_xrates(&mut self) {
3202        self.mark_xrates.clear();
3203    }
3204
3205    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3206
3207    /// Returns a reference to the instrument for the `instrument_id` (if found).
3208    #[must_use]
3209    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3210        self.instruments.get(instrument_id)
3211    }
3212
3213    /// Returns references to all instrument IDs for the `venue`.
3214    #[must_use]
3215    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3216        match venue {
3217            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3218            None => self.instruments.keys().collect(),
3219        }
3220    }
3221
3222    /// Returns references to all instruments for the `venue`.
3223    #[must_use]
3224    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3225        self.instruments
3226            .values()
3227            .filter(|i| &i.id().venue == venue)
3228            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3229            .collect()
3230    }
3231
3232    /// Returns references to all bar types contained in the cache.
3233    #[must_use]
3234    pub fn bar_types(
3235        &self,
3236        instrument_id: Option<&InstrumentId>,
3237        price_type: Option<&PriceType>,
3238        aggregation_source: AggregationSource,
3239    ) -> Vec<&BarType> {
3240        let mut bar_types = self
3241            .bars
3242            .keys()
3243            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3244            .collect::<Vec<&BarType>>();
3245
3246        if let Some(instrument_id) = instrument_id {
3247            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3248        }
3249
3250        if let Some(price_type) = price_type {
3251            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3252        }
3253
3254        bar_types
3255    }
3256
3257    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3258
3259    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3260    #[must_use]
3261    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3262        self.synthetics.get(instrument_id)
3263    }
3264
3265    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3266    #[must_use]
3267    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3268        self.synthetics.keys().collect()
3269    }
3270
3271    /// Returns references to all synthetic instruments contained in the cache.
3272    #[must_use]
3273    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3274        self.synthetics.values().collect()
3275    }
3276
3277    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3278
3279    /// Returns a reference to the account for the `account_id` (if found).
3280    #[must_use]
3281    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3282        self.accounts.get(account_id)
3283    }
3284
3285    /// Returns a reference to the account for the `venue` (if found).
3286    #[must_use]
3287    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3288        self.index
3289            .venue_account
3290            .get(venue)
3291            .and_then(|account_id| self.accounts.get(account_id))
3292    }
3293
3294    /// Returns a reference to the account ID for the `venue` (if found).
3295    #[must_use]
3296    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3297        self.index.venue_account.get(venue)
3298    }
3299
3300    /// Returns references to all accounts for the `account_id`.
3301    #[must_use]
3302    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3303        self.accounts
3304            .values()
3305            .filter(|account| &account.id() == account_id)
3306            .collect()
3307    }
3308
3309    /// Updates the own order book with an order.
3310    ///
3311    /// This method adds, updates, or removes an order from the own order book
3312    /// based on the order's current state.
3313    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3314        let instrument_id = order.instrument_id();
3315
3316        // Get or create the own order book for this instrument
3317        let own_book = self
3318            .own_books
3319            .entry(instrument_id)
3320            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3321
3322        // Convert order to own book order
3323        let own_book_order = order.to_own_book_order();
3324
3325        if order.is_closed() {
3326            // Remove the order from the own book if it's closed
3327            if let Err(e) = own_book.delete(own_book_order) {
3328                log::debug!(
3329                    "Failed to delete order {} from own book: {}",
3330                    order.client_order_id(),
3331                    e
3332                );
3333            } else {
3334                log::debug!("Deleted order {} from own book", order.client_order_id());
3335            }
3336        } else {
3337            // Add or update the order in the own book
3338            own_book.update(own_book_order).unwrap_or_else(|e| {
3339                log::debug!(
3340                    "Failed to update order {} in own book: {}",
3341                    order.client_order_id(),
3342                    e
3343                );
3344            });
3345            log::debug!("Updated order {} in own book", order.client_order_id());
3346        }
3347    }
3348}