nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23    cache::{
24        CacheConfig,
25        database::{CacheDatabaseAdapter, CacheMap},
26    },
27    custom::CustomData,
28    enums::SerializationEncoding,
29    runtime::get_runtime,
30    signal::Signal,
31};
32use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
33use nautilus_cryptography::providers::install_cryptographic_provider;
34use nautilus_model::{
35    accounts::AccountAny,
36    data::{Bar, DataType, QuoteTick, TradeTick},
37    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
38    identifiers::{
39        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
40        TraderId, VenueOrderId,
41    },
42    instruments::{InstrumentAny, SyntheticInstrument},
43    orderbook::OrderBook,
44    orders::OrderAny,
45    position::Position,
46    types::Currency,
47};
48use redis::{Pipeline, aio::ConnectionManager};
49use tokio::try_join;
50use ustr::Ustr;
51
52use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
53use crate::redis::{create_redis_connection, queries::DatabaseQueries};
54
55// Task and connection names
56const CACHE_READ: &str = "cache-read";
57const CACHE_WRITE: &str = "cache-write";
58
59// Error constants
60const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
61
62// Collection keys
63const INDEX: &str = "index";
64const GENERAL: &str = "general";
65const CURRENCIES: &str = "currencies";
66const INSTRUMENTS: &str = "instruments";
67const SYNTHETICS: &str = "synthetics";
68const ACCOUNTS: &str = "accounts";
69const ORDERS: &str = "orders";
70const POSITIONS: &str = "positions";
71const ACTORS: &str = "actors";
72const STRATEGIES: &str = "strategies";
73const SNAPSHOTS: &str = "snapshots";
74const HEALTH: &str = "health";
75
76// Index keys
77const INDEX_ORDER_IDS: &str = "index:order_ids";
78const INDEX_ORDER_POSITION: &str = "index:order_position";
79const INDEX_ORDER_CLIENT: &str = "index:order_client";
80const INDEX_ORDERS: &str = "index:orders";
81const INDEX_ORDERS_OPEN: &str = "index:orders_open";
82const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
83const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
84const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
85const INDEX_POSITIONS: &str = "index:positions";
86const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
87const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
89/// A type of database operation.
90#[derive(Clone, Debug)]
91pub enum DatabaseOperation {
92    Insert,
93    Update,
94    Delete,
95    Close,
96}
97
98/// Represents a database command to be performed which may be executed in a task.
99#[derive(Clone, Debug)]
100pub struct DatabaseCommand {
101    /// The database operation type.
102    pub op_type: DatabaseOperation,
103    /// The primary key for the operation.
104    pub key: Option<String>,
105    /// The data payload for the operation.
106    pub payload: Option<Vec<Bytes>>,
107}
108
109impl DatabaseCommand {
110    /// Creates a new [`DatabaseCommand`] instance.
111    #[must_use]
112    pub fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113        Self {
114            op_type,
115            key: Some(key),
116            payload,
117        }
118    }
119
120    /// Initialize a `Close` database command, this is meant to close the database cache channel.
121    #[must_use]
122    pub fn close() -> Self {
123        Self {
124            op_type: DatabaseOperation::Close,
125            key: None,
126            payload: None,
127        }
128    }
129}
130
131#[cfg_attr(
132    feature = "python",
133    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
134)]
135pub struct RedisCacheDatabase {
136    pub con: ConnectionManager,
137    pub trader_id: TraderId,
138    encoding: SerializationEncoding,
139    handle: tokio::task::JoinHandle<()>,
140    trader_key: String,
141    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
142}
143
144impl RedisCacheDatabase {
145    /// Creates a new [`RedisCacheDatabase`] instance.
146    // need to remove async from here
147    pub async fn new(
148        trader_id: TraderId,
149        instance_id: UUID4,
150        config: CacheConfig,
151    ) -> anyhow::Result<RedisCacheDatabase> {
152        install_cryptographic_provider();
153
154        let db_config = config
155            .database
156            .as_ref()
157            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
158        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
159
160        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
161        let trader_key = get_trader_key(trader_id, instance_id, &config);
162        let trader_key_clone = trader_key.clone();
163        let encoding = config.encoding;
164        let handle = get_runtime().spawn(async move {
165            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
166                log::error!("Error in task '{CACHE_WRITE}': {e}");
167            }
168        });
169
170        Ok(RedisCacheDatabase {
171            trader_id,
172            trader_key,
173            con,
174            tx,
175            handle,
176            encoding,
177        })
178    }
179
180    pub fn get_encoding(&self) -> SerializationEncoding {
181        self.encoding
182    }
183
184    pub fn get_trader_key(&self) -> &str {
185        &self.trader_key
186    }
187
188    pub fn close(&mut self) {
189        log::debug!("Closing");
190
191        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
192            log::debug!("Error sending close command: {e:?}")
193        }
194
195        log::debug!("Awaiting task '{CACHE_WRITE}'");
196        tokio::task::block_in_place(|| {
197            if let Err(e) = get_runtime().block_on(&mut self.handle) {
198                log::error!("Error awaiting task '{CACHE_WRITE}': {e:?}");
199            }
200        });
201
202        log::debug!("Closed");
203    }
204
205    pub async fn flushdb(&mut self) {
206        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
207            .query_async::<()>(&mut self.con)
208            .await
209        {
210            log::error!("Failed to flush database: {e:?}");
211        }
212    }
213
214    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
215        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
216        log::debug!("Querying keys: {pattern}");
217        DatabaseQueries::scan_keys(&mut self.con, pattern).await
218    }
219
220    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
221        DatabaseQueries::read(&self.con, &self.trader_key, key).await
222    }
223
224    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
225        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
226        match self.tx.send(op) {
227            Ok(_) => Ok(()),
228            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
229        }
230    }
231
232    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
233        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
234        match self.tx.send(op) {
235            Ok(_) => Ok(()),
236            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
237        }
238    }
239
240    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
241        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
242        match self.tx.send(op) {
243            Ok(_) => Ok(()),
244            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
245        }
246    }
247}
248
249async fn process_commands(
250    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
251    trader_key: String,
252    config: CacheConfig,
253) -> anyhow::Result<()> {
254    tracing::debug!("Starting cache processing");
255
256    let db_config = config
257        .database
258        .as_ref()
259        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
260    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
261
262    // Buffering
263    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
264    let mut last_drain = Instant::now();
265    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
266
267    // Continue to receive and handle messages until channel is hung up
268    loop {
269        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
270            drain_buffer(&mut con, &trader_key, &mut buffer).await;
271            last_drain = Instant::now();
272        } else {
273            match rx.recv().await {
274                Some(cmd) => {
275                    tracing::debug!("Received {cmd:?}");
276                    if let DatabaseOperation::Close = cmd.op_type {
277                        break;
278                    }
279                    buffer.push_back(cmd)
280                }
281                None => {
282                    tracing::debug!("Command channel closed");
283                    break;
284                }
285            }
286        }
287    }
288
289    // Drain any remaining messages
290    if !buffer.is_empty() {
291        drain_buffer(&mut con, &trader_key, &mut buffer).await;
292    }
293
294    tracing::debug!("Stopped cache processing");
295    Ok(())
296}
297
298async fn drain_buffer(
299    conn: &mut ConnectionManager,
300    trader_key: &str,
301    buffer: &mut VecDeque<DatabaseCommand>,
302) {
303    let mut pipe = redis::pipe();
304    pipe.atomic();
305
306    for msg in buffer.drain(..) {
307        let key = match msg.key {
308            Some(key) => key,
309            None => {
310                log::error!("Null key found for message: {msg:?}");
311                continue;
312            }
313        };
314        let collection = match get_collection_key(&key) {
315            Ok(collection) => collection,
316            Err(e) => {
317                tracing::error!("{e}");
318                continue; // Continue to next message
319            }
320        };
321
322        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
323
324        match msg.op_type {
325            DatabaseOperation::Insert => {
326                if let Some(payload) = msg.payload {
327                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
328                        tracing::error!("{e}");
329                    }
330                } else {
331                    tracing::error!("Null `payload` for `insert`");
332                }
333            }
334            DatabaseOperation::Update => {
335                if let Some(payload) = msg.payload {
336                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
337                        tracing::error!("{e}");
338                    }
339                } else {
340                    tracing::error!("Null `payload` for `update`");
341                };
342            }
343            DatabaseOperation::Delete => {
344                // `payload` can be `None` for a delete operation
345                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
346                    tracing::error!("{e}");
347                }
348            }
349            DatabaseOperation::Close => panic!("Close command should not be drained"),
350        }
351    }
352
353    if let Err(e) = pipe.query_async::<()>(conn).await {
354        tracing::error!("{e}");
355    }
356}
357
358fn insert(
359    pipe: &mut Pipeline,
360    collection: &str,
361    key: &str,
362    value: Vec<Bytes>,
363) -> anyhow::Result<()> {
364    check_slice_not_empty(value.as_slice(), stringify!(value))?;
365
366    match collection {
367        INDEX => insert_index(pipe, key, &value),
368        GENERAL => {
369            insert_string(pipe, key, value[0].as_ref());
370            Ok(())
371        }
372        CURRENCIES => {
373            insert_string(pipe, key, value[0].as_ref());
374            Ok(())
375        }
376        INSTRUMENTS => {
377            insert_string(pipe, key, value[0].as_ref());
378            Ok(())
379        }
380        SYNTHETICS => {
381            insert_string(pipe, key, value[0].as_ref());
382            Ok(())
383        }
384        ACCOUNTS => {
385            insert_list(pipe, key, value[0].as_ref());
386            Ok(())
387        }
388        ORDERS => {
389            insert_list(pipe, key, value[0].as_ref());
390            Ok(())
391        }
392        POSITIONS => {
393            insert_list(pipe, key, value[0].as_ref());
394            Ok(())
395        }
396        ACTORS => {
397            insert_string(pipe, key, value[0].as_ref());
398            Ok(())
399        }
400        STRATEGIES => {
401            insert_string(pipe, key, value[0].as_ref());
402            Ok(())
403        }
404        SNAPSHOTS => {
405            insert_list(pipe, key, value[0].as_ref());
406            Ok(())
407        }
408        HEALTH => {
409            insert_string(pipe, key, value[0].as_ref());
410            Ok(())
411        }
412        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
413    }
414}
415
416fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
417    let index_key = get_index_key(key)?;
418    match index_key {
419        INDEX_ORDER_IDS => {
420            insert_set(pipe, key, value[0].as_ref());
421            Ok(())
422        }
423        INDEX_ORDER_POSITION => {
424            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
425            Ok(())
426        }
427        INDEX_ORDER_CLIENT => {
428            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
429            Ok(())
430        }
431        INDEX_ORDERS => {
432            insert_set(pipe, key, value[0].as_ref());
433            Ok(())
434        }
435        INDEX_ORDERS_OPEN => {
436            insert_set(pipe, key, value[0].as_ref());
437            Ok(())
438        }
439        INDEX_ORDERS_CLOSED => {
440            insert_set(pipe, key, value[0].as_ref());
441            Ok(())
442        }
443        INDEX_ORDERS_EMULATED => {
444            insert_set(pipe, key, value[0].as_ref());
445            Ok(())
446        }
447        INDEX_ORDERS_INFLIGHT => {
448            insert_set(pipe, key, value[0].as_ref());
449            Ok(())
450        }
451        INDEX_POSITIONS => {
452            insert_set(pipe, key, value[0].as_ref());
453            Ok(())
454        }
455        INDEX_POSITIONS_OPEN => {
456            insert_set(pipe, key, value[0].as_ref());
457            Ok(())
458        }
459        INDEX_POSITIONS_CLOSED => {
460            insert_set(pipe, key, value[0].as_ref());
461            Ok(())
462        }
463        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
464    }
465}
466
467fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
468    pipe.set(key, value);
469}
470
471fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
472    pipe.sadd(key, value);
473}
474
475fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
476    pipe.hset(key, name, value);
477}
478
479fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
480    pipe.rpush(key, value);
481}
482
483fn update(
484    pipe: &mut Pipeline,
485    collection: &str,
486    key: &str,
487    value: Vec<Bytes>,
488) -> anyhow::Result<()> {
489    check_slice_not_empty(value.as_slice(), stringify!(value))?;
490
491    match collection {
492        ACCOUNTS => {
493            update_list(pipe, key, value[0].as_ref());
494            Ok(())
495        }
496        ORDERS => {
497            update_list(pipe, key, value[0].as_ref());
498            Ok(())
499        }
500        POSITIONS => {
501            update_list(pipe, key, value[0].as_ref());
502            Ok(())
503        }
504        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
505    }
506}
507
508fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
509    pipe.rpush_exists(key, value);
510}
511
512fn delete(
513    pipe: &mut Pipeline,
514    collection: &str,
515    key: &str,
516    value: Option<Vec<Bytes>>,
517) -> anyhow::Result<()> {
518    match collection {
519        INDEX => remove_index(pipe, key, value),
520        ACTORS => {
521            delete_string(pipe, key);
522            Ok(())
523        }
524        STRATEGIES => {
525            delete_string(pipe, key);
526            Ok(())
527        }
528        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
529    }
530}
531
532fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
533    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
534    let index_key = get_index_key(key)?;
535
536    match index_key {
537        INDEX_ORDERS_OPEN => {
538            remove_from_set(pipe, key, value[0].as_ref());
539            Ok(())
540        }
541        INDEX_ORDERS_CLOSED => {
542            remove_from_set(pipe, key, value[0].as_ref());
543            Ok(())
544        }
545        INDEX_ORDERS_EMULATED => {
546            remove_from_set(pipe, key, value[0].as_ref());
547            Ok(())
548        }
549        INDEX_ORDERS_INFLIGHT => {
550            remove_from_set(pipe, key, value[0].as_ref());
551            Ok(())
552        }
553        INDEX_POSITIONS_OPEN => {
554            remove_from_set(pipe, key, value[0].as_ref());
555            Ok(())
556        }
557        INDEX_POSITIONS_CLOSED => {
558            remove_from_set(pipe, key, value[0].as_ref());
559            Ok(())
560        }
561        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
562    }
563}
564
565fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
566    pipe.srem(key, member);
567}
568
569fn delete_string(pipe: &mut Pipeline, key: &str) {
570    pipe.del(key);
571}
572
573fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
574    let mut key = String::new();
575
576    if config.use_trader_prefix {
577        key.push_str("trader-");
578    }
579
580    key.push_str(trader_id.as_str());
581
582    if config.use_instance_id {
583        key.push(REDIS_DELIMITER);
584        key.push_str(&format!("{instance_id}"));
585    }
586
587    key
588}
589
590fn get_collection_key(key: &str) -> anyhow::Result<&str> {
591    key.split_once(REDIS_DELIMITER)
592        .map(|(collection, _)| collection)
593        .ok_or_else(|| {
594            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
595        })
596}
597
598fn get_index_key(key: &str) -> anyhow::Result<&str> {
599    key.split_once(REDIS_DELIMITER)
600        .map(|(_, index_key)| index_key)
601        .ok_or_else(|| {
602            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
603        })
604}
605
606#[allow(dead_code)] // Under development
607pub struct RedisCacheDatabaseAdapter {
608    pub encoding: SerializationEncoding,
609    database: RedisCacheDatabase,
610}
611
612#[allow(dead_code)] // Under development
613#[allow(unused)] // Under development
614#[async_trait::async_trait]
615impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
616    fn close(&mut self) -> anyhow::Result<()> {
617        self.database.close();
618        Ok(())
619    }
620
621    fn flush(&mut self) -> anyhow::Result<()> {
622        self.database.flushdb();
623        Ok(())
624    }
625
626    async fn load_all(&self) -> anyhow::Result<CacheMap> {
627        tracing::debug!("Loading all data");
628
629        let (
630            currencies,
631            instruments,
632            synthetics,
633            accounts,
634            orders,
635            positions,
636            greeks,
637            yield_curves,
638        ) = try_join!(
639            self.load_currencies(),
640            self.load_instruments(),
641            self.load_synthetics(),
642            self.load_accounts(),
643            self.load_orders(),
644            self.load_positions(),
645            self.load_greeks(),
646            self.load_yield_curves()
647        )
648        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
649
650        Ok(CacheMap {
651            currencies,
652            instruments,
653            synthetics,
654            accounts,
655            orders,
656            positions,
657            greeks,
658            yield_curves,
659        })
660    }
661
662    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
663        // self.database.load()
664        Ok(HashMap::new()) // TODO
665    }
666
667    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
668        DatabaseQueries::load_currencies(
669            &self.database.con,
670            &self.database.trader_key,
671            self.encoding,
672        )
673        .await
674    }
675
676    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
677        DatabaseQueries::load_instruments(
678            &self.database.con,
679            &self.database.trader_key,
680            self.encoding,
681        )
682        .await
683    }
684
685    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
686        DatabaseQueries::load_synthetics(
687            &self.database.con,
688            &self.database.trader_key,
689            self.encoding,
690        )
691        .await
692    }
693
694    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
695        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
696            .await
697    }
698
699    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
700        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
701            .await
702    }
703
704    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
705        DatabaseQueries::load_positions(
706            &self.database.con,
707            &self.database.trader_key,
708            self.encoding,
709        )
710        .await
711    }
712
713    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
714        todo!()
715    }
716
717    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
718        todo!()
719    }
720
721    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
722        DatabaseQueries::load_currency(
723            &self.database.con,
724            &self.database.trader_key,
725            code,
726            self.encoding,
727        )
728        .await
729    }
730
731    async fn load_instrument(
732        &self,
733        instrument_id: &InstrumentId,
734    ) -> anyhow::Result<Option<InstrumentAny>> {
735        DatabaseQueries::load_instrument(
736            &self.database.con,
737            &self.database.trader_key,
738            instrument_id,
739            self.encoding,
740        )
741        .await
742    }
743
744    async fn load_synthetic(
745        &self,
746        instrument_id: &InstrumentId,
747    ) -> anyhow::Result<Option<SyntheticInstrument>> {
748        DatabaseQueries::load_synthetic(
749            &self.database.con,
750            &self.database.trader_key,
751            instrument_id,
752            self.encoding,
753        )
754        .await
755    }
756
757    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
758        DatabaseQueries::load_account(
759            &self.database.con,
760            &self.database.trader_key,
761            account_id,
762            self.encoding,
763        )
764        .await
765    }
766
767    async fn load_order(
768        &self,
769        client_order_id: &ClientOrderId,
770    ) -> anyhow::Result<Option<OrderAny>> {
771        DatabaseQueries::load_order(
772            &self.database.con,
773            &self.database.trader_key,
774            client_order_id,
775            self.encoding,
776        )
777        .await
778    }
779
780    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
781        DatabaseQueries::load_position(
782            &self.database.con,
783            &self.database.trader_key,
784            position_id,
785            self.encoding,
786        )
787        .await
788    }
789
790    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
791        todo!()
792    }
793
794    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
795        todo!()
796    }
797
798    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
799        todo!()
800    }
801
802    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
803        todo!()
804    }
805
806    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
807        todo!()
808    }
809
810    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
811        todo!()
812    }
813
814    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
815        todo!()
816    }
817
818    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
819        todo!()
820    }
821
822    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
823        todo!()
824    }
825
826    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
827        todo!()
828    }
829
830    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
831        todo!()
832    }
833
834    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
835        todo!()
836    }
837
838    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
839        todo!()
840    }
841
842    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
843        anyhow::bail!("Saving market data for Redis cache adapter not supported")
844    }
845
846    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
847        anyhow::bail!("Saving market data for Redis cache adapter not supported")
848    }
849
850    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
851        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
852    }
853
854    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
855        anyhow::bail!("Saving market data for Redis cache adapter not supported")
856    }
857
858    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
859        anyhow::bail!("Loading market data for Redis cache adapter not supported")
860    }
861
862    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
863        anyhow::bail!("Saving market data for Redis cache adapter not supported")
864    }
865
866    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
867        anyhow::bail!("Loading market data for Redis cache adapter not supported")
868    }
869
870    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
871        anyhow::bail!("Saving signals for Redis cache adapter not supported")
872    }
873
874    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
875        anyhow::bail!("Loading signals from Redis cache adapter not supported")
876    }
877
878    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
879        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
880    }
881
882    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
883        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
884    }
885
886    fn load_order_snapshot(
887        &self,
888        client_order_id: &ClientOrderId,
889    ) -> anyhow::Result<Option<OrderSnapshot>> {
890        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
891    }
892
893    fn load_position_snapshot(
894        &self,
895        position_id: &PositionId,
896    ) -> anyhow::Result<Option<PositionSnapshot>> {
897        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
898    }
899
900    fn index_venue_order_id(
901        &self,
902        client_order_id: ClientOrderId,
903        venue_order_id: VenueOrderId,
904    ) -> anyhow::Result<()> {
905        todo!()
906    }
907
908    fn index_order_position(
909        &self,
910        client_order_id: ClientOrderId,
911        position_id: PositionId,
912    ) -> anyhow::Result<()> {
913        todo!()
914    }
915
916    fn update_actor(&self) -> anyhow::Result<()> {
917        todo!()
918    }
919
920    fn update_strategy(&self) -> anyhow::Result<()> {
921        todo!()
922    }
923
924    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
925        todo!()
926    }
927
928    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
929        todo!()
930    }
931
932    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
933        todo!()
934    }
935
936    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
937        todo!()
938    }
939
940    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
941        todo!()
942    }
943
944    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
945        todo!()
946    }
947}
948
949////////////////////////////////////////////////////////////////////////////////
950// Tests
951////////////////////////////////////////////////////////////////////////////////
952#[cfg(test)]
953mod tests {
954    use rstest::rstest;
955
956    use super::*;
957
958    #[rstest]
959    fn test_get_trader_key_with_prefix_and_instance_id() {
960        let trader_id = TraderId::from("tester-123");
961        let instance_id = UUID4::new();
962        let mut config = CacheConfig::default();
963        config.use_instance_id = true;
964
965        let key = get_trader_key(trader_id, instance_id, &config);
966        assert!(key.starts_with("trader-tester-123:"));
967        assert!(key.ends_with(&instance_id.to_string()));
968    }
969
970    #[rstest]
971    fn test_get_collection_key_valid() {
972        let key = "collection:123";
973        assert_eq!(get_collection_key(key).unwrap(), "collection");
974    }
975
976    #[rstest]
977    fn test_get_collection_key_invalid() {
978        let key = "no_delimiter";
979        assert!(get_collection_key(key).is_err());
980    }
981
982    #[rstest]
983    fn test_get_index_key_valid() {
984        let key = "index:123";
985        assert_eq!(get_index_key(key).unwrap(), "123");
986    }
987
988    #[rstest]
989    fn test_get_index_key_invalid() {
990        let key = "no_delimiter";
991        assert!(get_index_key(key).is_err());
992    }
993}