nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::VecDeque, fmt::Debug, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21    cache::{
22        CacheConfig,
23        database::{CacheDatabaseAdapter, CacheMap},
24    },
25    custom::CustomData,
26    enums::SerializationEncoding,
27    logging::{log_task_awaiting, log_task_started, log_task_stopped},
28    runtime::get_runtime,
29    signal::Signal,
30};
31use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
32use nautilus_cryptography::providers::install_cryptographic_provider;
33use nautilus_model::{
34    accounts::AccountAny,
35    data::{Bar, DataType, QuoteTick, TradeTick},
36    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
37    identifiers::{
38        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
39        TraderId, VenueOrderId,
40    },
41    instruments::{InstrumentAny, SyntheticInstrument},
42    orderbook::OrderBook,
43    orders::OrderAny,
44    position::Position,
45    types::Currency,
46};
47use redis::{Pipeline, aio::ConnectionManager};
48use tokio::try_join;
49use ustr::Ustr;
50
51use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
52use crate::redis::{create_redis_connection, queries::DatabaseQueries};
53
// Task and connection names
// `CACHE_READ` / `CACHE_WRITE` are the names passed to `create_redis_connection`;
// `CACHE_PROCESS` names the background command task in log output.
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";
const CACHE_PROCESS: &str = "cache-process";

// Error constants
// Prefix of the error returned when a command cannot be handed to the background task.
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection keys
// The first delimiter-separated segment of a command key; it selects how the
// payload is written or removed (see `insert`, `update` and `delete`).
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Index keys
// Compared against the result of `get_index_key` in `insert_index` and `delete_from_index`.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
/// A type of database operation.
///
/// Carried by a [`DatabaseCommand`] and interpreted by the background command task.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Write a payload under a key (requires a payload).
    Insert,
    /// Update the entry under a key (requires a payload).
    Update,
    /// Delete a key, or remove a payload member from an index.
    Delete,
    /// Stop the command processing loop; never written to the database.
    Close,
}
97
/// Represents a database command to be performed which may be executed in a task.
///
/// Commands are sent over a channel to the background task, which buffers them and
/// applies them to Redis in pipelined batches.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The database operation type.
    pub op_type: DatabaseOperation,
    /// The primary key for the operation (`None` only for the command built by `close`).
    pub key: Option<String>,
    /// The data payload for the operation.
    pub payload: Option<Vec<Bytes>>,
}
108
109impl DatabaseCommand {
110    /// Creates a new [`DatabaseCommand`] instance.
111    #[must_use]
112    pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113        Self {
114            op_type,
115            key: Some(key),
116            payload,
117        }
118    }
119
120    /// Initialize a `Close` database command, this is meant to close the database cache channel.
121    #[must_use]
122    pub const fn close() -> Self {
123        Self {
124            op_type: DatabaseOperation::Close,
125            key: None,
126            payload: None,
127        }
128    }
129}
130
/// A Redis-backed cache database.
///
/// Reads go directly through `con`; writes are sent as [`DatabaseCommand`]s over `tx`
/// to a background task which applies them using its own connection.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// Connection used by this handle for reads, key scans and flushing.
    pub con: ConnectionManager,
    /// The trader ID this database belongs to.
    pub trader_id: TraderId,
    /// Key prefix prepended (with the delimiter) to every key written or scanned.
    pub trader_key: String,
    /// Serialization encoding taken from the cache configuration.
    pub encoding: SerializationEncoding,
    // Sending half of the command channel consumed by the background task
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
    // Handle of the background command task, awaited in `close`
    handle: tokio::task::JoinHandle<()>,
}
143
144impl Debug for RedisCacheDatabase {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct(stringify!(RedisCacheDatabase))
147            .field("trader_id", &self.trader_id)
148            .field("encoding", &self.encoding)
149            .finish()
150    }
151}
152
153impl RedisCacheDatabase {
154    /// Creates a new [`RedisCacheDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if:
159    /// - The database configuration is missing in `config`.
160    /// - Establishing the Redis connection fails.
161    /// - The command processing task cannot be spawned.
162    pub async fn new(
163        trader_id: TraderId,
164        instance_id: UUID4,
165        config: CacheConfig,
166    ) -> anyhow::Result<Self> {
167        install_cryptographic_provider();
168
169        let db_config = config
170            .database
171            .as_ref()
172            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
173        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
174
175        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
176        let trader_key = get_trader_key(trader_id, instance_id, &config);
177        let trader_key_clone = trader_key.clone();
178        let encoding = config.encoding;
179        let handle = get_runtime().spawn(async move {
180            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
181                log::error!("Error in task '{CACHE_PROCESS}': {e}");
182            }
183        });
184
185        Ok(Self {
186            con,
187            trader_id,
188            trader_key,
189            encoding,
190            tx,
191            handle,
192        })
193    }
194
    /// Returns the serialization encoding stored on this database.
    #[must_use]
    pub const fn get_encoding(&self) -> SerializationEncoding {
        self.encoding
    }
199
200    #[must_use]
201    pub fn get_trader_key(&self) -> &str {
202        &self.trader_key
203    }
204
205    pub fn close(&mut self) {
206        log::debug!("Closing");
207
208        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
209            log::debug!("Error sending close command: {e:?}");
210        }
211
212        log_task_awaiting(CACHE_PROCESS);
213
214        tokio::task::block_in_place(|| {
215            if let Err(e) = get_runtime().block_on(&mut self.handle) {
216                log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
217            }
218        });
219
220        log::debug!("Closed");
221    }
222
223    pub async fn flushdb(&mut self) {
224        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
225            .query_async::<()>(&mut self.con)
226            .await
227        {
228            log::error!("Failed to flush database: {e:?}");
229        }
230    }
231
232    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the underlying Redis scan operation fails.
237    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
238        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
239        DatabaseQueries::scan_keys(&mut self.con, pattern).await
240    }
241
242    /// Reads the value(s) associated with `key` for this trader from Redis.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the underlying Redis read operation fails.
247    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
248        DatabaseQueries::read(&self.con, &self.trader_key, key).await
249    }
250
251    /// Reads multiple values using bulk operations for efficiency.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if the underlying Redis read operation fails.
256    pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
257        DatabaseQueries::read_bulk(&self.con, keys).await
258    }
259
260    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the command cannot be sent to the background task channel.
265    pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
266        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
267        match self.tx.send(op) {
268            Ok(()) => Ok(()),
269            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
270        }
271    }
272
273    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the command cannot be sent to the background task channel.
278    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
279        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
280        match self.tx.send(op) {
281            Ok(()) => Ok(()),
282            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
283        }
284    }
285
286    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if the command cannot be sent to the background task channel.
291    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
292        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
293        match self.tx.send(op) {
294            Ok(()) => Ok(()),
295            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
296        }
297    }
298
299    /// Delete the given order from the database with comprehensive index cleanup.
300    ///
301    /// # Errors
302    ///
303    /// Returns an error if the command cannot be sent to the background task channel.
304    pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
305        let order_id_bytes = Bytes::from(client_order_id.to_string());
306
307        // Delete the order itself
308        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
309        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
310        self.tx
311            .send(op)
312            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
313
314        // Delete from all order indexes
315        let index_keys = [
316            INDEX_ORDER_IDS,
317            INDEX_ORDERS,
318            INDEX_ORDERS_OPEN,
319            INDEX_ORDERS_CLOSED,
320            INDEX_ORDERS_EMULATED,
321            INDEX_ORDERS_INFLIGHT,
322        ];
323
324        for index_key in &index_keys {
325            let key = (*index_key).to_string();
326            let payload = vec![order_id_bytes.clone()];
327            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
328            self.tx
329                .send(op)
330                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
331        }
332
333        // Delete from hash indexes
334        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
335        for index_key in &hash_indexes {
336            let key = (*index_key).to_string();
337            let payload = vec![order_id_bytes.clone()];
338            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
339            self.tx.send(op).map_err(|e| {
340                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
341            })?;
342        }
343
344        Ok(())
345    }
346
347    /// Delete the given position from the database with comprehensive index cleanup.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if the command cannot be sent to the background task channel.
352    pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
353        let position_id_bytes = Bytes::from(position_id.to_string());
354
355        // Delete the position itself
356        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
357        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
358        self.tx
359            .send(op)
360            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
361
362        // Delete from all position indexes
363        let index_keys = [
364            INDEX_POSITIONS,
365            INDEX_POSITIONS_OPEN,
366            INDEX_POSITIONS_CLOSED,
367        ];
368
369        for index_key in &index_keys {
370            let key = (*index_key).to_string();
371            let payload = vec![position_id_bytes.clone()];
372            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
373            self.tx.send(op).map_err(|e| {
374                anyhow::anyhow!("Failed to send delete position index command: {e}")
375            })?;
376        }
377
378        Ok(())
379    }
380
    /// Delete the given account event from the database.
    ///
    /// Currently a no-op: both arguments are ignored, a warning is logged, and nothing
    /// is sent to the background task.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`.
    pub fn delete_account_event(
        &self,
        _account_id: &AccountId,
        _event_id: &str,
    ) -> anyhow::Result<()> {
        tracing::warn!("Deleting account events currently a no-op (pending redesign)");
        Ok(())
    }
394}
395
396async fn process_commands(
397    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
398    trader_key: String,
399    config: CacheConfig,
400) -> anyhow::Result<()> {
401    log_task_started(CACHE_PROCESS);
402
403    let db_config = config
404        .database
405        .as_ref()
406        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
407    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
408
409    // Buffering
410    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
411    let mut last_drain = std::time::Instant::now();
412    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
413
414    // Continue to receive and handle messages until channel is hung up
415    loop {
416        if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
417            drain_buffer(&mut con, &trader_key, &mut buffer).await;
418            last_drain = std::time::Instant::now();
419        } else if let Some(cmd) = rx.recv().await {
420            tracing::trace!("Received {cmd:?}");
421
422            if matches!(cmd.op_type, DatabaseOperation::Close) {
423                break;
424            }
425            buffer.push_back(cmd);
426        } else {
427            tracing::debug!("Command channel closed");
428            break;
429        }
430    }
431
432    // Drain any remaining messages
433    if !buffer.is_empty() {
434        drain_buffer(&mut con, &trader_key, &mut buffer).await;
435    }
436
437    log_task_stopped(CACHE_PROCESS);
438    Ok(())
439}
440
441async fn drain_buffer(
442    conn: &mut ConnectionManager,
443    trader_key: &str,
444    buffer: &mut VecDeque<DatabaseCommand>,
445) {
446    let mut pipe = redis::pipe();
447    pipe.atomic();
448
449    for msg in buffer.drain(..) {
450        let key = if let Some(key) = msg.key {
451            key
452        } else {
453            log::error!("Null key found for message: {msg:?}");
454            continue;
455        };
456        let collection = match get_collection_key(&key) {
457            Ok(collection) => collection,
458            Err(e) => {
459                tracing::error!("{e}");
460                continue; // Continue to next message
461            }
462        };
463
464        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
465
466        match msg.op_type {
467            DatabaseOperation::Insert => {
468                if let Some(payload) = msg.payload {
469                    log::debug!("Processing INSERT for collection: {collection}, key: {key}");
470                    if let Err(e) = insert(&mut pipe, collection, &key, payload) {
471                        tracing::error!("{e}");
472                    }
473                } else {
474                    tracing::error!("Null `payload` for `insert`");
475                }
476            }
477            DatabaseOperation::Update => {
478                if let Some(payload) = msg.payload {
479                    log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
480                    if let Err(e) = update(&mut pipe, collection, &key, payload) {
481                        tracing::error!("{e}");
482                    }
483                } else {
484                    tracing::error!("Null `payload` for `update`");
485                }
486            }
487            DatabaseOperation::Delete => {
488                tracing::debug!(
489                    "Processing DELETE for collection: {}, key: {}, payload: {:?}",
490                    collection,
491                    key,
492                    msg.payload.as_ref().map(std::vec::Vec::len)
493                );
494                // `payload` can be `None` for a delete operation
495                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
496                    tracing::error!("{e}");
497                }
498            }
499            DatabaseOperation::Close => panic!("Close command should not be drained"),
500        }
501    }
502
503    if let Err(e) = pipe.query_async::<()>(conn).await {
504        tracing::error!("{e}");
505    }
506}
507
508fn insert(
509    pipe: &mut Pipeline,
510    collection: &str,
511    key: &str,
512    value: Vec<Bytes>,
513) -> anyhow::Result<()> {
514    check_slice_not_empty(value.as_slice(), stringify!(value))?;
515
516    match collection {
517        INDEX => insert_index(pipe, key, &value),
518        GENERAL => {
519            insert_string(pipe, key, value[0].as_ref());
520            Ok(())
521        }
522        CURRENCIES => {
523            insert_string(pipe, key, value[0].as_ref());
524            Ok(())
525        }
526        INSTRUMENTS => {
527            insert_string(pipe, key, value[0].as_ref());
528            Ok(())
529        }
530        SYNTHETICS => {
531            insert_string(pipe, key, value[0].as_ref());
532            Ok(())
533        }
534        ACCOUNTS => {
535            insert_list(pipe, key, value[0].as_ref());
536            Ok(())
537        }
538        ORDERS => {
539            insert_list(pipe, key, value[0].as_ref());
540            Ok(())
541        }
542        POSITIONS => {
543            insert_list(pipe, key, value[0].as_ref());
544            Ok(())
545        }
546        ACTORS => {
547            insert_string(pipe, key, value[0].as_ref());
548            Ok(())
549        }
550        STRATEGIES => {
551            insert_string(pipe, key, value[0].as_ref());
552            Ok(())
553        }
554        SNAPSHOTS => {
555            insert_list(pipe, key, value[0].as_ref());
556            Ok(())
557        }
558        HEALTH => {
559            insert_string(pipe, key, value[0].as_ref());
560            Ok(())
561        }
562        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
563    }
564}
565
566fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
567    let index_key = get_index_key(key)?;
568    match index_key {
569        INDEX_ORDER_IDS => {
570            insert_set(pipe, key, value[0].as_ref());
571            Ok(())
572        }
573        INDEX_ORDER_POSITION => {
574            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
575            Ok(())
576        }
577        INDEX_ORDER_CLIENT => {
578            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
579            Ok(())
580        }
581        INDEX_ORDERS => {
582            insert_set(pipe, key, value[0].as_ref());
583            Ok(())
584        }
585        INDEX_ORDERS_OPEN => {
586            insert_set(pipe, key, value[0].as_ref());
587            Ok(())
588        }
589        INDEX_ORDERS_CLOSED => {
590            insert_set(pipe, key, value[0].as_ref());
591            Ok(())
592        }
593        INDEX_ORDERS_EMULATED => {
594            insert_set(pipe, key, value[0].as_ref());
595            Ok(())
596        }
597        INDEX_ORDERS_INFLIGHT => {
598            insert_set(pipe, key, value[0].as_ref());
599            Ok(())
600        }
601        INDEX_POSITIONS => {
602            insert_set(pipe, key, value[0].as_ref());
603            Ok(())
604        }
605        INDEX_POSITIONS_OPEN => {
606            insert_set(pipe, key, value[0].as_ref());
607            Ok(())
608        }
609        INDEX_POSITIONS_CLOSED => {
610            insert_set(pipe, key, value[0].as_ref());
611            Ok(())
612        }
613        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
614    }
615}
616
/// Queues a `SET` of `value` under `key` on the pipeline (nothing is sent until the pipeline is executed).
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}
620
/// Queues an `SADD` of `value` to the set at `key` on the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}
624
/// Queues an `HSET` of field `name` to `value` in the hash at `key` on the pipeline.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}
628
/// Queues an `RPUSH` of `value` onto the list at `key` on the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
632
633fn update(
634    pipe: &mut Pipeline,
635    collection: &str,
636    key: &str,
637    value: Vec<Bytes>,
638) -> anyhow::Result<()> {
639    check_slice_not_empty(value.as_slice(), stringify!(value))?;
640
641    match collection {
642        ACCOUNTS => {
643            update_list(pipe, key, value[0].as_ref());
644            Ok(())
645        }
646        ORDERS => {
647            update_list(pipe, key, value[0].as_ref());
648            Ok(())
649        }
650        POSITIONS => {
651            update_list(pipe, key, value[0].as_ref());
652            Ok(())
653        }
654        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
655    }
656}
657
/// Queues an `RPUSHX` of `value` on the pipeline, which appends only when the list at `key` already exists.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
661
662fn delete(
663    pipe: &mut Pipeline,
664    collection: &str,
665    key: &str,
666    value: Option<Vec<Bytes>>,
667) -> anyhow::Result<()> {
668    tracing::debug!(
669        "delete: collection={}, key={}, has_payload={}",
670        collection,
671        key,
672        value.is_some()
673    );
674
675    match collection {
676        INDEX => delete_from_index(pipe, key, value),
677        ORDERS => {
678            delete_string(pipe, key);
679            Ok(())
680        }
681        POSITIONS => {
682            delete_string(pipe, key);
683            Ok(())
684        }
685        ACCOUNTS => {
686            delete_string(pipe, key);
687            Ok(())
688        }
689        ACTORS => {
690            delete_string(pipe, key);
691            Ok(())
692        }
693        STRATEGIES => {
694            delete_string(pipe, key);
695            Ok(())
696        }
697        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
698    }
699}
700
701fn delete_from_index(
702    pipe: &mut Pipeline,
703    key: &str,
704    value: Option<Vec<Bytes>>,
705) -> anyhow::Result<()> {
706    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
707    let index_key = get_index_key(key)?;
708
709    match index_key {
710        INDEX_ORDER_IDS => {
711            remove_from_set(pipe, key, value[0].as_ref());
712            Ok(())
713        }
714        INDEX_ORDER_POSITION => {
715            remove_from_hash(pipe, key, value[0].as_ref());
716            Ok(())
717        }
718        INDEX_ORDER_CLIENT => {
719            remove_from_hash(pipe, key, value[0].as_ref());
720            Ok(())
721        }
722        INDEX_ORDERS => {
723            remove_from_set(pipe, key, value[0].as_ref());
724            Ok(())
725        }
726        INDEX_ORDERS_OPEN => {
727            remove_from_set(pipe, key, value[0].as_ref());
728            Ok(())
729        }
730        INDEX_ORDERS_CLOSED => {
731            remove_from_set(pipe, key, value[0].as_ref());
732            Ok(())
733        }
734        INDEX_ORDERS_EMULATED => {
735            remove_from_set(pipe, key, value[0].as_ref());
736            Ok(())
737        }
738        INDEX_ORDERS_INFLIGHT => {
739            remove_from_set(pipe, key, value[0].as_ref());
740            Ok(())
741        }
742        INDEX_POSITIONS => {
743            remove_from_set(pipe, key, value[0].as_ref());
744            Ok(())
745        }
746        INDEX_POSITIONS_OPEN => {
747            remove_from_set(pipe, key, value[0].as_ref());
748            Ok(())
749        }
750        INDEX_POSITIONS_CLOSED => {
751            remove_from_set(pipe, key, value[0].as_ref());
752            Ok(())
753        }
754        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
755    }
756}
757
/// Queues an `SREM` of `member` from the set at `key` on the pipeline.
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
761
/// Queues an `HDEL` of `field` from the hash at `key` on the pipeline.
fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
    pipe.hdel(key, field);
}
765
/// Queues a `DEL` of `key` on the pipeline (used for every non-index collection, whatever the stored type).
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
769
770fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
771    let mut key = String::new();
772
773    if config.use_trader_prefix {
774        key.push_str("trader-");
775    }
776
777    key.push_str(trader_id.as_str());
778
779    if config.use_instance_id {
780        key.push(REDIS_DELIMITER);
781        key.push_str(&format!("{instance_id}"));
782    }
783
784    key
785}
786
787fn get_collection_key(key: &str) -> anyhow::Result<&str> {
788    key.split_once(REDIS_DELIMITER)
789        .map(|(collection, _)| collection)
790        .ok_or_else(|| {
791            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
792        })
793}
794
795fn get_index_key(key: &str) -> anyhow::Result<&str> {
796    key.split_once(REDIS_DELIMITER)
797        .map(|(_, index_key)| index_key)
798        .ok_or_else(|| {
799            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
800        })
801}
802
/// Adapter exposing a [`RedisCacheDatabase`] through the `CacheDatabaseAdapter` trait.
#[allow(dead_code)] // Under development
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    /// Encoding passed to the `DatabaseQueries` load functions.
    pub encoding: SerializationEncoding,
    /// The underlying Redis database handle.
    pub database: RedisCacheDatabase,
}
809
810#[allow(dead_code)] // Under development
811#[allow(unused)] // Under development
812#[async_trait::async_trait]
813impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
    /// Closes the underlying database; always returns `Ok(())`.
    fn close(&mut self) -> anyhow::Result<()> {
        self.database.close();
        Ok(())
    }
818
819    fn flush(&mut self) -> anyhow::Result<()> {
820        self.database.flushdb();
821        Ok(())
822    }
823
824    async fn load_all(&self) -> anyhow::Result<CacheMap> {
825        tracing::debug!("Loading all data");
826
827        let (
828            currencies,
829            instruments,
830            synthetics,
831            accounts,
832            orders,
833            positions,
834            greeks,
835            yield_curves,
836        ) = try_join!(
837            self.load_currencies(),
838            self.load_instruments(),
839            self.load_synthetics(),
840            self.load_accounts(),
841            self.load_orders(),
842            self.load_positions(),
843            self.load_greeks(),
844            self.load_yield_curves()
845        )
846        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
847
848        Ok(CacheMap {
849            currencies,
850            instruments,
851            synthetics,
852            accounts,
853            orders,
854            positions,
855            greeks,
856            yield_curves,
857        })
858    }
859
    /// Placeholder: currently returns an empty map without querying the database.
    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
        // self.database.load()
        Ok(AHashMap::new()) // TODO
    }
864
865    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
866        DatabaseQueries::load_currencies(
867            &self.database.con,
868            &self.database.trader_key,
869            self.encoding,
870        )
871        .await
872    }
873
874    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
875        DatabaseQueries::load_instruments(
876            &self.database.con,
877            &self.database.trader_key,
878            self.encoding,
879        )
880        .await
881    }
882
883    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
884        DatabaseQueries::load_synthetics(
885            &self.database.con,
886            &self.database.trader_key,
887            self.encoding,
888        )
889        .await
890    }
891
892    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
893        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
894            .await
895    }
896
897    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
898        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
899            .await
900    }
901
902    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
903        DatabaseQueries::load_positions(
904            &self.database.con,
905            &self.database.trader_key,
906            self.encoding,
907        )
908        .await
909    }
910
    /// Not yet implemented: panics via `todo!()` when called.
    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
        todo!()
    }
914
    /// Not yet implemented: panics via `todo!()` when called.
    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
        todo!()
    }
918
919    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
920        DatabaseQueries::load_currency(
921            &self.database.con,
922            &self.database.trader_key,
923            code,
924            self.encoding,
925        )
926        .await
927    }
928
929    async fn load_instrument(
930        &self,
931        instrument_id: &InstrumentId,
932    ) -> anyhow::Result<Option<InstrumentAny>> {
933        DatabaseQueries::load_instrument(
934            &self.database.con,
935            &self.database.trader_key,
936            instrument_id,
937            self.encoding,
938        )
939        .await
940    }
941
942    async fn load_synthetic(
943        &self,
944        instrument_id: &InstrumentId,
945    ) -> anyhow::Result<Option<SyntheticInstrument>> {
946        DatabaseQueries::load_synthetic(
947            &self.database.con,
948            &self.database.trader_key,
949            instrument_id,
950            self.encoding,
951        )
952        .await
953    }
954
955    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
956        DatabaseQueries::load_account(
957            &self.database.con,
958            &self.database.trader_key,
959            account_id,
960            self.encoding,
961        )
962        .await
963    }
964
965    async fn load_order(
966        &self,
967        client_order_id: &ClientOrderId,
968    ) -> anyhow::Result<Option<OrderAny>> {
969        DatabaseQueries::load_order(
970            &self.database.con,
971            &self.database.trader_key,
972            client_order_id,
973            self.encoding,
974        )
975        .await
976    }
977
978    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
979        DatabaseQueries::load_position(
980            &self.database.con,
981            &self.database.trader_key,
982            position_id,
983            self.encoding,
984        )
985        .await
986    }
987
988    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
989        todo!()
990    }
991
992    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
993        todo!()
994    }
995
996    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
997        todo!()
998    }
999
1000    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1001        todo!()
1002    }
1003
1004    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1005        let order_id_bytes = Bytes::from(client_order_id.to_string());
1006
1007        log::debug!("Deleting order: {client_order_id} from Redis");
1008        log::debug!("Trader key: {}", self.database.trader_key);
1009
1010        // Delete the order itself
1011        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1012        log::debug!("Deleting order key: {key}");
1013        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1014        self.database
1015            .tx
1016            .send(op)
1017            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1018
1019        // Delete from all order indexes
1020        let index_keys = [
1021            INDEX_ORDER_IDS,
1022            INDEX_ORDERS,
1023            INDEX_ORDERS_OPEN,
1024            INDEX_ORDERS_CLOSED,
1025            INDEX_ORDERS_EMULATED,
1026            INDEX_ORDERS_INFLIGHT,
1027        ];
1028
1029        for index_key in &index_keys {
1030            let key = (*index_key).to_string();
1031            log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1032            let payload = vec![order_id_bytes.clone()];
1033            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1034            self.database
1035                .tx
1036                .send(op)
1037                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1038        }
1039
1040        // Delete from hash indexes
1041        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1042        for index_key in &hash_indexes {
1043            let key = (*index_key).to_string();
1044            log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1045            let payload = vec![order_id_bytes.clone()];
1046            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1047            self.database.tx.send(op).map_err(|e| {
1048                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1049            })?;
1050        }
1051
1052        log::debug!("Sent all delete commands for order: {client_order_id}");
1053        Ok(())
1054    }
1055
1056    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1057        let position_id_bytes = Bytes::from(position_id.to_string());
1058
1059        // Delete the position itself
1060        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1061        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1062        self.database
1063            .tx
1064            .send(op)
1065            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1066
1067        // Delete from all position indexes
1068        let index_keys = [
1069            INDEX_POSITIONS,
1070            INDEX_POSITIONS_OPEN,
1071            INDEX_POSITIONS_CLOSED,
1072        ];
1073
1074        for index_key in &index_keys {
1075            let key = (*index_key).to_string();
1076            let payload = vec![position_id_bytes.clone()];
1077            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1078            self.database.tx.send(op).map_err(|e| {
1079                anyhow::anyhow!("Failed to send delete position index command: {e}")
1080            })?;
1081        }
1082
1083        Ok(())
1084    }
1085
1086    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1087        todo!()
1088    }
1089
1090    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1091        todo!()
1092    }
1093
1094    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1095        todo!()
1096    }
1097
1098    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1099        todo!()
1100    }
1101
1102    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1103        todo!()
1104    }
1105
1106    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1107        todo!()
1108    }
1109
1110    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1111        todo!()
1112    }
1113
1114    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1115        todo!()
1116    }
1117
1118    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1119        todo!()
1120    }
1121
1122    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1123        todo!()
1124    }
1125
1126    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1127        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1128    }
1129
1130    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1131        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1132    }
1133
1134    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1135        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1136    }
1137
1138    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1139        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1140    }
1141
1142    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1143        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1144    }
1145
1146    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1147        anyhow::bail!("Saving market data for Redis cache adapter not supported")
1148    }
1149
1150    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1151        anyhow::bail!("Loading market data for Redis cache adapter not supported")
1152    }
1153
1154    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1155        anyhow::bail!("Saving signals for Redis cache adapter not supported")
1156    }
1157
1158    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1159        anyhow::bail!("Loading signals from Redis cache adapter not supported")
1160    }
1161
1162    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1163        anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1164    }
1165
1166    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1167        anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1168    }
1169
1170    fn load_order_snapshot(
1171        &self,
1172        client_order_id: &ClientOrderId,
1173    ) -> anyhow::Result<Option<OrderSnapshot>> {
1174        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1175    }
1176
1177    fn load_position_snapshot(
1178        &self,
1179        position_id: &PositionId,
1180    ) -> anyhow::Result<Option<PositionSnapshot>> {
1181        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1182    }
1183
1184    fn index_venue_order_id(
1185        &self,
1186        client_order_id: ClientOrderId,
1187        venue_order_id: VenueOrderId,
1188    ) -> anyhow::Result<()> {
1189        todo!()
1190    }
1191
1192    fn index_order_position(
1193        &self,
1194        client_order_id: ClientOrderId,
1195        position_id: PositionId,
1196    ) -> anyhow::Result<()> {
1197        todo!()
1198    }
1199
1200    fn update_actor(&self) -> anyhow::Result<()> {
1201        todo!()
1202    }
1203
1204    fn update_strategy(&self) -> anyhow::Result<()> {
1205        todo!()
1206    }
1207
1208    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1209        todo!()
1210    }
1211
1212    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1213        todo!()
1214    }
1215
1216    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1217        todo!()
1218    }
1219
1220    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1221        todo!()
1222    }
1223
1224    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1225        todo!()
1226    }
1227
1228    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1229        todo!()
1230    }
1231}
1232
1233////////////////////////////////////////////////////////////////////////////////
1234// Tests
1235////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // With `use_instance_id` enabled, the trader key must start with the prefixed trader ID
    // and end with the instance ID.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let config = CacheConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);
        let expected_suffix = instance_id.to_string();

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&expected_suffix));
    }

    // The collection key is the part before the delimiter.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    // A key without a delimiter has no collection part.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }

    // The index key is the part after the delimiter.
    #[rstest]
    fn test_get_index_key_valid() {
        let index = get_index_key("index:123").unwrap();
        assert_eq!(index, "123");
    }

    // A key without a delimiter has no index part.
    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_delimiter");
        assert!(result.is_err());
    }
}