1use std::{
37 collections::VecDeque,
38 fmt::Debug,
39 ops::ControlFlow,
40 pin::Pin,
41 sync::mpsc::{self, SyncSender},
42 time::Duration,
43};
44
45use ahash::AHashMap;
46use bytes::Bytes;
47use nautilus_common::{
48 cache::{
49 CacheConfig,
50 database::{CacheDatabaseAdapter, CacheMap},
51 },
52 custom::CustomData,
53 enums::SerializationEncoding,
54 live::get_runtime,
55 logging::{log_task_awaiting, log_task_started, log_task_stopped},
56 signal::Signal,
57};
58use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
59use nautilus_cryptography::providers::install_cryptographic_provider;
60use nautilus_model::{
61 accounts::AccountAny,
62 data::{Bar, DataType, FundingRateUpdate, QuoteTick, TradeTick},
63 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
64 identifiers::{
65 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
66 TraderId, VenueOrderId,
67 },
68 instruments::{InstrumentAny, SyntheticInstrument},
69 orderbook::OrderBook,
70 orders::OrderAny,
71 position::Position,
72 types::Currency,
73};
74use redis::{Pipeline, aio::ConnectionManager};
75use ustr::Ustr;
76
77use super::{REDIS_DELIMITER, REDIS_FLUSHDB, get_index_key};
78use crate::redis::{create_redis_connection, queries::DatabaseQueries};
79
// Names passed to `create_redis_connection` for the read and write connections,
// and the name used when logging the background command-processing task.
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";
const CACHE_PROCESS: &str = "cache-process";

// Error message prefix used when a command cannot be queued on the command channel.
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection names: the first delimiter-separated segment of a command key
// (see `get_collection_key`), used to dispatch `insert` / `update` / `delete`.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Index keys. `INDEX_ORDER_POSITION` and `INDEX_ORDER_CLIENT` are written with
// `hset` / `hdel`; all other indexes are written with `sadd` / `srem`.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
114
/// The kind of operation carried by a [`DatabaseCommand`].
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Buffer an insert for the command key.
    Insert,
    /// Buffer an update for the command key.
    Update,
    /// Buffer a delete for the command key.
    Delete,
    /// Drain any buffered commands, flush the database, then signal the sender.
    Flush(SyncSender<()>),
    /// Drain any buffered commands and stop the processing task.
    Close,
}
124
/// A single command sent over the channel to the background processing task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The operation to perform.
    pub op_type: DatabaseOperation,
    /// The target key (without the trader key prefix); `None` for `Flush` and `Close`.
    pub key: Option<String>,
    /// The data for the operation, if any.
    pub payload: Option<Vec<Bytes>>,
}
135
136impl DatabaseCommand {
137 #[must_use]
139 pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
140 Self {
141 op_type,
142 key: Some(key),
143 payload,
144 }
145 }
146
147 #[must_use]
149 pub const fn close() -> Self {
150 Self {
151 op_type: DatabaseOperation::Close,
152 key: None,
153 payload: None,
154 }
155 }
156}
157
/// Redis-backed cache database.
///
/// Reads go through `con`; writes are queued as [`DatabaseCommand`]s on `tx` and
/// applied by a background task spawned in [`RedisCacheDatabase::new`].
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// Connection used by the query methods on this type.
    pub con: ConnectionManager,
    /// The trader ID this database was created for.
    pub trader_id: TraderId,
    /// Key prefix built by `get_trader_key`; prepended to every command key.
    pub trader_key: String,
    /// Serialization encoding taken from the cache configuration.
    pub encoding: SerializationEncoding,
    /// Optional batch size for `read_bulk`; `None` reads all keys in one call.
    pub bulk_read_batch_size: Option<usize>,
    /// Sender side of the command channel consumed by the processing task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
    /// Handle of the processing task; taken (left as `None`) by `close`.
    handle: Option<tokio::task::JoinHandle<()>>,
}
171
172impl Debug for RedisCacheDatabase {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct(stringify!(RedisCacheDatabase))
175 .field("trader_id", &self.trader_id)
176 .field("encoding", &self.encoding)
177 .finish()
178 }
179}
180
181impl RedisCacheDatabase {
182 pub async fn new(
191 trader_id: TraderId,
192 instance_id: UUID4,
193 config: CacheConfig,
194 ) -> anyhow::Result<Self> {
195 install_cryptographic_provider();
196
197 let db_config = config
198 .database
199 .as_ref()
200 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
201 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
202
203 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
204 let trader_key = get_trader_key(trader_id, instance_id, &config);
205 let trader_key_clone = trader_key.clone();
206 let encoding = config.encoding;
207 let bulk_read_batch_size = config.bulk_read_batch_size;
208
209 let handle = get_runtime().spawn(async move {
210 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
211 log::error!("Error in task '{CACHE_PROCESS}': {e}");
212 }
213 });
214
215 Ok(Self {
216 con,
217 trader_id,
218 trader_key,
219 encoding,
220 bulk_read_batch_size,
221 tx,
222 handle: Some(handle),
223 })
224 }
225
226 #[must_use]
227 pub const fn get_encoding(&self) -> SerializationEncoding {
228 self.encoding
229 }
230
231 #[must_use]
232 pub fn get_trader_key(&self) -> &str {
233 &self.trader_key
234 }
235
236 pub fn close(&mut self) {
237 log::debug!("Closing");
238
239 let Some(handle) = self.handle.take() else {
240 log::debug!("Already closed");
241 return;
242 };
243
244 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
245 log::debug!("Error sending close command: {e:?}");
246 }
247
248 log_task_awaiting(CACHE_PROCESS);
249
250 let (tx, rx) = mpsc::sync_channel(1);
251
252 get_runtime().spawn(async move {
253 if let Err(e) = handle.await {
254 log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
255 }
256 let _ = tx.send(());
257 });
258 let _ = blocking_recv(&rx);
259
260 log::debug!("Closed");
261 }
262
263 pub async fn flushdb(&mut self) {
264 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
265 .query_async::<()>(&mut self.con)
266 .await
267 {
268 log::error!("Failed to flush database: {e:?}");
269 }
270 }
271
272 pub fn flushdb_sync(&self) -> anyhow::Result<()> {
279 let (reply_tx, reply_rx) = mpsc::sync_channel(1);
280 let cmd = DatabaseCommand {
281 op_type: DatabaseOperation::Flush(reply_tx),
282 key: None,
283 payload: None,
284 };
285 self.tx
286 .send(cmd)
287 .map_err(|e| anyhow::anyhow!("{FAILED_TX_CHANNEL}: {e}"))?;
288 blocking_recv(&reply_rx).map_err(|e| anyhow::anyhow!("Failed to flush database: {e}"))?;
289 Ok(())
290 }
291
292 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
298 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
299 DatabaseQueries::scan_keys(&mut self.con, pattern).await
300 }
301
302 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
308 DatabaseQueries::read(&self.con, &self.trader_key, key).await
309 }
310
311 pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
317 match self.bulk_read_batch_size {
318 Some(batch_size) => {
319 DatabaseQueries::read_bulk_batched(&self.con, keys, batch_size).await
320 }
321 None => DatabaseQueries::read_bulk(&self.con, keys).await,
322 }
323 }
324
325 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
331 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
332 match self.tx.send(op) {
333 Ok(()) => Ok(()),
334 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
335 }
336 }
337
338 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
344 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
345 match self.tx.send(op) {
346 Ok(()) => Ok(()),
347 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
348 }
349 }
350
351 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
357 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
358 match self.tx.send(op) {
359 Ok(()) => Ok(()),
360 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
361 }
362 }
363
364 pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
370 let order_id_bytes = Bytes::from(client_order_id.to_string());
371
372 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
374 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
375 self.tx
376 .send(op)
377 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
378
379 let index_keys = [
381 INDEX_ORDER_IDS,
382 INDEX_ORDERS,
383 INDEX_ORDERS_OPEN,
384 INDEX_ORDERS_CLOSED,
385 INDEX_ORDERS_EMULATED,
386 INDEX_ORDERS_INFLIGHT,
387 ];
388
389 for index_key in &index_keys {
390 let key = (*index_key).to_string();
391 let payload = vec![order_id_bytes.clone()];
392 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
393 self.tx
394 .send(op)
395 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
396 }
397
398 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
400 for index_key in &hash_indexes {
401 let key = (*index_key).to_string();
402 let payload = vec![order_id_bytes.clone()];
403 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
404 self.tx.send(op).map_err(|e| {
405 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
406 })?;
407 }
408
409 Ok(())
410 }
411
412 pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
418 let position_id_bytes = Bytes::from(position_id.to_string());
419
420 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
422 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
423 self.tx
424 .send(op)
425 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
426
427 let index_keys = [
429 INDEX_POSITIONS,
430 INDEX_POSITIONS_OPEN,
431 INDEX_POSITIONS_CLOSED,
432 ];
433
434 for index_key in &index_keys {
435 let key = (*index_key).to_string();
436 let payload = vec![position_id_bytes.clone()];
437 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
438 self.tx.send(op).map_err(|e| {
439 anyhow::anyhow!("Failed to send delete position index command: {e}")
440 })?;
441 }
442
443 Ok(())
444 }
445
446 pub fn delete_account_event(
452 &self,
453 _account_id: &AccountId,
454 _event_id: &str,
455 ) -> anyhow::Result<()> {
456 log::warn!("Deleting account events currently a no-op (pending redesign)");
457 Ok(())
458 }
459}
460
461fn blocking_recv<T>(rx: &mpsc::Receiver<T>) -> Result<T, mpsc::RecvError> {
462 let on_nautilus_runtime = tokio::runtime::Handle::try_current()
463 .ok()
464 .is_some_and(|h| h.id() == get_runtime().handle().id());
465
466 if on_nautilus_runtime {
467 tokio::task::block_in_place(|| rx.recv())
468 } else {
469 rx.recv()
470 }
471}
472
/// Background loop that receives [`DatabaseCommand`]s from `rx` and writes them to Redis.
///
/// Commands are collected in a buffer. When `config.buffer_interval_ms` is unset or
/// zero, each command is written as soon as it is received (see `handle_command`);
/// otherwise the buffer is written each time the flush timer fires.
///
/// The loop ends when `handle_command` signals a break (a close command or a closed
/// channel); any commands still buffered at that point are written before returning.
///
/// # Errors
///
/// Returns an error if `config.database` is `None` or if the write connection
/// cannot be created.
async fn process_commands(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
    trader_key: String,
    config: CacheConfig,
) -> anyhow::Result<()> {
    log_task_started(CACHE_PROCESS);

    let db_config = config
        .database
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;

    // Commands waiting to be written by the next `drain_buffer` call
    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
    // A missing interval is treated as zero (no time-based buffering)
    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);

    // Pinned so the same timer can be polled by reference on every loop
    // iteration and reset in place by `flush_buffer`
    let flush_timer = tokio::time::sleep(buffer_interval);
    tokio::pin!(flush_timer);

    loop {
        tokio::select! {
            maybe_cmd = rx.recv() => {
                let result = handle_command(
                    maybe_cmd,
                    &mut buffer,
                    buffer_interval,
                    &mut con,
                    &trader_key,
                ).await;
                if result.is_break() {
                    break;
                }
            }
            // The timer branch is disabled when the interval is zero
            () = &mut flush_timer, if !buffer_interval.is_zero() => {
                flush_buffer(&mut buffer, &mut con, &trader_key, &mut flush_timer, buffer_interval).await;
            }
        }
    }

    // Write out anything still buffered once the loop has exited
    if !buffer.is_empty() {
        drain_buffer(&mut con, &trader_key, &mut buffer).await;
    }

    log_task_stopped(CACHE_PROCESS);
    Ok(())
}
525
526async fn handle_command(
527 maybe_cmd: Option<DatabaseCommand>,
528 buffer: &mut VecDeque<DatabaseCommand>,
529 buffer_interval: Duration,
530 con: &mut ConnectionManager,
531 trader_key: &str,
532) -> ControlFlow<()> {
533 let Some(cmd) = maybe_cmd else {
534 log::debug!("Command channel closed");
535 return ControlFlow::Break(());
536 };
537
538 log::trace!("Received {cmd:?}");
539
540 match cmd.op_type {
541 DatabaseOperation::Close => {
542 if !buffer.is_empty() {
543 drain_buffer(con, trader_key, buffer).await;
544 }
545 return ControlFlow::Break(());
546 }
547 DatabaseOperation::Flush(reply_tx) => {
548 if !buffer.is_empty() {
549 drain_buffer(con, trader_key, buffer).await;
550 }
551 if let Err(e) = redis::cmd(REDIS_FLUSHDB).query_async::<()>(con).await {
552 log::error!("Failed to flush database: {e:?}");
553 }
554 let _ = reply_tx.send(());
555 return ControlFlow::Continue(());
556 }
557 _ => {}
558 }
559
560 buffer.push_back(cmd);
561
562 if buffer_interval.is_zero() {
563 drain_buffer(con, trader_key, buffer).await;
564 }
565
566 ControlFlow::Continue(())
567}
568
569async fn flush_buffer(
570 buffer: &mut VecDeque<DatabaseCommand>,
571 con: &mut ConnectionManager,
572 trader_key: &str,
573 flush_timer: &mut Pin<&mut tokio::time::Sleep>,
574 buffer_interval: Duration,
575) {
576 if !buffer.is_empty() {
577 drain_buffer(con, trader_key, buffer).await;
578 }
579 flush_timer
580 .as_mut()
581 .reset(tokio::time::Instant::now() + buffer_interval);
582}
583
584async fn drain_buffer(
585 conn: &mut ConnectionManager,
586 trader_key: &str,
587 buffer: &mut VecDeque<DatabaseCommand>,
588) {
589 let mut pipe = redis::pipe();
590 pipe.atomic();
591
592 for msg in buffer.drain(..) {
593 let key = if let Some(key) = msg.key {
594 key
595 } else {
596 log::error!("Null key found for message: {msg:?}");
597 continue;
598 };
599 let collection = match get_collection_key(&key) {
600 Ok(collection) => collection,
601 Err(e) => {
602 log::error!("{e}");
603 continue; }
605 };
606
607 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
608
609 match msg.op_type {
610 DatabaseOperation::Insert => {
611 if let Some(payload) = msg.payload {
612 log::debug!("Processing INSERT for collection: {collection}, key: {key}");
613 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
614 log::error!("{e}");
615 }
616 } else {
617 log::error!("Null `payload` for `insert`");
618 }
619 }
620 DatabaseOperation::Update => {
621 if let Some(payload) = msg.payload {
622 log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
623 if let Err(e) = update(&mut pipe, collection, &key, payload) {
624 log::error!("{e}");
625 }
626 } else {
627 log::error!("Null `payload` for `update`");
628 }
629 }
630 DatabaseOperation::Delete => {
631 log::debug!(
632 "Processing DELETE for collection: {}, key: {}, payload: {:?}",
633 collection,
634 key,
635 msg.payload.as_ref().map(std::vec::Vec::len)
636 );
637 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
639 log::error!("{e}");
640 }
641 }
642 DatabaseOperation::Close => panic!("Close command should not be drained"),
643 DatabaseOperation::Flush(_) => panic!("Flush command should not be drained"),
644 }
645 }
646
647 if let Err(e) = pipe.query_async::<()>(conn).await {
648 log::error!("{e}");
649 }
650}
651
652fn insert(
653 pipe: &mut Pipeline,
654 collection: &str,
655 key: &str,
656 value: Vec<Bytes>,
657) -> anyhow::Result<()> {
658 check_slice_not_empty(value.as_slice(), stringify!(value))?;
659
660 match collection {
661 INDEX => insert_index(pipe, key, &value),
662 GENERAL => {
663 insert_string(pipe, key, value[0].as_ref());
664 Ok(())
665 }
666 CURRENCIES => {
667 insert_string(pipe, key, value[0].as_ref());
668 Ok(())
669 }
670 INSTRUMENTS => {
671 insert_string(pipe, key, value[0].as_ref());
672 Ok(())
673 }
674 SYNTHETICS => {
675 insert_string(pipe, key, value[0].as_ref());
676 Ok(())
677 }
678 ACCOUNTS => {
679 insert_list(pipe, key, value[0].as_ref());
680 Ok(())
681 }
682 ORDERS => {
683 insert_list(pipe, key, value[0].as_ref());
684 Ok(())
685 }
686 POSITIONS => {
687 insert_list(pipe, key, value[0].as_ref());
688 Ok(())
689 }
690 ACTORS => {
691 insert_string(pipe, key, value[0].as_ref());
692 Ok(())
693 }
694 STRATEGIES => {
695 insert_string(pipe, key, value[0].as_ref());
696 Ok(())
697 }
698 SNAPSHOTS => {
699 insert_list(pipe, key, value[0].as_ref());
700 Ok(())
701 }
702 HEALTH => {
703 insert_string(pipe, key, value[0].as_ref());
704 Ok(())
705 }
706 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
707 }
708}
709
710fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
711 let index_key = get_index_key(key)?;
712 match index_key {
713 INDEX_ORDER_IDS => {
714 insert_set(pipe, key, value[0].as_ref());
715 Ok(())
716 }
717 INDEX_ORDER_POSITION => {
718 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
719 Ok(())
720 }
721 INDEX_ORDER_CLIENT => {
722 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
723 Ok(())
724 }
725 INDEX_ORDERS => {
726 insert_set(pipe, key, value[0].as_ref());
727 Ok(())
728 }
729 INDEX_ORDERS_OPEN => {
730 insert_set(pipe, key, value[0].as_ref());
731 Ok(())
732 }
733 INDEX_ORDERS_CLOSED => {
734 insert_set(pipe, key, value[0].as_ref());
735 Ok(())
736 }
737 INDEX_ORDERS_EMULATED => {
738 insert_set(pipe, key, value[0].as_ref());
739 Ok(())
740 }
741 INDEX_ORDERS_INFLIGHT => {
742 insert_set(pipe, key, value[0].as_ref());
743 Ok(())
744 }
745 INDEX_POSITIONS => {
746 insert_set(pipe, key, value[0].as_ref());
747 Ok(())
748 }
749 INDEX_POSITIONS_OPEN => {
750 insert_set(pipe, key, value[0].as_ref());
751 Ok(())
752 }
753 INDEX_POSITIONS_CLOSED => {
754 insert_set(pipe, key, value[0].as_ref());
755 Ok(())
756 }
757 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
758 }
759}
760
/// Queues a `set` of `value` at `key` on the pipeline.
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}
764
/// Queues an `sadd` of `value` to the set at `key` on the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}
768
/// Queues an `hset` of field `name` to `value` in the hash at `key` on the pipeline.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}
772
/// Queues an `rpush` of `value` onto the list at `key` on the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
776
777fn update(
778 pipe: &mut Pipeline,
779 collection: &str,
780 key: &str,
781 value: Vec<Bytes>,
782) -> anyhow::Result<()> {
783 check_slice_not_empty(value.as_slice(), stringify!(value))?;
784
785 match collection {
786 ACCOUNTS => {
787 update_list(pipe, key, value[0].as_ref());
788 Ok(())
789 }
790 ORDERS => {
791 update_list(pipe, key, value[0].as_ref());
792 Ok(())
793 }
794 POSITIONS => {
795 update_list(pipe, key, value[0].as_ref());
796 Ok(())
797 }
798 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
799 }
800}
801
/// Queues an `rpush_exists` of `value` onto the list at `key` on the pipeline.
// NOTE(review): `rpush_exists` presumably maps to Redis `RPUSHX` (push only when the
// list already exists) - confirm against the redis crate documentation.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
805
806fn delete(
807 pipe: &mut Pipeline,
808 collection: &str,
809 key: &str,
810 value: Option<Vec<Bytes>>,
811) -> anyhow::Result<()> {
812 log::debug!(
813 "delete: collection={}, key={}, has_payload={}",
814 collection,
815 key,
816 value.is_some()
817 );
818
819 match collection {
820 INDEX => delete_from_index(pipe, key, value),
821 ORDERS => {
822 delete_string(pipe, key);
823 Ok(())
824 }
825 POSITIONS => {
826 delete_string(pipe, key);
827 Ok(())
828 }
829 ACCOUNTS => {
830 delete_string(pipe, key);
831 Ok(())
832 }
833 ACTORS => {
834 delete_string(pipe, key);
835 Ok(())
836 }
837 STRATEGIES => {
838 delete_string(pipe, key);
839 Ok(())
840 }
841 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
842 }
843}
844
845fn delete_from_index(
846 pipe: &mut Pipeline,
847 key: &str,
848 value: Option<Vec<Bytes>>,
849) -> anyhow::Result<()> {
850 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
851 let index_key = get_index_key(key)?;
852
853 match index_key {
854 INDEX_ORDER_IDS => {
855 remove_from_set(pipe, key, value[0].as_ref());
856 Ok(())
857 }
858 INDEX_ORDER_POSITION => {
859 remove_from_hash(pipe, key, value[0].as_ref());
860 Ok(())
861 }
862 INDEX_ORDER_CLIENT => {
863 remove_from_hash(pipe, key, value[0].as_ref());
864 Ok(())
865 }
866 INDEX_ORDERS => {
867 remove_from_set(pipe, key, value[0].as_ref());
868 Ok(())
869 }
870 INDEX_ORDERS_OPEN => {
871 remove_from_set(pipe, key, value[0].as_ref());
872 Ok(())
873 }
874 INDEX_ORDERS_CLOSED => {
875 remove_from_set(pipe, key, value[0].as_ref());
876 Ok(())
877 }
878 INDEX_ORDERS_EMULATED => {
879 remove_from_set(pipe, key, value[0].as_ref());
880 Ok(())
881 }
882 INDEX_ORDERS_INFLIGHT => {
883 remove_from_set(pipe, key, value[0].as_ref());
884 Ok(())
885 }
886 INDEX_POSITIONS => {
887 remove_from_set(pipe, key, value[0].as_ref());
888 Ok(())
889 }
890 INDEX_POSITIONS_OPEN => {
891 remove_from_set(pipe, key, value[0].as_ref());
892 Ok(())
893 }
894 INDEX_POSITIONS_CLOSED => {
895 remove_from_set(pipe, key, value[0].as_ref());
896 Ok(())
897 }
898 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
899 }
900}
901
/// Queues an `srem` of `member` from the set at `key` on the pipeline.
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
905
/// Queues an `hdel` of `field` from the hash at `key` on the pipeline.
fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
    pipe.hdel(key, field);
}
909
/// Queues a `del` of `key` on the pipeline.
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
913
914fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
915 let mut key = String::new();
916
917 if config.use_trader_prefix {
918 key.push_str("trader-");
919 }
920
921 key.push_str(trader_id.as_str());
922
923 if config.use_instance_id {
924 key.push(REDIS_DELIMITER);
925 key.push_str(&format!("{instance_id}"));
926 }
927
928 key
929}
930
931fn get_collection_key(key: &str) -> anyhow::Result<&str> {
932 key.split_once(REDIS_DELIMITER)
933 .map(|(collection, _)| collection)
934 .ok_or_else(|| {
935 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
936 })
937}
938
/// Wraps a [`RedisCacheDatabase`] to implement the `CacheDatabaseAdapter` trait.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    /// Serialization encoding passed to the load queries.
    pub encoding: SerializationEncoding,
    /// The underlying Redis cache database.
    pub database: RedisCacheDatabase,
}
945
946#[allow(dead_code)]
947#[allow(unused)]
948#[async_trait::async_trait]
949impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
950 fn close(&mut self) -> anyhow::Result<()> {
951 self.database.close();
952 Ok(())
953 }
954
955 fn flush(&mut self) -> anyhow::Result<()> {
956 self.database.flushdb_sync()
957 }
958
959 async fn load_all(&self) -> anyhow::Result<CacheMap> {
960 log::debug!("Loading all data");
961
962 let (
963 currencies,
964 instruments,
965 synthetics,
966 accounts,
967 orders,
968 positions,
969 greeks,
970 yield_curves,
971 ) = tokio::try_join!(
972 self.load_currencies(),
973 self.load_instruments(),
974 self.load_synthetics(),
975 self.load_accounts(),
976 self.load_orders(),
977 self.load_positions(),
978 self.load_greeks(),
979 self.load_yield_curves()
980 )
981 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
982
983 Ok(CacheMap {
984 currencies,
985 instruments,
986 synthetics,
987 accounts,
988 orders,
989 positions,
990 greeks,
991 yield_curves,
992 })
993 }
994
995 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
996 Ok(AHashMap::new()) }
999
1000 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
1001 DatabaseQueries::load_currencies(
1002 &self.database.con,
1003 &self.database.trader_key,
1004 self.encoding,
1005 )
1006 .await
1007 }
1008
1009 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
1010 DatabaseQueries::load_instruments(
1011 &self.database.con,
1012 &self.database.trader_key,
1013 self.encoding,
1014 )
1015 .await
1016 }
1017
1018 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
1019 DatabaseQueries::load_synthetics(
1020 &self.database.con,
1021 &self.database.trader_key,
1022 self.encoding,
1023 )
1024 .await
1025 }
1026
1027 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
1028 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
1029 .await
1030 }
1031
1032 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
1033 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
1034 .await
1035 }
1036
1037 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
1038 DatabaseQueries::load_positions(
1039 &self.database.con,
1040 &self.database.trader_key,
1041 self.encoding,
1042 )
1043 .await
1044 }
1045
1046 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
1047 todo!()
1048 }
1049
1050 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1051 todo!()
1052 }
1053
1054 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
1055 DatabaseQueries::load_currency(
1056 &self.database.con,
1057 &self.database.trader_key,
1058 code,
1059 self.encoding,
1060 )
1061 .await
1062 }
1063
1064 async fn load_instrument(
1065 &self,
1066 instrument_id: &InstrumentId,
1067 ) -> anyhow::Result<Option<InstrumentAny>> {
1068 DatabaseQueries::load_instrument(
1069 &self.database.con,
1070 &self.database.trader_key,
1071 instrument_id,
1072 self.encoding,
1073 )
1074 .await
1075 }
1076
1077 async fn load_synthetic(
1078 &self,
1079 instrument_id: &InstrumentId,
1080 ) -> anyhow::Result<Option<SyntheticInstrument>> {
1081 DatabaseQueries::load_synthetic(
1082 &self.database.con,
1083 &self.database.trader_key,
1084 instrument_id,
1085 self.encoding,
1086 )
1087 .await
1088 }
1089
1090 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
1091 DatabaseQueries::load_account(
1092 &self.database.con,
1093 &self.database.trader_key,
1094 account_id,
1095 self.encoding,
1096 )
1097 .await
1098 }
1099
1100 async fn load_order(
1101 &self,
1102 client_order_id: &ClientOrderId,
1103 ) -> anyhow::Result<Option<OrderAny>> {
1104 DatabaseQueries::load_order(
1105 &self.database.con,
1106 &self.database.trader_key,
1107 client_order_id,
1108 self.encoding,
1109 )
1110 .await
1111 }
1112
1113 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
1114 DatabaseQueries::load_position(
1115 &self.database.con,
1116 &self.database.trader_key,
1117 position_id,
1118 self.encoding,
1119 )
1120 .await
1121 }
1122
1123 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
1124 todo!()
1125 }
1126
1127 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
1128 todo!()
1129 }
1130
1131 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
1132 todo!()
1133 }
1134
1135 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1136 todo!()
1137 }
1138
1139 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1140 let order_id_bytes = Bytes::from(client_order_id.to_string());
1141
1142 log::debug!("Deleting order: {client_order_id} from Redis");
1143 log::debug!("Trader key: {}", self.database.trader_key);
1144
1145 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1147 log::debug!("Deleting order key: {key}");
1148 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1149 self.database
1150 .tx
1151 .send(op)
1152 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1153
1154 let index_keys = [
1156 INDEX_ORDER_IDS,
1157 INDEX_ORDERS,
1158 INDEX_ORDERS_OPEN,
1159 INDEX_ORDERS_CLOSED,
1160 INDEX_ORDERS_EMULATED,
1161 INDEX_ORDERS_INFLIGHT,
1162 ];
1163
1164 for index_key in &index_keys {
1165 let key = (*index_key).to_string();
1166 log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1167 let payload = vec![order_id_bytes.clone()];
1168 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1169 self.database
1170 .tx
1171 .send(op)
1172 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1173 }
1174
1175 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1177 for index_key in &hash_indexes {
1178 let key = (*index_key).to_string();
1179 log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1180 let payload = vec![order_id_bytes.clone()];
1181 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1182 self.database.tx.send(op).map_err(|e| {
1183 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1184 })?;
1185 }
1186
1187 log::debug!("Sent all delete commands for order: {client_order_id}");
1188 Ok(())
1189 }
1190
1191 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1192 let position_id_bytes = Bytes::from(position_id.to_string());
1193
1194 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1196 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1197 self.database
1198 .tx
1199 .send(op)
1200 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1201
1202 let index_keys = [
1204 INDEX_POSITIONS,
1205 INDEX_POSITIONS_OPEN,
1206 INDEX_POSITIONS_CLOSED,
1207 ];
1208
1209 for index_key in &index_keys {
1210 let key = (*index_key).to_string();
1211 let payload = vec![position_id_bytes.clone()];
1212 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1213 self.database.tx.send(op).map_err(|e| {
1214 anyhow::anyhow!("Failed to send delete position index command: {e}")
1215 })?;
1216 }
1217
1218 Ok(())
1219 }
1220
1221 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1222 todo!()
1223 }
1224
1225 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1226 todo!()
1227 }
1228
1229 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1230 todo!()
1231 }
1232
1233 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1234 todo!()
1235 }
1236
1237 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1238 todo!()
1239 }
1240
1241 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1242 todo!()
1243 }
1244
1245 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1246 todo!()
1247 }
1248
1249 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1250 todo!()
1251 }
1252
1253 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1254 todo!()
1255 }
1256
1257 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1258 todo!()
1259 }
1260
1261 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1262 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1263 }
1264
1265 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1266 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1267 }
1268
1269 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1270 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1271 }
1272
1273 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1274 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1275 }
1276
1277 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1278 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1279 }
1280
1281 fn add_funding_rate(&self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
1282 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1283 }
1284
1285 fn load_funding_rates(
1286 &self,
1287 instrument_id: &InstrumentId,
1288 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
1289 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1290 }
1291
1292 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1293 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1294 }
1295
1296 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1297 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1298 }
1299
1300 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1301 anyhow::bail!("Saving signals for Redis cache adapter not supported")
1302 }
1303
1304 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1305 anyhow::bail!("Loading signals from Redis cache adapter not supported")
1306 }
1307
1308 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1309 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1310 }
1311
1312 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1313 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1314 }
1315
1316 fn load_order_snapshot(
1317 &self,
1318 client_order_id: &ClientOrderId,
1319 ) -> anyhow::Result<Option<OrderSnapshot>> {
1320 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1321 }
1322
1323 fn load_position_snapshot(
1324 &self,
1325 position_id: &PositionId,
1326 ) -> anyhow::Result<Option<PositionSnapshot>> {
1327 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1328 }
1329
1330 fn index_venue_order_id(
1331 &self,
1332 client_order_id: ClientOrderId,
1333 venue_order_id: VenueOrderId,
1334 ) -> anyhow::Result<()> {
1335 todo!()
1336 }
1337
1338 fn index_order_position(
1339 &self,
1340 client_order_id: ClientOrderId,
1341 position_id: PositionId,
1342 ) -> anyhow::Result<()> {
1343 todo!()
1344 }
1345
1346 fn update_actor(&self) -> anyhow::Result<()> {
1347 todo!()
1348 }
1349
1350 fn update_strategy(&self) -> anyhow::Result<()> {
1351 todo!()
1352 }
1353
1354 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1355 todo!()
1356 }
1357
1358 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1359 todo!()
1360 }
1361
1362 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1363 todo!()
1364 }
1365
1366 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1367 todo!()
1368 }
1369
1370 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1371 todo!()
1372 }
1373
1374 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1375 todo!()
1376 }
1377}
1378
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// With the default config plus `use_instance_id`, the key is the prefixed trader
    /// ID followed by the delimiter and the instance ID.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let config = CacheConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);

        let expected_suffix = instance_id.to_string();
        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&expected_suffix));
    }

    /// The collection is the segment before the first delimiter.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    /// A key without a delimiter is rejected.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }
}