#![allow(dead_code)]
#![allow(unused_variables)]
pub mod database;
#[cfg(test)]
mod tests;
use std::{
collections::{HashMap, HashSet, VecDeque},
time::{SystemTime, UNIX_EPOCH},
};
use bytes::Bytes;
use database::CacheDatabaseAdapter;
use nautilus_core::{
correctness::{
check_key_not_in_map, check_predicate_false, check_slice_not_empty, check_valid_string,
FAILED,
},
UUID4,
};
use nautilus_model::{
accounts::AccountAny,
data::{Bar, BarType, QuoteTick, TradeTick},
enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
identifiers::{
AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
OrderListId, PositionId, StrategyId, Symbol, Venue, VenueOrderId,
},
instruments::{InstrumentAny, SyntheticInstrument},
orderbook::OrderBook,
orders::{OrderAny, OrderList},
position::Position,
types::{Currency, Money, Price, Quantity},
};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use ustr::Ustr;
use crate::{
enums::SerializationEncoding, msgbus::database::DatabaseConfig, xrate::get_exchange_rate,
};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
pub database: Option<DatabaseConfig>,
pub encoding: SerializationEncoding,
pub timestamps_as_iso8601: bool,
pub buffer_interval_ms: Option<usize>,
pub use_trader_prefix: bool,
pub use_instance_id: bool,
pub flush_on_start: bool,
pub drop_instruments_on_reset: bool,
pub tick_capacity: usize,
pub bar_capacity: usize,
pub save_market_data: bool,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
database: None,
encoding: SerializationEncoding::MsgPack,
timestamps_as_iso8601: false,
buffer_interval_ms: None,
use_trader_prefix: true,
use_instance_id: false,
flush_on_start: false,
drop_instruments_on_reset: true,
tick_capacity: 10_000,
bar_capacity: 10_000,
save_market_data: false,
}
}
}
impl CacheConfig {
#[allow(clippy::too_many_arguments)]
#[must_use]
pub const fn new(
database: Option<DatabaseConfig>,
encoding: SerializationEncoding,
timestamps_as_iso8601: bool,
buffer_interval_ms: Option<usize>,
use_trader_prefix: bool,
use_instance_id: bool,
flush_on_start: bool,
drop_instruments_on_reset: bool,
tick_capacity: usize,
bar_capacity: usize,
save_market_data: bool,
) -> Self {
Self {
database,
encoding,
timestamps_as_iso8601,
buffer_interval_ms,
use_trader_prefix,
use_instance_id,
flush_on_start,
drop_instruments_on_reset,
tick_capacity,
bar_capacity,
save_market_data,
}
}
}
pub struct CacheIndex {
venue_account: HashMap<Venue, AccountId>,
venue_orders: HashMap<Venue, HashSet<ClientOrderId>>,
venue_positions: HashMap<Venue, HashSet<PositionId>>,
venue_order_ids: HashMap<VenueOrderId, ClientOrderId>,
client_order_ids: HashMap<ClientOrderId, VenueOrderId>,
order_position: HashMap<ClientOrderId, PositionId>,
order_strategy: HashMap<ClientOrderId, StrategyId>,
order_client: HashMap<ClientOrderId, ClientId>,
position_strategy: HashMap<PositionId, StrategyId>,
position_orders: HashMap<PositionId, HashSet<ClientOrderId>>,
instrument_orders: HashMap<InstrumentId, HashSet<ClientOrderId>>,
instrument_positions: HashMap<InstrumentId, HashSet<PositionId>>,
strategy_orders: HashMap<StrategyId, HashSet<ClientOrderId>>,
strategy_positions: HashMap<StrategyId, HashSet<PositionId>>,
exec_algorithm_orders: HashMap<ExecAlgorithmId, HashSet<ClientOrderId>>,
exec_spawn_orders: HashMap<ClientOrderId, HashSet<ClientOrderId>>,
orders: HashSet<ClientOrderId>,
orders_open: HashSet<ClientOrderId>,
orders_closed: HashSet<ClientOrderId>,
orders_emulated: HashSet<ClientOrderId>,
orders_inflight: HashSet<ClientOrderId>,
orders_pending_cancel: HashSet<ClientOrderId>,
positions: HashSet<PositionId>,
positions_open: HashSet<PositionId>,
positions_closed: HashSet<PositionId>,
actors: HashSet<ComponentId>,
strategies: HashSet<StrategyId>,
exec_algorithms: HashSet<ExecAlgorithmId>,
}
impl CacheIndex {
pub fn clear(&mut self) {
self.venue_account.clear();
self.venue_orders.clear();
self.venue_positions.clear();
self.venue_order_ids.clear();
self.client_order_ids.clear();
self.order_position.clear();
self.order_strategy.clear();
self.order_client.clear();
self.position_strategy.clear();
self.position_orders.clear();
self.instrument_orders.clear();
self.instrument_positions.clear();
self.strategy_orders.clear();
self.strategy_positions.clear();
self.exec_algorithm_orders.clear();
self.exec_spawn_orders.clear();
self.orders.clear();
self.orders_open.clear();
self.orders_closed.clear();
self.orders_emulated.clear();
self.orders_inflight.clear();
self.orders_pending_cancel.clear();
self.positions.clear();
self.positions_open.clear();
self.positions_closed.clear();
self.actors.clear();
self.strategies.clear();
self.exec_algorithms.clear();
}
}
pub struct Cache {
config: CacheConfig,
index: CacheIndex,
database: Option<Box<dyn CacheDatabaseAdapter>>,
general: HashMap<String, Bytes>,
quotes: HashMap<InstrumentId, VecDeque<QuoteTick>>,
trades: HashMap<InstrumentId, VecDeque<TradeTick>>,
books: HashMap<InstrumentId, OrderBook>,
bars: HashMap<BarType, VecDeque<Bar>>,
currencies: HashMap<Ustr, Currency>,
instruments: HashMap<InstrumentId, InstrumentAny>,
synthetics: HashMap<InstrumentId, SyntheticInstrument>,
accounts: HashMap<AccountId, AccountAny>,
orders: HashMap<ClientOrderId, OrderAny>,
order_lists: HashMap<OrderListId, OrderList>,
pub positions: HashMap<PositionId, Position>,
position_snapshots: HashMap<PositionId, Bytes>,
}
unsafe impl Send for Cache {}
unsafe impl Sync for Cache {}
impl Default for Cache {
fn default() -> Self {
Self::new(Some(CacheConfig::default()), None)
}
}
impl Cache {
#[must_use]
pub fn new(
config: Option<CacheConfig>,
database: Option<Box<dyn CacheDatabaseAdapter>>,
) -> Self {
let index = CacheIndex {
venue_account: HashMap::new(),
venue_orders: HashMap::new(),
venue_positions: HashMap::new(),
venue_order_ids: HashMap::new(),
client_order_ids: HashMap::new(),
order_position: HashMap::new(),
order_strategy: HashMap::new(),
order_client: HashMap::new(),
position_strategy: HashMap::new(),
position_orders: HashMap::new(),
instrument_orders: HashMap::new(),
instrument_positions: HashMap::new(),
strategy_orders: HashMap::new(),
strategy_positions: HashMap::new(),
exec_algorithm_orders: HashMap::new(),
exec_spawn_orders: HashMap::new(),
orders: HashSet::new(),
orders_open: HashSet::new(),
orders_closed: HashSet::new(),
orders_emulated: HashSet::new(),
orders_inflight: HashSet::new(),
orders_pending_cancel: HashSet::new(),
positions: HashSet::new(),
positions_open: HashSet::new(),
positions_closed: HashSet::new(),
actors: HashSet::new(),
strategies: HashSet::new(),
exec_algorithms: HashSet::new(),
};
Self {
config: config.unwrap_or_default(),
index,
database,
general: HashMap::new(),
quotes: HashMap::new(),
trades: HashMap::new(),
books: HashMap::new(),
bars: HashMap::new(),
currencies: HashMap::new(),
instruments: HashMap::new(),
synthetics: HashMap::new(),
accounts: HashMap::new(),
orders: HashMap::new(),
order_lists: HashMap::new(),
positions: HashMap::new(),
position_snapshots: HashMap::new(),
}
}
#[must_use]
pub fn memory_address(&self) -> String {
format!("{:?}", std::ptr::from_ref(self))
}
pub fn cache_general(&mut self) -> anyhow::Result<()> {
self.general = match &mut self.database {
Some(db) => db.load()?,
None => HashMap::new(),
};
log::info!(
"Cached {} general object(s) from database",
self.general.len()
);
Ok(())
}
pub fn cache_currencies(&mut self) -> anyhow::Result<()> {
self.currencies = match &mut self.database {
Some(db) => db.load_currencies()?,
None => HashMap::new(),
};
log::info!("Cached {} currencies from database", self.general.len());
Ok(())
}
pub fn cache_instruments(&mut self) -> anyhow::Result<()> {
self.instruments = match &mut self.database {
Some(db) => db.load_instruments()?,
None => HashMap::new(),
};
log::info!("Cached {} instruments from database", self.general.len());
Ok(())
}
pub fn cache_synthetics(&mut self) -> anyhow::Result<()> {
self.synthetics = match &mut self.database {
Some(db) => db.load_synthetics()?,
None => HashMap::new(),
};
log::info!(
"Cached {} synthetic instruments from database",
self.general.len()
);
Ok(())
}
pub fn cache_accounts(&mut self) -> anyhow::Result<()> {
self.accounts = match &mut self.database {
Some(db) => db.load_accounts()?,
None => HashMap::new(),
};
log::info!(
"Cached {} synthetic instruments from database",
self.general.len()
);
Ok(())
}
pub fn cache_orders(&mut self) -> anyhow::Result<()> {
self.orders = match &mut self.database {
Some(db) => db.load_orders()?,
None => HashMap::new(),
};
log::info!("Cached {} orders from database", self.general.len());
Ok(())
}
pub fn cache_positions(&mut self) -> anyhow::Result<()> {
self.positions = match &mut self.database {
Some(db) => db.load_positions()?,
None => HashMap::new(),
};
log::info!("Cached {} positions from database", self.general.len());
Ok(())
}
pub fn build_index(&mut self) {
self.index.clear();
log::debug!("Building index");
for account_id in self.accounts.keys() {
self.index
.venue_account
.insert(account_id.get_issuer(), *account_id);
}
for (client_order_id, order) in &self.orders {
let instrument_id = order.instrument_id();
let venue = instrument_id.venue;
let strategy_id = order.strategy_id();
self.index
.venue_orders
.entry(venue)
.or_default()
.insert(*client_order_id);
if let Some(venue_order_id) = order.venue_order_id() {
self.index
.venue_order_ids
.insert(venue_order_id, *client_order_id);
}
if let Some(position_id) = order.position_id() {
self.index
.order_position
.insert(*client_order_id, position_id);
}
self.index
.order_strategy
.insert(*client_order_id, order.strategy_id());
self.index
.instrument_orders
.entry(instrument_id)
.or_default()
.insert(*client_order_id);
self.index
.strategy_orders
.entry(strategy_id)
.or_default()
.insert(*client_order_id);
if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
self.index
.exec_algorithm_orders
.entry(exec_algorithm_id)
.or_default()
.insert(*client_order_id);
}
if let Some(exec_spawn_id) = order.exec_spawn_id() {
self.index
.exec_spawn_orders
.entry(exec_spawn_id)
.or_default()
.insert(*client_order_id);
}
self.index.orders.insert(*client_order_id);
if order.is_open() {
self.index.orders_open.insert(*client_order_id);
}
if order.is_closed() {
self.index.orders_closed.insert(*client_order_id);
}
if let Some(emulation_trigger) = order.emulation_trigger() {
if emulation_trigger != TriggerType::NoTrigger && !order.is_closed() {
self.index.orders_emulated.insert(*client_order_id);
}
}
if order.is_inflight() {
self.index.orders_inflight.insert(*client_order_id);
}
self.index.strategies.insert(strategy_id);
if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
self.index.exec_algorithms.insert(exec_algorithm_id);
}
}
for (position_id, position) in &self.positions {
let instrument_id = position.instrument_id;
let venue = instrument_id.venue;
let strategy_id = position.strategy_id;
self.index
.venue_positions
.entry(venue)
.or_default()
.insert(*position_id);
self.index
.position_strategy
.insert(*position_id, position.strategy_id);
self.index
.position_orders
.entry(*position_id)
.or_default()
.extend(position.client_order_ids().into_iter());
self.index
.instrument_positions
.entry(instrument_id)
.or_default()
.insert(*position_id);
self.index
.strategy_positions
.entry(strategy_id)
.or_default()
.insert(*position_id);
self.index.positions.insert(*position_id);
if position.is_open() {
self.index.positions_open.insert(*position_id);
}
if position.is_closed() {
self.index.positions_closed.insert(*position_id);
}
self.index.strategies.insert(strategy_id);
}
}
#[must_use]
pub const fn has_backing(&self) -> bool {
self.config.database.is_some()
}
#[must_use]
pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
let quote = if let Some(quote) = self.quote(&position.instrument_id) {
quote
} else {
log::warn!(
"Cannot calculate unrealized PnL for {}, no quotes for {}",
position.id,
position.instrument_id
);
return None;
};
let last = match position.side {
PositionSide::Flat | PositionSide::NoPositionSide => {
return Some(Money::new(0.0, position.settlement_currency));
}
PositionSide::Long => quote.ask_price,
PositionSide::Short => quote.bid_price,
};
Some(position.unrealized_pnl(last))
}
#[must_use]
pub fn check_integrity(&mut self) -> bool {
let mut error_count = 0;
let failure = "Integrity failure";
let timestamp_us = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_micros();
log::info!("Checking data integrity");
for account_id in self.accounts.keys() {
if !self
.index
.venue_account
.contains_key(&account_id.get_issuer())
{
log::error!(
"{failure} in accounts: {account_id} not found in `self.index.venue_account`",
);
error_count += 1;
}
}
for (client_order_id, order) in &self.orders {
if !self.index.order_strategy.contains_key(client_order_id) {
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
);
error_count += 1;
}
if !self.index.orders.contains(client_order_id) {
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.orders`",
);
error_count += 1;
}
if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
);
error_count += 1;
}
if order.is_open() && !self.index.orders_open.contains(client_order_id) {
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
);
error_count += 1;
}
if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
);
error_count += 1;
}
if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
if !self
.index
.exec_algorithm_orders
.contains_key(&exec_algorithm_id)
{
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
);
error_count += 1;
}
if order.exec_spawn_id().is_none()
&& !self.index.exec_spawn_orders.contains_key(client_order_id)
{
log::error!(
"{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
);
error_count += 1;
}
}
}
for (position_id, position) in &self.positions {
if !self.index.position_strategy.contains_key(position_id) {
log::error!(
"{failure} in positions: {position_id} not found in `self.index.position_strategy`",
);
error_count += 1;
}
if !self.index.position_orders.contains_key(position_id) {
log::error!(
"{failure} in positions: {position_id} not found in `self.index.position_orders`",
);
error_count += 1;
}
if !self.index.positions.contains(position_id) {
log::error!(
"{failure} in positions: {position_id} not found in `self.index.positions`",
);
error_count += 1;
}
if position.is_open() && !self.index.positions_open.contains(position_id) {
log::error!(
"{failure} in positions: {position_id} not found in `self.index.positions_open`",
);
error_count += 1;
}
if position.is_closed() && !self.index.positions_closed.contains(position_id) {
log::error!(
"{failure} in positions: {position_id} not found in `self.index.positions_closed`",
);
error_count += 1;
}
}
for account_id in self.index.venue_account.values() {
if !self.accounts.contains_key(account_id) {
log::error!(
"{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
);
error_count += 1;
}
}
for client_order_id in self.index.venue_order_ids.values() {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in self.index.client_order_ids.keys() {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in self.index.order_position.keys() {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in self.index.order_strategy.keys() {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for position_id in self.index.position_strategy.keys() {
if !self.positions.contains_key(position_id) {
log::error!(
"{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
);
error_count += 1;
}
}
for position_id in self.index.position_orders.keys() {
if !self.positions.contains_key(position_id) {
log::error!(
"{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
);
error_count += 1;
}
}
for (instrument_id, client_order_ids) in &self.index.instrument_orders {
for client_order_id in client_order_ids {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
);
error_count += 1;
}
}
}
for instrument_id in self.index.instrument_positions.keys() {
if !self.index.instrument_orders.contains_key(instrument_id) {
log::error!(
"{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
);
error_count += 1;
}
}
for client_order_ids in self.index.strategy_orders.values() {
for client_order_id in client_order_ids {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
}
for position_ids in self.index.strategy_positions.values() {
for position_id in position_ids {
if !self.positions.contains_key(position_id) {
log::error!(
"{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
);
error_count += 1;
}
}
}
for client_order_id in &self.index.orders {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in &self.index.orders_emulated {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in &self.index.orders_inflight {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in &self.index.orders_open {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for client_order_id in &self.index.orders_closed {
if !self.orders.contains_key(client_order_id) {
log::error!(
"{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
);
error_count += 1;
}
}
for position_id in &self.index.positions {
if !self.positions.contains_key(position_id) {
log::error!(
"{failure} in `index.positions`: {position_id} not found in `self.positions`",
);
error_count += 1;
}
}
for position_id in &self.index.positions_open {
if !self.positions.contains_key(position_id) {
log::error!(
"{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
);
error_count += 1;
}
}
for position_id in &self.index.positions_closed {
if !self.positions.contains_key(position_id) {
log::error!(
"{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
);
error_count += 1;
}
}
for strategy_id in &self.index.strategies {
if !self.index.strategy_orders.contains_key(strategy_id) {
log::error!(
"{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
);
error_count += 1;
}
}
for exec_algorithm_id in &self.index.exec_algorithms {
if !self
.index
.exec_algorithm_orders
.contains_key(exec_algorithm_id)
{
log::error!(
"{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
);
error_count += 1;
}
}
let total_us = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_micros()
- timestamp_us;
if error_count == 0 {
log::info!("Integrity check passed in {total_us}μs");
true
} else {
log::error!(
"Integrity check failed with {error_count} error{} in {total_us}μs",
if error_count == 1 { "" } else { "s" },
);
false
}
}
#[must_use]
pub fn check_residuals(&self) -> bool {
log::debug!("Checking residuals");
let mut residuals = false;
for order in self.orders_open(None, None, None, None) {
residuals = true;
log::warn!("Residual {order:?}");
}
for position in self.positions_open(None, None, None, None) {
residuals = true;
log::warn!("Residual {position}");
}
residuals
}
pub fn clear_index(&mut self) {
self.index.clear();
log::debug!("Cleared index");
}
pub fn reset(&mut self) {
log::debug!("Resetting cache");
self.general.clear();
self.quotes.clear();
self.trades.clear();
self.books.clear();
self.bars.clear();
self.currencies.clear();
self.instruments.clear();
self.synthetics.clear();
self.accounts.clear();
self.orders.clear();
self.order_lists.clear();
self.positions.clear();
self.position_snapshots.clear();
self.clear_index();
log::info!("Reset cache");
}
pub fn dispose(&mut self) {
if let Some(database) = &mut self.database {
database.close().expect("Failed to close database");
}
}
pub fn flush_db(&mut self) {
if let Some(database) = &mut self.database {
database.flush().expect("Failed to flush database");
}
}
pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
check_valid_string(key, stringify!(key)).expect(FAILED);
check_predicate_false(value.is_empty(), stringify!(value)).expect(FAILED);
log::debug!("Adding general {key}");
self.general.insert(key.to_string(), value.clone());
if let Some(database) = &mut self.database {
database.add(key.to_string(), value)?;
}
Ok(())
}
pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
log::debug!("Adding `OrderBook` {}", book.instrument_id);
if self.config.save_market_data {
if let Some(database) = &mut self.database {
database.add_order_book(&book)?;
}
}
self.books.insert(book.instrument_id, book);
Ok(())
}
pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
if self.config.save_market_data {
if let Some(database) = &mut self.database {
database.add_quote("e)?;
}
}
let quotes_deque = self
.quotes
.entry(quote.instrument_id)
.or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
quotes_deque.push_front(quote);
Ok(())
}
pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
check_slice_not_empty(quotes, stringify!(quotes)).unwrap();
let instrument_id = quotes[0].instrument_id;
log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
if self.config.save_market_data {
if let Some(database) = &mut self.database {
for quote in quotes {
database.add_quote(quote).unwrap();
}
}
}
let quotes_deque = self
.quotes
.entry(instrument_id)
.or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
for quote in quotes {
quotes_deque.push_front(*quote);
}
Ok(())
}
pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
log::debug!("Adding `TradeTick` {}", trade.instrument_id);
if self.config.save_market_data {
if let Some(database) = &mut self.database {
database.add_trade(&trade)?;
}
}
let trades_deque = self
.trades
.entry(trade.instrument_id)
.or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
trades_deque.push_front(trade);
Ok(())
}
pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
check_slice_not_empty(trades, stringify!(trades)).unwrap();
let instrument_id = trades[0].instrument_id;
log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
if self.config.save_market_data {
if let Some(database) = &mut self.database {
for trade in trades {
database.add_trade(trade).unwrap();
}
}
}
let trades_deque = self
.trades
.entry(instrument_id)
.or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
for trade in trades {
trades_deque.push_front(*trade);
}
Ok(())
}
pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
log::debug!("Adding `Bar` {}", bar.bar_type);
if self.config.save_market_data {
if let Some(database) = &mut self.database {
database.add_bar(&bar)?;
}
}
let bars = self
.bars
.entry(bar.bar_type)
.or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
bars.push_front(bar);
Ok(())
}
pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
check_slice_not_empty(bars, stringify!(bars)).unwrap();
let bar_type = bars[0].bar_type;
log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
if self.config.save_market_data {
if let Some(database) = &mut self.database {
for bar in bars {
database.add_bar(bar).unwrap();
}
}
}
let bars_deque = self
.bars
.entry(bar_type)
.or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
for bar in bars {
bars_deque.push_front(*bar);
}
Ok(())
}
pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
log::debug!("Adding `Currency` {}", currency.code);
if let Some(database) = &mut self.database {
database.add_currency(¤cy)?;
}
self.currencies.insert(currency.code, currency);
Ok(())
}
pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
log::debug!("Adding `Instrument` {}", instrument.id());
if let Some(database) = &mut self.database {
database.add_instrument(&instrument)?;
}
self.instruments.insert(instrument.id(), instrument);
Ok(())
}
pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
if let Some(database) = &mut self.database {
database.add_synthetic(&synthetic)?;
}
self.synthetics.insert(synthetic.id, synthetic);
Ok(())
}
pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
log::debug!("Adding `Account` {}", account.id());
if let Some(database) = &mut self.database {
database.add_account(&account)?;
}
let account_id = account.id();
self.accounts.insert(account_id, account);
self.index
.venue_account
.insert(account_id.get_issuer(), account_id);
Ok(())
}
pub fn add_venue_order_id(
&mut self,
client_order_id: &ClientOrderId,
venue_order_id: &VenueOrderId,
overwrite: bool,
) -> anyhow::Result<()> {
if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id) {
if !overwrite && existing_venue_order_id != venue_order_id {
anyhow::bail!(
"Existing {existing_venue_order_id} for {client_order_id}
did not match the given {venue_order_id}.
If you are writing a test then try a different `venue_order_id`,
otherwise this is probably a bug."
);
}
};
self.index
.client_order_ids
.insert(*client_order_id, *venue_order_id);
self.index
.venue_order_ids
.insert(*venue_order_id, *client_order_id);
Ok(())
}
pub fn add_order(
&mut self,
order: OrderAny,
position_id: Option<PositionId>,
client_id: Option<ClientId>,
replace_existing: bool,
) -> anyhow::Result<()> {
let instrument_id = order.instrument_id();
let venue = instrument_id.venue;
let client_order_id = order.client_order_id();
let strategy_id = order.strategy_id();
let exec_algorithm_id = order.exec_algorithm_id();
let exec_spawn_id = order.exec_spawn_id();
if !replace_existing {
check_key_not_in_map(
&client_order_id,
&self.orders,
stringify!(client_order_id),
stringify!(orders),
)
.expect(FAILED);
check_key_not_in_map(
&client_order_id,
&self.orders,
stringify!(client_order_id),
stringify!(orders),
)
.expect(FAILED);
check_key_not_in_map(
&client_order_id,
&self.orders,
stringify!(client_order_id),
stringify!(orders),
)
.expect(FAILED);
check_key_not_in_map(
&client_order_id,
&self.orders,
stringify!(client_order_id),
stringify!(orders),
)
.expect(FAILED);
};
log::debug!("Adding {order:?}");
self.index.orders.insert(client_order_id);
self.index
.order_strategy
.insert(client_order_id, strategy_id);
self.index.strategies.insert(strategy_id);
self.index
.venue_orders
.entry(venue)
.or_default()
.insert(client_order_id);
self.index
.instrument_orders
.entry(instrument_id)
.or_default()
.insert(client_order_id);
self.index
.strategy_orders
.entry(strategy_id)
.or_default()
.insert(client_order_id);
if let Some(exec_algorithm_id) = exec_algorithm_id {
self.index.exec_algorithms.insert(exec_algorithm_id);
self.index
.exec_algorithm_orders
.entry(exec_algorithm_id)
.or_default()
.insert(client_order_id);
self.index
.exec_spawn_orders
.entry(exec_spawn_id.expect("`exec_spawn_id` is guaranteed to exist"))
.or_default()
.insert(client_order_id);
}
match order.emulation_trigger() {
Some(_) => {
self.index.orders_emulated.remove(&client_order_id);
}
None => {
self.index.orders_emulated.insert(client_order_id);
}
}
if let Some(position_id) = position_id {
self.add_position_id(
&position_id,
&order.instrument_id().venue,
&client_order_id,
&strategy_id,
)?;
}
if let Some(client_id) = client_id {
self.index.order_client.insert(client_order_id, client_id);
log::debug!("Indexed {client_id:?}");
}
if let Some(database) = &mut self.database {
database.add_order(&order, client_id)?;
}
self.orders.insert(client_order_id, order);
Ok(())
}
pub fn add_position_id(
&mut self,
position_id: &PositionId,
venue: &Venue,
client_order_id: &ClientOrderId,
strategy_id: &StrategyId,
) -> anyhow::Result<()> {
self.index
.order_position
.insert(*client_order_id, *position_id);
if let Some(database) = &mut self.database {
database.index_order_position(*client_order_id, *position_id)?;
}
self.index
.position_strategy
.insert(*position_id, *strategy_id);
self.index
.position_orders
.entry(*position_id)
.or_default()
.insert(*client_order_id);
self.index
.strategy_positions
.entry(*strategy_id)
.or_default()
.insert(*position_id);
Ok(())
}
pub fn add_position(&mut self, position: Position, oms_type: OmsType) -> anyhow::Result<()> {
self.positions.insert(position.id, position.clone());
self.index.positions.insert(position.id);
self.index.positions_open.insert(position.id);
log::debug!("Adding {position}");
self.add_position_id(
&position.id,
&position.instrument_id.venue,
&position.opening_order_id,
&position.strategy_id,
)?;
let venue = position.instrument_id.venue;
let venue_positions = self.index.venue_positions.entry(venue).or_default();
venue_positions.insert(position.id);
let instrument_id = position.instrument_id;
let instrument_positions = self
.index
.instrument_positions
.entry(instrument_id)
.or_default();
instrument_positions.insert(position.id);
if let Some(database) = &mut self.database {
database.add_position(&position)?;
}
Ok(())
}
pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
if let Some(database) = &mut self.database {
database.update_account(&account)?;
}
Ok(())
}
pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
let client_order_id = order.client_order_id();
if let Some(venue_order_id) = order.venue_order_id() {
if !self.index.venue_order_ids.contains_key(&venue_order_id) {
self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
};
}
if order.is_inflight() {
self.index.orders_inflight.insert(client_order_id);
} else {
self.index.orders_inflight.remove(&client_order_id);
}
if order.is_open() {
self.index.orders_closed.remove(&client_order_id);
self.index.orders_open.insert(client_order_id);
} else if order.is_closed() {
self.index.orders_open.remove(&client_order_id);
self.index.orders_pending_cancel.remove(&client_order_id);
self.index.orders_closed.insert(client_order_id);
}
if let Some(emulation_trigger) = order.emulation_trigger() {
match emulation_trigger {
TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
_ => self.index.orders_emulated.insert(client_order_id),
};
}
if let Some(database) = &mut self.database {
database.update_order(order.last_event())?;
}
Ok(())
}
pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
self.index
.orders_pending_cancel
.insert(order.client_order_id());
}
pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
if position.is_open() {
self.index.positions_open.insert(position.id);
self.index.positions_closed.remove(&position.id);
} else {
self.index.positions_closed.insert(position.id);
self.index.positions_open.remove(&position.id);
}
if let Some(database) = &mut self.database {
database.update_position(position)?;
}
Ok(())
}
pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
let position_id = position.id;
let mut copied_position = position.clone();
let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
copied_position.id = PositionId::new(new_id);
let position_serialized = bincode::serialize(&copied_position)?;
let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
let new_snapshots = match snapshots {
Some(existing_snapshots) => {
let mut combined = existing_snapshots.to_vec();
combined.extend(position_serialized);
Bytes::from(combined)
}
None => Bytes::from(position_serialized),
};
self.position_snapshots.insert(position_id, new_snapshots);
log::debug!("Snapshot {}", copied_position);
Ok(())
}
pub fn snapshot_position_state(
&mut self,
position: &Position,
open_only: Option<bool>,
) -> anyhow::Result<()> {
let open_only = open_only.unwrap_or(true);
if open_only && !position.is_open() {
return Ok(());
}
if let Some(database) = &mut self.database {
database.snapshot_position_state(position).map_err(|e| {
log::error!(
"Failed to snapshot position state for {}: {:?}",
position.id,
e
);
e
})?;
} else {
log::warn!(
"Cannot snapshot position state for {} (no database configured)",
position.id
);
}
todo!()
}
pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
let database = if let Some(database) = &self.database {
database
} else {
log::warn!(
"Cannot snapshot order state for {} (no database configured)",
order.client_order_id()
);
return Ok(());
};
database.snapshot_order_state(order)
}
fn build_order_query_filter_set(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> Option<HashSet<ClientOrderId>> {
let mut query: Option<HashSet<ClientOrderId>> = None;
if let Some(venue) = venue {
query = Some(
self.index
.venue_orders
.get(venue)
.map_or(HashSet::new(), |o| o.iter().copied().collect()),
);
};
if let Some(instrument_id) = instrument_id {
let instrument_orders = self
.index
.instrument_orders
.get(instrument_id)
.map_or(HashSet::new(), |o| o.iter().copied().collect());
if let Some(existing_query) = &mut query {
*existing_query = existing_query
.intersection(&instrument_orders)
.copied()
.collect();
} else {
query = Some(instrument_orders);
};
};
if let Some(strategy_id) = strategy_id {
let strategy_orders = self
.index
.strategy_orders
.get(strategy_id)
.map_or(HashSet::new(), |o| o.iter().copied().collect());
if let Some(existing_query) = &mut query {
*existing_query = existing_query
.intersection(&strategy_orders)
.copied()
.collect();
} else {
query = Some(strategy_orders);
};
};
query
}
fn build_position_query_filter_set(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> Option<HashSet<PositionId>> {
let mut query: Option<HashSet<PositionId>> = None;
if let Some(venue) = venue {
query = Some(
self.index
.venue_positions
.get(venue)
.map_or(HashSet::new(), |p| p.iter().copied().collect()),
);
};
if let Some(instrument_id) = instrument_id {
let instrument_positions = self
.index
.instrument_positions
.get(instrument_id)
.map_or(HashSet::new(), |p| p.iter().copied().collect());
if let Some(existing_query) = query {
query = Some(
existing_query
.intersection(&instrument_positions)
.copied()
.collect(),
);
} else {
query = Some(instrument_positions);
};
};
if let Some(strategy_id) = strategy_id {
let strategy_positions = self
.index
.strategy_positions
.get(strategy_id)
.map_or(HashSet::new(), |p| p.iter().copied().collect());
if let Some(existing_query) = query {
query = Some(
existing_query
.intersection(&strategy_positions)
.copied()
.collect(),
);
} else {
query = Some(strategy_positions);
};
};
query
}
fn get_orders_for_ids(
&self,
client_order_ids: &HashSet<ClientOrderId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let side = side.unwrap_or(OrderSide::NoOrderSide);
let mut orders = Vec::new();
for client_order_id in client_order_ids {
let order = self
.orders
.get(client_order_id)
.unwrap_or_else(|| panic!("Order {client_order_id} not found"));
if side == OrderSide::NoOrderSide || side == order.order_side() {
orders.push(order);
};
}
orders
}
fn get_positions_for_ids(
&self,
position_ids: &HashSet<PositionId>,
side: Option<PositionSide>,
) -> Vec<&Position> {
let side = side.unwrap_or(PositionSide::NoPositionSide);
let mut positions = Vec::new();
for position_id in position_ids {
let position = self
.positions
.get(position_id)
.unwrap_or_else(|| panic!("Position {position_id} not found"));
if side == PositionSide::NoPositionSide || side == position.side {
positions.push(position);
};
}
positions
}
#[must_use]
pub fn client_order_ids(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<ClientOrderId> {
let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self.index.orders.intersection(&query).copied().collect(),
None => self.index.orders.clone(),
}
}
#[must_use]
pub fn client_order_ids_open(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<ClientOrderId> {
let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self
.index
.orders_open
.intersection(&query)
.copied()
.collect(),
None => self.index.orders_open.clone(),
}
}
#[must_use]
pub fn client_order_ids_closed(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<ClientOrderId> {
let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self
.index
.orders_closed
.intersection(&query)
.copied()
.collect(),
None => self.index.orders_closed.clone(),
}
}
#[must_use]
pub fn client_order_ids_emulated(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<ClientOrderId> {
let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self
.index
.orders_emulated
.intersection(&query)
.copied()
.collect(),
None => self.index.orders_emulated.clone(),
}
}
#[must_use]
pub fn client_order_ids_inflight(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<ClientOrderId> {
let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self
.index
.orders_inflight
.intersection(&query)
.copied()
.collect(),
None => self.index.orders_inflight.clone(),
}
}
#[must_use]
pub fn position_ids(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<PositionId> {
let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self.index.positions.intersection(&query).copied().collect(),
None => self.index.positions.clone(),
}
}
#[must_use]
pub fn position_open_ids(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<PositionId> {
let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self
.index
.positions_open
.intersection(&query)
.copied()
.collect(),
None => self.index.positions_open.clone(),
}
}
#[must_use]
pub fn position_closed_ids(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> HashSet<PositionId> {
let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
match query {
Some(query) => self
.index
.positions_closed
.intersection(&query)
.copied()
.collect(),
None => self.index.positions_closed.clone(),
}
}
#[must_use]
pub fn actor_ids(&self) -> HashSet<ComponentId> {
self.index.actors.clone()
}
#[must_use]
pub fn strategy_ids(&self) -> HashSet<StrategyId> {
self.index.strategies.clone()
}
#[must_use]
pub fn exec_algorithm_ids(&self) -> HashSet<ExecAlgorithmId> {
self.index.exec_algorithms.clone()
}
#[must_use]
pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
self.orders.get(client_order_id)
}
#[must_use]
pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
self.index.venue_order_ids.get(venue_order_id)
}
#[must_use]
pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
self.index.client_order_ids.get(client_order_id)
}
#[must_use]
pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
self.index.order_client.get(client_order_id)
}
#[must_use]
pub fn orders(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
self.get_orders_for_ids(&client_order_ids, side)
}
#[must_use]
pub fn orders_open(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
self.get_orders_for_ids(&client_order_ids, side)
}
#[must_use]
pub fn orders_closed(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
self.get_orders_for_ids(&client_order_ids, side)
}
#[must_use]
pub fn orders_emulated(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
self.get_orders_for_ids(&client_order_ids, side)
}
#[must_use]
pub fn orders_inflight(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
self.get_orders_for_ids(&client_order_ids, side)
}
#[must_use]
pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
let client_order_ids = self.index.position_orders.get(position_id);
match client_order_ids {
Some(client_order_ids) => {
self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
}
None => Vec::new(),
}
}
#[must_use]
pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
self.index.orders.contains(client_order_id)
}
#[must_use]
pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
self.index.orders_open.contains(client_order_id)
}
#[must_use]
pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
self.index.orders_closed.contains(client_order_id)
}
#[must_use]
pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
self.index.orders_emulated.contains(client_order_id)
}
#[must_use]
pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
self.index.orders_inflight.contains(client_order_id)
}
#[must_use]
pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
self.index.orders_pending_cancel.contains(client_order_id)
}
#[must_use]
pub fn orders_open_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> usize {
self.orders_open(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn orders_closed_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> usize {
self.orders_closed(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn orders_emulated_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> usize {
self.orders_emulated(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn orders_inflight_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> usize {
self.orders_inflight(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn orders_total_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> usize {
self.orders(venue, instrument_id, strategy_id, side).len()
}
#[must_use]
pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
self.order_lists.get(order_list_id)
}
#[must_use]
pub fn order_lists(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
) -> Vec<&OrderList> {
let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
if let Some(venue) = venue {
order_lists.retain(|ol| &ol.instrument_id.venue == venue);
}
if let Some(instrument_id) = instrument_id {
order_lists.retain(|ol| &ol.instrument_id == instrument_id);
}
if let Some(strategy_id) = strategy_id {
order_lists.retain(|ol| &ol.strategy_id == strategy_id);
}
order_lists
}
#[must_use]
pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
self.order_lists.contains_key(order_list_id)
}
#[must_use]
pub fn orders_for_exec_algorithm(
&self,
exec_algorithm_id: &ExecAlgorithmId,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<OrderSide>,
) -> Vec<&OrderAny> {
let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
if let Some(query) = query {
if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
let exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
}
}
if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
self.get_orders_for_ids(exec_algorithm_order_ids, side)
} else {
Vec::new()
}
}
#[must_use]
pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
self.get_orders_for_ids(
self.index
.exec_spawn_orders
.get(exec_spawn_id)
.unwrap_or(&HashSet::new()),
None,
)
}
#[must_use]
pub fn exec_spawn_total_quantity(
&self,
exec_spawn_id: &ClientOrderId,
active_only: bool,
) -> Option<Quantity> {
let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
let mut total_quantity: Option<Quantity> = None;
for spawn_order in exec_spawn_orders {
if !active_only || !spawn_order.is_closed() {
if let Some(mut total_quantity) = total_quantity {
total_quantity += spawn_order.quantity();
}
} else {
total_quantity = Some(spawn_order.quantity());
}
}
total_quantity
}
#[must_use]
pub fn exec_spawn_total_filled_qty(
&self,
exec_spawn_id: &ClientOrderId,
active_only: bool,
) -> Option<Quantity> {
let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
let mut total_quantity: Option<Quantity> = None;
for spawn_order in exec_spawn_orders {
if !active_only || !spawn_order.is_closed() {
if let Some(mut total_quantity) = total_quantity {
total_quantity += spawn_order.filled_qty();
}
} else {
total_quantity = Some(spawn_order.filled_qty());
}
}
total_quantity
}
#[must_use]
pub fn exec_spawn_total_leaves_qty(
&self,
exec_spawn_id: &ClientOrderId,
active_only: bool,
) -> Option<Quantity> {
let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
let mut total_quantity: Option<Quantity> = None;
for spawn_order in exec_spawn_orders {
if !active_only || !spawn_order.is_closed() {
if let Some(mut total_quantity) = total_quantity {
total_quantity += spawn_order.leaves_qty();
}
} else {
total_quantity = Some(spawn_order.leaves_qty());
}
}
total_quantity
}
#[must_use]
pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
self.positions.get(position_id)
}
#[must_use]
pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
self.index
.order_position
.get(client_order_id)
.and_then(|position_id| self.positions.get(position_id))
}
#[must_use]
pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
self.index.order_position.get(client_order_id)
}
#[must_use]
pub fn positions(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<PositionSide>,
) -> Vec<&Position> {
let position_ids = self.position_ids(venue, instrument_id, strategy_id);
self.get_positions_for_ids(&position_ids, side)
}
#[must_use]
pub fn positions_open(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<PositionSide>,
) -> Vec<&Position> {
let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
self.get_positions_for_ids(&position_ids, side)
}
#[must_use]
pub fn positions_closed(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<PositionSide>,
) -> Vec<&Position> {
let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
self.get_positions_for_ids(&position_ids, side)
}
#[must_use]
pub fn position_exists(&self, position_id: &PositionId) -> bool {
self.index.positions.contains(position_id)
}
#[must_use]
pub fn is_position_open(&self, position_id: &PositionId) -> bool {
self.index.positions_open.contains(position_id)
}
#[must_use]
pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
self.index.positions_closed.contains(position_id)
}
#[must_use]
pub fn positions_open_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<PositionSide>,
) -> usize {
self.positions_open(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn positions_closed_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<PositionSide>,
) -> usize {
self.positions_closed(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn positions_total_count(
&self,
venue: Option<&Venue>,
instrument_id: Option<&InstrumentId>,
strategy_id: Option<&StrategyId>,
side: Option<PositionSide>,
) -> usize {
self.positions(venue, instrument_id, strategy_id, side)
.len()
}
#[must_use]
pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
self.index.order_strategy.get(client_order_id)
}
#[must_use]
pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
self.index.position_strategy.get(position_id)
}
pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
check_valid_string(key, stringify!(key)).expect(FAILED);
Ok(self.general.get(key))
}
#[must_use]
pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
match price_type {
PriceType::Bid => self
.quotes
.get(instrument_id)
.and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
PriceType::Ask => self
.quotes
.get(instrument_id)
.and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
quotes.front().map(|quote| {
Price::new(
(quote.ask_price.as_f64() + quote.bid_price.as_f64()) / 2.0,
quote.bid_price.precision + 1,
)
})
}),
PriceType::Last => self
.trades
.get(instrument_id)
.and_then(|trades| trades.front().map(|trade| trade.price)),
}
}
#[must_use]
pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
self.quotes
.get(instrument_id)
.map(|quotes| quotes.iter().copied().collect())
}
#[must_use]
pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
self.trades
.get(instrument_id)
.map(|trades| trades.iter().copied().collect())
}
#[must_use]
pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
self.bars
.get(bar_type)
.map(|bars| bars.iter().copied().collect())
}
#[must_use]
pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
self.books.get(instrument_id)
}
#[must_use]
pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
self.books.get_mut(instrument_id)
}
#[must_use]
pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
self.quotes
.get(instrument_id)
.and_then(|quotes| quotes.front())
}
#[must_use]
pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
self.trades
.get(instrument_id)
.and_then(|trades| trades.front())
}
#[must_use]
pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
self.bars.get(bar_type).and_then(|bars| bars.front())
}
#[must_use]
pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
self.books.get(instrument_id).map_or(0, |book| book.count) as usize
}
#[must_use]
pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
self.quotes
.get(instrument_id)
.map_or(0, std::collections::VecDeque::len)
}
#[must_use]
pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
self.trades
.get(instrument_id)
.map_or(0, std::collections::VecDeque::len)
}
#[must_use]
pub fn bar_count(&self, bar_type: &BarType) -> usize {
self.bars
.get(bar_type)
.map_or(0, std::collections::VecDeque::len)
}
#[must_use]
pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
self.books.contains_key(instrument_id)
}
#[must_use]
pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
self.quote_count(instrument_id) > 0
}
#[must_use]
pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
self.trade_count(instrument_id) > 0
}
#[must_use]
pub fn has_bars(&self, bar_type: &BarType) -> bool {
self.bar_count(bar_type) > 0
}
#[must_use]
pub fn get_xrate(
&self,
venue: Venue,
from_currency: Currency,
to_currency: Currency,
price_type: PriceType,
) -> Decimal {
if from_currency == to_currency {
return Decimal::ONE;
}
let (bid_quote, ask_quote) = self.build_quote_table(&venue);
get_exchange_rate(from_currency, to_currency, price_type, bid_quote, ask_quote)
}
fn build_quote_table(
&self,
venue: &Venue,
) -> (HashMap<Symbol, Decimal>, HashMap<Symbol, Decimal>) {
let mut bid_quotes = HashMap::new();
let mut ask_quotes = HashMap::new();
for instrument_id in self.instruments.keys() {
if instrument_id.venue != *venue {
continue;
}
let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
if let Some(tick) = ticks.front() {
(tick.bid_price, tick.ask_price)
} else {
continue; }
} else {
let bid_bar = self
.bars
.iter()
.find(|(k, _)| {
k.instrument_id() == *instrument_id
&& matches!(k.spec().price_type, PriceType::Bid)
})
.map(|(_, v)| v);
let ask_bar = self
.bars
.iter()
.find(|(k, _)| {
k.instrument_id() == *instrument_id
&& matches!(k.spec().price_type, PriceType::Ask)
})
.map(|(_, v)| v);
match (bid_bar, ask_bar) {
(Some(bid), Some(ask)) => {
let bid_price = bid.front().unwrap().close;
let ask_price = ask.front().unwrap().close;
(bid_price, ask_price)
}
_ => continue,
}
};
bid_quotes.insert(instrument_id.symbol, bid_price.as_decimal());
ask_quotes.insert(instrument_id.symbol, ask_price.as_decimal());
}
(bid_quotes, ask_quotes)
}
#[must_use]
pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
self.instruments.get(instrument_id)
}
#[must_use]
pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
self.instruments
.keys()
.filter(|i| venue.is_none() || &i.venue == venue.unwrap())
.collect()
}
#[must_use]
pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
self.instruments
.values()
.filter(|i| &i.id().venue == venue)
.filter(|i| underlying.is_none_or(|u| i.underlying() == Some(u)))
.collect()
}
#[must_use]
pub fn bar_types(
&self,
instrument_id: Option<&InstrumentId>,
price_type: Option<&PriceType>,
aggregation_source: AggregationSource,
) -> Vec<&BarType> {
let mut bar_types = self
.bars
.keys()
.filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
.collect::<Vec<&BarType>>();
if let Some(instrument_id) = instrument_id {
bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
}
if let Some(price_type) = price_type {
bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
}
bar_types
}
#[must_use]
pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
self.synthetics.get(instrument_id)
}
#[must_use]
pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
self.synthetics.keys().collect()
}
#[must_use]
pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
self.synthetics.values().collect()
}
#[must_use]
pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
self.accounts.get(account_id)
}
#[must_use]
pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
self.index
.venue_account
.get(venue)
.and_then(|account_id| self.accounts.get(account_id))
}
#[must_use]
pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
self.index.venue_account.get(venue)
}
#[must_use]
pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
self.accounts
.values()
.filter(|account| &account.id() == account_id)
.collect()
}
}