Skip to main content

nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Redis-backed cache database for the system.
17//!
18//! # Architecture
19//!
20//! Uses two Redis connections with distinct roles:
21//! - **READ** (`self.con`): synchronous queries (`keys`, `read`, `load_all`),
22//!   owned by the main struct.
23//! - **WRITE**: owned by a background task on `get_runtime()`, receives
24//!   commands via an unbounded `tokio::sync::mpsc` channel.
25//!
26//! All write operations (`insert`, `update`, `delete`, `flush`) are routed
27//! through the command channel so they execute on the WRITE connection. This
28//! avoids cross-runtime I/O issues since the WRITE connection is always
29//! created on the Nautilus runtime.
30//!
31//! Synchronous callers (`close`, `flushdb_sync`) use `std::sync::mpsc` reply
32//! channels to block until the background task confirms completion. When
33//! called from the Nautilus runtime itself, `block_in_place` is used
34//! automatically to avoid stalling the worker thread.
35
36use std::{
37    collections::VecDeque,
38    fmt::Debug,
39    ops::ControlFlow,
40    pin::Pin,
41    sync::mpsc::{self, SyncSender},
42    time::Duration,
43};
44
45use ahash::AHashMap;
46use bytes::Bytes;
47use nautilus_common::{
48    cache::{
49        CacheConfig,
50        database::{CacheDatabaseAdapter, CacheMap},
51    },
52    custom::CustomData,
53    enums::SerializationEncoding,
54    live::get_runtime,
55    logging::{log_task_awaiting, log_task_started, log_task_stopped},
56    signal::Signal,
57};
58use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
59use nautilus_cryptography::providers::install_cryptographic_provider;
60use nautilus_model::{
61    accounts::AccountAny,
62    data::{Bar, DataType, FundingRateUpdate, QuoteTick, TradeTick},
63    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
64    identifiers::{
65        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
66        TraderId, VenueOrderId,
67    },
68    instruments::{InstrumentAny, SyntheticInstrument},
69    orderbook::OrderBook,
70    orders::OrderAny,
71    position::Position,
72    types::Currency,
73};
74use redis::{Pipeline, aio::ConnectionManager};
75use ustr::Ustr;
76
77use super::{REDIS_DELIMITER, REDIS_FLUSHDB, get_index_key};
78use crate::redis::{create_redis_connection, queries::DatabaseQueries};
79
80// Task and connection names
81const CACHE_READ: &str = "cache-read";
82const CACHE_WRITE: &str = "cache-write";
83const CACHE_PROCESS: &str = "cache-process";
84
85// Error constants
86const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
87
88// Collection keys
89const INDEX: &str = "index";
90const GENERAL: &str = "general";
91const CURRENCIES: &str = "currencies";
92const INSTRUMENTS: &str = "instruments";
93const SYNTHETICS: &str = "synthetics";
94const ACCOUNTS: &str = "accounts";
95const ORDERS: &str = "orders";
96const POSITIONS: &str = "positions";
97const ACTORS: &str = "actors";
98const STRATEGIES: &str = "strategies";
99const SNAPSHOTS: &str = "snapshots";
100const HEALTH: &str = "health";
101
102// Index keys
103const INDEX_ORDER_IDS: &str = "index:order_ids";
104const INDEX_ORDER_POSITION: &str = "index:order_position";
105const INDEX_ORDER_CLIENT: &str = "index:order_client";
106const INDEX_ORDERS: &str = "index:orders";
107const INDEX_ORDERS_OPEN: &str = "index:orders_open";
108const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
109const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
110const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
111const INDEX_POSITIONS: &str = "index:positions";
112const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
113const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
114
/// A type of database operation.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Insert a value for a key.
    Insert,
    /// Update an existing value for a key.
    Update,
    /// Delete a key, or members from an index collection.
    Delete,
    /// Flush the entire database; the sender is signaled once the flush completes.
    Flush(SyncSender<()>),
    /// Shut down the background command-processing task.
    Close,
}
124
/// Represents a database command to be performed which may be executed in a task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The database operation type.
    pub op_type: DatabaseOperation,
    /// The primary key for the operation (`None` for `Close` and `Flush`).
    pub key: Option<String>,
    /// The data payload for the operation.
    pub payload: Option<Vec<Bytes>>,
}
135
136impl DatabaseCommand {
137    /// Creates a new [`DatabaseCommand`] instance.
138    #[must_use]
139    pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
140        Self {
141            op_type,
142            key: Some(key),
143            payload,
144        }
145    }
146
147    /// Initialize a `Close` database command, this is meant to close the database cache channel.
148    #[must_use]
149    pub const fn close() -> Self {
150        Self {
151            op_type: DatabaseOperation::Close,
152            key: None,
153            payload: None,
154        }
155    }
156}
157
/// Redis-backed cache database for a single trader (see module docs for the
/// READ/WRITE connection split).
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// READ connection used for synchronous queries (`keys`, `read`, `read_bulk`).
    pub con: ConnectionManager,
    /// The trader ID for this database instance.
    pub trader_id: TraderId,
    /// Key prefix applied to all Redis keys for this trader.
    pub trader_key: String,
    /// The serialization encoding for stored values.
    pub encoding: SerializationEncoding,
    /// Optional batch size used by `read_bulk`; `None` reads all keys at once.
    pub bulk_read_batch_size: Option<usize>,
    // Channel into the background WRITE task; all mutations are routed here
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
    // Join handle for the background task; taken on `close` (None => closed)
    handle: Option<tokio::task::JoinHandle<()>>,
}
171
172impl Debug for RedisCacheDatabase {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct(stringify!(RedisCacheDatabase))
175            .field("trader_id", &self.trader_id)
176            .field("encoding", &self.encoding)
177            .finish()
178    }
179}
180
181impl RedisCacheDatabase {
182    /// Creates a new [`RedisCacheDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if:
187    /// - The database configuration is missing in `config`.
188    /// - Establishing the Redis connection fails.
189    /// - The command processing task cannot be spawned.
190    pub async fn new(
191        trader_id: TraderId,
192        instance_id: UUID4,
193        config: CacheConfig,
194    ) -> anyhow::Result<Self> {
195        install_cryptographic_provider();
196
197        let db_config = config
198            .database
199            .as_ref()
200            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
201        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
202
203        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
204        let trader_key = get_trader_key(trader_id, instance_id, &config);
205        let trader_key_clone = trader_key.clone();
206        let encoding = config.encoding;
207        let bulk_read_batch_size = config.bulk_read_batch_size;
208
209        let handle = get_runtime().spawn(async move {
210            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
211                log::error!("Error in task '{CACHE_PROCESS}': {e}");
212            }
213        });
214
215        Ok(Self {
216            con,
217            trader_id,
218            trader_key,
219            encoding,
220            bulk_read_batch_size,
221            tx,
222            handle: Some(handle),
223        })
224    }
225
226    #[must_use]
227    pub const fn get_encoding(&self) -> SerializationEncoding {
228        self.encoding
229    }
230
231    #[must_use]
232    pub fn get_trader_key(&self) -> &str {
233        &self.trader_key
234    }
235
236    pub fn close(&mut self) {
237        log::debug!("Closing");
238
239        let Some(handle) = self.handle.take() else {
240            log::debug!("Already closed");
241            return;
242        };
243
244        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
245            log::debug!("Error sending close command: {e:?}");
246        }
247
248        log_task_awaiting(CACHE_PROCESS);
249
250        let (tx, rx) = mpsc::sync_channel(1);
251
252        get_runtime().spawn(async move {
253            if let Err(e) = handle.await {
254                log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
255            }
256            let _ = tx.send(());
257        });
258        let _ = blocking_recv(&rx);
259
260        log::debug!("Closed");
261    }
262
263    pub async fn flushdb(&mut self) {
264        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
265            .query_async::<()>(&mut self.con)
266            .await
267        {
268            log::error!("Failed to flush database: {e:?}");
269        }
270    }
271
272    /// Sends a flush command through the background task channel and blocks
273    /// until it completes. Safe to call from any runtime context.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the command channel is closed or the reply is lost.
278    pub fn flushdb_sync(&self) -> anyhow::Result<()> {
279        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
280        let cmd = DatabaseCommand {
281            op_type: DatabaseOperation::Flush(reply_tx),
282            key: None,
283            payload: None,
284        };
285        self.tx
286            .send(cmd)
287            .map_err(|e| anyhow::anyhow!("{FAILED_TX_CHANNEL}: {e}"))?;
288        blocking_recv(&reply_rx).map_err(|e| anyhow::anyhow!("Failed to flush database: {e}"))?;
289        Ok(())
290    }
291
292    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the underlying Redis scan operation fails.
297    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
298        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
299        DatabaseQueries::scan_keys(&mut self.con, pattern).await
300    }
301
302    /// Reads the value(s) associated with `key` for this trader from Redis.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the underlying Redis read operation fails.
307    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
308        DatabaseQueries::read(&self.con, &self.trader_key, key).await
309    }
310
311    /// Reads multiple values using bulk operations for efficiency.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the underlying Redis read operation fails.
316    pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
317        match self.bulk_read_batch_size {
318            Some(batch_size) => {
319                DatabaseQueries::read_bulk_batched(&self.con, keys, batch_size).await
320            }
321            None => DatabaseQueries::read_bulk(&self.con, keys).await,
322        }
323    }
324
325    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if the command cannot be sent to the background task channel.
330    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
331        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
332        match self.tx.send(op) {
333            Ok(()) => Ok(()),
334            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
335        }
336    }
337
338    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if the command cannot be sent to the background task channel.
343    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
344        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
345        match self.tx.send(op) {
346            Ok(()) => Ok(()),
347            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
348        }
349    }
350
351    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if the command cannot be sent to the background task channel.
356    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
357        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
358        match self.tx.send(op) {
359            Ok(()) => Ok(()),
360            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
361        }
362    }
363
364    /// Delete the given order from the database with comprehensive index cleanup.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if the command cannot be sent to the background task channel.
369    pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
370        let order_id_bytes = Bytes::from(client_order_id.to_string());
371
372        // Delete the order itself
373        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
374        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
375        self.tx
376            .send(op)
377            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
378
379        // Delete from all order indexes
380        let index_keys = [
381            INDEX_ORDER_IDS,
382            INDEX_ORDERS,
383            INDEX_ORDERS_OPEN,
384            INDEX_ORDERS_CLOSED,
385            INDEX_ORDERS_EMULATED,
386            INDEX_ORDERS_INFLIGHT,
387        ];
388
389        for index_key in &index_keys {
390            let key = (*index_key).to_string();
391            let payload = vec![order_id_bytes.clone()];
392            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
393            self.tx
394                .send(op)
395                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
396        }
397
398        // Delete from hash indexes
399        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
400        for index_key in &hash_indexes {
401            let key = (*index_key).to_string();
402            let payload = vec![order_id_bytes.clone()];
403            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
404            self.tx.send(op).map_err(|e| {
405                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
406            })?;
407        }
408
409        Ok(())
410    }
411
412    /// Delete the given position from the database with comprehensive index cleanup.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if the command cannot be sent to the background task channel.
417    pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
418        let position_id_bytes = Bytes::from(position_id.to_string());
419
420        // Delete the position itself
421        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
422        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
423        self.tx
424            .send(op)
425            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
426
427        // Delete from all position indexes
428        let index_keys = [
429            INDEX_POSITIONS,
430            INDEX_POSITIONS_OPEN,
431            INDEX_POSITIONS_CLOSED,
432        ];
433
434        for index_key in &index_keys {
435            let key = (*index_key).to_string();
436            let payload = vec![position_id_bytes.clone()];
437            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
438            self.tx.send(op).map_err(|e| {
439                anyhow::anyhow!("Failed to send delete position index command: {e}")
440            })?;
441        }
442
443        Ok(())
444    }
445
446    /// Delete the given account event from the database.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the command cannot be sent to the background task channel.
451    pub fn delete_account_event(
452        &self,
453        _account_id: &AccountId,
454        _event_id: &str,
455    ) -> anyhow::Result<()> {
456        log::warn!("Deleting account events currently a no-op (pending redesign)");
457        Ok(())
458    }
459}
460
461fn blocking_recv<T>(rx: &mpsc::Receiver<T>) -> Result<T, mpsc::RecvError> {
462    let on_nautilus_runtime = tokio::runtime::Handle::try_current()
463        .ok()
464        .is_some_and(|h| h.id() == get_runtime().handle().id());
465
466    if on_nautilus_runtime {
467        tokio::task::block_in_place(|| rx.recv())
468    } else {
469        rx.recv()
470    }
471}
472
/// Runs the background command loop on the WRITE connection.
///
/// Receives [`DatabaseCommand`]s from `rx`, buffers them, and drains the
/// buffer to Redis — either immediately (zero `buffer_interval_ms`) or on a
/// periodic timer. Runs until the channel closes or a `Close` command is
/// received, then drains any remaining buffered commands.
///
/// # Errors
///
/// Returns an error if the database config is missing or the WRITE
/// connection cannot be established.
async fn process_commands(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
    trader_key: String,
    config: CacheConfig,
) -> anyhow::Result<()> {
    log_task_started(CACHE_PROCESS);

    let db_config = config
        .database
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
    // WRITE connection: created here so it lives on the Nautilus runtime
    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;

    // Buffering
    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);

    // A sleep used to trigger periodic flushing of the buffer.
    // When `buffer_interval` is zero we skip using the timer and flush immediately
    // after every message.
    let flush_timer = tokio::time::sleep(buffer_interval);
    tokio::pin!(flush_timer);

    // Continue to receive and handle messages until channel is hung up
    loop {
        tokio::select! {
            maybe_cmd = rx.recv() => {
                let result = handle_command(
                    maybe_cmd,
                    &mut buffer,
                    buffer_interval,
                    &mut con,
                    &trader_key,
                ).await;
                if result.is_break() {
                    break;
                }
            }
            // Timer branch is disabled entirely when flushing is immediate
            () = &mut flush_timer, if !buffer_interval.is_zero() => {
                flush_buffer(&mut buffer, &mut con, &trader_key, &mut flush_timer, buffer_interval).await;
            }
        }
    }

    // Drain any remaining messages
    if !buffer.is_empty() {
        drain_buffer(&mut con, &trader_key, &mut buffer).await;
    }

    log_task_stopped(CACHE_PROCESS);
    Ok(())
}
525
/// Processes a single received command (or channel closure) for the command loop.
///
/// Returns [`ControlFlow::Break`] when the loop should terminate: the channel
/// closed (`maybe_cmd` is `None`) or a `Close` command arrived. `Close` and
/// `Flush` drain the buffer first so no queued writes are lost; `Flush` also
/// executes `FLUSHDB` and signals its reply channel. All other commands are
/// buffered, and drained immediately when `buffer_interval` is zero.
async fn handle_command(
    maybe_cmd: Option<DatabaseCommand>,
    buffer: &mut VecDeque<DatabaseCommand>,
    buffer_interval: Duration,
    con: &mut ConnectionManager,
    trader_key: &str,
) -> ControlFlow<()> {
    let Some(cmd) = maybe_cmd else {
        log::debug!("Command channel closed");
        return ControlFlow::Break(());
    };

    log::trace!("Received {cmd:?}");

    match cmd.op_type {
        DatabaseOperation::Close => {
            // Drain pending writes before shutting down
            if !buffer.is_empty() {
                drain_buffer(con, trader_key, buffer).await;
            }
            return ControlFlow::Break(());
        }
        DatabaseOperation::Flush(reply_tx) => {
            // Drain pending writes before wiping the database
            if !buffer.is_empty() {
                drain_buffer(con, trader_key, buffer).await;
            }
            if let Err(e) = redis::cmd(REDIS_FLUSHDB).query_async::<()>(con).await {
                log::error!("Failed to flush database: {e:?}");
            }
            // Unblock the synchronous caller waiting in `flushdb_sync`
            let _ = reply_tx.send(());
            return ControlFlow::Continue(());
        }
        _ => {}
    }

    buffer.push_back(cmd);

    // Zero interval means no periodic timer: flush after every command
    if buffer_interval.is_zero() {
        drain_buffer(con, trader_key, buffer).await;
    }

    ControlFlow::Continue(())
}
568
569async fn flush_buffer(
570    buffer: &mut VecDeque<DatabaseCommand>,
571    con: &mut ConnectionManager,
572    trader_key: &str,
573    flush_timer: &mut Pin<&mut tokio::time::Sleep>,
574    buffer_interval: Duration,
575) {
576    if !buffer.is_empty() {
577        drain_buffer(con, trader_key, buffer).await;
578    }
579    flush_timer
580        .as_mut()
581        .reset(tokio::time::Instant::now() + buffer_interval);
582}
583
/// Drains all buffered commands into a single atomic Redis pipeline and
/// executes it on `conn`.
///
/// Commands with a missing key or an unparsable collection are logged and
/// skipped. `Close` and `Flush` are handled upstream in `handle_command` and
/// must never reach the buffer — encountering one here is a bug and panics.
async fn drain_buffer(
    conn: &mut ConnectionManager,
    trader_key: &str,
    buffer: &mut VecDeque<DatabaseCommand>,
) {
    let mut pipe = redis::pipe();
    pipe.atomic();

    for msg in buffer.drain(..) {
        let key = if let Some(key) = msg.key {
            key
        } else {
            log::error!("Null key found for message: {msg:?}");
            continue;
        };
        // The collection (e.g. "orders") is the prefix before the delimiter
        let collection = match get_collection_key(&key) {
            Ok(collection) => collection,
            Err(e) => {
                log::error!("{e}");
                continue; // Continue to next message
            }
        };

        // Namespace the key under this trader
        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);

        match msg.op_type {
            DatabaseOperation::Insert => {
                if let Some(payload) = msg.payload {
                    log::debug!("Processing INSERT for collection: {collection}, key: {key}");
                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
                        log::error!("{e}");
                    }
                } else {
                    log::error!("Null `payload` for `insert`");
                }
            }
            DatabaseOperation::Update => {
                if let Some(payload) = msg.payload {
                    log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
                        log::error!("{e}");
                    }
                } else {
                    log::error!("Null `payload` for `update`");
                }
            }
            DatabaseOperation::Delete => {
                log::debug!(
                    "Processing DELETE for collection: {}, key: {}, payload: {:?}",
                    collection,
                    key,
                    msg.payload.as_ref().map(std::vec::Vec::len)
                );
                // `payload` can be `None` for a delete operation
                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
                    log::error!("{e}");
                }
            }
            DatabaseOperation::Close => panic!("Close command should not be drained"),
            DatabaseOperation::Flush(_) => panic!("Flush command should not be drained"),
        }
    }

    if let Err(e) = pipe.query_async::<()>(conn).await {
        log::error!("{e}");
    }
}
651
652fn insert(
653    pipe: &mut Pipeline,
654    collection: &str,
655    key: &str,
656    value: Vec<Bytes>,
657) -> anyhow::Result<()> {
658    check_slice_not_empty(value.as_slice(), stringify!(value))?;
659
660    match collection {
661        INDEX => insert_index(pipe, key, &value),
662        GENERAL => {
663            insert_string(pipe, key, value[0].as_ref());
664            Ok(())
665        }
666        CURRENCIES => {
667            insert_string(pipe, key, value[0].as_ref());
668            Ok(())
669        }
670        INSTRUMENTS => {
671            insert_string(pipe, key, value[0].as_ref());
672            Ok(())
673        }
674        SYNTHETICS => {
675            insert_string(pipe, key, value[0].as_ref());
676            Ok(())
677        }
678        ACCOUNTS => {
679            insert_list(pipe, key, value[0].as_ref());
680            Ok(())
681        }
682        ORDERS => {
683            insert_list(pipe, key, value[0].as_ref());
684            Ok(())
685        }
686        POSITIONS => {
687            insert_list(pipe, key, value[0].as_ref());
688            Ok(())
689        }
690        ACTORS => {
691            insert_string(pipe, key, value[0].as_ref());
692            Ok(())
693        }
694        STRATEGIES => {
695            insert_string(pipe, key, value[0].as_ref());
696            Ok(())
697        }
698        SNAPSHOTS => {
699            insert_list(pipe, key, value[0].as_ref());
700            Ok(())
701        }
702        HEALTH => {
703            insert_string(pipe, key, value[0].as_ref());
704            Ok(())
705        }
706        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
707    }
708}
709
710fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
711    let index_key = get_index_key(key)?;
712    match index_key {
713        INDEX_ORDER_IDS => {
714            insert_set(pipe, key, value[0].as_ref());
715            Ok(())
716        }
717        INDEX_ORDER_POSITION => {
718            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
719            Ok(())
720        }
721        INDEX_ORDER_CLIENT => {
722            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
723            Ok(())
724        }
725        INDEX_ORDERS => {
726            insert_set(pipe, key, value[0].as_ref());
727            Ok(())
728        }
729        INDEX_ORDERS_OPEN => {
730            insert_set(pipe, key, value[0].as_ref());
731            Ok(())
732        }
733        INDEX_ORDERS_CLOSED => {
734            insert_set(pipe, key, value[0].as_ref());
735            Ok(())
736        }
737        INDEX_ORDERS_EMULATED => {
738            insert_set(pipe, key, value[0].as_ref());
739            Ok(())
740        }
741        INDEX_ORDERS_INFLIGHT => {
742            insert_set(pipe, key, value[0].as_ref());
743            Ok(())
744        }
745        INDEX_POSITIONS => {
746            insert_set(pipe, key, value[0].as_ref());
747            Ok(())
748        }
749        INDEX_POSITIONS_OPEN => {
750            insert_set(pipe, key, value[0].as_ref());
751            Ok(())
752        }
753        INDEX_POSITIONS_CLOSED => {
754            insert_set(pipe, key, value[0].as_ref());
755            Ok(())
756        }
757        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
758    }
759}
760
/// Queues a plain string write at `key` (Redis `SET`).
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}

/// Queues adding `value` as a member of the set at `key` (Redis `SADD`).
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}

/// Queues setting field `name` to `value` in the hash at `key` (Redis `HSET`).
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}

/// Queues appending `value` to the list at `key` (Redis `RPUSH`).
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
776
777fn update(
778    pipe: &mut Pipeline,
779    collection: &str,
780    key: &str,
781    value: Vec<Bytes>,
782) -> anyhow::Result<()> {
783    check_slice_not_empty(value.as_slice(), stringify!(value))?;
784
785    match collection {
786        ACCOUNTS => {
787            update_list(pipe, key, value[0].as_ref());
788            Ok(())
789        }
790        ORDERS => {
791            update_list(pipe, key, value[0].as_ref());
792            Ok(())
793        }
794        POSITIONS => {
795            update_list(pipe, key, value[0].as_ref());
796            Ok(())
797        }
798        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
799    }
800}
801
/// Queues appending `value` to the list at `key`, only if the list already
/// exists (presumably Redis `RPUSHX` — confirm against redis-rs docs).
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
805
806fn delete(
807    pipe: &mut Pipeline,
808    collection: &str,
809    key: &str,
810    value: Option<Vec<Bytes>>,
811) -> anyhow::Result<()> {
812    log::debug!(
813        "delete: collection={}, key={}, has_payload={}",
814        collection,
815        key,
816        value.is_some()
817    );
818
819    match collection {
820        INDEX => delete_from_index(pipe, key, value),
821        ORDERS => {
822            delete_string(pipe, key);
823            Ok(())
824        }
825        POSITIONS => {
826            delete_string(pipe, key);
827            Ok(())
828        }
829        ACCOUNTS => {
830            delete_string(pipe, key);
831            Ok(())
832        }
833        ACTORS => {
834            delete_string(pipe, key);
835            Ok(())
836        }
837        STRATEGIES => {
838            delete_string(pipe, key);
839            Ok(())
840        }
841        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
842    }
843}
844
845fn delete_from_index(
846    pipe: &mut Pipeline,
847    key: &str,
848    value: Option<Vec<Bytes>>,
849) -> anyhow::Result<()> {
850    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
851    let index_key = get_index_key(key)?;
852
853    match index_key {
854        INDEX_ORDER_IDS => {
855            remove_from_set(pipe, key, value[0].as_ref());
856            Ok(())
857        }
858        INDEX_ORDER_POSITION => {
859            remove_from_hash(pipe, key, value[0].as_ref());
860            Ok(())
861        }
862        INDEX_ORDER_CLIENT => {
863            remove_from_hash(pipe, key, value[0].as_ref());
864            Ok(())
865        }
866        INDEX_ORDERS => {
867            remove_from_set(pipe, key, value[0].as_ref());
868            Ok(())
869        }
870        INDEX_ORDERS_OPEN => {
871            remove_from_set(pipe, key, value[0].as_ref());
872            Ok(())
873        }
874        INDEX_ORDERS_CLOSED => {
875            remove_from_set(pipe, key, value[0].as_ref());
876            Ok(())
877        }
878        INDEX_ORDERS_EMULATED => {
879            remove_from_set(pipe, key, value[0].as_ref());
880            Ok(())
881        }
882        INDEX_ORDERS_INFLIGHT => {
883            remove_from_set(pipe, key, value[0].as_ref());
884            Ok(())
885        }
886        INDEX_POSITIONS => {
887            remove_from_set(pipe, key, value[0].as_ref());
888            Ok(())
889        }
890        INDEX_POSITIONS_OPEN => {
891            remove_from_set(pipe, key, value[0].as_ref());
892            Ok(())
893        }
894        INDEX_POSITIONS_CLOSED => {
895            remove_from_set(pipe, key, value[0].as_ref());
896            Ok(())
897        }
898        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
899    }
900}
901
/// Queues removing `member` from the set at `key` (Redis `SREM`).
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}

/// Queues removing `field` from the hash at `key` (Redis `HDEL`).
fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
    pipe.hdel(key, field);
}

/// Queues deleting the entire key (Redis `DEL`).
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
913
914fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
915    let mut key = String::new();
916
917    if config.use_trader_prefix {
918        key.push_str("trader-");
919    }
920
921    key.push_str(trader_id.as_str());
922
923    if config.use_instance_id {
924        key.push(REDIS_DELIMITER);
925        key.push_str(&format!("{instance_id}"));
926    }
927
928    key
929}
930
931fn get_collection_key(key: &str) -> anyhow::Result<&str> {
932    key.split_once(REDIS_DELIMITER)
933        .map(|(collection, _)| collection)
934        .ok_or_else(|| {
935            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
936        })
937}
938
/// Adapts a [`RedisCacheDatabase`] to the generic `CacheDatabaseAdapter` trait.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    /// Serialization encoding applied when reading and writing cache payloads.
    pub encoding: SerializationEncoding,
    /// The underlying Redis-backed cache database.
    pub database: RedisCacheDatabase,
}
945
946#[allow(dead_code)]
947#[allow(unused)]
948#[async_trait::async_trait]
949impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
950    fn close(&mut self) -> anyhow::Result<()> {
951        self.database.close();
952        Ok(())
953    }
954
955    fn flush(&mut self) -> anyhow::Result<()> {
956        self.database.flushdb_sync()
957    }
958
959    async fn load_all(&self) -> anyhow::Result<CacheMap> {
960        log::debug!("Loading all data");
961
962        let (
963            currencies,
964            instruments,
965            synthetics,
966            accounts,
967            orders,
968            positions,
969            greeks,
970            yield_curves,
971        ) = tokio::try_join!(
972            self.load_currencies(),
973            self.load_instruments(),
974            self.load_synthetics(),
975            self.load_accounts(),
976            self.load_orders(),
977            self.load_positions(),
978            self.load_greeks(),
979            self.load_yield_curves()
980        )
981        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
982
983        Ok(CacheMap {
984            currencies,
985            instruments,
986            synthetics,
987            accounts,
988            orders,
989            positions,
990            greeks,
991            yield_curves,
992        })
993    }
994
995    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
996        // self.database.load()
997        Ok(AHashMap::new()) // TODO
998    }
999
1000    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
1001        DatabaseQueries::load_currencies(
1002            &self.database.con,
1003            &self.database.trader_key,
1004            self.encoding,
1005        )
1006        .await
1007    }
1008
1009    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
1010        DatabaseQueries::load_instruments(
1011            &self.database.con,
1012            &self.database.trader_key,
1013            self.encoding,
1014        )
1015        .await
1016    }
1017
1018    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
1019        DatabaseQueries::load_synthetics(
1020            &self.database.con,
1021            &self.database.trader_key,
1022            self.encoding,
1023        )
1024        .await
1025    }
1026
1027    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
1028        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
1029            .await
1030    }
1031
1032    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
1033        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
1034            .await
1035    }
1036
1037    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
1038        DatabaseQueries::load_positions(
1039            &self.database.con,
1040            &self.database.trader_key,
1041            self.encoding,
1042        )
1043        .await
1044    }
1045
1046    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
1047        todo!()
1048    }
1049
1050    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1051        todo!()
1052    }
1053
1054    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
1055        DatabaseQueries::load_currency(
1056            &self.database.con,
1057            &self.database.trader_key,
1058            code,
1059            self.encoding,
1060        )
1061        .await
1062    }
1063
1064    async fn load_instrument(
1065        &self,
1066        instrument_id: &InstrumentId,
1067    ) -> anyhow::Result<Option<InstrumentAny>> {
1068        DatabaseQueries::load_instrument(
1069            &self.database.con,
1070            &self.database.trader_key,
1071            instrument_id,
1072            self.encoding,
1073        )
1074        .await
1075    }
1076
1077    async fn load_synthetic(
1078        &self,
1079        instrument_id: &InstrumentId,
1080    ) -> anyhow::Result<Option<SyntheticInstrument>> {
1081        DatabaseQueries::load_synthetic(
1082            &self.database.con,
1083            &self.database.trader_key,
1084            instrument_id,
1085            self.encoding,
1086        )
1087        .await
1088    }
1089
1090    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
1091        DatabaseQueries::load_account(
1092            &self.database.con,
1093            &self.database.trader_key,
1094            account_id,
1095            self.encoding,
1096        )
1097        .await
1098    }
1099
1100    async fn load_order(
1101        &self,
1102        client_order_id: &ClientOrderId,
1103    ) -> anyhow::Result<Option<OrderAny>> {
1104        DatabaseQueries::load_order(
1105            &self.database.con,
1106            &self.database.trader_key,
1107            client_order_id,
1108            self.encoding,
1109        )
1110        .await
1111    }
1112
1113    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
1114        DatabaseQueries::load_position(
1115            &self.database.con,
1116            &self.database.trader_key,
1117            position_id,
1118            self.encoding,
1119        )
1120        .await
1121    }
1122
1123    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
1124        todo!()
1125    }
1126
1127    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
1128        todo!()
1129    }
1130
1131    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
1132        todo!()
1133    }
1134
1135    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1136        todo!()
1137    }
1138
1139    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1140        let order_id_bytes = Bytes::from(client_order_id.to_string());
1141
1142        log::debug!("Deleting order: {client_order_id} from Redis");
1143        log::debug!("Trader key: {}", self.database.trader_key);
1144
1145        // Delete the order itself
1146        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1147        log::debug!("Deleting order key: {key}");
1148        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1149        self.database
1150            .tx
1151            .send(op)
1152            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1153
1154        // Delete from all order indexes
1155        let index_keys = [
1156            INDEX_ORDER_IDS,
1157            INDEX_ORDERS,
1158            INDEX_ORDERS_OPEN,
1159            INDEX_ORDERS_CLOSED,
1160            INDEX_ORDERS_EMULATED,
1161            INDEX_ORDERS_INFLIGHT,
1162        ];
1163
1164        for index_key in &index_keys {
1165            let key = (*index_key).to_string();
1166            log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1167            let payload = vec![order_id_bytes.clone()];
1168            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1169            self.database
1170                .tx
1171                .send(op)
1172                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1173        }
1174
1175        // Delete from hash indexes
1176        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1177        for index_key in &hash_indexes {
1178            let key = (*index_key).to_string();
1179            log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1180            let payload = vec![order_id_bytes.clone()];
1181            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1182            self.database.tx.send(op).map_err(|e| {
1183                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1184            })?;
1185        }
1186
1187        log::debug!("Sent all delete commands for order: {client_order_id}");
1188        Ok(())
1189    }
1190
1191    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1192        let position_id_bytes = Bytes::from(position_id.to_string());
1193
1194        // Delete the position itself
1195        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1196        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1197        self.database
1198            .tx
1199            .send(op)
1200            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1201
1202        // Delete from all position indexes
1203        let index_keys = [
1204            INDEX_POSITIONS,
1205            INDEX_POSITIONS_OPEN,
1206            INDEX_POSITIONS_CLOSED,
1207        ];
1208
1209        for index_key in &index_keys {
1210            let key = (*index_key).to_string();
1211            let payload = vec![position_id_bytes.clone()];
1212            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1213            self.database.tx.send(op).map_err(|e| {
1214                anyhow::anyhow!("Failed to send delete position index command: {e}")
1215            })?;
1216        }
1217
1218        Ok(())
1219    }
1220
1221    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1222        todo!()
1223    }
1224
1225    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1226        todo!()
1227    }
1228
1229    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1230        todo!()
1231    }
1232
1233    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1234        todo!()
1235    }
1236
1237    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1238        todo!()
1239    }
1240
1241    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1242        todo!()
1243    }
1244
1245    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1246        todo!()
1247    }
1248
1249    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1250        todo!()
1251    }
1252
1253    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1254        todo!()
1255    }
1256
1257    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1258        todo!()
1259    }
1260
1261    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1262        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1263    }
1264
1265    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1266        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1267    }
1268
1269    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1270        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1271    }
1272
1273    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1274        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1275    }
1276
1277    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1278        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1279    }
1280
1281    fn add_funding_rate(&self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
1282        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1283    }
1284
1285    fn load_funding_rates(
1286        &self,
1287        instrument_id: &InstrumentId,
1288    ) -> anyhow::Result<Vec<FundingRateUpdate>> {
1289        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1290    }
1291
1292    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1293        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1294    }
1295
1296    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1297        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1298    }
1299
1300    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1301        anyhow::bail!("Saving signals for Redis cache adapter not supported")
1302    }
1303
1304    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1305        anyhow::bail!("Loading signals from Redis cache adapter not supported")
1306    }
1307
1308    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1309        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1310    }
1311
1312    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1313        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1314    }
1315
1316    fn load_order_snapshot(
1317        &self,
1318        client_order_id: &ClientOrderId,
1319    ) -> anyhow::Result<Option<OrderSnapshot>> {
1320        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1321    }
1322
1323    fn load_position_snapshot(
1324        &self,
1325        position_id: &PositionId,
1326    ) -> anyhow::Result<Option<PositionSnapshot>> {
1327        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1328    }
1329
1330    fn index_venue_order_id(
1331        &self,
1332        client_order_id: ClientOrderId,
1333        venue_order_id: VenueOrderId,
1334    ) -> anyhow::Result<()> {
1335        todo!()
1336    }
1337
1338    fn index_order_position(
1339        &self,
1340        client_order_id: ClientOrderId,
1341        position_id: PositionId,
1342    ) -> anyhow::Result<()> {
1343        todo!()
1344    }
1345
1346    fn update_actor(&self) -> anyhow::Result<()> {
1347        todo!()
1348    }
1349
1350    fn update_strategy(&self) -> anyhow::Result<()> {
1351        todo!()
1352    }
1353
1354    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1355        todo!()
1356    }
1357
1358    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1359        todo!()
1360    }
1361
1362    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1363        todo!()
1364    }
1365
1366    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1367        todo!()
1368    }
1369
1370    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1371        todo!()
1372    }
1373
1374    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1375        todo!()
1376    }
1377}
1378
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let config = CacheConfig {
            use_instance_id: true,
            ..Default::default()
        };
        let trader_id = TraderId::from("tester-123");
        let instance_id = UUID4::new();

        let trader_key = get_trader_key(trader_id, instance_id, &config);

        // Default config keeps the "trader-" prefix; instance ID is appended
        // after the delimiter.
        assert!(trader_key.starts_with("trader-tester-123:"));
        assert!(trader_key.ends_with(&instance_id.to_string()));
    }

    #[rstest]
    fn test_get_collection_key_valid() {
        assert_eq!(get_collection_key("collection:123").unwrap(), "collection");
    }

    #[rstest]
    fn test_get_collection_key_invalid() {
        assert!(get_collection_key("no_delimiter").is_err());
    }
}