nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    fmt::Debug,
19    time::{Duration, Instant},
20};
21
22use bytes::Bytes;
23use nautilus_common::{
24    cache::{
25        CacheConfig,
26        database::{CacheDatabaseAdapter, CacheMap},
27    },
28    custom::CustomData,
29    enums::SerializationEncoding,
30    logging::{log_task_awaiting, log_task_started, log_task_stopped},
31    runtime::get_runtime,
32    signal::Signal,
33};
34use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
35use nautilus_cryptography::providers::install_cryptographic_provider;
36use nautilus_model::{
37    accounts::AccountAny,
38    data::{Bar, DataType, QuoteTick, TradeTick},
39    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
40    identifiers::{
41        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
42        TraderId, VenueOrderId,
43    },
44    instruments::{InstrumentAny, SyntheticInstrument},
45    orderbook::OrderBook,
46    orders::OrderAny,
47    position::Position,
48    types::Currency,
49};
50use redis::{Pipeline, aio::ConnectionManager};
51use tokio::try_join;
52use ustr::Ustr;
53
54use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
55use crate::redis::{create_redis_connection, queries::DatabaseQueries};
56
57// Task and connection names
58const CACHE_READ: &str = "cache-read";
59const CACHE_WRITE: &str = "cache-write";
60const CACHE_PROCESS: &str = "cache-process";
61
62// Error constants
63const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
64
65// Collection keys
66const INDEX: &str = "index";
67const GENERAL: &str = "general";
68const CURRENCIES: &str = "currencies";
69const INSTRUMENTS: &str = "instruments";
70const SYNTHETICS: &str = "synthetics";
71const ACCOUNTS: &str = "accounts";
72const ORDERS: &str = "orders";
73const POSITIONS: &str = "positions";
74const ACTORS: &str = "actors";
75const STRATEGIES: &str = "strategies";
76const SNAPSHOTS: &str = "snapshots";
77const HEALTH: &str = "health";
78
79// Index keys
80const INDEX_ORDER_IDS: &str = "index:order_ids";
81const INDEX_ORDER_POSITION: &str = "index:order_position";
82const INDEX_ORDER_CLIENT: &str = "index:order_client";
83const INDEX_ORDERS: &str = "index:orders";
84const INDEX_ORDERS_OPEN: &str = "index:orders_open";
85const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
86const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
87const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
88const INDEX_POSITIONS: &str = "index:positions";
89const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
90const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
91
92/// A type of database operation.
93#[derive(Clone, Debug)]
94pub enum DatabaseOperation {
95    Insert,
96    Update,
97    Delete,
98    DeleteFromList,
99    Close,
100}
101
102/// Represents a database command to be performed which may be executed in a task.
103#[derive(Clone, Debug)]
104pub struct DatabaseCommand {
105    /// The database operation type.
106    pub op_type: DatabaseOperation,
107    /// The primary key for the operation.
108    pub key: Option<String>,
109    /// The data payload for the operation.
110    pub payload: Option<Vec<Bytes>>,
111}
112
113impl DatabaseCommand {
114    /// Creates a new [`DatabaseCommand`] instance.
115    #[must_use]
116    pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
117        Self {
118            op_type,
119            key: Some(key),
120            payload,
121        }
122    }
123
124    /// Initialize a `Close` database command, this is meant to close the database cache channel.
125    #[must_use]
126    pub const fn close() -> Self {
127        Self {
128            op_type: DatabaseOperation::Close,
129            key: None,
130            payload: None,
131        }
132    }
133}
134
135#[cfg_attr(
136    feature = "python",
137    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
138)]
139pub struct RedisCacheDatabase {
140    pub con: ConnectionManager,
141    pub trader_id: TraderId,
142    pub trader_key: String,
143    pub encoding: SerializationEncoding,
144    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
145    handle: tokio::task::JoinHandle<()>,
146}
147
148impl Debug for RedisCacheDatabase {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct(stringify!(RedisCacheDatabase))
151            .field("trader_id", &self.trader_id)
152            .field("encoding", &self.encoding)
153            .finish()
154    }
155}
156
157impl RedisCacheDatabase {
158    /// Creates a new [`RedisCacheDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if:
163    /// - The database configuration is missing in `config`.
164    /// - Establishing the Redis connection fails.
165    /// - The command processing task cannot be spawned.
166    pub async fn new(
167        trader_id: TraderId,
168        instance_id: UUID4,
169        config: CacheConfig,
170    ) -> anyhow::Result<Self> {
171        install_cryptographic_provider();
172
173        let db_config = config
174            .database
175            .as_ref()
176            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
177        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
178
179        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
180        let trader_key = get_trader_key(trader_id, instance_id, &config);
181        let trader_key_clone = trader_key.clone();
182        let encoding = config.encoding;
183        let handle = get_runtime().spawn(async move {
184            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
185                log::error!("Error in task '{CACHE_PROCESS}': {e}");
186            }
187        });
188
189        Ok(Self {
190            con,
191            trader_id,
192            trader_key,
193            encoding,
194            tx,
195            handle,
196        })
197    }
198
199    #[must_use]
200    pub const fn get_encoding(&self) -> SerializationEncoding {
201        self.encoding
202    }
203
204    #[must_use]
205    pub fn get_trader_key(&self) -> &str {
206        &self.trader_key
207    }
208
209    pub fn close(&mut self) {
210        log::debug!("Closing");
211
212        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
213            log::debug!("Error sending close command: {e:?}");
214        }
215
216        log_task_awaiting(CACHE_PROCESS);
217
218        tokio::task::block_in_place(|| {
219            if let Err(e) = get_runtime().block_on(&mut self.handle) {
220                log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
221            }
222        });
223
224        log::debug!("Closed");
225    }
226
227    pub async fn flushdb(&mut self) {
228        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
229            .query_async::<()>(&mut self.con)
230            .await
231        {
232            log::error!("Failed to flush database: {e:?}");
233        }
234    }
235
236    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the underlying Redis scan operation fails.
241    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
242        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
243        log::debug!("Querying keys: {pattern}");
244        DatabaseQueries::scan_keys(&mut self.con, pattern).await
245    }
246
247    /// Reads the value(s) associated with `key` for this trader from Redis.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the underlying Redis read operation fails.
252    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
253        DatabaseQueries::read(&self.con, &self.trader_key, key).await
254    }
255
256    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the command cannot be sent to the background task channel.
261    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
262        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
263        match self.tx.send(op) {
264            Ok(()) => Ok(()),
265            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
266        }
267    }
268
269    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the command cannot be sent to the background task channel.
274    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
275        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
276        match self.tx.send(op) {
277            Ok(()) => Ok(()),
278            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
279        }
280    }
281
282    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the command cannot be sent to the background task channel.
287    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
288        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
289        match self.tx.send(op) {
290            Ok(()) => Ok(()),
291            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
292        }
293    }
294
295    /// Delete the given order from the database with comprehensive index cleanup.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the command cannot be sent to the background task channel.
300    pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
301        let order_id_bytes = Bytes::from(client_order_id.to_string());
302
303        log::debug!("Deleting order: {client_order_id} from Redis");
304        log::debug!("Trader key: {}", self.trader_key);
305
306        // Delete the order itself
307        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
308        log::debug!("Deleting order key: {key}");
309        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
310        self.tx
311            .send(op)
312            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
313
314        // Delete from all order indexes
315        let index_keys = [
316            INDEX_ORDER_IDS,
317            INDEX_ORDERS,
318            INDEX_ORDERS_OPEN,
319            INDEX_ORDERS_CLOSED,
320            INDEX_ORDERS_EMULATED,
321            INDEX_ORDERS_INFLIGHT,
322        ];
323
324        for index_key in &index_keys {
325            let key = index_key.to_string();
326            log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
327            let payload = vec![order_id_bytes.clone()];
328            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
329            self.tx
330                .send(op)
331                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
332        }
333
334        // Delete from hash indexes
335        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
336        for index_key in &hash_indexes {
337            let key = index_key.to_string();
338            log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
339            let payload = vec![order_id_bytes.clone()];
340            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
341            self.tx.send(op).map_err(|e| {
342                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
343            })?;
344        }
345
346        log::debug!("Sent all delete commands for order: {client_order_id}");
347        Ok(())
348    }
349
350    /// Delete the given position from the database with comprehensive index cleanup.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the command cannot be sent to the background task channel.
355    pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
356        let position_id_bytes = Bytes::from(position_id.to_string());
357
358        log::debug!("Deleting position: {position_id} from Redis");
359        log::debug!("Trader key: {}", self.trader_key);
360
361        // Delete the position itself
362        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
363        log::debug!("Deleting position key: {key}");
364        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
365        self.tx
366            .send(op)
367            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
368
369        // Delete from all position indexes
370        let index_keys = [
371            INDEX_POSITIONS,
372            INDEX_POSITIONS_OPEN,
373            INDEX_POSITIONS_CLOSED,
374        ];
375
376        for index_key in &index_keys {
377            let key = index_key.to_string();
378            log::debug!("Deleting from index: {key} (position_id: {position_id})");
379            let payload = vec![position_id_bytes.clone()];
380            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
381            self.tx.send(op).map_err(|e| {
382                anyhow::anyhow!("Failed to send delete position index command: {e}")
383            })?;
384        }
385
386        log::debug!("Sent all delete commands for position: {position_id}");
387        Ok(())
388    }
389
390    /// Delete the given account event from the database.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the command cannot be sent to the background task channel.
395    pub fn delete_account_event(
396        &self,
397        account_id: &AccountId,
398        event_id: &str,
399    ) -> anyhow::Result<()> {
400        log::debug!("Deleting account event: {account_id}:{event_id}");
401        log::debug!("Trader key: {}", self.trader_key);
402
403        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
404        let payload = vec![Bytes::from(event_id.to_string())];
405        let op = DatabaseCommand::new(DatabaseOperation::DeleteFromList, key, Some(payload));
406        self.tx
407            .send(op)
408            .map_err(|e| anyhow::anyhow!("Failed to send delete account event command: {e}"))
409    }
410}
411
412async fn process_commands(
413    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
414    trader_key: String,
415    config: CacheConfig,
416) -> anyhow::Result<()> {
417    log_task_started(CACHE_PROCESS);
418
419    let db_config = config
420        .database
421        .as_ref()
422        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
423    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
424
425    // Buffering
426    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
427    let mut last_drain = Instant::now();
428    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
429
430    // Continue to receive and handle messages until channel is hung up
431    loop {
432        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
433            drain_buffer(&mut con, &trader_key, &mut buffer).await;
434            last_drain = Instant::now();
435        } else if let Some(cmd) = rx.recv().await {
436            tracing::trace!("Received {cmd:?}");
437
438            if matches!(cmd.op_type, DatabaseOperation::Close) {
439                break;
440            }
441            buffer.push_back(cmd);
442        } else {
443            tracing::debug!("Command channel closed");
444            break;
445        }
446    }
447
448    // Drain any remaining messages
449    if !buffer.is_empty() {
450        drain_buffer(&mut con, &trader_key, &mut buffer).await;
451    }
452
453    log_task_stopped(CACHE_PROCESS);
454    Ok(())
455}
456
457async fn drain_buffer(
458    conn: &mut ConnectionManager,
459    trader_key: &str,
460    buffer: &mut VecDeque<DatabaseCommand>,
461) {
462    let mut pipe = redis::pipe();
463    pipe.atomic();
464
465    for msg in buffer.drain(..) {
466        let key = if let Some(key) = msg.key {
467            key
468        } else {
469            log::error!("Null key found for message: {msg:?}");
470            continue;
471        };
472        let collection = match get_collection_key(&key) {
473            Ok(collection) => collection,
474            Err(e) => {
475                tracing::error!("{e}");
476                continue; // Continue to next message
477            }
478        };
479
480        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
481
482        match msg.op_type {
483            DatabaseOperation::Insert => {
484                if let Some(payload) = msg.payload {
485                    log::debug!("Processing INSERT for collection: {collection}, key: {key}");
486                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
487                        tracing::error!("{e}");
488                    }
489                } else {
490                    tracing::error!("Null `payload` for `insert`");
491                }
492            }
493            DatabaseOperation::Update => {
494                if let Some(payload) = msg.payload {
495                    log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
496                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
497                        tracing::error!("{e}");
498                    }
499                } else {
500                    tracing::error!("Null `payload` for `update`");
501                }
502            }
503            DatabaseOperation::Delete => {
504                tracing::debug!(
505                    "Processing DELETE for collection: {}, key: {}, payload: {:?}",
506                    collection,
507                    key,
508                    msg.payload.as_ref().map(|p| p.len())
509                );
510                // `payload` can be `None` for a delete operation
511                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
512                    tracing::error!("{e}");
513                }
514            }
515            DatabaseOperation::DeleteFromList => {
516                log::debug!("Processing DELETE_FROM_LIST for collection: {collection}, key: {key}");
517                // For deleting specific items from Redis lists (TBD if this remains)
518                if let Some(payload) = &msg.payload {
519                    if let Err(e) = delete_from_list(&mut pipe, collection, &key, payload) {
520                        tracing::error!("{e}");
521                    }
522                } else {
523                    tracing::error!("Null `payload` for `delete_from_list`");
524                }
525            }
526            DatabaseOperation::Close => panic!("Close command should not be drained"),
527        }
528    }
529
530    if let Err(e) = pipe.query_async::<()>(conn).await {
531        tracing::error!("{e}");
532    }
533}
534
535fn insert(
536    pipe: &mut Pipeline,
537    collection: &str,
538    key: &str,
539    value: Vec<Bytes>,
540) -> anyhow::Result<()> {
541    check_slice_not_empty(value.as_slice(), stringify!(value))?;
542
543    match collection {
544        INDEX => insert_index(pipe, key, &value),
545        GENERAL => {
546            insert_string(pipe, key, value[0].as_ref());
547            Ok(())
548        }
549        CURRENCIES => {
550            insert_string(pipe, key, value[0].as_ref());
551            Ok(())
552        }
553        INSTRUMENTS => {
554            insert_string(pipe, key, value[0].as_ref());
555            Ok(())
556        }
557        SYNTHETICS => {
558            insert_string(pipe, key, value[0].as_ref());
559            Ok(())
560        }
561        ACCOUNTS => {
562            insert_list(pipe, key, value[0].as_ref());
563            Ok(())
564        }
565        ORDERS => {
566            insert_list(pipe, key, value[0].as_ref());
567            Ok(())
568        }
569        POSITIONS => {
570            insert_list(pipe, key, value[0].as_ref());
571            Ok(())
572        }
573        ACTORS => {
574            insert_string(pipe, key, value[0].as_ref());
575            Ok(())
576        }
577        STRATEGIES => {
578            insert_string(pipe, key, value[0].as_ref());
579            Ok(())
580        }
581        SNAPSHOTS => {
582            insert_list(pipe, key, value[0].as_ref());
583            Ok(())
584        }
585        HEALTH => {
586            insert_string(pipe, key, value[0].as_ref());
587            Ok(())
588        }
589        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
590    }
591}
592
593fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
594    let index_key = get_index_key(key)?;
595    match index_key {
596        INDEX_ORDER_IDS => {
597            insert_set(pipe, key, value[0].as_ref());
598            Ok(())
599        }
600        INDEX_ORDER_POSITION => {
601            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
602            Ok(())
603        }
604        INDEX_ORDER_CLIENT => {
605            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
606            Ok(())
607        }
608        INDEX_ORDERS => {
609            insert_set(pipe, key, value[0].as_ref());
610            Ok(())
611        }
612        INDEX_ORDERS_OPEN => {
613            insert_set(pipe, key, value[0].as_ref());
614            Ok(())
615        }
616        INDEX_ORDERS_CLOSED => {
617            insert_set(pipe, key, value[0].as_ref());
618            Ok(())
619        }
620        INDEX_ORDERS_EMULATED => {
621            insert_set(pipe, key, value[0].as_ref());
622            Ok(())
623        }
624        INDEX_ORDERS_INFLIGHT => {
625            insert_set(pipe, key, value[0].as_ref());
626            Ok(())
627        }
628        INDEX_POSITIONS => {
629            insert_set(pipe, key, value[0].as_ref());
630            Ok(())
631        }
632        INDEX_POSITIONS_OPEN => {
633            insert_set(pipe, key, value[0].as_ref());
634            Ok(())
635        }
636        INDEX_POSITIONS_CLOSED => {
637            insert_set(pipe, key, value[0].as_ref());
638            Ok(())
639        }
640        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
641    }
642}
643
644fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
645    pipe.set(key, value);
646}
647
648fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
649    pipe.sadd(key, value);
650}
651
652fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
653    pipe.hset(key, name, value);
654}
655
656fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
657    pipe.rpush(key, value);
658}
659
660fn update(
661    pipe: &mut Pipeline,
662    collection: &str,
663    key: &str,
664    value: Vec<Bytes>,
665) -> anyhow::Result<()> {
666    check_slice_not_empty(value.as_slice(), stringify!(value))?;
667
668    match collection {
669        ACCOUNTS => {
670            update_list(pipe, key, value[0].as_ref());
671            Ok(())
672        }
673        ORDERS => {
674            update_list(pipe, key, value[0].as_ref());
675            Ok(())
676        }
677        POSITIONS => {
678            update_list(pipe, key, value[0].as_ref());
679            Ok(())
680        }
681        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
682    }
683}
684
685fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
686    pipe.rpush_exists(key, value);
687}
688
689fn delete(
690    pipe: &mut Pipeline,
691    collection: &str,
692    key: &str,
693    value: Option<Vec<Bytes>>,
694) -> anyhow::Result<()> {
695    tracing::debug!(
696        "delete: collection={}, key={}, has_payload={}",
697        collection,
698        key,
699        value.is_some()
700    );
701
702    match collection {
703        INDEX => delete_from_index(pipe, key, value),
704        ORDERS => {
705            delete_string(pipe, key);
706            Ok(())
707        }
708        POSITIONS => {
709            delete_string(pipe, key);
710            Ok(())
711        }
712        ACCOUNTS => {
713            delete_string(pipe, key);
714            Ok(())
715        }
716        ACTORS => {
717            delete_string(pipe, key);
718            Ok(())
719        }
720        STRATEGIES => {
721            delete_string(pipe, key);
722            Ok(())
723        }
724        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
725    }
726}
727
728fn delete_from_index(
729    pipe: &mut Pipeline,
730    key: &str,
731    value: Option<Vec<Bytes>>,
732) -> anyhow::Result<()> {
733    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
734    let index_key = get_index_key(key)?;
735
736    match index_key {
737        INDEX_ORDER_IDS => {
738            remove_from_set(pipe, key, value[0].as_ref());
739            Ok(())
740        }
741        INDEX_ORDER_POSITION => {
742            remove_from_hash(pipe, key, value[0].as_ref());
743            Ok(())
744        }
745        INDEX_ORDER_CLIENT => {
746            remove_from_hash(pipe, key, value[0].as_ref());
747            Ok(())
748        }
749        INDEX_ORDERS => {
750            remove_from_set(pipe, key, value[0].as_ref());
751            Ok(())
752        }
753        INDEX_ORDERS_OPEN => {
754            remove_from_set(pipe, key, value[0].as_ref());
755            Ok(())
756        }
757        INDEX_ORDERS_CLOSED => {
758            remove_from_set(pipe, key, value[0].as_ref());
759            Ok(())
760        }
761        INDEX_ORDERS_EMULATED => {
762            remove_from_set(pipe, key, value[0].as_ref());
763            Ok(())
764        }
765        INDEX_ORDERS_INFLIGHT => {
766            remove_from_set(pipe, key, value[0].as_ref());
767            Ok(())
768        }
769        INDEX_POSITIONS => {
770            remove_from_set(pipe, key, value[0].as_ref());
771            Ok(())
772        }
773        INDEX_POSITIONS_OPEN => {
774            remove_from_set(pipe, key, value[0].as_ref());
775            Ok(())
776        }
777        INDEX_POSITIONS_CLOSED => {
778            remove_from_set(pipe, key, value[0].as_ref());
779            Ok(())
780        }
781        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
782    }
783}
784
785fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
786    pipe.srem(key, member);
787}
788
789fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
790    pipe.hdel(key, field);
791}
792
793fn delete_string(pipe: &mut Pipeline, key: &str) {
794    pipe.del(key);
795}
796
797fn delete_from_list(
798    pipe: &mut Pipeline,
799    collection: &str,
800    key: &str,
801    payload: &[Bytes],
802) -> anyhow::Result<()> {
803    match collection {
804        ACCOUNTS => {
805            // payload[0] contains the event_id as a string
806            let event_id = std::str::from_utf8(&payload[0])?;
807
808            // The Python layer has already determined this event is safe to delete
809            // (it's not the last event). We can safely remove it from the Redis list.
810            // Since account events are stored as serialized objects, we need to find
811            // and remove items that contain the event_id within their serialized data.
812            let lua_script = r#"
813                local key = KEYS[1]
814                local event_id = ARGV[1]
815                local removed_count = 0
816
817                -- Check if the key exists first
818                if redis.call('EXISTS', key) == 0 then
819                    return 0  -- Nothing to delete
820                end
821
822                local items = redis.call('LRANGE', key, 0, -1)
823                redis.call('DEL', key)
824
825                for i, item in ipairs(items) do
826                    if not string.find(item, event_id, 1, true) then
827                        redis.call('RPUSH', key, item)
828                    else
829                        removed_count = removed_count + 1
830                    end
831                end
832
833                return removed_count
834            "#;
835
836            pipe.cmd("EVAL")
837                .arg(lua_script)
838                .arg(1)
839                .arg(key)
840                .arg(event_id);
841            Ok(())
842        }
843        _ => {
844            anyhow::bail!("Unsupported operation: `delete_from_list` for collection '{collection}'")
845        }
846    }
847}
848
849fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
850    let mut key = String::new();
851
852    if config.use_trader_prefix {
853        key.push_str("trader-");
854    }
855
856    key.push_str(trader_id.as_str());
857
858    if config.use_instance_id {
859        key.push(REDIS_DELIMITER);
860        key.push_str(&format!("{instance_id}"));
861    }
862
863    key
864}
865
866fn get_collection_key(key: &str) -> anyhow::Result<&str> {
867    key.split_once(REDIS_DELIMITER)
868        .map(|(collection, _)| collection)
869        .ok_or_else(|| {
870            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
871        })
872}
873
874fn get_index_key(key: &str) -> anyhow::Result<&str> {
875    key.split_once(REDIS_DELIMITER)
876        .map(|(_, index_key)| index_key)
877        .ok_or_else(|| {
878            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
879        })
880}
881
882#[allow(dead_code)] // Under development
883#[derive(Debug)]
884pub struct RedisCacheDatabaseAdapter {
885    pub encoding: SerializationEncoding,
886    pub database: RedisCacheDatabase,
887}
888
889#[allow(dead_code)] // Under development
890#[allow(unused)] // Under development
891#[async_trait::async_trait]
892impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
893    fn close(&mut self) -> anyhow::Result<()> {
894        self.database.close();
895        Ok(())
896    }
897
898    fn flush(&mut self) -> anyhow::Result<()> {
899        self.database.flushdb();
900        Ok(())
901    }
902
903    async fn load_all(&self) -> anyhow::Result<CacheMap> {
904        tracing::debug!("Loading all data");
905
906        let (
907            currencies,
908            instruments,
909            synthetics,
910            accounts,
911            orders,
912            positions,
913            greeks,
914            yield_curves,
915        ) = try_join!(
916            self.load_currencies(),
917            self.load_instruments(),
918            self.load_synthetics(),
919            self.load_accounts(),
920            self.load_orders(),
921            self.load_positions(),
922            self.load_greeks(),
923            self.load_yield_curves()
924        )
925        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
926
927        Ok(CacheMap {
928            currencies,
929            instruments,
930            synthetics,
931            accounts,
932            orders,
933            positions,
934            greeks,
935            yield_curves,
936        })
937    }
938
939    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
940        // self.database.load()
941        Ok(HashMap::new()) // TODO
942    }
943
944    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
945        DatabaseQueries::load_currencies(
946            &self.database.con,
947            &self.database.trader_key,
948            self.encoding,
949        )
950        .await
951    }
952
953    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
954        DatabaseQueries::load_instruments(
955            &self.database.con,
956            &self.database.trader_key,
957            self.encoding,
958        )
959        .await
960    }
961
962    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
963        DatabaseQueries::load_synthetics(
964            &self.database.con,
965            &self.database.trader_key,
966            self.encoding,
967        )
968        .await
969    }
970
971    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
972        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
973            .await
974    }
975
976    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
977        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
978            .await
979    }
980
981    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
982        DatabaseQueries::load_positions(
983            &self.database.con,
984            &self.database.trader_key,
985            self.encoding,
986        )
987        .await
988    }
989
990    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
991        todo!()
992    }
993
994    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
995        todo!()
996    }
997
998    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
999        DatabaseQueries::load_currency(
1000            &self.database.con,
1001            &self.database.trader_key,
1002            code,
1003            self.encoding,
1004        )
1005        .await
1006    }
1007
1008    async fn load_instrument(
1009        &self,
1010        instrument_id: &InstrumentId,
1011    ) -> anyhow::Result<Option<InstrumentAny>> {
1012        DatabaseQueries::load_instrument(
1013            &self.database.con,
1014            &self.database.trader_key,
1015            instrument_id,
1016            self.encoding,
1017        )
1018        .await
1019    }
1020
1021    async fn load_synthetic(
1022        &self,
1023        instrument_id: &InstrumentId,
1024    ) -> anyhow::Result<Option<SyntheticInstrument>> {
1025        DatabaseQueries::load_synthetic(
1026            &self.database.con,
1027            &self.database.trader_key,
1028            instrument_id,
1029            self.encoding,
1030        )
1031        .await
1032    }
1033
1034    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
1035        DatabaseQueries::load_account(
1036            &self.database.con,
1037            &self.database.trader_key,
1038            account_id,
1039            self.encoding,
1040        )
1041        .await
1042    }
1043
1044    async fn load_order(
1045        &self,
1046        client_order_id: &ClientOrderId,
1047    ) -> anyhow::Result<Option<OrderAny>> {
1048        DatabaseQueries::load_order(
1049            &self.database.con,
1050            &self.database.trader_key,
1051            client_order_id,
1052            self.encoding,
1053        )
1054        .await
1055    }
1056
1057    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
1058        DatabaseQueries::load_position(
1059            &self.database.con,
1060            &self.database.trader_key,
1061            position_id,
1062            self.encoding,
1063        )
1064        .await
1065    }
1066
1067    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
1068        todo!()
1069    }
1070
1071    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
1072        todo!()
1073    }
1074
1075    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
1076        todo!()
1077    }
1078
1079    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1080        todo!()
1081    }
1082
1083    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1084        let order_id_bytes = Bytes::from(client_order_id.to_string());
1085
1086        log::debug!("Deleting order: {client_order_id} from Redis");
1087        log::debug!("Trader key: {}", self.database.trader_key);
1088
1089        // Delete the order itself
1090        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1091        log::debug!("Deleting order key: {key}");
1092        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1093        self.database
1094            .tx
1095            .send(op)
1096            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1097
1098        // Delete from all order indexes
1099        let index_keys = [
1100            INDEX_ORDER_IDS,
1101            INDEX_ORDERS,
1102            INDEX_ORDERS_OPEN,
1103            INDEX_ORDERS_CLOSED,
1104            INDEX_ORDERS_EMULATED,
1105            INDEX_ORDERS_INFLIGHT,
1106        ];
1107
1108        for index_key in &index_keys {
1109            let key = index_key.to_string();
1110            log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1111            let payload = vec![order_id_bytes.clone()];
1112            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1113            self.database
1114                .tx
1115                .send(op)
1116                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1117        }
1118
1119        // Delete from hash indexes
1120        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1121        for index_key in &hash_indexes {
1122            let key = index_key.to_string();
1123            log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1124            let payload = vec![order_id_bytes.clone()];
1125            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1126            self.database.tx.send(op).map_err(|e| {
1127                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1128            })?;
1129        }
1130
1131        log::debug!("Sent all delete commands for order: {client_order_id}");
1132        Ok(())
1133    }
1134
1135    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1136        let position_id_bytes = Bytes::from(position_id.to_string());
1137
1138        // Delete the position itself
1139        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1140        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1141        self.database
1142            .tx
1143            .send(op)
1144            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1145
1146        // Delete from all position indexes
1147        let index_keys = [
1148            INDEX_POSITIONS,
1149            INDEX_POSITIONS_OPEN,
1150            INDEX_POSITIONS_CLOSED,
1151        ];
1152
1153        for index_key in &index_keys {
1154            let key = index_key.to_string();
1155            let payload = vec![position_id_bytes.clone()];
1156            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1157            self.database.tx.send(op).map_err(|e| {
1158                anyhow::anyhow!("Failed to send delete position index command: {e}")
1159            })?;
1160        }
1161
1162        Ok(())
1163    }
1164
1165    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1166        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
1167        let payload = vec![Bytes::from(event_id.to_string())];
1168        let op = DatabaseCommand::new(DatabaseOperation::DeleteFromList, key, Some(payload));
1169        self.database
1170            .tx
1171            .send(op)
1172            .map_err(|e| anyhow::anyhow!("Failed to send delete account event command: {e}"))
1173    }
1174
1175    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1176        todo!()
1177    }
1178
1179    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1180        todo!()
1181    }
1182
1183    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1184        todo!()
1185    }
1186
1187    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1188        todo!()
1189    }
1190
1191    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1192        todo!()
1193    }
1194
1195    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1196        todo!()
1197    }
1198
1199    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1200        todo!()
1201    }
1202
1203    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1204        todo!()
1205    }
1206
1207    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1208        todo!()
1209    }
1210
1211    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1212        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1213    }
1214
1215    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1216        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1217    }
1218
1219    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1220        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1221    }
1222
1223    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1224        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1225    }
1226
1227    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1228        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1229    }
1230
1231    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1232        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1233    }
1234
1235    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1236        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1237    }
1238
1239    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1240        anyhow::bail!("Saving signals for Redis cache adapter not supported")
1241    }
1242
1243    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1244        anyhow::bail!("Loading signals from Redis cache adapter not supported")
1245    }
1246
1247    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1248        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1249    }
1250
1251    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1252        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1253    }
1254
1255    fn load_order_snapshot(
1256        &self,
1257        client_order_id: &ClientOrderId,
1258    ) -> anyhow::Result<Option<OrderSnapshot>> {
1259        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1260    }
1261
1262    fn load_position_snapshot(
1263        &self,
1264        position_id: &PositionId,
1265    ) -> anyhow::Result<Option<PositionSnapshot>> {
1266        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1267    }
1268
1269    fn index_venue_order_id(
1270        &self,
1271        client_order_id: ClientOrderId,
1272        venue_order_id: VenueOrderId,
1273    ) -> anyhow::Result<()> {
1274        todo!()
1275    }
1276
1277    fn index_order_position(
1278        &self,
1279        client_order_id: ClientOrderId,
1280        position_id: PositionId,
1281    ) -> anyhow::Result<()> {
1282        todo!()
1283    }
1284
1285    fn update_actor(&self) -> anyhow::Result<()> {
1286        todo!()
1287    }
1288
1289    fn update_strategy(&self) -> anyhow::Result<()> {
1290        todo!()
1291    }
1292
1293    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1294        todo!()
1295    }
1296
1297    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1298        todo!()
1299    }
1300
1301    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1302        todo!()
1303    }
1304
1305    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1306        todo!()
1307    }
1308
1309    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1310        todo!()
1311    }
1312
1313    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1314        todo!()
1315    }
1316}
1317
1318////////////////////////////////////////////////////////////////////////////////
1319// Tests
1320////////////////////////////////////////////////////////////////////////////////
1321#[cfg(test)]
1322mod tests {
1323    use rstest::rstest;
1324
1325    use super::*;
1326
1327    #[rstest]
1328    fn test_get_trader_key_with_prefix_and_instance_id() {
1329        let trader_id = TraderId::from("tester-123");
1330        let instance_id = UUID4::new();
1331        let config = CacheConfig {
1332            use_instance_id: true,
1333            ..Default::default()
1334        };
1335
1336        let key = get_trader_key(trader_id, instance_id, &config);
1337        assert!(key.starts_with("trader-tester-123:"));
1338        assert!(key.ends_with(&instance_id.to_string()));
1339    }
1340
1341    #[rstest]
1342    fn test_get_collection_key_valid() {
1343        let key = "collection:123";
1344        assert_eq!(get_collection_key(key).unwrap(), "collection");
1345    }
1346
1347    #[rstest]
1348    fn test_get_collection_key_invalid() {
1349        let key = "no_delimiter";
1350        assert!(get_collection_key(key).is_err());
1351    }
1352
1353    #[rstest]
1354    fn test_get_index_key_valid() {
1355        let key = "index:123";
1356        assert_eq!(get_index_key(key).unwrap(), "123");
1357    }
1358
1359    #[rstest]
1360    fn test_get_index_key_invalid() {
1361        let key = "no_delimiter";
1362        assert!(get_index_key(key).is_err());
1363    }
1364}