1use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24 accounts::AccountAny,
25 identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
26 instruments::{InstrumentAny, SyntheticInstrument},
27 orders::OrderAny,
28 position::Position,
29 types::Currency,
30};
31use redis::{AsyncCommands, aio::ConnectionManager};
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use ustr::Ustr;
35
36const INDEX: &str = "index";
38const GENERAL: &str = "general";
39const CURRENCIES: &str = "currencies";
40const INSTRUMENTS: &str = "instruments";
41const SYNTHETICS: &str = "synthetics";
42const ACCOUNTS: &str = "accounts";
43const ORDERS: &str = "orders";
44const POSITIONS: &str = "positions";
45const ACTORS: &str = "actors";
46const STRATEGIES: &str = "strategies";
47const REDIS_DELIMITER: char = ':';
48
49const INDEX_ORDER_IDS: &str = "index:order_ids";
51const INDEX_ORDER_POSITION: &str = "index:order_position";
52const INDEX_ORDER_CLIENT: &str = "index:order_client";
53const INDEX_ORDERS: &str = "index:orders";
54const INDEX_ORDERS_OPEN: &str = "index:orders_open";
55const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
56const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
57const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
58const INDEX_POSITIONS: &str = "index:positions";
59const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
60const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
61
62#[derive(Debug)]
63pub struct DatabaseQueries;
64
65impl DatabaseQueries {
66 pub fn serialize_payload<T: Serialize>(
72 encoding: SerializationEncoding,
73 payload: &T,
74 ) -> anyhow::Result<Vec<u8>> {
75 let mut value = serde_json::to_value(payload)?;
76 convert_timestamps(&mut value);
77 match encoding {
78 SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
79 .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
80 SerializationEncoding::Json => serde_json::to_vec(&value)
81 .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
82 }
83 }
84
85 pub fn deserialize_payload<T: DeserializeOwned>(
91 encoding: SerializationEncoding,
92 payload: &[u8],
93 ) -> anyhow::Result<T> {
94 let mut value = match encoding {
95 SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
96 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
97 SerializationEncoding::Json => serde_json::from_slice(payload)
98 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
99 };
100
101 convert_timestamp_strings(&mut value);
102
103 serde_json::from_value(value)
104 .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
105 }
106
107 pub async fn scan_keys(
113 con: &mut ConnectionManager,
114 pattern: String,
115 ) -> anyhow::Result<Vec<String>> {
116 let mut result = Vec::new();
117 let mut cursor = 0u64;
118
119 loop {
120 let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
121 .arg(cursor)
122 .arg("MATCH")
123 .arg(&pattern)
124 .arg("COUNT")
125 .arg(5000)
126 .query_async(con)
127 .await?;
128
129 let (new_cursor, keys) = scan_result;
130 result.extend(keys);
131
132 if new_cursor == 0 {
134 break;
135 }
136
137 cursor = new_cursor;
138 }
139
140 Ok(result)
141 }
142
143 pub async fn read_bulk(
149 con: &ConnectionManager,
150 keys: &[String],
151 ) -> anyhow::Result<Vec<Option<Bytes>>> {
152 if keys.is_empty() {
153 return Ok(vec![]);
154 }
155
156 let mut con = con.clone();
157
158 let results: Vec<Option<Vec<u8>>> =
160 redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
161
162 let bytes_results: Vec<Option<Bytes>> = results
164 .into_iter()
165 .map(|opt| opt.map(Bytes::from))
166 .collect();
167
168 Ok(bytes_results)
169 }
170
171 pub async fn read(
177 con: &ConnectionManager,
178 trader_key: &str,
179 key: &str,
180 ) -> anyhow::Result<Vec<Bytes>> {
181 let collection = Self::get_collection_key(key)?;
182 let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
183
184 let mut con = con.clone();
185
186 match collection {
187 INDEX => Self::read_index(&mut con, &full_key).await,
188 GENERAL => Self::read_string(&mut con, &full_key).await,
189 CURRENCIES => Self::read_string(&mut con, &full_key).await,
190 INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
191 SYNTHETICS => Self::read_string(&mut con, &full_key).await,
192 ACCOUNTS => Self::read_list(&mut con, &full_key).await,
193 ORDERS => Self::read_list(&mut con, &full_key).await,
194 POSITIONS => Self::read_list(&mut con, &full_key).await,
195 ACTORS => Self::read_string(&mut con, &full_key).await,
196 STRATEGIES => Self::read_string(&mut con, &full_key).await,
197 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
198 }
199 }
200
201 pub async fn load_all(
207 con: &ConnectionManager,
208 encoding: SerializationEncoding,
209 trader_key: &str,
210 ) -> anyhow::Result<CacheMap> {
211 let (currencies, instruments, synthetics, accounts, orders, positions) = tokio::try_join!(
212 Self::load_currencies(con, trader_key, encoding),
213 Self::load_instruments(con, trader_key, encoding),
214 Self::load_synthetics(con, trader_key, encoding),
215 Self::load_accounts(con, trader_key, encoding),
216 Self::load_orders(con, trader_key, encoding),
217 Self::load_positions(con, trader_key, encoding)
218 )
219 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
220
221 let greeks = AHashMap::new();
224 let yield_curves = AHashMap::new();
225
226 Ok(CacheMap {
227 currencies,
228 instruments,
229 synthetics,
230 accounts,
231 orders,
232 positions,
233 greeks,
234 yield_curves,
235 })
236 }
237
238 pub async fn load_currencies(
244 con: &ConnectionManager,
245 trader_key: &str,
246 encoding: SerializationEncoding,
247 ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
248 let mut currencies = AHashMap::new();
249 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
250 tracing::debug!("Loading {pattern}");
251
252 let mut con = con.clone();
253 let keys = Self::scan_keys(&mut con, pattern).await?;
254
255 if keys.is_empty() {
256 return Ok(currencies);
257 }
258
259 let bulk_values = Self::read_bulk(&con, &keys).await?;
261
262 for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
264 let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
265 Ustr::from(code)
266 } else {
267 log::error!("Invalid key format: {key}");
268 continue;
269 };
270
271 if let Some(value_bytes) = value_opt {
272 match Self::deserialize_payload(encoding, value_bytes) {
273 Ok(currency) => {
274 currencies.insert(currency_code, currency);
275 }
276 Err(e) => {
277 log::error!("Failed to deserialize currency {currency_code}: {e}");
278 }
279 }
280 } else {
281 log::error!("Currency not found in Redis: {currency_code}");
282 }
283 }
284
285 tracing::debug!("Loaded {} currencies(s)", currencies.len());
286
287 Ok(currencies)
288 }
289
290 pub async fn load_instruments(
301 con: &ConnectionManager,
302 trader_key: &str,
303 encoding: SerializationEncoding,
304 ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
305 let mut instruments = AHashMap::new();
306 let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
307 tracing::debug!("Loading {pattern}");
308
309 let mut con = con.clone();
310 let keys = Self::scan_keys(&mut con, pattern).await?;
311
312 let futures: Vec<_> = keys
313 .iter()
314 .map(|key| {
315 let con = con.clone();
316 async move {
317 let instrument_id = key
318 .as_str()
319 .rsplit(':')
320 .next()
321 .ok_or_else(|| {
322 log::error!("Invalid key format: {key}");
323 "Invalid key format"
324 })
325 .and_then(|code| {
326 InstrumentId::from_str(code).map_err(|e| {
327 log::error!("Failed to convert to InstrumentId for {key}: {e}");
328 "Invalid instrument ID"
329 })
330 });
331
332 let instrument_id = match instrument_id {
333 Ok(id) => id,
334 Err(_) => return None,
335 };
336
337 match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
338 Ok(Some(instrument)) => Some((instrument_id, instrument)),
339 Ok(None) => {
340 log::error!("Instrument not found: {instrument_id}");
341 None
342 }
343 Err(e) => {
344 log::error!("Failed to load instrument {instrument_id}: {e}");
345 None
346 }
347 }
348 }
349 })
350 .collect();
351
352 instruments.extend(join_all(futures).await.into_iter().flatten());
354 tracing::debug!("Loaded {} instruments(s)", instruments.len());
355
356 Ok(instruments)
357 }
358
359 pub async fn load_synthetics(
370 con: &ConnectionManager,
371 trader_key: &str,
372 encoding: SerializationEncoding,
373 ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
374 let mut synthetics = AHashMap::new();
375 let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
376 tracing::debug!("Loading {pattern}");
377
378 let mut con = con.clone();
379 let keys = Self::scan_keys(&mut con, pattern).await?;
380
381 let futures: Vec<_> = keys
382 .iter()
383 .map(|key| {
384 let con = con.clone();
385 async move {
386 let instrument_id = key
387 .as_str()
388 .rsplit(':')
389 .next()
390 .ok_or_else(|| {
391 log::error!("Invalid key format: {key}");
392 "Invalid key format"
393 })
394 .and_then(|code| {
395 InstrumentId::from_str(code).map_err(|e| {
396 log::error!("Failed to parse InstrumentId for {key}: {e}");
397 "Invalid instrument ID"
398 })
399 });
400
401 let instrument_id = match instrument_id {
402 Ok(id) => id,
403 Err(_) => return None,
404 };
405
406 match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
407 Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
408 Ok(None) => {
409 log::error!("Synthetic not found: {instrument_id}");
410 None
411 }
412 Err(e) => {
413 log::error!("Failed to load synthetic {instrument_id}: {e}");
414 None
415 }
416 }
417 }
418 })
419 .collect();
420
421 synthetics.extend(join_all(futures).await.into_iter().flatten());
423 tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
424
425 Ok(synthetics)
426 }
427
428 pub async fn load_accounts(
439 con: &ConnectionManager,
440 trader_key: &str,
441 encoding: SerializationEncoding,
442 ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
443 let mut accounts = AHashMap::new();
444 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
445 tracing::debug!("Loading {pattern}");
446
447 let mut con = con.clone();
448 let keys = Self::scan_keys(&mut con, pattern).await?;
449
450 let futures: Vec<_> = keys
451 .iter()
452 .map(|key| {
453 let con = con.clone();
454 async move {
455 let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
456 AccountId::from(code)
457 } else {
458 log::error!("Invalid key format: {key}");
459 return None;
460 };
461
462 match Self::load_account(&con, trader_key, &account_id, encoding).await {
463 Ok(Some(account)) => Some((account_id, account)),
464 Ok(None) => {
465 log::error!("Account not found: {account_id}");
466 None
467 }
468 Err(e) => {
469 log::error!("Failed to load account {account_id}: {e}");
470 None
471 }
472 }
473 }
474 })
475 .collect();
476
477 accounts.extend(join_all(futures).await.into_iter().flatten());
479 tracing::debug!("Loaded {} accounts(s)", accounts.len());
480
481 Ok(accounts)
482 }
483
484 pub async fn load_orders(
495 con: &ConnectionManager,
496 trader_key: &str,
497 encoding: SerializationEncoding,
498 ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
499 let mut orders = AHashMap::new();
500 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
501 tracing::debug!("Loading {pattern}");
502
503 let mut con = con.clone();
504 let keys = Self::scan_keys(&mut con, pattern).await?;
505
506 let futures: Vec<_> = keys
507 .iter()
508 .map(|key| {
509 let con = con.clone();
510 async move {
511 let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
512 ClientOrderId::from(code)
513 } else {
514 log::error!("Invalid key format: {key}");
515 return None;
516 };
517
518 match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
519 Ok(Some(order)) => Some((client_order_id, order)),
520 Ok(None) => {
521 log::error!("Order not found: {client_order_id}");
522 None
523 }
524 Err(e) => {
525 log::error!("Failed to load order {client_order_id}: {e}");
526 None
527 }
528 }
529 }
530 })
531 .collect();
532
533 orders.extend(join_all(futures).await.into_iter().flatten());
535 tracing::debug!("Loaded {} order(s)", orders.len());
536
537 Ok(orders)
538 }
539
540 pub async fn load_positions(
551 con: &ConnectionManager,
552 trader_key: &str,
553 encoding: SerializationEncoding,
554 ) -> anyhow::Result<AHashMap<PositionId, Position>> {
555 let mut positions = AHashMap::new();
556 let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
557 tracing::debug!("Loading {pattern}");
558
559 let mut con = con.clone();
560 let keys = Self::scan_keys(&mut con, pattern).await?;
561
562 let futures: Vec<_> = keys
563 .iter()
564 .map(|key| {
565 let con = con.clone();
566 async move {
567 let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
568 PositionId::from(code)
569 } else {
570 log::error!("Invalid key format: {key}");
571 return None;
572 };
573
574 match Self::load_position(&con, trader_key, &position_id, encoding).await {
575 Ok(Some(position)) => Some((position_id, position)),
576 Ok(None) => {
577 log::error!("Position not found: {position_id}");
578 None
579 }
580 Err(e) => {
581 log::error!("Failed to load position {position_id}: {e}");
582 None
583 }
584 }
585 }
586 })
587 .collect();
588
589 positions.extend(join_all(futures).await.into_iter().flatten());
591 tracing::debug!("Loaded {} position(s)", positions.len());
592
593 Ok(positions)
594 }
595
596 pub async fn load_currency(
602 con: &ConnectionManager,
603 trader_key: &str,
604 code: &Ustr,
605 encoding: SerializationEncoding,
606 ) -> anyhow::Result<Option<Currency>> {
607 let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
608 let result = Self::read(con, trader_key, &key).await?;
609
610 if result.is_empty() {
611 return Ok(None);
612 }
613
614 let currency = Self::deserialize_payload(encoding, &result[0])?;
615 Ok(currency)
616 }
617
618 pub async fn load_instrument(
624 con: &ConnectionManager,
625 trader_key: &str,
626 instrument_id: &InstrumentId,
627 encoding: SerializationEncoding,
628 ) -> anyhow::Result<Option<InstrumentAny>> {
629 let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
630 let result = Self::read(con, trader_key, &key).await?;
631 if result.is_empty() {
632 return Ok(None);
633 }
634
635 let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
636 Ok(Some(instrument))
637 }
638
639 pub async fn load_synthetic(
645 con: &ConnectionManager,
646 trader_key: &str,
647 instrument_id: &InstrumentId,
648 encoding: SerializationEncoding,
649 ) -> anyhow::Result<Option<SyntheticInstrument>> {
650 let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
651 let result = Self::read(con, trader_key, &key).await?;
652 if result.is_empty() {
653 return Ok(None);
654 }
655
656 let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
657 Ok(Some(synthetic))
658 }
659
660 pub async fn load_account(
666 con: &ConnectionManager,
667 trader_key: &str,
668 account_id: &AccountId,
669 encoding: SerializationEncoding,
670 ) -> anyhow::Result<Option<AccountAny>> {
671 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
672 let result = Self::read(con, trader_key, &key).await?;
673 if result.is_empty() {
674 return Ok(None);
675 }
676
677 let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
678 Ok(Some(account))
679 }
680
681 pub async fn load_order(
687 con: &ConnectionManager,
688 trader_key: &str,
689 client_order_id: &ClientOrderId,
690 encoding: SerializationEncoding,
691 ) -> anyhow::Result<Option<OrderAny>> {
692 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
693 let result = Self::read(con, trader_key, &key).await?;
694 if result.is_empty() {
695 return Ok(None);
696 }
697
698 let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
699 Ok(Some(order))
700 }
701
702 pub async fn load_position(
708 con: &ConnectionManager,
709 trader_key: &str,
710 position_id: &PositionId,
711 encoding: SerializationEncoding,
712 ) -> anyhow::Result<Option<Position>> {
713 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
714 let result = Self::read(con, trader_key, &key).await?;
715 if result.is_empty() {
716 return Ok(None);
717 }
718
719 let position: Position = Self::deserialize_payload(encoding, &result[0])?;
720 Ok(Some(position))
721 }
722
723 fn get_collection_key(key: &str) -> anyhow::Result<&str> {
724 key.split_once(REDIS_DELIMITER)
725 .map(|(collection, _)| collection)
726 .ok_or_else(|| {
727 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
728 })
729 }
730
731 async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
732 let index_key = Self::get_index_key(key)?;
733 match index_key {
734 INDEX_ORDER_IDS => Self::read_set(conn, key).await,
735 INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
736 INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
737 INDEX_ORDERS => Self::read_set(conn, key).await,
738 INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
739 INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
740 INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
741 INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
742 INDEX_POSITIONS => Self::read_set(conn, key).await,
743 INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
744 INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
745 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
746 }
747 }
748
749 async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
750 let result: Vec<u8> = conn.get(key).await?;
751
752 if result.is_empty() {
753 Ok(vec![])
754 } else {
755 Ok(vec![Bytes::from(result)])
756 }
757 }
758
759 async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
760 let result: Vec<Bytes> = conn.smembers(key).await?;
761 Ok(result)
762 }
763
764 async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
765 let result: HashMap<String, String> = conn.hgetall(key).await?;
766 let json = serde_json::to_string(&result)?;
767 Ok(vec![Bytes::from(json.into_bytes())])
768 }
769
770 async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
771 let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
772 Ok(result)
773 }
774
775 fn get_index_key(key: &str) -> anyhow::Result<&str> {
776 key.split_once(REDIS_DELIMITER)
777 .map(|(_, index_key)| index_key)
778 .ok_or_else(|| {
779 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
780 })
781 }
782}
783
784fn is_timestamp_field(key: &str) -> bool {
785 let expire_match = key == "expire_time_ns";
786 let ts_match = key.starts_with("ts_");
787 expire_match || ts_match
788}
789
790fn convert_timestamps(value: &mut Value) {
791 match value {
792 Value::Object(map) => {
793 for (key, v) in map {
794 if is_timestamp_field(key)
795 && let Value::Number(n) = v
796 && let Some(n) = n.as_u64()
797 {
798 let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
799 *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
800 }
801 convert_timestamps(v);
802 }
803 }
804 Value::Array(arr) => {
805 for item in arr {
806 convert_timestamps(item);
807 }
808 }
809 _ => {}
810 }
811}
812
813fn convert_timestamp_strings(value: &mut Value) {
814 match value {
815 Value::Object(map) => {
816 for (key, v) in map {
817 if is_timestamp_field(key)
818 && let Value::String(s) = v
819 && let Ok(dt) = DateTime::parse_from_rfc3339(s)
820 {
821 *v = Value::Number(
822 (dt.with_timezone(&Utc)
823 .timestamp_nanos_opt()
824 .expect("Invalid DateTime") as u64)
825 .into(),
826 );
827 }
828 convert_timestamp_strings(v);
829 }
830 }
831 Value::Array(arr) => {
832 for item in arr {
833 convert_timestamp_strings(item);
834 }
835 }
836 _ => {}
837 }
838}