1use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24 accounts::AccountAny,
25 identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
26 instruments::{InstrumentAny, SyntheticInstrument},
27 orders::OrderAny,
28 position::Position,
29 types::Currency,
30};
31use redis::{AsyncCommands, aio::ConnectionManager};
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use tokio::try_join;
35use ustr::Ustr;
36
/// Separator placed between the segments of a Redis key.
const REDIS_DELIMITER: char = ':';

// Collection names: the first `REDIS_DELIMITER`-separated segment of a key handed to
// `DatabaseQueries::read`.

/// Collection whose keys are dispatched to `DatabaseQueries::read_index`.
const INDEX: &str = "index";

// Collections that `DatabaseQueries::read` fetches as a single string value.
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";

// Collections that `DatabaseQueries::read` fetches as a Redis list.
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";

// Index keys that `DatabaseQueries::read_index` reads as Redis hashes.
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";

// Index keys that `DatabaseQueries::read_index` reads as Redis sets.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
62
/// Stateless namespace type: the Redis query helpers are implemented on it as associated
/// functions, so it carries no fields.
#[derive(Debug)]
pub struct DatabaseQueries;
65
66impl DatabaseQueries {
67 pub fn serialize_payload<T: Serialize>(
73 encoding: SerializationEncoding,
74 payload: &T,
75 ) -> anyhow::Result<Vec<u8>> {
76 let mut value = serde_json::to_value(payload)?;
77 convert_timestamps(&mut value);
78 match encoding {
79 SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
80 .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
81 SerializationEncoding::Json => serde_json::to_vec(&value)
82 .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
83 }
84 }
85
86 pub fn deserialize_payload<T: DeserializeOwned>(
92 encoding: SerializationEncoding,
93 payload: &[u8],
94 ) -> anyhow::Result<T> {
95 let mut value = match encoding {
96 SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
97 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
98 SerializationEncoding::Json => serde_json::from_slice(payload)
99 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
100 };
101
102 convert_timestamp_strings(&mut value);
103
104 serde_json::from_value(value)
105 .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
106 }
107
108 pub async fn scan_keys(
114 con: &mut ConnectionManager,
115 pattern: String,
116 ) -> anyhow::Result<Vec<String>> {
117 let mut result = Vec::new();
118 let mut cursor = 0u64;
119
120 loop {
121 let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
122 .arg(cursor)
123 .arg("MATCH")
124 .arg(&pattern)
125 .arg("COUNT")
126 .arg(5000)
127 .query_async(con)
128 .await?;
129
130 let (new_cursor, keys) = scan_result;
131 result.extend(keys);
132
133 if new_cursor == 0 {
135 break;
136 }
137
138 cursor = new_cursor;
139 }
140
141 Ok(result)
142 }
143
144 pub async fn read_bulk(
150 con: &ConnectionManager,
151 keys: &[String],
152 ) -> anyhow::Result<Vec<Option<Bytes>>> {
153 if keys.is_empty() {
154 return Ok(vec![]);
155 }
156
157 let mut con = con.clone();
158
159 let results: Vec<Option<Vec<u8>>> =
161 redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
162
163 let bytes_results: Vec<Option<Bytes>> = results
165 .into_iter()
166 .map(|opt| opt.map(Bytes::from))
167 .collect();
168
169 Ok(bytes_results)
170 }
171
172 pub async fn read(
178 con: &ConnectionManager,
179 trader_key: &str,
180 key: &str,
181 ) -> anyhow::Result<Vec<Bytes>> {
182 let collection = Self::get_collection_key(key)?;
183 let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
184
185 let mut con = con.clone();
186
187 match collection {
188 INDEX => Self::read_index(&mut con, &full_key).await,
189 GENERAL => Self::read_string(&mut con, &full_key).await,
190 CURRENCIES => Self::read_string(&mut con, &full_key).await,
191 INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
192 SYNTHETICS => Self::read_string(&mut con, &full_key).await,
193 ACCOUNTS => Self::read_list(&mut con, &full_key).await,
194 ORDERS => Self::read_list(&mut con, &full_key).await,
195 POSITIONS => Self::read_list(&mut con, &full_key).await,
196 ACTORS => Self::read_string(&mut con, &full_key).await,
197 STRATEGIES => Self::read_string(&mut con, &full_key).await,
198 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
199 }
200 }
201
202 pub async fn load_all(
208 con: &ConnectionManager,
209 encoding: SerializationEncoding,
210 trader_key: &str,
211 ) -> anyhow::Result<CacheMap> {
212 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
213 Self::load_currencies(con, trader_key, encoding),
214 Self::load_instruments(con, trader_key, encoding),
215 Self::load_synthetics(con, trader_key, encoding),
216 Self::load_accounts(con, trader_key, encoding),
217 Self::load_orders(con, trader_key, encoding),
218 Self::load_positions(con, trader_key, encoding)
219 )
220 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
221
222 let greeks = AHashMap::new();
225 let yield_curves = AHashMap::new();
226
227 Ok(CacheMap {
228 currencies,
229 instruments,
230 synthetics,
231 accounts,
232 orders,
233 positions,
234 greeks,
235 yield_curves,
236 })
237 }
238
239 pub async fn load_currencies(
245 con: &ConnectionManager,
246 trader_key: &str,
247 encoding: SerializationEncoding,
248 ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
249 let mut currencies = AHashMap::new();
250 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
251 tracing::debug!("Loading {pattern}");
252
253 let mut con = con.clone();
254 let keys = Self::scan_keys(&mut con, pattern).await?;
255
256 if keys.is_empty() {
257 return Ok(currencies);
258 }
259
260 let bulk_values = Self::read_bulk(&con, &keys).await?;
262
263 for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
265 let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
266 Ustr::from(code)
267 } else {
268 log::error!("Invalid key format: {key}");
269 continue;
270 };
271
272 if let Some(value_bytes) = value_opt {
273 match Self::deserialize_payload(encoding, value_bytes) {
274 Ok(currency) => {
275 currencies.insert(currency_code, currency);
276 }
277 Err(e) => {
278 log::error!("Failed to deserialize currency {currency_code}: {e}");
279 }
280 }
281 } else {
282 log::error!("Currency not found in Redis: {currency_code}");
283 }
284 }
285
286 tracing::debug!("Loaded {} currencies(s)", currencies.len());
287
288 Ok(currencies)
289 }
290
291 pub async fn load_instruments(
302 con: &ConnectionManager,
303 trader_key: &str,
304 encoding: SerializationEncoding,
305 ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
306 let mut instruments = AHashMap::new();
307 let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
308 tracing::debug!("Loading {pattern}");
309
310 let mut con = con.clone();
311 let keys = Self::scan_keys(&mut con, pattern).await?;
312
313 let futures: Vec<_> = keys
314 .iter()
315 .map(|key| {
316 let con = con.clone();
317 async move {
318 let instrument_id = key
319 .as_str()
320 .rsplit(':')
321 .next()
322 .ok_or_else(|| {
323 log::error!("Invalid key format: {key}");
324 "Invalid key format"
325 })
326 .and_then(|code| {
327 InstrumentId::from_str(code).map_err(|e| {
328 log::error!("Failed to convert to InstrumentId for {key}: {e}");
329 "Invalid instrument ID"
330 })
331 });
332
333 let instrument_id = match instrument_id {
334 Ok(id) => id,
335 Err(_) => return None,
336 };
337
338 match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
339 Ok(Some(instrument)) => Some((instrument_id, instrument)),
340 Ok(None) => {
341 log::error!("Instrument not found: {instrument_id}");
342 None
343 }
344 Err(e) => {
345 log::error!("Failed to load instrument {instrument_id}: {e}");
346 None
347 }
348 }
349 }
350 })
351 .collect();
352
353 instruments.extend(join_all(futures).await.into_iter().flatten());
355 tracing::debug!("Loaded {} instruments(s)", instruments.len());
356
357 Ok(instruments)
358 }
359
360 pub async fn load_synthetics(
371 con: &ConnectionManager,
372 trader_key: &str,
373 encoding: SerializationEncoding,
374 ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
375 let mut synthetics = AHashMap::new();
376 let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
377 tracing::debug!("Loading {pattern}");
378
379 let mut con = con.clone();
380 let keys = Self::scan_keys(&mut con, pattern).await?;
381
382 let futures: Vec<_> = keys
383 .iter()
384 .map(|key| {
385 let con = con.clone();
386 async move {
387 let instrument_id = key
388 .as_str()
389 .rsplit(':')
390 .next()
391 .ok_or_else(|| {
392 log::error!("Invalid key format: {key}");
393 "Invalid key format"
394 })
395 .and_then(|code| {
396 InstrumentId::from_str(code).map_err(|e| {
397 log::error!("Failed to parse InstrumentId for {key}: {e}");
398 "Invalid instrument ID"
399 })
400 });
401
402 let instrument_id = match instrument_id {
403 Ok(id) => id,
404 Err(_) => return None,
405 };
406
407 match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
408 Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
409 Ok(None) => {
410 log::error!("Synthetic not found: {instrument_id}");
411 None
412 }
413 Err(e) => {
414 log::error!("Failed to load synthetic {instrument_id}: {e}");
415 None
416 }
417 }
418 }
419 })
420 .collect();
421
422 synthetics.extend(join_all(futures).await.into_iter().flatten());
424 tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
425
426 Ok(synthetics)
427 }
428
429 pub async fn load_accounts(
440 con: &ConnectionManager,
441 trader_key: &str,
442 encoding: SerializationEncoding,
443 ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
444 let mut accounts = AHashMap::new();
445 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
446 tracing::debug!("Loading {pattern}");
447
448 let mut con = con.clone();
449 let keys = Self::scan_keys(&mut con, pattern).await?;
450
451 let futures: Vec<_> = keys
452 .iter()
453 .map(|key| {
454 let con = con.clone();
455 async move {
456 let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
457 AccountId::from(code)
458 } else {
459 log::error!("Invalid key format: {key}");
460 return None;
461 };
462
463 match Self::load_account(&con, trader_key, &account_id, encoding).await {
464 Ok(Some(account)) => Some((account_id, account)),
465 Ok(None) => {
466 log::error!("Account not found: {account_id}");
467 None
468 }
469 Err(e) => {
470 log::error!("Failed to load account {account_id}: {e}");
471 None
472 }
473 }
474 }
475 })
476 .collect();
477
478 accounts.extend(join_all(futures).await.into_iter().flatten());
480 tracing::debug!("Loaded {} accounts(s)", accounts.len());
481
482 Ok(accounts)
483 }
484
485 pub async fn load_orders(
496 con: &ConnectionManager,
497 trader_key: &str,
498 encoding: SerializationEncoding,
499 ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
500 let mut orders = AHashMap::new();
501 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
502 tracing::debug!("Loading {pattern}");
503
504 let mut con = con.clone();
505 let keys = Self::scan_keys(&mut con, pattern).await?;
506
507 let futures: Vec<_> = keys
508 .iter()
509 .map(|key| {
510 let con = con.clone();
511 async move {
512 let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
513 ClientOrderId::from(code)
514 } else {
515 log::error!("Invalid key format: {key}");
516 return None;
517 };
518
519 match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
520 Ok(Some(order)) => Some((client_order_id, order)),
521 Ok(None) => {
522 log::error!("Order not found: {client_order_id}");
523 None
524 }
525 Err(e) => {
526 log::error!("Failed to load order {client_order_id}: {e}");
527 None
528 }
529 }
530 }
531 })
532 .collect();
533
534 orders.extend(join_all(futures).await.into_iter().flatten());
536 tracing::debug!("Loaded {} order(s)", orders.len());
537
538 Ok(orders)
539 }
540
541 pub async fn load_positions(
552 con: &ConnectionManager,
553 trader_key: &str,
554 encoding: SerializationEncoding,
555 ) -> anyhow::Result<AHashMap<PositionId, Position>> {
556 let mut positions = AHashMap::new();
557 let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
558 tracing::debug!("Loading {pattern}");
559
560 let mut con = con.clone();
561 let keys = Self::scan_keys(&mut con, pattern).await?;
562
563 let futures: Vec<_> = keys
564 .iter()
565 .map(|key| {
566 let con = con.clone();
567 async move {
568 let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
569 PositionId::from(code)
570 } else {
571 log::error!("Invalid key format: {key}");
572 return None;
573 };
574
575 match Self::load_position(&con, trader_key, &position_id, encoding).await {
576 Ok(Some(position)) => Some((position_id, position)),
577 Ok(None) => {
578 log::error!("Position not found: {position_id}");
579 None
580 }
581 Err(e) => {
582 log::error!("Failed to load position {position_id}: {e}");
583 None
584 }
585 }
586 }
587 })
588 .collect();
589
590 positions.extend(join_all(futures).await.into_iter().flatten());
592 tracing::debug!("Loaded {} position(s)", positions.len());
593
594 Ok(positions)
595 }
596
597 pub async fn load_currency(
603 con: &ConnectionManager,
604 trader_key: &str,
605 code: &Ustr,
606 encoding: SerializationEncoding,
607 ) -> anyhow::Result<Option<Currency>> {
608 let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
609 let result = Self::read(con, trader_key, &key).await?;
610
611 if result.is_empty() {
612 return Ok(None);
613 }
614
615 let currency = Self::deserialize_payload(encoding, &result[0])?;
616 Ok(currency)
617 }
618
619 pub async fn load_instrument(
625 con: &ConnectionManager,
626 trader_key: &str,
627 instrument_id: &InstrumentId,
628 encoding: SerializationEncoding,
629 ) -> anyhow::Result<Option<InstrumentAny>> {
630 let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
631 let result = Self::read(con, trader_key, &key).await?;
632 if result.is_empty() {
633 return Ok(None);
634 }
635
636 let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
637 Ok(Some(instrument))
638 }
639
640 pub async fn load_synthetic(
646 con: &ConnectionManager,
647 trader_key: &str,
648 instrument_id: &InstrumentId,
649 encoding: SerializationEncoding,
650 ) -> anyhow::Result<Option<SyntheticInstrument>> {
651 let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
652 let result = Self::read(con, trader_key, &key).await?;
653 if result.is_empty() {
654 return Ok(None);
655 }
656
657 let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
658 Ok(Some(synthetic))
659 }
660
661 pub async fn load_account(
667 con: &ConnectionManager,
668 trader_key: &str,
669 account_id: &AccountId,
670 encoding: SerializationEncoding,
671 ) -> anyhow::Result<Option<AccountAny>> {
672 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
673 let result = Self::read(con, trader_key, &key).await?;
674 if result.is_empty() {
675 return Ok(None);
676 }
677
678 let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
679 Ok(Some(account))
680 }
681
682 pub async fn load_order(
688 con: &ConnectionManager,
689 trader_key: &str,
690 client_order_id: &ClientOrderId,
691 encoding: SerializationEncoding,
692 ) -> anyhow::Result<Option<OrderAny>> {
693 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
694 let result = Self::read(con, trader_key, &key).await?;
695 if result.is_empty() {
696 return Ok(None);
697 }
698
699 let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
700 Ok(Some(order))
701 }
702
703 pub async fn load_position(
709 con: &ConnectionManager,
710 trader_key: &str,
711 position_id: &PositionId,
712 encoding: SerializationEncoding,
713 ) -> anyhow::Result<Option<Position>> {
714 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
715 let result = Self::read(con, trader_key, &key).await?;
716 if result.is_empty() {
717 return Ok(None);
718 }
719
720 let position: Position = Self::deserialize_payload(encoding, &result[0])?;
721 Ok(Some(position))
722 }
723
724 fn get_collection_key(key: &str) -> anyhow::Result<&str> {
725 key.split_once(REDIS_DELIMITER)
726 .map(|(collection, _)| collection)
727 .ok_or_else(|| {
728 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
729 })
730 }
731
732 async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
733 let index_key = Self::get_index_key(key)?;
734 match index_key {
735 INDEX_ORDER_IDS => Self::read_set(conn, key).await,
736 INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
737 INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
738 INDEX_ORDERS => Self::read_set(conn, key).await,
739 INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
740 INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
741 INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
742 INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
743 INDEX_POSITIONS => Self::read_set(conn, key).await,
744 INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
745 INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
746 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
747 }
748 }
749
750 async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
751 let result: Vec<u8> = conn.get(key).await?;
752
753 if result.is_empty() {
754 Ok(vec![])
755 } else {
756 Ok(vec![Bytes::from(result)])
757 }
758 }
759
760 async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
761 let result: Vec<Bytes> = conn.smembers(key).await?;
762 Ok(result)
763 }
764
765 async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
766 let result: HashMap<String, String> = conn.hgetall(key).await?;
767 let json = serde_json::to_string(&result)?;
768 Ok(vec![Bytes::from(json.into_bytes())])
769 }
770
771 async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
772 let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
773 Ok(result)
774 }
775
776 fn get_index_key(key: &str) -> anyhow::Result<&str> {
777 key.split_once(REDIS_DELIMITER)
778 .map(|(_, index_key)| index_key)
779 .ok_or_else(|| {
780 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
781 })
782 }
783}
784
/// Returns `true` if `key` names a timestamp field: either exactly `expire_time_ns`, or
/// any name beginning with `ts_`.
fn is_timestamp_field(key: &str) -> bool {
    matches!(key, "expire_time_ns") || key.strip_prefix("ts_").is_some()
}
790
791fn convert_timestamps(value: &mut Value) {
792 match value {
793 Value::Object(map) => {
794 for (key, v) in map {
795 if is_timestamp_field(key)
796 && let Value::Number(n) = v
797 && let Some(n) = n.as_u64()
798 {
799 let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
800 *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
801 }
802 convert_timestamps(v);
803 }
804 }
805 Value::Array(arr) => {
806 for item in arr {
807 convert_timestamps(item);
808 }
809 }
810 _ => {}
811 }
812}
813
814fn convert_timestamp_strings(value: &mut Value) {
815 match value {
816 Value::Object(map) => {
817 for (key, v) in map {
818 if is_timestamp_field(key)
819 && let Value::String(s) = v
820 && let Ok(dt) = DateTime::parse_from_rfc3339(s)
821 {
822 *v = Value::Number(
823 (dt.with_timezone(&Utc)
824 .timestamp_nanos_opt()
825 .expect("Invalid DateTime") as u64)
826 .into(),
827 );
828 }
829 convert_timestamp_strings(v);
830 }
831 }
832 Value::Array(arr) => {
833 for item in arr {
834 convert_timestamp_strings(item);
835 }
836 }
837 _ => {}
838 }
839}