nautilus_infrastructure/redis/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24    accounts::AccountAny,
25    identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
26    instruments::{InstrumentAny, SyntheticInstrument},
27    orders::OrderAny,
28    position::Position,
29    types::Currency,
30};
31use redis::{AsyncCommands, aio::ConnectionManager};
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use ustr::Ustr;
35
36use super::get_index_key;
37
38// Collection keys
39const INDEX: &str = "index";
40const GENERAL: &str = "general";
41const CURRENCIES: &str = "currencies";
42const INSTRUMENTS: &str = "instruments";
43const SYNTHETICS: &str = "synthetics";
44const ACCOUNTS: &str = "accounts";
45const ORDERS: &str = "orders";
46const POSITIONS: &str = "positions";
47const ACTORS: &str = "actors";
48const STRATEGIES: &str = "strategies";
49const REDIS_DELIMITER: char = ':';
50
51// Index keys
52const INDEX_ORDER_IDS: &str = "index:order_ids";
53const INDEX_ORDER_POSITION: &str = "index:order_position";
54const INDEX_ORDER_CLIENT: &str = "index:order_client";
55const INDEX_ORDERS: &str = "index:orders";
56const INDEX_ORDERS_OPEN: &str = "index:orders_open";
57const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
58const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
59const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
60const INDEX_POSITIONS: &str = "index:positions";
61const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
62const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
63
64#[derive(Debug)]
65pub struct DatabaseQueries;
66
67impl DatabaseQueries {
68    /// Serializes the given `payload` using the specified `encoding` to a byte vector.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if serialization to the chosen encoding fails.
73    pub fn serialize_payload<T: Serialize>(
74        encoding: SerializationEncoding,
75        payload: &T,
76    ) -> anyhow::Result<Vec<u8>> {
77        let mut value = serde_json::to_value(payload)?;
78        convert_timestamps(&mut value);
79        match encoding {
80            SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
81                .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
82            SerializationEncoding::Json => serde_json::to_vec(&value)
83                .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
84        }
85    }
86
87    /// Deserializes the given byte slice `payload` into type `T` using the specified `encoding`.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if deserialization from the chosen encoding fails or converting to the target type fails.
92    pub fn deserialize_payload<T: DeserializeOwned>(
93        encoding: SerializationEncoding,
94        payload: &[u8],
95    ) -> anyhow::Result<T> {
96        let mut value = match encoding {
97            SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
98                .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
99            SerializationEncoding::Json => serde_json::from_slice(payload)
100                .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
101        };
102
103        convert_timestamp_strings(&mut value);
104
105        serde_json::from_value(value)
106            .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
107    }
108
109    /// Scans Redis for keys matching the given `pattern`.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the Redis scan operation fails.
114    pub async fn scan_keys(
115        con: &mut ConnectionManager,
116        pattern: String,
117    ) -> anyhow::Result<Vec<String>> {
118        let mut result = Vec::new();
119        let mut cursor = 0u64;
120
121        loop {
122            let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
123                .arg(cursor)
124                .arg("MATCH")
125                .arg(&pattern)
126                .arg("COUNT")
127                .arg(5000)
128                .query_async(con)
129                .await?;
130
131            let (new_cursor, keys) = scan_result;
132            result.extend(keys);
133
134            // If cursor is 0, we've completed the full scan
135            if new_cursor == 0 {
136                break;
137            }
138
139            cursor = new_cursor;
140        }
141
142        Ok(result)
143    }
144
145    /// Bulk reads multiple keys from Redis using MGET for efficiency.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the underlying Redis MGET operation fails.
150    pub async fn read_bulk(
151        con: &ConnectionManager,
152        keys: &[String],
153    ) -> anyhow::Result<Vec<Option<Bytes>>> {
154        if keys.is_empty() {
155            return Ok(vec![]);
156        }
157
158        let mut con = con.clone();
159
160        // Use MGET to fetch all keys in a single network operation
161        let results: Vec<Option<Vec<u8>>> =
162            redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
163
164        // Convert Vec<u8> to Bytes
165        let bytes_results: Vec<Option<Bytes>> = results
166            .into_iter()
167            .map(|opt| opt.map(Bytes::from))
168            .collect();
169
170        Ok(bytes_results)
171    }
172
173    /// Reads raw byte payloads for `key` under `trader_key` from Redis.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the underlying Redis read operation fails or if the collection is unsupported.
178    pub async fn read(
179        con: &ConnectionManager,
180        trader_key: &str,
181        key: &str,
182    ) -> anyhow::Result<Vec<Bytes>> {
183        let collection = Self::get_collection_key(key)?;
184        let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
185
186        let mut con = con.clone();
187
188        match collection {
189            INDEX => Self::read_index(&mut con, &full_key).await,
190            GENERAL => Self::read_string(&mut con, &full_key).await,
191            CURRENCIES => Self::read_string(&mut con, &full_key).await,
192            INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
193            SYNTHETICS => Self::read_string(&mut con, &full_key).await,
194            ACCOUNTS => Self::read_list(&mut con, &full_key).await,
195            ORDERS => Self::read_list(&mut con, &full_key).await,
196            POSITIONS => Self::read_list(&mut con, &full_key).await,
197            ACTORS => Self::read_string(&mut con, &full_key).await,
198            STRATEGIES => Self::read_string(&mut con, &full_key).await,
199            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
200        }
201    }
202
203    /// Loads all cache data (currencies, instruments, synthetics, accounts, orders, positions) for `trader_key`.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if loading any of the individual caches fails or combining data fails.
208    pub async fn load_all(
209        con: &ConnectionManager,
210        encoding: SerializationEncoding,
211        trader_key: &str,
212    ) -> anyhow::Result<CacheMap> {
213        let (currencies, instruments, synthetics, accounts, orders, positions) = tokio::try_join!(
214            Self::load_currencies(con, trader_key, encoding),
215            Self::load_instruments(con, trader_key, encoding),
216            Self::load_synthetics(con, trader_key, encoding),
217            Self::load_accounts(con, trader_key, encoding),
218            Self::load_orders(con, trader_key, encoding),
219            Self::load_positions(con, trader_key, encoding)
220        )
221        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
222
223        // For now, we don't load greeks and yield curves from the database
224        // This will be implemented in the future
225        let greeks = AHashMap::new();
226        let yield_curves = AHashMap::new();
227
228        Ok(CacheMap {
229            currencies,
230            instruments,
231            synthetics,
232            accounts,
233            orders,
234            positions,
235            greeks,
236            yield_curves,
237        })
238    }
239
240    /// Loads all currencies for `trader_key` using the specified `encoding`.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if scanning keys or reading currency data fails.
245    pub async fn load_currencies(
246        con: &ConnectionManager,
247        trader_key: &str,
248        encoding: SerializationEncoding,
249    ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
250        let mut currencies = AHashMap::new();
251        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
252        tracing::debug!("Loading {pattern}");
253
254        let mut con = con.clone();
255        let keys = Self::scan_keys(&mut con, pattern).await?;
256
257        if keys.is_empty() {
258            return Ok(currencies);
259        }
260
261        // Use bulk loading with MGET for efficiency
262        let bulk_values = Self::read_bulk(&con, &keys).await?;
263
264        // Process the bulk results
265        for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
266            let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
267                Ustr::from(code)
268            } else {
269                log::error!("Invalid key format: {key}");
270                continue;
271            };
272
273            if let Some(value_bytes) = value_opt {
274                match Self::deserialize_payload(encoding, value_bytes) {
275                    Ok(currency) => {
276                        currencies.insert(currency_code, currency);
277                    }
278                    Err(e) => {
279                        log::error!("Failed to deserialize currency {currency_code}: {e}");
280                    }
281                }
282            } else {
283                log::error!("Currency not found in Redis: {currency_code}");
284            }
285        }
286
287        tracing::debug!("Loaded {} currencies(s)", currencies.len());
288
289        Ok(currencies)
290    }
291
292    /// Loads all instruments for `trader_key` using the specified `encoding`.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if scanning keys or reading instrument data fails.
297    /// Loads all instruments for `trader_key` using the specified `encoding`.
298    ///
299    /// # Errors
300    ///
301    /// Returns an error if scanning keys or reading instrument data fails.
302    pub async fn load_instruments(
303        con: &ConnectionManager,
304        trader_key: &str,
305        encoding: SerializationEncoding,
306    ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
307        let mut instruments = AHashMap::new();
308        let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
309        tracing::debug!("Loading {pattern}");
310
311        let mut con = con.clone();
312        let keys = Self::scan_keys(&mut con, pattern).await?;
313
314        let futures: Vec<_> = keys
315            .iter()
316            .map(|key| {
317                let con = con.clone();
318                async move {
319                    let instrument_id = key
320                        .as_str()
321                        .rsplit(':')
322                        .next()
323                        .ok_or_else(|| {
324                            log::error!("Invalid key format: {key}");
325                            "Invalid key format"
326                        })
327                        .and_then(|code| {
328                            InstrumentId::from_str(code).map_err(|e| {
329                                log::error!("Failed to convert to InstrumentId for {key}: {e}");
330                                "Invalid instrument ID"
331                            })
332                        });
333
334                    let instrument_id = match instrument_id {
335                        Ok(id) => id,
336                        Err(_) => return None,
337                    };
338
339                    match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
340                        Ok(Some(instrument)) => Some((instrument_id, instrument)),
341                        Ok(None) => {
342                            log::error!("Instrument not found: {instrument_id}");
343                            None
344                        }
345                        Err(e) => {
346                            log::error!("Failed to load instrument {instrument_id}: {e}");
347                            None
348                        }
349                    }
350                }
351            })
352            .collect();
353
354        // Insert all Instrument_id (key) and Instrument (value) into the HashMap, filtering out None values.
355        instruments.extend(join_all(futures).await.into_iter().flatten());
356        tracing::debug!("Loaded {} instruments(s)", instruments.len());
357
358        Ok(instruments)
359    }
360
361    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if scanning keys or reading synthetic instrument data fails.
366    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if scanning keys or reading synthetic instrument data fails.
371    pub async fn load_synthetics(
372        con: &ConnectionManager,
373        trader_key: &str,
374        encoding: SerializationEncoding,
375    ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
376        let mut synthetics = AHashMap::new();
377        let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
378        tracing::debug!("Loading {pattern}");
379
380        let mut con = con.clone();
381        let keys = Self::scan_keys(&mut con, pattern).await?;
382
383        let futures: Vec<_> = keys
384            .iter()
385            .map(|key| {
386                let con = con.clone();
387                async move {
388                    let instrument_id = key
389                        .as_str()
390                        .rsplit(':')
391                        .next()
392                        .ok_or_else(|| {
393                            log::error!("Invalid key format: {key}");
394                            "Invalid key format"
395                        })
396                        .and_then(|code| {
397                            InstrumentId::from_str(code).map_err(|e| {
398                                log::error!("Failed to parse InstrumentId for {key}: {e}");
399                                "Invalid instrument ID"
400                            })
401                        });
402
403                    let instrument_id = match instrument_id {
404                        Ok(id) => id,
405                        Err(_) => return None,
406                    };
407
408                    match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
409                        Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
410                        Ok(None) => {
411                            log::error!("Synthetic not found: {instrument_id}");
412                            None
413                        }
414                        Err(e) => {
415                            log::error!("Failed to load synthetic {instrument_id}: {e}");
416                            None
417                        }
418                    }
419                }
420            })
421            .collect();
422
423        // Insert all Instrument_id (key) and Synthetic (value) into the HashMap, filtering out None values.
424        synthetics.extend(join_all(futures).await.into_iter().flatten());
425        tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
426
427        Ok(synthetics)
428    }
429
430    /// Loads all accounts for `trader_key` using the specified `encoding`.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if scanning keys or reading account data fails.
435    /// Loads all accounts for `trader_key` using the specified `encoding`.
436    ///
437    /// # Errors
438    ///
439    /// Returns an error if scanning keys or reading account data fails.
440    pub async fn load_accounts(
441        con: &ConnectionManager,
442        trader_key: &str,
443        encoding: SerializationEncoding,
444    ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
445        let mut accounts = AHashMap::new();
446        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
447        tracing::debug!("Loading {pattern}");
448
449        let mut con = con.clone();
450        let keys = Self::scan_keys(&mut con, pattern).await?;
451
452        let futures: Vec<_> = keys
453            .iter()
454            .map(|key| {
455                let con = con.clone();
456                async move {
457                    let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
458                        AccountId::from(code)
459                    } else {
460                        log::error!("Invalid key format: {key}");
461                        return None;
462                    };
463
464                    match Self::load_account(&con, trader_key, &account_id, encoding).await {
465                        Ok(Some(account)) => Some((account_id, account)),
466                        Ok(None) => {
467                            log::error!("Account not found: {account_id}");
468                            None
469                        }
470                        Err(e) => {
471                            log::error!("Failed to load account {account_id}: {e}");
472                            None
473                        }
474                    }
475                }
476            })
477            .collect();
478
479        // Insert all Account_id (key) and Account (value) into the HashMap, filtering out None values.
480        accounts.extend(join_all(futures).await.into_iter().flatten());
481        tracing::debug!("Loaded {} accounts(s)", accounts.len());
482
483        Ok(accounts)
484    }
485
486    /// Loads all orders for `trader_key` using the specified `encoding`.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if scanning keys or reading order data fails.
491    /// Loads all orders for `trader_key` using the specified `encoding`.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if scanning keys or reading order data fails.
496    pub async fn load_orders(
497        con: &ConnectionManager,
498        trader_key: &str,
499        encoding: SerializationEncoding,
500    ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
501        let mut orders = AHashMap::new();
502        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
503        tracing::debug!("Loading {pattern}");
504
505        let mut con = con.clone();
506        let keys = Self::scan_keys(&mut con, pattern).await?;
507
508        let futures: Vec<_> = keys
509            .iter()
510            .map(|key| {
511                let con = con.clone();
512                async move {
513                    let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
514                        ClientOrderId::from(code)
515                    } else {
516                        log::error!("Invalid key format: {key}");
517                        return None;
518                    };
519
520                    match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
521                        Ok(Some(order)) => Some((client_order_id, order)),
522                        Ok(None) => {
523                            log::error!("Order not found: {client_order_id}");
524                            None
525                        }
526                        Err(e) => {
527                            log::error!("Failed to load order {client_order_id}: {e}");
528                            None
529                        }
530                    }
531                }
532            })
533            .collect();
534
535        // Insert all Client-Order-Id (key) and Order (value) into the HashMap, filtering out None values.
536        orders.extend(join_all(futures).await.into_iter().flatten());
537        tracing::debug!("Loaded {} order(s)", orders.len());
538
539        Ok(orders)
540    }
541
542    /// Loads all positions for `trader_key` using the specified `encoding`.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error if scanning keys or reading position data fails.
547    /// Loads all positions for `trader_key` using the specified `encoding`.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if scanning keys or reading position data fails.
552    pub async fn load_positions(
553        con: &ConnectionManager,
554        trader_key: &str,
555        encoding: SerializationEncoding,
556    ) -> anyhow::Result<AHashMap<PositionId, Position>> {
557        let mut positions = AHashMap::new();
558        let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
559        tracing::debug!("Loading {pattern}");
560
561        let mut con = con.clone();
562        let keys = Self::scan_keys(&mut con, pattern).await?;
563
564        let futures: Vec<_> = keys
565            .iter()
566            .map(|key| {
567                let con = con.clone();
568                async move {
569                    let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
570                        PositionId::from(code)
571                    } else {
572                        log::error!("Invalid key format: {key}");
573                        return None;
574                    };
575
576                    match Self::load_position(&con, trader_key, &position_id, encoding).await {
577                        Ok(Some(position)) => Some((position_id, position)),
578                        Ok(None) => {
579                            log::error!("Position not found: {position_id}");
580                            None
581                        }
582                        Err(e) => {
583                            log::error!("Failed to load position {position_id}: {e}");
584                            None
585                        }
586                    }
587                }
588            })
589            .collect();
590
591        // Insert all Position_id (key) and Position (value) into the HashMap, filtering out None values.
592        positions.extend(join_all(futures).await.into_iter().flatten());
593        tracing::debug!("Loaded {} position(s)", positions.len());
594
595        Ok(positions)
596    }
597
598    /// Loads a single currency for `trader_key` and `code` using the specified `encoding`.
599    ///
600    /// # Errors
601    ///
602    /// Returns an error if the underlying read or deserialization fails.
603    pub async fn load_currency(
604        con: &ConnectionManager,
605        trader_key: &str,
606        code: &Ustr,
607        encoding: SerializationEncoding,
608    ) -> anyhow::Result<Option<Currency>> {
609        let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
610        let result = Self::read(con, trader_key, &key).await?;
611
612        if result.is_empty() {
613            return Ok(None);
614        }
615
616        let currency = Self::deserialize_payload(encoding, &result[0])?;
617        Ok(currency)
618    }
619
620    /// Loads a single instrument for `trader_key` and `instrument_id` using the specified `encoding`.
621    ///
622    /// # Errors
623    ///
624    /// Returns an error if the underlying read or deserialization fails.
625    pub async fn load_instrument(
626        con: &ConnectionManager,
627        trader_key: &str,
628        instrument_id: &InstrumentId,
629        encoding: SerializationEncoding,
630    ) -> anyhow::Result<Option<InstrumentAny>> {
631        let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
632        let result = Self::read(con, trader_key, &key).await?;
633        if result.is_empty() {
634            return Ok(None);
635        }
636
637        let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
638        Ok(Some(instrument))
639    }
640
641    /// Loads a single synthetic instrument for `trader_key` and `instrument_id` using the specified `encoding`.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if the underlying read or deserialization fails.
646    pub async fn load_synthetic(
647        con: &ConnectionManager,
648        trader_key: &str,
649        instrument_id: &InstrumentId,
650        encoding: SerializationEncoding,
651    ) -> anyhow::Result<Option<SyntheticInstrument>> {
652        let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
653        let result = Self::read(con, trader_key, &key).await?;
654        if result.is_empty() {
655            return Ok(None);
656        }
657
658        let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
659        Ok(Some(synthetic))
660    }
661
662    /// Loads a single account for `trader_key` and `account_id` using the specified `encoding`.
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the underlying read or deserialization fails.
667    pub async fn load_account(
668        con: &ConnectionManager,
669        trader_key: &str,
670        account_id: &AccountId,
671        encoding: SerializationEncoding,
672    ) -> anyhow::Result<Option<AccountAny>> {
673        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
674        let result = Self::read(con, trader_key, &key).await?;
675        if result.is_empty() {
676            return Ok(None);
677        }
678
679        let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
680        Ok(Some(account))
681    }
682
683    /// Loads a single order for `trader_key` and `client_order_id` using the specified `encoding`.
684    ///
685    /// # Errors
686    ///
687    /// Returns an error if the underlying read or deserialization fails.
688    pub async fn load_order(
689        con: &ConnectionManager,
690        trader_key: &str,
691        client_order_id: &ClientOrderId,
692        encoding: SerializationEncoding,
693    ) -> anyhow::Result<Option<OrderAny>> {
694        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
695        let result = Self::read(con, trader_key, &key).await?;
696        if result.is_empty() {
697            return Ok(None);
698        }
699
700        let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
701        Ok(Some(order))
702    }
703
704    /// Loads a single position for `trader_key` and `position_id` using the specified `encoding`.
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if the underlying read or deserialization fails.
709    pub async fn load_position(
710        con: &ConnectionManager,
711        trader_key: &str,
712        position_id: &PositionId,
713        encoding: SerializationEncoding,
714    ) -> anyhow::Result<Option<Position>> {
715        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
716        let result = Self::read(con, trader_key, &key).await?;
717        if result.is_empty() {
718            return Ok(None);
719        }
720
721        let position: Position = Self::deserialize_payload(encoding, &result[0])?;
722        Ok(Some(position))
723    }
724
725    fn get_collection_key(key: &str) -> anyhow::Result<&str> {
726        key.split_once(REDIS_DELIMITER)
727            .map(|(collection, _)| collection)
728            .ok_or_else(|| {
729                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
730            })
731    }
732
733    async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
734        let index_key = get_index_key(key)?;
735        match index_key {
736            INDEX_ORDER_IDS => Self::read_set(conn, key).await,
737            INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
738            INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
739            INDEX_ORDERS => Self::read_set(conn, key).await,
740            INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
741            INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
742            INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
743            INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
744            INDEX_POSITIONS => Self::read_set(conn, key).await,
745            INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
746            INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
747            _ => anyhow::bail!("Index unknown '{index_key}' on read"),
748        }
749    }
750
751    async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
752        let result: Vec<u8> = conn.get(key).await?;
753
754        if result.is_empty() {
755            Ok(vec![])
756        } else {
757            Ok(vec![Bytes::from(result)])
758        }
759    }
760
761    async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
762        let result: Vec<Bytes> = conn.smembers(key).await?;
763        Ok(result)
764    }
765
766    async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
767        let result: HashMap<String, String> = conn.hgetall(key).await?;
768        let json = serde_json::to_string(&result)?;
769        Ok(vec![Bytes::from(json.into_bytes())])
770    }
771
772    async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
773        let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
774        Ok(result)
775    }
776}
777
778fn is_timestamp_field(key: &str) -> bool {
779    let expire_match = key == "expire_time_ns";
780    let ts_match = key.starts_with("ts_");
781    expire_match || ts_match
782}
783
784fn convert_timestamps(value: &mut Value) {
785    match value {
786        Value::Object(map) => {
787            for (key, v) in map {
788                if is_timestamp_field(key)
789                    && let Value::Number(n) = v
790                    && let Some(n) = n.as_u64()
791                {
792                    let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
793                    *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
794                }
795                convert_timestamps(v);
796            }
797        }
798        Value::Array(arr) => {
799            for item in arr {
800                convert_timestamps(item);
801            }
802        }
803        _ => {}
804    }
805}
806
807fn convert_timestamp_strings(value: &mut Value) {
808    match value {
809        Value::Object(map) => {
810            for (key, v) in map {
811                if is_timestamp_field(key)
812                    && let Value::String(s) = v
813                    && let Ok(dt) = DateTime::parse_from_rfc3339(s)
814                {
815                    *v = Value::Number(
816                        (dt.with_timezone(&Utc)
817                            .timestamp_nanos_opt()
818                            .expect("Invalid DateTime") as u64)
819                            .into(),
820                    );
821                }
822                convert_timestamp_strings(v);
823            }
824        }
825        Value::Array(arr) => {
826            for item in arr {
827                convert_timestamp_strings(item);
828            }
829        }
830        _ => {}
831    }
832}