nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::VecDeque, fmt::Debug, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21    cache::{
22        CacheConfig,
23        database::{CacheDatabaseAdapter, CacheMap},
24    },
25    custom::CustomData,
26    enums::SerializationEncoding,
27    logging::{log_task_awaiting, log_task_started, log_task_stopped},
28    runtime::get_runtime,
29    signal::Signal,
30};
31use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
32use nautilus_cryptography::providers::install_cryptographic_provider;
33use nautilus_model::{
34    accounts::AccountAny,
35    data::{Bar, DataType, QuoteTick, TradeTick},
36    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
37    identifiers::{
38        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
39        TraderId, VenueOrderId,
40    },
41    instruments::{InstrumentAny, SyntheticInstrument},
42    orderbook::OrderBook,
43    orders::OrderAny,
44    position::Position,
45    types::Currency,
46};
47use redis::{Pipeline, aio::ConnectionManager};
48use tokio::try_join;
49use ustr::Ustr;
50
51use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
52use crate::redis::{create_redis_connection, queries::DatabaseQueries};
53
// Names used for the Redis connections and for the background command task
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";
const CACHE_PROCESS: &str = "cache-process";

// Message prefix used when a command cannot be handed to the background task
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection names: the leading segment of a command key selects how the value is stored
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Fully qualified index keys (collection plus index name)
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
/// The kind of work a [`DatabaseCommand`] asks the background task to perform.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Store a new value under the command key.
    Insert,
    /// Append to a value that already exists under the command key.
    Update,
    /// Remove the command key, or remove a member from an index.
    Delete,
    /// Stop the command processing loop.
    Close,
}
97
98/// Represents a database command to be performed which may be executed in a task.
99#[derive(Clone, Debug)]
100pub struct DatabaseCommand {
101    /// The database operation type.
102    pub op_type: DatabaseOperation,
103    /// The primary key for the operation.
104    pub key: Option<String>,
105    /// The data payload for the operation.
106    pub payload: Option<Vec<Bytes>>,
107}
108
109impl DatabaseCommand {
110    /// Creates a new [`DatabaseCommand`] instance.
111    #[must_use]
112    pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113        Self {
114            op_type,
115            key: Some(key),
116            payload,
117        }
118    }
119
120    /// Initialize a `Close` database command, this is meant to close the database cache channel.
121    #[must_use]
122    pub const fn close() -> Self {
123        Self {
124            op_type: DatabaseOperation::Close,
125            key: None,
126            payload: None,
127        }
128    }
129}
130
131#[cfg_attr(
132    feature = "python",
133    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
134)]
135pub struct RedisCacheDatabase {
136    pub con: ConnectionManager,
137    pub trader_id: TraderId,
138    pub trader_key: String,
139    pub encoding: SerializationEncoding,
140    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
141    handle: tokio::task::JoinHandle<()>,
142}
143
144impl Debug for RedisCacheDatabase {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct(stringify!(RedisCacheDatabase))
147            .field("trader_id", &self.trader_id)
148            .field("encoding", &self.encoding)
149            .finish()
150    }
151}
152
153impl RedisCacheDatabase {
154    /// Creates a new [`RedisCacheDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if:
159    /// - The database configuration is missing in `config`.
160    /// - Establishing the Redis connection fails.
161    /// - The command processing task cannot be spawned.
162    pub async fn new(
163        trader_id: TraderId,
164        instance_id: UUID4,
165        config: CacheConfig,
166    ) -> anyhow::Result<Self> {
167        install_cryptographic_provider();
168
169        let db_config = config
170            .database
171            .as_ref()
172            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
173        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
174
175        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
176        let trader_key = get_trader_key(trader_id, instance_id, &config);
177        let trader_key_clone = trader_key.clone();
178        let encoding = config.encoding;
179
180        let handle = get_runtime().spawn(async move {
181            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
182                log::error!("Error in task '{CACHE_PROCESS}': {e}");
183            }
184        });
185
186        Ok(Self {
187            con,
188            trader_id,
189            trader_key,
190            encoding,
191            tx,
192            handle,
193        })
194    }
195
196    #[must_use]
197    pub const fn get_encoding(&self) -> SerializationEncoding {
198        self.encoding
199    }
200
201    #[must_use]
202    pub fn get_trader_key(&self) -> &str {
203        &self.trader_key
204    }
205
206    pub fn close(&mut self) {
207        log::debug!("Closing");
208
209        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
210            log::debug!("Error sending close command: {e:?}");
211        }
212
213        log_task_awaiting(CACHE_PROCESS);
214
215        tokio::task::block_in_place(|| {
216            if let Err(e) = get_runtime().block_on(&mut self.handle) {
217                log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
218            }
219        });
220
221        log::debug!("Closed");
222    }
223
224    pub async fn flushdb(&mut self) {
225        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
226            .query_async::<()>(&mut self.con)
227            .await
228        {
229            log::error!("Failed to flush database: {e:?}");
230        }
231    }
232
233    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the underlying Redis scan operation fails.
238    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
239        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
240        DatabaseQueries::scan_keys(&mut self.con, pattern).await
241    }
242
243    /// Reads the value(s) associated with `key` for this trader from Redis.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the underlying Redis read operation fails.
248    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
249        DatabaseQueries::read(&self.con, &self.trader_key, key).await
250    }
251
252    /// Reads multiple values using bulk operations for efficiency.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the underlying Redis read operation fails.
257    pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
258        DatabaseQueries::read_bulk(&self.con, keys).await
259    }
260
261    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the command cannot be sent to the background task channel.
266    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
267        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
268        match self.tx.send(op) {
269            Ok(()) => Ok(()),
270            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
271        }
272    }
273
274    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the command cannot be sent to the background task channel.
279    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
280        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
281        match self.tx.send(op) {
282            Ok(()) => Ok(()),
283            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
284        }
285    }
286
287    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the command cannot be sent to the background task channel.
292    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
293        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
294        match self.tx.send(op) {
295            Ok(()) => Ok(()),
296            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
297        }
298    }
299
300    /// Delete the given order from the database with comprehensive index cleanup.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the command cannot be sent to the background task channel.
305    pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
306        let order_id_bytes = Bytes::from(client_order_id.to_string());
307
308        // Delete the order itself
309        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
310        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
311        self.tx
312            .send(op)
313            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
314
315        // Delete from all order indexes
316        let index_keys = [
317            INDEX_ORDER_IDS,
318            INDEX_ORDERS,
319            INDEX_ORDERS_OPEN,
320            INDEX_ORDERS_CLOSED,
321            INDEX_ORDERS_EMULATED,
322            INDEX_ORDERS_INFLIGHT,
323        ];
324
325        for index_key in &index_keys {
326            let key = (*index_key).to_string();
327            let payload = vec![order_id_bytes.clone()];
328            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
329            self.tx
330                .send(op)
331                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
332        }
333
334        // Delete from hash indexes
335        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
336        for index_key in &hash_indexes {
337            let key = (*index_key).to_string();
338            let payload = vec![order_id_bytes.clone()];
339            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
340            self.tx.send(op).map_err(|e| {
341                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
342            })?;
343        }
344
345        Ok(())
346    }
347
348    /// Delete the given position from the database with comprehensive index cleanup.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the command cannot be sent to the background task channel.
353    pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
354        let position_id_bytes = Bytes::from(position_id.to_string());
355
356        // Delete the position itself
357        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
358        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
359        self.tx
360            .send(op)
361            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
362
363        // Delete from all position indexes
364        let index_keys = [
365            INDEX_POSITIONS,
366            INDEX_POSITIONS_OPEN,
367            INDEX_POSITIONS_CLOSED,
368        ];
369
370        for index_key in &index_keys {
371            let key = (*index_key).to_string();
372            let payload = vec![position_id_bytes.clone()];
373            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
374            self.tx.send(op).map_err(|e| {
375                anyhow::anyhow!("Failed to send delete position index command: {e}")
376            })?;
377        }
378
379        Ok(())
380    }
381
382    /// Delete the given account event from the database.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if the command cannot be sent to the background task channel.
387    pub fn delete_account_event(
388        &self,
389        _account_id: &AccountId,
390        _event_id: &str,
391    ) -> anyhow::Result<()> {
392        tracing::warn!("Deleting account events currently a no-op (pending redesign)");
393        Ok(())
394    }
395}
396
397async fn process_commands(
398    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
399    trader_key: String,
400    config: CacheConfig,
401) -> anyhow::Result<()> {
402    log_task_started(CACHE_PROCESS);
403
404    let db_config = config
405        .database
406        .as_ref()
407        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
408    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
409
410    // Buffering
411    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
412    let mut last_drain = std::time::Instant::now();
413    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
414
415    // Continue to receive and handle messages until channel is hung up
416    loop {
417        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
418            drain_buffer(&mut con, &trader_key, &mut buffer).await;
419            last_drain = std::time::Instant::now();
420        } else if let Some(cmd) = rx.recv().await {
421            tracing::trace!("Received {cmd:?}");
422
423            if matches!(cmd.op_type, DatabaseOperation::Close) {
424                break;
425            }
426            buffer.push_back(cmd);
427        } else {
428            tracing::debug!("Command channel closed");
429            break;
430        }
431    }
432
433    // Drain any remaining messages
434    if !buffer.is_empty() {
435        drain_buffer(&mut con, &trader_key, &mut buffer).await;
436    }
437
438    log_task_stopped(CACHE_PROCESS);
439    Ok(())
440}
441
442async fn drain_buffer(
443    conn: &mut ConnectionManager,
444    trader_key: &str,
445    buffer: &mut VecDeque<DatabaseCommand>,
446) {
447    let mut pipe = redis::pipe();
448    pipe.atomic();
449
450    for msg in buffer.drain(..) {
451        let key = if let Some(key) = msg.key {
452            key
453        } else {
454            log::error!("Null key found for message: {msg:?}");
455            continue;
456        };
457        let collection = match get_collection_key(&key) {
458            Ok(collection) => collection,
459            Err(e) => {
460                tracing::error!("{e}");
461                continue; // Continue to next message
462            }
463        };
464
465        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
466
467        match msg.op_type {
468            DatabaseOperation::Insert => {
469                if let Some(payload) = msg.payload {
470                    log::debug!("Processing INSERT for collection: {collection}, key: {key}");
471                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
472                        tracing::error!("{e}");
473                    }
474                } else {
475                    tracing::error!("Null `payload` for `insert`");
476                }
477            }
478            DatabaseOperation::Update => {
479                if let Some(payload) = msg.payload {
480                    log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
481                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
482                        tracing::error!("{e}");
483                    }
484                } else {
485                    tracing::error!("Null `payload` for `update`");
486                }
487            }
488            DatabaseOperation::Delete => {
489                tracing::debug!(
490                    "Processing DELETE for collection: {}, key: {}, payload: {:?}",
491                    collection,
492                    key,
493                    msg.payload.as_ref().map(std::vec::Vec::len)
494                );
495                // `payload` can be `None` for a delete operation
496                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
497                    tracing::error!("{e}");
498                }
499            }
500            DatabaseOperation::Close => panic!("Close command should not be drained"),
501        }
502    }
503
504    if let Err(e) = pipe.query_async::<()>(conn).await {
505        tracing::error!("{e}");
506    }
507}
508
509fn insert(
510    pipe: &mut Pipeline,
511    collection: &str,
512    key: &str,
513    value: Vec<Bytes>,
514) -> anyhow::Result<()> {
515    check_slice_not_empty(value.as_slice(), stringify!(value))?;
516
517    match collection {
518        INDEX => insert_index(pipe, key, &value),
519        GENERAL => {
520            insert_string(pipe, key, value[0].as_ref());
521            Ok(())
522        }
523        CURRENCIES => {
524            insert_string(pipe, key, value[0].as_ref());
525            Ok(())
526        }
527        INSTRUMENTS => {
528            insert_string(pipe, key, value[0].as_ref());
529            Ok(())
530        }
531        SYNTHETICS => {
532            insert_string(pipe, key, value[0].as_ref());
533            Ok(())
534        }
535        ACCOUNTS => {
536            insert_list(pipe, key, value[0].as_ref());
537            Ok(())
538        }
539        ORDERS => {
540            insert_list(pipe, key, value[0].as_ref());
541            Ok(())
542        }
543        POSITIONS => {
544            insert_list(pipe, key, value[0].as_ref());
545            Ok(())
546        }
547        ACTORS => {
548            insert_string(pipe, key, value[0].as_ref());
549            Ok(())
550        }
551        STRATEGIES => {
552            insert_string(pipe, key, value[0].as_ref());
553            Ok(())
554        }
555        SNAPSHOTS => {
556            insert_list(pipe, key, value[0].as_ref());
557            Ok(())
558        }
559        HEALTH => {
560            insert_string(pipe, key, value[0].as_ref());
561            Ok(())
562        }
563        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
564    }
565}
566
567fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
568    let index_key = get_index_key(key)?;
569    match index_key {
570        INDEX_ORDER_IDS => {
571            insert_set(pipe, key, value[0].as_ref());
572            Ok(())
573        }
574        INDEX_ORDER_POSITION => {
575            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
576            Ok(())
577        }
578        INDEX_ORDER_CLIENT => {
579            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
580            Ok(())
581        }
582        INDEX_ORDERS => {
583            insert_set(pipe, key, value[0].as_ref());
584            Ok(())
585        }
586        INDEX_ORDERS_OPEN => {
587            insert_set(pipe, key, value[0].as_ref());
588            Ok(())
589        }
590        INDEX_ORDERS_CLOSED => {
591            insert_set(pipe, key, value[0].as_ref());
592            Ok(())
593        }
594        INDEX_ORDERS_EMULATED => {
595            insert_set(pipe, key, value[0].as_ref());
596            Ok(())
597        }
598        INDEX_ORDERS_INFLIGHT => {
599            insert_set(pipe, key, value[0].as_ref());
600            Ok(())
601        }
602        INDEX_POSITIONS => {
603            insert_set(pipe, key, value[0].as_ref());
604            Ok(())
605        }
606        INDEX_POSITIONS_OPEN => {
607            insert_set(pipe, key, value[0].as_ref());
608            Ok(())
609        }
610        INDEX_POSITIONS_CLOSED => {
611            insert_set(pipe, key, value[0].as_ref());
612            Ok(())
613        }
614        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
615    }
616}
617
618fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
619    pipe.set(key, value);
620}
621
622fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
623    pipe.sadd(key, value);
624}
625
626fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
627    pipe.hset(key, name, value);
628}
629
630fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
631    pipe.rpush(key, value);
632}
633
634fn update(
635    pipe: &mut Pipeline,
636    collection: &str,
637    key: &str,
638    value: Vec<Bytes>,
639) -> anyhow::Result<()> {
640    check_slice_not_empty(value.as_slice(), stringify!(value))?;
641
642    match collection {
643        ACCOUNTS => {
644            update_list(pipe, key, value[0].as_ref());
645            Ok(())
646        }
647        ORDERS => {
648            update_list(pipe, key, value[0].as_ref());
649            Ok(())
650        }
651        POSITIONS => {
652            update_list(pipe, key, value[0].as_ref());
653            Ok(())
654        }
655        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
656    }
657}
658
659fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
660    pipe.rpush_exists(key, value);
661}
662
663fn delete(
664    pipe: &mut Pipeline,
665    collection: &str,
666    key: &str,
667    value: Option<Vec<Bytes>>,
668) -> anyhow::Result<()> {
669    tracing::debug!(
670        "delete: collection={}, key={}, has_payload={}",
671        collection,
672        key,
673        value.is_some()
674    );
675
676    match collection {
677        INDEX => delete_from_index(pipe, key, value),
678        ORDERS => {
679            delete_string(pipe, key);
680            Ok(())
681        }
682        POSITIONS => {
683            delete_string(pipe, key);
684            Ok(())
685        }
686        ACCOUNTS => {
687            delete_string(pipe, key);
688            Ok(())
689        }
690        ACTORS => {
691            delete_string(pipe, key);
692            Ok(())
693        }
694        STRATEGIES => {
695            delete_string(pipe, key);
696            Ok(())
697        }
698        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
699    }
700}
701
702fn delete_from_index(
703    pipe: &mut Pipeline,
704    key: &str,
705    value: Option<Vec<Bytes>>,
706) -> anyhow::Result<()> {
707    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
708    let index_key = get_index_key(key)?;
709
710    match index_key {
711        INDEX_ORDER_IDS => {
712            remove_from_set(pipe, key, value[0].as_ref());
713            Ok(())
714        }
715        INDEX_ORDER_POSITION => {
716            remove_from_hash(pipe, key, value[0].as_ref());
717            Ok(())
718        }
719        INDEX_ORDER_CLIENT => {
720            remove_from_hash(pipe, key, value[0].as_ref());
721            Ok(())
722        }
723        INDEX_ORDERS => {
724            remove_from_set(pipe, key, value[0].as_ref());
725            Ok(())
726        }
727        INDEX_ORDERS_OPEN => {
728            remove_from_set(pipe, key, value[0].as_ref());
729            Ok(())
730        }
731        INDEX_ORDERS_CLOSED => {
732            remove_from_set(pipe, key, value[0].as_ref());
733            Ok(())
734        }
735        INDEX_ORDERS_EMULATED => {
736            remove_from_set(pipe, key, value[0].as_ref());
737            Ok(())
738        }
739        INDEX_ORDERS_INFLIGHT => {
740            remove_from_set(pipe, key, value[0].as_ref());
741            Ok(())
742        }
743        INDEX_POSITIONS => {
744            remove_from_set(pipe, key, value[0].as_ref());
745            Ok(())
746        }
747        INDEX_POSITIONS_OPEN => {
748            remove_from_set(pipe, key, value[0].as_ref());
749            Ok(())
750        }
751        INDEX_POSITIONS_CLOSED => {
752            remove_from_set(pipe, key, value[0].as_ref());
753            Ok(())
754        }
755        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
756    }
757}
758
759fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
760    pipe.srem(key, member);
761}
762
763fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
764    pipe.hdel(key, field);
765}
766
767fn delete_string(pipe: &mut Pipeline, key: &str) {
768    pipe.del(key);
769}
770
771fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
772    let mut key = String::new();
773
774    if config.use_trader_prefix {
775        key.push_str("trader-");
776    }
777
778    key.push_str(trader_id.as_str());
779
780    if config.use_instance_id {
781        key.push(REDIS_DELIMITER);
782        key.push_str(&format!("{instance_id}"));
783    }
784
785    key
786}
787
788fn get_collection_key(key: &str) -> anyhow::Result<&str> {
789    key.split_once(REDIS_DELIMITER)
790        .map(|(collection, _)| collection)
791        .ok_or_else(|| {
792            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
793        })
794}
795
796fn get_index_key(key: &str) -> anyhow::Result<&str> {
797    key.split_once(REDIS_DELIMITER)
798        .map(|(_, index_key)| index_key)
799        .ok_or_else(|| {
800            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
801        })
802}
803
804#[allow(dead_code)]
805#[derive(Debug)]
806pub struct RedisCacheDatabaseAdapter {
807    pub encoding: SerializationEncoding,
808    pub database: RedisCacheDatabase,
809}
810
811#[allow(dead_code)]
812#[allow(unused)]
813#[async_trait::async_trait]
814impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
815    fn close(&mut self) -> anyhow::Result<()> {
816        self.database.close();
817        Ok(())
818    }
819
820    fn flush(&mut self) -> anyhow::Result<()> {
821        self.database.flushdb();
822        Ok(())
823    }
824
825    async fn load_all(&self) -> anyhow::Result<CacheMap> {
826        tracing::debug!("Loading all data");
827
828        let (
829            currencies,
830            instruments,
831            synthetics,
832            accounts,
833            orders,
834            positions,
835            greeks,
836            yield_curves,
837        ) = try_join!(
838            self.load_currencies(),
839            self.load_instruments(),
840            self.load_synthetics(),
841            self.load_accounts(),
842            self.load_orders(),
843            self.load_positions(),
844            self.load_greeks(),
845            self.load_yield_curves()
846        )
847        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
848
849        Ok(CacheMap {
850            currencies,
851            instruments,
852            synthetics,
853            accounts,
854            orders,
855            positions,
856            greeks,
857            yield_curves,
858        })
859    }
860
861    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
862        // self.database.load()
863        Ok(AHashMap::new()) // TODO
864    }
865
866    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
867        DatabaseQueries::load_currencies(
868            &self.database.con,
869            &self.database.trader_key,
870            self.encoding,
871        )
872        .await
873    }
874
875    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
876        DatabaseQueries::load_instruments(
877            &self.database.con,
878            &self.database.trader_key,
879            self.encoding,
880        )
881        .await
882    }
883
884    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
885        DatabaseQueries::load_synthetics(
886            &self.database.con,
887            &self.database.trader_key,
888            self.encoding,
889        )
890        .await
891    }
892
893    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
894        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
895            .await
896    }
897
898    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
899        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
900            .await
901    }
902
903    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
904        DatabaseQueries::load_positions(
905            &self.database.con,
906            &self.database.trader_key,
907            self.encoding,
908        )
909        .await
910    }
911
912    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
913        todo!()
914    }
915
916    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
917        todo!()
918    }
919
920    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
921        DatabaseQueries::load_currency(
922            &self.database.con,
923            &self.database.trader_key,
924            code,
925            self.encoding,
926        )
927        .await
928    }
929
930    async fn load_instrument(
931        &self,
932        instrument_id: &InstrumentId,
933    ) -> anyhow::Result<Option<InstrumentAny>> {
934        DatabaseQueries::load_instrument(
935            &self.database.con,
936            &self.database.trader_key,
937            instrument_id,
938            self.encoding,
939        )
940        .await
941    }
942
943    async fn load_synthetic(
944        &self,
945        instrument_id: &InstrumentId,
946    ) -> anyhow::Result<Option<SyntheticInstrument>> {
947        DatabaseQueries::load_synthetic(
948            &self.database.con,
949            &self.database.trader_key,
950            instrument_id,
951            self.encoding,
952        )
953        .await
954    }
955
956    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
957        DatabaseQueries::load_account(
958            &self.database.con,
959            &self.database.trader_key,
960            account_id,
961            self.encoding,
962        )
963        .await
964    }
965
966    async fn load_order(
967        &self,
968        client_order_id: &ClientOrderId,
969    ) -> anyhow::Result<Option<OrderAny>> {
970        DatabaseQueries::load_order(
971            &self.database.con,
972            &self.database.trader_key,
973            client_order_id,
974            self.encoding,
975        )
976        .await
977    }
978
979    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
980        DatabaseQueries::load_position(
981            &self.database.con,
982            &self.database.trader_key,
983            position_id,
984            self.encoding,
985        )
986        .await
987    }
988
989    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
990        todo!()
991    }
992
993    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
994        todo!()
995    }
996
997    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
998        todo!()
999    }
1000
1001    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1002        todo!()
1003    }
1004
1005    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1006        let order_id_bytes = Bytes::from(client_order_id.to_string());
1007
1008        log::debug!("Deleting order: {client_order_id} from Redis");
1009        log::debug!("Trader key: {}", self.database.trader_key);
1010
1011        // Delete the order itself
1012        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1013        log::debug!("Deleting order key: {key}");
1014        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1015        self.database
1016            .tx
1017            .send(op)
1018            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1019
1020        // Delete from all order indexes
1021        let index_keys = [
1022            INDEX_ORDER_IDS,
1023            INDEX_ORDERS,
1024            INDEX_ORDERS_OPEN,
1025            INDEX_ORDERS_CLOSED,
1026            INDEX_ORDERS_EMULATED,
1027            INDEX_ORDERS_INFLIGHT,
1028        ];
1029
1030        for index_key in &index_keys {
1031            let key = (*index_key).to_string();
1032            log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1033            let payload = vec![order_id_bytes.clone()];
1034            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1035            self.database
1036                .tx
1037                .send(op)
1038                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1039        }
1040
1041        // Delete from hash indexes
1042        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1043        for index_key in &hash_indexes {
1044            let key = (*index_key).to_string();
1045            log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1046            let payload = vec![order_id_bytes.clone()];
1047            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1048            self.database.tx.send(op).map_err(|e| {
1049                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1050            })?;
1051        }
1052
1053        log::debug!("Sent all delete commands for order: {client_order_id}");
1054        Ok(())
1055    }
1056
1057    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1058        let position_id_bytes = Bytes::from(position_id.to_string());
1059
1060        // Delete the position itself
1061        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1062        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1063        self.database
1064            .tx
1065            .send(op)
1066            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1067
1068        // Delete from all position indexes
1069        let index_keys = [
1070            INDEX_POSITIONS,
1071            INDEX_POSITIONS_OPEN,
1072            INDEX_POSITIONS_CLOSED,
1073        ];
1074
1075        for index_key in &index_keys {
1076            let key = (*index_key).to_string();
1077            let payload = vec![position_id_bytes.clone()];
1078            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1079            self.database.tx.send(op).map_err(|e| {
1080                anyhow::anyhow!("Failed to send delete position index command: {e}")
1081            })?;
1082        }
1083
1084        Ok(())
1085    }
1086
1087    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1088        todo!()
1089    }
1090
1091    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1092        todo!()
1093    }
1094
1095    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1096        todo!()
1097    }
1098
1099    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1100        todo!()
1101    }
1102
1103    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1104        todo!()
1105    }
1106
1107    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1108        todo!()
1109    }
1110
1111    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1112        todo!()
1113    }
1114
1115    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1116        todo!()
1117    }
1118
1119    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1120        todo!()
1121    }
1122
1123    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1124        todo!()
1125    }
1126
1127    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1128        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1129    }
1130
1131    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1132        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1133    }
1134
1135    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1136        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1137    }
1138
1139    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1140        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1141    }
1142
1143    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1144        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1145    }
1146
1147    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1148        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1149    }
1150
1151    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1152        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1153    }
1154
1155    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1156        anyhow::bail!("Saving signals for Redis cache adapter not supported")
1157    }
1158
1159    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1160        anyhow::bail!("Loading signals from Redis cache adapter not supported")
1161    }
1162
1163    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1164        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1165    }
1166
1167    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1168        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1169    }
1170
1171    fn load_order_snapshot(
1172        &self,
1173        client_order_id: &ClientOrderId,
1174    ) -> anyhow::Result<Option<OrderSnapshot>> {
1175        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1176    }
1177
1178    fn load_position_snapshot(
1179        &self,
1180        position_id: &PositionId,
1181    ) -> anyhow::Result<Option<PositionSnapshot>> {
1182        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1183    }
1184
1185    fn index_venue_order_id(
1186        &self,
1187        client_order_id: ClientOrderId,
1188        venue_order_id: VenueOrderId,
1189    ) -> anyhow::Result<()> {
1190        todo!()
1191    }
1192
1193    fn index_order_position(
1194        &self,
1195        client_order_id: ClientOrderId,
1196        position_id: PositionId,
1197    ) -> anyhow::Result<()> {
1198        todo!()
1199    }
1200
1201    fn update_actor(&self) -> anyhow::Result<()> {
1202        todo!()
1203    }
1204
1205    fn update_strategy(&self) -> anyhow::Result<()> {
1206        todo!()
1207    }
1208
1209    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1210        todo!()
1211    }
1212
1213    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1214        todo!()
1215    }
1216
1217    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1218        todo!()
1219    }
1220
1221    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1222        todo!()
1223    }
1224
1225    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1226        todo!()
1227    }
1228
1229    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1230        todo!()
1231    }
1232}
1233
1234////////////////////////////////////////////////////////////////////////////////
1235// Tests
1236////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// With `use_instance_id` enabled (other options default), the key begins
    /// with the prefixed trader ID plus delimiter and ends with the instance ID.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let config = CacheConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);
        let expected_suffix = instance_id.to_string();

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&expected_suffix));
    }

    /// A delimited key yields the part before the delimiter as the collection.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    /// A key without a delimiter is rejected.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }

    /// A delimited key yields the part after the delimiter as the index key.
    #[rstest]
    fn test_get_index_key_valid() {
        let index = get_index_key("index:123").unwrap();
        assert_eq!(index, "123");
    }

    /// A key without a delimiter is rejected.
    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_delimiter");
        assert!(result.is_err());
    }
}