nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23    cache::{
24        CacheConfig,
25        database::{CacheDatabaseAdapter, CacheMap},
26    },
27    custom::CustomData,
28    enums::SerializationEncoding,
29    runtime::get_runtime,
30    signal::Signal,
31};
32use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
33use nautilus_cryptography::providers::install_cryptographic_provider;
34use nautilus_model::{
35    accounts::AccountAny,
36    data::{Bar, DataType, QuoteTick, TradeTick},
37    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
38    identifiers::{
39        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
40        TraderId, VenueOrderId,
41    },
42    instruments::{InstrumentAny, SyntheticInstrument},
43    orderbook::OrderBook,
44    orders::OrderAny,
45    position::Position,
46    types::Currency,
47};
48use redis::{Pipeline, aio::ConnectionManager};
49use tokio::try_join;
50use ustr::Ustr;
51
52use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
53use crate::redis::{create_redis_connection, queries::DatabaseQueries};
54
55// Task and connection names
56const CACHE_READ: &str = "cache-read";
57const CACHE_WRITE: &str = "cache-write";
58
59// Error constants
60const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
61
62// Collection keys
63const INDEX: &str = "index";
64const GENERAL: &str = "general";
65const CURRENCIES: &str = "currencies";
66const INSTRUMENTS: &str = "instruments";
67const SYNTHETICS: &str = "synthetics";
68const ACCOUNTS: &str = "accounts";
69const ORDERS: &str = "orders";
70const POSITIONS: &str = "positions";
71const ACTORS: &str = "actors";
72const STRATEGIES: &str = "strategies";
73const SNAPSHOTS: &str = "snapshots";
74const HEALTH: &str = "health";
75
76// Index keys
77const INDEX_ORDER_IDS: &str = "index:order_ids";
78const INDEX_ORDER_POSITION: &str = "index:order_position";
79const INDEX_ORDER_CLIENT: &str = "index:order_client";
80const INDEX_ORDERS: &str = "index:orders";
81const INDEX_ORDERS_OPEN: &str = "index:orders_open";
82const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
83const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
84const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
85const INDEX_POSITIONS: &str = "index:positions";
86const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
87const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
89/// A type of database operation.
90#[derive(Clone, Debug)]
91pub enum DatabaseOperation {
92    Insert,
93    Update,
94    Delete,
95    Close,
96}
97
98/// Represents a database command to be performed which may be executed in a task.
99#[derive(Clone, Debug)]
100pub struct DatabaseCommand {
101    /// The database operation type.
102    pub op_type: DatabaseOperation,
103    /// The primary key for the operation.
104    pub key: Option<String>,
105    /// The data payload for the operation.
106    pub payload: Option<Vec<Bytes>>,
107}
108
109impl DatabaseCommand {
110    /// Creates a new [`DatabaseCommand`] instance.
111    #[must_use]
112    pub fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113        Self {
114            op_type,
115            key: Some(key),
116            payload,
117        }
118    }
119
120    /// Initialize a `Close` database command, this is meant to close the database cache channel.
121    #[must_use]
122    pub fn close() -> Self {
123        Self {
124            op_type: DatabaseOperation::Close,
125            key: None,
126            payload: None,
127        }
128    }
129}
130
131#[cfg_attr(
132    feature = "python",
133    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
134)]
135pub struct RedisCacheDatabase {
136    pub con: ConnectionManager,
137    pub trader_id: TraderId,
138    encoding: SerializationEncoding,
139    handle: tokio::task::JoinHandle<()>,
140    trader_key: String,
141    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
142}
143
144impl RedisCacheDatabase {
145    /// Creates a new [`RedisCacheDatabase`] instance.
146    // need to remove async from here
147    pub async fn new(
148        trader_id: TraderId,
149        instance_id: UUID4,
150        config: CacheConfig,
151    ) -> anyhow::Result<RedisCacheDatabase> {
152        install_cryptographic_provider();
153
154        let db_config = config
155            .database
156            .as_ref()
157            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
158        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
159
160        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
161        let trader_key = get_trader_key(trader_id, instance_id, &config);
162        let trader_key_clone = trader_key.clone();
163        let encoding = config.encoding;
164        let handle = get_runtime().spawn(async move {
165            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
166                log::error!("Failed to spawn task '{CACHE_WRITE}': {e}");
167            }
168        });
169
170        Ok(RedisCacheDatabase {
171            trader_id,
172            trader_key,
173            con,
174            tx,
175            handle,
176            encoding,
177        })
178    }
179
180    pub fn get_encoding(&self) -> SerializationEncoding {
181        self.encoding
182    }
183
184    pub fn get_trader_key(&self) -> &str {
185        &self.trader_key
186    }
187
188    pub fn close(&mut self) {
189        log::debug!("Closing");
190
191        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
192            log::debug!("Error sending close message: {e:?}")
193        }
194
195        log::debug!("Awaiting task '{CACHE_WRITE}'");
196        tokio::task::block_in_place(|| {
197            if let Err(e) = get_runtime().block_on(&mut self.handle) {
198                log::error!("Error awaiting task '{CACHE_WRITE}': {e:?}");
199            }
200        });
201
202        log::debug!("Closed");
203    }
204
205    pub async fn flushdb(&mut self) {
206        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
207            .query_async::<()>(&mut self.con)
208            .await
209        {
210            log::error!("Failed to flush database: {e:?}");
211        }
212    }
213
214    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
215        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
216        log::debug!("Querying keys: {pattern}");
217        DatabaseQueries::scan_keys(&mut self.con, pattern).await
218    }
219
220    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
221        DatabaseQueries::read(&self.con, &self.trader_key, key).await
222    }
223
224    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
225        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
226        match self.tx.send(op) {
227            Ok(_) => Ok(()),
228            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
229        }
230    }
231
232    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
233        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
234        match self.tx.send(op) {
235            Ok(_) => Ok(()),
236            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
237        }
238    }
239
240    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
241        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
242        match self.tx.send(op) {
243            Ok(_) => Ok(()),
244            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
245        }
246    }
247}
248
249async fn process_commands(
250    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
251    trader_key: String,
252    config: CacheConfig,
253) -> anyhow::Result<()> {
254    tracing::debug!("Starting cache processing");
255
256    let db_config = config
257        .database
258        .as_ref()
259        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
260    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
261
262    // Buffering
263    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
264    let mut last_drain = Instant::now();
265    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
266
267    // Continue to receive and handle messages until channel is hung up
268    loop {
269        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
270            drain_buffer(&mut con, &trader_key, &mut buffer).await;
271            last_drain = Instant::now();
272        } else {
273            match rx.recv().await {
274                Some(msg) => {
275                    if let DatabaseOperation::Close = msg.op_type {
276                        break;
277                    }
278                    buffer.push_back(msg)
279                }
280                None => break, // Channel hung up
281            }
282        }
283    }
284
285    // Drain any remaining messages
286    if !buffer.is_empty() {
287        drain_buffer(&mut con, &trader_key, &mut buffer).await;
288    }
289
290    tracing::debug!("Stopped cache processing");
291    Ok(())
292}
293
294async fn drain_buffer(
295    conn: &mut ConnectionManager,
296    trader_key: &str,
297    buffer: &mut VecDeque<DatabaseCommand>,
298) {
299    let mut pipe = redis::pipe();
300    pipe.atomic();
301
302    for msg in buffer.drain(..) {
303        let key = match msg.key {
304            Some(key) => key,
305            None => {
306                log::error!("Null key found for message: {msg:?}");
307                continue;
308            }
309        };
310        let collection = match get_collection_key(&key) {
311            Ok(collection) => collection,
312            Err(e) => {
313                tracing::error!("{e}");
314                continue; // Continue to next message
315            }
316        };
317
318        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
319
320        match msg.op_type {
321            DatabaseOperation::Insert => {
322                if let Some(payload) = msg.payload {
323                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
324                        tracing::error!("{e}");
325                    }
326                } else {
327                    tracing::error!("Null `payload` for `insert`");
328                }
329            }
330            DatabaseOperation::Update => {
331                if let Some(payload) = msg.payload {
332                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
333                        tracing::error!("{e}");
334                    }
335                } else {
336                    tracing::error!("Null `payload` for `update`");
337                };
338            }
339            DatabaseOperation::Delete => {
340                // `payload` can be `None` for a delete operation
341                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
342                    tracing::error!("{e}");
343                }
344            }
345            DatabaseOperation::Close => panic!("Close command should not be drained"),
346        }
347    }
348
349    if let Err(e) = pipe.query_async::<()>(conn).await {
350        tracing::error!("{e}");
351    }
352}
353
354fn insert(
355    pipe: &mut Pipeline,
356    collection: &str,
357    key: &str,
358    value: Vec<Bytes>,
359) -> anyhow::Result<()> {
360    check_slice_not_empty(value.as_slice(), stringify!(value))?;
361
362    match collection {
363        INDEX => insert_index(pipe, key, &value),
364        GENERAL => {
365            insert_string(pipe, key, value[0].as_ref());
366            Ok(())
367        }
368        CURRENCIES => {
369            insert_string(pipe, key, value[0].as_ref());
370            Ok(())
371        }
372        INSTRUMENTS => {
373            insert_string(pipe, key, value[0].as_ref());
374            Ok(())
375        }
376        SYNTHETICS => {
377            insert_string(pipe, key, value[0].as_ref());
378            Ok(())
379        }
380        ACCOUNTS => {
381            insert_list(pipe, key, value[0].as_ref());
382            Ok(())
383        }
384        ORDERS => {
385            insert_list(pipe, key, value[0].as_ref());
386            Ok(())
387        }
388        POSITIONS => {
389            insert_list(pipe, key, value[0].as_ref());
390            Ok(())
391        }
392        ACTORS => {
393            insert_string(pipe, key, value[0].as_ref());
394            Ok(())
395        }
396        STRATEGIES => {
397            insert_string(pipe, key, value[0].as_ref());
398            Ok(())
399        }
400        SNAPSHOTS => {
401            insert_list(pipe, key, value[0].as_ref());
402            Ok(())
403        }
404        HEALTH => {
405            insert_string(pipe, key, value[0].as_ref());
406            Ok(())
407        }
408        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
409    }
410}
411
412fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
413    let index_key = get_index_key(key)?;
414    match index_key {
415        INDEX_ORDER_IDS => {
416            insert_set(pipe, key, value[0].as_ref());
417            Ok(())
418        }
419        INDEX_ORDER_POSITION => {
420            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
421            Ok(())
422        }
423        INDEX_ORDER_CLIENT => {
424            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
425            Ok(())
426        }
427        INDEX_ORDERS => {
428            insert_set(pipe, key, value[0].as_ref());
429            Ok(())
430        }
431        INDEX_ORDERS_OPEN => {
432            insert_set(pipe, key, value[0].as_ref());
433            Ok(())
434        }
435        INDEX_ORDERS_CLOSED => {
436            insert_set(pipe, key, value[0].as_ref());
437            Ok(())
438        }
439        INDEX_ORDERS_EMULATED => {
440            insert_set(pipe, key, value[0].as_ref());
441            Ok(())
442        }
443        INDEX_ORDERS_INFLIGHT => {
444            insert_set(pipe, key, value[0].as_ref());
445            Ok(())
446        }
447        INDEX_POSITIONS => {
448            insert_set(pipe, key, value[0].as_ref());
449            Ok(())
450        }
451        INDEX_POSITIONS_OPEN => {
452            insert_set(pipe, key, value[0].as_ref());
453            Ok(())
454        }
455        INDEX_POSITIONS_CLOSED => {
456            insert_set(pipe, key, value[0].as_ref());
457            Ok(())
458        }
459        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
460    }
461}
462
463fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
464    pipe.set(key, value);
465}
466
467fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
468    pipe.sadd(key, value);
469}
470
471fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
472    pipe.hset(key, name, value);
473}
474
475fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
476    pipe.rpush(key, value);
477}
478
479fn update(
480    pipe: &mut Pipeline,
481    collection: &str,
482    key: &str,
483    value: Vec<Bytes>,
484) -> anyhow::Result<()> {
485    check_slice_not_empty(value.as_slice(), stringify!(value))?;
486
487    match collection {
488        ACCOUNTS => {
489            update_list(pipe, key, value[0].as_ref());
490            Ok(())
491        }
492        ORDERS => {
493            update_list(pipe, key, value[0].as_ref());
494            Ok(())
495        }
496        POSITIONS => {
497            update_list(pipe, key, value[0].as_ref());
498            Ok(())
499        }
500        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
501    }
502}
503
504fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
505    pipe.rpush_exists(key, value);
506}
507
508fn delete(
509    pipe: &mut Pipeline,
510    collection: &str,
511    key: &str,
512    value: Option<Vec<Bytes>>,
513) -> anyhow::Result<()> {
514    match collection {
515        INDEX => remove_index(pipe, key, value),
516        ACTORS => {
517            delete_string(pipe, key);
518            Ok(())
519        }
520        STRATEGIES => {
521            delete_string(pipe, key);
522            Ok(())
523        }
524        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
525    }
526}
527
528fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
529    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
530    let index_key = get_index_key(key)?;
531
532    match index_key {
533        INDEX_ORDERS_OPEN => {
534            remove_from_set(pipe, key, value[0].as_ref());
535            Ok(())
536        }
537        INDEX_ORDERS_CLOSED => {
538            remove_from_set(pipe, key, value[0].as_ref());
539            Ok(())
540        }
541        INDEX_ORDERS_EMULATED => {
542            remove_from_set(pipe, key, value[0].as_ref());
543            Ok(())
544        }
545        INDEX_ORDERS_INFLIGHT => {
546            remove_from_set(pipe, key, value[0].as_ref());
547            Ok(())
548        }
549        INDEX_POSITIONS_OPEN => {
550            remove_from_set(pipe, key, value[0].as_ref());
551            Ok(())
552        }
553        INDEX_POSITIONS_CLOSED => {
554            remove_from_set(pipe, key, value[0].as_ref());
555            Ok(())
556        }
557        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
558    }
559}
560
561fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
562    pipe.srem(key, member);
563}
564
565fn delete_string(pipe: &mut Pipeline, key: &str) {
566    pipe.del(key);
567}
568
569fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
570    let mut key = String::new();
571
572    if config.use_trader_prefix {
573        key.push_str("trader-");
574    }
575
576    key.push_str(trader_id.as_str());
577
578    if config.use_instance_id {
579        key.push(REDIS_DELIMITER);
580        key.push_str(&format!("{instance_id}"));
581    }
582
583    key
584}
585
586fn get_collection_key(key: &str) -> anyhow::Result<&str> {
587    key.split_once(REDIS_DELIMITER)
588        .map(|(collection, _)| collection)
589        .ok_or_else(|| {
590            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
591        })
592}
593
594fn get_index_key(key: &str) -> anyhow::Result<&str> {
595    key.split_once(REDIS_DELIMITER)
596        .map(|(_, index_key)| index_key)
597        .ok_or_else(|| {
598            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
599        })
600}
601
602#[allow(dead_code)] // Under development
603pub struct RedisCacheDatabaseAdapter {
604    pub encoding: SerializationEncoding,
605    database: RedisCacheDatabase,
606}
607
608#[allow(dead_code)] // Under development
609#[allow(unused)] // Under development
610#[async_trait::async_trait]
611impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
612    fn close(&mut self) -> anyhow::Result<()> {
613        self.database.close();
614        Ok(())
615    }
616
617    fn flush(&mut self) -> anyhow::Result<()> {
618        self.database.flushdb();
619        Ok(())
620    }
621
622    async fn load_all(&self) -> anyhow::Result<CacheMap> {
623        tracing::debug!("Loading all data");
624
625        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
626            self.load_currencies(),
627            self.load_instruments(),
628            self.load_synthetics(),
629            self.load_accounts(),
630            self.load_orders(),
631            self.load_positions()
632        )
633        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
634
635        Ok(CacheMap {
636            currencies,
637            instruments,
638            synthetics,
639            accounts,
640            orders,
641            positions,
642        })
643    }
644
645    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
646        // self.database.load()
647        Ok(HashMap::new()) // TODO
648    }
649
650    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
651        DatabaseQueries::load_currencies(
652            &self.database.con,
653            &self.database.trader_key,
654            self.encoding,
655        )
656        .await
657    }
658
659    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
660        DatabaseQueries::load_instruments(
661            &self.database.con,
662            &self.database.trader_key,
663            self.encoding,
664        )
665        .await
666    }
667
668    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
669        DatabaseQueries::load_synthetics(
670            &self.database.con,
671            &self.database.trader_key,
672            self.encoding,
673        )
674        .await
675    }
676
677    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
678        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
679            .await
680    }
681
682    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
683        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
684            .await
685    }
686
687    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
688        DatabaseQueries::load_positions(
689            &self.database.con,
690            &self.database.trader_key,
691            self.encoding,
692        )
693        .await
694    }
695
696    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
697        todo!()
698    }
699
700    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
701        todo!()
702    }
703
704    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
705        DatabaseQueries::load_currency(
706            &self.database.con,
707            &self.database.trader_key,
708            code,
709            self.encoding,
710        )
711        .await
712    }
713
714    async fn load_instrument(
715        &self,
716        instrument_id: &InstrumentId,
717    ) -> anyhow::Result<Option<InstrumentAny>> {
718        DatabaseQueries::load_instrument(
719            &self.database.con,
720            &self.database.trader_key,
721            instrument_id,
722            self.encoding,
723        )
724        .await
725    }
726
727    async fn load_synthetic(
728        &self,
729        instrument_id: &InstrumentId,
730    ) -> anyhow::Result<Option<SyntheticInstrument>> {
731        DatabaseQueries::load_synthetic(
732            &self.database.con,
733            &self.database.trader_key,
734            instrument_id,
735            self.encoding,
736        )
737        .await
738    }
739
740    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
741        DatabaseQueries::load_account(
742            &self.database.con,
743            &self.database.trader_key,
744            account_id,
745            self.encoding,
746        )
747        .await
748    }
749
750    async fn load_order(
751        &self,
752        client_order_id: &ClientOrderId,
753    ) -> anyhow::Result<Option<OrderAny>> {
754        DatabaseQueries::load_order(
755            &self.database.con,
756            &self.database.trader_key,
757            client_order_id,
758            self.encoding,
759        )
760        .await
761    }
762
763    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
764        DatabaseQueries::load_position(
765            &self.database.con,
766            &self.database.trader_key,
767            position_id,
768            self.encoding,
769        )
770        .await
771    }
772
773    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
774        todo!()
775    }
776
777    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
778        todo!()
779    }
780
781    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
782        todo!()
783    }
784
785    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
786        todo!()
787    }
788
789    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
790        todo!()
791    }
792
793    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
794        todo!()
795    }
796
797    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
798        todo!()
799    }
800
801    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
802        todo!()
803    }
804
805    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
806        todo!()
807    }
808
809    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
810        todo!()
811    }
812
813    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
814        todo!()
815    }
816
817    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
818        todo!()
819    }
820
821    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
822        todo!()
823    }
824
825    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
826        anyhow::bail!("Saving market data for Redis cache adapter not supported")
827    }
828
829    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
830        anyhow::bail!("Saving market data for Redis cache adapter not supported")
831    }
832
833    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
834        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
835    }
836
837    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
838        anyhow::bail!("Saving market data for Redis cache adapter not supported")
839    }
840
841    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
842        anyhow::bail!("Loading market data for Redis cache adapter not supported")
843    }
844
845    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
846        anyhow::bail!("Saving market data for Redis cache adapter not supported")
847    }
848
849    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
850        anyhow::bail!("Loading market data for Redis cache adapter not supported")
851    }
852
853    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
854        anyhow::bail!("Saving signals for Redis cache adapter not supported")
855    }
856
857    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
858        anyhow::bail!("Loading signals from Redis cache adapter not supported")
859    }
860
861    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
862        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
863    }
864
865    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
866        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
867    }
868
869    fn load_order_snapshot(
870        &self,
871        client_order_id: &ClientOrderId,
872    ) -> anyhow::Result<Option<OrderSnapshot>> {
873        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
874    }
875
876    fn load_position_snapshot(
877        &self,
878        position_id: &PositionId,
879    ) -> anyhow::Result<Option<PositionSnapshot>> {
880        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
881    }
882
883    fn index_venue_order_id(
884        &self,
885        client_order_id: ClientOrderId,
886        venue_order_id: VenueOrderId,
887    ) -> anyhow::Result<()> {
888        todo!()
889    }
890
891    fn index_order_position(
892        &self,
893        client_order_id: ClientOrderId,
894        position_id: PositionId,
895    ) -> anyhow::Result<()> {
896        todo!()
897    }
898
899    fn update_actor(&self) -> anyhow::Result<()> {
900        todo!()
901    }
902
903    fn update_strategy(&self) -> anyhow::Result<()> {
904        todo!()
905    }
906
907    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
908        todo!()
909    }
910
911    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
912        todo!()
913    }
914
915    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
916        todo!()
917    }
918
919    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
920        todo!()
921    }
922
923    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
924        todo!()
925    }
926
927    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
928        todo!()
929    }
930}
931
932////////////////////////////////////////////////////////////////////////////////
933// Tests
934////////////////////////////////////////////////////////////////////////////////
935#[cfg(test)]
936mod tests {
937    use rstest::rstest;
938
939    use super::*;
940
941    #[rstest]
942    fn test_get_trader_key_with_prefix_and_instance_id() {
943        let trader_id = TraderId::from("tester-123");
944        let instance_id = UUID4::new();
945        let mut config = CacheConfig::default();
946        config.use_instance_id = true;
947
948        let key = get_trader_key(trader_id, instance_id, &config);
949        assert!(key.starts_with("trader-tester-123:"));
950        assert!(key.ends_with(&instance_id.to_string()));
951    }
952
953    #[rstest]
954    fn test_get_collection_key_valid() {
955        let key = "collection:123";
956        assert_eq!(get_collection_key(key).unwrap(), "collection");
957    }
958
959    #[rstest]
960    fn test_get_collection_key_invalid() {
961        let key = "no_delimiter";
962        assert!(get_collection_key(key).is_err());
963    }
964
965    #[rstest]
966    fn test_get_index_key_valid() {
967        let key = "index:123";
968        assert_eq!(get_index_key(key).unwrap(), "123");
969    }
970
971    #[rstest]
972    fn test_get_index_key_invalid() {
973        let key = "no_delimiter";
974        assert!(get_index_key(key).is_err());
975    }
976}