1use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24 accounts::AccountAny,
25 identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
26 instruments::{InstrumentAny, SyntheticInstrument},
27 orders::OrderAny,
28 position::Position,
29 types::Currency,
30};
31use redis::{AsyncCommands, aio::ConnectionManager};
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use ustr::Ustr;
35
36use super::get_index_key;
37
/// Separator between the segments of a Redis key.
const REDIS_DELIMITER: char = ':';

// Collection names: the first segment of a key, used by `DatabaseQueries::read`
// to decide which Redis read command applies.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";

// Index keys recognized by `DatabaseQueries::read_index`.
// The order/position and order/client mappings are read as hashes.
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
// All remaining indexes are read as sets.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
63
/// Namespace type grouping the Redis cache query helpers.
///
/// The type carries no state; all functionality is exposed as associated functions
/// that take the connection and key information as arguments.
#[derive(Debug)]
pub struct DatabaseQueries;
66
67impl DatabaseQueries {
68 pub fn serialize_payload<T: Serialize>(
74 encoding: SerializationEncoding,
75 payload: &T,
76 ) -> anyhow::Result<Vec<u8>> {
77 let mut value = serde_json::to_value(payload)?;
78 convert_timestamps(&mut value);
79 match encoding {
80 SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
81 .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
82 SerializationEncoding::Json => serde_json::to_vec(&value)
83 .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
84 }
85 }
86
87 pub fn deserialize_payload<T: DeserializeOwned>(
93 encoding: SerializationEncoding,
94 payload: &[u8],
95 ) -> anyhow::Result<T> {
96 let mut value = match encoding {
97 SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
98 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
99 SerializationEncoding::Json => serde_json::from_slice(payload)
100 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
101 };
102
103 convert_timestamp_strings(&mut value);
104
105 serde_json::from_value(value)
106 .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
107 }
108
109 pub async fn scan_keys(
115 con: &mut ConnectionManager,
116 pattern: String,
117 ) -> anyhow::Result<Vec<String>> {
118 let mut result = Vec::new();
119 let mut cursor = 0u64;
120
121 loop {
122 let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
123 .arg(cursor)
124 .arg("MATCH")
125 .arg(&pattern)
126 .arg("COUNT")
127 .arg(5000)
128 .query_async(con)
129 .await?;
130
131 let (new_cursor, keys) = scan_result;
132 result.extend(keys);
133
134 if new_cursor == 0 {
136 break;
137 }
138
139 cursor = new_cursor;
140 }
141
142 Ok(result)
143 }
144
145 pub async fn read_bulk(
151 con: &ConnectionManager,
152 keys: &[String],
153 ) -> anyhow::Result<Vec<Option<Bytes>>> {
154 if keys.is_empty() {
155 return Ok(vec![]);
156 }
157
158 let mut con = con.clone();
159
160 let results: Vec<Option<Vec<u8>>> =
162 redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
163
164 let bytes_results: Vec<Option<Bytes>> = results
166 .into_iter()
167 .map(|opt| opt.map(Bytes::from))
168 .collect();
169
170 Ok(bytes_results)
171 }
172
173 pub async fn read(
179 con: &ConnectionManager,
180 trader_key: &str,
181 key: &str,
182 ) -> anyhow::Result<Vec<Bytes>> {
183 let collection = Self::get_collection_key(key)?;
184 let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
185
186 let mut con = con.clone();
187
188 match collection {
189 INDEX => Self::read_index(&mut con, &full_key).await,
190 GENERAL => Self::read_string(&mut con, &full_key).await,
191 CURRENCIES => Self::read_string(&mut con, &full_key).await,
192 INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
193 SYNTHETICS => Self::read_string(&mut con, &full_key).await,
194 ACCOUNTS => Self::read_list(&mut con, &full_key).await,
195 ORDERS => Self::read_list(&mut con, &full_key).await,
196 POSITIONS => Self::read_list(&mut con, &full_key).await,
197 ACTORS => Self::read_string(&mut con, &full_key).await,
198 STRATEGIES => Self::read_string(&mut con, &full_key).await,
199 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
200 }
201 }
202
203 pub async fn load_all(
209 con: &ConnectionManager,
210 encoding: SerializationEncoding,
211 trader_key: &str,
212 ) -> anyhow::Result<CacheMap> {
213 let (currencies, instruments, synthetics, accounts, orders, positions) = tokio::try_join!(
214 Self::load_currencies(con, trader_key, encoding),
215 Self::load_instruments(con, trader_key, encoding),
216 Self::load_synthetics(con, trader_key, encoding),
217 Self::load_accounts(con, trader_key, encoding),
218 Self::load_orders(con, trader_key, encoding),
219 Self::load_positions(con, trader_key, encoding)
220 )
221 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
222
223 let greeks = AHashMap::new();
226 let yield_curves = AHashMap::new();
227
228 Ok(CacheMap {
229 currencies,
230 instruments,
231 synthetics,
232 accounts,
233 orders,
234 positions,
235 greeks,
236 yield_curves,
237 })
238 }
239
240 pub async fn load_currencies(
246 con: &ConnectionManager,
247 trader_key: &str,
248 encoding: SerializationEncoding,
249 ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
250 let mut currencies = AHashMap::new();
251 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
252 tracing::debug!("Loading {pattern}");
253
254 let mut con = con.clone();
255 let keys = Self::scan_keys(&mut con, pattern).await?;
256
257 if keys.is_empty() {
258 return Ok(currencies);
259 }
260
261 let bulk_values = Self::read_bulk(&con, &keys).await?;
263
264 for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
266 let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
267 Ustr::from(code)
268 } else {
269 log::error!("Invalid key format: {key}");
270 continue;
271 };
272
273 if let Some(value_bytes) = value_opt {
274 match Self::deserialize_payload(encoding, value_bytes) {
275 Ok(currency) => {
276 currencies.insert(currency_code, currency);
277 }
278 Err(e) => {
279 log::error!("Failed to deserialize currency {currency_code}: {e}");
280 }
281 }
282 } else {
283 log::error!("Currency not found in Redis: {currency_code}");
284 }
285 }
286
287 tracing::debug!("Loaded {} currencies(s)", currencies.len());
288
289 Ok(currencies)
290 }
291
292 pub async fn load_instruments(
303 con: &ConnectionManager,
304 trader_key: &str,
305 encoding: SerializationEncoding,
306 ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
307 let mut instruments = AHashMap::new();
308 let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
309 tracing::debug!("Loading {pattern}");
310
311 let mut con = con.clone();
312 let keys = Self::scan_keys(&mut con, pattern).await?;
313
314 let futures: Vec<_> = keys
315 .iter()
316 .map(|key| {
317 let con = con.clone();
318 async move {
319 let instrument_id = key
320 .as_str()
321 .rsplit(':')
322 .next()
323 .ok_or_else(|| {
324 log::error!("Invalid key format: {key}");
325 "Invalid key format"
326 })
327 .and_then(|code| {
328 InstrumentId::from_str(code).map_err(|e| {
329 log::error!("Failed to convert to InstrumentId for {key}: {e}");
330 "Invalid instrument ID"
331 })
332 });
333
334 let instrument_id = match instrument_id {
335 Ok(id) => id,
336 Err(_) => return None,
337 };
338
339 match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
340 Ok(Some(instrument)) => Some((instrument_id, instrument)),
341 Ok(None) => {
342 log::error!("Instrument not found: {instrument_id}");
343 None
344 }
345 Err(e) => {
346 log::error!("Failed to load instrument {instrument_id}: {e}");
347 None
348 }
349 }
350 }
351 })
352 .collect();
353
354 instruments.extend(join_all(futures).await.into_iter().flatten());
356 tracing::debug!("Loaded {} instruments(s)", instruments.len());
357
358 Ok(instruments)
359 }
360
361 pub async fn load_synthetics(
372 con: &ConnectionManager,
373 trader_key: &str,
374 encoding: SerializationEncoding,
375 ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
376 let mut synthetics = AHashMap::new();
377 let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
378 tracing::debug!("Loading {pattern}");
379
380 let mut con = con.clone();
381 let keys = Self::scan_keys(&mut con, pattern).await?;
382
383 let futures: Vec<_> = keys
384 .iter()
385 .map(|key| {
386 let con = con.clone();
387 async move {
388 let instrument_id = key
389 .as_str()
390 .rsplit(':')
391 .next()
392 .ok_or_else(|| {
393 log::error!("Invalid key format: {key}");
394 "Invalid key format"
395 })
396 .and_then(|code| {
397 InstrumentId::from_str(code).map_err(|e| {
398 log::error!("Failed to parse InstrumentId for {key}: {e}");
399 "Invalid instrument ID"
400 })
401 });
402
403 let instrument_id = match instrument_id {
404 Ok(id) => id,
405 Err(_) => return None,
406 };
407
408 match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
409 Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
410 Ok(None) => {
411 log::error!("Synthetic not found: {instrument_id}");
412 None
413 }
414 Err(e) => {
415 log::error!("Failed to load synthetic {instrument_id}: {e}");
416 None
417 }
418 }
419 }
420 })
421 .collect();
422
423 synthetics.extend(join_all(futures).await.into_iter().flatten());
425 tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
426
427 Ok(synthetics)
428 }
429
430 pub async fn load_accounts(
441 con: &ConnectionManager,
442 trader_key: &str,
443 encoding: SerializationEncoding,
444 ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
445 let mut accounts = AHashMap::new();
446 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
447 tracing::debug!("Loading {pattern}");
448
449 let mut con = con.clone();
450 let keys = Self::scan_keys(&mut con, pattern).await?;
451
452 let futures: Vec<_> = keys
453 .iter()
454 .map(|key| {
455 let con = con.clone();
456 async move {
457 let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
458 AccountId::from(code)
459 } else {
460 log::error!("Invalid key format: {key}");
461 return None;
462 };
463
464 match Self::load_account(&con, trader_key, &account_id, encoding).await {
465 Ok(Some(account)) => Some((account_id, account)),
466 Ok(None) => {
467 log::error!("Account not found: {account_id}");
468 None
469 }
470 Err(e) => {
471 log::error!("Failed to load account {account_id}: {e}");
472 None
473 }
474 }
475 }
476 })
477 .collect();
478
479 accounts.extend(join_all(futures).await.into_iter().flatten());
481 tracing::debug!("Loaded {} accounts(s)", accounts.len());
482
483 Ok(accounts)
484 }
485
486 pub async fn load_orders(
497 con: &ConnectionManager,
498 trader_key: &str,
499 encoding: SerializationEncoding,
500 ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
501 let mut orders = AHashMap::new();
502 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
503 tracing::debug!("Loading {pattern}");
504
505 let mut con = con.clone();
506 let keys = Self::scan_keys(&mut con, pattern).await?;
507
508 let futures: Vec<_> = keys
509 .iter()
510 .map(|key| {
511 let con = con.clone();
512 async move {
513 let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
514 ClientOrderId::from(code)
515 } else {
516 log::error!("Invalid key format: {key}");
517 return None;
518 };
519
520 match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
521 Ok(Some(order)) => Some((client_order_id, order)),
522 Ok(None) => {
523 log::error!("Order not found: {client_order_id}");
524 None
525 }
526 Err(e) => {
527 log::error!("Failed to load order {client_order_id}: {e}");
528 None
529 }
530 }
531 }
532 })
533 .collect();
534
535 orders.extend(join_all(futures).await.into_iter().flatten());
537 tracing::debug!("Loaded {} order(s)", orders.len());
538
539 Ok(orders)
540 }
541
542 pub async fn load_positions(
553 con: &ConnectionManager,
554 trader_key: &str,
555 encoding: SerializationEncoding,
556 ) -> anyhow::Result<AHashMap<PositionId, Position>> {
557 let mut positions = AHashMap::new();
558 let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
559 tracing::debug!("Loading {pattern}");
560
561 let mut con = con.clone();
562 let keys = Self::scan_keys(&mut con, pattern).await?;
563
564 let futures: Vec<_> = keys
565 .iter()
566 .map(|key| {
567 let con = con.clone();
568 async move {
569 let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
570 PositionId::from(code)
571 } else {
572 log::error!("Invalid key format: {key}");
573 return None;
574 };
575
576 match Self::load_position(&con, trader_key, &position_id, encoding).await {
577 Ok(Some(position)) => Some((position_id, position)),
578 Ok(None) => {
579 log::error!("Position not found: {position_id}");
580 None
581 }
582 Err(e) => {
583 log::error!("Failed to load position {position_id}: {e}");
584 None
585 }
586 }
587 }
588 })
589 .collect();
590
591 positions.extend(join_all(futures).await.into_iter().flatten());
593 tracing::debug!("Loaded {} position(s)", positions.len());
594
595 Ok(positions)
596 }
597
598 pub async fn load_currency(
604 con: &ConnectionManager,
605 trader_key: &str,
606 code: &Ustr,
607 encoding: SerializationEncoding,
608 ) -> anyhow::Result<Option<Currency>> {
609 let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
610 let result = Self::read(con, trader_key, &key).await?;
611
612 if result.is_empty() {
613 return Ok(None);
614 }
615
616 let currency = Self::deserialize_payload(encoding, &result[0])?;
617 Ok(currency)
618 }
619
620 pub async fn load_instrument(
626 con: &ConnectionManager,
627 trader_key: &str,
628 instrument_id: &InstrumentId,
629 encoding: SerializationEncoding,
630 ) -> anyhow::Result<Option<InstrumentAny>> {
631 let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
632 let result = Self::read(con, trader_key, &key).await?;
633 if result.is_empty() {
634 return Ok(None);
635 }
636
637 let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
638 Ok(Some(instrument))
639 }
640
641 pub async fn load_synthetic(
647 con: &ConnectionManager,
648 trader_key: &str,
649 instrument_id: &InstrumentId,
650 encoding: SerializationEncoding,
651 ) -> anyhow::Result<Option<SyntheticInstrument>> {
652 let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
653 let result = Self::read(con, trader_key, &key).await?;
654 if result.is_empty() {
655 return Ok(None);
656 }
657
658 let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
659 Ok(Some(synthetic))
660 }
661
662 pub async fn load_account(
668 con: &ConnectionManager,
669 trader_key: &str,
670 account_id: &AccountId,
671 encoding: SerializationEncoding,
672 ) -> anyhow::Result<Option<AccountAny>> {
673 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
674 let result = Self::read(con, trader_key, &key).await?;
675 if result.is_empty() {
676 return Ok(None);
677 }
678
679 let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
680 Ok(Some(account))
681 }
682
683 pub async fn load_order(
689 con: &ConnectionManager,
690 trader_key: &str,
691 client_order_id: &ClientOrderId,
692 encoding: SerializationEncoding,
693 ) -> anyhow::Result<Option<OrderAny>> {
694 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
695 let result = Self::read(con, trader_key, &key).await?;
696 if result.is_empty() {
697 return Ok(None);
698 }
699
700 let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
701 Ok(Some(order))
702 }
703
704 pub async fn load_position(
710 con: &ConnectionManager,
711 trader_key: &str,
712 position_id: &PositionId,
713 encoding: SerializationEncoding,
714 ) -> anyhow::Result<Option<Position>> {
715 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
716 let result = Self::read(con, trader_key, &key).await?;
717 if result.is_empty() {
718 return Ok(None);
719 }
720
721 let position: Position = Self::deserialize_payload(encoding, &result[0])?;
722 Ok(Some(position))
723 }
724
725 fn get_collection_key(key: &str) -> anyhow::Result<&str> {
726 key.split_once(REDIS_DELIMITER)
727 .map(|(collection, _)| collection)
728 .ok_or_else(|| {
729 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
730 })
731 }
732
733 async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
734 let index_key = get_index_key(key)?;
735 match index_key {
736 INDEX_ORDER_IDS => Self::read_set(conn, key).await,
737 INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
738 INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
739 INDEX_ORDERS => Self::read_set(conn, key).await,
740 INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
741 INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
742 INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
743 INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
744 INDEX_POSITIONS => Self::read_set(conn, key).await,
745 INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
746 INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
747 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
748 }
749 }
750
751 async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
752 let result: Vec<u8> = conn.get(key).await?;
753
754 if result.is_empty() {
755 Ok(vec![])
756 } else {
757 Ok(vec![Bytes::from(result)])
758 }
759 }
760
761 async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
762 let result: Vec<Bytes> = conn.smembers(key).await?;
763 Ok(result)
764 }
765
766 async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
767 let result: HashMap<String, String> = conn.hgetall(key).await?;
768 let json = serde_json::to_string(&result)?;
769 Ok(vec![Bytes::from(json.into_bytes())])
770 }
771
772 async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
773 let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
774 Ok(result)
775 }
776}
777
/// Returns `true` if `key` names a timestamp field.
///
/// A field counts as a timestamp when it is exactly `expire_time_ns` or when its name
/// starts with `ts_`.
fn is_timestamp_field(key: &str) -> bool {
    matches!(key, "expire_time_ns") || key.starts_with("ts_")
}
783
784fn convert_timestamps(value: &mut Value) {
785 match value {
786 Value::Object(map) => {
787 for (key, v) in map {
788 if is_timestamp_field(key)
789 && let Value::Number(n) = v
790 && let Some(n) = n.as_u64()
791 {
792 let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
793 *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
794 }
795 convert_timestamps(v);
796 }
797 }
798 Value::Array(arr) => {
799 for item in arr {
800 convert_timestamps(item);
801 }
802 }
803 _ => {}
804 }
805}
806
807fn convert_timestamp_strings(value: &mut Value) {
808 match value {
809 Value::Object(map) => {
810 for (key, v) in map {
811 if is_timestamp_field(key)
812 && let Value::String(s) = v
813 && let Ok(dt) = DateTime::parse_from_rfc3339(s)
814 {
815 *v = Value::Number(
816 (dt.with_timezone(&Utc)
817 .timestamp_nanos_opt()
818 .expect("Invalid DateTime") as u64)
819 .into(),
820 );
821 }
822 convert_timestamp_strings(v);
823 }
824 }
825 Value::Array(arr) => {
826 for item in arr {
827 convert_timestamp_strings(item);
828 }
829 }
830 _ => {}
831 }
832}