nautilus_infrastructure/redis/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24    accounts::AccountAny,
25    identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
26    instruments::{InstrumentAny, SyntheticInstrument},
27    orders::OrderAny,
28    position::Position,
29    types::Currency,
30};
31use redis::{AsyncCommands, aio::ConnectionManager};
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use tokio::try_join;
35use ustr::Ustr;
36
37// Collection keys
38const INDEX: &str = "index";
39const GENERAL: &str = "general";
40const CURRENCIES: &str = "currencies";
41const INSTRUMENTS: &str = "instruments";
42const SYNTHETICS: &str = "synthetics";
43const ACCOUNTS: &str = "accounts";
44const ORDERS: &str = "orders";
45const POSITIONS: &str = "positions";
46const ACTORS: &str = "actors";
47const STRATEGIES: &str = "strategies";
48const REDIS_DELIMITER: char = ':';
49
50// Index keys
51const INDEX_ORDER_IDS: &str = "index:order_ids";
52const INDEX_ORDER_POSITION: &str = "index:order_position";
53const INDEX_ORDER_CLIENT: &str = "index:order_client";
54const INDEX_ORDERS: &str = "index:orders";
55const INDEX_ORDERS_OPEN: &str = "index:orders_open";
56const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
57const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
58const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
59const INDEX_POSITIONS: &str = "index:positions";
60const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
61const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
62
63#[derive(Debug)]
64pub struct DatabaseQueries;
65
66impl DatabaseQueries {
67    /// Serializes the given `payload` using the specified `encoding` to a byte vector.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if serialization to the chosen encoding fails.
72    pub fn serialize_payload<T: Serialize>(
73        encoding: SerializationEncoding,
74        payload: &T,
75    ) -> anyhow::Result<Vec<u8>> {
76        let mut value = serde_json::to_value(payload)?;
77        convert_timestamps(&mut value);
78        match encoding {
79            SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
80                .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
81            SerializationEncoding::Json => serde_json::to_vec(&value)
82                .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
83        }
84    }
85
86    /// Deserializes the given byte slice `payload` into type `T` using the specified `encoding`.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if deserialization from the chosen encoding fails or converting to the target type fails.
91    pub fn deserialize_payload<T: DeserializeOwned>(
92        encoding: SerializationEncoding,
93        payload: &[u8],
94    ) -> anyhow::Result<T> {
95        let mut value = match encoding {
96            SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
97                .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
98            SerializationEncoding::Json => serde_json::from_slice(payload)
99                .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
100        };
101
102        convert_timestamp_strings(&mut value);
103
104        serde_json::from_value(value)
105            .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
106    }
107
108    /// Scans Redis for keys matching the given `pattern`.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the Redis scan operation fails.
113    pub async fn scan_keys(
114        con: &mut ConnectionManager,
115        pattern: String,
116    ) -> anyhow::Result<Vec<String>> {
117        let mut result = Vec::new();
118        let mut cursor = 0u64;
119
120        loop {
121            let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
122                .arg(cursor)
123                .arg("MATCH")
124                .arg(&pattern)
125                .arg("COUNT")
126                .arg(5000)
127                .query_async(con)
128                .await?;
129
130            let (new_cursor, keys) = scan_result;
131            result.extend(keys);
132
133            // If cursor is 0, we've completed the full scan
134            if new_cursor == 0 {
135                break;
136            }
137
138            cursor = new_cursor;
139        }
140
141        Ok(result)
142    }
143
144    /// Bulk reads multiple keys from Redis using MGET for efficiency.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the underlying Redis MGET operation fails.
149    pub async fn read_bulk(
150        con: &ConnectionManager,
151        keys: &[String],
152    ) -> anyhow::Result<Vec<Option<Bytes>>> {
153        if keys.is_empty() {
154            return Ok(vec![]);
155        }
156
157        let mut con = con.clone();
158
159        // Use MGET to fetch all keys in a single network operation
160        let results: Vec<Option<Vec<u8>>> =
161            redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
162
163        // Convert Vec<u8> to Bytes
164        let bytes_results: Vec<Option<Bytes>> = results
165            .into_iter()
166            .map(|opt| opt.map(Bytes::from))
167            .collect();
168
169        Ok(bytes_results)
170    }
171
172    /// Reads raw byte payloads for `key` under `trader_key` from Redis.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if the underlying Redis read operation fails or if the collection is unsupported.
177    pub async fn read(
178        con: &ConnectionManager,
179        trader_key: &str,
180        key: &str,
181    ) -> anyhow::Result<Vec<Bytes>> {
182        let collection = Self::get_collection_key(key)?;
183        let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
184
185        let mut con = con.clone();
186
187        match collection {
188            INDEX => Self::read_index(&mut con, &full_key).await,
189            GENERAL => Self::read_string(&mut con, &full_key).await,
190            CURRENCIES => Self::read_string(&mut con, &full_key).await,
191            INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
192            SYNTHETICS => Self::read_string(&mut con, &full_key).await,
193            ACCOUNTS => Self::read_list(&mut con, &full_key).await,
194            ORDERS => Self::read_list(&mut con, &full_key).await,
195            POSITIONS => Self::read_list(&mut con, &full_key).await,
196            ACTORS => Self::read_string(&mut con, &full_key).await,
197            STRATEGIES => Self::read_string(&mut con, &full_key).await,
198            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
199        }
200    }
201
202    /// Loads all cache data (currencies, instruments, synthetics, accounts, orders, positions) for `trader_key`.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if loading any of the individual caches fails or combining data fails.
207    pub async fn load_all(
208        con: &ConnectionManager,
209        encoding: SerializationEncoding,
210        trader_key: &str,
211    ) -> anyhow::Result<CacheMap> {
212        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
213            Self::load_currencies(con, trader_key, encoding),
214            Self::load_instruments(con, trader_key, encoding),
215            Self::load_synthetics(con, trader_key, encoding),
216            Self::load_accounts(con, trader_key, encoding),
217            Self::load_orders(con, trader_key, encoding),
218            Self::load_positions(con, trader_key, encoding)
219        )
220        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
221
222        // For now, we don't load greeks and yield curves from the database
223        // This will be implemented in the future
224        let greeks = AHashMap::new();
225        let yield_curves = AHashMap::new();
226
227        Ok(CacheMap {
228            currencies,
229            instruments,
230            synthetics,
231            accounts,
232            orders,
233            positions,
234            greeks,
235            yield_curves,
236        })
237    }
238
239    /// Loads all currencies for `trader_key` using the specified `encoding`.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if scanning keys or reading currency data fails.
244    pub async fn load_currencies(
245        con: &ConnectionManager,
246        trader_key: &str,
247        encoding: SerializationEncoding,
248    ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
249        let mut currencies = AHashMap::new();
250        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
251        tracing::debug!("Loading {pattern}");
252
253        let mut con = con.clone();
254        let keys = Self::scan_keys(&mut con, pattern).await?;
255
256        if keys.is_empty() {
257            return Ok(currencies);
258        }
259
260        // Use bulk loading with MGET for efficiency
261        let bulk_values = Self::read_bulk(&con, &keys).await?;
262
263        // Process the bulk results
264        for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
265            let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
266                Ustr::from(code)
267            } else {
268                log::error!("Invalid key format: {key}");
269                continue;
270            };
271
272            if let Some(value_bytes) = value_opt {
273                match Self::deserialize_payload(encoding, value_bytes) {
274                    Ok(currency) => {
275                        currencies.insert(currency_code, currency);
276                    }
277                    Err(e) => {
278                        log::error!("Failed to deserialize currency {currency_code}: {e}");
279                    }
280                }
281            } else {
282                log::error!("Currency not found in Redis: {currency_code}");
283            }
284        }
285
286        tracing::debug!("Loaded {} currencies(s)", currencies.len());
287
288        Ok(currencies)
289    }
290
291    /// Loads all instruments for `trader_key` using the specified `encoding`.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if scanning keys or reading instrument data fails.
296    /// Loads all instruments for `trader_key` using the specified `encoding`.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if scanning keys or reading instrument data fails.
301    pub async fn load_instruments(
302        con: &ConnectionManager,
303        trader_key: &str,
304        encoding: SerializationEncoding,
305    ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
306        let mut instruments = AHashMap::new();
307        let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
308        tracing::debug!("Loading {pattern}");
309
310        let mut con = con.clone();
311        let keys = Self::scan_keys(&mut con, pattern).await?;
312
313        let futures: Vec<_> = keys
314            .iter()
315            .map(|key| {
316                let con = con.clone();
317                async move {
318                    let instrument_id = key
319                        .as_str()
320                        .rsplit(':')
321                        .next()
322                        .ok_or_else(|| {
323                            log::error!("Invalid key format: {key}");
324                            "Invalid key format"
325                        })
326                        .and_then(|code| {
327                            InstrumentId::from_str(code).map_err(|e| {
328                                log::error!("Failed to convert to InstrumentId for {key}: {e}");
329                                "Invalid instrument ID"
330                            })
331                        });
332
333                    let instrument_id = match instrument_id {
334                        Ok(id) => id,
335                        Err(_) => return None,
336                    };
337
338                    match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
339                        Ok(Some(instrument)) => Some((instrument_id, instrument)),
340                        Ok(None) => {
341                            log::error!("Instrument not found: {instrument_id}");
342                            None
343                        }
344                        Err(e) => {
345                            log::error!("Failed to load instrument {instrument_id}: {e}");
346                            None
347                        }
348                    }
349                }
350            })
351            .collect();
352
353        // Insert all Instrument_id (key) and Instrument (value) into the HashMap, filtering out None values.
354        instruments.extend(join_all(futures).await.into_iter().flatten());
355        tracing::debug!("Loaded {} instruments(s)", instruments.len());
356
357        Ok(instruments)
358    }
359
360    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if scanning keys or reading synthetic instrument data fails.
365    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if scanning keys or reading synthetic instrument data fails.
370    pub async fn load_synthetics(
371        con: &ConnectionManager,
372        trader_key: &str,
373        encoding: SerializationEncoding,
374    ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
375        let mut synthetics = AHashMap::new();
376        let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
377        tracing::debug!("Loading {pattern}");
378
379        let mut con = con.clone();
380        let keys = Self::scan_keys(&mut con, pattern).await?;
381
382        let futures: Vec<_> = keys
383            .iter()
384            .map(|key| {
385                let con = con.clone();
386                async move {
387                    let instrument_id = key
388                        .as_str()
389                        .rsplit(':')
390                        .next()
391                        .ok_or_else(|| {
392                            log::error!("Invalid key format: {key}");
393                            "Invalid key format"
394                        })
395                        .and_then(|code| {
396                            InstrumentId::from_str(code).map_err(|e| {
397                                log::error!("Failed to parse InstrumentId for {key}: {e}");
398                                "Invalid instrument ID"
399                            })
400                        });
401
402                    let instrument_id = match instrument_id {
403                        Ok(id) => id,
404                        Err(_) => return None,
405                    };
406
407                    match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
408                        Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
409                        Ok(None) => {
410                            log::error!("Synthetic not found: {instrument_id}");
411                            None
412                        }
413                        Err(e) => {
414                            log::error!("Failed to load synthetic {instrument_id}: {e}");
415                            None
416                        }
417                    }
418                }
419            })
420            .collect();
421
422        // Insert all Instrument_id (key) and Synthetic (value) into the HashMap, filtering out None values.
423        synthetics.extend(join_all(futures).await.into_iter().flatten());
424        tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
425
426        Ok(synthetics)
427    }
428
429    /// Loads all accounts for `trader_key` using the specified `encoding`.
430    ///
431    /// # Errors
432    ///
433    /// Returns an error if scanning keys or reading account data fails.
434    /// Loads all accounts for `trader_key` using the specified `encoding`.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if scanning keys or reading account data fails.
439    pub async fn load_accounts(
440        con: &ConnectionManager,
441        trader_key: &str,
442        encoding: SerializationEncoding,
443    ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
444        let mut accounts = AHashMap::new();
445        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
446        tracing::debug!("Loading {pattern}");
447
448        let mut con = con.clone();
449        let keys = Self::scan_keys(&mut con, pattern).await?;
450
451        let futures: Vec<_> = keys
452            .iter()
453            .map(|key| {
454                let con = con.clone();
455                async move {
456                    let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
457                        AccountId::from(code)
458                    } else {
459                        log::error!("Invalid key format: {key}");
460                        return None;
461                    };
462
463                    match Self::load_account(&con, trader_key, &account_id, encoding).await {
464                        Ok(Some(account)) => Some((account_id, account)),
465                        Ok(None) => {
466                            log::error!("Account not found: {account_id}");
467                            None
468                        }
469                        Err(e) => {
470                            log::error!("Failed to load account {account_id}: {e}");
471                            None
472                        }
473                    }
474                }
475            })
476            .collect();
477
478        // Insert all Account_id (key) and Account (value) into the HashMap, filtering out None values.
479        accounts.extend(join_all(futures).await.into_iter().flatten());
480        tracing::debug!("Loaded {} accounts(s)", accounts.len());
481
482        Ok(accounts)
483    }
484
485    /// Loads all orders for `trader_key` using the specified `encoding`.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if scanning keys or reading order data fails.
490    /// Loads all orders for `trader_key` using the specified `encoding`.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if scanning keys or reading order data fails.
495    pub async fn load_orders(
496        con: &ConnectionManager,
497        trader_key: &str,
498        encoding: SerializationEncoding,
499    ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
500        let mut orders = AHashMap::new();
501        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
502        tracing::debug!("Loading {pattern}");
503
504        let mut con = con.clone();
505        let keys = Self::scan_keys(&mut con, pattern).await?;
506
507        let futures: Vec<_> = keys
508            .iter()
509            .map(|key| {
510                let con = con.clone();
511                async move {
512                    let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
513                        ClientOrderId::from(code)
514                    } else {
515                        log::error!("Invalid key format: {key}");
516                        return None;
517                    };
518
519                    match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
520                        Ok(Some(order)) => Some((client_order_id, order)),
521                        Ok(None) => {
522                            log::error!("Order not found: {client_order_id}");
523                            None
524                        }
525                        Err(e) => {
526                            log::error!("Failed to load order {client_order_id}: {e}");
527                            None
528                        }
529                    }
530                }
531            })
532            .collect();
533
534        // Insert all Client-Order-Id (key) and Order (value) into the HashMap, filtering out None values.
535        orders.extend(join_all(futures).await.into_iter().flatten());
536        tracing::debug!("Loaded {} order(s)", orders.len());
537
538        Ok(orders)
539    }
540
541    /// Loads all positions for `trader_key` using the specified `encoding`.
542    ///
543    /// # Errors
544    ///
545    /// Returns an error if scanning keys or reading position data fails.
546    /// Loads all positions for `trader_key` using the specified `encoding`.
547    ///
548    /// # Errors
549    ///
550    /// Returns an error if scanning keys or reading position data fails.
551    pub async fn load_positions(
552        con: &ConnectionManager,
553        trader_key: &str,
554        encoding: SerializationEncoding,
555    ) -> anyhow::Result<AHashMap<PositionId, Position>> {
556        let mut positions = AHashMap::new();
557        let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
558        tracing::debug!("Loading {pattern}");
559
560        let mut con = con.clone();
561        let keys = Self::scan_keys(&mut con, pattern).await?;
562
563        let futures: Vec<_> = keys
564            .iter()
565            .map(|key| {
566                let con = con.clone();
567                async move {
568                    let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
569                        PositionId::from(code)
570                    } else {
571                        log::error!("Invalid key format: {key}");
572                        return None;
573                    };
574
575                    match Self::load_position(&con, trader_key, &position_id, encoding).await {
576                        Ok(Some(position)) => Some((position_id, position)),
577                        Ok(None) => {
578                            log::error!("Position not found: {position_id}");
579                            None
580                        }
581                        Err(e) => {
582                            log::error!("Failed to load position {position_id}: {e}");
583                            None
584                        }
585                    }
586                }
587            })
588            .collect();
589
590        // Insert all Position_id (key) and Position (value) into the HashMap, filtering out None values.
591        positions.extend(join_all(futures).await.into_iter().flatten());
592        tracing::debug!("Loaded {} position(s)", positions.len());
593
594        Ok(positions)
595    }
596
597    /// Loads a single currency for `trader_key` and `code` using the specified `encoding`.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the underlying read or deserialization fails.
602    pub async fn load_currency(
603        con: &ConnectionManager,
604        trader_key: &str,
605        code: &Ustr,
606        encoding: SerializationEncoding,
607    ) -> anyhow::Result<Option<Currency>> {
608        let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
609        let result = Self::read(con, trader_key, &key).await?;
610
611        if result.is_empty() {
612            return Ok(None);
613        }
614
615        let currency = Self::deserialize_payload(encoding, &result[0])?;
616        Ok(currency)
617    }
618
619    /// Loads a single instrument for `trader_key` and `instrument_id` using the specified `encoding`.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the underlying read or deserialization fails.
624    pub async fn load_instrument(
625        con: &ConnectionManager,
626        trader_key: &str,
627        instrument_id: &InstrumentId,
628        encoding: SerializationEncoding,
629    ) -> anyhow::Result<Option<InstrumentAny>> {
630        let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
631        let result = Self::read(con, trader_key, &key).await?;
632        if result.is_empty() {
633            return Ok(None);
634        }
635
636        let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
637        Ok(Some(instrument))
638    }
639
640    /// Loads a single synthetic instrument for `trader_key` and `instrument_id` using the specified `encoding`.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the underlying read or deserialization fails.
645    pub async fn load_synthetic(
646        con: &ConnectionManager,
647        trader_key: &str,
648        instrument_id: &InstrumentId,
649        encoding: SerializationEncoding,
650    ) -> anyhow::Result<Option<SyntheticInstrument>> {
651        let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
652        let result = Self::read(con, trader_key, &key).await?;
653        if result.is_empty() {
654            return Ok(None);
655        }
656
657        let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
658        Ok(Some(synthetic))
659    }
660
661    /// Loads a single account for `trader_key` and `account_id` using the specified `encoding`.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if the underlying read or deserialization fails.
666    pub async fn load_account(
667        con: &ConnectionManager,
668        trader_key: &str,
669        account_id: &AccountId,
670        encoding: SerializationEncoding,
671    ) -> anyhow::Result<Option<AccountAny>> {
672        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
673        let result = Self::read(con, trader_key, &key).await?;
674        if result.is_empty() {
675            return Ok(None);
676        }
677
678        let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
679        Ok(Some(account))
680    }
681
682    /// Loads a single order for `trader_key` and `client_order_id` using the specified `encoding`.
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the underlying read or deserialization fails.
687    pub async fn load_order(
688        con: &ConnectionManager,
689        trader_key: &str,
690        client_order_id: &ClientOrderId,
691        encoding: SerializationEncoding,
692    ) -> anyhow::Result<Option<OrderAny>> {
693        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
694        let result = Self::read(con, trader_key, &key).await?;
695        if result.is_empty() {
696            return Ok(None);
697        }
698
699        let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
700        Ok(Some(order))
701    }
702
703    /// Loads a single position for `trader_key` and `position_id` using the specified `encoding`.
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the underlying read or deserialization fails.
708    pub async fn load_position(
709        con: &ConnectionManager,
710        trader_key: &str,
711        position_id: &PositionId,
712        encoding: SerializationEncoding,
713    ) -> anyhow::Result<Option<Position>> {
714        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
715        let result = Self::read(con, trader_key, &key).await?;
716        if result.is_empty() {
717            return Ok(None);
718        }
719
720        let position: Position = Self::deserialize_payload(encoding, &result[0])?;
721        Ok(Some(position))
722    }
723
724    fn get_collection_key(key: &str) -> anyhow::Result<&str> {
725        key.split_once(REDIS_DELIMITER)
726            .map(|(collection, _)| collection)
727            .ok_or_else(|| {
728                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
729            })
730    }
731
732    async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
733        let index_key = Self::get_index_key(key)?;
734        match index_key {
735            INDEX_ORDER_IDS => Self::read_set(conn, key).await,
736            INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
737            INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
738            INDEX_ORDERS => Self::read_set(conn, key).await,
739            INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
740            INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
741            INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
742            INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
743            INDEX_POSITIONS => Self::read_set(conn, key).await,
744            INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
745            INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
746            _ => anyhow::bail!("Index unknown '{index_key}' on read"),
747        }
748    }
749
750    async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
751        let result: Vec<u8> = conn.get(key).await?;
752
753        if result.is_empty() {
754            Ok(vec![])
755        } else {
756            Ok(vec![Bytes::from(result)])
757        }
758    }
759
760    async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
761        let result: Vec<Bytes> = conn.smembers(key).await?;
762        Ok(result)
763    }
764
765    async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
766        let result: HashMap<String, String> = conn.hgetall(key).await?;
767        let json = serde_json::to_string(&result)?;
768        Ok(vec![Bytes::from(json.into_bytes())])
769    }
770
771    async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
772        let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
773        Ok(result)
774    }
775
776    fn get_index_key(key: &str) -> anyhow::Result<&str> {
777        key.split_once(REDIS_DELIMITER)
778            .map(|(_, index_key)| index_key)
779            .ok_or_else(|| {
780                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
781            })
782    }
783}
784
785fn is_timestamp_field(key: &str) -> bool {
786    let expire_match = key == "expire_time_ns";
787    let ts_match = key.starts_with("ts_");
788    expire_match || ts_match
789}
790
791fn convert_timestamps(value: &mut Value) {
792    match value {
793        Value::Object(map) => {
794            for (key, v) in map {
795                if is_timestamp_field(key)
796                    && let Value::Number(n) = v
797                    && let Some(n) = n.as_u64()
798                {
799                    let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
800                    *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
801                }
802                convert_timestamps(v);
803            }
804        }
805        Value::Array(arr) => {
806            for item in arr {
807                convert_timestamps(item);
808            }
809        }
810        _ => {}
811    }
812}
813
814fn convert_timestamp_strings(value: &mut Value) {
815    match value {
816        Value::Object(map) => {
817            for (key, v) in map {
818                if is_timestamp_field(key)
819                    && let Value::String(s) = v
820                    && let Ok(dt) = DateTime::parse_from_rfc3339(s)
821                {
822                    *v = Value::Number(
823                        (dt.with_timezone(&Utc)
824                            .timestamp_nanos_opt()
825                            .expect("Invalid DateTime") as u64)
826                            .into(),
827                    );
828                }
829                convert_timestamp_strings(v);
830            }
831        }
832        Value::Array(arr) => {
833            for item in arr {
834                convert_timestamp_strings(item);
835            }
836        }
837        _ => {}
838    }
839}