1use std::{collections::VecDeque, fmt::Debug, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21 cache::{
22 CacheConfig,
23 database::{CacheDatabaseAdapter, CacheMap},
24 },
25 custom::CustomData,
26 enums::SerializationEncoding,
27 logging::{log_task_awaiting, log_task_started, log_task_stopped},
28 runtime::get_runtime,
29 signal::Signal,
30};
31use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
32use nautilus_cryptography::providers::install_cryptographic_provider;
33use nautilus_model::{
34 accounts::AccountAny,
35 data::{Bar, DataType, QuoteTick, TradeTick},
36 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
37 identifiers::{
38 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
39 TraderId, VenueOrderId,
40 },
41 instruments::{InstrumentAny, SyntheticInstrument},
42 orderbook::OrderBook,
43 orders::OrderAny,
44 position::Position,
45 types::Currency,
46};
47use redis::{Pipeline, aio::ConnectionManager};
48use tokio::try_join;
49use ustr::Ustr;
50
51use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
52use crate::redis::{create_redis_connection, queries::DatabaseQueries};
53
54const CACHE_READ: &str = "cache-read";
56const CACHE_WRITE: &str = "cache-write";
57const CACHE_PROCESS: &str = "cache-process";
58
59const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
61
62const INDEX: &str = "index";
64const GENERAL: &str = "general";
65const CURRENCIES: &str = "currencies";
66const INSTRUMENTS: &str = "instruments";
67const SYNTHETICS: &str = "synthetics";
68const ACCOUNTS: &str = "accounts";
69const ORDERS: &str = "orders";
70const POSITIONS: &str = "positions";
71const ACTORS: &str = "actors";
72const STRATEGIES: &str = "strategies";
73const SNAPSHOTS: &str = "snapshots";
74const HEALTH: &str = "health";
75
76const INDEX_ORDER_IDS: &str = "index:order_ids";
78const INDEX_ORDER_POSITION: &str = "index:order_position";
79const INDEX_ORDER_CLIENT: &str = "index:order_client";
80const INDEX_ORDERS: &str = "index:orders";
81const INDEX_ORDERS_OPEN: &str = "index:orders_open";
82const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
83const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
84const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
85const INDEX_POSITIONS: &str = "index:positions";
86const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
87const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
89#[derive(Clone, Debug)]
91pub enum DatabaseOperation {
92 Insert,
93 Update,
94 Delete,
95 Close,
96}
97
98#[derive(Clone, Debug)]
100pub struct DatabaseCommand {
101 pub op_type: DatabaseOperation,
103 pub key: Option<String>,
105 pub payload: Option<Vec<Bytes>>,
107}
108
109impl DatabaseCommand {
110 #[must_use]
112 pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113 Self {
114 op_type,
115 key: Some(key),
116 payload,
117 }
118 }
119
120 #[must_use]
122 pub const fn close() -> Self {
123 Self {
124 op_type: DatabaseOperation::Close,
125 key: None,
126 payload: None,
127 }
128 }
129}
130
131#[cfg_attr(
132 feature = "python",
133 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
134)]
135pub struct RedisCacheDatabase {
136 pub con: ConnectionManager,
137 pub trader_id: TraderId,
138 pub trader_key: String,
139 pub encoding: SerializationEncoding,
140 tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
141 handle: tokio::task::JoinHandle<()>,
142}
143
144impl Debug for RedisCacheDatabase {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct(stringify!(RedisCacheDatabase))
147 .field("trader_id", &self.trader_id)
148 .field("encoding", &self.encoding)
149 .finish()
150 }
151}
152
153impl RedisCacheDatabase {
154 pub async fn new(
163 trader_id: TraderId,
164 instance_id: UUID4,
165 config: CacheConfig,
166 ) -> anyhow::Result<Self> {
167 install_cryptographic_provider();
168
169 let db_config = config
170 .database
171 .as_ref()
172 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
173 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
174
175 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
176 let trader_key = get_trader_key(trader_id, instance_id, &config);
177 let trader_key_clone = trader_key.clone();
178 let encoding = config.encoding;
179 let handle = get_runtime().spawn(async move {
180 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
181 log::error!("Error in task '{CACHE_PROCESS}': {e}");
182 }
183 });
184
185 Ok(Self {
186 con,
187 trader_id,
188 trader_key,
189 encoding,
190 tx,
191 handle,
192 })
193 }
194
195 #[must_use]
196 pub const fn get_encoding(&self) -> SerializationEncoding {
197 self.encoding
198 }
199
200 #[must_use]
201 pub fn get_trader_key(&self) -> &str {
202 &self.trader_key
203 }
204
205 pub fn close(&mut self) {
206 log::debug!("Closing");
207
208 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
209 log::debug!("Error sending close command: {e:?}");
210 }
211
212 log_task_awaiting(CACHE_PROCESS);
213
214 tokio::task::block_in_place(|| {
215 if let Err(e) = get_runtime().block_on(&mut self.handle) {
216 log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
217 }
218 });
219
220 log::debug!("Closed");
221 }
222
223 pub async fn flushdb(&mut self) {
224 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
225 .query_async::<()>(&mut self.con)
226 .await
227 {
228 log::error!("Failed to flush database: {e:?}");
229 }
230 }
231
232 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
238 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
239 DatabaseQueries::scan_keys(&mut self.con, pattern).await
240 }
241
242 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
248 DatabaseQueries::read(&self.con, &self.trader_key, key).await
249 }
250
251 pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
257 DatabaseQueries::read_bulk(&self.con, keys).await
258 }
259
260 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
266 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
267 match self.tx.send(op) {
268 Ok(()) => Ok(()),
269 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
270 }
271 }
272
273 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
279 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
280 match self.tx.send(op) {
281 Ok(()) => Ok(()),
282 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
283 }
284 }
285
286 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
292 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
293 match self.tx.send(op) {
294 Ok(()) => Ok(()),
295 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
296 }
297 }
298
299 pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
305 let order_id_bytes = Bytes::from(client_order_id.to_string());
306
307 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
309 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
310 self.tx
311 .send(op)
312 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
313
314 let index_keys = [
316 INDEX_ORDER_IDS,
317 INDEX_ORDERS,
318 INDEX_ORDERS_OPEN,
319 INDEX_ORDERS_CLOSED,
320 INDEX_ORDERS_EMULATED,
321 INDEX_ORDERS_INFLIGHT,
322 ];
323
324 for index_key in &index_keys {
325 let key = (*index_key).to_string();
326 let payload = vec![order_id_bytes.clone()];
327 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
328 self.tx
329 .send(op)
330 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
331 }
332
333 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
335 for index_key in &hash_indexes {
336 let key = (*index_key).to_string();
337 let payload = vec![order_id_bytes.clone()];
338 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
339 self.tx.send(op).map_err(|e| {
340 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
341 })?;
342 }
343
344 Ok(())
345 }
346
347 pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
353 let position_id_bytes = Bytes::from(position_id.to_string());
354
355 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
357 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
358 self.tx
359 .send(op)
360 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
361
362 let index_keys = [
364 INDEX_POSITIONS,
365 INDEX_POSITIONS_OPEN,
366 INDEX_POSITIONS_CLOSED,
367 ];
368
369 for index_key in &index_keys {
370 let key = (*index_key).to_string();
371 let payload = vec![position_id_bytes.clone()];
372 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
373 self.tx.send(op).map_err(|e| {
374 anyhow::anyhow!("Failed to send delete position index command: {e}")
375 })?;
376 }
377
378 Ok(())
379 }
380
381 pub fn delete_account_event(
387 &self,
388 _account_id: &AccountId,
389 _event_id: &str,
390 ) -> anyhow::Result<()> {
391 tracing::warn!("Deleting account events currently a no-op (pending redesign)");
392 Ok(())
393 }
394}
395
396async fn process_commands(
397 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
398 trader_key: String,
399 config: CacheConfig,
400) -> anyhow::Result<()> {
401 log_task_started(CACHE_PROCESS);
402
403 let db_config = config
404 .database
405 .as_ref()
406 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
407 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
408
409 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
411 let mut last_drain = std::time::Instant::now();
412 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
413
414 loop {
416 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
417 drain_buffer(&mut con, &trader_key, &mut buffer).await;
418 last_drain = std::time::Instant::now();
419 } else if let Some(cmd) = rx.recv().await {
420 tracing::trace!("Received {cmd:?}");
421
422 if matches!(cmd.op_type, DatabaseOperation::Close) {
423 break;
424 }
425 buffer.push_back(cmd);
426 } else {
427 tracing::debug!("Command channel closed");
428 break;
429 }
430 }
431
432 if !buffer.is_empty() {
434 drain_buffer(&mut con, &trader_key, &mut buffer).await;
435 }
436
437 log_task_stopped(CACHE_PROCESS);
438 Ok(())
439}
440
441async fn drain_buffer(
442 conn: &mut ConnectionManager,
443 trader_key: &str,
444 buffer: &mut VecDeque<DatabaseCommand>,
445) {
446 let mut pipe = redis::pipe();
447 pipe.atomic();
448
449 for msg in buffer.drain(..) {
450 let key = if let Some(key) = msg.key {
451 key
452 } else {
453 log::error!("Null key found for message: {msg:?}");
454 continue;
455 };
456 let collection = match get_collection_key(&key) {
457 Ok(collection) => collection,
458 Err(e) => {
459 tracing::error!("{e}");
460 continue; }
462 };
463
464 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
465
466 match msg.op_type {
467 DatabaseOperation::Insert => {
468 if let Some(payload) = msg.payload {
469 log::debug!("Processing INSERT for collection: {collection}, key: {key}");
470 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
471 tracing::error!("{e}");
472 }
473 } else {
474 tracing::error!("Null `payload` for `insert`");
475 }
476 }
477 DatabaseOperation::Update => {
478 if let Some(payload) = msg.payload {
479 log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
480 if let Err(e) = update(&mut pipe, collection, &key, payload) {
481 tracing::error!("{e}");
482 }
483 } else {
484 tracing::error!("Null `payload` for `update`");
485 }
486 }
487 DatabaseOperation::Delete => {
488 tracing::debug!(
489 "Processing DELETE for collection: {}, key: {}, payload: {:?}",
490 collection,
491 key,
492 msg.payload.as_ref().map(std::vec::Vec::len)
493 );
494 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
496 tracing::error!("{e}");
497 }
498 }
499 DatabaseOperation::Close => panic!("Close command should not be drained"),
500 }
501 }
502
503 if let Err(e) = pipe.query_async::<()>(conn).await {
504 tracing::error!("{e}");
505 }
506}
507
508fn insert(
509 pipe: &mut Pipeline,
510 collection: &str,
511 key: &str,
512 value: Vec<Bytes>,
513) -> anyhow::Result<()> {
514 check_slice_not_empty(value.as_slice(), stringify!(value))?;
515
516 match collection {
517 INDEX => insert_index(pipe, key, &value),
518 GENERAL => {
519 insert_string(pipe, key, value[0].as_ref());
520 Ok(())
521 }
522 CURRENCIES => {
523 insert_string(pipe, key, value[0].as_ref());
524 Ok(())
525 }
526 INSTRUMENTS => {
527 insert_string(pipe, key, value[0].as_ref());
528 Ok(())
529 }
530 SYNTHETICS => {
531 insert_string(pipe, key, value[0].as_ref());
532 Ok(())
533 }
534 ACCOUNTS => {
535 insert_list(pipe, key, value[0].as_ref());
536 Ok(())
537 }
538 ORDERS => {
539 insert_list(pipe, key, value[0].as_ref());
540 Ok(())
541 }
542 POSITIONS => {
543 insert_list(pipe, key, value[0].as_ref());
544 Ok(())
545 }
546 ACTORS => {
547 insert_string(pipe, key, value[0].as_ref());
548 Ok(())
549 }
550 STRATEGIES => {
551 insert_string(pipe, key, value[0].as_ref());
552 Ok(())
553 }
554 SNAPSHOTS => {
555 insert_list(pipe, key, value[0].as_ref());
556 Ok(())
557 }
558 HEALTH => {
559 insert_string(pipe, key, value[0].as_ref());
560 Ok(())
561 }
562 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
563 }
564}
565
566fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
567 let index_key = get_index_key(key)?;
568 match index_key {
569 INDEX_ORDER_IDS => {
570 insert_set(pipe, key, value[0].as_ref());
571 Ok(())
572 }
573 INDEX_ORDER_POSITION => {
574 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
575 Ok(())
576 }
577 INDEX_ORDER_CLIENT => {
578 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
579 Ok(())
580 }
581 INDEX_ORDERS => {
582 insert_set(pipe, key, value[0].as_ref());
583 Ok(())
584 }
585 INDEX_ORDERS_OPEN => {
586 insert_set(pipe, key, value[0].as_ref());
587 Ok(())
588 }
589 INDEX_ORDERS_CLOSED => {
590 insert_set(pipe, key, value[0].as_ref());
591 Ok(())
592 }
593 INDEX_ORDERS_EMULATED => {
594 insert_set(pipe, key, value[0].as_ref());
595 Ok(())
596 }
597 INDEX_ORDERS_INFLIGHT => {
598 insert_set(pipe, key, value[0].as_ref());
599 Ok(())
600 }
601 INDEX_POSITIONS => {
602 insert_set(pipe, key, value[0].as_ref());
603 Ok(())
604 }
605 INDEX_POSITIONS_OPEN => {
606 insert_set(pipe, key, value[0].as_ref());
607 Ok(())
608 }
609 INDEX_POSITIONS_CLOSED => {
610 insert_set(pipe, key, value[0].as_ref());
611 Ok(())
612 }
613 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
614 }
615}
616
617fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
618 pipe.set(key, value);
619}
620
621fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
622 pipe.sadd(key, value);
623}
624
625fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
626 pipe.hset(key, name, value);
627}
628
629fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
630 pipe.rpush(key, value);
631}
632
633fn update(
634 pipe: &mut Pipeline,
635 collection: &str,
636 key: &str,
637 value: Vec<Bytes>,
638) -> anyhow::Result<()> {
639 check_slice_not_empty(value.as_slice(), stringify!(value))?;
640
641 match collection {
642 ACCOUNTS => {
643 update_list(pipe, key, value[0].as_ref());
644 Ok(())
645 }
646 ORDERS => {
647 update_list(pipe, key, value[0].as_ref());
648 Ok(())
649 }
650 POSITIONS => {
651 update_list(pipe, key, value[0].as_ref());
652 Ok(())
653 }
654 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
655 }
656}
657
658fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
659 pipe.rpush_exists(key, value);
660}
661
662fn delete(
663 pipe: &mut Pipeline,
664 collection: &str,
665 key: &str,
666 value: Option<Vec<Bytes>>,
667) -> anyhow::Result<()> {
668 tracing::debug!(
669 "delete: collection={}, key={}, has_payload={}",
670 collection,
671 key,
672 value.is_some()
673 );
674
675 match collection {
676 INDEX => delete_from_index(pipe, key, value),
677 ORDERS => {
678 delete_string(pipe, key);
679 Ok(())
680 }
681 POSITIONS => {
682 delete_string(pipe, key);
683 Ok(())
684 }
685 ACCOUNTS => {
686 delete_string(pipe, key);
687 Ok(())
688 }
689 ACTORS => {
690 delete_string(pipe, key);
691 Ok(())
692 }
693 STRATEGIES => {
694 delete_string(pipe, key);
695 Ok(())
696 }
697 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
698 }
699}
700
701fn delete_from_index(
702 pipe: &mut Pipeline,
703 key: &str,
704 value: Option<Vec<Bytes>>,
705) -> anyhow::Result<()> {
706 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
707 let index_key = get_index_key(key)?;
708
709 match index_key {
710 INDEX_ORDER_IDS => {
711 remove_from_set(pipe, key, value[0].as_ref());
712 Ok(())
713 }
714 INDEX_ORDER_POSITION => {
715 remove_from_hash(pipe, key, value[0].as_ref());
716 Ok(())
717 }
718 INDEX_ORDER_CLIENT => {
719 remove_from_hash(pipe, key, value[0].as_ref());
720 Ok(())
721 }
722 INDEX_ORDERS => {
723 remove_from_set(pipe, key, value[0].as_ref());
724 Ok(())
725 }
726 INDEX_ORDERS_OPEN => {
727 remove_from_set(pipe, key, value[0].as_ref());
728 Ok(())
729 }
730 INDEX_ORDERS_CLOSED => {
731 remove_from_set(pipe, key, value[0].as_ref());
732 Ok(())
733 }
734 INDEX_ORDERS_EMULATED => {
735 remove_from_set(pipe, key, value[0].as_ref());
736 Ok(())
737 }
738 INDEX_ORDERS_INFLIGHT => {
739 remove_from_set(pipe, key, value[0].as_ref());
740 Ok(())
741 }
742 INDEX_POSITIONS => {
743 remove_from_set(pipe, key, value[0].as_ref());
744 Ok(())
745 }
746 INDEX_POSITIONS_OPEN => {
747 remove_from_set(pipe, key, value[0].as_ref());
748 Ok(())
749 }
750 INDEX_POSITIONS_CLOSED => {
751 remove_from_set(pipe, key, value[0].as_ref());
752 Ok(())
753 }
754 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
755 }
756}
757
758fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
759 pipe.srem(key, member);
760}
761
762fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
763 pipe.hdel(key, field);
764}
765
766fn delete_string(pipe: &mut Pipeline, key: &str) {
767 pipe.del(key);
768}
769
770fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
771 let mut key = String::new();
772
773 if config.use_trader_prefix {
774 key.push_str("trader-");
775 }
776
777 key.push_str(trader_id.as_str());
778
779 if config.use_instance_id {
780 key.push(REDIS_DELIMITER);
781 key.push_str(&format!("{instance_id}"));
782 }
783
784 key
785}
786
787fn get_collection_key(key: &str) -> anyhow::Result<&str> {
788 key.split_once(REDIS_DELIMITER)
789 .map(|(collection, _)| collection)
790 .ok_or_else(|| {
791 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
792 })
793}
794
795fn get_index_key(key: &str) -> anyhow::Result<&str> {
796 key.split_once(REDIS_DELIMITER)
797 .map(|(_, index_key)| index_key)
798 .ok_or_else(|| {
799 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
800 })
801}
802
803#[allow(dead_code)] #[derive(Debug)]
805pub struct RedisCacheDatabaseAdapter {
806 pub encoding: SerializationEncoding,
807 pub database: RedisCacheDatabase,
808}
809
810#[allow(dead_code)] #[allow(unused)] #[async_trait::async_trait]
813impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
814 fn close(&mut self) -> anyhow::Result<()> {
815 self.database.close();
816 Ok(())
817 }
818
819 fn flush(&mut self) -> anyhow::Result<()> {
820 self.database.flushdb();
821 Ok(())
822 }
823
824 async fn load_all(&self) -> anyhow::Result<CacheMap> {
825 tracing::debug!("Loading all data");
826
827 let (
828 currencies,
829 instruments,
830 synthetics,
831 accounts,
832 orders,
833 positions,
834 greeks,
835 yield_curves,
836 ) = try_join!(
837 self.load_currencies(),
838 self.load_instruments(),
839 self.load_synthetics(),
840 self.load_accounts(),
841 self.load_orders(),
842 self.load_positions(),
843 self.load_greeks(),
844 self.load_yield_curves()
845 )
846 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
847
848 Ok(CacheMap {
849 currencies,
850 instruments,
851 synthetics,
852 accounts,
853 orders,
854 positions,
855 greeks,
856 yield_curves,
857 })
858 }
859
860 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
861 Ok(AHashMap::new()) }
864
865 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
866 DatabaseQueries::load_currencies(
867 &self.database.con,
868 &self.database.trader_key,
869 self.encoding,
870 )
871 .await
872 }
873
874 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
875 DatabaseQueries::load_instruments(
876 &self.database.con,
877 &self.database.trader_key,
878 self.encoding,
879 )
880 .await
881 }
882
883 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
884 DatabaseQueries::load_synthetics(
885 &self.database.con,
886 &self.database.trader_key,
887 self.encoding,
888 )
889 .await
890 }
891
892 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
893 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
894 .await
895 }
896
897 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
898 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
899 .await
900 }
901
902 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
903 DatabaseQueries::load_positions(
904 &self.database.con,
905 &self.database.trader_key,
906 self.encoding,
907 )
908 .await
909 }
910
911 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
912 todo!()
913 }
914
915 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
916 todo!()
917 }
918
919 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
920 DatabaseQueries::load_currency(
921 &self.database.con,
922 &self.database.trader_key,
923 code,
924 self.encoding,
925 )
926 .await
927 }
928
929 async fn load_instrument(
930 &self,
931 instrument_id: &InstrumentId,
932 ) -> anyhow::Result<Option<InstrumentAny>> {
933 DatabaseQueries::load_instrument(
934 &self.database.con,
935 &self.database.trader_key,
936 instrument_id,
937 self.encoding,
938 )
939 .await
940 }
941
942 async fn load_synthetic(
943 &self,
944 instrument_id: &InstrumentId,
945 ) -> anyhow::Result<Option<SyntheticInstrument>> {
946 DatabaseQueries::load_synthetic(
947 &self.database.con,
948 &self.database.trader_key,
949 instrument_id,
950 self.encoding,
951 )
952 .await
953 }
954
955 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
956 DatabaseQueries::load_account(
957 &self.database.con,
958 &self.database.trader_key,
959 account_id,
960 self.encoding,
961 )
962 .await
963 }
964
965 async fn load_order(
966 &self,
967 client_order_id: &ClientOrderId,
968 ) -> anyhow::Result<Option<OrderAny>> {
969 DatabaseQueries::load_order(
970 &self.database.con,
971 &self.database.trader_key,
972 client_order_id,
973 self.encoding,
974 )
975 .await
976 }
977
978 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
979 DatabaseQueries::load_position(
980 &self.database.con,
981 &self.database.trader_key,
982 position_id,
983 self.encoding,
984 )
985 .await
986 }
987
988 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
989 todo!()
990 }
991
992 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
993 todo!()
994 }
995
996 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
997 todo!()
998 }
999
1000 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1001 todo!()
1002 }
1003
1004 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1005 let order_id_bytes = Bytes::from(client_order_id.to_string());
1006
1007 log::debug!("Deleting order: {client_order_id} from Redis");
1008 log::debug!("Trader key: {}", self.database.trader_key);
1009
1010 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1012 log::debug!("Deleting order key: {key}");
1013 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1014 self.database
1015 .tx
1016 .send(op)
1017 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1018
1019 let index_keys = [
1021 INDEX_ORDER_IDS,
1022 INDEX_ORDERS,
1023 INDEX_ORDERS_OPEN,
1024 INDEX_ORDERS_CLOSED,
1025 INDEX_ORDERS_EMULATED,
1026 INDEX_ORDERS_INFLIGHT,
1027 ];
1028
1029 for index_key in &index_keys {
1030 let key = (*index_key).to_string();
1031 log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1032 let payload = vec![order_id_bytes.clone()];
1033 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1034 self.database
1035 .tx
1036 .send(op)
1037 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1038 }
1039
1040 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1042 for index_key in &hash_indexes {
1043 let key = (*index_key).to_string();
1044 log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1045 let payload = vec![order_id_bytes.clone()];
1046 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1047 self.database.tx.send(op).map_err(|e| {
1048 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1049 })?;
1050 }
1051
1052 log::debug!("Sent all delete commands for order: {client_order_id}");
1053 Ok(())
1054 }
1055
1056 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1057 let position_id_bytes = Bytes::from(position_id.to_string());
1058
1059 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1061 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1062 self.database
1063 .tx
1064 .send(op)
1065 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1066
1067 let index_keys = [
1069 INDEX_POSITIONS,
1070 INDEX_POSITIONS_OPEN,
1071 INDEX_POSITIONS_CLOSED,
1072 ];
1073
1074 for index_key in &index_keys {
1075 let key = (*index_key).to_string();
1076 let payload = vec![position_id_bytes.clone()];
1077 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1078 self.database.tx.send(op).map_err(|e| {
1079 anyhow::anyhow!("Failed to send delete position index command: {e}")
1080 })?;
1081 }
1082
1083 Ok(())
1084 }
1085
1086 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1087 todo!()
1088 }
1089
1090 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1091 todo!()
1092 }
1093
1094 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1095 todo!()
1096 }
1097
1098 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1099 todo!()
1100 }
1101
1102 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1103 todo!()
1104 }
1105
1106 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1107 todo!()
1108 }
1109
1110 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1111 todo!()
1112 }
1113
1114 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1115 todo!()
1116 }
1117
1118 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1119 todo!()
1120 }
1121
1122 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1123 todo!()
1124 }
1125
1126 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1127 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1128 }
1129
1130 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1131 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1132 }
1133
1134 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1135 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1136 }
1137
1138 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1139 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1140 }
1141
1142 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1143 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1144 }
1145
1146 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1147 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1148 }
1149
1150 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1151 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1152 }
1153
1154 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1155 anyhow::bail!("Saving signals for Redis cache adapter not supported")
1156 }
1157
1158 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1159 anyhow::bail!("Loading signals from Redis cache adapter not supported")
1160 }
1161
1162 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1163 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1164 }
1165
1166 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1167 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1168 }
1169
1170 fn load_order_snapshot(
1171 &self,
1172 client_order_id: &ClientOrderId,
1173 ) -> anyhow::Result<Option<OrderSnapshot>> {
1174 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1175 }
1176
1177 fn load_position_snapshot(
1178 &self,
1179 position_id: &PositionId,
1180 ) -> anyhow::Result<Option<PositionSnapshot>> {
1181 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1182 }
1183
1184 fn index_venue_order_id(
1185 &self,
1186 client_order_id: ClientOrderId,
1187 venue_order_id: VenueOrderId,
1188 ) -> anyhow::Result<()> {
1189 todo!()
1190 }
1191
1192 fn index_order_position(
1193 &self,
1194 client_order_id: ClientOrderId,
1195 position_id: PositionId,
1196 ) -> anyhow::Result<()> {
1197 todo!()
1198 }
1199
1200 fn update_actor(&self) -> anyhow::Result<()> {
1201 todo!()
1202 }
1203
1204 fn update_strategy(&self) -> anyhow::Result<()> {
1205 todo!()
1206 }
1207
1208 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1209 todo!()
1210 }
1211
1212 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1213 todo!()
1214 }
1215
1216 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1217 todo!()
1218 }
1219
1220 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1221 todo!()
1222 }
1223
1224 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1225 todo!()
1226 }
1227
1228 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1229 todo!()
1230 }
1231}
1232
1233#[cfg(test)]
1237mod tests {
1238 use rstest::rstest;
1239
1240 use super::*;
1241
1242 #[rstest]
1243 fn test_get_trader_key_with_prefix_and_instance_id() {
1244 let trader_id = TraderId::from("tester-123");
1245 let instance_id = UUID4::new();
1246 let config = CacheConfig {
1247 use_instance_id: true,
1248 ..Default::default()
1249 };
1250
1251 let key = get_trader_key(trader_id, instance_id, &config);
1252 assert!(key.starts_with("trader-tester-123:"));
1253 assert!(key.ends_with(&instance_id.to_string()));
1254 }
1255
1256 #[rstest]
1257 fn test_get_collection_key_valid() {
1258 let key = "collection:123";
1259 assert_eq!(get_collection_key(key).unwrap(), "collection");
1260 }
1261
1262 #[rstest]
1263 fn test_get_collection_key_invalid() {
1264 let key = "no_delimiter";
1265 assert!(get_collection_key(key).is_err());
1266 }
1267
1268 #[rstest]
1269 fn test_get_index_key_valid() {
1270 let key = "index:123";
1271 assert_eq!(get_index_key(key).unwrap(), "123");
1272 }
1273
1274 #[rstest]
1275 fn test_get_index_key_invalid() {
1276 let key = "no_delimiter";
1277 assert!(get_index_key(key).is_err());
1278 }
1279}