nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `Cache` for market and execution related data.
17
18pub mod config;
19pub mod database;
20
21mod index;
22
23#[cfg(test)]
24mod tests;
25
26use std::{
27    collections::{HashMap, HashSet, VecDeque},
28    time::{SystemTime, UNIX_EPOCH},
29};
30
31use bytes::Bytes;
32pub use config::CacheConfig; // Re-export
33use database::{CacheDatabaseAdapter, CacheMap};
34use index::CacheIndex;
35use nautilus_core::{
36    UUID4, UnixNanos,
37    correctness::{
38        FAILED, check_key_not_in_map, check_predicate_false, check_slice_not_empty,
39        check_valid_string,
40    },
41    datetime::secs_to_nanos,
42};
43use nautilus_model::{
44    accounts::AccountAny,
45    data::{
46        Bar, BarType, QuoteTick, TradeTick,
47        prices::{IndexPriceUpdate, MarkPriceUpdate},
48    },
49    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
50    identifiers::{
51        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
52        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
53    },
54    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
55    orderbook::{OrderBook, own::OwnOrderBook},
56    orders::{Order, OrderAny, OrderList},
57    position::Position,
58    types::{Currency, Money, Price, Quantity},
59};
60use ustr::Ustr;
61
62use crate::xrate::get_exchange_rate;
63
/// A common in-memory `Cache` for market and execution related data.
///
/// Bundles the cache configuration, a lookup index, an optional backing database adapter and
/// the keyed in-memory collections declared below.
pub struct Cache {
    /// Configuration for this cache.
    config: CacheConfig,
    /// Lookup index over the cached accounts, orders and positions (populated by `build_index`).
    index: CacheIndex,
    /// Optional backing database adapter which the `cache_*` methods load from.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General purpose byte payloads keyed by string.
    general: HashMap<String, Bytes>,
    /// Currencies keyed by `Ustr` (presumably the currency code; confirm against callers).
    currencies: HashMap<Ustr, Currency>,
    /// Instruments keyed by instrument ID.
    instruments: HashMap<InstrumentId, InstrumentAny>,
    /// Synthetic instruments keyed by instrument ID.
    synthetics: HashMap<InstrumentId, SyntheticInstrument>,
    /// Order books keyed by instrument ID.
    books: HashMap<InstrumentId, OrderBook>,
    /// Own order books keyed by instrument ID.
    own_books: HashMap<InstrumentId, OwnOrderBook>,
    /// Quote queues keyed by instrument ID.
    quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
    /// Trade queues keyed by instrument ID.
    trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
    /// Exchange rates keyed by a currency pair (NOTE(review): pair direction not visible here).
    mark_xrates: HashMap<(Currency, Currency), f64>,
    /// Mark price update queues keyed by instrument ID.
    mark_prices: HashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    /// Index price update queues keyed by instrument ID.
    index_prices: HashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    /// Bar queues keyed by bar type.
    bars: HashMap<BarType, VecDeque<Bar>>,
    /// Accounts keyed by account ID.
    accounts: HashMap<AccountId, AccountAny>,
    /// Orders keyed by client order ID.
    orders: HashMap<ClientOrderId, OrderAny>,
    /// Order lists keyed by order list ID.
    order_lists: HashMap<OrderListId, OrderList>,
    /// Positions keyed by position ID.
    positions: HashMap<PositionId, Position>,
    /// Raw bytes keyed by position ID (presumably serialized position snapshots; confirm).
    position_snapshots: HashMap<PositionId, Bytes>,
}
87
88impl Default for Cache {
89    /// Creates a new default [`Cache`] instance.
90    fn default() -> Self {
91        Self::new(Some(CacheConfig::default()), None)
92    }
93}
94
95impl Cache {
96    /// Creates a new [`Cache`] instance.
97    #[must_use]
98    pub fn new(
99        config: Option<CacheConfig>,
100        database: Option<Box<dyn CacheDatabaseAdapter>>,
101    ) -> Self {
102        Self {
103            config: config.unwrap_or_default(),
104            index: CacheIndex::default(),
105            database,
106            general: HashMap::new(),
107            currencies: HashMap::new(),
108            instruments: HashMap::new(),
109            synthetics: HashMap::new(),
110            books: HashMap::new(),
111            own_books: HashMap::new(),
112            quotes: HashMap::new(),
113            trades: HashMap::new(),
114            mark_xrates: HashMap::new(),
115            mark_prices: HashMap::new(),
116            index_prices: HashMap::new(),
117            bars: HashMap::new(),
118            accounts: HashMap::new(),
119            orders: HashMap::new(),
120            order_lists: HashMap::new(),
121            positions: HashMap::new(),
122            position_snapshots: HashMap::new(),
123        }
124    }
125
126    /// Returns the cache instances memory address.
127    #[must_use]
128    pub fn memory_address(&self) -> String {
129        format!("{:?}", std::ptr::from_ref(self))
130    }
131
132    // -- COMMANDS --------------------------------------------------------------------------------
133
134    /// Clears the current general cache and loads the general objects from the cache database.
135    pub fn cache_general(&mut self) -> anyhow::Result<()> {
136        self.general = match &mut self.database {
137            Some(db) => db.load()?,
138            None => HashMap::new(),
139        };
140
141        log::info!(
142            "Cached {} general object(s) from database",
143            self.general.len()
144        );
145        Ok(())
146    }
147
148    /// Loads all caches (currencies, instruments, synthetics, accounts, orders, positions) from the database.
149    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
150        let cache_map = match &self.database {
151            Some(db) => db.load_all().await?,
152            None => CacheMap::default(),
153        };
154
155        self.currencies = cache_map.currencies;
156        self.instruments = cache_map.instruments;
157        self.synthetics = cache_map.synthetics;
158        self.accounts = cache_map.accounts;
159        self.orders = cache_map.orders;
160        self.positions = cache_map.positions;
161        Ok(())
162    }
163
164    /// Clears the current currencies cache and loads currencies from the cache database.
165    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
166        self.currencies = match &mut self.database {
167            Some(db) => db.load_currencies().await?,
168            None => HashMap::new(),
169        };
170
171        log::info!("Cached {} currencies from database", self.general.len());
172        Ok(())
173    }
174
175    /// Clears the current instruments cache and loads instruments from the cache database.
176    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
177        self.instruments = match &mut self.database {
178            Some(db) => db.load_instruments().await?,
179            None => HashMap::new(),
180        };
181
182        log::info!("Cached {} instruments from database", self.general.len());
183        Ok(())
184    }
185
186    /// Clears the current synthetic instruments cache and loads synthetic instruments from the cache
187    /// database.
188    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
189        self.synthetics = match &mut self.database {
190            Some(db) => db.load_synthetics().await?,
191            None => HashMap::new(),
192        };
193
194        log::info!(
195            "Cached {} synthetic instruments from database",
196            self.general.len()
197        );
198        Ok(())
199    }
200
201    /// Clears the current accounts cache and loads accounts from the cache database.
202    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
203        self.accounts = match &mut self.database {
204            Some(db) => db.load_accounts().await?,
205            None => HashMap::new(),
206        };
207
208        log::info!(
209            "Cached {} synthetic instruments from database",
210            self.general.len()
211        );
212        Ok(())
213    }
214
215    /// Clears the current orders cache and loads orders from the cache database.
216    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
217        self.orders = match &mut self.database {
218            Some(db) => db.load_orders().await?,
219            None => HashMap::new(),
220        };
221
222        log::info!("Cached {} orders from database", self.general.len());
223        Ok(())
224    }
225
226    /// Clears the current positions cache and loads positions from the cache database.
227    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
228        self.positions = match &mut self.database {
229            Some(db) => db.load_positions().await?,
230            None => HashMap::new(),
231        };
232
233        log::info!("Cached {} positions from database", self.general.len());
234        Ok(())
235    }
236
237    /// Clears the current cache index and re-build.
238    pub fn build_index(&mut self) {
239        log::debug!("Building index");
240
241        // Index accounts
242        for account_id in self.accounts.keys() {
243            self.index
244                .venue_account
245                .insert(account_id.get_issuer(), *account_id);
246        }
247
248        // Index orders
249        for (client_order_id, order) in &self.orders {
250            let instrument_id = order.instrument_id();
251            let venue = instrument_id.venue;
252            let strategy_id = order.strategy_id();
253
254            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
255            self.index
256                .venue_orders
257                .entry(venue)
258                .or_default()
259                .insert(*client_order_id);
260
261            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
262            if let Some(venue_order_id) = order.venue_order_id() {
263                self.index
264                    .venue_order_ids
265                    .insert(venue_order_id, *client_order_id);
266            }
267
268            // 3: Build index.order_position -> {ClientOrderId, PositionId}
269            if let Some(position_id) = order.position_id() {
270                self.index
271                    .order_position
272                    .insert(*client_order_id, position_id);
273            }
274
275            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
276            self.index
277                .order_strategy
278                .insert(*client_order_id, order.strategy_id());
279
280            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
281            self.index
282                .instrument_orders
283                .entry(instrument_id)
284                .or_default()
285                .insert(*client_order_id);
286
287            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
288            self.index
289                .strategy_orders
290                .entry(strategy_id)
291                .or_default()
292                .insert(*client_order_id);
293
294            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
295            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
296                self.index
297                    .exec_algorithm_orders
298                    .entry(exec_algorithm_id)
299                    .or_default()
300                    .insert(*client_order_id);
301            }
302
303            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
304            if let Some(exec_spawn_id) = order.exec_spawn_id() {
305                self.index
306                    .exec_spawn_orders
307                    .entry(exec_spawn_id)
308                    .or_default()
309                    .insert(*client_order_id);
310            }
311
312            // 9: Build index.orders -> {ClientOrderId}
313            self.index.orders.insert(*client_order_id);
314
315            // 10: Build index.orders_open -> {ClientOrderId}
316            if order.is_open() {
317                self.index.orders_open.insert(*client_order_id);
318            }
319
320            // 11: Build index.orders_closed -> {ClientOrderId}
321            if order.is_closed() {
322                self.index.orders_closed.insert(*client_order_id);
323            }
324
325            // 12: Build index.orders_emulated -> {ClientOrderId}
326            if let Some(emulation_trigger) = order.emulation_trigger() {
327                if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
328                    self.index.orders_emulated.insert(*client_order_id);
329                }
330            }
331
332            // 13: Build index.orders_inflight -> {ClientOrderId}
333            if order.is_inflight() {
334                self.index.orders_inflight.insert(*client_order_id);
335            }
336
337            // 14: Build index.strategies -> {StrategyId}
338            self.index.strategies.insert(strategy_id);
339
340            // 15: Build index.strategies -> {ExecAlgorithmId}
341            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
342                self.index.exec_algorithms.insert(exec_algorithm_id);
343            }
344        }
345
346        // Index positions
347        for (position_id, position) in &self.positions {
348            let instrument_id = position.instrument_id;
349            let venue = instrument_id.venue;
350            let strategy_id = position.strategy_id;
351
352            // 1: Build index.venue_positions -> {Venue, {PositionId}}
353            self.index
354                .venue_positions
355                .entry(venue)
356                .or_default()
357                .insert(*position_id);
358
359            // 2: Build index.position_strategy -> {PositionId, StrategyId}
360            self.index
361                .position_strategy
362                .insert(*position_id, position.strategy_id);
363
364            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
365            self.index
366                .position_orders
367                .entry(*position_id)
368                .or_default()
369                .extend(position.client_order_ids().into_iter());
370
371            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
372            self.index
373                .instrument_positions
374                .entry(instrument_id)
375                .or_default()
376                .insert(*position_id);
377
378            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
379            self.index
380                .strategy_positions
381                .entry(strategy_id)
382                .or_default()
383                .insert(*position_id);
384
385            // 6: Build index.positions -> {PositionId}
386            self.index.positions.insert(*position_id);
387
388            // 7: Build index.positions_open -> {PositionId}
389            if position.is_open() {
390                self.index.positions_open.insert(*position_id);
391            }
392
393            // 8: Build index.positions_closed -> {PositionId}
394            if position.is_closed() {
395                self.index.positions_closed.insert(*position_id);
396            }
397
398            // 9: Build index.strategies -> {StrategyId}
399            self.index.strategies.insert(strategy_id);
400        }
401    }
402
403    /// Returns whether the cache has a backing database.
404    #[must_use]
405    pub const fn has_backing(&self) -> bool {
406        self.config.database.is_some()
407    }
408
409    // Calculate the unrealized profit and loss (PnL) for a given position.
410    #[must_use]
411    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
412        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
413            quote
414        } else {
415            log::warn!(
416                "Cannot calculate unrealized PnL for {}, no quotes for {}",
417                position.id,
418                position.instrument_id
419            );
420            return None;
421        };
422
423        let last = match position.side {
424            PositionSide::Flat | PositionSide::NoPositionSide => {
425                return Some(Money::new(0.0, position.settlement_currency));
426            }
427            PositionSide::Long => quote.ask_price,
428            PositionSide::Short => quote.bid_price,
429        };
430
431        Some(position.unrealized_pnl(last))
432    }
433
434    /// Checks integrity of data within the cache.
435    ///
436    /// All data should be loaded from the database prior to this call.
437    /// If an error is found then a log error message will also be produced.
438    #[must_use]
439    pub fn check_integrity(&mut self) -> bool {
440        let mut error_count = 0;
441        let failure = "Integrity failure";
442
443        // Get current timestamp in microseconds
444        let timestamp_us = SystemTime::now()
445            .duration_since(UNIX_EPOCH)
446            .expect("Time went backwards")
447            .as_micros();
448
449        log::info!("Checking data integrity");
450
451        // Check object caches
452        for account_id in self.accounts.keys() {
453            if !self
454                .index
455                .venue_account
456                .contains_key(&account_id.get_issuer())
457            {
458                log::error!(
459                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
460                );
461                error_count += 1;
462            }
463        }
464
465        for (client_order_id, order) in &self.orders {
466            if !self.index.order_strategy.contains_key(client_order_id) {
467                log::error!(
468                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
469                );
470                error_count += 1;
471            }
472            if !self.index.orders.contains(client_order_id) {
473                log::error!(
474                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
475                );
476                error_count += 1;
477            }
478            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
479                log::error!(
480                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
481                );
482                error_count += 1;
483            }
484            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
485                log::error!(
486                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
487                );
488                error_count += 1;
489            }
490            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
491                log::error!(
492                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
493                );
494                error_count += 1;
495            }
496            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
497                if !self
498                    .index
499                    .exec_algorithm_orders
500                    .contains_key(&exec_algorithm_id)
501                {
502                    log::error!(
503                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
504                    );
505                    error_count += 1;
506                }
507                if order.exec_spawn_id().is_none()
508                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
509                {
510                    log::error!(
511                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
512                    );
513                    error_count += 1;
514                }
515            }
516        }
517
518        for (position_id, position) in &self.positions {
519            if !self.index.position_strategy.contains_key(position_id) {
520                log::error!(
521                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
522                );
523                error_count += 1;
524            }
525            if !self.index.position_orders.contains_key(position_id) {
526                log::error!(
527                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
528                );
529                error_count += 1;
530            }
531            if !self.index.positions.contains(position_id) {
532                log::error!(
533                    "{failure} in positions: {position_id} not found in `self.index.positions`",
534                );
535                error_count += 1;
536            }
537            if position.is_open() && !self.index.positions_open.contains(position_id) {
538                log::error!(
539                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
540                );
541                error_count += 1;
542            }
543            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
544                log::error!(
545                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
546                );
547                error_count += 1;
548            }
549        }
550
551        // Check indexes
552        for account_id in self.index.venue_account.values() {
553            if !self.accounts.contains_key(account_id) {
554                log::error!(
555                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
556                );
557                error_count += 1;
558            }
559        }
560
561        for client_order_id in self.index.venue_order_ids.values() {
562            if !self.orders.contains_key(client_order_id) {
563                log::error!(
564                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
565                );
566                error_count += 1;
567            }
568        }
569
570        for client_order_id in self.index.client_order_ids.keys() {
571            if !self.orders.contains_key(client_order_id) {
572                log::error!(
573                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
574                );
575                error_count += 1;
576            }
577        }
578
579        for client_order_id in self.index.order_position.keys() {
580            if !self.orders.contains_key(client_order_id) {
581                log::error!(
582                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
583                );
584                error_count += 1;
585            }
586        }
587
588        // Check indexes
589        for client_order_id in self.index.order_strategy.keys() {
590            if !self.orders.contains_key(client_order_id) {
591                log::error!(
592                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
593                );
594                error_count += 1;
595            }
596        }
597
598        for position_id in self.index.position_strategy.keys() {
599            if !self.positions.contains_key(position_id) {
600                log::error!(
601                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
602                );
603                error_count += 1;
604            }
605        }
606
607        for position_id in self.index.position_orders.keys() {
608            if !self.positions.contains_key(position_id) {
609                log::error!(
610                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
611                );
612                error_count += 1;
613            }
614        }
615
616        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
617            for client_order_id in client_order_ids {
618                if !self.orders.contains_key(client_order_id) {
619                    log::error!(
620                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
621                    );
622                    error_count += 1;
623                }
624            }
625        }
626
627        for instrument_id in self.index.instrument_positions.keys() {
628            if !self.index.instrument_orders.contains_key(instrument_id) {
629                log::error!(
630                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
631                );
632                error_count += 1;
633            }
634        }
635
636        for client_order_ids in self.index.strategy_orders.values() {
637            for client_order_id in client_order_ids {
638                if !self.orders.contains_key(client_order_id) {
639                    log::error!(
640                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
641                    );
642                    error_count += 1;
643                }
644            }
645        }
646
647        for position_ids in self.index.strategy_positions.values() {
648            for position_id in position_ids {
649                if !self.positions.contains_key(position_id) {
650                    log::error!(
651                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
652                    );
653                    error_count += 1;
654                }
655            }
656        }
657
658        for client_order_id in &self.index.orders {
659            if !self.orders.contains_key(client_order_id) {
660                log::error!(
661                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
662                );
663                error_count += 1;
664            }
665        }
666
667        for client_order_id in &self.index.orders_emulated {
668            if !self.orders.contains_key(client_order_id) {
669                log::error!(
670                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
671                );
672                error_count += 1;
673            }
674        }
675
676        for client_order_id in &self.index.orders_inflight {
677            if !self.orders.contains_key(client_order_id) {
678                log::error!(
679                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
680                );
681                error_count += 1;
682            }
683        }
684
685        for client_order_id in &self.index.orders_open {
686            if !self.orders.contains_key(client_order_id) {
687                log::error!(
688                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
689                );
690                error_count += 1;
691            }
692        }
693
694        for client_order_id in &self.index.orders_closed {
695            if !self.orders.contains_key(client_order_id) {
696                log::error!(
697                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
698                );
699                error_count += 1;
700            }
701        }
702
703        for position_id in &self.index.positions {
704            if !self.positions.contains_key(position_id) {
705                log::error!(
706                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
707                );
708                error_count += 1;
709            }
710        }
711
712        for position_id in &self.index.positions_open {
713            if !self.positions.contains_key(position_id) {
714                log::error!(
715                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
716                );
717                error_count += 1;
718            }
719        }
720
721        for position_id in &self.index.positions_closed {
722            if !self.positions.contains_key(position_id) {
723                log::error!(
724                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
725                );
726                error_count += 1;
727            }
728        }
729
730        for strategy_id in &self.index.strategies {
731            if !self.index.strategy_orders.contains_key(strategy_id) {
732                log::error!(
733                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
734                );
735                error_count += 1;
736            }
737        }
738
739        for exec_algorithm_id in &self.index.exec_algorithms {
740            if !self
741                .index
742                .exec_algorithm_orders
743                .contains_key(exec_algorithm_id)
744            {
745                log::error!(
746                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
747                );
748                error_count += 1;
749            }
750        }
751
752        let total_us = SystemTime::now()
753            .duration_since(UNIX_EPOCH)
754            .expect("Time went backwards")
755            .as_micros()
756            - timestamp_us;
757
758        if error_count == 0 {
759            log::info!("Integrity check passed in {total_us}μs");
760            true
761        } else {
762            log::error!(
763                "Integrity check failed with {error_count} error{} in {total_us}μs",
764                if error_count == 1 { "" } else { "s" },
765            );
766            false
767        }
768    }
769
770    /// Checks for any residual open state and log warnings if any are found.
771    ///
772    ///'Open state' is considered to be open orders and open positions.
773    #[must_use]
774    pub fn check_residuals(&self) -> bool {
775        log::debug!("Checking residuals");
776
777        let mut residuals = false;
778
779        // Check for any open orders
780        for order in self.orders_open(None, None, None, None) {
781            residuals = true;
782            log::warn!("Residual {order:?}");
783        }
784
785        // Check for any open positions
786        for position in self.positions_open(None, None, None, None) {
787            residuals = true;
788            log::warn!("Residual {position}");
789        }
790
791        residuals
792    }
793
794    /// Purges all closed orders from the cache that are older than the given buffer time.
795    ///
796    ///
797    /// Only orders that have been closed for at least this amount of time will be purged.
798    /// A value of 0 means purge all closed orders regardless of when they were closed.
799    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
800        log::debug!(
801            "Purging closed orders{}",
802            if buffer_secs > 0 {
803                format!(" with buffer_secs={buffer_secs}")
804            } else {
805                String::new()
806            }
807        );
808
809        let buffer_ns = secs_to_nanos(buffer_secs as f64);
810
811        for client_order_id in self.index.orders_closed.clone() {
812            if let Some(order) = self.orders.get(&client_order_id) {
813                if let Some(ts_closed) = order.ts_closed() {
814                    if ts_closed + buffer_ns <= ts_now {
815                        self.purge_order(client_order_id);
816                    }
817                }
818            }
819        }
820    }
821
822    /// Purges all closed positions from the cache that are older than the given buffer time.
823    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
824        log::debug!(
825            "Purging closed positions{}",
826            if buffer_secs > 0 {
827                format!(" with buffer_secs={buffer_secs}")
828            } else {
829                String::new()
830            }
831        );
832
833        let buffer_ns = secs_to_nanos(buffer_secs as f64);
834
835        for position_id in self.index.positions_closed.clone() {
836            if let Some(position) = self.positions.get(&position_id) {
837                if let Some(ts_closed) = position.ts_closed {
838                    if ts_closed + buffer_ns <= ts_now {
839                        self.purge_position(position_id);
840                    }
841                }
842            }
843        }
844    }
845
846    /// Purges the order with the given client order ID from the cache (if found).
847    ///
848    /// All `OrderFilled` events for the order will also be purged from any associated position.
849    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
850        if let Some(order) = self.orders.remove(&client_order_id) {
851            // Remove order from venue index
852            if let Some(venue_orders) = self
853                .index
854                .venue_orders
855                .get_mut(&order.instrument_id().venue)
856            {
857                venue_orders.remove(&client_order_id);
858            }
859
860            // Remove venue order ID index if exists
861            if let Some(venue_order_id) = order.venue_order_id() {
862                self.index.venue_order_ids.remove(&venue_order_id);
863            }
864
865            // Remove from instrument orders index
866            if let Some(instrument_orders) =
867                self.index.instrument_orders.get_mut(&order.instrument_id())
868            {
869                instrument_orders.remove(&client_order_id);
870            }
871
872            // Remove from position orders index if associated with a position
873            if let Some(position_id) = order.position_id() {
874                if let Some(position_orders) = self.index.position_orders.get_mut(&position_id) {
875                    position_orders.remove(&client_order_id);
876                }
877            }
878
879            // Remove from exec algorithm orders index if it has an exec algorithm
880            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
881                if let Some(exec_algorithm_orders) =
882                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
883                {
884                    exec_algorithm_orders.remove(&client_order_id);
885                }
886            }
887
888            log::info!("Purged order {client_order_id}");
889        } else {
890            log::warn!("Order {client_order_id} not found when purging");
891        }
892
893        // Remove from all other index collections regardless of whether order was found
894        self.index.order_position.remove(&client_order_id);
895        self.index.order_strategy.remove(&client_order_id);
896        self.index.order_client.remove(&client_order_id);
897        self.index.client_order_ids.remove(&client_order_id);
898        self.index.exec_spawn_orders.remove(&client_order_id);
899        self.index.orders.remove(&client_order_id);
900        self.index.orders_closed.remove(&client_order_id);
901        self.index.orders_emulated.remove(&client_order_id);
902        self.index.orders_inflight.remove(&client_order_id);
903        self.index.orders_pending_cancel.remove(&client_order_id);
904
905        // Purge events from associated position if exists
906        if let Some(position_id) = self.index.order_position.get(&client_order_id) {
907            if let Some(position) = self.positions.get_mut(position_id) {
908                position.purge_events_for_order(client_order_id);
909            }
910        }
911    }
912
913    /// Purges the position with the given position ID from the cache (if found).
914    pub fn purge_position(&mut self, position_id: PositionId) {
915        if let Some(position) = self.positions.remove(&position_id) {
916            // Remove from venue positions index
917            if let Some(venue_positions) = self
918                .index
919                .venue_positions
920                .get_mut(&position.instrument_id.venue)
921            {
922                venue_positions.remove(&position_id);
923            }
924
925            // Remove from instrument positions index
926            if let Some(instrument_positions) = self
927                .index
928                .instrument_positions
929                .get_mut(&position.instrument_id)
930            {
931                instrument_positions.remove(&position_id);
932            }
933
934            // Remove from strategy positions index
935            if let Some(strategy_positions) =
936                self.index.strategy_positions.get_mut(&position.strategy_id)
937            {
938                strategy_positions.remove(&position_id);
939            }
940
941            // Remove position ID from orders that reference it
942            for client_order_id in position.client_order_ids() {
943                self.index.order_position.remove(&client_order_id);
944            }
945
946            log::info!("Purged position {position_id}");
947        } else {
948            log::warn!("Position {position_id} not found when purging");
949        }
950
951        // Remove from all other index collections regardless of whether position was found
952        self.index.position_strategy.remove(&position_id);
953        self.index.position_orders.remove(&position_id);
954        self.index.positions.remove(&position_id);
955        self.index.positions_open.remove(&position_id);
956        self.index.positions_closed.remove(&position_id);
957    }
958
959    /// Purges all account state events which are outside the lookback window.
960    ///
961    /// Only events which are outside the lookback window will be purged.
962    /// A value of 0 means purge all account state events.
963    pub fn purge_account_events(&mut self, _ts_now: UnixNanos, lookback_secs: u64) {
964        log::debug!(
965            "Purging account events{}",
966            if lookback_secs > 0 {
967                format!(" with lookback_secs={lookback_secs}")
968            } else {
969                String::new()
970            }
971        );
972
973        // TODO: Implement purging of account state events
974        // for account in self.accounts.values_mut() {
975        //     let event_count = account.event_count();
976        //     account.purge_account_events(ts_now, lookback_secs);
977        //     let count_diff = event_count - account.event_count();
978        //     if count_diff > 0 {
979        //         log::info!(
980        //             "Purged {} event(s) from account {}",
981        //             count_diff,
982        //             account.id()
983        //         );
984        //     }
985        // }
986    }
987
988    /// Clears the caches index.
989    pub fn clear_index(&mut self) {
990        self.index.clear();
991        log::debug!("Cleared index");
992    }
993
994    /// Resets the cache.
995    ///
996    /// All stateful fields are reset to their initial value.
997    pub fn reset(&mut self) {
998        log::debug!("Resetting cache");
999
1000        self.general.clear();
1001        self.currencies.clear();
1002        self.instruments.clear();
1003        self.synthetics.clear();
1004        self.books.clear();
1005        self.own_books.clear();
1006        self.quotes.clear();
1007        self.trades.clear();
1008        self.mark_xrates.clear();
1009        self.mark_prices.clear();
1010        self.index_prices.clear();
1011        self.bars.clear();
1012        self.accounts.clear();
1013        self.orders.clear();
1014        self.order_lists.clear();
1015        self.positions.clear();
1016        self.position_snapshots.clear();
1017
1018        self.clear_index();
1019
1020        log::info!("Reset cache");
1021    }
1022
1023    /// Dispose of the cache which will close any underlying database adapter.
1024    pub fn dispose(&mut self) {
1025        if let Some(database) = &mut self.database {
1026            database.close().expect("Failed to close database");
1027        }
1028    }
1029
1030    /// Flushes the caches database which permanently removes all persisted data.
1031    pub fn flush_db(&mut self) {
1032        if let Some(database) = &mut self.database {
1033            database.flush().expect("Failed to flush database");
1034        }
1035    }
1036
1037    /// Adds a general object `value` (as bytes) to the cache at the given `key`.
1038    ///
1039    /// The cache is agnostic to what the bytes actually represent (and how it may be serialized),
1040    /// which provides maximum flexibility.
1041    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1042        check_valid_string(key, stringify!(key)).expect(FAILED);
1043        check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
1044
1045        log::debug!("Adding general {key}");
1046        self.general.insert(key.to_string(), value.clone());
1047
1048        if let Some(database) = &mut self.database {
1049            database.add(key.to_string(), value)?;
1050        }
1051        Ok(())
1052    }
1053
1054    /// Adds the given order `book` to the cache.
1055    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1056        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1057
1058        if self.config.save_market_data {
1059            if let Some(database) = &mut self.database {
1060                database.add_order_book(&book)?;
1061            }
1062        }
1063
1064        self.books.insert(book.instrument_id, book);
1065        Ok(())
1066    }
1067
1068    /// Adds the given `own_book` to the cache.
1069    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1070        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1071
1072        self.own_books.insert(own_book.instrument_id, own_book);
1073        Ok(())
1074    }
1075
1076    /// Adds the given `mark_price` update for the given `instrument_id` to the cache.
1077    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1078        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1079
1080        if self.config.save_market_data {
1081            // TODO: Placeholder and return Result for consistency
1082        }
1083
1084        let mark_prices_deque = self
1085            .mark_prices
1086            .entry(mark_price.instrument_id)
1087            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1088        mark_prices_deque.push_front(mark_price);
1089        Ok(())
1090    }
1091
1092    /// Adds the given `index_price` update for the given `instrument_id` to the cache.
1093    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1094        log::debug!(
1095            "Adding `IndexPriceUpdate` for {}",
1096            index_price.instrument_id
1097        );
1098
1099        if self.config.save_market_data {
1100            // TODO: Placeholder and return Result for consistency
1101        }
1102
1103        let index_prices_deque = self
1104            .index_prices
1105            .entry(index_price.instrument_id)
1106            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1107        index_prices_deque.push_front(index_price);
1108        Ok(())
1109    }
1110
1111    /// Adds the given `quote` tick to the cache.
1112    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1113        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1114
1115        if self.config.save_market_data {
1116            if let Some(database) = &mut self.database {
1117                database.add_quote(&quote)?;
1118            }
1119        }
1120
1121        let quotes_deque = self
1122            .quotes
1123            .entry(quote.instrument_id)
1124            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1125        quotes_deque.push_front(quote);
1126        Ok(())
1127    }
1128
1129    /// Adds the given `quotes` to the cache.
1130    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1131        check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
1132
1133        let instrument_id = quotes[0].instrument_id;
1134        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1135
1136        if self.config.save_market_data {
1137            if let Some(database) = &mut self.database {
1138                for quote in quotes {
1139                    database.add_quote(quote).unwrap();
1140                }
1141            }
1142        }
1143
1144        let quotes_deque = self
1145            .quotes
1146            .entry(instrument_id)
1147            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1148
1149        for quote in quotes {
1150            quotes_deque.push_front(*quote);
1151        }
1152        Ok(())
1153    }
1154
1155    /// Adds the given `trade` tick to the cache.
1156    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1157        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1158
1159        if self.config.save_market_data {
1160            if let Some(database) = &mut self.database {
1161                database.add_trade(&trade)?;
1162            }
1163        }
1164
1165        let trades_deque = self
1166            .trades
1167            .entry(trade.instrument_id)
1168            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1169        trades_deque.push_front(trade);
1170        Ok(())
1171    }
1172
1173    /// Adds the give `trades` to the cache.
1174    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1175        check_slice_not_empty(trades, stringify!(trades)).unwrap();
1176
1177        let instrument_id = trades[0].instrument_id;
1178        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1179
1180        if self.config.save_market_data {
1181            if let Some(database) = &mut self.database {
1182                for trade in trades {
1183                    database.add_trade(trade).unwrap();
1184                }
1185            }
1186        }
1187
1188        let trades_deque = self
1189            .trades
1190            .entry(instrument_id)
1191            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1192
1193        for trade in trades {
1194            trades_deque.push_front(*trade);
1195        }
1196        Ok(())
1197    }
1198
1199    /// Adds the given `bar` to the cache.
1200    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1201        log::debug!("Adding `Bar` {}", bar.bar_type);
1202
1203        if self.config.save_market_data {
1204            if let Some(database) = &mut self.database {
1205                database.add_bar(&bar)?;
1206            }
1207        }
1208
1209        let bars = self
1210            .bars
1211            .entry(bar.bar_type)
1212            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1213        bars.push_front(bar);
1214        Ok(())
1215    }
1216
1217    /// Adds the given `bars` to the cache.
1218    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1219        check_slice_not_empty(bars, stringify!(bars)).unwrap();
1220
1221        let bar_type = bars[0].bar_type;
1222        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1223
1224        if self.config.save_market_data {
1225            if let Some(database) = &mut self.database {
1226                for bar in bars {
1227                    database.add_bar(bar).unwrap();
1228                }
1229            }
1230        }
1231
1232        let bars_deque = self
1233            .bars
1234            .entry(bar_type)
1235            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1236
1237        for bar in bars {
1238            bars_deque.push_front(*bar);
1239        }
1240        Ok(())
1241    }
1242
1243    /// Adds the given `currency` to the cache.
1244    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1245        log::debug!("Adding `Currency` {}", currency.code);
1246
1247        if let Some(database) = &mut self.database {
1248            database.add_currency(&currency)?;
1249        }
1250
1251        self.currencies.insert(currency.code, currency);
1252        Ok(())
1253    }
1254
1255    /// Adds the given `instrument` to the cache.
1256    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1257        log::debug!("Adding `Instrument` {}", instrument.id());
1258
1259        if let Some(database) = &mut self.database {
1260            database.add_instrument(&instrument)?;
1261        }
1262
1263        self.instruments.insert(instrument.id(), instrument);
1264        Ok(())
1265    }
1266
1267    /// Adds the given `synthetic` instrument to the cache.
1268    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1269        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1270
1271        if let Some(database) = &mut self.database {
1272            database.add_synthetic(&synthetic)?;
1273        }
1274
1275        self.synthetics.insert(synthetic.id, synthetic);
1276        Ok(())
1277    }
1278
1279    /// Adds the given `account` to the cache.
1280    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1281        log::debug!("Adding `Account` {}", account.id());
1282
1283        if let Some(database) = &mut self.database {
1284            database.add_account(&account)?;
1285        }
1286
1287        let account_id = account.id();
1288        self.accounts.insert(account_id, account);
1289        self.index
1290            .venue_account
1291            .insert(account_id.get_issuer(), account_id);
1292        Ok(())
1293    }
1294
1295    /// Indexes the given `client_order_id` with the given `venue_order_id`.
1296    ///
1297    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1298    pub fn add_venue_order_id(
1299        &mut self,
1300        client_order_id: &ClientOrderId,
1301        venue_order_id: &VenueOrderId,
1302        overwrite: bool,
1303    ) -> anyhow::Result<()> {
1304        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1305            if !overwrite && existing_venue_order_id != venue_order_id {
1306                anyhow::bail!(
1307                    "Existing {existing_venue_order_id} for {client_order_id}
1308                    did not match the given {venue_order_id}.
1309                    If you are writing a test then try a different `venue_order_id`,
1310                    otherwise this is probably a bug."
1311                );
1312            }
1313        }
1314
1315        self.index
1316            .client_order_ids
1317            .insert(*client_order_id, *venue_order_id);
1318        self.index
1319            .venue_order_ids
1320            .insert(*venue_order_id, *client_order_id);
1321
1322        Ok(())
1323    }
1324
1325    /// Adds the given `order` to the cache indexed with any given identifiers.
1326    ///
1327    /// # Parameters
1328    ///
1329    /// `override_existing`: If the added order should 'override' any existing order and replace
1330    /// it in the cache. This is currently used for emulated orders which are
1331    /// being released and transformed into another type.
1332    ///
1333    /// # Errors
1334    ///
1335    /// This function returns an error:
1336    /// If not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1337    pub fn add_order(
1338        &mut self,
1339        order: OrderAny,
1340        position_id: Option<PositionId>,
1341        client_id: Option<ClientId>,
1342        replace_existing: bool,
1343    ) -> anyhow::Result<()> {
1344        let instrument_id = order.instrument_id();
1345        let venue = instrument_id.venue;
1346        let client_order_id = order.client_order_id();
1347        let strategy_id = order.strategy_id();
1348        let exec_algorithm_id = order.exec_algorithm_id();
1349        let exec_spawn_id = order.exec_spawn_id();
1350
1351        if !replace_existing {
1352            check_key_not_in_map(
1353                &client_order_id,
1354                &self.orders,
1355                stringify!(client_order_id),
1356                stringify!(orders),
1357            )
1358            .expect(FAILED);
1359            check_key_not_in_map(
1360                &client_order_id,
1361                &self.orders,
1362                stringify!(client_order_id),
1363                stringify!(orders),
1364            )
1365            .expect(FAILED);
1366            check_key_not_in_map(
1367                &client_order_id,
1368                &self.orders,
1369                stringify!(client_order_id),
1370                stringify!(orders),
1371            )
1372            .expect(FAILED);
1373            check_key_not_in_map(
1374                &client_order_id,
1375                &self.orders,
1376                stringify!(client_order_id),
1377                stringify!(orders),
1378            )
1379            .expect(FAILED);
1380        }
1381
1382        log::debug!("Adding {order:?}");
1383
1384        self.index.orders.insert(client_order_id);
1385        self.index
1386            .order_strategy
1387            .insert(client_order_id, strategy_id);
1388        self.index.strategies.insert(strategy_id);
1389
1390        // Update venue -> orders index
1391        self.index
1392            .venue_orders
1393            .entry(venue)
1394            .or_default()
1395            .insert(client_order_id);
1396
1397        // Update instrument -> orders index
1398        self.index
1399            .instrument_orders
1400            .entry(instrument_id)
1401            .or_default()
1402            .insert(client_order_id);
1403
1404        // Update strategy -> orders index
1405        self.index
1406            .strategy_orders
1407            .entry(strategy_id)
1408            .or_default()
1409            .insert(client_order_id);
1410
1411        // Update exec_algorithm -> orders index
1412        if let Some(exec_algorithm_id) = exec_algorithm_id {
1413            self.index.exec_algorithms.insert(exec_algorithm_id);
1414
1415            self.index
1416                .exec_algorithm_orders
1417                .entry(exec_algorithm_id)
1418                .or_default()
1419                .insert(client_order_id);
1420
1421            self.index
1422                .exec_spawn_orders
1423                .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1424                .or_default()
1425                .insert(client_order_id);
1426        }
1427
1428        // Update emulation index
1429        match order.emulation_trigger() {
1430            Some(_) => {
1431                self.index.orders_emulated.remove(&client_order_id);
1432            }
1433            None => {
1434                self.index.orders_emulated.insert(client_order_id);
1435            }
1436        }
1437
1438        // Index position ID if provided
1439        if let Some(position_id) = position_id {
1440            self.add_position_id(
1441                &position_id,
1442                &order.instrument_id().venue,
1443                &client_order_id,
1444                &strategy_id,
1445            )?;
1446        }
1447
1448        // Index client ID if provided
1449        if let Some(client_id) = client_id {
1450            self.index.order_client.insert(client_order_id, client_id);
1451            log::debug!("Indexed {client_id:?}");
1452        }
1453
1454        if let Some(database) = &mut self.database {
1455            database.add_order(&order, client_id)?;
1456            // TODO: Implement
1457            // if self.config.snapshot_orders {
1458            //     database.snapshot_order_state(order)?;
1459            // }
1460        }
1461
1462        self.orders.insert(client_order_id, order);
1463
1464        Ok(())
1465    }
1466
1467    /// Indexes the given `position_id` with the other given IDs.
1468    pub fn add_position_id(
1469        &mut self,
1470        position_id: &PositionId,
1471        venue: &Venue,
1472        client_order_id: &ClientOrderId,
1473        strategy_id: &StrategyId,
1474    ) -> anyhow::Result<()> {
1475        self.index
1476            .order_position
1477            .insert(*client_order_id, *position_id);
1478
1479        // Index: ClientOrderId -> PositionId
1480        if let Some(database) = &mut self.database {
1481            database.index_order_position(*client_order_id, *position_id)?;
1482        }
1483
1484        // Index: PositionId -> StrategyId
1485        self.index
1486            .position_strategy
1487            .insert(*position_id, *strategy_id);
1488
1489        // Index: PositionId -> set[ClientOrderId]
1490        self.index
1491            .position_orders
1492            .entry(*position_id)
1493            .or_default()
1494            .insert(*client_order_id);
1495
1496        // Index: StrategyId -> set[PositionId]
1497        self.index
1498            .strategy_positions
1499            .entry(*strategy_id)
1500            .or_default()
1501            .insert(*position_id);
1502
1503        // Index: Venue -> set[PositionId]
1504        self.index
1505            .venue_positions
1506            .entry(*venue)
1507            .or_default()
1508            .insert(*position_id);
1509
1510        Ok(())
1511    }
1512
1513    /// Adds the given `position` to the cache.
1514    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1515        self.positions.insert(position.id, position.clone());
1516        self.index.positions.insert(position.id);
1517        self.index.positions_open.insert(position.id);
1518
1519        log::debug!("Adding {position}");
1520
1521        self.add_position_id(
1522            &position.id,
1523            &position.instrument_id.venue,
1524            &position.opening_order_id,
1525            &position.strategy_id,
1526        )?;
1527
1528        let venue = position.instrument_id.venue;
1529        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1530        venue_positions.insert(position.id);
1531
1532        // Index: InstrumentId -> HashSet
1533        let instrument_id = position.instrument_id;
1534        let instrument_positions = self
1535            .index
1536            .instrument_positions
1537            .entry(instrument_id)
1538            .or_default();
1539        instrument_positions.insert(position.id);
1540
1541        if let Some(database) = &mut self.database {
1542            database.add_position(&position)?;
1543            // TODO: Implement position snapshots
1544            // if self.snapshot_positions {
1545            //     database.snapshot_position_state(
1546            //         position,
1547            //         position.ts_last,
1548            //         self.calculate_unrealized_pnl(&position),
1549            //     )?;
1550            // }
1551        }
1552
1553        Ok(())
1554    }
1555
1556    /// Updates the given `account` in the cache.
1557    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1558        if let Some(database) = &mut self.database {
1559            database.update_account(&account)?;
1560        }
1561        Ok(())
1562    }
1563
1564    /// Updates the given `order` in the cache.
1565    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1566        let client_order_id = order.client_order_id();
1567
1568        // Update venue order ID
1569        if let Some(venue_order_id) = order.venue_order_id() {
1570            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1571            // venues which use a cancel+replace update strategy.
1572            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1573                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1574                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1575            }
1576        }
1577
1578        // Update in-flight state
1579        if order.is_inflight() {
1580            self.index.orders_inflight.insert(client_order_id);
1581        } else {
1582            self.index.orders_inflight.remove(&client_order_id);
1583        }
1584
1585        // Update open/closed state
1586        if order.is_open() {
1587            self.index.orders_closed.remove(&client_order_id);
1588            self.index.orders_open.insert(client_order_id);
1589        } else if order.is_closed() {
1590            self.index.orders_open.remove(&client_order_id);
1591            self.index.orders_pending_cancel.remove(&client_order_id);
1592            self.index.orders_closed.insert(client_order_id);
1593        }
1594
1595        // Update emulation
1596        if let Some(emulation_trigger) = order.emulation_trigger() {
1597            match emulation_trigger {
1598                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1599                _ => self.index.orders_emulated.insert(client_order_id),
1600            };
1601        }
1602
1603        if let Some(database) = &mut self.database {
1604            database.update_order(order.last_event())?;
1605            // TODO: Implement order snapshots
1606            // if self.snapshot_orders {
1607            //     database.snapshot_order_state(order)?;
1608            // }
1609        }
1610
1611        // update the order in the cache
1612        self.orders.insert(client_order_id, order.clone());
1613
1614        Ok(())
1615    }
1616
1617    /// Updates the given `order` as pending cancel locally.
1618    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1619        self.index
1620            .orders_pending_cancel
1621            .insert(order.client_order_id());
1622    }
1623
1624    /// Updates the given `position` in the cache.
1625    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1626        // Update open/closed state
1627        if position.is_open() {
1628            self.index.positions_open.insert(position.id);
1629            self.index.positions_closed.remove(&position.id);
1630        } else {
1631            self.index.positions_closed.insert(position.id);
1632            self.index.positions_open.remove(&position.id);
1633        }
1634
1635        if let Some(database) = &mut self.database {
1636            database.update_position(position)?;
1637            // TODO: Implement order snapshots
1638            // if self.snapshot_orders {
1639            //     database.snapshot_order_state(order)?;
1640            // }
1641        }
1642        Ok(())
1643    }
1644
1645    /// Creates a snapshot of the given position by cloning it, assigning a new ID,
1646    /// serializing it, and storing it in the position snapshots.
1647    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1648        let position_id = position.id;
1649
1650        let mut copied_position = position.clone();
1651        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1652        copied_position.id = PositionId::new(new_id);
1653
1654        // Serialize the position (TODO: temporily just to JSON to remove a dependency)
1655        let position_serialized = serde_json::to_vec(&copied_position)?;
1656
1657        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1658        let new_snapshots = match snapshots {
1659            Some(existing_snapshots) => {
1660                let mut combined = existing_snapshots.to_vec();
1661                combined.extend(position_serialized);
1662                Bytes::from(combined)
1663            }
1664            None => Bytes::from(position_serialized),
1665        };
1666        self.position_snapshots.insert(position_id, new_snapshots);
1667
1668        log::debug!("Snapshot {}", copied_position);
1669        Ok(())
1670    }
1671
1672    pub fn snapshot_position_state(
1673        &mut self,
1674        position: &Position,
1675        // ts_snapshot: u64,
1676        // unrealized_pnl: Option<Money>,
1677        open_only: Option<bool>,
1678    ) -> anyhow::Result<()> {
1679        let open_only = open_only.unwrap_or(true);
1680
1681        if open_only && !position.is_open() {
1682            return Ok(());
1683        }
1684
1685        if let Some(database) = &mut self.database {
1686            database.snapshot_position_state(position).map_err(|e| {
1687                log::error!(
1688                    "Failed to snapshot position state for {}: {e:?}",
1689                    position.id
1690                );
1691                e
1692            })?;
1693        } else {
1694            log::warn!(
1695                "Cannot snapshot position state for {} (no database configured)",
1696                position.id
1697            );
1698        }
1699
1700        // Ok(())
1701        todo!()
1702    }
1703
1704    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1705        let database = if let Some(database) = &self.database {
1706            database
1707        } else {
1708            log::warn!(
1709                "Cannot snapshot order state for {} (no database configured)",
1710                order.client_order_id()
1711            );
1712            return Ok(());
1713        };
1714
1715        database.snapshot_order_state(order)
1716    }
1717
1718    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
1719
1720    fn build_order_query_filter_set(
1721        &self,
1722        venue: Option<&Venue>,
1723        instrument_id: Option<&InstrumentId>,
1724        strategy_id: Option<&StrategyId>,
1725    ) -> Option<HashSet<ClientOrderId>> {
1726        let mut query: Option<HashSet<ClientOrderId>> = None;
1727
1728        if let Some(venue) = venue {
1729            query = Some(
1730                self.index
1731                    .venue_orders
1732                    .get(venue)
1733                    .cloned()
1734                    .unwrap_or_default(),
1735            );
1736        }
1737
1738        if let Some(instrument_id) = instrument_id {
1739            let instrument_orders = self
1740                .index
1741                .instrument_orders
1742                .get(instrument_id)
1743                .cloned()
1744                .unwrap_or_default();
1745
1746            if let Some(existing_query) = &mut query {
1747                *existing_query = existing_query
1748                    .intersection(&instrument_orders)
1749                    .copied()
1750                    .collect();
1751            } else {
1752                query = Some(instrument_orders);
1753            }
1754        }
1755
1756        if let Some(strategy_id) = strategy_id {
1757            let strategy_orders = self
1758                .index
1759                .strategy_orders
1760                .get(strategy_id)
1761                .cloned()
1762                .unwrap_or_default();
1763
1764            if let Some(existing_query) = &mut query {
1765                *existing_query = existing_query
1766                    .intersection(&strategy_orders)
1767                    .copied()
1768                    .collect();
1769            } else {
1770                query = Some(strategy_orders);
1771            }
1772        }
1773
1774        query
1775    }
1776
1777    fn build_position_query_filter_set(
1778        &self,
1779        venue: Option<&Venue>,
1780        instrument_id: Option<&InstrumentId>,
1781        strategy_id: Option<&StrategyId>,
1782    ) -> Option<HashSet<PositionId>> {
1783        let mut query: Option<HashSet<PositionId>> = None;
1784
1785        if let Some(venue) = venue {
1786            query = Some(
1787                self.index
1788                    .venue_positions
1789                    .get(venue)
1790                    .cloned()
1791                    .unwrap_or_default(),
1792            );
1793        }
1794
1795        if let Some(instrument_id) = instrument_id {
1796            let instrument_positions = self
1797                .index
1798                .instrument_positions
1799                .get(instrument_id)
1800                .cloned()
1801                .unwrap_or_default();
1802
1803            if let Some(existing_query) = query {
1804                query = Some(
1805                    existing_query
1806                        .intersection(&instrument_positions)
1807                        .copied()
1808                        .collect(),
1809                );
1810            } else {
1811                query = Some(instrument_positions);
1812            }
1813        }
1814
1815        if let Some(strategy_id) = strategy_id {
1816            let strategy_positions = self
1817                .index
1818                .strategy_positions
1819                .get(strategy_id)
1820                .cloned()
1821                .unwrap_or_default();
1822
1823            if let Some(existing_query) = query {
1824                query = Some(
1825                    existing_query
1826                        .intersection(&strategy_positions)
1827                        .copied()
1828                        .collect(),
1829                );
1830            } else {
1831                query = Some(strategy_positions);
1832            }
1833        }
1834
1835        query
1836    }
1837
1838    fn get_orders_for_ids(
1839        &self,
1840        client_order_ids: &HashSet<ClientOrderId>,
1841        side: Option<OrderSide>,
1842    ) -> Vec<&OrderAny> {
1843        let side = side.unwrap_or(OrderSide::NoOrderSide);
1844        let mut orders = Vec::new();
1845
1846        for client_order_id in client_order_ids {
1847            let order = self
1848                .orders
1849                .get(client_order_id)
1850                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1851            if side == OrderSide::NoOrderSide || side == order.order_side() {
1852                orders.push(order);
1853            }
1854        }
1855
1856        orders
1857    }
1858
1859    fn get_positions_for_ids(
1860        &self,
1861        position_ids: &HashSet<PositionId>,
1862        side: Option<PositionSide>,
1863    ) -> Vec<&Position> {
1864        let side = side.unwrap_or(PositionSide::NoPositionSide);
1865        let mut positions = Vec::new();
1866
1867        for position_id in position_ids {
1868            let position = self
1869                .positions
1870                .get(position_id)
1871                .unwrap_or_else(|| panic!("Position {position_id} not found"));
1872            if side == PositionSide::NoPositionSide || side == position.side {
1873                positions.push(position);
1874            }
1875        }
1876
1877        positions
1878    }
1879
1880    /// Returns the `ClientOrderId`s of all orders.
1881    #[must_use]
1882    pub fn client_order_ids(
1883        &self,
1884        venue: Option<&Venue>,
1885        instrument_id: Option<&InstrumentId>,
1886        strategy_id: Option<&StrategyId>,
1887    ) -> HashSet<ClientOrderId> {
1888        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1889        match query {
1890            Some(query) => self.index.orders.intersection(&query).copied().collect(),
1891            None => self.index.orders.clone(),
1892        }
1893    }
1894
1895    /// Returns the `ClientOrderId`s of all open orders.
1896    #[must_use]
1897    pub fn client_order_ids_open(
1898        &self,
1899        venue: Option<&Venue>,
1900        instrument_id: Option<&InstrumentId>,
1901        strategy_id: Option<&StrategyId>,
1902    ) -> HashSet<ClientOrderId> {
1903        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1904        match query {
1905            Some(query) => self
1906                .index
1907                .orders_open
1908                .intersection(&query)
1909                .copied()
1910                .collect(),
1911            None => self.index.orders_open.clone(),
1912        }
1913    }
1914
1915    /// Returns the `ClientOrderId`s of all closed orders.
1916    #[must_use]
1917    pub fn client_order_ids_closed(
1918        &self,
1919        venue: Option<&Venue>,
1920        instrument_id: Option<&InstrumentId>,
1921        strategy_id: Option<&StrategyId>,
1922    ) -> HashSet<ClientOrderId> {
1923        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1924        match query {
1925            Some(query) => self
1926                .index
1927                .orders_closed
1928                .intersection(&query)
1929                .copied()
1930                .collect(),
1931            None => self.index.orders_closed.clone(),
1932        }
1933    }
1934
1935    /// Returns the `ClientOrderId`s of all emulated orders.
1936    #[must_use]
1937    pub fn client_order_ids_emulated(
1938        &self,
1939        venue: Option<&Venue>,
1940        instrument_id: Option<&InstrumentId>,
1941        strategy_id: Option<&StrategyId>,
1942    ) -> HashSet<ClientOrderId> {
1943        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1944        match query {
1945            Some(query) => self
1946                .index
1947                .orders_emulated
1948                .intersection(&query)
1949                .copied()
1950                .collect(),
1951            None => self.index.orders_emulated.clone(),
1952        }
1953    }
1954
1955    /// Returns the `ClientOrderId`s of all in-flight orders.
1956    #[must_use]
1957    pub fn client_order_ids_inflight(
1958        &self,
1959        venue: Option<&Venue>,
1960        instrument_id: Option<&InstrumentId>,
1961        strategy_id: Option<&StrategyId>,
1962    ) -> HashSet<ClientOrderId> {
1963        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1964        match query {
1965            Some(query) => self
1966                .index
1967                .orders_inflight
1968                .intersection(&query)
1969                .copied()
1970                .collect(),
1971            None => self.index.orders_inflight.clone(),
1972        }
1973    }
1974
1975    /// Returns `PositionId`s of all positions.
1976    #[must_use]
1977    pub fn position_ids(
1978        &self,
1979        venue: Option<&Venue>,
1980        instrument_id: Option<&InstrumentId>,
1981        strategy_id: Option<&StrategyId>,
1982    ) -> HashSet<PositionId> {
1983        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1984        match query {
1985            Some(query) => self.index.positions.intersection(&query).copied().collect(),
1986            None => self.index.positions.clone(),
1987        }
1988    }
1989
1990    /// Returns the `PositionId`s of all open positions.
1991    #[must_use]
1992    pub fn position_open_ids(
1993        &self,
1994        venue: Option<&Venue>,
1995        instrument_id: Option<&InstrumentId>,
1996        strategy_id: Option<&StrategyId>,
1997    ) -> HashSet<PositionId> {
1998        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1999        match query {
2000            Some(query) => self
2001                .index
2002                .positions_open
2003                .intersection(&query)
2004                .copied()
2005                .collect(),
2006            None => self.index.positions_open.clone(),
2007        }
2008    }
2009
2010    /// Returns the `PositionId`s of all closed positions.
2011    #[must_use]
2012    pub fn position_closed_ids(
2013        &self,
2014        venue: Option<&Venue>,
2015        instrument_id: Option<&InstrumentId>,
2016        strategy_id: Option<&StrategyId>,
2017    ) -> HashSet<PositionId> {
2018        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2019        match query {
2020            Some(query) => self
2021                .index
2022                .positions_closed
2023                .intersection(&query)
2024                .copied()
2025                .collect(),
2026            None => self.index.positions_closed.clone(),
2027        }
2028    }
2029
2030    /// Returns the `ComponentId`s of all actors.
2031    #[must_use]
2032    pub fn actor_ids(&self) -> HashSet<ComponentId> {
2033        self.index.actors.clone()
2034    }
2035
2036    /// Returns the `StrategyId`s of all strategies.
2037    #[must_use]
2038    pub fn strategy_ids(&self) -> HashSet<StrategyId> {
2039        self.index.strategies.clone()
2040    }
2041
2042    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2043    #[must_use]
2044    pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
2045        self.index.exec_algorithms.clone()
2046    }
2047
2048    // -- ORDER QUERIES ---------------------------------------------------------------------------
2049
2050    /// Gets a reference to the order with the given `client_order_id` (if found).
2051    #[must_use]
2052    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2053        self.orders.get(client_order_id)
2054    }
2055
2056    /// Gets a reference to the order with the given `client_order_id` (if found).
2057    #[must_use]
2058    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2059        self.orders.get_mut(client_order_id)
2060    }
2061
2062    /// Gets a reference to the client order ID for given `venue_order_id` (if found).
2063    #[must_use]
2064    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2065        self.index.venue_order_ids.get(venue_order_id)
2066    }
2067
2068    /// Gets a reference to the venue order ID for given `client_order_id` (if found).
2069    #[must_use]
2070    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2071        self.index.client_order_ids.get(client_order_id)
2072    }
2073
2074    /// Gets a reference to the client ID indexed for given `client_order_id` (if found).
2075    #[must_use]
2076    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2077        self.index.order_client.get(client_order_id)
2078    }
2079
2080    /// Returns references to all orders matching the given optional filter parameters.
2081    #[must_use]
2082    pub fn orders(
2083        &self,
2084        venue: Option<&Venue>,
2085        instrument_id: Option<&InstrumentId>,
2086        strategy_id: Option<&StrategyId>,
2087        side: Option<OrderSide>,
2088    ) -> Vec<&OrderAny> {
2089        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2090        self.get_orders_for_ids(&client_order_ids, side)
2091    }
2092
2093    /// Returns references to all open orders matching the given optional filter parameters.
2094    #[must_use]
2095    pub fn orders_open(
2096        &self,
2097        venue: Option<&Venue>,
2098        instrument_id: Option<&InstrumentId>,
2099        strategy_id: Option<&StrategyId>,
2100        side: Option<OrderSide>,
2101    ) -> Vec<&OrderAny> {
2102        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2103        self.get_orders_for_ids(&client_order_ids, side)
2104    }
2105
2106    /// Returns references to all closed orders matching the given optional filter parameters.
2107    #[must_use]
2108    pub fn orders_closed(
2109        &self,
2110        venue: Option<&Venue>,
2111        instrument_id: Option<&InstrumentId>,
2112        strategy_id: Option<&StrategyId>,
2113        side: Option<OrderSide>,
2114    ) -> Vec<&OrderAny> {
2115        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2116        self.get_orders_for_ids(&client_order_ids, side)
2117    }
2118
2119    /// Returns references to all emulated orders matching the given optional filter parameters.
2120    #[must_use]
2121    pub fn orders_emulated(
2122        &self,
2123        venue: Option<&Venue>,
2124        instrument_id: Option<&InstrumentId>,
2125        strategy_id: Option<&StrategyId>,
2126        side: Option<OrderSide>,
2127    ) -> Vec<&OrderAny> {
2128        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2129        self.get_orders_for_ids(&client_order_ids, side)
2130    }
2131
2132    /// Returns references to all in-flight orders matching the given optional filter parameters.
2133    #[must_use]
2134    pub fn orders_inflight(
2135        &self,
2136        venue: Option<&Venue>,
2137        instrument_id: Option<&InstrumentId>,
2138        strategy_id: Option<&StrategyId>,
2139        side: Option<OrderSide>,
2140    ) -> Vec<&OrderAny> {
2141        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2142        self.get_orders_for_ids(&client_order_ids, side)
2143    }
2144
2145    /// Returns references to all orders for the given `position_id`.
2146    #[must_use]
2147    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2148        let client_order_ids = self.index.position_orders.get(position_id);
2149        match client_order_ids {
2150            Some(client_order_ids) => {
2151                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2152            }
2153            None => Vec::new(),
2154        }
2155    }
2156
2157    /// Returns whether an order with the given `client_order_id` exists.
2158    #[must_use]
2159    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2160        self.index.orders.contains(client_order_id)
2161    }
2162
2163    /// Returns whether an order with the given `client_order_id` is open.
2164    #[must_use]
2165    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2166        self.index.orders_open.contains(client_order_id)
2167    }
2168
2169    /// Returns whether an order with the given `client_order_id` is closed.
2170    #[must_use]
2171    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2172        self.index.orders_closed.contains(client_order_id)
2173    }
2174
2175    /// Returns whether an order with the given `client_order_id` is emulated.
2176    #[must_use]
2177    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2178        self.index.orders_emulated.contains(client_order_id)
2179    }
2180
2181    /// Returns whether an order with the given `client_order_id` is in-flight.
2182    #[must_use]
2183    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2184        self.index.orders_inflight.contains(client_order_id)
2185    }
2186
2187    /// Returns whether an order with the given `client_order_id` is `PENDING_CANCEL` locally.
2188    #[must_use]
2189    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2190        self.index.orders_pending_cancel.contains(client_order_id)
2191    }
2192
2193    /// Returns the count of all open orders.
2194    #[must_use]
2195    pub fn orders_open_count(
2196        &self,
2197        venue: Option<&Venue>,
2198        instrument_id: Option<&InstrumentId>,
2199        strategy_id: Option<&StrategyId>,
2200        side: Option<OrderSide>,
2201    ) -> usize {
2202        self.orders_open(venue, instrument_id, strategy_id, side)
2203            .len()
2204    }
2205
2206    /// Returns the count of all closed orders.
2207    #[must_use]
2208    pub fn orders_closed_count(
2209        &self,
2210        venue: Option<&Venue>,
2211        instrument_id: Option<&InstrumentId>,
2212        strategy_id: Option<&StrategyId>,
2213        side: Option<OrderSide>,
2214    ) -> usize {
2215        self.orders_closed(venue, instrument_id, strategy_id, side)
2216            .len()
2217    }
2218
2219    /// Returns the count of all emulated orders.
2220    #[must_use]
2221    pub fn orders_emulated_count(
2222        &self,
2223        venue: Option<&Venue>,
2224        instrument_id: Option<&InstrumentId>,
2225        strategy_id: Option<&StrategyId>,
2226        side: Option<OrderSide>,
2227    ) -> usize {
2228        self.orders_emulated(venue, instrument_id, strategy_id, side)
2229            .len()
2230    }
2231
2232    /// Returns the count of all in-flight orders.
2233    #[must_use]
2234    pub fn orders_inflight_count(
2235        &self,
2236        venue: Option<&Venue>,
2237        instrument_id: Option<&InstrumentId>,
2238        strategy_id: Option<&StrategyId>,
2239        side: Option<OrderSide>,
2240    ) -> usize {
2241        self.orders_inflight(venue, instrument_id, strategy_id, side)
2242            .len()
2243    }
2244
2245    /// Returns the count of all orders.
2246    #[must_use]
2247    pub fn orders_total_count(
2248        &self,
2249        venue: Option<&Venue>,
2250        instrument_id: Option<&InstrumentId>,
2251        strategy_id: Option<&StrategyId>,
2252        side: Option<OrderSide>,
2253    ) -> usize {
2254        self.orders(venue, instrument_id, strategy_id, side).len()
2255    }
2256
2257    /// Returns the order list for the given `order_list_id`.
2258    #[must_use]
2259    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2260        self.order_lists.get(order_list_id)
2261    }
2262
2263    /// Returns all order lists matching the given optional filter parameters.
2264    #[must_use]
2265    pub fn order_lists(
2266        &self,
2267        venue: Option<&Venue>,
2268        instrument_id: Option<&InstrumentId>,
2269        strategy_id: Option<&StrategyId>,
2270    ) -> Vec<&OrderList> {
2271        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2272
2273        if let Some(venue) = venue {
2274            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2275        }
2276
2277        if let Some(instrument_id) = instrument_id {
2278            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2279        }
2280
2281        if let Some(strategy_id) = strategy_id {
2282            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2283        }
2284
2285        order_lists
2286    }
2287
2288    /// Returns whether an order list with the given `order_list_id` exists.
2289    #[must_use]
2290    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2291        self.order_lists.contains_key(order_list_id)
2292    }
2293
2294    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2295
2296    /// Returns references to all orders associated with the given `exec_algorithm_id` matching the given
2297    /// optional filter parameters.
2298    #[must_use]
2299    pub fn orders_for_exec_algorithm(
2300        &self,
2301        exec_algorithm_id: &ExecAlgorithmId,
2302        venue: Option<&Venue>,
2303        instrument_id: Option<&InstrumentId>,
2304        strategy_id: Option<&StrategyId>,
2305        side: Option<OrderSide>,
2306    ) -> Vec<&OrderAny> {
2307        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2308        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2309
2310        if let Some(query) = query {
2311            if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2312                let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2313            }
2314        }
2315
2316        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2317            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2318        } else {
2319            Vec::new()
2320        }
2321    }
2322
2323    /// Returns references to all orders with the given `exec_spawn_id`.
2324    #[must_use]
2325    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2326        self.get_orders_for_ids(
2327            self.index
2328                .exec_spawn_orders
2329                .get(exec_spawn_id)
2330                .unwrap_or(&HashSet::new()),
2331            None,
2332        )
2333    }
2334
2335    /// Returns the total order quantity for the given `exec_spawn_id`.
2336    #[must_use]
2337    pub fn exec_spawn_total_quantity(
2338        &self,
2339        exec_spawn_id: &ClientOrderId,
2340        active_only: bool,
2341    ) -> Option<Quantity> {
2342        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2343
2344        let mut total_quantity: Option<Quantity> = None;
2345
2346        for spawn_order in exec_spawn_orders {
2347            if !active_only || !spawn_order.is_closed() {
2348                if let Some(mut total_quantity) = total_quantity {
2349                    total_quantity += spawn_order.quantity();
2350                }
2351            } else {
2352                total_quantity = Some(spawn_order.quantity());
2353            }
2354        }
2355
2356        total_quantity
2357    }
2358
2359    /// Returns the total filled quantity for all orders with the given `exec_spawn_id`.
2360    #[must_use]
2361    pub fn exec_spawn_total_filled_qty(
2362        &self,
2363        exec_spawn_id: &ClientOrderId,
2364        active_only: bool,
2365    ) -> Option<Quantity> {
2366        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2367
2368        let mut total_quantity: Option<Quantity> = None;
2369
2370        for spawn_order in exec_spawn_orders {
2371            if !active_only || !spawn_order.is_closed() {
2372                if let Some(mut total_quantity) = total_quantity {
2373                    total_quantity += spawn_order.filled_qty();
2374                }
2375            } else {
2376                total_quantity = Some(spawn_order.filled_qty());
2377            }
2378        }
2379
2380        total_quantity
2381    }
2382
2383    /// Returns the total leaves quantity for all orders with the given `exec_spawn_id`.
2384    #[must_use]
2385    pub fn exec_spawn_total_leaves_qty(
2386        &self,
2387        exec_spawn_id: &ClientOrderId,
2388        active_only: bool,
2389    ) -> Option<Quantity> {
2390        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2391
2392        let mut total_quantity: Option<Quantity> = None;
2393
2394        for spawn_order in exec_spawn_orders {
2395            if !active_only || !spawn_order.is_closed() {
2396                if let Some(mut total_quantity) = total_quantity {
2397                    total_quantity += spawn_order.leaves_qty();
2398                }
2399            } else {
2400                total_quantity = Some(spawn_order.leaves_qty());
2401            }
2402        }
2403
2404        total_quantity
2405    }
2406
2407    // -- POSITION QUERIES ------------------------------------------------------------------------
2408
2409    /// Returns a reference to the position with the given `position_id` (if found).
2410    #[must_use]
2411    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2412        self.positions.get(position_id)
2413    }
2414
2415    /// Returns a reference to the position for the given `client_order_id` (if found).
2416    #[must_use]
2417    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2418        self.index
2419            .order_position
2420            .get(client_order_id)
2421            .and_then(|position_id| self.positions.get(position_id))
2422    }
2423
2424    /// Returns a reference to the position ID for the given `client_order_id` (if found).
2425    #[must_use]
2426    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2427        self.index.order_position.get(client_order_id)
2428    }
2429
2430    /// Returns a reference to all positions matching the given optional filter parameters.
2431    #[must_use]
2432    pub fn positions(
2433        &self,
2434        venue: Option<&Venue>,
2435        instrument_id: Option<&InstrumentId>,
2436        strategy_id: Option<&StrategyId>,
2437        side: Option<PositionSide>,
2438    ) -> Vec<&Position> {
2439        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2440        self.get_positions_for_ids(&position_ids, side)
2441    }
2442
2443    /// Returns a reference to all open positions matching the given optional filter parameters.
2444    #[must_use]
2445    pub fn positions_open(
2446        &self,
2447        venue: Option<&Venue>,
2448        instrument_id: Option<&InstrumentId>,
2449        strategy_id: Option<&StrategyId>,
2450        side: Option<PositionSide>,
2451    ) -> Vec<&Position> {
2452        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2453        self.get_positions_for_ids(&position_ids, side)
2454    }
2455
2456    /// Returns a reference to all closed positions matching the given optional filter parameters.
2457    #[must_use]
2458    pub fn positions_closed(
2459        &self,
2460        venue: Option<&Venue>,
2461        instrument_id: Option<&InstrumentId>,
2462        strategy_id: Option<&StrategyId>,
2463        side: Option<PositionSide>,
2464    ) -> Vec<&Position> {
2465        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2466        self.get_positions_for_ids(&position_ids, side)
2467    }
2468
2469    /// Returns whether a position with the given `position_id` exists.
2470    #[must_use]
2471    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2472        self.index.positions.contains(position_id)
2473    }
2474
2475    /// Returns whether a position with the given `position_id` is open.
2476    #[must_use]
2477    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2478        self.index.positions_open.contains(position_id)
2479    }
2480
2481    /// Returns whether a position with the given `position_id` is closed.
2482    #[must_use]
2483    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2484        self.index.positions_closed.contains(position_id)
2485    }
2486
2487    /// Returns the count of all open positions.
2488    #[must_use]
2489    pub fn positions_open_count(
2490        &self,
2491        venue: Option<&Venue>,
2492        instrument_id: Option<&InstrumentId>,
2493        strategy_id: Option<&StrategyId>,
2494        side: Option<PositionSide>,
2495    ) -> usize {
2496        self.positions_open(venue, instrument_id, strategy_id, side)
2497            .len()
2498    }
2499
2500    /// Returns the count of all closed positions.
2501    #[must_use]
2502    pub fn positions_closed_count(
2503        &self,
2504        venue: Option<&Venue>,
2505        instrument_id: Option<&InstrumentId>,
2506        strategy_id: Option<&StrategyId>,
2507        side: Option<PositionSide>,
2508    ) -> usize {
2509        self.positions_closed(venue, instrument_id, strategy_id, side)
2510            .len()
2511    }
2512
2513    /// Returns the count of all positions.
2514    #[must_use]
2515    pub fn positions_total_count(
2516        &self,
2517        venue: Option<&Venue>,
2518        instrument_id: Option<&InstrumentId>,
2519        strategy_id: Option<&StrategyId>,
2520        side: Option<PositionSide>,
2521    ) -> usize {
2522        self.positions(venue, instrument_id, strategy_id, side)
2523            .len()
2524    }
2525
2526    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2527
2528    /// Gets a reference to the strategy ID for the given `client_order_id` (if found).
2529    #[must_use]
2530    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2531        self.index.order_strategy.get(client_order_id)
2532    }
2533
2534    /// Gets a reference to the strategy ID for the given `position_id` (if found).
2535    #[must_use]
2536    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2537        self.index.position_strategy.get(position_id)
2538    }
2539
2540    // -- GENERAL ---------------------------------------------------------------------------------
2541
2542    /// Gets a reference to the general object value for the given `key` (if found).
2543    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2544        check_valid_string(key, stringify!(key)).expect(FAILED);
2545
2546        Ok(self.general.get(key))
2547    }
2548
2549    // -- DATA QUERIES ----------------------------------------------------------------------------
2550
2551    /// Returns the price for the given `instrument_id` and `price_type` (if found).
2552    #[must_use]
2553    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2554        match price_type {
2555            PriceType::Bid => self
2556                .quotes
2557                .get(instrument_id)
2558                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2559            PriceType::Ask => self
2560                .quotes
2561                .get(instrument_id)
2562                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2563            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2564                quotes.front().map(|quote| {
2565                    Price::new(
2566                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2567                        quote.bid_price.precision + 1,
2568                    )
2569                })
2570            }),
2571            PriceType::Last => self
2572                .trades
2573                .get(instrument_id)
2574                .and_then(|trades| trades.front().map(|trade| trade.price)),
2575            PriceType::Mark => self
2576                .mark_prices
2577                .get(instrument_id)
2578                .and_then(|marks| marks.front().map(|mark| mark.value)),
2579        }
2580    }
2581
2582    /// Gets all quotes for the given `instrument_id`.
2583    #[must_use]
2584    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2585        self.quotes
2586            .get(instrument_id)
2587            .map(|quotes| quotes.iter().copied().collect())
2588    }
2589
2590    /// Gets all trades for the given `instrument_id`.
2591    #[must_use]
2592    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2593        self.trades
2594            .get(instrument_id)
2595            .map(|trades| trades.iter().copied().collect())
2596    }
2597
2598    /// Gets all mark price updates for the given `instrument_id`.
2599    #[must_use]
2600    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2601        self.mark_prices
2602            .get(instrument_id)
2603            .map(|mark_prices| mark_prices.iter().copied().collect())
2604    }
2605
2606    /// Gets all index price updates for the given `instrument_id`.
2607    #[must_use]
2608    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2609        self.index_prices
2610            .get(instrument_id)
2611            .map(|index_prices| index_prices.iter().copied().collect())
2612    }
2613
2614    /// Gets all bars for the given `bar_type`.
2615    #[must_use]
2616    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2617        self.bars
2618            .get(bar_type)
2619            .map(|bars| bars.iter().copied().collect())
2620    }
2621
2622    /// Gets a reference to the order book for the given `instrument_id`.
2623    #[must_use]
2624    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2625        self.books.get(instrument_id)
2626    }
2627
2628    /// Gets a reference to the order book for the given `instrument_id`.
2629    #[must_use]
2630    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2631        self.books.get_mut(instrument_id)
2632    }
2633
2634    /// Gets a reference to the own order book for the given `instrument_id`.
2635    #[must_use]
2636    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
2637        self.own_books.get(instrument_id)
2638    }
2639
2640    /// Gets a reference to the own order book for the given `instrument_id`.
2641    #[must_use]
2642    pub fn own_order_book_mut(
2643        &mut self,
2644        instrument_id: &InstrumentId,
2645    ) -> Option<&mut OwnOrderBook> {
2646        self.own_books.get_mut(instrument_id)
2647    }
2648
2649    /// Gets a reference to the latest quote tick for the given `instrument_id`.
2650    #[must_use]
2651    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2652        self.quotes
2653            .get(instrument_id)
2654            .and_then(|quotes| quotes.front())
2655    }
2656
2657    /// Gets a reference to the latest trade tick for the given `instrument_id`.
2658    #[must_use]
2659    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2660        self.trades
2661            .get(instrument_id)
2662            .and_then(|trades| trades.front())
2663    }
2664
2665    /// Gets a referenece to the latest mark price update for the given `instrument_id`.
2666    #[must_use]
2667    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
2668        self.mark_prices
2669            .get(instrument_id)
2670            .and_then(|mark_prices| mark_prices.front())
2671    }
2672
2673    /// Gets a referenece to the latest index price update for the given `instrument_id`.
2674    #[must_use]
2675    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
2676        self.index_prices
2677            .get(instrument_id)
2678            .and_then(|index_prices| index_prices.front())
2679    }
2680
2681    /// Gets a reference to the latest bar for the given `bar_type`.
2682    #[must_use]
2683    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2684        self.bars.get(bar_type).and_then(|bars| bars.front())
2685    }
2686
2687    /// Gets the order book update count for the given `instrument_id`.
2688    #[must_use]
2689    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2690        self.books
2691            .get(instrument_id)
2692            .map_or(0, |book| book.update_count) as usize
2693    }
2694
2695    /// Gets the quote tick count for the given `instrument_id`.
2696    #[must_use]
2697    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2698        self.quotes
2699            .get(instrument_id)
2700            .map_or(0, std::collections::VecDeque::len)
2701    }
2702
2703    /// Gets the trade tick count for the given `instrument_id`.
2704    #[must_use]
2705    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2706        self.trades
2707            .get(instrument_id)
2708            .map_or(0, std::collections::VecDeque::len)
2709    }
2710
2711    /// Gets the bar count for the given `instrument_id`.
2712    #[must_use]
2713    pub fn bar_count(&self, bar_type: &BarType) -> usize {
2714        self.bars
2715            .get(bar_type)
2716            .map_or(0, std::collections::VecDeque::len)
2717    }
2718
2719    /// Returns whether the cache contains an order book for the given `instrument_id`.
2720    #[must_use]
2721    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2722        self.books.contains_key(instrument_id)
2723    }
2724
2725    /// Returns whether the cache contains quotes for the given `instrument_id`.
2726    #[must_use]
2727    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2728        self.quote_count(instrument_id) > 0
2729    }
2730
2731    /// Returns whether the cache contains trades for the given `instrument_id`.
2732    #[must_use]
2733    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2734        self.trade_count(instrument_id) > 0
2735    }
2736
2737    /// Returns whether the cache contains bars for the given `bar_type`.
2738    #[must_use]
2739    pub fn has_bars(&self, bar_type: &BarType) -> bool {
2740        self.bar_count(bar_type) > 0
2741    }
2742
2743    #[must_use]
2744    pub fn get_xrate(
2745        &self,
2746        venue: Venue,
2747        from_currency: Currency,
2748        to_currency: Currency,
2749        price_type: PriceType,
2750    ) -> Option<f64> {
2751        if from_currency == to_currency {
2752            // When the source and target currencies are identical,
2753            // no conversion is needed; return an exchange rate of 1.0.
2754            return Some(1.0);
2755        }
2756
2757        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2758
2759        match get_exchange_rate(
2760            from_currency.code,
2761            to_currency.code,
2762            price_type,
2763            bid_quote,
2764            ask_quote,
2765        ) {
2766            Ok(rate) => rate,
2767            Err(e) => {
2768                log::error!("Failed to calculate xrate: {e}");
2769                None
2770            }
2771        }
2772    }
2773
2774    fn build_quote_table(&self, venue: &Venue) -> (HashMap<String, f64>, HashMap<String, f64>) {
2775        let mut bid_quotes = HashMap::new();
2776        let mut ask_quotes = HashMap::new();
2777
2778        for instrument_id in self.instruments.keys() {
2779            if instrument_id.venue != *venue {
2780                continue;
2781            }
2782
2783            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2784                if let Some(tick) = ticks.front() {
2785                    (tick.bid_price, tick.ask_price)
2786                } else {
2787                    continue; // Empty ticks vector
2788                }
2789            } else {
2790                let bid_bar = self
2791                    .bars
2792                    .iter()
2793                    .find(|(k, _)| {
2794                        k.instrument_id() == *instrument_id
2795                            && matches!(k.spec().price_type, PriceType::Bid)
2796                    })
2797                    .map(|(_, v)| v);
2798
2799                let ask_bar = self
2800                    .bars
2801                    .iter()
2802                    .find(|(k, _)| {
2803                        k.instrument_id() == *instrument_id
2804                            && matches!(k.spec().price_type, PriceType::Ask)
2805                    })
2806                    .map(|(_, v)| v);
2807
2808                match (bid_bar, ask_bar) {
2809                    (Some(bid), Some(ask)) => {
2810                        let bid_price = bid.front().unwrap().close;
2811                        let ask_price = ask.front().unwrap().close;
2812
2813                        (bid_price, ask_price)
2814                    }
2815                    _ => continue,
2816                }
2817            };
2818
2819            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
2820            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
2821        }
2822
2823        (bid_quotes, ask_quotes)
2824    }
2825
2826    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
2827    #[must_use]
2828    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
2829        self.mark_xrates.get(&(from_currency, to_currency)).copied()
2830    }
2831
2832    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
2833    ///
2834    /// # Panics
2835    ///
2836    /// This function panics if `xrate` is not positive.
2837    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
2838        assert!(xrate > 0.0, "xrate was zero");
2839        self.mark_xrates.insert((from_currency, to_currency), xrate);
2840        self.mark_xrates
2841            .insert((to_currency, from_currency), 1.0 / xrate);
2842    }
2843
2844    /// Clears the mark exchange rate for the given currency pair.
2845    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
2846        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
2847    }
2848
2849    /// Clears all mark exchange rates.
2850    pub fn clear_mark_xrates(&mut self) {
2851        self.mark_xrates.clear();
2852    }
2853
2854    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
2855
2856    /// Returns a reference to the instrument for the given `instrument_id` (if found).
2857    #[must_use]
2858    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2859        self.instruments.get(instrument_id)
2860    }
2861
2862    /// Returns references to all instrument IDs for the given `venue`.
2863    #[must_use]
2864    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2865        self.instruments
2866            .keys()
2867            .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2868            .collect()
2869    }
2870
2871    /// Returns references to all instruments for the given `venue`.
2872    #[must_use]
2873    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2874        self.instruments
2875            .values()
2876            .filter(|i| &i.id().venue == venue)
2877            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
2878            .collect()
2879    }
2880
2881    /// Returns references to all bar types contained in the cache.
2882    #[must_use]
2883    pub fn bar_types(
2884        &self,
2885        instrument_id: Option<&InstrumentId>,
2886        price_type: Option<&PriceType>,
2887        aggregation_source: AggregationSource,
2888    ) -> Vec<&BarType> {
2889        let mut bar_types = self
2890            .bars
2891            .keys()
2892            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2893            .collect::<Vec<&BarType>>();
2894
2895        if let Some(instrument_id) = instrument_id {
2896            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2897        }
2898
2899        if let Some(price_type) = price_type {
2900            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2901        }
2902
2903        bar_types
2904    }
2905
2906    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
2907
2908    /// Returns a reference to the synthetic instrument for the given `instrument_id` (if found).
2909    #[must_use]
2910    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2911        self.synthetics.get(instrument_id)
2912    }
2913
2914    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
2915    #[must_use]
2916    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2917        self.synthetics.keys().collect()
2918    }
2919
2920    /// Returns references to all synthetic instruments contained in the cache.
2921    #[must_use]
2922    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2923        self.synthetics.values().collect()
2924    }
2925
2926    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
2927
2928    /// Returns a reference to the account for the given `account_id` (if found).
2929    #[must_use]
2930    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2931        self.accounts.get(account_id)
2932    }
2933
2934    /// Returns a reference to the account for the given `venue` (if found).
2935    #[must_use]
2936    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2937        self.index
2938            .venue_account
2939            .get(venue)
2940            .and_then(|account_id| self.accounts.get(account_id))
2941    }
2942
2943    /// Returns a reference to the account ID for the given `venue` (if found).
2944    #[must_use]
2945    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2946        self.index.venue_account.get(venue)
2947    }
2948
2949    /// Returns references to all accounts for the given `account_id`.
2950    #[must_use]
2951    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
2952        self.accounts
2953            .values()
2954            .filter(|account| &account.id() == account_id)
2955            .collect()
2956    }
2957}