nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `Cache` for market and execution related data.
17
18pub mod config;
19pub mod database;
20
21mod index;
22
23#[cfg(test)]
24mod tests;
25
26use std::{
27    collections::{HashMap, HashSet, VecDeque},
28    time::{SystemTime, UNIX_EPOCH},
29};
30
31use bytes::Bytes;
32pub use config::CacheConfig; // Re-export
33use database::{CacheDatabaseAdapter, CacheMap};
34use index::CacheIndex;
35use nautilus_core::{
36    UUID4, UnixNanos,
37    correctness::{
38        FAILED, check_key_not_in_map, check_predicate_false, check_slice_not_empty,
39        check_valid_string,
40    },
41    datetime::secs_to_nanos,
42};
43use nautilus_model::{
44    accounts::AccountAny,
45    data::{
46        Bar, BarType, GreeksData, QuoteTick, TradeTick, YieldCurveData,
47        prices::{IndexPriceUpdate, MarkPriceUpdate},
48    },
49    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
50    identifiers::{
51        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
52        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
53    },
54    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
55    orderbook::{OrderBook, own::OwnOrderBook},
56    orders::{Order, OrderAny, OrderList},
57    position::Position,
58    types::{Currency, Money, Price, Quantity},
59};
60use ustr::Ustr;
61
62use crate::xrate::get_exchange_rate;
63
64/// A common in-memory `Cache` for market and execution related data.
65#[allow(missing_debug_implementations)]
66pub struct Cache {
67    config: CacheConfig,
68    index: CacheIndex,
69    database: Option<Box<dyn CacheDatabaseAdapter>>,
70    general: HashMap<String, Bytes>,
71    currencies: HashMap<Ustr, Currency>,
72    instruments: HashMap<InstrumentId, InstrumentAny>,
73    synthetics: HashMap<InstrumentId, SyntheticInstrument>,
74    books: HashMap<InstrumentId, OrderBook>,
75    own_books: HashMap<InstrumentId, OwnOrderBook>,
76    quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
77    trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
78    mark_xrates: HashMap<(Currency, Currency), f64>,
79    mark_prices: HashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
80    index_prices: HashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
81    bars: HashMap<BarType, VecDeque<Bar>>,
82    greeks: HashMap<InstrumentId, GreeksData>,
83    yield_curves: HashMap<String, YieldCurveData>,
84    accounts: HashMap<AccountId, AccountAny>,
85    orders: HashMap<ClientOrderId, OrderAny>,
86    order_lists: HashMap<OrderListId, OrderList>,
87    positions: HashMap<PositionId, Position>,
88    position_snapshots: HashMap<PositionId, Bytes>,
89}
90
91impl Default for Cache {
92    /// Creates a new default [`Cache`] instance.
93    fn default() -> Self {
94        Self::new(Some(CacheConfig::default()), None)
95    }
96}
97
98impl Cache {
99    /// Creates a new [`Cache`] instance.
100    #[must_use]
101    pub fn new(
102        config: Option<CacheConfig>,
103        database: Option<Box<dyn CacheDatabaseAdapter>>,
104    ) -> Self {
105        Self {
106            config: config.unwrap_or_default(),
107            index: CacheIndex::default(),
108            database,
109            general: HashMap::new(),
110            currencies: HashMap::new(),
111            instruments: HashMap::new(),
112            synthetics: HashMap::new(),
113            books: HashMap::new(),
114            own_books: HashMap::new(),
115            quotes: HashMap::new(),
116            trades: HashMap::new(),
117            mark_xrates: HashMap::new(),
118            mark_prices: HashMap::new(),
119            index_prices: HashMap::new(),
120            bars: HashMap::new(),
121            greeks: HashMap::new(),
122            yield_curves: HashMap::new(),
123            accounts: HashMap::new(),
124            orders: HashMap::new(),
125            order_lists: HashMap::new(),
126            positions: HashMap::new(),
127            position_snapshots: HashMap::new(),
128        }
129    }
130
131    /// Returns the cache instances memory address.
132    #[must_use]
133    pub fn memory_address(&self) -> String {
134        format!("{:?}", std::ptr::from_ref(self))
135    }
136
137    // -- COMMANDS --------------------------------------------------------------------------------
138
139    /// Clears the current general cache and loads the general objects from the cache database.
140    pub fn cache_general(&mut self) -> anyhow::Result<()> {
141        self.general = match &mut self.database {
142            Some(db) => db.load()?,
143            None => HashMap::new(),
144        };
145
146        log::info!(
147            "Cached {} general object(s) from database",
148            self.general.len()
149        );
150        Ok(())
151    }
152
153    /// Loads all caches (currencies, instruments, synthetics, accounts, orders, positions) from the database.
154    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
155        let cache_map = match &self.database {
156            Some(db) => db.load_all().await?,
157            None => CacheMap::default(),
158        };
159
160        self.currencies = cache_map.currencies;
161        self.instruments = cache_map.instruments;
162        self.synthetics = cache_map.synthetics;
163        self.accounts = cache_map.accounts;
164        self.orders = cache_map.orders;
165        self.positions = cache_map.positions;
166        Ok(())
167    }
168
169    /// Clears the current currencies cache and loads currencies from the cache database.
170    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
171        self.currencies = match &mut self.database {
172            Some(db) => db.load_currencies().await?,
173            None => HashMap::new(),
174        };
175
176        log::info!("Cached {} currencies from database", self.general.len());
177        Ok(())
178    }
179
180    /// Clears the current instruments cache and loads instruments from the cache database.
181    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
182        self.instruments = match &mut self.database {
183            Some(db) => db.load_instruments().await?,
184            None => HashMap::new(),
185        };
186
187        log::info!("Cached {} instruments from database", self.general.len());
188        Ok(())
189    }
190
191    /// Clears the current synthetic instruments cache and loads synthetic instruments from the cache
192    /// database.
193    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
194        self.synthetics = match &mut self.database {
195            Some(db) => db.load_synthetics().await?,
196            None => HashMap::new(),
197        };
198
199        log::info!(
200            "Cached {} synthetic instruments from database",
201            self.general.len()
202        );
203        Ok(())
204    }
205
206    /// Clears the current accounts cache and loads accounts from the cache database.
207    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
208        self.accounts = match &mut self.database {
209            Some(db) => db.load_accounts().await?,
210            None => HashMap::new(),
211        };
212
213        log::info!(
214            "Cached {} synthetic instruments from database",
215            self.general.len()
216        );
217        Ok(())
218    }
219
220    /// Clears the current orders cache and loads orders from the cache database.
221    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
222        self.orders = match &mut self.database {
223            Some(db) => db.load_orders().await?,
224            None => HashMap::new(),
225        };
226
227        log::info!("Cached {} orders from database", self.general.len());
228        Ok(())
229    }
230
231    /// Clears the current positions cache and loads positions from the cache database.
232    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
233        self.positions = match &mut self.database {
234            Some(db) => db.load_positions().await?,
235            None => HashMap::new(),
236        };
237
238        log::info!("Cached {} positions from database", self.general.len());
239        Ok(())
240    }
241
242    /// Clears the current cache index and re-build.
243    pub fn build_index(&mut self) {
244        log::debug!("Building index");
245
246        // Index accounts
247        for account_id in self.accounts.keys() {
248            self.index
249                .venue_account
250                .insert(account_id.get_issuer(), *account_id);
251        }
252
253        // Index orders
254        for (client_order_id, order) in &self.orders {
255            let instrument_id = order.instrument_id();
256            let venue = instrument_id.venue;
257            let strategy_id = order.strategy_id();
258
259            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
260            self.index
261                .venue_orders
262                .entry(venue)
263                .or_default()
264                .insert(*client_order_id);
265
266            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
267            if let Some(venue_order_id) = order.venue_order_id() {
268                self.index
269                    .venue_order_ids
270                    .insert(venue_order_id, *client_order_id);
271            }
272
273            // 3: Build index.order_position -> {ClientOrderId, PositionId}
274            if let Some(position_id) = order.position_id() {
275                self.index
276                    .order_position
277                    .insert(*client_order_id, position_id);
278            }
279
280            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
281            self.index
282                .order_strategy
283                .insert(*client_order_id, order.strategy_id());
284
285            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
286            self.index
287                .instrument_orders
288                .entry(instrument_id)
289                .or_default()
290                .insert(*client_order_id);
291
292            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
293            self.index
294                .strategy_orders
295                .entry(strategy_id)
296                .or_default()
297                .insert(*client_order_id);
298
299            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
300            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
301                self.index
302                    .exec_algorithm_orders
303                    .entry(exec_algorithm_id)
304                    .or_default()
305                    .insert(*client_order_id);
306            }
307
308            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
309            if let Some(exec_spawn_id) = order.exec_spawn_id() {
310                self.index
311                    .exec_spawn_orders
312                    .entry(exec_spawn_id)
313                    .or_default()
314                    .insert(*client_order_id);
315            }
316
317            // 9: Build index.orders -> {ClientOrderId}
318            self.index.orders.insert(*client_order_id);
319
320            // 10: Build index.orders_open -> {ClientOrderId}
321            if order.is_open() {
322                self.index.orders_open.insert(*client_order_id);
323            }
324
325            // 11: Build index.orders_closed -> {ClientOrderId}
326            if order.is_closed() {
327                self.index.orders_closed.insert(*client_order_id);
328            }
329
330            // 12: Build index.orders_emulated -> {ClientOrderId}
331            if let Some(emulation_trigger) = order.emulation_trigger() {
332                if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
333                    self.index.orders_emulated.insert(*client_order_id);
334                }
335            }
336
337            // 13: Build index.orders_inflight -> {ClientOrderId}
338            if order.is_inflight() {
339                self.index.orders_inflight.insert(*client_order_id);
340            }
341
342            // 14: Build index.strategies -> {StrategyId}
343            self.index.strategies.insert(strategy_id);
344
345            // 15: Build index.strategies -> {ExecAlgorithmId}
346            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
347                self.index.exec_algorithms.insert(exec_algorithm_id);
348            }
349        }
350
351        // Index positions
352        for (position_id, position) in &self.positions {
353            let instrument_id = position.instrument_id;
354            let venue = instrument_id.venue;
355            let strategy_id = position.strategy_id;
356
357            // 1: Build index.venue_positions -> {Venue, {PositionId}}
358            self.index
359                .venue_positions
360                .entry(venue)
361                .or_default()
362                .insert(*position_id);
363
364            // 2: Build index.position_strategy -> {PositionId, StrategyId}
365            self.index
366                .position_strategy
367                .insert(*position_id, position.strategy_id);
368
369            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
370            self.index
371                .position_orders
372                .entry(*position_id)
373                .or_default()
374                .extend(position.client_order_ids().into_iter());
375
376            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
377            self.index
378                .instrument_positions
379                .entry(instrument_id)
380                .or_default()
381                .insert(*position_id);
382
383            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
384            self.index
385                .strategy_positions
386                .entry(strategy_id)
387                .or_default()
388                .insert(*position_id);
389
390            // 6: Build index.positions -> {PositionId}
391            self.index.positions.insert(*position_id);
392
393            // 7: Build index.positions_open -> {PositionId}
394            if position.is_open() {
395                self.index.positions_open.insert(*position_id);
396            }
397
398            // 8: Build index.positions_closed -> {PositionId}
399            if position.is_closed() {
400                self.index.positions_closed.insert(*position_id);
401            }
402
403            // 9: Build index.strategies -> {StrategyId}
404            self.index.strategies.insert(strategy_id);
405        }
406    }
407
408    /// Returns whether the cache has a backing database.
409    #[must_use]
410    pub const fn has_backing(&self) -> bool {
411        self.config.database.is_some()
412    }
413
414    // Calculate the unrealized profit and loss (PnL) for a given position.
415    #[must_use]
416    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
417        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
418            quote
419        } else {
420            log::warn!(
421                "Cannot calculate unrealized PnL for {}, no quotes for {}",
422                position.id,
423                position.instrument_id
424            );
425            return None;
426        };
427
428        let last = match position.side {
429            PositionSide::Flat | PositionSide::NoPositionSide => {
430                return Some(Money::new(0.0, position.settlement_currency));
431            }
432            PositionSide::Long => quote.ask_price,
433            PositionSide::Short => quote.bid_price,
434        };
435
436        Some(position.unrealized_pnl(last))
437    }
438
439    /// Checks integrity of data within the cache.
440    ///
441    /// All data should be loaded from the database prior to this call.
442    /// If an error is found then a log error message will also be produced.
443    #[must_use]
444    pub fn check_integrity(&mut self) -> bool {
445        let mut error_count = 0;
446        let failure = "Integrity failure";
447
448        // Get current timestamp in microseconds
449        let timestamp_us = SystemTime::now()
450            .duration_since(UNIX_EPOCH)
451            .expect("Time went backwards")
452            .as_micros();
453
454        log::info!("Checking data integrity");
455
456        // Check object caches
457        for account_id in self.accounts.keys() {
458            if !self
459                .index
460                .venue_account
461                .contains_key(&account_id.get_issuer())
462            {
463                log::error!(
464                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
465                );
466                error_count += 1;
467            }
468        }
469
470        for (client_order_id, order) in &self.orders {
471            if !self.index.order_strategy.contains_key(client_order_id) {
472                log::error!(
473                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
474                );
475                error_count += 1;
476            }
477            if !self.index.orders.contains(client_order_id) {
478                log::error!(
479                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
480                );
481                error_count += 1;
482            }
483            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
484                log::error!(
485                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
486                );
487                error_count += 1;
488            }
489            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
490                log::error!(
491                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
492                );
493                error_count += 1;
494            }
495            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
496                log::error!(
497                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
498                );
499                error_count += 1;
500            }
501            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
502                if !self
503                    .index
504                    .exec_algorithm_orders
505                    .contains_key(&exec_algorithm_id)
506                {
507                    log::error!(
508                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
509                    );
510                    error_count += 1;
511                }
512                if order.exec_spawn_id().is_none()
513                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
514                {
515                    log::error!(
516                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
517                    );
518                    error_count += 1;
519                }
520            }
521        }
522
523        for (position_id, position) in &self.positions {
524            if !self.index.position_strategy.contains_key(position_id) {
525                log::error!(
526                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
527                );
528                error_count += 1;
529            }
530            if !self.index.position_orders.contains_key(position_id) {
531                log::error!(
532                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
533                );
534                error_count += 1;
535            }
536            if !self.index.positions.contains(position_id) {
537                log::error!(
538                    "{failure} in positions: {position_id} not found in `self.index.positions`",
539                );
540                error_count += 1;
541            }
542            if position.is_open() && !self.index.positions_open.contains(position_id) {
543                log::error!(
544                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
545                );
546                error_count += 1;
547            }
548            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
549                log::error!(
550                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
551                );
552                error_count += 1;
553            }
554        }
555
556        // Check indexes
557        for account_id in self.index.venue_account.values() {
558            if !self.accounts.contains_key(account_id) {
559                log::error!(
560                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
561                );
562                error_count += 1;
563            }
564        }
565
566        for client_order_id in self.index.venue_order_ids.values() {
567            if !self.orders.contains_key(client_order_id) {
568                log::error!(
569                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
570                );
571                error_count += 1;
572            }
573        }
574
575        for client_order_id in self.index.client_order_ids.keys() {
576            if !self.orders.contains_key(client_order_id) {
577                log::error!(
578                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
579                );
580                error_count += 1;
581            }
582        }
583
584        for client_order_id in self.index.order_position.keys() {
585            if !self.orders.contains_key(client_order_id) {
586                log::error!(
587                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
588                );
589                error_count += 1;
590            }
591        }
592
593        // Check indexes
594        for client_order_id in self.index.order_strategy.keys() {
595            if !self.orders.contains_key(client_order_id) {
596                log::error!(
597                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
598                );
599                error_count += 1;
600            }
601        }
602
603        for position_id in self.index.position_strategy.keys() {
604            if !self.positions.contains_key(position_id) {
605                log::error!(
606                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
607                );
608                error_count += 1;
609            }
610        }
611
612        for position_id in self.index.position_orders.keys() {
613            if !self.positions.contains_key(position_id) {
614                log::error!(
615                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
616                );
617                error_count += 1;
618            }
619        }
620
621        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
622            for client_order_id in client_order_ids {
623                if !self.orders.contains_key(client_order_id) {
624                    log::error!(
625                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
626                    );
627                    error_count += 1;
628                }
629            }
630        }
631
632        for instrument_id in self.index.instrument_positions.keys() {
633            if !self.index.instrument_orders.contains_key(instrument_id) {
634                log::error!(
635                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
636                );
637                error_count += 1;
638            }
639        }
640
641        for client_order_ids in self.index.strategy_orders.values() {
642            for client_order_id in client_order_ids {
643                if !self.orders.contains_key(client_order_id) {
644                    log::error!(
645                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
646                    );
647                    error_count += 1;
648                }
649            }
650        }
651
652        for position_ids in self.index.strategy_positions.values() {
653            for position_id in position_ids {
654                if !self.positions.contains_key(position_id) {
655                    log::error!(
656                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
657                    );
658                    error_count += 1;
659                }
660            }
661        }
662
663        for client_order_id in &self.index.orders {
664            if !self.orders.contains_key(client_order_id) {
665                log::error!(
666                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
667                );
668                error_count += 1;
669            }
670        }
671
672        for client_order_id in &self.index.orders_emulated {
673            if !self.orders.contains_key(client_order_id) {
674                log::error!(
675                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
676                );
677                error_count += 1;
678            }
679        }
680
681        for client_order_id in &self.index.orders_inflight {
682            if !self.orders.contains_key(client_order_id) {
683                log::error!(
684                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
685                );
686                error_count += 1;
687            }
688        }
689
690        for client_order_id in &self.index.orders_open {
691            if !self.orders.contains_key(client_order_id) {
692                log::error!(
693                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
694                );
695                error_count += 1;
696            }
697        }
698
699        for client_order_id in &self.index.orders_closed {
700            if !self.orders.contains_key(client_order_id) {
701                log::error!(
702                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
703                );
704                error_count += 1;
705            }
706        }
707
708        for position_id in &self.index.positions {
709            if !self.positions.contains_key(position_id) {
710                log::error!(
711                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
712                );
713                error_count += 1;
714            }
715        }
716
717        for position_id in &self.index.positions_open {
718            if !self.positions.contains_key(position_id) {
719                log::error!(
720                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
721                );
722                error_count += 1;
723            }
724        }
725
726        for position_id in &self.index.positions_closed {
727            if !self.positions.contains_key(position_id) {
728                log::error!(
729                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
730                );
731                error_count += 1;
732            }
733        }
734
735        for strategy_id in &self.index.strategies {
736            if !self.index.strategy_orders.contains_key(strategy_id) {
737                log::error!(
738                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
739                );
740                error_count += 1;
741            }
742        }
743
744        for exec_algorithm_id in &self.index.exec_algorithms {
745            if !self
746                .index
747                .exec_algorithm_orders
748                .contains_key(exec_algorithm_id)
749            {
750                log::error!(
751                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
752                );
753                error_count += 1;
754            }
755        }
756
757        let total_us = SystemTime::now()
758            .duration_since(UNIX_EPOCH)
759            .expect("Time went backwards")
760            .as_micros()
761            - timestamp_us;
762
763        if error_count == 0 {
764            log::info!("Integrity check passed in {total_us}μs");
765            true
766        } else {
767            log::error!(
768                "Integrity check failed with {error_count} error{} in {total_us}μs",
769                if error_count == 1 { "" } else { "s" },
770            );
771            false
772        }
773    }
774
775    /// Checks for any residual open state and log warnings if any are found.
776    ///
777    ///'Open state' is considered to be open orders and open positions.
778    #[must_use]
779    pub fn check_residuals(&self) -> bool {
780        log::debug!("Checking residuals");
781
782        let mut residuals = false;
783
784        // Check for any open orders
785        for order in self.orders_open(None, None, None, None) {
786            residuals = true;
787            log::warn!("Residual {order:?}");
788        }
789
790        // Check for any open positions
791        for position in self.positions_open(None, None, None, None) {
792            residuals = true;
793            log::warn!("Residual {position}");
794        }
795
796        residuals
797    }
798
799    /// Purges all closed orders from the cache that are older than the given buffer time.
800    ///
801    ///
802    /// Only orders that have been closed for at least this amount of time will be purged.
803    /// A value of 0 means purge all closed orders regardless of when they were closed.
804    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
805        log::debug!(
806            "Purging closed orders{}",
807            if buffer_secs > 0 {
808                format!(" with buffer_secs={buffer_secs}")
809            } else {
810                String::new()
811            }
812        );
813
814        let buffer_ns = secs_to_nanos(buffer_secs as f64);
815
816        for client_order_id in self.index.orders_closed.clone() {
817            if let Some(order) = self.orders.get(&client_order_id) {
818                if let Some(ts_closed) = order.ts_closed() {
819                    if ts_closed + buffer_ns <= ts_now {
820                        self.purge_order(client_order_id);
821                    }
822                }
823            }
824        }
825    }
826
827    /// Purges all closed positions from the cache that are older than the given buffer time.
828    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
829        log::debug!(
830            "Purging closed positions{}",
831            if buffer_secs > 0 {
832                format!(" with buffer_secs={buffer_secs}")
833            } else {
834                String::new()
835            }
836        );
837
838        let buffer_ns = secs_to_nanos(buffer_secs as f64);
839
840        for position_id in self.index.positions_closed.clone() {
841            if let Some(position) = self.positions.get(&position_id) {
842                if let Some(ts_closed) = position.ts_closed {
843                    if ts_closed + buffer_ns <= ts_now {
844                        self.purge_position(position_id);
845                    }
846                }
847            }
848        }
849    }
850
851    /// Purges the order with the given client order ID from the cache (if found).
852    ///
853    /// All `OrderFilled` events for the order will also be purged from any associated position.
854    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
855        // Purge events from associated position if exists
856        if let Some(position_id) = self.index.order_position.get(&client_order_id) {
857            if let Some(position) = self.positions.get_mut(position_id) {
858                position.purge_events_for_order(client_order_id);
859            }
860        }
861
862        if let Some(order) = self.orders.remove(&client_order_id) {
863            // Remove order from venue index
864            if let Some(venue_orders) = self
865                .index
866                .venue_orders
867                .get_mut(&order.instrument_id().venue)
868            {
869                venue_orders.remove(&client_order_id);
870            }
871
872            // Remove venue order ID index if exists
873            if let Some(venue_order_id) = order.venue_order_id() {
874                self.index.venue_order_ids.remove(&venue_order_id);
875            }
876
877            // Remove from instrument orders index
878            if let Some(instrument_orders) =
879                self.index.instrument_orders.get_mut(&order.instrument_id())
880            {
881                instrument_orders.remove(&client_order_id);
882            }
883
884            // Remove from position orders index if associated with a position
885            if let Some(position_id) = order.position_id() {
886                if let Some(position_orders) = self.index.position_orders.get_mut(&position_id) {
887                    position_orders.remove(&client_order_id);
888                }
889            }
890
891            // Remove from exec algorithm orders index if it has an exec algorithm
892            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
893                if let Some(exec_algorithm_orders) =
894                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
895                {
896                    exec_algorithm_orders.remove(&client_order_id);
897                }
898            }
899
900            log::info!("Purged order {client_order_id}");
901        } else {
902            log::warn!("Order {client_order_id} not found when purging");
903        }
904
905        // Remove from all other index collections regardless of whether order was found
906        self.index.order_position.remove(&client_order_id);
907        self.index.order_strategy.remove(&client_order_id);
908        self.index.order_client.remove(&client_order_id);
909        self.index.client_order_ids.remove(&client_order_id);
910        self.index.exec_spawn_orders.remove(&client_order_id);
911        self.index.orders.remove(&client_order_id);
912        self.index.orders_closed.remove(&client_order_id);
913        self.index.orders_emulated.remove(&client_order_id);
914        self.index.orders_inflight.remove(&client_order_id);
915        self.index.orders_pending_cancel.remove(&client_order_id);
916    }
917
918    /// Purges the position with the given position ID from the cache (if found).
919    pub fn purge_position(&mut self, position_id: PositionId) {
920        if let Some(position) = self.positions.remove(&position_id) {
921            // Remove from venue positions index
922            if let Some(venue_positions) = self
923                .index
924                .venue_positions
925                .get_mut(&position.instrument_id.venue)
926            {
927                venue_positions.remove(&position_id);
928            }
929
930            // Remove from instrument positions index
931            if let Some(instrument_positions) = self
932                .index
933                .instrument_positions
934                .get_mut(&position.instrument_id)
935            {
936                instrument_positions.remove(&position_id);
937            }
938
939            // Remove from strategy positions index
940            if let Some(strategy_positions) =
941                self.index.strategy_positions.get_mut(&position.strategy_id)
942            {
943                strategy_positions.remove(&position_id);
944            }
945
946            // Remove position ID from orders that reference it
947            for client_order_id in position.client_order_ids() {
948                self.index.order_position.remove(&client_order_id);
949            }
950
951            log::info!("Purged position {position_id}");
952        } else {
953            log::warn!("Position {position_id} not found when purging");
954        }
955
956        // Remove from all other index collections regardless of whether position was found
957        self.index.position_strategy.remove(&position_id);
958        self.index.position_orders.remove(&position_id);
959        self.index.positions.remove(&position_id);
960        self.index.positions_open.remove(&position_id);
961        self.index.positions_closed.remove(&position_id);
962    }
963
964    /// Purges all account state events which are outside the lookback window.
965    ///
966    /// Only events which are outside the lookback window will be purged.
967    /// A value of 0 means purge all account state events.
968    pub fn purge_account_events(&mut self, _ts_now: UnixNanos, lookback_secs: u64) {
969        log::debug!(
970            "Purging account events{}",
971            if lookback_secs > 0 {
972                format!(" with lookback_secs={lookback_secs}")
973            } else {
974                String::new()
975            }
976        );
977
978        // TODO: Implement purging of account state events
979        // for account in self.accounts.values_mut() {
980        //     let event_count = account.event_count();
981        //     account.purge_account_events(ts_now, lookback_secs);
982        //     let count_diff = event_count - account.event_count();
983        //     if count_diff > 0 {
984        //         log::info!(
985        //             "Purged {} event(s) from account {}",
986        //             count_diff,
987        //             account.id()
988        //         );
989        //     }
990        // }
991    }
992
993    /// Clears the caches index.
994    pub fn clear_index(&mut self) {
995        self.index.clear();
996        log::debug!("Cleared index");
997    }
998
999    /// Resets the cache.
1000    ///
1001    /// All stateful fields are reset to their initial value.
1002    pub fn reset(&mut self) {
1003        log::debug!("Resetting cache");
1004
1005        self.general.clear();
1006        self.currencies.clear();
1007        self.instruments.clear();
1008        self.synthetics.clear();
1009        self.books.clear();
1010        self.own_books.clear();
1011        self.quotes.clear();
1012        self.trades.clear();
1013        self.mark_xrates.clear();
1014        self.mark_prices.clear();
1015        self.index_prices.clear();
1016        self.bars.clear();
1017        self.accounts.clear();
1018        self.orders.clear();
1019        self.order_lists.clear();
1020        self.positions.clear();
1021        self.position_snapshots.clear();
1022        self.greeks.clear();
1023        self.yield_curves.clear();
1024
1025        self.clear_index();
1026
1027        log::info!("Reset cache");
1028    }
1029
1030    /// Dispose of the cache which will close any underlying database adapter.
1031    pub fn dispose(&mut self) {
1032        if let Some(database) = &mut self.database {
1033            database.close().expect("Failed to close database");
1034        }
1035    }
1036
1037    /// Flushes the caches database which permanently removes all persisted data.
1038    pub fn flush_db(&mut self) {
1039        if let Some(database) = &mut self.database {
1040            database.flush().expect("Failed to flush database");
1041        }
1042    }
1043
1044    /// Adds a general object `value` (as bytes) to the cache at the given `key`.
1045    ///
1046    /// The cache is agnostic to what the bytes actually represent (and how it may be serialized),
1047    /// which provides maximum flexibility.
1048    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1049        check_valid_string(key, stringify!(key)).expect(FAILED);
1050        check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
1051
1052        log::debug!("Adding general {key}");
1053        self.general.insert(key.to_string(), value.clone());
1054
1055        if let Some(database) = &mut self.database {
1056            database.add(key.to_string(), value)?;
1057        }
1058        Ok(())
1059    }
1060
1061    /// Adds the given order `book` to the cache.
1062    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1063        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1064
1065        if self.config.save_market_data {
1066            if let Some(database) = &mut self.database {
1067                database.add_order_book(&book)?;
1068            }
1069        }
1070
1071        self.books.insert(book.instrument_id, book);
1072        Ok(())
1073    }
1074
1075    /// Adds the given `own_book` to the cache.
1076    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1077        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1078
1079        self.own_books.insert(own_book.instrument_id, own_book);
1080        Ok(())
1081    }
1082
1083    /// Adds the given `mark_price` update for the given `instrument_id` to the cache.
1084    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1085        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1086
1087        if self.config.save_market_data {
1088            // TODO: Placeholder and return Result for consistency
1089        }
1090
1091        let mark_prices_deque = self
1092            .mark_prices
1093            .entry(mark_price.instrument_id)
1094            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1095        mark_prices_deque.push_front(mark_price);
1096        Ok(())
1097    }
1098
1099    /// Adds the given `index_price` update for the given `instrument_id` to the cache.
1100    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1101        log::debug!(
1102            "Adding `IndexPriceUpdate` for {}",
1103            index_price.instrument_id
1104        );
1105
1106        if self.config.save_market_data {
1107            // TODO: Placeholder and return Result for consistency
1108        }
1109
1110        let index_prices_deque = self
1111            .index_prices
1112            .entry(index_price.instrument_id)
1113            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1114        index_prices_deque.push_front(index_price);
1115        Ok(())
1116    }
1117
1118    /// Adds the given `quote` tick to the cache.
1119    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1120        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1121
1122        if self.config.save_market_data {
1123            if let Some(database) = &mut self.database {
1124                database.add_quote(&quote)?;
1125            }
1126        }
1127
1128        let quotes_deque = self
1129            .quotes
1130            .entry(quote.instrument_id)
1131            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1132        quotes_deque.push_front(quote);
1133        Ok(())
1134    }
1135
1136    /// Adds the given `quotes` to the cache.
1137    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1138        check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
1139
1140        let instrument_id = quotes[0].instrument_id;
1141        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1142
1143        if self.config.save_market_data {
1144            if let Some(database) = &mut self.database {
1145                for quote in quotes {
1146                    database.add_quote(quote).unwrap();
1147                }
1148            }
1149        }
1150
1151        let quotes_deque = self
1152            .quotes
1153            .entry(instrument_id)
1154            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1155
1156        for quote in quotes {
1157            quotes_deque.push_front(*quote);
1158        }
1159        Ok(())
1160    }
1161
1162    /// Adds the given `trade` tick to the cache.
1163    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1164        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1165
1166        if self.config.save_market_data {
1167            if let Some(database) = &mut self.database {
1168                database.add_trade(&trade)?;
1169            }
1170        }
1171
1172        let trades_deque = self
1173            .trades
1174            .entry(trade.instrument_id)
1175            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1176        trades_deque.push_front(trade);
1177        Ok(())
1178    }
1179
1180    /// Adds the give `trades` to the cache.
1181    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1182        check_slice_not_empty(trades, stringify!(trades)).unwrap();
1183
1184        let instrument_id = trades[0].instrument_id;
1185        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1186
1187        if self.config.save_market_data {
1188            if let Some(database) = &mut self.database {
1189                for trade in trades {
1190                    database.add_trade(trade).unwrap();
1191                }
1192            }
1193        }
1194
1195        let trades_deque = self
1196            .trades
1197            .entry(instrument_id)
1198            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1199
1200        for trade in trades {
1201            trades_deque.push_front(*trade);
1202        }
1203        Ok(())
1204    }
1205
1206    /// Adds the given `bar` to the cache.
1207    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1208        log::debug!("Adding `Bar` {}", bar.bar_type);
1209
1210        if self.config.save_market_data {
1211            if let Some(database) = &mut self.database {
1212                database.add_bar(&bar)?;
1213            }
1214        }
1215
1216        let bars = self
1217            .bars
1218            .entry(bar.bar_type)
1219            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1220        bars.push_front(bar);
1221        Ok(())
1222    }
1223
1224    /// Adds the given `bars` to the cache.
1225    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1226        check_slice_not_empty(bars, stringify!(bars)).unwrap();
1227
1228        let bar_type = bars[0].bar_type;
1229        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1230
1231        if self.config.save_market_data {
1232            if let Some(database) = &mut self.database {
1233                for bar in bars {
1234                    database.add_bar(bar).unwrap();
1235                }
1236            }
1237        }
1238
1239        let bars_deque = self
1240            .bars
1241            .entry(bar_type)
1242            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1243
1244        for bar in bars {
1245            bars_deque.push_front(*bar);
1246        }
1247        Ok(())
1248    }
1249
1250    /// Adds the given `greeks` data to the cache.
1251    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1252        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1253
1254        if self.config.save_market_data {
1255            if let Some(_database) = &mut self.database {
1256                // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1257            }
1258        }
1259
1260        self.greeks.insert(greeks.instrument_id, greeks);
1261        Ok(())
1262    }
1263
1264    /// Gets the greeks data for the given instrument ID.
1265    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1266        self.greeks.get(instrument_id).cloned()
1267    }
1268
1269    /// Adds the given `yield_curve` data to the cache.
1270    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1271        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1272
1273        if self.config.save_market_data {
1274            if let Some(_database) = &mut self.database {
1275                // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1276            }
1277        }
1278
1279        self.yield_curves
1280            .insert(yield_curve.curve_name.clone(), yield_curve);
1281        Ok(())
1282    }
1283
1284    /// Gets the yield curve for the given key.
1285    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1286        self.yield_curves.get(key).map(|curve| {
1287            let curve_clone = curve.clone();
1288            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1289                as Box<dyn Fn(f64) -> f64>
1290        })
1291    }
1292
1293    /// Adds the given `currency` to the cache.
1294    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1295        log::debug!("Adding `Currency` {}", currency.code);
1296
1297        if let Some(database) = &mut self.database {
1298            database.add_currency(&currency)?;
1299        }
1300
1301        self.currencies.insert(currency.code, currency);
1302        Ok(())
1303    }
1304
1305    /// Adds the given `instrument` to the cache.
1306    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1307        log::debug!("Adding `Instrument` {}", instrument.id());
1308
1309        if let Some(database) = &mut self.database {
1310            database.add_instrument(&instrument)?;
1311        }
1312
1313        self.instruments.insert(instrument.id(), instrument);
1314        Ok(())
1315    }
1316
1317    /// Adds the given `synthetic` instrument to the cache.
1318    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1319        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1320
1321        if let Some(database) = &mut self.database {
1322            database.add_synthetic(&synthetic)?;
1323        }
1324
1325        self.synthetics.insert(synthetic.id, synthetic);
1326        Ok(())
1327    }
1328
1329    /// Adds the given `account` to the cache.
1330    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1331        log::debug!("Adding `Account` {}", account.id());
1332
1333        if let Some(database) = &mut self.database {
1334            database.add_account(&account)?;
1335        }
1336
1337        let account_id = account.id();
1338        self.accounts.insert(account_id, account);
1339        self.index
1340            .venue_account
1341            .insert(account_id.get_issuer(), account_id);
1342        Ok(())
1343    }
1344
1345    /// Indexes the given `client_order_id` with the given `venue_order_id`.
1346    ///
1347    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1348    pub fn add_venue_order_id(
1349        &mut self,
1350        client_order_id: &ClientOrderId,
1351        venue_order_id: &VenueOrderId,
1352        overwrite: bool,
1353    ) -> anyhow::Result<()> {
1354        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1355            if !overwrite && existing_venue_order_id != venue_order_id {
1356                anyhow::bail!(
1357                    "Existing {existing_venue_order_id} for {client_order_id}
1358                    did not match the given {venue_order_id}.
1359                    If you are writing a test then try a different `venue_order_id`,
1360                    otherwise this is probably a bug."
1361                );
1362            }
1363        }
1364
1365        self.index
1366            .client_order_ids
1367            .insert(*client_order_id, *venue_order_id);
1368        self.index
1369            .venue_order_ids
1370            .insert(*venue_order_id, *client_order_id);
1371
1372        Ok(())
1373    }
1374
1375    /// Adds the given `order` to the cache indexed with any given identifiers.
1376    ///
1377    /// # Parameters
1378    ///
1379    /// `override_existing`: If the added order should 'override' any existing order and replace
1380    /// it in the cache. This is currently used for emulated orders which are
1381    /// being released and transformed into another type.
1382    ///
1383    /// # Errors
1384    ///
1385    /// This function returns an error:
1386    /// If not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1387    pub fn add_order(
1388        &mut self,
1389        order: OrderAny,
1390        position_id: Option<PositionId>,
1391        client_id: Option<ClientId>,
1392        replace_existing: bool,
1393    ) -> anyhow::Result<()> {
1394        let instrument_id = order.instrument_id();
1395        let venue = instrument_id.venue;
1396        let client_order_id = order.client_order_id();
1397        let strategy_id = order.strategy_id();
1398        let exec_algorithm_id = order.exec_algorithm_id();
1399        let exec_spawn_id = order.exec_spawn_id();
1400
1401        if !replace_existing {
1402            check_key_not_in_map(
1403                &client_order_id,
1404                &self.orders,
1405                stringify!(client_order_id),
1406                stringify!(orders),
1407            )
1408            .expect(FAILED);
1409            check_key_not_in_map(
1410                &client_order_id,
1411                &self.orders,
1412                stringify!(client_order_id),
1413                stringify!(orders),
1414            )
1415            .expect(FAILED);
1416            check_key_not_in_map(
1417                &client_order_id,
1418                &self.orders,
1419                stringify!(client_order_id),
1420                stringify!(orders),
1421            )
1422            .expect(FAILED);
1423            check_key_not_in_map(
1424                &client_order_id,
1425                &self.orders,
1426                stringify!(client_order_id),
1427                stringify!(orders),
1428            )
1429            .expect(FAILED);
1430        }
1431
1432        log::debug!("Adding {order:?}");
1433
1434        self.index.orders.insert(client_order_id);
1435        self.index
1436            .order_strategy
1437            .insert(client_order_id, strategy_id);
1438        self.index.strategies.insert(strategy_id);
1439
1440        // Update venue -> orders index
1441        self.index
1442            .venue_orders
1443            .entry(venue)
1444            .or_default()
1445            .insert(client_order_id);
1446
1447        // Update instrument -> orders index
1448        self.index
1449            .instrument_orders
1450            .entry(instrument_id)
1451            .or_default()
1452            .insert(client_order_id);
1453
1454        // Update strategy -> orders index
1455        self.index
1456            .strategy_orders
1457            .entry(strategy_id)
1458            .or_default()
1459            .insert(client_order_id);
1460
1461        // Update exec_algorithm -> orders index
1462        if let Some(exec_algorithm_id) = exec_algorithm_id {
1463            self.index.exec_algorithms.insert(exec_algorithm_id);
1464
1465            self.index
1466                .exec_algorithm_orders
1467                .entry(exec_algorithm_id)
1468                .or_default()
1469                .insert(client_order_id);
1470
1471            self.index
1472                .exec_spawn_orders
1473                .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1474                .or_default()
1475                .insert(client_order_id);
1476        }
1477
1478        // Update emulation index
1479        match order.emulation_trigger() {
1480            Some(_) => {
1481                self.index.orders_emulated.remove(&client_order_id);
1482            }
1483            None => {
1484                self.index.orders_emulated.insert(client_order_id);
1485            }
1486        }
1487
1488        // Index position ID if provided
1489        if let Some(position_id) = position_id {
1490            self.add_position_id(
1491                &position_id,
1492                &order.instrument_id().venue,
1493                &client_order_id,
1494                &strategy_id,
1495            )?;
1496        }
1497
1498        // Index client ID if provided
1499        if let Some(client_id) = client_id {
1500            self.index.order_client.insert(client_order_id, client_id);
1501            log::debug!("Indexed {client_id:?}");
1502        }
1503
1504        if let Some(database) = &mut self.database {
1505            database.add_order(&order, client_id)?;
1506            // TODO: Implement
1507            // if self.config.snapshot_orders {
1508            //     database.snapshot_order_state(order)?;
1509            // }
1510        }
1511
1512        self.orders.insert(client_order_id, order);
1513
1514        Ok(())
1515    }
1516
1517    /// Indexes the given `position_id` with the other given IDs.
1518    pub fn add_position_id(
1519        &mut self,
1520        position_id: &PositionId,
1521        venue: &Venue,
1522        client_order_id: &ClientOrderId,
1523        strategy_id: &StrategyId,
1524    ) -> anyhow::Result<()> {
1525        self.index
1526            .order_position
1527            .insert(*client_order_id, *position_id);
1528
1529        // Index: ClientOrderId -> PositionId
1530        if let Some(database) = &mut self.database {
1531            database.index_order_position(*client_order_id, *position_id)?;
1532        }
1533
1534        // Index: PositionId -> StrategyId
1535        self.index
1536            .position_strategy
1537            .insert(*position_id, *strategy_id);
1538
1539        // Index: PositionId -> set[ClientOrderId]
1540        self.index
1541            .position_orders
1542            .entry(*position_id)
1543            .or_default()
1544            .insert(*client_order_id);
1545
1546        // Index: StrategyId -> set[PositionId]
1547        self.index
1548            .strategy_positions
1549            .entry(*strategy_id)
1550            .or_default()
1551            .insert(*position_id);
1552
1553        // Index: Venue -> set[PositionId]
1554        self.index
1555            .venue_positions
1556            .entry(*venue)
1557            .or_default()
1558            .insert(*position_id);
1559
1560        Ok(())
1561    }
1562
1563    /// Adds the given `position` to the cache.
1564    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1565        self.positions.insert(position.id, position.clone());
1566        self.index.positions.insert(position.id);
1567        self.index.positions_open.insert(position.id);
1568
1569        log::debug!("Adding {position}");
1570
1571        self.add_position_id(
1572            &position.id,
1573            &position.instrument_id.venue,
1574            &position.opening_order_id,
1575            &position.strategy_id,
1576        )?;
1577
1578        let venue = position.instrument_id.venue;
1579        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1580        venue_positions.insert(position.id);
1581
1582        // Index: InstrumentId -> HashSet
1583        let instrument_id = position.instrument_id;
1584        let instrument_positions = self
1585            .index
1586            .instrument_positions
1587            .entry(instrument_id)
1588            .or_default();
1589        instrument_positions.insert(position.id);
1590
1591        if let Some(database) = &mut self.database {
1592            database.add_position(&position)?;
1593            // TODO: Implement position snapshots
1594            // if self.snapshot_positions {
1595            //     database.snapshot_position_state(
1596            //         position,
1597            //         position.ts_last,
1598            //         self.calculate_unrealized_pnl(&position),
1599            //     )?;
1600            // }
1601        }
1602
1603        Ok(())
1604    }
1605
1606    /// Updates the given `account` in the cache.
1607    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1608        if let Some(database) = &mut self.database {
1609            database.update_account(&account)?;
1610        }
1611        Ok(())
1612    }
1613
1614    /// Updates the given `order` in the cache.
1615    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1616        let client_order_id = order.client_order_id();
1617
1618        // Update venue order ID
1619        if let Some(venue_order_id) = order.venue_order_id() {
1620            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1621            // venues which use a cancel+replace update strategy.
1622            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1623                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1624                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1625            }
1626        }
1627
1628        // Update in-flight state
1629        if order.is_inflight() {
1630            self.index.orders_inflight.insert(client_order_id);
1631        } else {
1632            self.index.orders_inflight.remove(&client_order_id);
1633        }
1634
1635        // Update open/closed state
1636        if order.is_open() {
1637            self.index.orders_closed.remove(&client_order_id);
1638            self.index.orders_open.insert(client_order_id);
1639        } else if order.is_closed() {
1640            self.index.orders_open.remove(&client_order_id);
1641            self.index.orders_pending_cancel.remove(&client_order_id);
1642            self.index.orders_closed.insert(client_order_id);
1643        }
1644
1645        // Update emulation
1646        if let Some(emulation_trigger) = order.emulation_trigger() {
1647            match emulation_trigger {
1648                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1649                _ => self.index.orders_emulated.insert(client_order_id),
1650            };
1651        }
1652
1653        if let Some(database) = &mut self.database {
1654            database.update_order(order.last_event())?;
1655            // TODO: Implement order snapshots
1656            // if self.snapshot_orders {
1657            //     database.snapshot_order_state(order)?;
1658            // }
1659        }
1660
1661        // update the order in the cache
1662        self.orders.insert(client_order_id, order.clone());
1663
1664        Ok(())
1665    }
1666
1667    /// Updates the given `order` as pending cancel locally.
1668    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1669        self.index
1670            .orders_pending_cancel
1671            .insert(order.client_order_id());
1672    }
1673
1674    /// Updates the given `position` in the cache.
1675    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1676        // Update open/closed state
1677        if position.is_open() {
1678            self.index.positions_open.insert(position.id);
1679            self.index.positions_closed.remove(&position.id);
1680        } else {
1681            self.index.positions_closed.insert(position.id);
1682            self.index.positions_open.remove(&position.id);
1683        }
1684
1685        if let Some(database) = &mut self.database {
1686            database.update_position(position)?;
1687            // TODO: Implement order snapshots
1688            // if self.snapshot_orders {
1689            //     database.snapshot_order_state(order)?;
1690            // }
1691        }
1692        Ok(())
1693    }
1694
1695    /// Creates a snapshot of the given position by cloning it, assigning a new ID,
1696    /// serializing it, and storing it in the position snapshots.
1697    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1698        let position_id = position.id;
1699
1700        let mut copied_position = position.clone();
1701        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1702        copied_position.id = PositionId::new(new_id);
1703
1704        // Serialize the position (TODO: temporily just to JSON to remove a dependency)
1705        let position_serialized = serde_json::to_vec(&copied_position)?;
1706
1707        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1708        let new_snapshots = match snapshots {
1709            Some(existing_snapshots) => {
1710                let mut combined = existing_snapshots.to_vec();
1711                combined.extend(position_serialized);
1712                Bytes::from(combined)
1713            }
1714            None => Bytes::from(position_serialized),
1715        };
1716        self.position_snapshots.insert(position_id, new_snapshots);
1717
1718        log::debug!("Snapshot {}", copied_position);
1719        Ok(())
1720    }
1721
1722    pub fn snapshot_position_state(
1723        &mut self,
1724        position: &Position,
1725        // ts_snapshot: u64,
1726        // unrealized_pnl: Option<Money>,
1727        open_only: Option<bool>,
1728    ) -> anyhow::Result<()> {
1729        let open_only = open_only.unwrap_or(true);
1730
1731        if open_only && !position.is_open() {
1732            return Ok(());
1733        }
1734
1735        if let Some(database) = &mut self.database {
1736            database.snapshot_position_state(position).map_err(|e| {
1737                log::error!(
1738                    "Failed to snapshot position state for {}: {e:?}",
1739                    position.id
1740                );
1741                e
1742            })?;
1743        } else {
1744            log::warn!(
1745                "Cannot snapshot position state for {} (no database configured)",
1746                position.id
1747            );
1748        }
1749
1750        // Ok(())
1751        todo!()
1752    }
1753
1754    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1755        let database = if let Some(database) = &self.database {
1756            database
1757        } else {
1758            log::warn!(
1759                "Cannot snapshot order state for {} (no database configured)",
1760                order.client_order_id()
1761            );
1762            return Ok(());
1763        };
1764
1765        database.snapshot_order_state(order)
1766    }
1767
1768    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
1769
1770    fn build_order_query_filter_set(
1771        &self,
1772        venue: Option<&Venue>,
1773        instrument_id: Option<&InstrumentId>,
1774        strategy_id: Option<&StrategyId>,
1775    ) -> Option<HashSet<ClientOrderId>> {
1776        let mut query: Option<HashSet<ClientOrderId>> = None;
1777
1778        if let Some(venue) = venue {
1779            query = Some(
1780                self.index
1781                    .venue_orders
1782                    .get(venue)
1783                    .cloned()
1784                    .unwrap_or_default(),
1785            );
1786        }
1787
1788        if let Some(instrument_id) = instrument_id {
1789            let instrument_orders = self
1790                .index
1791                .instrument_orders
1792                .get(instrument_id)
1793                .cloned()
1794                .unwrap_or_default();
1795
1796            if let Some(existing_query) = &mut query {
1797                *existing_query = existing_query
1798                    .intersection(&instrument_orders)
1799                    .copied()
1800                    .collect();
1801            } else {
1802                query = Some(instrument_orders);
1803            }
1804        }
1805
1806        if let Some(strategy_id) = strategy_id {
1807            let strategy_orders = self
1808                .index
1809                .strategy_orders
1810                .get(strategy_id)
1811                .cloned()
1812                .unwrap_or_default();
1813
1814            if let Some(existing_query) = &mut query {
1815                *existing_query = existing_query
1816                    .intersection(&strategy_orders)
1817                    .copied()
1818                    .collect();
1819            } else {
1820                query = Some(strategy_orders);
1821            }
1822        }
1823
1824        query
1825    }
1826
1827    fn build_position_query_filter_set(
1828        &self,
1829        venue: Option<&Venue>,
1830        instrument_id: Option<&InstrumentId>,
1831        strategy_id: Option<&StrategyId>,
1832    ) -> Option<HashSet<PositionId>> {
1833        let mut query: Option<HashSet<PositionId>> = None;
1834
1835        if let Some(venue) = venue {
1836            query = Some(
1837                self.index
1838                    .venue_positions
1839                    .get(venue)
1840                    .cloned()
1841                    .unwrap_or_default(),
1842            );
1843        }
1844
1845        if let Some(instrument_id) = instrument_id {
1846            let instrument_positions = self
1847                .index
1848                .instrument_positions
1849                .get(instrument_id)
1850                .cloned()
1851                .unwrap_or_default();
1852
1853            if let Some(existing_query) = query {
1854                query = Some(
1855                    existing_query
1856                        .intersection(&instrument_positions)
1857                        .copied()
1858                        .collect(),
1859                );
1860            } else {
1861                query = Some(instrument_positions);
1862            }
1863        }
1864
1865        if let Some(strategy_id) = strategy_id {
1866            let strategy_positions = self
1867                .index
1868                .strategy_positions
1869                .get(strategy_id)
1870                .cloned()
1871                .unwrap_or_default();
1872
1873            if let Some(existing_query) = query {
1874                query = Some(
1875                    existing_query
1876                        .intersection(&strategy_positions)
1877                        .copied()
1878                        .collect(),
1879                );
1880            } else {
1881                query = Some(strategy_positions);
1882            }
1883        }
1884
1885        query
1886    }
1887
1888    fn get_orders_for_ids(
1889        &self,
1890        client_order_ids: &HashSet<ClientOrderId>,
1891        side: Option<OrderSide>,
1892    ) -> Vec<&OrderAny> {
1893        let side = side.unwrap_or(OrderSide::NoOrderSide);
1894        let mut orders = Vec::new();
1895
1896        for client_order_id in client_order_ids {
1897            let order = self
1898                .orders
1899                .get(client_order_id)
1900                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1901            if side == OrderSide::NoOrderSide || side == order.order_side() {
1902                orders.push(order);
1903            }
1904        }
1905
1906        orders
1907    }
1908
1909    fn get_positions_for_ids(
1910        &self,
1911        position_ids: &HashSet<PositionId>,
1912        side: Option<PositionSide>,
1913    ) -> Vec<&Position> {
1914        let side = side.unwrap_or(PositionSide::NoPositionSide);
1915        let mut positions = Vec::new();
1916
1917        for position_id in position_ids {
1918            let position = self
1919                .positions
1920                .get(position_id)
1921                .unwrap_or_else(|| panic!("Position {position_id} not found"));
1922            if side == PositionSide::NoPositionSide || side == position.side {
1923                positions.push(position);
1924            }
1925        }
1926
1927        positions
1928    }
1929
1930    /// Returns the `ClientOrderId`s of all orders.
1931    #[must_use]
1932    pub fn client_order_ids(
1933        &self,
1934        venue: Option<&Venue>,
1935        instrument_id: Option<&InstrumentId>,
1936        strategy_id: Option<&StrategyId>,
1937    ) -> HashSet<ClientOrderId> {
1938        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1939        match query {
1940            Some(query) => self.index.orders.intersection(&query).copied().collect(),
1941            None => self.index.orders.clone(),
1942        }
1943    }
1944
1945    /// Returns the `ClientOrderId`s of all open orders.
1946    #[must_use]
1947    pub fn client_order_ids_open(
1948        &self,
1949        venue: Option<&Venue>,
1950        instrument_id: Option<&InstrumentId>,
1951        strategy_id: Option<&StrategyId>,
1952    ) -> HashSet<ClientOrderId> {
1953        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1954        match query {
1955            Some(query) => self
1956                .index
1957                .orders_open
1958                .intersection(&query)
1959                .copied()
1960                .collect(),
1961            None => self.index.orders_open.clone(),
1962        }
1963    }
1964
1965    /// Returns the `ClientOrderId`s of all closed orders.
1966    #[must_use]
1967    pub fn client_order_ids_closed(
1968        &self,
1969        venue: Option<&Venue>,
1970        instrument_id: Option<&InstrumentId>,
1971        strategy_id: Option<&StrategyId>,
1972    ) -> HashSet<ClientOrderId> {
1973        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1974        match query {
1975            Some(query) => self
1976                .index
1977                .orders_closed
1978                .intersection(&query)
1979                .copied()
1980                .collect(),
1981            None => self.index.orders_closed.clone(),
1982        }
1983    }
1984
1985    /// Returns the `ClientOrderId`s of all emulated orders.
1986    #[must_use]
1987    pub fn client_order_ids_emulated(
1988        &self,
1989        venue: Option<&Venue>,
1990        instrument_id: Option<&InstrumentId>,
1991        strategy_id: Option<&StrategyId>,
1992    ) -> HashSet<ClientOrderId> {
1993        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1994        match query {
1995            Some(query) => self
1996                .index
1997                .orders_emulated
1998                .intersection(&query)
1999                .copied()
2000                .collect(),
2001            None => self.index.orders_emulated.clone(),
2002        }
2003    }
2004
2005    /// Returns the `ClientOrderId`s of all in-flight orders.
2006    #[must_use]
2007    pub fn client_order_ids_inflight(
2008        &self,
2009        venue: Option<&Venue>,
2010        instrument_id: Option<&InstrumentId>,
2011        strategy_id: Option<&StrategyId>,
2012    ) -> HashSet<ClientOrderId> {
2013        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2014        match query {
2015            Some(query) => self
2016                .index
2017                .orders_inflight
2018                .intersection(&query)
2019                .copied()
2020                .collect(),
2021            None => self.index.orders_inflight.clone(),
2022        }
2023    }
2024
2025    /// Returns `PositionId`s of all positions.
2026    #[must_use]
2027    pub fn position_ids(
2028        &self,
2029        venue: Option<&Venue>,
2030        instrument_id: Option<&InstrumentId>,
2031        strategy_id: Option<&StrategyId>,
2032    ) -> HashSet<PositionId> {
2033        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2034        match query {
2035            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2036            None => self.index.positions.clone(),
2037        }
2038    }
2039
2040    /// Returns the `PositionId`s of all open positions.
2041    #[must_use]
2042    pub fn position_open_ids(
2043        &self,
2044        venue: Option<&Venue>,
2045        instrument_id: Option<&InstrumentId>,
2046        strategy_id: Option<&StrategyId>,
2047    ) -> HashSet<PositionId> {
2048        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2049        match query {
2050            Some(query) => self
2051                .index
2052                .positions_open
2053                .intersection(&query)
2054                .copied()
2055                .collect(),
2056            None => self.index.positions_open.clone(),
2057        }
2058    }
2059
2060    /// Returns the `PositionId`s of all closed positions.
2061    #[must_use]
2062    pub fn position_closed_ids(
2063        &self,
2064        venue: Option<&Venue>,
2065        instrument_id: Option<&InstrumentId>,
2066        strategy_id: Option<&StrategyId>,
2067    ) -> HashSet<PositionId> {
2068        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2069        match query {
2070            Some(query) => self
2071                .index
2072                .positions_closed
2073                .intersection(&query)
2074                .copied()
2075                .collect(),
2076            None => self.index.positions_closed.clone(),
2077        }
2078    }
2079
2080    /// Returns the `ComponentId`s of all actors.
2081    #[must_use]
2082    pub fn actor_ids(&self) -> HashSet<ComponentId> {
2083        self.index.actors.clone()
2084    }
2085
2086    /// Returns the `StrategyId`s of all strategies.
2087    #[must_use]
2088    pub fn strategy_ids(&self) -> HashSet<StrategyId> {
2089        self.index.strategies.clone()
2090    }
2091
2092    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2093    #[must_use]
2094    pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
2095        self.index.exec_algorithms.clone()
2096    }
2097
2098    // -- ORDER QUERIES ---------------------------------------------------------------------------
2099
2100    /// Gets a reference to the order with the given `client_order_id` (if found).
2101    #[must_use]
2102    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2103        self.orders.get(client_order_id)
2104    }
2105
2106    /// Gets a reference to the order with the given `client_order_id` (if found).
2107    #[must_use]
2108    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2109        self.orders.get_mut(client_order_id)
2110    }
2111
2112    /// Gets a reference to the client order ID for given `venue_order_id` (if found).
2113    #[must_use]
2114    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2115        self.index.venue_order_ids.get(venue_order_id)
2116    }
2117
2118    /// Gets a reference to the venue order ID for given `client_order_id` (if found).
2119    #[must_use]
2120    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2121        self.index.client_order_ids.get(client_order_id)
2122    }
2123
2124    /// Gets a reference to the client ID indexed for given `client_order_id` (if found).
2125    #[must_use]
2126    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2127        self.index.order_client.get(client_order_id)
2128    }
2129
2130    /// Returns references to all orders matching the given optional filter parameters.
2131    #[must_use]
2132    pub fn orders(
2133        &self,
2134        venue: Option<&Venue>,
2135        instrument_id: Option<&InstrumentId>,
2136        strategy_id: Option<&StrategyId>,
2137        side: Option<OrderSide>,
2138    ) -> Vec<&OrderAny> {
2139        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2140        self.get_orders_for_ids(&client_order_ids, side)
2141    }
2142
2143    /// Returns references to all open orders matching the given optional filter parameters.
2144    #[must_use]
2145    pub fn orders_open(
2146        &self,
2147        venue: Option<&Venue>,
2148        instrument_id: Option<&InstrumentId>,
2149        strategy_id: Option<&StrategyId>,
2150        side: Option<OrderSide>,
2151    ) -> Vec<&OrderAny> {
2152        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2153        self.get_orders_for_ids(&client_order_ids, side)
2154    }
2155
2156    /// Returns references to all closed orders matching the given optional filter parameters.
2157    #[must_use]
2158    pub fn orders_closed(
2159        &self,
2160        venue: Option<&Venue>,
2161        instrument_id: Option<&InstrumentId>,
2162        strategy_id: Option<&StrategyId>,
2163        side: Option<OrderSide>,
2164    ) -> Vec<&OrderAny> {
2165        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2166        self.get_orders_for_ids(&client_order_ids, side)
2167    }
2168
2169    /// Returns references to all emulated orders matching the given optional filter parameters.
2170    #[must_use]
2171    pub fn orders_emulated(
2172        &self,
2173        venue: Option<&Venue>,
2174        instrument_id: Option<&InstrumentId>,
2175        strategy_id: Option<&StrategyId>,
2176        side: Option<OrderSide>,
2177    ) -> Vec<&OrderAny> {
2178        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2179        self.get_orders_for_ids(&client_order_ids, side)
2180    }
2181
2182    /// Returns references to all in-flight orders matching the given optional filter parameters.
2183    #[must_use]
2184    pub fn orders_inflight(
2185        &self,
2186        venue: Option<&Venue>,
2187        instrument_id: Option<&InstrumentId>,
2188        strategy_id: Option<&StrategyId>,
2189        side: Option<OrderSide>,
2190    ) -> Vec<&OrderAny> {
2191        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2192        self.get_orders_for_ids(&client_order_ids, side)
2193    }
2194
2195    /// Returns references to all orders for the given `position_id`.
2196    #[must_use]
2197    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2198        let client_order_ids = self.index.position_orders.get(position_id);
2199        match client_order_ids {
2200            Some(client_order_ids) => {
2201                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2202            }
2203            None => Vec::new(),
2204        }
2205    }
2206
2207    /// Returns whether an order with the given `client_order_id` exists.
2208    #[must_use]
2209    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2210        self.index.orders.contains(client_order_id)
2211    }
2212
2213    /// Returns whether an order with the given `client_order_id` is open.
2214    #[must_use]
2215    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2216        self.index.orders_open.contains(client_order_id)
2217    }
2218
2219    /// Returns whether an order with the given `client_order_id` is closed.
2220    #[must_use]
2221    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2222        self.index.orders_closed.contains(client_order_id)
2223    }
2224
2225    /// Returns whether an order with the given `client_order_id` is emulated.
2226    #[must_use]
2227    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2228        self.index.orders_emulated.contains(client_order_id)
2229    }
2230
2231    /// Returns whether an order with the given `client_order_id` is in-flight.
2232    #[must_use]
2233    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2234        self.index.orders_inflight.contains(client_order_id)
2235    }
2236
2237    /// Returns whether an order with the given `client_order_id` is `PENDING_CANCEL` locally.
2238    #[must_use]
2239    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2240        self.index.orders_pending_cancel.contains(client_order_id)
2241    }
2242
2243    /// Returns the count of all open orders.
2244    #[must_use]
2245    pub fn orders_open_count(
2246        &self,
2247        venue: Option<&Venue>,
2248        instrument_id: Option<&InstrumentId>,
2249        strategy_id: Option<&StrategyId>,
2250        side: Option<OrderSide>,
2251    ) -> usize {
2252        self.orders_open(venue, instrument_id, strategy_id, side)
2253            .len()
2254    }
2255
2256    /// Returns the count of all closed orders.
2257    #[must_use]
2258    pub fn orders_closed_count(
2259        &self,
2260        venue: Option<&Venue>,
2261        instrument_id: Option<&InstrumentId>,
2262        strategy_id: Option<&StrategyId>,
2263        side: Option<OrderSide>,
2264    ) -> usize {
2265        self.orders_closed(venue, instrument_id, strategy_id, side)
2266            .len()
2267    }
2268
2269    /// Returns the count of all emulated orders.
2270    #[must_use]
2271    pub fn orders_emulated_count(
2272        &self,
2273        venue: Option<&Venue>,
2274        instrument_id: Option<&InstrumentId>,
2275        strategy_id: Option<&StrategyId>,
2276        side: Option<OrderSide>,
2277    ) -> usize {
2278        self.orders_emulated(venue, instrument_id, strategy_id, side)
2279            .len()
2280    }
2281
2282    /// Returns the count of all in-flight orders.
2283    #[must_use]
2284    pub fn orders_inflight_count(
2285        &self,
2286        venue: Option<&Venue>,
2287        instrument_id: Option<&InstrumentId>,
2288        strategy_id: Option<&StrategyId>,
2289        side: Option<OrderSide>,
2290    ) -> usize {
2291        self.orders_inflight(venue, instrument_id, strategy_id, side)
2292            .len()
2293    }
2294
2295    /// Returns the count of all orders.
2296    #[must_use]
2297    pub fn orders_total_count(
2298        &self,
2299        venue: Option<&Venue>,
2300        instrument_id: Option<&InstrumentId>,
2301        strategy_id: Option<&StrategyId>,
2302        side: Option<OrderSide>,
2303    ) -> usize {
2304        self.orders(venue, instrument_id, strategy_id, side).len()
2305    }
2306
2307    /// Returns the order list for the given `order_list_id`.
2308    #[must_use]
2309    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2310        self.order_lists.get(order_list_id)
2311    }
2312
2313    /// Returns all order lists matching the given optional filter parameters.
2314    #[must_use]
2315    pub fn order_lists(
2316        &self,
2317        venue: Option<&Venue>,
2318        instrument_id: Option<&InstrumentId>,
2319        strategy_id: Option<&StrategyId>,
2320    ) -> Vec<&OrderList> {
2321        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2322
2323        if let Some(venue) = venue {
2324            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2325        }
2326
2327        if let Some(instrument_id) = instrument_id {
2328            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2329        }
2330
2331        if let Some(strategy_id) = strategy_id {
2332            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2333        }
2334
2335        order_lists
2336    }
2337
2338    /// Returns whether an order list with the given `order_list_id` exists.
2339    #[must_use]
2340    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2341        self.order_lists.contains_key(order_list_id)
2342    }
2343
2344    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2345
2346    /// Returns references to all orders associated with the given `exec_algorithm_id` matching the given
2347    /// optional filter parameters.
2348    #[must_use]
2349    pub fn orders_for_exec_algorithm(
2350        &self,
2351        exec_algorithm_id: &ExecAlgorithmId,
2352        venue: Option<&Venue>,
2353        instrument_id: Option<&InstrumentId>,
2354        strategy_id: Option<&StrategyId>,
2355        side: Option<OrderSide>,
2356    ) -> Vec<&OrderAny> {
2357        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2358        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2359
2360        if let Some(query) = query {
2361            if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2362                let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2363            }
2364        }
2365
2366        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2367            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2368        } else {
2369            Vec::new()
2370        }
2371    }
2372
2373    /// Returns references to all orders with the given `exec_spawn_id`.
2374    #[must_use]
2375    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2376        self.get_orders_for_ids(
2377            self.index
2378                .exec_spawn_orders
2379                .get(exec_spawn_id)
2380                .unwrap_or(&HashSet::new()),
2381            None,
2382        )
2383    }
2384
2385    /// Returns the total order quantity for the given `exec_spawn_id`.
2386    #[must_use]
2387    pub fn exec_spawn_total_quantity(
2388        &self,
2389        exec_spawn_id: &ClientOrderId,
2390        active_only: bool,
2391    ) -> Option<Quantity> {
2392        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2393
2394        let mut total_quantity: Option<Quantity> = None;
2395
2396        for spawn_order in exec_spawn_orders {
2397            if !active_only || !spawn_order.is_closed() {
2398                if let Some(mut total_quantity) = total_quantity {
2399                    total_quantity += spawn_order.quantity();
2400                }
2401            } else {
2402                total_quantity = Some(spawn_order.quantity());
2403            }
2404        }
2405
2406        total_quantity
2407    }
2408
2409    /// Returns the total filled quantity for all orders with the given `exec_spawn_id`.
2410    #[must_use]
2411    pub fn exec_spawn_total_filled_qty(
2412        &self,
2413        exec_spawn_id: &ClientOrderId,
2414        active_only: bool,
2415    ) -> Option<Quantity> {
2416        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2417
2418        let mut total_quantity: Option<Quantity> = None;
2419
2420        for spawn_order in exec_spawn_orders {
2421            if !active_only || !spawn_order.is_closed() {
2422                if let Some(mut total_quantity) = total_quantity {
2423                    total_quantity += spawn_order.filled_qty();
2424                }
2425            } else {
2426                total_quantity = Some(spawn_order.filled_qty());
2427            }
2428        }
2429
2430        total_quantity
2431    }
2432
2433    /// Returns the total leaves quantity for all orders with the given `exec_spawn_id`.
2434    #[must_use]
2435    pub fn exec_spawn_total_leaves_qty(
2436        &self,
2437        exec_spawn_id: &ClientOrderId,
2438        active_only: bool,
2439    ) -> Option<Quantity> {
2440        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2441
2442        let mut total_quantity: Option<Quantity> = None;
2443
2444        for spawn_order in exec_spawn_orders {
2445            if !active_only || !spawn_order.is_closed() {
2446                if let Some(mut total_quantity) = total_quantity {
2447                    total_quantity += spawn_order.leaves_qty();
2448                }
2449            } else {
2450                total_quantity = Some(spawn_order.leaves_qty());
2451            }
2452        }
2453
2454        total_quantity
2455    }
2456
2457    // -- POSITION QUERIES ------------------------------------------------------------------------
2458
2459    /// Returns a reference to the position with the given `position_id` (if found).
2460    #[must_use]
2461    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2462        self.positions.get(position_id)
2463    }
2464
2465    /// Returns a reference to the position for the given `client_order_id` (if found).
2466    #[must_use]
2467    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2468        self.index
2469            .order_position
2470            .get(client_order_id)
2471            .and_then(|position_id| self.positions.get(position_id))
2472    }
2473
2474    /// Returns a reference to the position ID for the given `client_order_id` (if found).
2475    #[must_use]
2476    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2477        self.index.order_position.get(client_order_id)
2478    }
2479
2480    /// Returns a reference to all positions matching the given optional filter parameters.
2481    #[must_use]
2482    pub fn positions(
2483        &self,
2484        venue: Option<&Venue>,
2485        instrument_id: Option<&InstrumentId>,
2486        strategy_id: Option<&StrategyId>,
2487        side: Option<PositionSide>,
2488    ) -> Vec<&Position> {
2489        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2490        self.get_positions_for_ids(&position_ids, side)
2491    }
2492
2493    /// Returns a reference to all open positions matching the given optional filter parameters.
2494    #[must_use]
2495    pub fn positions_open(
2496        &self,
2497        venue: Option<&Venue>,
2498        instrument_id: Option<&InstrumentId>,
2499        strategy_id: Option<&StrategyId>,
2500        side: Option<PositionSide>,
2501    ) -> Vec<&Position> {
2502        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2503        self.get_positions_for_ids(&position_ids, side)
2504    }
2505
2506    /// Returns a reference to all closed positions matching the given optional filter parameters.
2507    #[must_use]
2508    pub fn positions_closed(
2509        &self,
2510        venue: Option<&Venue>,
2511        instrument_id: Option<&InstrumentId>,
2512        strategy_id: Option<&StrategyId>,
2513        side: Option<PositionSide>,
2514    ) -> Vec<&Position> {
2515        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2516        self.get_positions_for_ids(&position_ids, side)
2517    }
2518
2519    /// Returns whether a position with the given `position_id` exists.
2520    #[must_use]
2521    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2522        self.index.positions.contains(position_id)
2523    }
2524
2525    /// Returns whether a position with the given `position_id` is open.
2526    #[must_use]
2527    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2528        self.index.positions_open.contains(position_id)
2529    }
2530
2531    /// Returns whether a position with the given `position_id` is closed.
2532    #[must_use]
2533    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2534        self.index.positions_closed.contains(position_id)
2535    }
2536
2537    /// Returns the count of all open positions.
2538    #[must_use]
2539    pub fn positions_open_count(
2540        &self,
2541        venue: Option<&Venue>,
2542        instrument_id: Option<&InstrumentId>,
2543        strategy_id: Option<&StrategyId>,
2544        side: Option<PositionSide>,
2545    ) -> usize {
2546        self.positions_open(venue, instrument_id, strategy_id, side)
2547            .len()
2548    }
2549
2550    /// Returns the count of all closed positions.
2551    #[must_use]
2552    pub fn positions_closed_count(
2553        &self,
2554        venue: Option<&Venue>,
2555        instrument_id: Option<&InstrumentId>,
2556        strategy_id: Option<&StrategyId>,
2557        side: Option<PositionSide>,
2558    ) -> usize {
2559        self.positions_closed(venue, instrument_id, strategy_id, side)
2560            .len()
2561    }
2562
2563    /// Returns the count of all positions.
2564    #[must_use]
2565    pub fn positions_total_count(
2566        &self,
2567        venue: Option<&Venue>,
2568        instrument_id: Option<&InstrumentId>,
2569        strategy_id: Option<&StrategyId>,
2570        side: Option<PositionSide>,
2571    ) -> usize {
2572        self.positions(venue, instrument_id, strategy_id, side)
2573            .len()
2574    }
2575
2576    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2577
2578    /// Gets a reference to the strategy ID for the given `client_order_id` (if found).
2579    #[must_use]
2580    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2581        self.index.order_strategy.get(client_order_id)
2582    }
2583
2584    /// Gets a reference to the strategy ID for the given `position_id` (if found).
2585    #[must_use]
2586    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2587        self.index.position_strategy.get(position_id)
2588    }
2589
2590    // -- GENERAL ---------------------------------------------------------------------------------
2591
2592    /// Gets a reference to the general object value for the given `key` (if found).
2593    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2594        check_valid_string(key, stringify!(key)).expect(FAILED);
2595
2596        Ok(self.general.get(key))
2597    }
2598
2599    // -- DATA QUERIES ----------------------------------------------------------------------------
2600
2601    /// Returns the price for the given `instrument_id` and `price_type` (if found).
2602    #[must_use]
2603    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2604        match price_type {
2605            PriceType::Bid => self
2606                .quotes
2607                .get(instrument_id)
2608                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2609            PriceType::Ask => self
2610                .quotes
2611                .get(instrument_id)
2612                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2613            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2614                quotes.front().map(|quote| {
2615                    Price::new(
2616                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2617                        quote.bid_price.precision + 1,
2618                    )
2619                })
2620            }),
2621            PriceType::Last => self
2622                .trades
2623                .get(instrument_id)
2624                .and_then(|trades| trades.front().map(|trade| trade.price)),
2625            PriceType::Mark => self
2626                .mark_prices
2627                .get(instrument_id)
2628                .and_then(|marks| marks.front().map(|mark| mark.value)),
2629        }
2630    }
2631
2632    /// Gets all quotes for the given `instrument_id`.
2633    #[must_use]
2634    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2635        self.quotes
2636            .get(instrument_id)
2637            .map(|quotes| quotes.iter().copied().collect())
2638    }
2639
2640    /// Gets all trades for the given `instrument_id`.
2641    #[must_use]
2642    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2643        self.trades
2644            .get(instrument_id)
2645            .map(|trades| trades.iter().copied().collect())
2646    }
2647
2648    /// Gets all mark price updates for the given `instrument_id`.
2649    #[must_use]
2650    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2651        self.mark_prices
2652            .get(instrument_id)
2653            .map(|mark_prices| mark_prices.iter().copied().collect())
2654    }
2655
2656    /// Gets all index price updates for the given `instrument_id`.
2657    #[must_use]
2658    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2659        self.index_prices
2660            .get(instrument_id)
2661            .map(|index_prices| index_prices.iter().copied().collect())
2662    }
2663
2664    /// Gets all bars for the given `bar_type`.
2665    #[must_use]
2666    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2667        self.bars
2668            .get(bar_type)
2669            .map(|bars| bars.iter().copied().collect())
2670    }
2671
2672    /// Gets a reference to the order book for the given `instrument_id`.
2673    #[must_use]
2674    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2675        self.books.get(instrument_id)
2676    }
2677
2678    /// Gets a reference to the order book for the given `instrument_id`.
2679    #[must_use]
2680    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2681        self.books.get_mut(instrument_id)
2682    }
2683
2684    /// Gets a reference to the own order book for the given `instrument_id`.
2685    #[must_use]
2686    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2687        self.own_books.get(instrument_id)
2688    }
2689
2690    /// Gets a reference to the own order book for the given `instrument_id`.
2691    #[must_use]
2692    pub fn own_order_book_mut(
2693        &mut self,
2694        instrument_id: &InstrumentId,
2695    ) -> Option<&mut OwnOrderBook> {
2696        self.own_books.get_mut(instrument_id)
2697    }
2698
2699    /// Gets a reference to the latest quote tick for the given `instrument_id`.
2700    #[must_use]
2701    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2702        self.quotes
2703            .get(instrument_id)
2704            .and_then(|quotes| quotes.front())
2705    }
2706
2707    /// Gets a reference to the latest trade tick for the given `instrument_id`.
2708    #[must_use]
2709    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2710        self.trades
2711            .get(instrument_id)
2712            .and_then(|trades| trades.front())
2713    }
2714
2715    /// Gets a referenece to the latest mark price update for the given `instrument_id`.
2716    #[must_use]
2717    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
2718        self.mark_prices
2719            .get(instrument_id)
2720            .and_then(|mark_prices| mark_prices.front())
2721    }
2722
2723    /// Gets a referenece to the latest index price update for the given `instrument_id`.
2724    #[must_use]
2725    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
2726        self.index_prices
2727            .get(instrument_id)
2728            .and_then(|index_prices| index_prices.front())
2729    }
2730
2731    /// Gets a reference to the latest bar for the given `bar_type`.
2732    #[must_use]
2733    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2734        self.bars.get(bar_type).and_then(|bars| bars.front())
2735    }
2736
2737    /// Gets the order book update count for the given `instrument_id`.
2738    #[must_use]
2739    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2740        self.books
2741            .get(instrument_id)
2742            .map_or(0, |book| book.update_count) as usize
2743    }
2744
2745    /// Gets the quote tick count for the given `instrument_id`.
2746    #[must_use]
2747    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2748        self.quotes
2749            .get(instrument_id)
2750            .map_or(0, std::collections::VecDeque::len)
2751    }
2752
2753    /// Gets the trade tick count for the given `instrument_id`.
2754    #[must_use]
2755    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2756        self.trades
2757            .get(instrument_id)
2758            .map_or(0, std::collections::VecDeque::len)
2759    }
2760
2761    /// Gets the bar count for the given `instrument_id`.
2762    #[must_use]
2763    pub fn bar_count(&self, bar_type: &BarType) -> usize {
2764        self.bars
2765            .get(bar_type)
2766            .map_or(0, std::collections::VecDeque::len)
2767    }
2768
2769    /// Returns whether the cache contains an order book for the given `instrument_id`.
2770    #[must_use]
2771    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2772        self.books.contains_key(instrument_id)
2773    }
2774
2775    /// Returns whether the cache contains quotes for the given `instrument_id`.
2776    #[must_use]
2777    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2778        self.quote_count(instrument_id) > 0
2779    }
2780
2781    /// Returns whether the cache contains trades for the given `instrument_id`.
2782    #[must_use]
2783    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2784        self.trade_count(instrument_id) > 0
2785    }
2786
2787    /// Returns whether the cache contains bars for the given `bar_type`.
2788    #[must_use]
2789    pub fn has_bars(&self, bar_type: &BarType) -> bool {
2790        self.bar_count(bar_type) > 0
2791    }
2792
2793    #[must_use]
2794    pub fn get_xrate(
2795        &self,
2796        venue: Venue,
2797        from_currency: Currency,
2798        to_currency: Currency,
2799        price_type: PriceType,
2800    ) -> Option<f64> {
2801        if from_currency == to_currency {
2802            // When the source and target currencies are identical,
2803            // no conversion is needed; return an exchange rate of 1.0.
2804            return Some(1.0);
2805        }
2806
2807        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2808
2809        match get_exchange_rate(
2810            from_currency.code,
2811            to_currency.code,
2812            price_type,
2813            bid_quote,
2814            ask_quote,
2815        ) {
2816            Ok(rate) => rate,
2817            Err(e) => {
2818                log::error!("Failed to calculate xrate: {e}");
2819                None
2820            }
2821        }
2822    }
2823
2824    fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2825        let mut bid_quotes = HashMap::new();
2826        let mut ask_quotes = HashMap::new();
2827
2828        for instrument_id in self.instruments.keys() {
2829            if instrument_id.venue != *venue {
2830                continue;
2831            }
2832
2833            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2834                if let Some(tick) = ticks.front() {
2835                    (tick.bid_price, tick.ask_price)
2836                } else {
2837                    continue; // Empty ticks vector
2838                }
2839            } else {
2840                let bid_bar = self
2841                    .bars
2842                    .iter()
2843                    .find(|(k, _)| {
2844                        k.instrument_id() == *instrument_id
2845                            && matches!(k.spec().price_type, PriceType::Bid)
2846                    })
2847                    .map(|(_, v)| v);
2848
2849                let ask_bar = self
2850                    .bars
2851                    .iter()
2852                    .find(|(k, _)| {
2853                        k.instrument_id() == *instrument_id
2854                            && matches!(k.spec().price_type, PriceType::Ask)
2855                    })
2856                    .map(|(_, v)| v);
2857
2858                match (bid_bar, ask_bar) {
2859                    (Some(bid), Some(ask)) => {
2860                        let bid_price = bid.front().unwrap().close;
2861                        let ask_price = ask.front().unwrap().close;
2862
2863                        (bid_price, ask_price)
2864                    }
2865                    _ => continue,
2866                }
2867            };
2868
2869            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
2870            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
2871        }
2872
2873        (bid_quotes, ask_quotes)
2874    }
2875
2876    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
2877    #[must_use]
2878    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
2879        self.mark_xrates.get(&(from_currency, to_currency)).copied()
2880    }
2881
2882    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
2883    ///
2884    /// # Panics
2885    ///
2886    /// This function panics if `xrate` is not positive.
2887    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
2888        assert!(xrate > 0.0, "xrate was zero");
2889        self.mark_xrates.insert((from_currency, to_currency), xrate);
2890        self.mark_xrates
2891            .insert((to_currency, from_currency), 1.0 / xrate);
2892    }
2893
2894    /// Clears the mark exchange rate for the given currency pair.
2895    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
2896        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
2897    }
2898
2899    /// Clears all mark exchange rates.
2900    pub fn clear_mark_xrates(&mut self) {
2901        self.mark_xrates.clear();
2902    }
2903
2904    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
2905
2906    /// Returns a reference to the instrument for the given `instrument_id` (if found).
2907    #[must_use]
2908    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2909        self.instruments.get(instrument_id)
2910    }
2911
2912    /// Returns references to all instrument IDs for the given `venue`.
2913    #[must_use]
2914    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2915        self.instruments
2916            .keys()
2917            .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2918            .collect()
2919    }
2920
2921    /// Returns references to all instruments for the given `venue`.
2922    #[must_use]
2923    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2924        self.instruments
2925            .values()
2926            .filter(|i| &i.id().venue == venue)
2927            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
2928            .collect()
2929    }
2930
2931    /// Returns references to all bar types contained in the cache.
2932    #[must_use]
2933    pub fn bar_types(
2934        &self,
2935        instrument_id: Option<&InstrumentId>,
2936        price_type: Option<&PriceType>,
2937        aggregation_source: AggregationSource,
2938    ) -> Vec<&BarType> {
2939        let mut bar_types = self
2940            .bars
2941            .keys()
2942            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2943            .collect::<Vec<&BarType>>();
2944
2945        if let Some(instrument_id) = instrument_id {
2946            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2947        }
2948
2949        if let Some(price_type) = price_type {
2950            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2951        }
2952
2953        bar_types
2954    }
2955
2956    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
2957
2958    /// Returns a reference to the synthetic instrument for the given `instrument_id` (if found).
2959    #[must_use]
2960    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2961        self.synthetics.get(instrument_id)
2962    }
2963
2964    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
2965    #[must_use]
2966    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2967        self.synthetics.keys().collect()
2968    }
2969
2970    /// Returns references to all synthetic instruments contained in the cache.
2971    #[must_use]
2972    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2973        self.synthetics.values().collect()
2974    }
2975
2976    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
2977
2978    /// Returns a reference to the account for the given `account_id` (if found).
2979    #[must_use]
2980    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2981        self.accounts.get(account_id)
2982    }
2983
2984    /// Returns a reference to the account for the given `venue` (if found).
2985    #[must_use]
2986    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2987        self.index
2988            .venue_account
2989            .get(venue)
2990            .and_then(|account_id| self.accounts.get(account_id))
2991    }
2992
2993    /// Returns a reference to the account ID for the given `venue` (if found).
2994    #[must_use]
2995    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2996        self.index.venue_account.get(venue)
2997    }
2998
2999    /// Returns references to all accounts for the given `account_id`.
3000    #[must_use]
3001    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3002        self.accounts
3003            .values()
3004            .filter(|account| &account.id() == account_id)
3005            .collect()
3006    }
3007}