1use std::{collections::HashMap, str::FromStr};
17
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use futures::{StreamExt, future::join_all};
21use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
22use nautilus_model::{
23 accounts::AccountAny,
24 identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
25 instruments::{InstrumentAny, SyntheticInstrument},
26 orders::OrderAny,
27 position::Position,
28 types::Currency,
29};
30use redis::{AsyncCommands, aio::ConnectionManager};
31use serde::{Serialize, de::DeserializeOwned};
32use serde_json::Value;
33use tokio::try_join;
34use ustr::Ustr;
35
// Collection names: the leading segment of a key (before the first delimiter),
// used by `DatabaseQueries::read` to pick the Redis read operation.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
// Separator placed between key segments (trader key, collection, identifier).
const REDIS_DELIMITER: char = ':';
48
// Index keys recognized by `DatabaseQueries::read_index`; each is matched against
// the portion of the key that follows the first delimiter.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
61
/// Stateless unit type whose associated functions implement the Redis cache
/// queries (payload (de)serialization, key scanning and per-collection loads).
pub struct DatabaseQueries;
63
64impl DatabaseQueries {
65 pub fn serialize_payload<T: Serialize>(
66 encoding: SerializationEncoding,
67 payload: &T,
68 ) -> anyhow::Result<Vec<u8>> {
69 let mut value = serde_json::to_value(payload)?;
70 convert_timestamps(&mut value);
71 match encoding {
72 SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
73 .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
74 SerializationEncoding::Json => serde_json::to_vec(&value)
75 .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
76 }
77 }
78
79 pub fn deserialize_payload<T: DeserializeOwned>(
80 encoding: SerializationEncoding,
81 payload: &[u8],
82 ) -> anyhow::Result<T> {
83 let mut value = match encoding {
84 SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
85 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
86 SerializationEncoding::Json => serde_json::from_slice(payload)
87 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
88 };
89
90 convert_timestamp_strings(&mut value);
91
92 serde_json::from_value(value)
93 .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
94 }
95
96 pub async fn scan_keys(
97 con: &mut ConnectionManager,
98 pattern: String,
99 ) -> anyhow::Result<Vec<String>> {
100 Ok(con
101 .scan_match::<String, String>(pattern)
102 .await?
103 .collect()
104 .await)
105 }
106
107 pub async fn read(
108 con: &ConnectionManager,
109 trader_key: &str,
110 key: &str,
111 ) -> anyhow::Result<Vec<Bytes>> {
112 let collection = Self::get_collection_key(key)?;
113 let key = format!("{trader_key}{REDIS_DELIMITER}{key}");
114 let mut con = con.clone();
115
116 match collection {
117 INDEX => Self::read_index(&mut con, &key).await,
118 GENERAL => Self::read_string(&mut con, &key).await,
119 CURRENCIES => Self::read_string(&mut con, &key).await,
120 INSTRUMENTS => Self::read_string(&mut con, &key).await,
121 SYNTHETICS => Self::read_string(&mut con, &key).await,
122 ACCOUNTS => Self::read_list(&mut con, &key).await,
123 ORDERS => Self::read_list(&mut con, &key).await,
124 POSITIONS => Self::read_list(&mut con, &key).await,
125 ACTORS => Self::read_string(&mut con, &key).await,
126 STRATEGIES => Self::read_string(&mut con, &key).await,
127 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
128 }
129 }
130
131 pub async fn load_all(
132 con: &ConnectionManager,
133 encoding: SerializationEncoding,
134 trader_key: &str,
135 ) -> anyhow::Result<CacheMap> {
136 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
137 Self::load_currencies(con, trader_key, encoding),
138 Self::load_instruments(con, trader_key, encoding),
139 Self::load_synthetics(con, trader_key, encoding),
140 Self::load_accounts(con, trader_key, encoding),
141 Self::load_orders(con, trader_key, encoding),
142 Self::load_positions(con, trader_key, encoding)
143 )
144 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
145
146 Ok(CacheMap {
147 currencies,
148 instruments,
149 synthetics,
150 accounts,
151 orders,
152 positions,
153 })
154 }
155
156 pub async fn load_currencies(
157 con: &ConnectionManager,
158 trader_key: &str,
159 encoding: SerializationEncoding,
160 ) -> anyhow::Result<HashMap<Ustr, Currency>> {
161 let mut currencies = HashMap::new();
162 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
163 tracing::debug!("Loading {pattern}");
164
165 let mut con = con.clone();
166 let keys = Self::scan_keys(&mut con, pattern).await?;
167
168 let futures: Vec<_> = keys
169 .iter()
170 .map(|key| {
171 let con = con.clone();
172 async move {
173 let currency_code = match key.as_str().rsplit(':').next() {
174 Some(code) => Ustr::from(code),
175 None => {
176 log::error!("Invalid key format: {key}");
177 return None;
178 }
179 };
180
181 match Self::load_currency(&con, trader_key, ¤cy_code, encoding).await {
182 Ok(Some(currency)) => Some((currency_code, currency)),
183 Ok(None) => {
184 log::error!("Currency not found: {currency_code}");
185 None
186 }
187 Err(e) => {
188 log::error!("Failed to load currency {currency_code}: {e}");
189 None
190 }
191 }
192 }
193 })
194 .collect();
195
196 currencies.extend(join_all(futures).await.into_iter().flatten());
198 tracing::debug!("Loaded {} currencies(s)", currencies.len());
199
200 Ok(currencies)
201 }
202
203 pub async fn load_instruments(
204 con: &ConnectionManager,
205 trader_key: &str,
206 encoding: SerializationEncoding,
207 ) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
208 let mut instruments = HashMap::new();
209 let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
210 tracing::debug!("Loading {pattern}");
211
212 let mut con = con.clone();
213 let keys = Self::scan_keys(&mut con, pattern).await?;
214
215 let futures: Vec<_> = keys
216 .iter()
217 .map(|key| {
218 let con = con.clone();
219 async move {
220 let instrument_id = key
221 .as_str()
222 .rsplit(':')
223 .next()
224 .ok_or_else(|| {
225 log::error!("Invalid key format: {key}");
226 "Invalid key format"
227 })
228 .and_then(|code| {
229 InstrumentId::from_str(code).map_err(|e| {
230 log::error!("Failed to convert to InstrumentId for {key}: {e}");
231 "Invalid instrument ID"
232 })
233 });
234
235 let instrument_id = match instrument_id {
236 Ok(id) => id,
237 Err(_) => return None,
238 };
239
240 match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
241 Ok(Some(instrument)) => Some((instrument_id, instrument)),
242 Ok(None) => {
243 log::error!("Instrument not found: {instrument_id}");
244 None
245 }
246 Err(e) => {
247 log::error!("Failed to load instrument {instrument_id}: {e}");
248 None
249 }
250 }
251 }
252 })
253 .collect();
254
255 instruments.extend(join_all(futures).await.into_iter().flatten());
257 tracing::debug!("Loaded {} instruments(s)", instruments.len());
258
259 Ok(instruments)
260 }
261
262 pub async fn load_synthetics(
263 con: &ConnectionManager,
264 trader_key: &str,
265 encoding: SerializationEncoding,
266 ) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
267 let mut synthetics = HashMap::new();
268 let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
269 tracing::debug!("Loading {pattern}");
270
271 let mut con = con.clone();
272 let keys = Self::scan_keys(&mut con, pattern).await?;
273
274 let futures: Vec<_> = keys
275 .iter()
276 .map(|key| {
277 let con = con.clone();
278 async move {
279 let instrument_id = key
280 .as_str()
281 .rsplit(':')
282 .next()
283 .ok_or_else(|| {
284 log::error!("Invalid key format: {key}");
285 "Invalid key format"
286 })
287 .and_then(|code| {
288 InstrumentId::from_str(code).map_err(|e| {
289 log::error!("Failed to parse InstrumentId for {key}: {e}");
290 "Invalid instrument ID"
291 })
292 });
293
294 let instrument_id = match instrument_id {
295 Ok(id) => id,
296 Err(_) => return None,
297 };
298
299 match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
300 Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
301 Ok(None) => {
302 log::error!("Synthetic not found: {instrument_id}");
303 None
304 }
305 Err(e) => {
306 log::error!("Failed to load synthetic {instrument_id}: {e}");
307 None
308 }
309 }
310 }
311 })
312 .collect();
313
314 synthetics.extend(join_all(futures).await.into_iter().flatten());
316 tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
317
318 Ok(synthetics)
319 }
320
321 pub async fn load_accounts(
322 con: &ConnectionManager,
323 trader_key: &str,
324 encoding: SerializationEncoding,
325 ) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
326 let mut accounts = HashMap::new();
327 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
328 tracing::debug!("Loading {pattern}");
329
330 let mut con = con.clone();
331 let keys = Self::scan_keys(&mut con, pattern).await?;
332
333 let futures: Vec<_> = keys
334 .iter()
335 .map(|key| {
336 let con = con.clone();
337 async move {
338 let account_id = match key.as_str().rsplit(':').next() {
339 Some(code) => AccountId::from(code),
340 None => {
341 log::error!("Invalid key format: {key}");
342 return None;
343 }
344 };
345
346 match Self::load_account(&con, trader_key, &account_id, encoding).await {
347 Ok(Some(account)) => Some((account_id, account)),
348 Ok(None) => {
349 log::error!("Account not found: {account_id}");
350 None
351 }
352 Err(e) => {
353 log::error!("Failed to load account {account_id}: {e}");
354 None
355 }
356 }
357 }
358 })
359 .collect();
360
361 accounts.extend(join_all(futures).await.into_iter().flatten());
363 tracing::debug!("Loaded {} accounts(s)", accounts.len());
364
365 Ok(accounts)
366 }
367
368 pub async fn load_orders(
369 con: &ConnectionManager,
370 trader_key: &str,
371 encoding: SerializationEncoding,
372 ) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
373 let mut orders = HashMap::new();
374 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
375 tracing::debug!("Loading {pattern}");
376
377 let mut con = con.clone();
378 let keys = Self::scan_keys(&mut con, pattern).await?;
379
380 let futures: Vec<_> = keys
381 .iter()
382 .map(|key| {
383 let con = con.clone();
384 async move {
385 let client_order_id = match key.as_str().rsplit(':').next() {
386 Some(code) => ClientOrderId::from(code),
387 None => {
388 log::error!("Invalid key format: {key}");
389 return None;
390 }
391 };
392
393 match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
394 Ok(Some(order)) => Some((client_order_id, order)),
395 Ok(None) => {
396 log::error!("Order not found: {client_order_id}");
397 None
398 }
399 Err(e) => {
400 log::error!("Failed to load order {client_order_id}: {e}");
401 None
402 }
403 }
404 }
405 })
406 .collect();
407
408 orders.extend(join_all(futures).await.into_iter().flatten());
410 tracing::debug!("Loaded {} order(s)", orders.len());
411
412 Ok(orders)
413 }
414
415 pub async fn load_positions(
416 con: &ConnectionManager,
417 trader_key: &str,
418 encoding: SerializationEncoding,
419 ) -> anyhow::Result<HashMap<PositionId, Position>> {
420 let mut positions = HashMap::new();
421 let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
422 tracing::debug!("Loading {pattern}");
423
424 let mut con = con.clone();
425 let keys = Self::scan_keys(&mut con, pattern).await?;
426
427 let futures: Vec<_> = keys
428 .iter()
429 .map(|key| {
430 let con = con.clone();
431 async move {
432 let position_id = match key.as_str().rsplit(':').next() {
433 Some(code) => PositionId::from(code),
434 None => {
435 log::error!("Invalid key format: {key}");
436 return None;
437 }
438 };
439
440 match Self::load_position(&con, trader_key, &position_id, encoding).await {
441 Ok(Some(position)) => Some((position_id, position)),
442 Ok(None) => {
443 log::error!("Position not found: {position_id}");
444 None
445 }
446 Err(e) => {
447 log::error!("Failed to load position {position_id}: {e}");
448 None
449 }
450 }
451 }
452 })
453 .collect();
454
455 positions.extend(join_all(futures).await.into_iter().flatten());
457 tracing::debug!("Loaded {} position(s)", positions.len());
458
459 Ok(positions)
460 }
461
462 pub async fn load_currency(
463 con: &ConnectionManager,
464 trader_key: &str,
465 code: &Ustr,
466 encoding: SerializationEncoding,
467 ) -> anyhow::Result<Option<Currency>> {
468 let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
469 let result = Self::read(con, trader_key, &key).await?;
470
471 if result.is_empty() {
472 return Ok(None);
473 }
474
475 let currency = Self::deserialize_payload(encoding, &result[0])?;
476 Ok(currency)
477 }
478
479 pub async fn load_instrument(
480 con: &ConnectionManager,
481 trader_key: &str,
482 instrument_id: &InstrumentId,
483 encoding: SerializationEncoding,
484 ) -> anyhow::Result<Option<InstrumentAny>> {
485 let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
486 let result = Self::read(con, trader_key, &key).await?;
487 if result.is_empty() {
488 return Ok(None);
489 }
490
491 let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
492 Ok(Some(instrument))
493 }
494
495 pub async fn load_synthetic(
496 con: &ConnectionManager,
497 trader_key: &str,
498 instrument_id: &InstrumentId,
499 encoding: SerializationEncoding,
500 ) -> anyhow::Result<Option<SyntheticInstrument>> {
501 let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
502 let result = Self::read(con, trader_key, &key).await?;
503 if result.is_empty() {
504 return Ok(None);
505 }
506
507 let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
508 Ok(Some(synthetic))
509 }
510
511 pub async fn load_account(
512 con: &ConnectionManager,
513 trader_key: &str,
514 account_id: &AccountId,
515 encoding: SerializationEncoding,
516 ) -> anyhow::Result<Option<AccountAny>> {
517 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
518 let result = Self::read(con, trader_key, &key).await?;
519 if result.is_empty() {
520 return Ok(None);
521 }
522
523 let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
524 Ok(Some(account))
525 }
526
527 pub async fn load_order(
528 con: &ConnectionManager,
529 trader_key: &str,
530 client_order_id: &ClientOrderId,
531 encoding: SerializationEncoding,
532 ) -> anyhow::Result<Option<OrderAny>> {
533 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
534 let result = Self::read(con, trader_key, &key).await?;
535 if result.is_empty() {
536 return Ok(None);
537 }
538
539 let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
540 Ok(Some(order))
541 }
542
543 pub async fn load_position(
544 con: &ConnectionManager,
545 trader_key: &str,
546 position_id: &PositionId,
547 encoding: SerializationEncoding,
548 ) -> anyhow::Result<Option<Position>> {
549 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
550 let result = Self::read(con, trader_key, &key).await?;
551 if result.is_empty() {
552 return Ok(None);
553 }
554
555 let position: Position = Self::deserialize_payload(encoding, &result[0])?;
556 Ok(Some(position))
557 }
558
559 fn get_collection_key(key: &str) -> anyhow::Result<&str> {
560 key.split_once(REDIS_DELIMITER)
561 .map(|(collection, _)| collection)
562 .ok_or_else(|| {
563 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
564 })
565 }
566
567 async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
568 let index_key = Self::get_index_key(key)?;
569 match index_key {
570 INDEX_ORDER_IDS => Self::read_set(conn, key).await,
571 INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
572 INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
573 INDEX_ORDERS => Self::read_set(conn, key).await,
574 INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
575 INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
576 INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
577 INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
578 INDEX_POSITIONS => Self::read_set(conn, key).await,
579 INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
580 INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
581 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
582 }
583 }
584
585 async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
586 let result: Vec<u8> = conn.get(key).await?;
587
588 if result.is_empty() {
589 Ok(vec![])
590 } else {
591 Ok(vec![Bytes::from(result)])
592 }
593 }
594
595 async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
596 let result: Vec<Bytes> = conn.smembers(key).await?;
597 Ok(result)
598 }
599
600 async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
601 let result: HashMap<String, String> = conn.hgetall(key).await?;
602 let json = serde_json::to_string(&result)?;
603 Ok(vec![Bytes::from(json.into_bytes())])
604 }
605
606 async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
607 let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
608 Ok(result)
609 }
610
611 fn get_index_key(key: &str) -> anyhow::Result<&str> {
612 key.split_once(REDIS_DELIMITER)
613 .map(|(_, index_key)| index_key)
614 .ok_or_else(|| {
615 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
616 })
617 }
618}
619
/// Returns `true` if `key` names a timestamp field: either exactly
/// `expire_time_ns`, or any name starting with `ts_`.
fn is_timestamp_field(key: &str) -> bool {
    key == "expire_time_ns" || key.starts_with("ts_")
}
625
626fn convert_timestamps(value: &mut Value) {
627 match value {
628 Value::Object(map) => {
629 for (key, v) in map {
630 if is_timestamp_field(key) {
631 if let Value::Number(n) = v {
632 if let Some(n) = n.as_u64() {
633 let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
634 *v = Value::String(
635 dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
636 );
637 }
638 }
639 }
640 convert_timestamps(v);
641 }
642 }
643 Value::Array(arr) => {
644 for item in arr {
645 convert_timestamps(item);
646 }
647 }
648 _ => {}
649 }
650}
651
652fn convert_timestamp_strings(value: &mut Value) {
653 match value {
654 Value::Object(map) => {
655 for (key, v) in map {
656 if is_timestamp_field(key) {
657 if let Value::String(s) = v {
658 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
659 *v = Value::Number(
660 (dt.with_timezone(&Utc)
661 .timestamp_nanos_opt()
662 .expect("Invalid DateTime")
663 as u64)
664 .into(),
665 );
666 }
667 }
668 }
669 convert_timestamp_strings(v);
670 }
671 }
672 Value::Array(arr) => {
673 for item in arr {
674 convert_timestamp_strings(item);
675 }
676 }
677 _ => {}
678 }
679}