nautilus_infrastructure/redis/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr};
17
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use futures::{StreamExt, future::join_all};
21use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
22use nautilus_model::{
23    accounts::AccountAny,
24    identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
25    instruments::{InstrumentAny, SyntheticInstrument},
26    orders::OrderAny,
27    position::Position,
28    types::Currency,
29};
30use redis::{AsyncCommands, aio::ConnectionManager};
31use serde::{Serialize, de::DeserializeOwned};
32use serde_json::Value;
33use tokio::try_join;
34use ustr::Ustr;
35
36// Collection keys
37const INDEX: &str = "index";
38const GENERAL: &str = "general";
39const CURRENCIES: &str = "currencies";
40const INSTRUMENTS: &str = "instruments";
41const SYNTHETICS: &str = "synthetics";
42const ACCOUNTS: &str = "accounts";
43const ORDERS: &str = "orders";
44const POSITIONS: &str = "positions";
45const ACTORS: &str = "actors";
46const STRATEGIES: &str = "strategies";
47const REDIS_DELIMITER: char = ':';
48
49// Index keys
50const INDEX_ORDER_IDS: &str = "index:order_ids";
51const INDEX_ORDER_POSITION: &str = "index:order_position";
52const INDEX_ORDER_CLIENT: &str = "index:order_client";
53const INDEX_ORDERS: &str = "index:orders";
54const INDEX_ORDERS_OPEN: &str = "index:orders_open";
55const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
56const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
57const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
58const INDEX_POSITIONS: &str = "index:positions";
59const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
60const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
61
62pub struct DatabaseQueries;
63
64impl DatabaseQueries {
65    pub fn serialize_payload<T: Serialize>(
66        encoding: SerializationEncoding,
67        payload: &T,
68    ) -> anyhow::Result<Vec<u8>> {
69        let mut value = serde_json::to_value(payload)?;
70        convert_timestamps(&mut value);
71        match encoding {
72            SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
73                .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
74            SerializationEncoding::Json => serde_json::to_vec(&value)
75                .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
76        }
77    }
78
79    pub fn deserialize_payload<T: DeserializeOwned>(
80        encoding: SerializationEncoding,
81        payload: &[u8],
82    ) -> anyhow::Result<T> {
83        let mut value = match encoding {
84            SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
85                .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
86            SerializationEncoding::Json => serde_json::from_slice(payload)
87                .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
88        };
89
90        convert_timestamp_strings(&mut value);
91
92        serde_json::from_value(value)
93            .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
94    }
95
96    pub async fn scan_keys(
97        con: &mut ConnectionManager,
98        pattern: String,
99    ) -> anyhow::Result<Vec<String>> {
100        Ok(con
101            .scan_match::<String, String>(pattern)
102            .await?
103            .collect()
104            .await)
105    }
106
107    pub async fn read(
108        con: &ConnectionManager,
109        trader_key: &str,
110        key: &str,
111    ) -> anyhow::Result<Vec<Bytes>> {
112        let collection = Self::get_collection_key(key)?;
113        let key = format!("{trader_key}{REDIS_DELIMITER}{key}");
114        let mut con = con.clone();
115
116        match collection {
117            INDEX => Self::read_index(&mut con, &key).await,
118            GENERAL => Self::read_string(&mut con, &key).await,
119            CURRENCIES => Self::read_string(&mut con, &key).await,
120            INSTRUMENTS => Self::read_string(&mut con, &key).await,
121            SYNTHETICS => Self::read_string(&mut con, &key).await,
122            ACCOUNTS => Self::read_list(&mut con, &key).await,
123            ORDERS => Self::read_list(&mut con, &key).await,
124            POSITIONS => Self::read_list(&mut con, &key).await,
125            ACTORS => Self::read_string(&mut con, &key).await,
126            STRATEGIES => Self::read_string(&mut con, &key).await,
127            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
128        }
129    }
130
131    pub async fn load_all(
132        con: &ConnectionManager,
133        encoding: SerializationEncoding,
134        trader_key: &str,
135    ) -> anyhow::Result<CacheMap> {
136        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
137            Self::load_currencies(con, trader_key, encoding),
138            Self::load_instruments(con, trader_key, encoding),
139            Self::load_synthetics(con, trader_key, encoding),
140            Self::load_accounts(con, trader_key, encoding),
141            Self::load_orders(con, trader_key, encoding),
142            Self::load_positions(con, trader_key, encoding)
143        )
144        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
145
146        Ok(CacheMap {
147            currencies,
148            instruments,
149            synthetics,
150            accounts,
151            orders,
152            positions,
153        })
154    }
155
156    pub async fn load_currencies(
157        con: &ConnectionManager,
158        trader_key: &str,
159        encoding: SerializationEncoding,
160    ) -> anyhow::Result<HashMap<Ustr, Currency>> {
161        let mut currencies = HashMap::new();
162        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
163        tracing::debug!("Loading {pattern}");
164
165        let mut con = con.clone();
166        let keys = Self::scan_keys(&mut con, pattern).await?;
167
168        let futures: Vec<_> = keys
169            .iter()
170            .map(|key| {
171                let con = con.clone();
172                async move {
173                    let currency_code = match key.as_str().rsplit(':').next() {
174                        Some(code) => Ustr::from(code),
175                        None => {
176                            log::error!("Invalid key format: {key}");
177                            return None;
178                        }
179                    };
180
181                    match Self::load_currency(&con, trader_key, &currency_code, encoding).await {
182                        Ok(Some(currency)) => Some((currency_code, currency)),
183                        Ok(None) => {
184                            log::error!("Currency not found: {currency_code}");
185                            None
186                        }
187                        Err(e) => {
188                            log::error!("Failed to load currency {currency_code}: {e}");
189                            None
190                        }
191                    }
192                }
193            })
194            .collect();
195
196        // Insert all Currency_code (key) and Currency (value) into the HashMap, filtering out None values.
197        currencies.extend(join_all(futures).await.into_iter().flatten());
198        tracing::debug!("Loaded {} currencies(s)", currencies.len());
199
200        Ok(currencies)
201    }
202
203    pub async fn load_instruments(
204        con: &ConnectionManager,
205        trader_key: &str,
206        encoding: SerializationEncoding,
207    ) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
208        let mut instruments = HashMap::new();
209        let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
210        tracing::debug!("Loading {pattern}");
211
212        let mut con = con.clone();
213        let keys = Self::scan_keys(&mut con, pattern).await?;
214
215        let futures: Vec<_> = keys
216            .iter()
217            .map(|key| {
218                let con = con.clone();
219                async move {
220                    let instrument_id = key
221                        .as_str()
222                        .rsplit(':')
223                        .next()
224                        .ok_or_else(|| {
225                            log::error!("Invalid key format: {key}");
226                            "Invalid key format"
227                        })
228                        .and_then(|code| {
229                            InstrumentId::from_str(code).map_err(|e| {
230                                log::error!("Failed to convert to InstrumentId for {key}: {e}");
231                                "Invalid instrument ID"
232                            })
233                        });
234
235                    let instrument_id = match instrument_id {
236                        Ok(id) => id,
237                        Err(_) => return None,
238                    };
239
240                    match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
241                        Ok(Some(instrument)) => Some((instrument_id, instrument)),
242                        Ok(None) => {
243                            log::error!("Instrument not found: {instrument_id}");
244                            None
245                        }
246                        Err(e) => {
247                            log::error!("Failed to load instrument {instrument_id}: {e}");
248                            None
249                        }
250                    }
251                }
252            })
253            .collect();
254
255        // Insert all Instrument_id (key) and Instrument (value) into the HashMap, filtering out None values.
256        instruments.extend(join_all(futures).await.into_iter().flatten());
257        tracing::debug!("Loaded {} instruments(s)", instruments.len());
258
259        Ok(instruments)
260    }
261
262    pub async fn load_synthetics(
263        con: &ConnectionManager,
264        trader_key: &str,
265        encoding: SerializationEncoding,
266    ) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
267        let mut synthetics = HashMap::new();
268        let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
269        tracing::debug!("Loading {pattern}");
270
271        let mut con = con.clone();
272        let keys = Self::scan_keys(&mut con, pattern).await?;
273
274        let futures: Vec<_> = keys
275            .iter()
276            .map(|key| {
277                let con = con.clone();
278                async move {
279                    let instrument_id = key
280                        .as_str()
281                        .rsplit(':')
282                        .next()
283                        .ok_or_else(|| {
284                            log::error!("Invalid key format: {key}");
285                            "Invalid key format"
286                        })
287                        .and_then(|code| {
288                            InstrumentId::from_str(code).map_err(|e| {
289                                log::error!("Failed to parse InstrumentId for {key}: {e}");
290                                "Invalid instrument ID"
291                            })
292                        });
293
294                    let instrument_id = match instrument_id {
295                        Ok(id) => id,
296                        Err(_) => return None,
297                    };
298
299                    match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
300                        Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
301                        Ok(None) => {
302                            log::error!("Synthetic not found: {instrument_id}");
303                            None
304                        }
305                        Err(e) => {
306                            log::error!("Failed to load synthetic {instrument_id}: {e}");
307                            None
308                        }
309                    }
310                }
311            })
312            .collect();
313
314        // Insert all Instrument_id (key) and Synthetic (value) into the HashMap, filtering out None values.
315        synthetics.extend(join_all(futures).await.into_iter().flatten());
316        tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
317
318        Ok(synthetics)
319    }
320
321    pub async fn load_accounts(
322        con: &ConnectionManager,
323        trader_key: &str,
324        encoding: SerializationEncoding,
325    ) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
326        let mut accounts = HashMap::new();
327        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
328        tracing::debug!("Loading {pattern}");
329
330        let mut con = con.clone();
331        let keys = Self::scan_keys(&mut con, pattern).await?;
332
333        let futures: Vec<_> = keys
334            .iter()
335            .map(|key| {
336                let con = con.clone();
337                async move {
338                    let account_id = match key.as_str().rsplit(':').next() {
339                        Some(code) => AccountId::from(code),
340                        None => {
341                            log::error!("Invalid key format: {key}");
342                            return None;
343                        }
344                    };
345
346                    match Self::load_account(&con, trader_key, &account_id, encoding).await {
347                        Ok(Some(account)) => Some((account_id, account)),
348                        Ok(None) => {
349                            log::error!("Account not found: {account_id}");
350                            None
351                        }
352                        Err(e) => {
353                            log::error!("Failed to load account {account_id}: {e}");
354                            None
355                        }
356                    }
357                }
358            })
359            .collect();
360
361        // Insert all Account_id (key) and Account (value) into the HashMap, filtering out None values.
362        accounts.extend(join_all(futures).await.into_iter().flatten());
363        tracing::debug!("Loaded {} accounts(s)", accounts.len());
364
365        Ok(accounts)
366    }
367
368    pub async fn load_orders(
369        con: &ConnectionManager,
370        trader_key: &str,
371        encoding: SerializationEncoding,
372    ) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
373        let mut orders = HashMap::new();
374        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
375        tracing::debug!("Loading {pattern}");
376
377        let mut con = con.clone();
378        let keys = Self::scan_keys(&mut con, pattern).await?;
379
380        let futures: Vec<_> = keys
381            .iter()
382            .map(|key| {
383                let con = con.clone();
384                async move {
385                    let client_order_id = match key.as_str().rsplit(':').next() {
386                        Some(code) => ClientOrderId::from(code),
387                        None => {
388                            log::error!("Invalid key format: {key}");
389                            return None;
390                        }
391                    };
392
393                    match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
394                        Ok(Some(order)) => Some((client_order_id, order)),
395                        Ok(None) => {
396                            log::error!("Order not found: {client_order_id}");
397                            None
398                        }
399                        Err(e) => {
400                            log::error!("Failed to load order {client_order_id}: {e}");
401                            None
402                        }
403                    }
404                }
405            })
406            .collect();
407
408        // Insert all Client-Order-Id (key) and Order (value) into the HashMap, filtering out None values.
409        orders.extend(join_all(futures).await.into_iter().flatten());
410        tracing::debug!("Loaded {} order(s)", orders.len());
411
412        Ok(orders)
413    }
414
415    pub async fn load_positions(
416        con: &ConnectionManager,
417        trader_key: &str,
418        encoding: SerializationEncoding,
419    ) -> anyhow::Result<HashMap<PositionId, Position>> {
420        let mut positions = HashMap::new();
421        let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
422        tracing::debug!("Loading {pattern}");
423
424        let mut con = con.clone();
425        let keys = Self::scan_keys(&mut con, pattern).await?;
426
427        let futures: Vec<_> = keys
428            .iter()
429            .map(|key| {
430                let con = con.clone();
431                async move {
432                    let position_id = match key.as_str().rsplit(':').next() {
433                        Some(code) => PositionId::from(code),
434                        None => {
435                            log::error!("Invalid key format: {key}");
436                            return None;
437                        }
438                    };
439
440                    match Self::load_position(&con, trader_key, &position_id, encoding).await {
441                        Ok(Some(position)) => Some((position_id, position)),
442                        Ok(None) => {
443                            log::error!("Position not found: {position_id}");
444                            None
445                        }
446                        Err(e) => {
447                            log::error!("Failed to load position {position_id}: {e}");
448                            None
449                        }
450                    }
451                }
452            })
453            .collect();
454
455        // Insert all Position_id (key) and Position (value) into the HashMap, filtering out None values.
456        positions.extend(join_all(futures).await.into_iter().flatten());
457        tracing::debug!("Loaded {} position(s)", positions.len());
458
459        Ok(positions)
460    }
461
462    pub async fn load_currency(
463        con: &ConnectionManager,
464        trader_key: &str,
465        code: &Ustr,
466        encoding: SerializationEncoding,
467    ) -> anyhow::Result<Option<Currency>> {
468        let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
469        let result = Self::read(con, trader_key, &key).await?;
470
471        if result.is_empty() {
472            return Ok(None);
473        }
474
475        let currency = Self::deserialize_payload(encoding, &result[0])?;
476        Ok(currency)
477    }
478
479    pub async fn load_instrument(
480        con: &ConnectionManager,
481        trader_key: &str,
482        instrument_id: &InstrumentId,
483        encoding: SerializationEncoding,
484    ) -> anyhow::Result<Option<InstrumentAny>> {
485        let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
486        let result = Self::read(con, trader_key, &key).await?;
487        if result.is_empty() {
488            return Ok(None);
489        }
490
491        let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
492        Ok(Some(instrument))
493    }
494
495    pub async fn load_synthetic(
496        con: &ConnectionManager,
497        trader_key: &str,
498        instrument_id: &InstrumentId,
499        encoding: SerializationEncoding,
500    ) -> anyhow::Result<Option<SyntheticInstrument>> {
501        let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
502        let result = Self::read(con, trader_key, &key).await?;
503        if result.is_empty() {
504            return Ok(None);
505        }
506
507        let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
508        Ok(Some(synthetic))
509    }
510
511    pub async fn load_account(
512        con: &ConnectionManager,
513        trader_key: &str,
514        account_id: &AccountId,
515        encoding: SerializationEncoding,
516    ) -> anyhow::Result<Option<AccountAny>> {
517        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
518        let result = Self::read(con, trader_key, &key).await?;
519        if result.is_empty() {
520            return Ok(None);
521        }
522
523        let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
524        Ok(Some(account))
525    }
526
527    pub async fn load_order(
528        con: &ConnectionManager,
529        trader_key: &str,
530        client_order_id: &ClientOrderId,
531        encoding: SerializationEncoding,
532    ) -> anyhow::Result<Option<OrderAny>> {
533        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
534        let result = Self::read(con, trader_key, &key).await?;
535        if result.is_empty() {
536            return Ok(None);
537        }
538
539        let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
540        Ok(Some(order))
541    }
542
543    pub async fn load_position(
544        con: &ConnectionManager,
545        trader_key: &str,
546        position_id: &PositionId,
547        encoding: SerializationEncoding,
548    ) -> anyhow::Result<Option<Position>> {
549        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
550        let result = Self::read(con, trader_key, &key).await?;
551        if result.is_empty() {
552            return Ok(None);
553        }
554
555        let position: Position = Self::deserialize_payload(encoding, &result[0])?;
556        Ok(Some(position))
557    }
558
559    fn get_collection_key(key: &str) -> anyhow::Result<&str> {
560        key.split_once(REDIS_DELIMITER)
561            .map(|(collection, _)| collection)
562            .ok_or_else(|| {
563                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
564            })
565    }
566
567    async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
568        let index_key = Self::get_index_key(key)?;
569        match index_key {
570            INDEX_ORDER_IDS => Self::read_set(conn, key).await,
571            INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
572            INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
573            INDEX_ORDERS => Self::read_set(conn, key).await,
574            INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
575            INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
576            INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
577            INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
578            INDEX_POSITIONS => Self::read_set(conn, key).await,
579            INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
580            INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
581            _ => anyhow::bail!("Index unknown '{index_key}' on read"),
582        }
583    }
584
585    async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
586        let result: Vec<u8> = conn.get(key).await?;
587
588        if result.is_empty() {
589            Ok(vec![])
590        } else {
591            Ok(vec![Bytes::from(result)])
592        }
593    }
594
595    async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
596        let result: Vec<Bytes> = conn.smembers(key).await?;
597        Ok(result)
598    }
599
600    async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
601        let result: HashMap<String, String> = conn.hgetall(key).await?;
602        let json = serde_json::to_string(&result)?;
603        Ok(vec![Bytes::from(json.into_bytes())])
604    }
605
606    async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
607        let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
608        Ok(result)
609    }
610
611    fn get_index_key(key: &str) -> anyhow::Result<&str> {
612        key.split_once(REDIS_DELIMITER)
613            .map(|(_, index_key)| index_key)
614            .ok_or_else(|| {
615                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
616            })
617    }
618}
619
620fn is_timestamp_field(key: &str) -> bool {
621    let expire_match = key == "expire_time_ns";
622    let ts_match = key.starts_with("ts_");
623    expire_match || ts_match
624}
625
626fn convert_timestamps(value: &mut Value) {
627    match value {
628        Value::Object(map) => {
629            for (key, v) in map {
630                if is_timestamp_field(key) {
631                    if let Value::Number(n) = v {
632                        if let Some(n) = n.as_u64() {
633                            let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
634                            *v = Value::String(
635                                dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
636                            );
637                        }
638                    }
639                }
640                convert_timestamps(v);
641            }
642        }
643        Value::Array(arr) => {
644            for item in arr {
645                convert_timestamps(item);
646            }
647        }
648        _ => {}
649    }
650}
651
652fn convert_timestamp_strings(value: &mut Value) {
653    match value {
654        Value::Object(map) => {
655            for (key, v) in map {
656                if is_timestamp_field(key) {
657                    if let Value::String(s) = v {
658                        if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
659                            *v = Value::Number(
660                                (dt.with_timezone(&Utc)
661                                    .timestamp_nanos_opt()
662                                    .expect("Invalid DateTime")
663                                    as u64)
664                                    .into(),
665                            );
666                        }
667                    }
668                }
669                convert_timestamp_strings(v);
670            }
671        }
672        Value::Array(arr) => {
673            for item in arr {
674                convert_timestamp_strings(item);
675            }
676        }
677        _ => {}
678    }
679}