nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    str::FromStr,
19    time::{Duration, Instant},
20};
21
22use bytes::Bytes;
23use nautilus_common::{
24    cache::{database::CacheDatabaseAdapter, CacheConfig},
25    custom::CustomData,
26    enums::SerializationEncoding,
27    runtime::get_runtime,
28    signal::Signal,
29};
30use nautilus_core::{correctness::check_slice_not_empty, UnixNanos, UUID4};
31use nautilus_cryptography::providers::install_cryptographic_provider;
32use nautilus_model::{
33    accounts::AccountAny,
34    data::{Bar, DataType, QuoteTick, TradeTick},
35    events::{position::snapshot::PositionSnapshot, OrderEventAny, OrderSnapshot},
36    identifiers::{
37        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
38        TraderId, VenueOrderId,
39    },
40    instruments::{InstrumentAny, SyntheticInstrument},
41    orderbook::OrderBook,
42    orders::OrderAny,
43    position::Position,
44    types::Currency,
45};
46use redis::{Commands, Connection, Pipeline, RedisError};
47use ustr::Ustr;
48
49use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
50use crate::redis::create_redis_connection;
51
// Task and connection names: `CACHE_READ` labels the connection opened by
// `RedisCacheDatabase::new`, `CACHE_WRITE` labels the background write task and the
// connection opened by `process_commands`.
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";

// Error constants
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection keys: the segment of a command key before the first delimiter
// (see `get_collection_key`); it selects which Redis data structure is used.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Index keys: `index:<name>` identifiers compared against the result of `get_index_key`.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
85
/// A type of database operation.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Routed to the `insert` handler when the write buffer is drained.
    Insert,
    /// Routed to the `update` handler when the write buffer is drained.
    Update,
    /// Routed to the `delete` handler when the write buffer is drained.
    Delete,
    /// Stops the command-processing loop; it is never buffered or drained.
    Close,
}
94
/// Represents a database command to be performed which may be executed in a task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The database operation type.
    pub op_type: DatabaseOperation,
    /// The primary key for the operation (`None` only for commands built by `close()`).
    pub key: Option<String>,
    /// The data payload for the operation.
    pub payload: Option<Vec<Bytes>>,
}
105
106impl DatabaseCommand {
107    /// Creates a new [`DatabaseCommand`] instance.
108    #[must_use]
109    pub fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
110        Self {
111            op_type,
112            key: Some(key),
113            payload,
114        }
115    }
116
117    /// Initialize a `Close` database command, this is meant to close the database cache channel.
118    #[must_use]
119    pub fn close() -> Self {
120        Self {
121            op_type: DatabaseOperation::Close,
122            key: None,
123            payload: None,
124        }
125    }
126}
127
/// A Redis-backed cache database: reads go through a dedicated connection, writes are
/// sent as [`DatabaseCommand`]s over a channel to a background task.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// The trader ID this database was created for.
    pub trader_id: TraderId,
    /// Key prefix built by `get_trader_key`, prepended to every key read or scanned.
    trader_key: String,
    /// Connection used by `flushdb`, `keys` and `read`.
    con: Connection,
    /// Sender feeding commands to the background write task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
    /// Join handle of the background write task, awaited in `close`.
    handle: tokio::task::JoinHandle<()>,
}
139
140impl RedisCacheDatabase {
141    /// Creates a new [`RedisCacheDatabase`] instance.
142    pub fn new(
143        trader_id: TraderId,
144        instance_id: UUID4,
145        config: CacheConfig,
146    ) -> anyhow::Result<RedisCacheDatabase> {
147        install_cryptographic_provider();
148
149        let db_config = config
150            .database
151            .as_ref()
152            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
153        let con = create_redis_connection(CACHE_READ, db_config.clone())?;
154
155        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
156        let trader_key = get_trader_key(trader_id, instance_id, &config);
157        let trader_key_clone = trader_key.clone();
158
159        let handle = get_runtime().spawn(async move {
160            process_commands(rx, trader_key_clone, config.clone())
161                .await
162                .expect("Error spawning task '{CACHE_WRITE}'")
163        });
164
165        Ok(RedisCacheDatabase {
166            trader_id,
167            trader_key,
168            con,
169            tx,
170            handle,
171        })
172    }
173
174    pub fn close(&mut self) {
175        log::debug!("Closing");
176
177        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
178            log::debug!("Error sending close message: {e:?}")
179        }
180
181        log::debug!("Awaiting task '{CACHE_WRITE}'");
182        tokio::task::block_in_place(|| {
183            if let Err(e) = get_runtime().block_on(&mut self.handle) {
184                log::error!("Error awaiting task '{CACHE_WRITE}': {e:?}");
185            }
186        });
187
188        log::debug!("Closed");
189    }
190
191    pub fn flushdb(&mut self) {
192        if let Err(e) = redis::cmd(REDIS_FLUSHDB).query::<()>(&mut self.con) {
193            log::error!("Failed to flush database: {e:?}");
194        }
195    }
196
197    pub fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
198        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
199        log::debug!("Querying keys: {pattern}");
200        Ok(scan_keys(&mut self.con, pattern)?)
201    }
202
203    pub fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
204        let collection = get_collection_key(key)?;
205        let key = format!("{}{REDIS_DELIMITER}{}", self.trader_key, key);
206
207        match collection {
208            INDEX => read_index(&mut self.con, &key),
209            GENERAL => read_string(&mut self.con, &key),
210            CURRENCIES => read_string(&mut self.con, &key),
211            INSTRUMENTS => read_string(&mut self.con, &key),
212            SYNTHETICS => read_string(&mut self.con, &key),
213            ACCOUNTS => read_list(&mut self.con, &key),
214            ORDERS => read_list(&mut self.con, &key),
215            POSITIONS => read_list(&mut self.con, &key),
216            ACTORS => read_string(&mut self.con, &key),
217            STRATEGIES => read_string(&mut self.con, &key),
218            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
219        }
220    }
221
222    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
223        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
224        match self.tx.send(op) {
225            Ok(_) => Ok(()),
226            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
227        }
228    }
229
230    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
231        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
232        match self.tx.send(op) {
233            Ok(_) => Ok(()),
234            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
235        }
236    }
237
238    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
239        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
240        match self.tx.send(op) {
241            Ok(_) => Ok(()),
242            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
243        }
244    }
245}
246
247async fn process_commands(
248    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
249    trader_key: String,
250    config: CacheConfig,
251) -> anyhow::Result<()> {
252    tracing::debug!("Starting cache processing");
253
254    let db_config = config
255        .database
256        .as_ref()
257        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
258    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone())?;
259
260    // Buffering
261    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
262    let mut last_drain = Instant::now();
263    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
264
265    // Continue to receive and handle messages until channel is hung up
266    loop {
267        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
268            drain_buffer(&mut con, &trader_key, &mut buffer);
269            last_drain = Instant::now();
270        } else {
271            match rx.recv().await {
272                Some(msg) => {
273                    if let DatabaseOperation::Close = msg.op_type {
274                        break;
275                    }
276                    buffer.push_back(msg)
277                }
278                None => break, // Channel hung up
279            }
280        }
281    }
282
283    // Drain any remaining messages
284    if !buffer.is_empty() {
285        drain_buffer(&mut con, &trader_key, &mut buffer);
286    }
287
288    tracing::debug!("Stopped cache processing");
289    Ok(())
290}
291
292fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<DatabaseCommand>) {
293    let mut pipe = redis::pipe();
294    pipe.atomic();
295
296    for msg in buffer.drain(..) {
297        let key = msg.key.expect("Null command `key`");
298        let collection = match get_collection_key(&key) {
299            Ok(collection) => collection,
300            Err(e) => {
301                tracing::error!("{e}");
302                continue; // Continue to next message
303            }
304        };
305
306        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
307
308        match msg.op_type {
309            DatabaseOperation::Insert => {
310                if let Some(payload) = msg.payload {
311                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
312                        tracing::error!("{e}");
313                    }
314                } else {
315                    tracing::error!("Null `payload` for `insert`");
316                }
317            }
318            DatabaseOperation::Update => {
319                if let Some(payload) = msg.payload {
320                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
321                        tracing::error!("{e}");
322                    }
323                } else {
324                    tracing::error!("Null `payload` for `update`");
325                };
326            }
327            DatabaseOperation::Delete => {
328                // `payload` can be `None` for a delete operation
329                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
330                    tracing::error!("{e}");
331                }
332            }
333            DatabaseOperation::Close => panic!("Close command should not be drained"),
334        }
335    }
336
337    if let Err(e) = pipe.query::<()>(conn) {
338        tracing::error!("{e}");
339    }
340}
341
342fn scan_keys(con: &mut Connection, pattern: String) -> Result<Vec<String>, RedisError> {
343    Ok(con.scan_match::<String, String>(pattern)?.collect())
344}
345
346fn read_index(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
347    let index_key = get_index_key(key)?;
348    match index_key {
349        INDEX_ORDER_IDS => read_set(conn, key),
350        INDEX_ORDER_POSITION => read_hset(conn, key),
351        INDEX_ORDER_CLIENT => read_hset(conn, key),
352        INDEX_ORDERS => read_set(conn, key),
353        INDEX_ORDERS_OPEN => read_set(conn, key),
354        INDEX_ORDERS_CLOSED => read_set(conn, key),
355        INDEX_ORDERS_EMULATED => read_set(conn, key),
356        INDEX_ORDERS_INFLIGHT => read_set(conn, key),
357        INDEX_POSITIONS => read_set(conn, key),
358        INDEX_POSITIONS_OPEN => read_set(conn, key),
359        INDEX_POSITIONS_CLOSED => read_set(conn, key),
360        _ => anyhow::bail!("Index unknown '{index_key}' on read"),
361    }
362}
363
364fn read_string(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
365    let result: Vec<u8> = conn.get(key)?;
366
367    if result.is_empty() {
368        Ok(vec![])
369    } else {
370        Ok(vec![Bytes::from(result)])
371    }
372}
373
374fn read_set(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
375    let result: Vec<Bytes> = conn.smembers(key)?;
376    Ok(result)
377}
378
379fn read_hset(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
380    let result: HashMap<String, String> = conn.hgetall(key)?;
381    let json = serde_json::to_string(&result)?;
382    Ok(vec![Bytes::from(json.into_bytes())])
383}
384
385fn read_list(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
386    let result: Vec<Bytes> = conn.lrange(key, 0, -1)?;
387    Ok(result)
388}
389
390fn insert(
391    pipe: &mut Pipeline,
392    collection: &str,
393    key: &str,
394    value: Vec<Bytes>,
395) -> anyhow::Result<()> {
396    check_slice_not_empty(value.as_slice(), stringify!(value))?;
397
398    match collection {
399        INDEX => insert_index(pipe, key, &value),
400        GENERAL => {
401            insert_string(pipe, key, value[0].as_ref());
402            Ok(())
403        }
404        CURRENCIES => {
405            insert_string(pipe, key, value[0].as_ref());
406            Ok(())
407        }
408        INSTRUMENTS => {
409            insert_string(pipe, key, value[0].as_ref());
410            Ok(())
411        }
412        SYNTHETICS => {
413            insert_string(pipe, key, value[0].as_ref());
414            Ok(())
415        }
416        ACCOUNTS => {
417            insert_list(pipe, key, value[0].as_ref());
418            Ok(())
419        }
420        ORDERS => {
421            insert_list(pipe, key, value[0].as_ref());
422            Ok(())
423        }
424        POSITIONS => {
425            insert_list(pipe, key, value[0].as_ref());
426            Ok(())
427        }
428        ACTORS => {
429            insert_string(pipe, key, value[0].as_ref());
430            Ok(())
431        }
432        STRATEGIES => {
433            insert_string(pipe, key, value[0].as_ref());
434            Ok(())
435        }
436        SNAPSHOTS => {
437            insert_list(pipe, key, value[0].as_ref());
438            Ok(())
439        }
440        HEALTH => {
441            insert_string(pipe, key, value[0].as_ref());
442            Ok(())
443        }
444        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
445    }
446}
447
448fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
449    let index_key = get_index_key(key)?;
450    match index_key {
451        INDEX_ORDER_IDS => {
452            insert_set(pipe, key, value[0].as_ref());
453            Ok(())
454        }
455        INDEX_ORDER_POSITION => {
456            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
457            Ok(())
458        }
459        INDEX_ORDER_CLIENT => {
460            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
461            Ok(())
462        }
463        INDEX_ORDERS => {
464            insert_set(pipe, key, value[0].as_ref());
465            Ok(())
466        }
467        INDEX_ORDERS_OPEN => {
468            insert_set(pipe, key, value[0].as_ref());
469            Ok(())
470        }
471        INDEX_ORDERS_CLOSED => {
472            insert_set(pipe, key, value[0].as_ref());
473            Ok(())
474        }
475        INDEX_ORDERS_EMULATED => {
476            insert_set(pipe, key, value[0].as_ref());
477            Ok(())
478        }
479        INDEX_ORDERS_INFLIGHT => {
480            insert_set(pipe, key, value[0].as_ref());
481            Ok(())
482        }
483        INDEX_POSITIONS => {
484            insert_set(pipe, key, value[0].as_ref());
485            Ok(())
486        }
487        INDEX_POSITIONS_OPEN => {
488            insert_set(pipe, key, value[0].as_ref());
489            Ok(())
490        }
491        INDEX_POSITIONS_CLOSED => {
492            insert_set(pipe, key, value[0].as_ref());
493            Ok(())
494        }
495        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
496    }
497}
498
/// Queues a `set` of `key` to `value` on the pipeline.
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}
502
/// Queues an `sadd` of `value` to the set at `key` on the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}
506
/// Queues an `hset` of field `name` to `value` in the hash at `key` on the pipeline.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}
510
/// Queues an `rpush` of `value` onto the list at `key` on the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
514
515fn update(
516    pipe: &mut Pipeline,
517    collection: &str,
518    key: &str,
519    value: Vec<Bytes>,
520) -> anyhow::Result<()> {
521    check_slice_not_empty(value.as_slice(), stringify!(value))?;
522
523    match collection {
524        ACCOUNTS => {
525            update_list(pipe, key, value[0].as_ref());
526            Ok(())
527        }
528        ORDERS => {
529            update_list(pipe, key, value[0].as_ref());
530            Ok(())
531        }
532        POSITIONS => {
533            update_list(pipe, key, value[0].as_ref());
534            Ok(())
535        }
536        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
537    }
538}
539
/// Queues an `rpush_exists` of `value` onto the list at `key` on the pipeline.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
543
544fn delete(
545    pipe: &mut Pipeline,
546    collection: &str,
547    key: &str,
548    value: Option<Vec<Bytes>>,
549) -> anyhow::Result<()> {
550    match collection {
551        INDEX => remove_index(pipe, key, value),
552        ACTORS => {
553            delete_string(pipe, key);
554            Ok(())
555        }
556        STRATEGIES => {
557            delete_string(pipe, key);
558            Ok(())
559        }
560        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
561    }
562}
563
564fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
565    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
566    let index_key = get_index_key(key)?;
567
568    match index_key {
569        INDEX_ORDERS_OPEN => {
570            remove_from_set(pipe, key, value[0].as_ref());
571            Ok(())
572        }
573        INDEX_ORDERS_CLOSED => {
574            remove_from_set(pipe, key, value[0].as_ref());
575            Ok(())
576        }
577        INDEX_ORDERS_EMULATED => {
578            remove_from_set(pipe, key, value[0].as_ref());
579            Ok(())
580        }
581        INDEX_ORDERS_INFLIGHT => {
582            remove_from_set(pipe, key, value[0].as_ref());
583            Ok(())
584        }
585        INDEX_POSITIONS_OPEN => {
586            remove_from_set(pipe, key, value[0].as_ref());
587            Ok(())
588        }
589        INDEX_POSITIONS_CLOSED => {
590            remove_from_set(pipe, key, value[0].as_ref());
591            Ok(())
592        }
593        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
594    }
595}
596
/// Queues an `srem` of `member` from the set at `key` on the pipeline.
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
600
/// Queues a `del` of `key` on the pipeline.
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
604
605fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
606    let mut key = String::new();
607
608    if config.use_trader_prefix {
609        key.push_str("trader-");
610    }
611
612    key.push_str(trader_id.as_str());
613
614    if config.use_instance_id {
615        key.push(REDIS_DELIMITER);
616        key.push_str(&format!("{instance_id}"));
617    }
618
619    key
620}
621
622fn get_collection_key(key: &str) -> anyhow::Result<&str> {
623    key.split_once(REDIS_DELIMITER)
624        .map(|(collection, _)| collection)
625        .ok_or_else(|| {
626            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
627        })
628}
629
630fn get_index_key(key: &str) -> anyhow::Result<&str> {
631    key.split_once(REDIS_DELIMITER)
632        .map(|(_, index_key)| index_key)
633        .ok_or_else(|| {
634            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
635        })
636}
637
638// This function can be used when we handle cache serialization in Rust
639#[allow(dead_code)]
640fn get_encoding(config: &HashMap<String, serde_json::Value>) -> String {
641    config
642        .get("encoding")
643        .and_then(|v| v.as_str())
644        .unwrap_or("msgpack")
645        .to_string()
646}
647
648// This function can be used when we handle cache serialization in Rust
649#[allow(dead_code)]
650fn deserialize_payload(
651    encoding: &str,
652    payload: &[u8],
653) -> anyhow::Result<HashMap<String, serde_json::Value>> {
654    match encoding {
655        "msgpack" => rmp_serde::from_slice(payload)
656            .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}")),
657        "json" => serde_json::from_slice(payload)
658            .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}")),
659        _ => Err(anyhow::anyhow!("Unsupported encoding: {encoding}")),
660    }
661}
662
/// Adapter implementing `CacheDatabaseAdapter` on top of a [`RedisCacheDatabase`].
#[allow(dead_code)] // Under development
pub struct RedisCacheDatabaseAdapter {
    /// The serialization encoding configured for this adapter.
    pub encoding: SerializationEncoding,
    /// The underlying Redis database that operations are delegated to.
    database: RedisCacheDatabase,
}
668
669#[allow(dead_code)] // Under development
670#[allow(unused)] // Under development
671impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
    /// Closes the underlying database; always returns `Ok`.
    fn close(&mut self) -> anyhow::Result<()> {
        self.database.close();
        Ok(())
    }
676
    /// Flushes the underlying database; always returns `Ok`.
    fn flush(&mut self) -> anyhow::Result<()> {
        self.database.flushdb();
        Ok(())
    }
681
    /// Placeholder: currently returns an empty map without querying the database.
    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
        // self.database.load()
        Ok(HashMap::new()) // TODO
    }
686
687    fn load_currencies(&mut self) -> anyhow::Result<HashMap<Ustr, Currency>> {
688        let mut currencies = HashMap::new();
689        let pattern = format!("{CURRENCIES}*");
690
691        for key in scan_keys(&mut self.database.con, pattern)? {
692            let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
693            let currency_code = Ustr::from(parts.first().unwrap());
694            let result = self.load_currency(&currency_code)?;
695            match result {
696                Some(currency) => {
697                    currencies.insert(currency_code, currency);
698                }
699                None => {
700                    log::error!("Currency not found: {currency_code}");
701                }
702            }
703        }
704        Ok(currencies)
705    }
706
707    fn load_instruments(&mut self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
708        let mut instruments = HashMap::new();
709        let pattern = format!("{INSTRUMENTS}*");
710
711        for key in scan_keys(&mut self.database.con, pattern)? {
712            let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
713            let instrument_id = InstrumentId::from_str(parts.first().unwrap())?;
714            let result = self.load_instrument(&instrument_id)?;
715            match result {
716                Some(instrument) => {
717                    instruments.insert(instrument_id, instrument);
718                }
719                None => {
720                    log::error!("Instrument not found: {instrument_id}");
721                }
722            }
723        }
724
725        Ok(instruments)
726    }
727
728    fn load_synthetics(&mut self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
729        let mut synthetics = HashMap::new();
730        let pattern = format!("{SYNTHETICS}*");
731
732        for key in scan_keys(&mut self.database.con, pattern)? {
733            let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
734            let instrument_id = InstrumentId::from_str(parts.first().unwrap())?;
735            let synthetic = self.load_synthetic(&instrument_id)?;
736            synthetics.insert(instrument_id, synthetic);
737        }
738
739        Ok(synthetics)
740    }
741
742    fn load_accounts(&mut self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
743        let mut accounts = HashMap::new();
744        let pattern = format!("{ACCOUNTS}*");
745
746        for key in scan_keys(&mut self.database.con, pattern)? {
747            let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
748            let account_id = AccountId::from(*parts.first().unwrap());
749            let result = self.load_account(&account_id)?;
750            match result {
751                Some(account) => {
752                    accounts.insert(account_id, account);
753                }
754                None => {
755                    log::error!("Account not found: {account_id}");
756                }
757            }
758        }
759
760        Ok(accounts)
761    }
762
763    fn load_orders(&mut self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
764        let mut orders = HashMap::new();
765        let pattern = format!("{ORDERS}*");
766
767        for key in scan_keys(&mut self.database.con, pattern)? {
768            let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
769            let client_order_id = ClientOrderId::from(*parts.first().unwrap());
770            let result = self.load_order(&client_order_id)?;
771            match result {
772                Some(order) => {
773                    orders.insert(client_order_id, order);
774                }
775                None => {
776                    log::error!("Order not found: {client_order_id}");
777                }
778            }
779        }
780        Ok(orders)
781    }
782
783    fn load_positions(&mut self) -> anyhow::Result<HashMap<PositionId, Position>> {
784        let mut positions = HashMap::new();
785        let pattern = format!("{POSITIONS}*");
786
787        for key in scan_keys(&mut self.database.con, pattern)? {
788            let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
789            let position_id = PositionId::from(*parts.first().unwrap());
790            let position = self.load_position(&position_id)?;
791            positions.insert(position_id, position);
792        }
793
794        Ok(positions)
795    }
796
797    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
798        todo!()
799    }
800
801    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
802        todo!()
803    }
804
805    fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
806        todo!()
807    }
808
809    fn load_instrument(
810        &self,
811        instrument_id: &InstrumentId,
812    ) -> anyhow::Result<Option<InstrumentAny>> {
813        todo!()
814    }
815
816    fn load_synthetic(&self, instrument_id: &InstrumentId) -> anyhow::Result<SyntheticInstrument> {
817        todo!()
818    }
819
820    fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
821        todo!()
822    }
823
824    fn load_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<Option<OrderAny>> {
825        todo!()
826    }
827
828    fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Position> {
829        todo!()
830    }
831
832    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
833        todo!()
834    }
835
836    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
837        todo!()
838    }
839
840    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
841        todo!()
842    }
843
844    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
845        todo!()
846    }
847
848    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
849        todo!()
850    }
851
852    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
853        todo!()
854    }
855
856    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
857        todo!()
858    }
859
860    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
861        todo!()
862    }
863
864    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
865        todo!()
866    }
867
868    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
869        todo!()
870    }
871
872    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
873        todo!()
874    }
875
876    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
877        todo!()
878    }
879
880    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
881        todo!()
882    }
883
884    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
885        anyhow::bail!("Saving market data for Redis cache adapter not supported")
886    }
887
888    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
889        anyhow::bail!("Saving market data for Redis cache adapter not supported")
890    }
891
892    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
893        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
894    }
895
896    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
897        anyhow::bail!("Saving market data for Redis cache adapter not supported")
898    }
899
900    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
901        anyhow::bail!("Loading market data for Redis cache adapter not supported")
902    }
903
904    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
905        anyhow::bail!("Saving market data for Redis cache adapter not supported")
906    }
907
908    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
909        anyhow::bail!("Loading market data for Redis cache adapter not supported")
910    }
911
912    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
913        anyhow::bail!("Saving signals for Redis cache adapter not supported")
914    }
915
916    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
917        anyhow::bail!("Loading signals from Redis cache adapter not supported")
918    }
919
920    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
921        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
922    }
923
924    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
925        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
926    }
927
928    fn load_order_snapshot(
929        &self,
930        client_order_id: &ClientOrderId,
931    ) -> anyhow::Result<Option<OrderSnapshot>> {
932        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
933    }
934
935    fn load_position_snapshot(
936        &self,
937        position_id: &PositionId,
938    ) -> anyhow::Result<Option<PositionSnapshot>> {
939        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
940    }
941
942    fn index_venue_order_id(
943        &self,
944        client_order_id: ClientOrderId,
945        venue_order_id: VenueOrderId,
946    ) -> anyhow::Result<()> {
947        todo!()
948    }
949
950    fn index_order_position(
951        &self,
952        client_order_id: ClientOrderId,
953        position_id: PositionId,
954    ) -> anyhow::Result<()> {
955        todo!()
956    }
957
958    fn update_actor(&self) -> anyhow::Result<()> {
959        todo!()
960    }
961
962    fn update_strategy(&self) -> anyhow::Result<()> {
963        todo!()
964    }
965
966    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
967        todo!()
968    }
969
970    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
971        todo!()
972    }
973
974    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
975        todo!()
976    }
977
978    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
979        todo!()
980    }
981
982    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
983        todo!()
984    }
985
986    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
987        todo!()
988    }
989}
990
991////////////////////////////////////////////////////////////////////////////////
992// Tests
993////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// With `use_instance_id` enabled, the trader key must begin with the prefixed
    /// trader ID and end with the instance ID.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let config = {
            let mut cfg = CacheConfig::default();
            cfg.use_instance_id = true;
            cfg
        };

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&instance_id.to_string()));
    }

    /// `collection:123` must resolve to the collection name `collection`.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    /// A key such as `no_delimiter` must be rejected with an error.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }

    /// `index:123` must resolve to the index key `123`.
    #[rstest]
    fn test_get_index_key_valid() {
        let index = get_index_key("index:123").unwrap();
        assert_eq!(index, "123");
    }

    /// A key such as `no_delimiter` must be rejected with an error.
    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_delimiter");
        assert!(result.is_err());
    }
}