1use std::{collections::VecDeque, fmt::Debug, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21 cache::{
22 CacheConfig,
23 database::{CacheDatabaseAdapter, CacheMap},
24 },
25 custom::CustomData,
26 enums::SerializationEncoding,
27 logging::{log_task_awaiting, log_task_started, log_task_stopped},
28 runtime::get_runtime,
29 signal::Signal,
30};
31use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
32use nautilus_cryptography::providers::install_cryptographic_provider;
33use nautilus_model::{
34 accounts::AccountAny,
35 data::{Bar, DataType, QuoteTick, TradeTick},
36 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
37 identifiers::{
38 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
39 TraderId, VenueOrderId,
40 },
41 instruments::{InstrumentAny, SyntheticInstrument},
42 orderbook::OrderBook,
43 orders::OrderAny,
44 position::Position,
45 types::Currency,
46};
47use redis::{Pipeline, aio::ConnectionManager};
48use tokio::try_join;
49use ustr::Ustr;
50
51use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
52use crate::redis::{create_redis_connection, queries::DatabaseQueries};
53
54const CACHE_READ: &str = "cache-read";
56const CACHE_WRITE: &str = "cache-write";
57const CACHE_PROCESS: &str = "cache-process";
58
59const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
61
62const INDEX: &str = "index";
64const GENERAL: &str = "general";
65const CURRENCIES: &str = "currencies";
66const INSTRUMENTS: &str = "instruments";
67const SYNTHETICS: &str = "synthetics";
68const ACCOUNTS: &str = "accounts";
69const ORDERS: &str = "orders";
70const POSITIONS: &str = "positions";
71const ACTORS: &str = "actors";
72const STRATEGIES: &str = "strategies";
73const SNAPSHOTS: &str = "snapshots";
74const HEALTH: &str = "health";
75
76const INDEX_ORDER_IDS: &str = "index:order_ids";
78const INDEX_ORDER_POSITION: &str = "index:order_position";
79const INDEX_ORDER_CLIENT: &str = "index:order_client";
80const INDEX_ORDERS: &str = "index:orders";
81const INDEX_ORDERS_OPEN: &str = "index:orders_open";
82const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
83const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
84const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
85const INDEX_POSITIONS: &str = "index:positions";
86const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
87const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
89#[derive(Clone, Debug)]
91pub enum DatabaseOperation {
92 Insert,
93 Update,
94 Delete,
95 Close,
96}
97
98#[derive(Clone, Debug)]
100pub struct DatabaseCommand {
101 pub op_type: DatabaseOperation,
103 pub key: Option<String>,
105 pub payload: Option<Vec<Bytes>>,
107}
108
109impl DatabaseCommand {
110 #[must_use]
112 pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113 Self {
114 op_type,
115 key: Some(key),
116 payload,
117 }
118 }
119
120 #[must_use]
122 pub const fn close() -> Self {
123 Self {
124 op_type: DatabaseOperation::Close,
125 key: None,
126 payload: None,
127 }
128 }
129}
130
131#[cfg_attr(
132 feature = "python",
133 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
134)]
135pub struct RedisCacheDatabase {
136 pub con: ConnectionManager,
137 pub trader_id: TraderId,
138 pub trader_key: String,
139 pub encoding: SerializationEncoding,
140 tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
141 handle: tokio::task::JoinHandle<()>,
142}
143
144impl Debug for RedisCacheDatabase {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct(stringify!(RedisCacheDatabase))
147 .field("trader_id", &self.trader_id)
148 .field("encoding", &self.encoding)
149 .finish()
150 }
151}
152
153impl RedisCacheDatabase {
154 pub async fn new(
163 trader_id: TraderId,
164 instance_id: UUID4,
165 config: CacheConfig,
166 ) -> anyhow::Result<Self> {
167 install_cryptographic_provider();
168
169 let db_config = config
170 .database
171 .as_ref()
172 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
173 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
174
175 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
176 let trader_key = get_trader_key(trader_id, instance_id, &config);
177 let trader_key_clone = trader_key.clone();
178 let encoding = config.encoding;
179
180 let handle = get_runtime().spawn(async move {
181 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
182 log::error!("Error in task '{CACHE_PROCESS}': {e}");
183 }
184 });
185
186 Ok(Self {
187 con,
188 trader_id,
189 trader_key,
190 encoding,
191 tx,
192 handle,
193 })
194 }
195
196 #[must_use]
197 pub const fn get_encoding(&self) -> SerializationEncoding {
198 self.encoding
199 }
200
201 #[must_use]
202 pub fn get_trader_key(&self) -> &str {
203 &self.trader_key
204 }
205
206 pub fn close(&mut self) {
207 log::debug!("Closing");
208
209 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
210 log::debug!("Error sending close command: {e:?}");
211 }
212
213 log_task_awaiting(CACHE_PROCESS);
214
215 tokio::task::block_in_place(|| {
216 if let Err(e) = get_runtime().block_on(&mut self.handle) {
217 log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
218 }
219 });
220
221 log::debug!("Closed");
222 }
223
224 pub async fn flushdb(&mut self) {
225 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
226 .query_async::<()>(&mut self.con)
227 .await
228 {
229 log::error!("Failed to flush database: {e:?}");
230 }
231 }
232
233 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
239 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
240 DatabaseQueries::scan_keys(&mut self.con, pattern).await
241 }
242
243 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
249 DatabaseQueries::read(&self.con, &self.trader_key, key).await
250 }
251
252 pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
258 DatabaseQueries::read_bulk(&self.con, keys).await
259 }
260
261 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
267 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
268 match self.tx.send(op) {
269 Ok(()) => Ok(()),
270 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
271 }
272 }
273
274 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
280 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
281 match self.tx.send(op) {
282 Ok(()) => Ok(()),
283 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
284 }
285 }
286
287 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
293 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
294 match self.tx.send(op) {
295 Ok(()) => Ok(()),
296 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
297 }
298 }
299
300 pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
306 let order_id_bytes = Bytes::from(client_order_id.to_string());
307
308 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
310 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
311 self.tx
312 .send(op)
313 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
314
315 let index_keys = [
317 INDEX_ORDER_IDS,
318 INDEX_ORDERS,
319 INDEX_ORDERS_OPEN,
320 INDEX_ORDERS_CLOSED,
321 INDEX_ORDERS_EMULATED,
322 INDEX_ORDERS_INFLIGHT,
323 ];
324
325 for index_key in &index_keys {
326 let key = (*index_key).to_string();
327 let payload = vec![order_id_bytes.clone()];
328 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
329 self.tx
330 .send(op)
331 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
332 }
333
334 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
336 for index_key in &hash_indexes {
337 let key = (*index_key).to_string();
338 let payload = vec![order_id_bytes.clone()];
339 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
340 self.tx.send(op).map_err(|e| {
341 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
342 })?;
343 }
344
345 Ok(())
346 }
347
348 pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
354 let position_id_bytes = Bytes::from(position_id.to_string());
355
356 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
358 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
359 self.tx
360 .send(op)
361 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
362
363 let index_keys = [
365 INDEX_POSITIONS,
366 INDEX_POSITIONS_OPEN,
367 INDEX_POSITIONS_CLOSED,
368 ];
369
370 for index_key in &index_keys {
371 let key = (*index_key).to_string();
372 let payload = vec![position_id_bytes.clone()];
373 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
374 self.tx.send(op).map_err(|e| {
375 anyhow::anyhow!("Failed to send delete position index command: {e}")
376 })?;
377 }
378
379 Ok(())
380 }
381
382 pub fn delete_account_event(
388 &self,
389 _account_id: &AccountId,
390 _event_id: &str,
391 ) -> anyhow::Result<()> {
392 tracing::warn!("Deleting account events currently a no-op (pending redesign)");
393 Ok(())
394 }
395}
396
397async fn process_commands(
398 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
399 trader_key: String,
400 config: CacheConfig,
401) -> anyhow::Result<()> {
402 log_task_started(CACHE_PROCESS);
403
404 let db_config = config
405 .database
406 .as_ref()
407 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
408 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
409
410 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
412 let mut last_drain = std::time::Instant::now();
413 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
414
415 loop {
417 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
418 drain_buffer(&mut con, &trader_key, &mut buffer).await;
419 last_drain = std::time::Instant::now();
420 } else if let Some(cmd) = rx.recv().await {
421 tracing::trace!("Received {cmd:?}");
422
423 if matches!(cmd.op_type, DatabaseOperation::Close) {
424 break;
425 }
426 buffer.push_back(cmd);
427 } else {
428 tracing::debug!("Command channel closed");
429 break;
430 }
431 }
432
433 if !buffer.is_empty() {
435 drain_buffer(&mut con, &trader_key, &mut buffer).await;
436 }
437
438 log_task_stopped(CACHE_PROCESS);
439 Ok(())
440}
441
442async fn drain_buffer(
443 conn: &mut ConnectionManager,
444 trader_key: &str,
445 buffer: &mut VecDeque<DatabaseCommand>,
446) {
447 let mut pipe = redis::pipe();
448 pipe.atomic();
449
450 for msg in buffer.drain(..) {
451 let key = if let Some(key) = msg.key {
452 key
453 } else {
454 log::error!("Null key found for message: {msg:?}");
455 continue;
456 };
457 let collection = match get_collection_key(&key) {
458 Ok(collection) => collection,
459 Err(e) => {
460 tracing::error!("{e}");
461 continue; }
463 };
464
465 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
466
467 match msg.op_type {
468 DatabaseOperation::Insert => {
469 if let Some(payload) = msg.payload {
470 log::debug!("Processing INSERT for collection: {collection}, key: {key}");
471 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
472 tracing::error!("{e}");
473 }
474 } else {
475 tracing::error!("Null `payload` for `insert`");
476 }
477 }
478 DatabaseOperation::Update => {
479 if let Some(payload) = msg.payload {
480 log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
481 if let Err(e) = update(&mut pipe, collection, &key, payload) {
482 tracing::error!("{e}");
483 }
484 } else {
485 tracing::error!("Null `payload` for `update`");
486 }
487 }
488 DatabaseOperation::Delete => {
489 tracing::debug!(
490 "Processing DELETE for collection: {}, key: {}, payload: {:?}",
491 collection,
492 key,
493 msg.payload.as_ref().map(std::vec::Vec::len)
494 );
495 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
497 tracing::error!("{e}");
498 }
499 }
500 DatabaseOperation::Close => panic!("Close command should not be drained"),
501 }
502 }
503
504 if let Err(e) = pipe.query_async::<()>(conn).await {
505 tracing::error!("{e}");
506 }
507}
508
509fn insert(
510 pipe: &mut Pipeline,
511 collection: &str,
512 key: &str,
513 value: Vec<Bytes>,
514) -> anyhow::Result<()> {
515 check_slice_not_empty(value.as_slice(), stringify!(value))?;
516
517 match collection {
518 INDEX => insert_index(pipe, key, &value),
519 GENERAL => {
520 insert_string(pipe, key, value[0].as_ref());
521 Ok(())
522 }
523 CURRENCIES => {
524 insert_string(pipe, key, value[0].as_ref());
525 Ok(())
526 }
527 INSTRUMENTS => {
528 insert_string(pipe, key, value[0].as_ref());
529 Ok(())
530 }
531 SYNTHETICS => {
532 insert_string(pipe, key, value[0].as_ref());
533 Ok(())
534 }
535 ACCOUNTS => {
536 insert_list(pipe, key, value[0].as_ref());
537 Ok(())
538 }
539 ORDERS => {
540 insert_list(pipe, key, value[0].as_ref());
541 Ok(())
542 }
543 POSITIONS => {
544 insert_list(pipe, key, value[0].as_ref());
545 Ok(())
546 }
547 ACTORS => {
548 insert_string(pipe, key, value[0].as_ref());
549 Ok(())
550 }
551 STRATEGIES => {
552 insert_string(pipe, key, value[0].as_ref());
553 Ok(())
554 }
555 SNAPSHOTS => {
556 insert_list(pipe, key, value[0].as_ref());
557 Ok(())
558 }
559 HEALTH => {
560 insert_string(pipe, key, value[0].as_ref());
561 Ok(())
562 }
563 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
564 }
565}
566
567fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
568 let index_key = get_index_key(key)?;
569 match index_key {
570 INDEX_ORDER_IDS => {
571 insert_set(pipe, key, value[0].as_ref());
572 Ok(())
573 }
574 INDEX_ORDER_POSITION => {
575 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
576 Ok(())
577 }
578 INDEX_ORDER_CLIENT => {
579 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
580 Ok(())
581 }
582 INDEX_ORDERS => {
583 insert_set(pipe, key, value[0].as_ref());
584 Ok(())
585 }
586 INDEX_ORDERS_OPEN => {
587 insert_set(pipe, key, value[0].as_ref());
588 Ok(())
589 }
590 INDEX_ORDERS_CLOSED => {
591 insert_set(pipe, key, value[0].as_ref());
592 Ok(())
593 }
594 INDEX_ORDERS_EMULATED => {
595 insert_set(pipe, key, value[0].as_ref());
596 Ok(())
597 }
598 INDEX_ORDERS_INFLIGHT => {
599 insert_set(pipe, key, value[0].as_ref());
600 Ok(())
601 }
602 INDEX_POSITIONS => {
603 insert_set(pipe, key, value[0].as_ref());
604 Ok(())
605 }
606 INDEX_POSITIONS_OPEN => {
607 insert_set(pipe, key, value[0].as_ref());
608 Ok(())
609 }
610 INDEX_POSITIONS_CLOSED => {
611 insert_set(pipe, key, value[0].as_ref());
612 Ok(())
613 }
614 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
615 }
616}
617
618fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
619 pipe.set(key, value);
620}
621
622fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
623 pipe.sadd(key, value);
624}
625
626fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
627 pipe.hset(key, name, value);
628}
629
630fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
631 pipe.rpush(key, value);
632}
633
634fn update(
635 pipe: &mut Pipeline,
636 collection: &str,
637 key: &str,
638 value: Vec<Bytes>,
639) -> anyhow::Result<()> {
640 check_slice_not_empty(value.as_slice(), stringify!(value))?;
641
642 match collection {
643 ACCOUNTS => {
644 update_list(pipe, key, value[0].as_ref());
645 Ok(())
646 }
647 ORDERS => {
648 update_list(pipe, key, value[0].as_ref());
649 Ok(())
650 }
651 POSITIONS => {
652 update_list(pipe, key, value[0].as_ref());
653 Ok(())
654 }
655 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
656 }
657}
658
659fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
660 pipe.rpush_exists(key, value);
661}
662
663fn delete(
664 pipe: &mut Pipeline,
665 collection: &str,
666 key: &str,
667 value: Option<Vec<Bytes>>,
668) -> anyhow::Result<()> {
669 tracing::debug!(
670 "delete: collection={}, key={}, has_payload={}",
671 collection,
672 key,
673 value.is_some()
674 );
675
676 match collection {
677 INDEX => delete_from_index(pipe, key, value),
678 ORDERS => {
679 delete_string(pipe, key);
680 Ok(())
681 }
682 POSITIONS => {
683 delete_string(pipe, key);
684 Ok(())
685 }
686 ACCOUNTS => {
687 delete_string(pipe, key);
688 Ok(())
689 }
690 ACTORS => {
691 delete_string(pipe, key);
692 Ok(())
693 }
694 STRATEGIES => {
695 delete_string(pipe, key);
696 Ok(())
697 }
698 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
699 }
700}
701
702fn delete_from_index(
703 pipe: &mut Pipeline,
704 key: &str,
705 value: Option<Vec<Bytes>>,
706) -> anyhow::Result<()> {
707 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
708 let index_key = get_index_key(key)?;
709
710 match index_key {
711 INDEX_ORDER_IDS => {
712 remove_from_set(pipe, key, value[0].as_ref());
713 Ok(())
714 }
715 INDEX_ORDER_POSITION => {
716 remove_from_hash(pipe, key, value[0].as_ref());
717 Ok(())
718 }
719 INDEX_ORDER_CLIENT => {
720 remove_from_hash(pipe, key, value[0].as_ref());
721 Ok(())
722 }
723 INDEX_ORDERS => {
724 remove_from_set(pipe, key, value[0].as_ref());
725 Ok(())
726 }
727 INDEX_ORDERS_OPEN => {
728 remove_from_set(pipe, key, value[0].as_ref());
729 Ok(())
730 }
731 INDEX_ORDERS_CLOSED => {
732 remove_from_set(pipe, key, value[0].as_ref());
733 Ok(())
734 }
735 INDEX_ORDERS_EMULATED => {
736 remove_from_set(pipe, key, value[0].as_ref());
737 Ok(())
738 }
739 INDEX_ORDERS_INFLIGHT => {
740 remove_from_set(pipe, key, value[0].as_ref());
741 Ok(())
742 }
743 INDEX_POSITIONS => {
744 remove_from_set(pipe, key, value[0].as_ref());
745 Ok(())
746 }
747 INDEX_POSITIONS_OPEN => {
748 remove_from_set(pipe, key, value[0].as_ref());
749 Ok(())
750 }
751 INDEX_POSITIONS_CLOSED => {
752 remove_from_set(pipe, key, value[0].as_ref());
753 Ok(())
754 }
755 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
756 }
757}
758
759fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
760 pipe.srem(key, member);
761}
762
763fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
764 pipe.hdel(key, field);
765}
766
767fn delete_string(pipe: &mut Pipeline, key: &str) {
768 pipe.del(key);
769}
770
771fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
772 let mut key = String::new();
773
774 if config.use_trader_prefix {
775 key.push_str("trader-");
776 }
777
778 key.push_str(trader_id.as_str());
779
780 if config.use_instance_id {
781 key.push(REDIS_DELIMITER);
782 key.push_str(&format!("{instance_id}"));
783 }
784
785 key
786}
787
788fn get_collection_key(key: &str) -> anyhow::Result<&str> {
789 key.split_once(REDIS_DELIMITER)
790 .map(|(collection, _)| collection)
791 .ok_or_else(|| {
792 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
793 })
794}
795
796fn get_index_key(key: &str) -> anyhow::Result<&str> {
797 key.split_once(REDIS_DELIMITER)
798 .map(|(_, index_key)| index_key)
799 .ok_or_else(|| {
800 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
801 })
802}
803
804#[allow(dead_code)]
805#[derive(Debug)]
806pub struct RedisCacheDatabaseAdapter {
807 pub encoding: SerializationEncoding,
808 pub database: RedisCacheDatabase,
809}
810
811#[allow(dead_code)]
812#[allow(unused)]
813#[async_trait::async_trait]
814impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
815 fn close(&mut self) -> anyhow::Result<()> {
816 self.database.close();
817 Ok(())
818 }
819
820 fn flush(&mut self) -> anyhow::Result<()> {
821 self.database.flushdb();
822 Ok(())
823 }
824
825 async fn load_all(&self) -> anyhow::Result<CacheMap> {
826 tracing::debug!("Loading all data");
827
828 let (
829 currencies,
830 instruments,
831 synthetics,
832 accounts,
833 orders,
834 positions,
835 greeks,
836 yield_curves,
837 ) = try_join!(
838 self.load_currencies(),
839 self.load_instruments(),
840 self.load_synthetics(),
841 self.load_accounts(),
842 self.load_orders(),
843 self.load_positions(),
844 self.load_greeks(),
845 self.load_yield_curves()
846 )
847 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
848
849 Ok(CacheMap {
850 currencies,
851 instruments,
852 synthetics,
853 accounts,
854 orders,
855 positions,
856 greeks,
857 yield_curves,
858 })
859 }
860
861 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
862 Ok(AHashMap::new()) }
865
866 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
867 DatabaseQueries::load_currencies(
868 &self.database.con,
869 &self.database.trader_key,
870 self.encoding,
871 )
872 .await
873 }
874
875 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
876 DatabaseQueries::load_instruments(
877 &self.database.con,
878 &self.database.trader_key,
879 self.encoding,
880 )
881 .await
882 }
883
884 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
885 DatabaseQueries::load_synthetics(
886 &self.database.con,
887 &self.database.trader_key,
888 self.encoding,
889 )
890 .await
891 }
892
893 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
894 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
895 .await
896 }
897
898 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
899 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
900 .await
901 }
902
903 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
904 DatabaseQueries::load_positions(
905 &self.database.con,
906 &self.database.trader_key,
907 self.encoding,
908 )
909 .await
910 }
911
912 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
913 todo!()
914 }
915
916 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
917 todo!()
918 }
919
920 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
921 DatabaseQueries::load_currency(
922 &self.database.con,
923 &self.database.trader_key,
924 code,
925 self.encoding,
926 )
927 .await
928 }
929
930 async fn load_instrument(
931 &self,
932 instrument_id: &InstrumentId,
933 ) -> anyhow::Result<Option<InstrumentAny>> {
934 DatabaseQueries::load_instrument(
935 &self.database.con,
936 &self.database.trader_key,
937 instrument_id,
938 self.encoding,
939 )
940 .await
941 }
942
943 async fn load_synthetic(
944 &self,
945 instrument_id: &InstrumentId,
946 ) -> anyhow::Result<Option<SyntheticInstrument>> {
947 DatabaseQueries::load_synthetic(
948 &self.database.con,
949 &self.database.trader_key,
950 instrument_id,
951 self.encoding,
952 )
953 .await
954 }
955
956 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
957 DatabaseQueries::load_account(
958 &self.database.con,
959 &self.database.trader_key,
960 account_id,
961 self.encoding,
962 )
963 .await
964 }
965
966 async fn load_order(
967 &self,
968 client_order_id: &ClientOrderId,
969 ) -> anyhow::Result<Option<OrderAny>> {
970 DatabaseQueries::load_order(
971 &self.database.con,
972 &self.database.trader_key,
973 client_order_id,
974 self.encoding,
975 )
976 .await
977 }
978
979 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
980 DatabaseQueries::load_position(
981 &self.database.con,
982 &self.database.trader_key,
983 position_id,
984 self.encoding,
985 )
986 .await
987 }
988
989 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
990 todo!()
991 }
992
993 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
994 todo!()
995 }
996
997 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
998 todo!()
999 }
1000
1001 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1002 todo!()
1003 }
1004
1005 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1006 let order_id_bytes = Bytes::from(client_order_id.to_string());
1007
1008 log::debug!("Deleting order: {client_order_id} from Redis");
1009 log::debug!("Trader key: {}", self.database.trader_key);
1010
1011 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1013 log::debug!("Deleting order key: {key}");
1014 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1015 self.database
1016 .tx
1017 .send(op)
1018 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1019
1020 let index_keys = [
1022 INDEX_ORDER_IDS,
1023 INDEX_ORDERS,
1024 INDEX_ORDERS_OPEN,
1025 INDEX_ORDERS_CLOSED,
1026 INDEX_ORDERS_EMULATED,
1027 INDEX_ORDERS_INFLIGHT,
1028 ];
1029
1030 for index_key in &index_keys {
1031 let key = (*index_key).to_string();
1032 log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1033 let payload = vec![order_id_bytes.clone()];
1034 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1035 self.database
1036 .tx
1037 .send(op)
1038 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1039 }
1040
1041 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1043 for index_key in &hash_indexes {
1044 let key = (*index_key).to_string();
1045 log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1046 let payload = vec![order_id_bytes.clone()];
1047 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1048 self.database.tx.send(op).map_err(|e| {
1049 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1050 })?;
1051 }
1052
1053 log::debug!("Sent all delete commands for order: {client_order_id}");
1054 Ok(())
1055 }
1056
1057 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1058 let position_id_bytes = Bytes::from(position_id.to_string());
1059
1060 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1062 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1063 self.database
1064 .tx
1065 .send(op)
1066 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1067
1068 let index_keys = [
1070 INDEX_POSITIONS,
1071 INDEX_POSITIONS_OPEN,
1072 INDEX_POSITIONS_CLOSED,
1073 ];
1074
1075 for index_key in &index_keys {
1076 let key = (*index_key).to_string();
1077 let payload = vec![position_id_bytes.clone()];
1078 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1079 self.database.tx.send(op).map_err(|e| {
1080 anyhow::anyhow!("Failed to send delete position index command: {e}")
1081 })?;
1082 }
1083
1084 Ok(())
1085 }
1086
1087 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1088 todo!()
1089 }
1090
1091 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1092 todo!()
1093 }
1094
1095 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1096 todo!()
1097 }
1098
1099 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1100 todo!()
1101 }
1102
1103 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1104 todo!()
1105 }
1106
1107 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1108 todo!()
1109 }
1110
1111 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1112 todo!()
1113 }
1114
1115 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1116 todo!()
1117 }
1118
1119 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1120 todo!()
1121 }
1122
1123 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1124 todo!()
1125 }
1126
1127 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1128 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1129 }
1130
1131 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1132 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1133 }
1134
1135 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1136 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1137 }
1138
1139 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1140 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1141 }
1142
1143 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1144 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1145 }
1146
1147 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1148 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1149 }
1150
1151 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1152 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1153 }
1154
1155 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1156 anyhow::bail!("Saving signals for Redis cache adapter not supported")
1157 }
1158
1159 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1160 anyhow::bail!("Loading signals from Redis cache adapter not supported")
1161 }
1162
1163 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1164 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1165 }
1166
1167 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1168 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1169 }
1170
1171 fn load_order_snapshot(
1172 &self,
1173 client_order_id: &ClientOrderId,
1174 ) -> anyhow::Result<Option<OrderSnapshot>> {
1175 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1176 }
1177
1178 fn load_position_snapshot(
1179 &self,
1180 position_id: &PositionId,
1181 ) -> anyhow::Result<Option<PositionSnapshot>> {
1182 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1183 }
1184
1185 fn index_venue_order_id(
1186 &self,
1187 client_order_id: ClientOrderId,
1188 venue_order_id: VenueOrderId,
1189 ) -> anyhow::Result<()> {
1190 todo!()
1191 }
1192
1193 fn index_order_position(
1194 &self,
1195 client_order_id: ClientOrderId,
1196 position_id: PositionId,
1197 ) -> anyhow::Result<()> {
1198 todo!()
1199 }
1200
1201 fn update_actor(&self) -> anyhow::Result<()> {
1202 todo!()
1203 }
1204
1205 fn update_strategy(&self) -> anyhow::Result<()> {
1206 todo!()
1207 }
1208
1209 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1210 todo!()
1211 }
1212
1213 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1214 todo!()
1215 }
1216
1217 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1218 todo!()
1219 }
1220
1221 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1222 todo!()
1223 }
1224
1225 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1226 todo!()
1227 }
1228
1229 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1230 todo!()
1231 }
1232}
1233
1234#[cfg(test)]
1238mod tests {
1239 use rstest::rstest;
1240
1241 use super::*;
1242
1243 #[rstest]
1244 fn test_get_trader_key_with_prefix_and_instance_id() {
1245 let trader_id = TraderId::from("tester-123");
1246 let instance_id = UUID4::new();
1247 let config = CacheConfig {
1248 use_instance_id: true,
1249 ..Default::default()
1250 };
1251
1252 let key = get_trader_key(trader_id, instance_id, &config);
1253 assert!(key.starts_with("trader-tester-123:"));
1254 assert!(key.ends_with(&instance_id.to_string()));
1255 }
1256
1257 #[rstest]
1258 fn test_get_collection_key_valid() {
1259 let key = "collection:123";
1260 assert_eq!(get_collection_key(key).unwrap(), "collection");
1261 }
1262
1263 #[rstest]
1264 fn test_get_collection_key_invalid() {
1265 let key = "no_delimiter";
1266 assert!(get_collection_key(key).is_err());
1267 }
1268
1269 #[rstest]
1270 fn test_get_index_key_valid() {
1271 let key = "index:123";
1272 assert_eq!(get_index_key(key).unwrap(), "123");
1273 }
1274
1275 #[rstest]
1276 fn test_get_index_key_invalid() {
1277 let key = "no_delimiter";
1278 assert!(get_index_key(key).is_err());
1279 }
1280}