nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `Cache` for market and execution related data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22pub mod database;
23
24#[cfg(test)]
25mod tests;
26
27use std::{
28    collections::{HashMap, HashSet, VecDeque},
29    time::{SystemTime, UNIX_EPOCH},
30};
31
32use bytes::Bytes;
33use database::CacheDatabaseAdapter;
34use nautilus_core::{
35    correctness::{
36        check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
37        FAILED,
38    },
39    UUID4,
40};
41use nautilus_model::{
42    accounts::AccountAny,
43    data::{Bar, BarType, QuoteTick, TradeTick},
44    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
45    identifiers::{
46        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
47        OrderListId, PositionId, StrategyId, Symbol, Venue, VenueOrderId,
48    },
49    instruments::{InstrumentAny, SyntheticInstrument},
50    orderbook::OrderBook,
51    orders::{OrderAny, OrderList},
52    position::Position,
53    types::{Currency, Money, Price, Quantity},
54};
55use rust_decimal::Decimal;
56use serde::{Deserialize, Serialize};
57use ustr::Ustr;
58
59use crate::{
60    enums::SerializationEncoding, msgbus::database::DatabaseConfig, xrate::get_exchange_rate,
61};
62
63/// Configuration for `Cache` instances.
64#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(default)]
66pub struct CacheConfig {
67    /// The configuration for the cache backing database.
68    pub database: Option<DatabaseConfig>,
69    /// The encoding for database operations, controls the type of serializer used.
70    pub encoding: SerializationEncoding,
71    /// If timestamps should be persisted as ISO 8601 strings.
72    pub timestamps_as_iso8601: bool,
73    /// The buffer interval (milliseconds) between pipelined/batched transactions.
74    pub buffer_interval_ms: Option<usize>,
75    /// If a 'trader-' prefix is used for keys.
76    pub use_trader_prefix: bool,
77    /// If the trader's instance ID is used for keys.
78    pub use_instance_id: bool,
79    /// If the database should be flushed on start.
80    pub flush_on_start: bool,
81    /// If instrument data should be dropped from the cache's memory on reset.
82    pub drop_instruments_on_reset: bool,
83    /// The maximum length for internal tick deques.
84    pub tick_capacity: usize,
85    /// The maximum length for internal bar deques.
86    pub bar_capacity: usize,
87    /// If market data should be persisted to disk.
88    pub save_market_data: bool,
89}
90
91impl Default for CacheConfig {
92    /// Creates a new default [`CacheConfig`] instance.
93    fn default() -> Self {
94        Self {
95            database: None,
96            encoding: SerializationEncoding::MsgPack,
97            timestamps_as_iso8601: false,
98            buffer_interval_ms: None,
99            use_trader_prefix: true,
100            use_instance_id: false,
101            flush_on_start: false,
102            drop_instruments_on_reset: true,
103            tick_capacity: 10_000,
104            bar_capacity: 10_000,
105            save_market_data: false,
106        }
107    }
108}
109
110impl CacheConfig {
111    /// Creates a new [`CacheConfig`] instance.
112    #[allow(clippy::too_many_arguments)]
113    #[must_use]
114    pub const fn new(
115        database: Option<DatabaseConfig>,
116        encoding: SerializationEncoding,
117        timestamps_as_iso8601: bool,
118        buffer_interval_ms: Option<usize>,
119        use_trader_prefix: bool,
120        use_instance_id: bool,
121        flush_on_start: bool,
122        drop_instruments_on_reset: bool,
123        tick_capacity: usize,
124        bar_capacity: usize,
125        save_market_data: bool,
126    ) -> Self {
127        Self {
128            database,
129            encoding,
130            timestamps_as_iso8601,
131            buffer_interval_ms,
132            use_trader_prefix,
133            use_instance_id,
134            flush_on_start,
135            drop_instruments_on_reset,
136            tick_capacity,
137            bar_capacity,
138            save_market_data,
139        }
140    }
141}
142
143/// A key-value lookup index for a `Cache`.
144#[derive(Debug)]
145pub struct CacheIndex {
146    venue_account: HashMap<Venue, AccountId>,
147    venue_orders: HashMap<Venue, HashSet<ClientOrderId>>,
148    venue_positions: HashMap<Venue, HashSet<PositionId>>,
149    venue_order_ids: HashMap<VenueOrderId, ClientOrderId>,
150    client_order_ids: HashMap<ClientOrderId, VenueOrderId>,
151    order_position: HashMap<ClientOrderId, PositionId>,
152    order_strategy: HashMap<ClientOrderId, StrategyId>,
153    order_client: HashMap<ClientOrderId, ClientId>,
154    position_strategy: HashMap<PositionId, StrategyId>,
155    position_orders: HashMap<PositionId, HashSet<ClientOrderId>>,
156    instrument_orders: HashMap<InstrumentId, HashSet<ClientOrderId>>,
157    instrument_positions: HashMap<InstrumentId, HashSet<PositionId>>,
158    strategy_orders: HashMap<StrategyId, HashSet<ClientOrderId>>,
159    strategy_positions: HashMap<StrategyId, HashSet<PositionId>>,
160    exec_algorithm_orders: HashMap<ExecAlgorithmId, HashSet<ClientOrderId>>,
161    exec_spawn_orders: HashMap<ClientOrderId, HashSet<ClientOrderId>>,
162    orders: HashSet<ClientOrderId>,
163    orders_open: HashSet<ClientOrderId>,
164    orders_closed: HashSet<ClientOrderId>,
165    orders_emulated: HashSet<ClientOrderId>,
166    orders_inflight: HashSet<ClientOrderId>,
167    orders_pending_cancel: HashSet<ClientOrderId>,
168    positions: HashSet<PositionId>,
169    positions_open: HashSet<PositionId>,
170    positions_closed: HashSet<PositionId>,
171    actors: HashSet<ComponentId>,
172    strategies: HashSet<StrategyId>,
173    exec_algorithms: HashSet<ExecAlgorithmId>,
174}
175
176impl CacheIndex {
177    /// Clears the index which will clear/reset all internal state.
178    pub fn clear(&mut self) {
179        self.venue_account.clear();
180        self.venue_orders.clear();
181        self.venue_positions.clear();
182        self.venue_order_ids.clear();
183        self.client_order_ids.clear();
184        self.order_position.clear();
185        self.order_strategy.clear();
186        self.order_client.clear();
187        self.position_strategy.clear();
188        self.position_orders.clear();
189        self.instrument_orders.clear();
190        self.instrument_positions.clear();
191        self.strategy_orders.clear();
192        self.strategy_positions.clear();
193        self.exec_algorithm_orders.clear();
194        self.exec_spawn_orders.clear();
195        self.orders.clear();
196        self.orders_open.clear();
197        self.orders_closed.clear();
198        self.orders_emulated.clear();
199        self.orders_inflight.clear();
200        self.orders_pending_cancel.clear();
201        self.positions.clear();
202        self.positions_open.clear();
203        self.positions_closed.clear();
204        self.actors.clear();
205        self.strategies.clear();
206        self.exec_algorithms.clear();
207    }
208}
209
210/// A common in-memory `Cache` for market and execution related data.
211pub struct Cache {
212    config: CacheConfig,
213    index: CacheIndex,
214    database: Option<Box<dyn CacheDatabaseAdapter>>,
215    general: HashMap<String, Bytes>,
216    quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
217    trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
218    books: HashMap<InstrumentId, OrderBook>,
219    bars: HashMap<BarType, VecDeque<Bar>>,
220    currencies: HashMap<Ustr, Currency>,
221    instruments: HashMap<InstrumentId, InstrumentAny>,
222    synthetics: HashMap<InstrumentId, SyntheticInstrument>,
223    accounts: HashMap<AccountId, AccountAny>,
224    orders: HashMap<ClientOrderId, OrderAny>,
225    order_lists: HashMap<OrderListId, OrderList>,
226    positions: HashMap<PositionId, Position>,
227    position_snapshots: HashMap<PositionId, Bytes>,
228}
229
230// SAFETY: Cache is not meant to be passed between threads
231unsafe impl Send for Cache {}
232unsafe impl Sync for Cache {}
233
234impl Default for Cache {
235    /// Creates a new default [`Cache`] instance.
236    fn default() -> Self {
237        Self::new(Some(CacheConfig::default()), None)
238    }
239}
240
241impl Cache {
242    /// Creates a new [`Cache`] instance.
243    #[must_use]
244    pub fn new(
245        config: Option<CacheConfig>,
246        database: Option<Box<dyn CacheDatabaseAdapter>>,
247    ) -> Self {
248        let index = CacheIndex {
249            venue_account: HashMap::new(),
250            venue_orders: HashMap::new(),
251            venue_positions: HashMap::new(),
252            venue_order_ids: HashMap::new(),
253            client_order_ids: HashMap::new(),
254            order_position: HashMap::new(),
255            order_strategy: HashMap::new(),
256            order_client: HashMap::new(),
257            position_strategy: HashMap::new(),
258            position_orders: HashMap::new(),
259            instrument_orders: HashMap::new(),
260            instrument_positions: HashMap::new(),
261            strategy_orders: HashMap::new(),
262            strategy_positions: HashMap::new(),
263            exec_algorithm_orders: HashMap::new(),
264            exec_spawn_orders: HashMap::new(),
265            orders: HashSet::new(),
266            orders_open: HashSet::new(),
267            orders_closed: HashSet::new(),
268            orders_emulated: HashSet::new(),
269            orders_inflight: HashSet::new(),
270            orders_pending_cancel: HashSet::new(),
271            positions: HashSet::new(),
272            positions_open: HashSet::new(),
273            positions_closed: HashSet::new(),
274            actors: HashSet::new(),
275            strategies: HashSet::new(),
276            exec_algorithms: HashSet::new(),
277        };
278
279        Self {
280            config: config.unwrap_or_default(),
281            index,
282            database,
283            general: HashMap::new(),
284            quotes: HashMap::new(),
285            trades: HashMap::new(),
286            books: HashMap::new(),
287            bars: HashMap::new(),
288            currencies: HashMap::new(),
289            instruments: HashMap::new(),
290            synthetics: HashMap::new(),
291            accounts: HashMap::new(),
292            orders: HashMap::new(),
293            order_lists: HashMap::new(),
294            positions: HashMap::new(),
295            position_snapshots: HashMap::new(),
296        }
297    }
298
299    /// Returns the cache instances memory address.
300    #[must_use]
301    pub fn memory_address(&self) -> String {
302        format!("{:?}", std::ptr::from_ref(self))
303    }
304
305    // -- COMMANDS --------------------------------------------------------------------------------
306
307    /// Clears the current general cache and loads the general objects from the cache database.
308    pub fn cache_general(&mut self) -> anyhow::Result<()> {
309        self.general = match &mut self.database {
310            Some(db) => db.load()?,
311            None => HashMap::new(),
312        };
313
314        log::info!(
315            "Cached {} general object(s) from database",
316            self.general.len()
317        );
318        Ok(())
319    }
320
321    /// Clears the current currencies cache and loads currencies from the cache database.
322    pub fn cache_currencies(&mut self) -> anyhow::Result<()> {
323        self.currencies = match &mut self.database {
324            Some(db) => db.load_currencies()?,
325            None => HashMap::new(),
326        };
327
328        log::info!("Cached {} currencies from database", self.general.len());
329        Ok(())
330    }
331
332    /// Clears the current instruments cache and loads instruments from the cache database.
333    pub fn cache_instruments(&mut self) -> anyhow::Result<()> {
334        self.instruments = match &mut self.database {
335            Some(db) => db.load_instruments()?,
336            None => HashMap::new(),
337        };
338
339        log::info!("Cached {} instruments from database", self.general.len());
340        Ok(())
341    }
342
343    /// Clears the current synthetic instruments cache and loads synthetic instruments from the cache
344    /// database.
345    pub fn cache_synthetics(&mut self) -> anyhow::Result<()> {
346        self.synthetics = match &mut self.database {
347            Some(db) => db.load_synthetics()?,
348            None => HashMap::new(),
349        };
350
351        log::info!(
352            "Cached {} synthetic instruments from database",
353            self.general.len()
354        );
355        Ok(())
356    }
357
358    /// Clears the current accounts cache and loads accounts from the cache database.
359    pub fn cache_accounts(&mut self) -> anyhow::Result<()> {
360        self.accounts = match &mut self.database {
361            Some(db) => db.load_accounts()?,
362            None => HashMap::new(),
363        };
364
365        log::info!(
366            "Cached {} synthetic instruments from database",
367            self.general.len()
368        );
369        Ok(())
370    }
371
372    /// Clears the current orders cache and loads orders from the cache database.
373    pub fn cache_orders(&mut self) -> anyhow::Result<()> {
374        self.orders = match &mut self.database {
375            Some(db) => db.load_orders()?,
376            None => HashMap::new(),
377        };
378
379        log::info!("Cached {} orders from database", self.general.len());
380        Ok(())
381    }
382
383    /// Clears the current positions cache and loads positions from the cache database.
384    pub fn cache_positions(&mut self) -> anyhow::Result<()> {
385        self.positions = match &mut self.database {
386            Some(db) => db.load_positions()?,
387            None => HashMap::new(),
388        };
389
390        log::info!("Cached {} positions from database", self.general.len());
391        Ok(())
392    }
393
394    /// Clears the current cache index and re-build.
395    pub fn build_index(&mut self) {
396        self.index.clear();
397        log::debug!("Building index");
398
399        // Index accounts
400        for account_id in self.accounts.keys() {
401            self.index
402                .venue_account
403                .insert(account_id.get_issuer(), *account_id);
404        }
405
406        // Index orders
407        for (client_order_id, order) in &self.orders {
408            let instrument_id = order.instrument_id();
409            let venue = instrument_id.venue;
410            let strategy_id = order.strategy_id();
411
412            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
413            self.index
414                .venue_orders
415                .entry(venue)
416                .or_default()
417                .insert(*client_order_id);
418
419            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
420            if let Some(venue_order_id) = order.venue_order_id() {
421                self.index
422                    .venue_order_ids
423                    .insert(venue_order_id, *client_order_id);
424            }
425
426            // 3: Build index.order_position -> {ClientOrderId, PositionId}
427            if let Some(position_id) = order.position_id() {
428                self.index
429                    .order_position
430                    .insert(*client_order_id, position_id);
431            }
432
433            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
434            self.index
435                .order_strategy
436                .insert(*client_order_id, order.strategy_id());
437
438            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
439            self.index
440                .instrument_orders
441                .entry(instrument_id)
442                .or_default()
443                .insert(*client_order_id);
444
445            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
446            self.index
447                .strategy_orders
448                .entry(strategy_id)
449                .or_default()
450                .insert(*client_order_id);
451
452            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
453            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
454                self.index
455                    .exec_algorithm_orders
456                    .entry(exec_algorithm_id)
457                    .or_default()
458                    .insert(*client_order_id);
459            }
460
461            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
462            if let Some(exec_spawn_id) = order.exec_spawn_id() {
463                self.index
464                    .exec_spawn_orders
465                    .entry(exec_spawn_id)
466                    .or_default()
467                    .insert(*client_order_id);
468            }
469
470            // 9: Build index.orders -> {ClientOrderId}
471            self.index.orders.insert(*client_order_id);
472
473            // 10: Build index.orders_open -> {ClientOrderId}
474            if order.is_open() {
475                self.index.orders_open.insert(*client_order_id);
476            }
477
478            // 11: Build index.orders_closed -> {ClientOrderId}
479            if order.is_closed() {
480                self.index.orders_closed.insert(*client_order_id);
481            }
482
483            // 12: Build index.orders_emulated -> {ClientOrderId}
484            if let Some(emulation_trigger) = order.emulation_trigger() {
485                if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
486                    self.index.orders_emulated.insert(*client_order_id);
487                }
488            }
489
490            // 13: Build index.orders_inflight -> {ClientOrderId}
491            if order.is_inflight() {
492                self.index.orders_inflight.insert(*client_order_id);
493            }
494
495            // 14: Build index.strategies -> {StrategyId}
496            self.index.strategies.insert(strategy_id);
497
498            // 15: Build index.strategies -> {ExecAlgorithmId}
499            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
500                self.index.exec_algorithms.insert(exec_algorithm_id);
501            }
502        }
503
504        // Index positions
505        for (position_id, position) in &self.positions {
506            let instrument_id = position.instrument_id;
507            let venue = instrument_id.venue;
508            let strategy_id = position.strategy_id;
509
510            // 1: Build index.venue_positions -> {Venue, {PositionId}}
511            self.index
512                .venue_positions
513                .entry(venue)
514                .or_default()
515                .insert(*position_id);
516
517            // 2: Build index.position_strategy -> {PositionId, StrategyId}
518            self.index
519                .position_strategy
520                .insert(*position_id, position.strategy_id);
521
522            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
523            self.index
524                .position_orders
525                .entry(*position_id)
526                .or_default()
527                .extend(position.client_order_ids().into_iter());
528
529            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
530            self.index
531                .instrument_positions
532                .entry(instrument_id)
533                .or_default()
534                .insert(*position_id);
535
536            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
537            self.index
538                .strategy_positions
539                .entry(strategy_id)
540                .or_default()
541                .insert(*position_id);
542
543            // 6: Build index.positions -> {PositionId}
544            self.index.positions.insert(*position_id);
545
546            // 7: Build index.positions_open -> {PositionId}
547            if position.is_open() {
548                self.index.positions_open.insert(*position_id);
549            }
550
551            // 8: Build index.positions_closed -> {PositionId}
552            if position.is_closed() {
553                self.index.positions_closed.insert(*position_id);
554            }
555
556            // 9: Build index.strategies -> {StrategyId}
557            self.index.strategies.insert(strategy_id);
558        }
559    }
560
561    /// Returns whether the cache has a backing database.
562    #[must_use]
563    pub const fn has_backing(&self) -> bool {
564        self.config.database.is_some()
565    }
566
567    // Calculate the unrealized profit and loss (PnL) for a given position.
568    #[must_use]
569    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
570        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
571            quote
572        } else {
573            log::warn!(
574                "Cannot calculate unrealized PnL for {}, no quotes for {}",
575                position.id,
576                position.instrument_id
577            );
578            return None;
579        };
580
581        let last = match position.side {
582            PositionSide::Flat | PositionSide::NoPositionSide => {
583                return Some(Money::new(0.0, position.settlement_currency));
584            }
585            PositionSide::Long => quote.ask_price,
586            PositionSide::Short => quote.bid_price,
587        };
588
589        Some(position.unrealized_pnl(last))
590    }
591
592    /// Checks integrity of data within the cache.
593    ///
594    /// All data should be loaded from the database prior to this call.
595    /// If an error is found then a log error message will also be produced.
596    #[must_use]
597    pub fn check_integrity(&mut self) -> bool {
598        let mut error_count = 0;
599        let failure = "Integrity failure";
600
601        // Get current timestamp in microseconds
602        let timestamp_us = SystemTime::now()
603            .duration_since(UNIX_EPOCH)
604            .expect("Time went backwards")
605            .as_micros();
606
607        log::info!("Checking data integrity");
608
609        // Check object caches
610        for account_id in self.accounts.keys() {
611            if !self
612                .index
613                .venue_account
614                .contains_key(&account_id.get_issuer())
615            {
616                log::error!(
617                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
618                );
619                error_count += 1;
620            }
621        }
622
623        for (client_order_id, order) in &self.orders {
624            if !self.index.order_strategy.contains_key(client_order_id) {
625                log::error!(
626                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
627                );
628                error_count += 1;
629            }
630            if !self.index.orders.contains(client_order_id) {
631                log::error!(
632                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
633                );
634                error_count += 1;
635            }
636            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
637                log::error!(
638                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
639                );
640                error_count += 1;
641            }
642            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
643                log::error!(
644                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
645                );
646                error_count += 1;
647            }
648            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
649                log::error!(
650                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
651                );
652                error_count += 1;
653            }
654            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
655                if !self
656                    .index
657                    .exec_algorithm_orders
658                    .contains_key(&exec_algorithm_id)
659                {
660                    log::error!(
661                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
662                    );
663                    error_count += 1;
664                }
665                if order.exec_spawn_id().is_none()
666                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
667                {
668                    log::error!(
669                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
670                    );
671                    error_count += 1;
672                }
673            }
674        }
675
676        for (position_id, position) in &self.positions {
677            if !self.index.position_strategy.contains_key(position_id) {
678                log::error!(
679                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
680                );
681                error_count += 1;
682            }
683            if !self.index.position_orders.contains_key(position_id) {
684                log::error!(
685                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
686                );
687                error_count += 1;
688            }
689            if !self.index.positions.contains(position_id) {
690                log::error!(
691                    "{failure} in positions: {position_id} not found in `self.index.positions`",
692                );
693                error_count += 1;
694            }
695            if position.is_open() && !self.index.positions_open.contains(position_id) {
696                log::error!(
697                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
698                );
699                error_count += 1;
700            }
701            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
702                log::error!(
703                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
704                );
705                error_count += 1;
706            }
707        }
708
709        // Check indexes
710        for account_id in self.index.venue_account.values() {
711            if !self.accounts.contains_key(account_id) {
712                log::error!(
713                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
714                );
715                error_count += 1;
716            }
717        }
718
719        for client_order_id in self.index.venue_order_ids.values() {
720            if !self.orders.contains_key(client_order_id) {
721                log::error!(
722                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
723                );
724                error_count += 1;
725            }
726        }
727
728        for client_order_id in self.index.client_order_ids.keys() {
729            if !self.orders.contains_key(client_order_id) {
730                log::error!(
731                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
732                );
733                error_count += 1;
734            }
735        }
736
737        for client_order_id in self.index.order_position.keys() {
738            if !self.orders.contains_key(client_order_id) {
739                log::error!(
740                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
741                );
742                error_count += 1;
743            }
744        }
745
746        // Check indexes
747        for client_order_id in self.index.order_strategy.keys() {
748            if !self.orders.contains_key(client_order_id) {
749                log::error!(
750                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
751                );
752                error_count += 1;
753            }
754        }
755
756        for position_id in self.index.position_strategy.keys() {
757            if !self.positions.contains_key(position_id) {
758                log::error!(
759                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
760                );
761                error_count += 1;
762            }
763        }
764
765        for position_id in self.index.position_orders.keys() {
766            if !self.positions.contains_key(position_id) {
767                log::error!(
768                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
769                );
770                error_count += 1;
771            }
772        }
773
774        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
775            for client_order_id in client_order_ids {
776                if !self.orders.contains_key(client_order_id) {
777                    log::error!(
778                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
779                    );
780                    error_count += 1;
781                }
782            }
783        }
784
785        for instrument_id in self.index.instrument_positions.keys() {
786            if !self.index.instrument_orders.contains_key(instrument_id) {
787                log::error!(
788                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
789                );
790                error_count += 1;
791            }
792        }
793
794        for client_order_ids in self.index.strategy_orders.values() {
795            for client_order_id in client_order_ids {
796                if !self.orders.contains_key(client_order_id) {
797                    log::error!(
798                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
799                    );
800                    error_count += 1;
801                }
802            }
803        }
804
805        for position_ids in self.index.strategy_positions.values() {
806            for position_id in position_ids {
807                if !self.positions.contains_key(position_id) {
808                    log::error!(
809                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
810                    );
811                    error_count += 1;
812                }
813            }
814        }
815
816        for client_order_id in &self.index.orders {
817            if !self.orders.contains_key(client_order_id) {
818                log::error!(
819                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
820                );
821                error_count += 1;
822            }
823        }
824
825        for client_order_id in &self.index.orders_emulated {
826            if !self.orders.contains_key(client_order_id) {
827                log::error!(
828                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
829                );
830                error_count += 1;
831            }
832        }
833
834        for client_order_id in &self.index.orders_inflight {
835            if !self.orders.contains_key(client_order_id) {
836                log::error!(
837                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
838                );
839                error_count += 1;
840            }
841        }
842
843        for client_order_id in &self.index.orders_open {
844            if !self.orders.contains_key(client_order_id) {
845                log::error!(
846                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
847                );
848                error_count += 1;
849            }
850        }
851
852        for client_order_id in &self.index.orders_closed {
853            if !self.orders.contains_key(client_order_id) {
854                log::error!(
855                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
856                );
857                error_count += 1;
858            }
859        }
860
861        for position_id in &self.index.positions {
862            if !self.positions.contains_key(position_id) {
863                log::error!(
864                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
865                );
866                error_count += 1;
867            }
868        }
869
870        for position_id in &self.index.positions_open {
871            if !self.positions.contains_key(position_id) {
872                log::error!(
873                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
874                );
875                error_count += 1;
876            }
877        }
878
879        for position_id in &self.index.positions_closed {
880            if !self.positions.contains_key(position_id) {
881                log::error!(
882                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
883                );
884                error_count += 1;
885            }
886        }
887
888        for strategy_id in &self.index.strategies {
889            if !self.index.strategy_orders.contains_key(strategy_id) {
890                log::error!(
891                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
892                );
893                error_count += 1;
894            }
895        }
896
897        for exec_algorithm_id in &self.index.exec_algorithms {
898            if !self
899                .index
900                .exec_algorithm_orders
901                .contains_key(exec_algorithm_id)
902            {
903                log::error!(
904                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
905                );
906                error_count += 1;
907            }
908        }
909
910        let total_us = SystemTime::now()
911            .duration_since(UNIX_EPOCH)
912            .expect("Time went backwards")
913            .as_micros()
914            - timestamp_us;
915
916        if error_count == 0 {
917            log::info!("Integrity check passed in {total_us}μs");
918            true
919        } else {
920            log::error!(
921                "Integrity check failed with {error_count} error{} in {total_us}μs",
922                if error_count == 1 { "" } else { "s" },
923            );
924            false
925        }
926    }
927
928    /// Checks for any residual open state and log warnings if any are found.
929    ///
930    ///'Open state' is considered to be open orders and open positions.
931    #[must_use]
932    pub fn check_residuals(&self) -> bool {
933        log::debug!("Checking residuals");
934
935        let mut residuals = false;
936
937        // Check for any open orders
938        for order in self.orders_open(None, None, None, None) {
939            residuals = true;
940            log::warn!("Residual {order:?}");
941        }
942
943        // Check for any open positions
944        for position in self.positions_open(None, None, None, None) {
945            residuals = true;
946            log::warn!("Residual {position}");
947        }
948
949        residuals
950    }
951
952    /// Clears the caches index.
953    pub fn clear_index(&mut self) {
954        self.index.clear();
955        log::debug!("Cleared index");
956    }
957
958    /// Resets the cache.
959    ///
960    /// All stateful fields are reset to their initial value.
961    pub fn reset(&mut self) {
962        log::debug!("Resetting cache");
963
964        self.general.clear();
965        self.quotes.clear();
966        self.trades.clear();
967        self.books.clear();
968        self.bars.clear();
969        self.currencies.clear();
970        self.instruments.clear();
971        self.synthetics.clear();
972        self.accounts.clear();
973        self.orders.clear();
974        self.order_lists.clear();
975        self.positions.clear();
976        self.position_snapshots.clear();
977
978        self.clear_index();
979
980        log::info!("Reset cache");
981    }
982
983    /// Dispose of the cache which will close any underlying database adapter.
984    pub fn dispose(&mut self) {
985        if let Some(database) = &mut self.database {
986            database.close().expect("Failed to close database");
987        }
988    }
989
990    /// Flushes the caches database which permanently removes all persisted data.
991    pub fn flush_db(&mut self) {
992        if let Some(database) = &mut self.database {
993            database.flush().expect("Failed to flush database");
994        }
995    }
996
997    /// Adds a general object `value` (as bytes) to the cache at the given `key`.
998    ///
999    /// The cache is agnostic to what the bytes actually represent (and how it may be serialized),
1000    /// which provides maximum flexibility.
1001    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1002        check_valid_string(key, stringify!(key)).expect(FAILED);
1003        check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
1004
1005        log::debug!("Adding general {key}");
1006        self.general.insert(key.to_string(), value.clone());
1007
1008        if let Some(database) = &mut self.database {
1009            database.add(key.to_string(), value)?;
1010        }
1011        Ok(())
1012    }
1013
1014    /// Adds the given order `book` to the cache.
1015    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1016        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1017
1018        if self.config.save_market_data {
1019            if let Some(database) = &mut self.database {
1020                database.add_order_book(&book)?;
1021            }
1022        }
1023
1024        self.books.insert(book.instrument_id, book);
1025        Ok(())
1026    }
1027
1028    /// Adds the given `quote` tick to the cache.
1029    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1030        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1031
1032        if self.config.save_market_data {
1033            if let Some(database) = &mut self.database {
1034                database.add_quote(&quote)?;
1035            }
1036        }
1037
1038        let quotes_deque = self
1039            .quotes
1040            .entry(quote.instrument_id)
1041            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1042        quotes_deque.push_front(quote);
1043        Ok(())
1044    }
1045
1046    /// Adds the given `quotes` to the cache.
1047    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1048        check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
1049
1050        let instrument_id = quotes[0].instrument_id;
1051        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1052
1053        if self.config.save_market_data {
1054            if let Some(database) = &mut self.database {
1055                for quote in quotes {
1056                    database.add_quote(quote).unwrap();
1057                }
1058            }
1059        }
1060
1061        let quotes_deque = self
1062            .quotes
1063            .entry(instrument_id)
1064            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1065
1066        for quote in quotes {
1067            quotes_deque.push_front(*quote);
1068        }
1069        Ok(())
1070    }
1071
1072    /// Adds the given `trade` tick to the cache.
1073    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1074        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1075
1076        if self.config.save_market_data {
1077            if let Some(database) = &mut self.database {
1078                database.add_trade(&trade)?;
1079            }
1080        }
1081
1082        let trades_deque = self
1083            .trades
1084            .entry(trade.instrument_id)
1085            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1086        trades_deque.push_front(trade);
1087        Ok(())
1088    }
1089
1090    /// Adds the give `trades` to the cache.
1091    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1092        check_slice_not_empty(trades, stringify!(trades)).unwrap();
1093
1094        let instrument_id = trades[0].instrument_id;
1095        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1096
1097        if self.config.save_market_data {
1098            if let Some(database) = &mut self.database {
1099                for trade in trades {
1100                    database.add_trade(trade).unwrap();
1101                }
1102            }
1103        }
1104
1105        let trades_deque = self
1106            .trades
1107            .entry(instrument_id)
1108            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1109
1110        for trade in trades {
1111            trades_deque.push_front(*trade);
1112        }
1113        Ok(())
1114    }
1115
1116    /// Adds the given `bar` to the cache.
1117    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1118        log::debug!("Adding `Bar` {}", bar.bar_type);
1119
1120        if self.config.save_market_data {
1121            if let Some(database) = &mut self.database {
1122                database.add_bar(&bar)?;
1123            }
1124        }
1125
1126        let bars = self
1127            .bars
1128            .entry(bar.bar_type)
1129            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1130        bars.push_front(bar);
1131        Ok(())
1132    }
1133
1134    /// Adds the given `bars` to the cache.
1135    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1136        check_slice_not_empty(bars, stringify!(bars)).unwrap();
1137
1138        let bar_type = bars[0].bar_type;
1139        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1140
1141        if self.config.save_market_data {
1142            if let Some(database) = &mut self.database {
1143                for bar in bars {
1144                    database.add_bar(bar).unwrap();
1145                }
1146            }
1147        }
1148
1149        let bars_deque = self
1150            .bars
1151            .entry(bar_type)
1152            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1153
1154        for bar in bars {
1155            bars_deque.push_front(*bar);
1156        }
1157        Ok(())
1158    }
1159
1160    /// Adds the given `currency` to the cache.
1161    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1162        log::debug!("Adding `Currency` {}", currency.code);
1163
1164        if let Some(database) = &mut self.database {
1165            database.add_currency(&currency)?;
1166        }
1167
1168        self.currencies.insert(currency.code, currency);
1169        Ok(())
1170    }
1171
1172    /// Adds the given `instrument` to the cache.
1173    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1174        log::debug!("Adding `Instrument` {}", instrument.id());
1175
1176        if let Some(database) = &mut self.database {
1177            database.add_instrument(&instrument)?;
1178        }
1179
1180        self.instruments.insert(instrument.id(), instrument);
1181        Ok(())
1182    }
1183
1184    /// Adds the given `synthetic` instrument to the cache.
1185    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1186        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1187
1188        if let Some(database) = &mut self.database {
1189            database.add_synthetic(&synthetic)?;
1190        }
1191
1192        self.synthetics.insert(synthetic.id, synthetic);
1193        Ok(())
1194    }
1195
1196    /// Adds the given `account` to the cache.
1197    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1198        log::debug!("Adding `Account` {}", account.id());
1199
1200        if let Some(database) = &mut self.database {
1201            database.add_account(&account)?;
1202        }
1203
1204        let account_id = account.id();
1205        self.accounts.insert(account_id, account);
1206        self.index
1207            .venue_account
1208            .insert(account_id.get_issuer(), account_id);
1209        Ok(())
1210    }
1211
1212    /// Indexes the given `client_order_id` with the given `venue_order_id`.
1213    ///
1214    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1215    pub fn add_venue_order_id(
1216        &mut self,
1217        client_order_id: &ClientOrderId,
1218        venue_order_id: &VenueOrderId,
1219        overwrite: bool,
1220    ) -> anyhow::Result<()> {
1221        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
1222            if !overwrite && existing_venue_order_id != venue_order_id {
1223                anyhow::bail!(
1224                    "Existing {existing_venue_order_id} for {client_order_id}
1225                    did not match the given {venue_order_id}.
1226                    If you are writing a test then try a different `venue_order_id`,
1227                    otherwise this is probably a bug."
1228                );
1229            }
1230        }
1231
1232        self.index
1233            .client_order_ids
1234            .insert(*client_order_id, *venue_order_id);
1235        self.index
1236            .venue_order_ids
1237            .insert(*venue_order_id, *client_order_id);
1238
1239        Ok(())
1240    }
1241
1242    /// Adds the given `order` to the cache indexed with any given identifiers.
1243    ///
1244    /// # Parameters
1245    ///
1246    /// `override_existing`: If the added order should 'override' any existing order and replace
1247    /// it in the cache. This is currently used for emulated orders which are
1248    /// being released and transformed into another type.
1249    ///
1250    /// # Errors
1251    ///
1252    /// This function returns an error:
1253    /// If not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1254    pub fn add_order(
1255        &mut self,
1256        order: OrderAny,
1257        position_id: Option<PositionId>,
1258        client_id: Option<ClientId>,
1259        replace_existing: bool,
1260    ) -> anyhow::Result<()> {
1261        let instrument_id = order.instrument_id();
1262        let venue = instrument_id.venue;
1263        let client_order_id = order.client_order_id();
1264        let strategy_id = order.strategy_id();
1265        let exec_algorithm_id = order.exec_algorithm_id();
1266        let exec_spawn_id = order.exec_spawn_id();
1267
1268        if !replace_existing {
1269            check_key_not_in_map(
1270                &client_order_id,
1271                &self.orders,
1272                stringify!(client_order_id),
1273                stringify!(orders),
1274            )
1275            .expect(FAILED);
1276            check_key_not_in_map(
1277                &client_order_id,
1278                &self.orders,
1279                stringify!(client_order_id),
1280                stringify!(orders),
1281            )
1282            .expect(FAILED);
1283            check_key_not_in_map(
1284                &client_order_id,
1285                &self.orders,
1286                stringify!(client_order_id),
1287                stringify!(orders),
1288            )
1289            .expect(FAILED);
1290            check_key_not_in_map(
1291                &client_order_id,
1292                &self.orders,
1293                stringify!(client_order_id),
1294                stringify!(orders),
1295            )
1296            .expect(FAILED);
1297        }
1298
1299        log::debug!("Adding {order:?}");
1300
1301        self.index.orders.insert(client_order_id);
1302        self.index
1303            .order_strategy
1304            .insert(client_order_id, strategy_id);
1305        self.index.strategies.insert(strategy_id);
1306
1307        // Update venue -> orders index
1308        self.index
1309            .venue_orders
1310            .entry(venue)
1311            .or_default()
1312            .insert(client_order_id);
1313
1314        // Update instrument -> orders index
1315        self.index
1316            .instrument_orders
1317            .entry(instrument_id)
1318            .or_default()
1319            .insert(client_order_id);
1320
1321        // Update strategy -> orders index
1322        self.index
1323            .strategy_orders
1324            .entry(strategy_id)
1325            .or_default()
1326            .insert(client_order_id);
1327
1328        // Update exec_algorithm -> orders index
1329        if let Some(exec_algorithm_id) = exec_algorithm_id {
1330            self.index.exec_algorithms.insert(exec_algorithm_id);
1331
1332            self.index
1333                .exec_algorithm_orders
1334                .entry(exec_algorithm_id)
1335                .or_default()
1336                .insert(client_order_id);
1337
1338            self.index
1339                .exec_spawn_orders
1340                .entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
1341                .or_default()
1342                .insert(client_order_id);
1343        }
1344
1345        // Update emulation index
1346        match order.emulation_trigger() {
1347            Some(_) => {
1348                self.index.orders_emulated.remove(&client_order_id);
1349            }
1350            None => {
1351                self.index.orders_emulated.insert(client_order_id);
1352            }
1353        }
1354
1355        // Index position ID if provided
1356        if let Some(position_id) = position_id {
1357            self.add_position_id(
1358                &position_id,
1359                &order.instrument_id().venue,
1360                &client_order_id,
1361                &strategy_id,
1362            )?;
1363        }
1364
1365        // Index client ID if provided
1366        if let Some(client_id) = client_id {
1367            self.index.order_client.insert(client_order_id, client_id);
1368            log::debug!("Indexed {client_id:?}");
1369        }
1370
1371        if let Some(database) = &mut self.database {
1372            database.add_order(&order, client_id)?;
1373            // TODO: Implement
1374            // if self.config.snapshot_orders {
1375            //     database.snapshot_order_state(order)?;
1376            // }
1377        }
1378
1379        self.orders.insert(client_order_id, order);
1380
1381        Ok(())
1382    }
1383
1384    /// Indexes the given `position_id` with the other given IDs.
1385    pub fn add_position_id(
1386        &mut self,
1387        position_id: &PositionId,
1388        venue: &Venue,
1389        client_order_id: &ClientOrderId,
1390        strategy_id: &StrategyId,
1391    ) -> anyhow::Result<()> {
1392        self.index
1393            .order_position
1394            .insert(*client_order_id, *position_id);
1395
1396        // Index: ClientOrderId -> PositionId
1397        if let Some(database) = &mut self.database {
1398            database.index_order_position(*client_order_id, *position_id)?;
1399        }
1400
1401        // Index: PositionId -> StrategyId
1402        self.index
1403            .position_strategy
1404            .insert(*position_id, *strategy_id);
1405
1406        // Index: PositionId -> set[ClientOrderId]
1407        self.index
1408            .position_orders
1409            .entry(*position_id)
1410            .or_default()
1411            .insert(*client_order_id);
1412
1413        // Index: StrategyId -> set[PositionId]
1414        self.index
1415            .strategy_positions
1416            .entry(*strategy_id)
1417            .or_default()
1418            .insert(*position_id);
1419
1420        Ok(())
1421    }
1422
1423    /// Adds the given `position` to the cache.
1424    pub fn add_position(&mut self, position: Position, oms_type: OmsType) -> anyhow::Result<()> {
1425        self.positions.insert(position.id, position.clone());
1426        self.index.positions.insert(position.id);
1427        self.index.positions_open.insert(position.id);
1428
1429        log::debug!("Adding {position}");
1430
1431        self.add_position_id(
1432            &position.id,
1433            &position.instrument_id.venue,
1434            &position.opening_order_id,
1435            &position.strategy_id,
1436        )?;
1437
1438        let venue = position.instrument_id.venue;
1439        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1440        venue_positions.insert(position.id);
1441
1442        // Index: InstrumentId -> HashSet
1443        let instrument_id = position.instrument_id;
1444        let instrument_positions = self
1445            .index
1446            .instrument_positions
1447            .entry(instrument_id)
1448            .or_default();
1449        instrument_positions.insert(position.id);
1450
1451        if let Some(database) = &mut self.database {
1452            database.add_position(&position)?;
1453            // TODO: Implement position snapshots
1454            // if self.snapshot_positions {
1455            //     database.snapshot_position_state(
1456            //         position,
1457            //         position.ts_last,
1458            //         self.calculate_unrealized_pnl(&position),
1459            //     )?;
1460            // }
1461        }
1462
1463        Ok(())
1464    }
1465
1466    /// Updates the given `account` in the cache.
1467    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1468        if let Some(database) = &mut self.database {
1469            database.update_account(&account)?;
1470        }
1471        Ok(())
1472    }
1473
1474    /// Updates the given `order` in the cache.
1475    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1476        let client_order_id = order.client_order_id();
1477
1478        // Update venue order ID
1479        if let Some(venue_order_id) = order.venue_order_id() {
1480            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1481            // venues which use a cancel+replace update strategy.
1482            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1483                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1484                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1485            }
1486        }
1487
1488        // Update in-flight state
1489        if order.is_inflight() {
1490            self.index.orders_inflight.insert(client_order_id);
1491        } else {
1492            self.index.orders_inflight.remove(&client_order_id);
1493        }
1494
1495        // Update open/closed state
1496        if order.is_open() {
1497            self.index.orders_closed.remove(&client_order_id);
1498            self.index.orders_open.insert(client_order_id);
1499        } else if order.is_closed() {
1500            self.index.orders_open.remove(&client_order_id);
1501            self.index.orders_pending_cancel.remove(&client_order_id);
1502            self.index.orders_closed.insert(client_order_id);
1503        }
1504
1505        // Update emulation
1506        if let Some(emulation_trigger) = order.emulation_trigger() {
1507            match emulation_trigger {
1508                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1509                _ => self.index.orders_emulated.insert(client_order_id),
1510            };
1511        }
1512
1513        if let Some(database) = &mut self.database {
1514            database.update_order(order.last_event())?;
1515            // TODO: Implement order snapshots
1516            // if self.snapshot_orders {
1517            //     database.snapshot_order_state(order)?;
1518            // }
1519        }
1520
1521        // update the order in the cache
1522        self.orders.insert(client_order_id, order.clone());
1523
1524        Ok(())
1525    }
1526
1527    /// Updates the given `order` as pending cancel locally.
1528    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1529        self.index
1530            .orders_pending_cancel
1531            .insert(order.client_order_id());
1532    }
1533
1534    /// Updates the given `position` in the cache.
1535    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1536        // Update open/closed state
1537        if position.is_open() {
1538            self.index.positions_open.insert(position.id);
1539            self.index.positions_closed.remove(&position.id);
1540        } else {
1541            self.index.positions_closed.insert(position.id);
1542            self.index.positions_open.remove(&position.id);
1543        }
1544
1545        if let Some(database) = &mut self.database {
1546            database.update_position(position)?;
1547            // TODO: Implement order snapshots
1548            // if self.snapshot_orders {
1549            //     database.snapshot_order_state(order)?;
1550            // }
1551        }
1552        Ok(())
1553    }
1554
1555    /// Creates a snapshot of the given position by cloning it, assigning a new ID,
1556    /// serializing it, and storing it in the position snapshots.
1557    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1558        let position_id = position.id;
1559
1560        let mut copied_position = position.clone();
1561        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1562        copied_position.id = PositionId::new(new_id);
1563
1564        // Serialize the position
1565        let position_serialized = bincode::serialize(&copied_position)?;
1566
1567        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1568        let new_snapshots = match snapshots {
1569            Some(existing_snapshots) => {
1570                let mut combined = existing_snapshots.to_vec();
1571                combined.extend(position_serialized);
1572                Bytes::from(combined)
1573            }
1574            None => Bytes::from(position_serialized),
1575        };
1576        self.position_snapshots.insert(position_id, new_snapshots);
1577
1578        log::debug!("Snapshot {}", copied_position);
1579        Ok(())
1580    }
1581
1582    pub fn snapshot_position_state(
1583        &mut self,
1584        position: &Position,
1585        // ts_snapshot: u64,
1586        // unrealized_pnl: Option<Money>,
1587        open_only: Option<bool>,
1588    ) -> anyhow::Result<()> {
1589        let open_only = open_only.unwrap_or(true);
1590
1591        if open_only && !position.is_open() {
1592            return Ok(());
1593        }
1594
1595        if let Some(database) = &mut self.database {
1596            database.snapshot_position_state(position).map_err(|e| {
1597                log::error!(
1598                    "Failed to snapshot position state for {}: {:?}",
1599                    position.id,
1600                    e
1601                );
1602                e
1603            })?;
1604        } else {
1605            log::warn!(
1606                "Cannot snapshot position state for {} (no database configured)",
1607                position.id
1608            );
1609        }
1610
1611        // Ok(())
1612        todo!()
1613    }
1614
1615    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1616        let database = if let Some(database) = &self.database {
1617            database
1618        } else {
1619            log::warn!(
1620                "Cannot snapshot order state for {} (no database configured)",
1621                order.client_order_id()
1622            );
1623            return Ok(());
1624        };
1625
1626        database.snapshot_order_state(order)
1627    }
1628
1629    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
1630
1631    fn build_order_query_filter_set(
1632        &self,
1633        venue: Option<&Venue>,
1634        instrument_id: Option<&InstrumentId>,
1635        strategy_id: Option<&StrategyId>,
1636    ) -> Option<HashSet<ClientOrderId>> {
1637        let mut query: Option<HashSet<ClientOrderId>> = None;
1638
1639        if let Some(venue) = venue {
1640            query = Some(
1641                self.index
1642                    .venue_orders
1643                    .get(venue)
1644                    .map_or(HashSet::new(), |o| o.iter().copied().collect()),
1645            );
1646        }
1647
1648        if let Some(instrument_id) = instrument_id {
1649            let instrument_orders = self
1650                .index
1651                .instrument_orders
1652                .get(instrument_id)
1653                .map_or(HashSet::new(), |o| o.iter().copied().collect());
1654
1655            if let Some(existing_query) = &mut query {
1656                *existing_query = existing_query
1657                    .intersection(&instrument_orders)
1658                    .copied()
1659                    .collect();
1660            } else {
1661                query = Some(instrument_orders);
1662            }
1663        }
1664
1665        if let Some(strategy_id) = strategy_id {
1666            let strategy_orders = self
1667                .index
1668                .strategy_orders
1669                .get(strategy_id)
1670                .map_or(HashSet::new(), |o| o.iter().copied().collect());
1671
1672            if let Some(existing_query) = &mut query {
1673                *existing_query = existing_query
1674                    .intersection(&strategy_orders)
1675                    .copied()
1676                    .collect();
1677            } else {
1678                query = Some(strategy_orders);
1679            }
1680        }
1681
1682        query
1683    }
1684
1685    fn build_position_query_filter_set(
1686        &self,
1687        venue: Option<&Venue>,
1688        instrument_id: Option<&InstrumentId>,
1689        strategy_id: Option<&StrategyId>,
1690    ) -> Option<HashSet<PositionId>> {
1691        let mut query: Option<HashSet<PositionId>> = None;
1692
1693        if let Some(venue) = venue {
1694            query = Some(
1695                self.index
1696                    .venue_positions
1697                    .get(venue)
1698                    .map_or(HashSet::new(), |p| p.iter().copied().collect()),
1699            );
1700        }
1701
1702        if let Some(instrument_id) = instrument_id {
1703            let instrument_positions = self
1704                .index
1705                .instrument_positions
1706                .get(instrument_id)
1707                .map_or(HashSet::new(), |p| p.iter().copied().collect());
1708
1709            if let Some(existing_query) = query {
1710                query = Some(
1711                    existing_query
1712                        .intersection(&instrument_positions)
1713                        .copied()
1714                        .collect(),
1715                );
1716            } else {
1717                query = Some(instrument_positions);
1718            }
1719        }
1720
1721        if let Some(strategy_id) = strategy_id {
1722            let strategy_positions = self
1723                .index
1724                .strategy_positions
1725                .get(strategy_id)
1726                .map_or(HashSet::new(), |p| p.iter().copied().collect());
1727
1728            if let Some(existing_query) = query {
1729                query = Some(
1730                    existing_query
1731                        .intersection(&strategy_positions)
1732                        .copied()
1733                        .collect(),
1734                );
1735            } else {
1736                query = Some(strategy_positions);
1737            }
1738        }
1739
1740        query
1741    }
1742
1743    fn get_orders_for_ids(
1744        &self,
1745        client_order_ids: &HashSet<ClientOrderId>,
1746        side: Option<OrderSide>,
1747    ) -> Vec<&OrderAny> {
1748        let side = side.unwrap_or(OrderSide::NoOrderSide);
1749        let mut orders = Vec::new();
1750
1751        for client_order_id in client_order_ids {
1752            let order = self
1753                .orders
1754                .get(client_order_id)
1755                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
1756            if side == OrderSide::NoOrderSide || side == order.order_side() {
1757                orders.push(order);
1758            }
1759        }
1760
1761        orders
1762    }
1763
1764    fn get_positions_for_ids(
1765        &self,
1766        position_ids: &HashSet<PositionId>,
1767        side: Option<PositionSide>,
1768    ) -> Vec<&Position> {
1769        let side = side.unwrap_or(PositionSide::NoPositionSide);
1770        let mut positions = Vec::new();
1771
1772        for position_id in position_ids {
1773            let position = self
1774                .positions
1775                .get(position_id)
1776                .unwrap_or_else(|| panic!("Position {position_id} not found"));
1777            if side == PositionSide::NoPositionSide || side == position.side {
1778                positions.push(position);
1779            }
1780        }
1781
1782        positions
1783    }
1784
1785    /// Returns the `ClientOrderId`s of all orders.
1786    #[must_use]
1787    pub fn client_order_ids(
1788        &self,
1789        venue: Option<&Venue>,
1790        instrument_id: Option<&InstrumentId>,
1791        strategy_id: Option<&StrategyId>,
1792    ) -> HashSet<ClientOrderId> {
1793        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1794        match query {
1795            Some(query) => self.index.orders.intersection(&query).copied().collect(),
1796            None => self.index.orders.clone(),
1797        }
1798    }
1799
1800    /// Returns the `ClientOrderId`s of all open orders.
1801    #[must_use]
1802    pub fn client_order_ids_open(
1803        &self,
1804        venue: Option<&Venue>,
1805        instrument_id: Option<&InstrumentId>,
1806        strategy_id: Option<&StrategyId>,
1807    ) -> HashSet<ClientOrderId> {
1808        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1809        match query {
1810            Some(query) => self
1811                .index
1812                .orders_open
1813                .intersection(&query)
1814                .copied()
1815                .collect(),
1816            None => self.index.orders_open.clone(),
1817        }
1818    }
1819
1820    /// Returns the `ClientOrderId`s of all closed orders.
1821    #[must_use]
1822    pub fn client_order_ids_closed(
1823        &self,
1824        venue: Option<&Venue>,
1825        instrument_id: Option<&InstrumentId>,
1826        strategy_id: Option<&StrategyId>,
1827    ) -> HashSet<ClientOrderId> {
1828        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1829        match query {
1830            Some(query) => self
1831                .index
1832                .orders_closed
1833                .intersection(&query)
1834                .copied()
1835                .collect(),
1836            None => self.index.orders_closed.clone(),
1837        }
1838    }
1839
1840    /// Returns the `ClientOrderId`s of all emulated orders.
1841    #[must_use]
1842    pub fn client_order_ids_emulated(
1843        &self,
1844        venue: Option<&Venue>,
1845        instrument_id: Option<&InstrumentId>,
1846        strategy_id: Option<&StrategyId>,
1847    ) -> HashSet<ClientOrderId> {
1848        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1849        match query {
1850            Some(query) => self
1851                .index
1852                .orders_emulated
1853                .intersection(&query)
1854                .copied()
1855                .collect(),
1856            None => self.index.orders_emulated.clone(),
1857        }
1858    }
1859
1860    /// Returns the `ClientOrderId`s of all in-flight orders.
1861    #[must_use]
1862    pub fn client_order_ids_inflight(
1863        &self,
1864        venue: Option<&Venue>,
1865        instrument_id: Option<&InstrumentId>,
1866        strategy_id: Option<&StrategyId>,
1867    ) -> HashSet<ClientOrderId> {
1868        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
1869        match query {
1870            Some(query) => self
1871                .index
1872                .orders_inflight
1873                .intersection(&query)
1874                .copied()
1875                .collect(),
1876            None => self.index.orders_inflight.clone(),
1877        }
1878    }
1879
1880    /// Returns `PositionId`s of all positions.
1881    #[must_use]
1882    pub fn position_ids(
1883        &self,
1884        venue: Option<&Venue>,
1885        instrument_id: Option<&InstrumentId>,
1886        strategy_id: Option<&StrategyId>,
1887    ) -> HashSet<PositionId> {
1888        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1889        match query {
1890            Some(query) => self.index.positions.intersection(&query).copied().collect(),
1891            None => self.index.positions.clone(),
1892        }
1893    }
1894
1895    /// Returns the `PositionId`s of all open positions.
1896    #[must_use]
1897    pub fn position_open_ids(
1898        &self,
1899        venue: Option<&Venue>,
1900        instrument_id: Option<&InstrumentId>,
1901        strategy_id: Option<&StrategyId>,
1902    ) -> HashSet<PositionId> {
1903        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1904        match query {
1905            Some(query) => self
1906                .index
1907                .positions_open
1908                .intersection(&query)
1909                .copied()
1910                .collect(),
1911            None => self.index.positions_open.clone(),
1912        }
1913    }
1914
1915    /// Returns the `PositionId`s of all closed positions.
1916    #[must_use]
1917    pub fn position_closed_ids(
1918        &self,
1919        venue: Option<&Venue>,
1920        instrument_id: Option<&InstrumentId>,
1921        strategy_id: Option<&StrategyId>,
1922    ) -> HashSet<PositionId> {
1923        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
1924        match query {
1925            Some(query) => self
1926                .index
1927                .positions_closed
1928                .intersection(&query)
1929                .copied()
1930                .collect(),
1931            None => self.index.positions_closed.clone(),
1932        }
1933    }
1934
1935    /// Returns the `ComponentId`s of all actors.
1936    #[must_use]
1937    pub fn actor_ids(&self) -> HashSet<ComponentId> {
1938        self.index.actors.clone()
1939    }
1940
1941    /// Returns the `StrategyId`s of all strategies.
1942    #[must_use]
1943    pub fn strategy_ids(&self) -> HashSet<StrategyId> {
1944        self.index.strategies.clone()
1945    }
1946
1947    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
1948    #[must_use]
1949    pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
1950        self.index.exec_algorithms.clone()
1951    }
1952
1953    // -- ORDER QUERIES ---------------------------------------------------------------------------
1954
1955    /// Gets a reference to the order with the given `client_order_id` (if found).
1956    #[must_use]
1957    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
1958        self.orders.get(client_order_id)
1959    }
1960
1961    /// Gets a reference to the order with the given `client_order_id` (if found).
1962    #[must_use]
1963    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
1964        self.orders.get_mut(client_order_id)
1965    }
1966
1967    /// Gets a reference to the client order ID for given `venue_order_id` (if found).
1968    #[must_use]
1969    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
1970        self.index.venue_order_ids.get(venue_order_id)
1971    }
1972
1973    /// Gets a reference to the venue order ID for given `client_order_id` (if found).
1974    #[must_use]
1975    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
1976        self.index.client_order_ids.get(client_order_id)
1977    }
1978
1979    /// Gets a reference to the client ID indexed for given `client_order_id` (if found).
1980    #[must_use]
1981    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
1982        self.index.order_client.get(client_order_id)
1983    }
1984
1985    /// Returns references to all orders matching the given optional filter parameters.
1986    #[must_use]
1987    pub fn orders(
1988        &self,
1989        venue: Option<&Venue>,
1990        instrument_id: Option<&InstrumentId>,
1991        strategy_id: Option<&StrategyId>,
1992        side: Option<OrderSide>,
1993    ) -> Vec<&OrderAny> {
1994        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
1995        self.get_orders_for_ids(&client_order_ids, side)
1996    }
1997
1998    /// Returns references to all open orders matching the given optional filter parameters.
1999    #[must_use]
2000    pub fn orders_open(
2001        &self,
2002        venue: Option<&Venue>,
2003        instrument_id: Option<&InstrumentId>,
2004        strategy_id: Option<&StrategyId>,
2005        side: Option<OrderSide>,
2006    ) -> Vec<&OrderAny> {
2007        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2008        self.get_orders_for_ids(&client_order_ids, side)
2009    }
2010
2011    /// Returns references to all closed orders matching the given optional filter parameters.
2012    #[must_use]
2013    pub fn orders_closed(
2014        &self,
2015        venue: Option<&Venue>,
2016        instrument_id: Option<&InstrumentId>,
2017        strategy_id: Option<&StrategyId>,
2018        side: Option<OrderSide>,
2019    ) -> Vec<&OrderAny> {
2020        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2021        self.get_orders_for_ids(&client_order_ids, side)
2022    }
2023
2024    /// Returns references to all emulated orders matching the given optional filter parameters.
2025    #[must_use]
2026    pub fn orders_emulated(
2027        &self,
2028        venue: Option<&Venue>,
2029        instrument_id: Option<&InstrumentId>,
2030        strategy_id: Option<&StrategyId>,
2031        side: Option<OrderSide>,
2032    ) -> Vec<&OrderAny> {
2033        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2034        self.get_orders_for_ids(&client_order_ids, side)
2035    }
2036
2037    /// Returns references to all in-flight orders matching the given optional filter parameters.
2038    #[must_use]
2039    pub fn orders_inflight(
2040        &self,
2041        venue: Option<&Venue>,
2042        instrument_id: Option<&InstrumentId>,
2043        strategy_id: Option<&StrategyId>,
2044        side: Option<OrderSide>,
2045    ) -> Vec<&OrderAny> {
2046        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2047        self.get_orders_for_ids(&client_order_ids, side)
2048    }
2049
2050    /// Returns references to all orders for the given `position_id`.
2051    #[must_use]
2052    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2053        let client_order_ids = self.index.position_orders.get(position_id);
2054        match client_order_ids {
2055            Some(client_order_ids) => {
2056                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2057            }
2058            None => Vec::new(),
2059        }
2060    }
2061
2062    /// Returns whether an order with the given `client_order_id` exists.
2063    #[must_use]
2064    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2065        self.index.orders.contains(client_order_id)
2066    }
2067
2068    /// Returns whether an order with the given `client_order_id` is open.
2069    #[must_use]
2070    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2071        self.index.orders_open.contains(client_order_id)
2072    }
2073
2074    /// Returns whether an order with the given `client_order_id` is closed.
2075    #[must_use]
2076    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2077        self.index.orders_closed.contains(client_order_id)
2078    }
2079
2080    /// Returns whether an order with the given `client_order_id` is emulated.
2081    #[must_use]
2082    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2083        self.index.orders_emulated.contains(client_order_id)
2084    }
2085
2086    /// Returns whether an order with the given `client_order_id` is in-flight.
2087    #[must_use]
2088    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2089        self.index.orders_inflight.contains(client_order_id)
2090    }
2091
2092    /// Returns whether an order with the given `client_order_id` is `PENDING_CANCEL` locally.
2093    #[must_use]
2094    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2095        self.index.orders_pending_cancel.contains(client_order_id)
2096    }
2097
2098    /// Returns the count of all open orders.
2099    #[must_use]
2100    pub fn orders_open_count(
2101        &self,
2102        venue: Option<&Venue>,
2103        instrument_id: Option<&InstrumentId>,
2104        strategy_id: Option<&StrategyId>,
2105        side: Option<OrderSide>,
2106    ) -> usize {
2107        self.orders_open(venue, instrument_id, strategy_id, side)
2108            .len()
2109    }
2110
2111    /// Returns the count of all closed orders.
2112    #[must_use]
2113    pub fn orders_closed_count(
2114        &self,
2115        venue: Option<&Venue>,
2116        instrument_id: Option<&InstrumentId>,
2117        strategy_id: Option<&StrategyId>,
2118        side: Option<OrderSide>,
2119    ) -> usize {
2120        self.orders_closed(venue, instrument_id, strategy_id, side)
2121            .len()
2122    }
2123
2124    /// Returns the count of all emulated orders.
2125    #[must_use]
2126    pub fn orders_emulated_count(
2127        &self,
2128        venue: Option<&Venue>,
2129        instrument_id: Option<&InstrumentId>,
2130        strategy_id: Option<&StrategyId>,
2131        side: Option<OrderSide>,
2132    ) -> usize {
2133        self.orders_emulated(venue, instrument_id, strategy_id, side)
2134            .len()
2135    }
2136
2137    /// Returns the count of all in-flight orders.
2138    #[must_use]
2139    pub fn orders_inflight_count(
2140        &self,
2141        venue: Option<&Venue>,
2142        instrument_id: Option<&InstrumentId>,
2143        strategy_id: Option<&StrategyId>,
2144        side: Option<OrderSide>,
2145    ) -> usize {
2146        self.orders_inflight(venue, instrument_id, strategy_id, side)
2147            .len()
2148    }
2149
2150    /// Returns the count of all orders.
2151    #[must_use]
2152    pub fn orders_total_count(
2153        &self,
2154        venue: Option<&Venue>,
2155        instrument_id: Option<&InstrumentId>,
2156        strategy_id: Option<&StrategyId>,
2157        side: Option<OrderSide>,
2158    ) -> usize {
2159        self.orders(venue, instrument_id, strategy_id, side).len()
2160    }
2161
2162    /// Returns the order list for the given `order_list_id`.
2163    #[must_use]
2164    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2165        self.order_lists.get(order_list_id)
2166    }
2167
2168    /// Returns all order lists matching the given optional filter parameters.
2169    #[must_use]
2170    pub fn order_lists(
2171        &self,
2172        venue: Option<&Venue>,
2173        instrument_id: Option<&InstrumentId>,
2174        strategy_id: Option<&StrategyId>,
2175    ) -> Vec<&OrderList> {
2176        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2177
2178        if let Some(venue) = venue {
2179            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2180        }
2181
2182        if let Some(instrument_id) = instrument_id {
2183            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2184        }
2185
2186        if let Some(strategy_id) = strategy_id {
2187            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2188        }
2189
2190        order_lists
2191    }
2192
2193    /// Returns whether an order list with the given `order_list_id` exists.
2194    #[must_use]
2195    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2196        self.order_lists.contains_key(order_list_id)
2197    }
2198
2199    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2200
2201    /// Returns references to all orders associated with the given `exec_algorithm_id` matching the given
2202    /// optional filter parameters.
2203    #[must_use]
2204    pub fn orders_for_exec_algorithm(
2205        &self,
2206        exec_algorithm_id: &ExecAlgorithmId,
2207        venue: Option<&Venue>,
2208        instrument_id: Option<&InstrumentId>,
2209        strategy_id: Option<&StrategyId>,
2210        side: Option<OrderSide>,
2211    ) -> Vec<&OrderAny> {
2212        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2213        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2214
2215        if let Some(query) = query {
2216            if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2217                let exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2218            }
2219        }
2220
2221        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2222            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2223        } else {
2224            Vec::new()
2225        }
2226    }
2227
2228    /// Returns references to all orders with the given `exec_spawn_id`.
2229    #[must_use]
2230    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2231        self.get_orders_for_ids(
2232            self.index
2233                .exec_spawn_orders
2234                .get(exec_spawn_id)
2235                .unwrap_or(&HashSet::new()),
2236            None,
2237        )
2238    }
2239
2240    /// Returns the total order quantity for the given `exec_spawn_id`.
2241    #[must_use]
2242    pub fn exec_spawn_total_quantity(
2243        &self,
2244        exec_spawn_id: &ClientOrderId,
2245        active_only: bool,
2246    ) -> Option<Quantity> {
2247        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2248
2249        let mut total_quantity: Option<Quantity> = None;
2250
2251        for spawn_order in exec_spawn_orders {
2252            if !active_only || !spawn_order.is_closed() {
2253                if let Some(mut total_quantity) = total_quantity {
2254                    total_quantity += spawn_order.quantity();
2255                }
2256            } else {
2257                total_quantity = Some(spawn_order.quantity());
2258            }
2259        }
2260
2261        total_quantity
2262    }
2263
2264    /// Returns the total filled quantity for all orders with the given `exec_spawn_id`.
2265    #[must_use]
2266    pub fn exec_spawn_total_filled_qty(
2267        &self,
2268        exec_spawn_id: &ClientOrderId,
2269        active_only: bool,
2270    ) -> Option<Quantity> {
2271        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2272
2273        let mut total_quantity: Option<Quantity> = None;
2274
2275        for spawn_order in exec_spawn_orders {
2276            if !active_only || !spawn_order.is_closed() {
2277                if let Some(mut total_quantity) = total_quantity {
2278                    total_quantity += spawn_order.filled_qty();
2279                }
2280            } else {
2281                total_quantity = Some(spawn_order.filled_qty());
2282            }
2283        }
2284
2285        total_quantity
2286    }
2287
2288    /// Returns the total leaves quantity for all orders with the given `exec_spawn_id`.
2289    #[must_use]
2290    pub fn exec_spawn_total_leaves_qty(
2291        &self,
2292        exec_spawn_id: &ClientOrderId,
2293        active_only: bool,
2294    ) -> Option<Quantity> {
2295        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2296
2297        let mut total_quantity: Option<Quantity> = None;
2298
2299        for spawn_order in exec_spawn_orders {
2300            if !active_only || !spawn_order.is_closed() {
2301                if let Some(mut total_quantity) = total_quantity {
2302                    total_quantity += spawn_order.leaves_qty();
2303                }
2304            } else {
2305                total_quantity = Some(spawn_order.leaves_qty());
2306            }
2307        }
2308
2309        total_quantity
2310    }
2311
2312    // -- POSITION QUERIES ------------------------------------------------------------------------
2313
2314    /// Returns a reference to the position with the given `position_id` (if found).
2315    #[must_use]
2316    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2317        self.positions.get(position_id)
2318    }
2319
2320    /// Returns a reference to the position for the given `client_order_id` (if found).
2321    #[must_use]
2322    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2323        self.index
2324            .order_position
2325            .get(client_order_id)
2326            .and_then(|position_id| self.positions.get(position_id))
2327    }
2328
2329    /// Returns a reference to the position ID for the given `client_order_id` (if found).
2330    #[must_use]
2331    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2332        self.index.order_position.get(client_order_id)
2333    }
2334
2335    /// Returns a reference to all positions matching the given optional filter parameters.
2336    #[must_use]
2337    pub fn positions(
2338        &self,
2339        venue: Option<&Venue>,
2340        instrument_id: Option<&InstrumentId>,
2341        strategy_id: Option<&StrategyId>,
2342        side: Option<PositionSide>,
2343    ) -> Vec<&Position> {
2344        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2345        self.get_positions_for_ids(&position_ids, side)
2346    }
2347
2348    /// Returns a reference to all open positions matching the given optional filter parameters.
2349    #[must_use]
2350    pub fn positions_open(
2351        &self,
2352        venue: Option<&Venue>,
2353        instrument_id: Option<&InstrumentId>,
2354        strategy_id: Option<&StrategyId>,
2355        side: Option<PositionSide>,
2356    ) -> Vec<&Position> {
2357        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2358        self.get_positions_for_ids(&position_ids, side)
2359    }
2360
2361    /// Returns a reference to all closed positions matching the given optional filter parameters.
2362    #[must_use]
2363    pub fn positions_closed(
2364        &self,
2365        venue: Option<&Venue>,
2366        instrument_id: Option<&InstrumentId>,
2367        strategy_id: Option<&StrategyId>,
2368        side: Option<PositionSide>,
2369    ) -> Vec<&Position> {
2370        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2371        self.get_positions_for_ids(&position_ids, side)
2372    }
2373
2374    /// Returns whether a position with the given `position_id` exists.
2375    #[must_use]
2376    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2377        self.index.positions.contains(position_id)
2378    }
2379
2380    /// Returns whether a position with the given `position_id` is open.
2381    #[must_use]
2382    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2383        self.index.positions_open.contains(position_id)
2384    }
2385
2386    /// Returns whether a position with the given `position_id` is closed.
2387    #[must_use]
2388    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2389        self.index.positions_closed.contains(position_id)
2390    }
2391
2392    /// Returns the count of all open positions.
2393    #[must_use]
2394    pub fn positions_open_count(
2395        &self,
2396        venue: Option<&Venue>,
2397        instrument_id: Option<&InstrumentId>,
2398        strategy_id: Option<&StrategyId>,
2399        side: Option<PositionSide>,
2400    ) -> usize {
2401        self.positions_open(venue, instrument_id, strategy_id, side)
2402            .len()
2403    }
2404
2405    /// Returns the count of all closed positions.
2406    #[must_use]
2407    pub fn positions_closed_count(
2408        &self,
2409        venue: Option<&Venue>,
2410        instrument_id: Option<&InstrumentId>,
2411        strategy_id: Option<&StrategyId>,
2412        side: Option<PositionSide>,
2413    ) -> usize {
2414        self.positions_closed(venue, instrument_id, strategy_id, side)
2415            .len()
2416    }
2417
2418    /// Returns the count of all positions.
2419    #[must_use]
2420    pub fn positions_total_count(
2421        &self,
2422        venue: Option<&Venue>,
2423        instrument_id: Option<&InstrumentId>,
2424        strategy_id: Option<&StrategyId>,
2425        side: Option<PositionSide>,
2426    ) -> usize {
2427        self.positions(venue, instrument_id, strategy_id, side)
2428            .len()
2429    }
2430
2431    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2432
2433    /// Gets a reference to the strategy ID for the given `client_order_id` (if found).
2434    #[must_use]
2435    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2436        self.index.order_strategy.get(client_order_id)
2437    }
2438
2439    /// Gets a reference to the strategy ID for the given `position_id` (if found).
2440    #[must_use]
2441    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2442        self.index.position_strategy.get(position_id)
2443    }
2444
2445    // -- GENERAL ---------------------------------------------------------------------------------
2446
2447    /// Gets a reference to the general object value for the given `key` (if found).
2448    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2449        check_valid_string(key, stringify!(key)).expect(FAILED);
2450
2451        Ok(self.general.get(key))
2452    }
2453
2454    // -- DATA QUERIES ----------------------------------------------------------------------------
2455
2456    /// Returns the price for the given `instrument_id` and `price_type` (if found).
2457    #[must_use]
2458    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2459        match price_type {
2460            PriceType::Bid => self
2461                .quotes
2462                .get(instrument_id)
2463                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2464            PriceType::Ask => self
2465                .quotes
2466                .get(instrument_id)
2467                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2468            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2469                quotes.front().map(|quote| {
2470                    Price::new(
2471                        (quote.ask_price.as_f64() + quote.bid_price.as_f64()) / 2.0,
2472                        quote.bid_price.precision + 1,
2473                    )
2474                })
2475            }),
2476            PriceType::Last => self
2477                .trades
2478                .get(instrument_id)
2479                .and_then(|trades| trades.front().map(|trade| trade.price)),
2480        }
2481    }
2482
2483    /// Gets all quotes for the given `instrument_id`.
2484    #[must_use]
2485    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2486        self.quotes
2487            .get(instrument_id)
2488            .map(|quotes| quotes.iter().copied().collect())
2489    }
2490
2491    /// Gets all trades for the given `instrument_id`.
2492    #[must_use]
2493    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2494        self.trades
2495            .get(instrument_id)
2496            .map(|trades| trades.iter().copied().collect())
2497    }
2498
2499    /// Gets all bars for the given `bar_type`.
2500    #[must_use]
2501    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2502        self.bars
2503            .get(bar_type)
2504            .map(|bars| bars.iter().copied().collect())
2505    }
2506
2507    /// Gets a reference to the order book for the given `instrument_id`.
2508    #[must_use]
2509    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
2510        self.books.get(instrument_id)
2511    }
2512
2513    /// Gets a reference to the order book for the given `instrument_id`.
2514    #[must_use]
2515    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
2516        self.books.get_mut(instrument_id)
2517    }
2518
2519    /// Gets a reference to the latest quote tick for the given `instrument_id`.
2520    #[must_use]
2521    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
2522        self.quotes
2523            .get(instrument_id)
2524            .and_then(|quotes| quotes.front())
2525    }
2526
2527    /// Gets a refernece to the latest trade tick for the given `instrument_id`.
2528    #[must_use]
2529    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
2530        self.trades
2531            .get(instrument_id)
2532            .and_then(|trades| trades.front())
2533    }
2534
2535    /// Gets a reference to the latest bar for the given `bar_type`.
2536    #[must_use]
2537    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
2538        self.bars.get(bar_type).and_then(|bars| bars.front())
2539    }
2540
2541    /// Gets the order book update count for the given `instrument_id`.
2542    #[must_use]
2543    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
2544        self.books.get(instrument_id).map_or(0, |book| book.count) as usize
2545    }
2546
2547    /// Gets the quote tick count for the given `instrument_id`.
2548    #[must_use]
2549    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
2550        self.quotes
2551            .get(instrument_id)
2552            .map_or(0, std::collections::VecDeque::len)
2553    }
2554
2555    /// Gets the trade tick count for the given `instrument_id`.
2556    #[must_use]
2557    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
2558        self.trades
2559            .get(instrument_id)
2560            .map_or(0, std::collections::VecDeque::len)
2561    }
2562
2563    /// Gets the bar count for the given `instrument_id`.
2564    #[must_use]
2565    pub fn bar_count(&self, bar_type: &BarType) -> usize {
2566        self.bars
2567            .get(bar_type)
2568            .map_or(0, std::collections::VecDeque::len)
2569    }
2570
2571    /// Returns whether the cache contains an order book for the given `instrument_id`.
2572    #[must_use]
2573    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
2574        self.books.contains_key(instrument_id)
2575    }
2576
2577    /// Returns whether the cache contains quotes for the given `instrument_id`.
2578    #[must_use]
2579    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
2580        self.quote_count(instrument_id) > 0
2581    }
2582
2583    /// Returns whether the cache contains trades for the given `instrument_id`.
2584    #[must_use]
2585    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
2586        self.trade_count(instrument_id) > 0
2587    }
2588
2589    /// Returns whether the cache contains bars for the given `bar_type`.
2590    #[must_use]
2591    pub fn has_bars(&self, bar_type: &BarType) -> bool {
2592        self.bar_count(bar_type) > 0
2593    }
2594
2595    #[must_use]
2596    pub fn get_xrate(
2597        &self,
2598        venue: Venue,
2599        from_currency: Currency,
2600        to_currency: Currency,
2601        price_type: PriceType,
2602    ) -> Decimal {
2603        if from_currency == to_currency {
2604            return Decimal::ONE;
2605        }
2606
2607        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
2608
2609        get_exchange_rate(from_currency, to_currency, price_type, bid_quote, ask_quote)
2610    }
2611
2612    fn build_quote_table(
2613        &self,
2614        venue: &Venue,
2615    ) -> (HashMap<Symbol, Decimal>, HashMap<Symbol, Decimal>) {
2616        let mut bid_quotes = HashMap::new();
2617        let mut ask_quotes = HashMap::new();
2618
2619        for instrument_id in self.instruments.keys() {
2620            if instrument_id.venue != *venue {
2621                continue;
2622            }
2623
2624            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
2625                if let Some(tick) = ticks.front() {
2626                    (tick.bid_price, tick.ask_price)
2627                } else {
2628                    continue; // Empty ticks vector
2629                }
2630            } else {
2631                let bid_bar = self
2632                    .bars
2633                    .iter()
2634                    .find(|(k, _)| {
2635                        k.instrument_id() == *instrument_id
2636                            && matches!(k.spec().price_type, PriceType::Bid)
2637                    })
2638                    .map(|(_, v)| v);
2639
2640                let ask_bar = self
2641                    .bars
2642                    .iter()
2643                    .find(|(k, _)| {
2644                        k.instrument_id() == *instrument_id
2645                            && matches!(k.spec().price_type, PriceType::Ask)
2646                    })
2647                    .map(|(_, v)| v);
2648
2649                match (bid_bar, ask_bar) {
2650                    (Some(bid), Some(ask)) => {
2651                        let bid_price = bid.front().unwrap().close;
2652                        let ask_price = ask.front().unwrap().close;
2653
2654                        (bid_price, ask_price)
2655                    }
2656                    _ => continue,
2657                }
2658            };
2659
2660            bid_quotes.insert(instrument_id.symbol, bid_price.as_decimal());
2661            ask_quotes.insert(instrument_id.symbol, ask_price.as_decimal());
2662        }
2663
2664        (bid_quotes, ask_quotes)
2665    }
2666
2667    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
2668
2669    /// Returns a reference to the instrument for the given `instrument_id` (if found).
2670    #[must_use]
2671    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
2672        self.instruments.get(instrument_id)
2673    }
2674
2675    /// Returns references to all instrument IDs for the given `venue`.
2676    #[must_use]
2677    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
2678        self.instruments
2679            .keys()
2680            .filter(|i| venue.is_none() || &i.venue == venue.unwrap())
2681            .collect()
2682    }
2683
2684    /// Returns references to all instruments for the given `venue`.
2685    #[must_use]
2686    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
2687        self.instruments
2688            .values()
2689            .filter(|i| &i.id().venue == venue)
2690            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(u)))
2691            .collect()
2692    }
2693
2694    /// Returns references to all bar types contained in the cache.
2695    #[must_use]
2696    pub fn bar_types(
2697        &self,
2698        instrument_id: Option<&InstrumentId>,
2699        price_type: Option<&PriceType>,
2700        aggregation_source: AggregationSource,
2701    ) -> Vec<&BarType> {
2702        let mut bar_types = self
2703            .bars
2704            .keys()
2705            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
2706            .collect::<Vec<&BarType>>();
2707
2708        if let Some(instrument_id) = instrument_id {
2709            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
2710        }
2711
2712        if let Some(price_type) = price_type {
2713            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
2714        }
2715
2716        bar_types
2717    }
2718
2719    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
2720
2721    /// Returns a reference to the synthetic instrument for the given `instrument_id` (if found).
2722    #[must_use]
2723    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
2724        self.synthetics.get(instrument_id)
2725    }
2726
2727    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
2728    #[must_use]
2729    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
2730        self.synthetics.keys().collect()
2731    }
2732
2733    /// Returns references to all synthetic instruments contained in the cache.
2734    #[must_use]
2735    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
2736        self.synthetics.values().collect()
2737    }
2738
2739    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
2740
2741    /// Returns a reference to the account for the given `account_id` (if found).
2742    #[must_use]
2743    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
2744        self.accounts.get(account_id)
2745    }
2746
2747    /// Returns a reference to the account for the given `venue` (if found).
2748    #[must_use]
2749    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
2750        self.index
2751            .venue_account
2752            .get(venue)
2753            .and_then(|account_id| self.accounts.get(account_id))
2754    }
2755
2756    /// Returns a reference to the account ID for the given `venue` (if found).
2757    #[must_use]
2758    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
2759        self.index.venue_account.get(venue)
2760    }
2761
2762    /// Returns references to all accounts for the given `account_id`.
2763    #[must_use]
2764    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
2765        self.accounts
2766            .values()
2767            .filter(|account| &account.id() == account_id)
2768            .collect()
2769    }
2770}