nautilus_infrastructure/redis/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24    accounts::AccountAny,
25    identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
26    instruments::{InstrumentAny, SyntheticInstrument},
27    orders::OrderAny,
28    position::Position,
29    types::Currency,
30};
31use redis::{AsyncCommands, aio::ConnectionManager};
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use ustr::Ustr;
35
36// Collection keys
37const INDEX: &str = "index";
38const GENERAL: &str = "general";
39const CURRENCIES: &str = "currencies";
40const INSTRUMENTS: &str = "instruments";
41const SYNTHETICS: &str = "synthetics";
42const ACCOUNTS: &str = "accounts";
43const ORDERS: &str = "orders";
44const POSITIONS: &str = "positions";
45const ACTORS: &str = "actors";
46const STRATEGIES: &str = "strategies";
47const REDIS_DELIMITER: char = ':';
48
49// Index keys
50const INDEX_ORDER_IDS: &str = "index:order_ids";
51const INDEX_ORDER_POSITION: &str = "index:order_position";
52const INDEX_ORDER_CLIENT: &str = "index:order_client";
53const INDEX_ORDERS: &str = "index:orders";
54const INDEX_ORDERS_OPEN: &str = "index:orders_open";
55const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
56const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
57const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
58const INDEX_POSITIONS: &str = "index:positions";
59const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
60const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
61
62#[derive(Debug)]
63pub struct DatabaseQueries;
64
65impl DatabaseQueries {
66    /// Serializes the given `payload` using the specified `encoding` to a byte vector.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if serialization to the chosen encoding fails.
71    pub fn serialize_payload<T: Serialize>(
72        encoding: SerializationEncoding,
73        payload: &T,
74    ) -> anyhow::Result<Vec<u8>> {
75        let mut value = serde_json::to_value(payload)?;
76        convert_timestamps(&mut value);
77        match encoding {
78            SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
79                .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
80            SerializationEncoding::Json => serde_json::to_vec(&value)
81                .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
82        }
83    }
84
85    /// Deserializes the given byte slice `payload` into type `T` using the specified `encoding`.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if deserialization from the chosen encoding fails or converting to the target type fails.
90    pub fn deserialize_payload<T: DeserializeOwned>(
91        encoding: SerializationEncoding,
92        payload: &[u8],
93    ) -> anyhow::Result<T> {
94        let mut value = match encoding {
95            SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
96                .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
97            SerializationEncoding::Json => serde_json::from_slice(payload)
98                .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
99        };
100
101        convert_timestamp_strings(&mut value);
102
103        serde_json::from_value(value)
104            .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
105    }
106
107    /// Scans Redis for keys matching the given `pattern`.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if the Redis scan operation fails.
112    pub async fn scan_keys(
113        con: &mut ConnectionManager,
114        pattern: String,
115    ) -> anyhow::Result<Vec<String>> {
116        let mut result = Vec::new();
117        let mut cursor = 0u64;
118
119        loop {
120            let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
121                .arg(cursor)
122                .arg("MATCH")
123                .arg(&pattern)
124                .arg("COUNT")
125                .arg(5000)
126                .query_async(con)
127                .await?;
128
129            let (new_cursor, keys) = scan_result;
130            result.extend(keys);
131
132            // If cursor is 0, we've completed the full scan
133            if new_cursor == 0 {
134                break;
135            }
136
137            cursor = new_cursor;
138        }
139
140        Ok(result)
141    }
142
143    /// Bulk reads multiple keys from Redis using MGET for efficiency.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the underlying Redis MGET operation fails.
148    pub async fn read_bulk(
149        con: &ConnectionManager,
150        keys: &[String],
151    ) -> anyhow::Result<Vec<Option<Bytes>>> {
152        if keys.is_empty() {
153            return Ok(vec![]);
154        }
155
156        let mut con = con.clone();
157
158        // Use MGET to fetch all keys in a single network operation
159        let results: Vec<Option<Vec<u8>>> =
160            redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
161
162        // Convert Vec<u8> to Bytes
163        let bytes_results: Vec<Option<Bytes>> = results
164            .into_iter()
165            .map(|opt| opt.map(Bytes::from))
166            .collect();
167
168        Ok(bytes_results)
169    }
170
171    /// Reads raw byte payloads for `key` under `trader_key` from Redis.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the underlying Redis read operation fails or if the collection is unsupported.
176    pub async fn read(
177        con: &ConnectionManager,
178        trader_key: &str,
179        key: &str,
180    ) -> anyhow::Result<Vec<Bytes>> {
181        let collection = Self::get_collection_key(key)?;
182        let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
183
184        let mut con = con.clone();
185
186        match collection {
187            INDEX => Self::read_index(&mut con, &full_key).await,
188            GENERAL => Self::read_string(&mut con, &full_key).await,
189            CURRENCIES => Self::read_string(&mut con, &full_key).await,
190            INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
191            SYNTHETICS => Self::read_string(&mut con, &full_key).await,
192            ACCOUNTS => Self::read_list(&mut con, &full_key).await,
193            ORDERS => Self::read_list(&mut con, &full_key).await,
194            POSITIONS => Self::read_list(&mut con, &full_key).await,
195            ACTORS => Self::read_string(&mut con, &full_key).await,
196            STRATEGIES => Self::read_string(&mut con, &full_key).await,
197            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
198        }
199    }
200
201    /// Loads all cache data (currencies, instruments, synthetics, accounts, orders, positions) for `trader_key`.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if loading any of the individual caches fails or combining data fails.
206    pub async fn load_all(
207        con: &ConnectionManager,
208        encoding: SerializationEncoding,
209        trader_key: &str,
210    ) -> anyhow::Result<CacheMap> {
211        let (currencies, instruments, synthetics, accounts, orders, positions) = tokio::try_join!(
212            Self::load_currencies(con, trader_key, encoding),
213            Self::load_instruments(con, trader_key, encoding),
214            Self::load_synthetics(con, trader_key, encoding),
215            Self::load_accounts(con, trader_key, encoding),
216            Self::load_orders(con, trader_key, encoding),
217            Self::load_positions(con, trader_key, encoding)
218        )
219        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
220
221        // For now, we don't load greeks and yield curves from the database
222        // This will be implemented in the future
223        let greeks = AHashMap::new();
224        let yield_curves = AHashMap::new();
225
226        Ok(CacheMap {
227            currencies,
228            instruments,
229            synthetics,
230            accounts,
231            orders,
232            positions,
233            greeks,
234            yield_curves,
235        })
236    }
237
238    /// Loads all currencies for `trader_key` using the specified `encoding`.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if scanning keys or reading currency data fails.
243    pub async fn load_currencies(
244        con: &ConnectionManager,
245        trader_key: &str,
246        encoding: SerializationEncoding,
247    ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
248        let mut currencies = AHashMap::new();
249        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
250        tracing::debug!("Loading {pattern}");
251
252        let mut con = con.clone();
253        let keys = Self::scan_keys(&mut con, pattern).await?;
254
255        if keys.is_empty() {
256            return Ok(currencies);
257        }
258
259        // Use bulk loading with MGET for efficiency
260        let bulk_values = Self::read_bulk(&con, &keys).await?;
261
262        // Process the bulk results
263        for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
264            let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
265                Ustr::from(code)
266            } else {
267                log::error!("Invalid key format: {key}");
268                continue;
269            };
270
271            if let Some(value_bytes) = value_opt {
272                match Self::deserialize_payload(encoding, value_bytes) {
273                    Ok(currency) => {
274                        currencies.insert(currency_code, currency);
275                    }
276                    Err(e) => {
277                        log::error!("Failed to deserialize currency {currency_code}: {e}");
278                    }
279                }
280            } else {
281                log::error!("Currency not found in Redis: {currency_code}");
282            }
283        }
284
285        tracing::debug!("Loaded {} currencies(s)", currencies.len());
286
287        Ok(currencies)
288    }
289
290    /// Loads all instruments for `trader_key` using the specified `encoding`.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if scanning keys or reading instrument data fails.
295    /// Loads all instruments for `trader_key` using the specified `encoding`.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if scanning keys or reading instrument data fails.
300    pub async fn load_instruments(
301        con: &ConnectionManager,
302        trader_key: &str,
303        encoding: SerializationEncoding,
304    ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
305        let mut instruments = AHashMap::new();
306        let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
307        tracing::debug!("Loading {pattern}");
308
309        let mut con = con.clone();
310        let keys = Self::scan_keys(&mut con, pattern).await?;
311
312        let futures: Vec<_> = keys
313            .iter()
314            .map(|key| {
315                let con = con.clone();
316                async move {
317                    let instrument_id = key
318                        .as_str()
319                        .rsplit(':')
320                        .next()
321                        .ok_or_else(|| {
322                            log::error!("Invalid key format: {key}");
323                            "Invalid key format"
324                        })
325                        .and_then(|code| {
326                            InstrumentId::from_str(code).map_err(|e| {
327                                log::error!("Failed to convert to InstrumentId for {key}: {e}");
328                                "Invalid instrument ID"
329                            })
330                        });
331
332                    let instrument_id = match instrument_id {
333                        Ok(id) => id,
334                        Err(_) => return None,
335                    };
336
337                    match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
338                        Ok(Some(instrument)) => Some((instrument_id, instrument)),
339                        Ok(None) => {
340                            log::error!("Instrument not found: {instrument_id}");
341                            None
342                        }
343                        Err(e) => {
344                            log::error!("Failed to load instrument {instrument_id}: {e}");
345                            None
346                        }
347                    }
348                }
349            })
350            .collect();
351
352        // Insert all Instrument_id (key) and Instrument (value) into the HashMap, filtering out None values.
353        instruments.extend(join_all(futures).await.into_iter().flatten());
354        tracing::debug!("Loaded {} instruments(s)", instruments.len());
355
356        Ok(instruments)
357    }
358
359    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if scanning keys or reading synthetic instrument data fails.
364    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if scanning keys or reading synthetic instrument data fails.
369    pub async fn load_synthetics(
370        con: &ConnectionManager,
371        trader_key: &str,
372        encoding: SerializationEncoding,
373    ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
374        let mut synthetics = AHashMap::new();
375        let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
376        tracing::debug!("Loading {pattern}");
377
378        let mut con = con.clone();
379        let keys = Self::scan_keys(&mut con, pattern).await?;
380
381        let futures: Vec<_> = keys
382            .iter()
383            .map(|key| {
384                let con = con.clone();
385                async move {
386                    let instrument_id = key
387                        .as_str()
388                        .rsplit(':')
389                        .next()
390                        .ok_or_else(|| {
391                            log::error!("Invalid key format: {key}");
392                            "Invalid key format"
393                        })
394                        .and_then(|code| {
395                            InstrumentId::from_str(code).map_err(|e| {
396                                log::error!("Failed to parse InstrumentId for {key}: {e}");
397                                "Invalid instrument ID"
398                            })
399                        });
400
401                    let instrument_id = match instrument_id {
402                        Ok(id) => id,
403                        Err(_) => return None,
404                    };
405
406                    match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
407                        Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
408                        Ok(None) => {
409                            log::error!("Synthetic not found: {instrument_id}");
410                            None
411                        }
412                        Err(e) => {
413                            log::error!("Failed to load synthetic {instrument_id}: {e}");
414                            None
415                        }
416                    }
417                }
418            })
419            .collect();
420
421        // Insert all Instrument_id (key) and Synthetic (value) into the HashMap, filtering out None values.
422        synthetics.extend(join_all(futures).await.into_iter().flatten());
423        tracing::debug!("Loaded {} synthetics(s)", synthetics.len());
424
425        Ok(synthetics)
426    }
427
428    /// Loads all accounts for `trader_key` using the specified `encoding`.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if scanning keys or reading account data fails.
433    /// Loads all accounts for `trader_key` using the specified `encoding`.
434    ///
435    /// # Errors
436    ///
437    /// Returns an error if scanning keys or reading account data fails.
438    pub async fn load_accounts(
439        con: &ConnectionManager,
440        trader_key: &str,
441        encoding: SerializationEncoding,
442    ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
443        let mut accounts = AHashMap::new();
444        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
445        tracing::debug!("Loading {pattern}");
446
447        let mut con = con.clone();
448        let keys = Self::scan_keys(&mut con, pattern).await?;
449
450        let futures: Vec<_> = keys
451            .iter()
452            .map(|key| {
453                let con = con.clone();
454                async move {
455                    let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
456                        AccountId::from(code)
457                    } else {
458                        log::error!("Invalid key format: {key}");
459                        return None;
460                    };
461
462                    match Self::load_account(&con, trader_key, &account_id, encoding).await {
463                        Ok(Some(account)) => Some((account_id, account)),
464                        Ok(None) => {
465                            log::error!("Account not found: {account_id}");
466                            None
467                        }
468                        Err(e) => {
469                            log::error!("Failed to load account {account_id}: {e}");
470                            None
471                        }
472                    }
473                }
474            })
475            .collect();
476
477        // Insert all Account_id (key) and Account (value) into the HashMap, filtering out None values.
478        accounts.extend(join_all(futures).await.into_iter().flatten());
479        tracing::debug!("Loaded {} accounts(s)", accounts.len());
480
481        Ok(accounts)
482    }
483
484    /// Loads all orders for `trader_key` using the specified `encoding`.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if scanning keys or reading order data fails.
489    /// Loads all orders for `trader_key` using the specified `encoding`.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if scanning keys or reading order data fails.
494    pub async fn load_orders(
495        con: &ConnectionManager,
496        trader_key: &str,
497        encoding: SerializationEncoding,
498    ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
499        let mut orders = AHashMap::new();
500        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
501        tracing::debug!("Loading {pattern}");
502
503        let mut con = con.clone();
504        let keys = Self::scan_keys(&mut con, pattern).await?;
505
506        let futures: Vec<_> = keys
507            .iter()
508            .map(|key| {
509                let con = con.clone();
510                async move {
511                    let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
512                        ClientOrderId::from(code)
513                    } else {
514                        log::error!("Invalid key format: {key}");
515                        return None;
516                    };
517
518                    match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
519                        Ok(Some(order)) => Some((client_order_id, order)),
520                        Ok(None) => {
521                            log::error!("Order not found: {client_order_id}");
522                            None
523                        }
524                        Err(e) => {
525                            log::error!("Failed to load order {client_order_id}: {e}");
526                            None
527                        }
528                    }
529                }
530            })
531            .collect();
532
533        // Insert all Client-Order-Id (key) and Order (value) into the HashMap, filtering out None values.
534        orders.extend(join_all(futures).await.into_iter().flatten());
535        tracing::debug!("Loaded {} order(s)", orders.len());
536
537        Ok(orders)
538    }
539
540    /// Loads all positions for `trader_key` using the specified `encoding`.
541    ///
542    /// # Errors
543    ///
544    /// Returns an error if scanning keys or reading position data fails.
545    /// Loads all positions for `trader_key` using the specified `encoding`.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if scanning keys or reading position data fails.
550    pub async fn load_positions(
551        con: &ConnectionManager,
552        trader_key: &str,
553        encoding: SerializationEncoding,
554    ) -> anyhow::Result<AHashMap<PositionId, Position>> {
555        let mut positions = AHashMap::new();
556        let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
557        tracing::debug!("Loading {pattern}");
558
559        let mut con = con.clone();
560        let keys = Self::scan_keys(&mut con, pattern).await?;
561
562        let futures: Vec<_> = keys
563            .iter()
564            .map(|key| {
565                let con = con.clone();
566                async move {
567                    let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
568                        PositionId::from(code)
569                    } else {
570                        log::error!("Invalid key format: {key}");
571                        return None;
572                    };
573
574                    match Self::load_position(&con, trader_key, &position_id, encoding).await {
575                        Ok(Some(position)) => Some((position_id, position)),
576                        Ok(None) => {
577                            log::error!("Position not found: {position_id}");
578                            None
579                        }
580                        Err(e) => {
581                            log::error!("Failed to load position {position_id}: {e}");
582                            None
583                        }
584                    }
585                }
586            })
587            .collect();
588
589        // Insert all Position_id (key) and Position (value) into the HashMap, filtering out None values.
590        positions.extend(join_all(futures).await.into_iter().flatten());
591        tracing::debug!("Loaded {} position(s)", positions.len());
592
593        Ok(positions)
594    }
595
596    /// Loads a single currency for `trader_key` and `code` using the specified `encoding`.
597    ///
598    /// # Errors
599    ///
600    /// Returns an error if the underlying read or deserialization fails.
601    pub async fn load_currency(
602        con: &ConnectionManager,
603        trader_key: &str,
604        code: &Ustr,
605        encoding: SerializationEncoding,
606    ) -> anyhow::Result<Option<Currency>> {
607        let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
608        let result = Self::read(con, trader_key, &key).await?;
609
610        if result.is_empty() {
611            return Ok(None);
612        }
613
614        let currency = Self::deserialize_payload(encoding, &result[0])?;
615        Ok(currency)
616    }
617
618    /// Loads a single instrument for `trader_key` and `instrument_id` using the specified `encoding`.
619    ///
620    /// # Errors
621    ///
622    /// Returns an error if the underlying read or deserialization fails.
623    pub async fn load_instrument(
624        con: &ConnectionManager,
625        trader_key: &str,
626        instrument_id: &InstrumentId,
627        encoding: SerializationEncoding,
628    ) -> anyhow::Result<Option<InstrumentAny>> {
629        let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
630        let result = Self::read(con, trader_key, &key).await?;
631        if result.is_empty() {
632            return Ok(None);
633        }
634
635        let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
636        Ok(Some(instrument))
637    }
638
639    /// Loads a single synthetic instrument for `trader_key` and `instrument_id` using the specified `encoding`.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the underlying read or deserialization fails.
644    pub async fn load_synthetic(
645        con: &ConnectionManager,
646        trader_key: &str,
647        instrument_id: &InstrumentId,
648        encoding: SerializationEncoding,
649    ) -> anyhow::Result<Option<SyntheticInstrument>> {
650        let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
651        let result = Self::read(con, trader_key, &key).await?;
652        if result.is_empty() {
653            return Ok(None);
654        }
655
656        let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
657        Ok(Some(synthetic))
658    }
659
660    /// Loads a single account for `trader_key` and `account_id` using the specified `encoding`.
661    ///
662    /// # Errors
663    ///
664    /// Returns an error if the underlying read or deserialization fails.
665    pub async fn load_account(
666        con: &ConnectionManager,
667        trader_key: &str,
668        account_id: &AccountId,
669        encoding: SerializationEncoding,
670    ) -> anyhow::Result<Option<AccountAny>> {
671        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
672        let result = Self::read(con, trader_key, &key).await?;
673        if result.is_empty() {
674            return Ok(None);
675        }
676
677        let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
678        Ok(Some(account))
679    }
680
681    /// Loads a single order for `trader_key` and `client_order_id` using the specified `encoding`.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if the underlying read or deserialization fails.
686    pub async fn load_order(
687        con: &ConnectionManager,
688        trader_key: &str,
689        client_order_id: &ClientOrderId,
690        encoding: SerializationEncoding,
691    ) -> anyhow::Result<Option<OrderAny>> {
692        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
693        let result = Self::read(con, trader_key, &key).await?;
694        if result.is_empty() {
695            return Ok(None);
696        }
697
698        let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
699        Ok(Some(order))
700    }
701
702    /// Loads a single position for `trader_key` and `position_id` using the specified `encoding`.
703    ///
704    /// # Errors
705    ///
706    /// Returns an error if the underlying read or deserialization fails.
707    pub async fn load_position(
708        con: &ConnectionManager,
709        trader_key: &str,
710        position_id: &PositionId,
711        encoding: SerializationEncoding,
712    ) -> anyhow::Result<Option<Position>> {
713        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
714        let result = Self::read(con, trader_key, &key).await?;
715        if result.is_empty() {
716            return Ok(None);
717        }
718
719        let position: Position = Self::deserialize_payload(encoding, &result[0])?;
720        Ok(Some(position))
721    }
722
723    fn get_collection_key(key: &str) -> anyhow::Result<&str> {
724        key.split_once(REDIS_DELIMITER)
725            .map(|(collection, _)| collection)
726            .ok_or_else(|| {
727                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
728            })
729    }
730
731    async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
732        let index_key = Self::get_index_key(key)?;
733        match index_key {
734            INDEX_ORDER_IDS => Self::read_set(conn, key).await,
735            INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
736            INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
737            INDEX_ORDERS => Self::read_set(conn, key).await,
738            INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
739            INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
740            INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
741            INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
742            INDEX_POSITIONS => Self::read_set(conn, key).await,
743            INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
744            INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
745            _ => anyhow::bail!("Index unknown '{index_key}' on read"),
746        }
747    }
748
749    async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
750        let result: Vec<u8> = conn.get(key).await?;
751
752        if result.is_empty() {
753            Ok(vec![])
754        } else {
755            Ok(vec![Bytes::from(result)])
756        }
757    }
758
759    async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
760        let result: Vec<Bytes> = conn.smembers(key).await?;
761        Ok(result)
762    }
763
764    async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
765        let result: HashMap<String, String> = conn.hgetall(key).await?;
766        let json = serde_json::to_string(&result)?;
767        Ok(vec![Bytes::from(json.into_bytes())])
768    }
769
770    async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
771        let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
772        Ok(result)
773    }
774
775    fn get_index_key(key: &str) -> anyhow::Result<&str> {
776        key.split_once(REDIS_DELIMITER)
777            .map(|(_, index_key)| index_key)
778            .ok_or_else(|| {
779                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
780            })
781    }
782}
783
784fn is_timestamp_field(key: &str) -> bool {
785    let expire_match = key == "expire_time_ns";
786    let ts_match = key.starts_with("ts_");
787    expire_match || ts_match
788}
789
790fn convert_timestamps(value: &mut Value) {
791    match value {
792        Value::Object(map) => {
793            for (key, v) in map {
794                if is_timestamp_field(key)
795                    && let Value::Number(n) = v
796                    && let Some(n) = n.as_u64()
797                {
798                    let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
799                    *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
800                }
801                convert_timestamps(v);
802            }
803        }
804        Value::Array(arr) => {
805            for item in arr {
806                convert_timestamps(item);
807            }
808        }
809        _ => {}
810    }
811}
812
813fn convert_timestamp_strings(value: &mut Value) {
814    match value {
815        Value::Object(map) => {
816            for (key, v) in map {
817                if is_timestamp_field(key)
818                    && let Value::String(s) = v
819                    && let Ok(dt) = DateTime::parse_from_rfc3339(s)
820                {
821                    *v = Value::Number(
822                        (dt.with_timezone(&Utc)
823                            .timestamp_nanos_opt()
824                            .expect("Invalid DateTime") as u64)
825                            .into(),
826                    );
827                }
828                convert_timestamp_strings(v);
829            }
830        }
831        Value::Array(arr) => {
832            for item in arr {
833                convert_timestamp_strings(item);
834            }
835        }
836        _ => {}
837    }
838}