1use std::{
17 collections::{HashMap, VecDeque},
18 fmt::Debug,
19 time::{Duration, Instant},
20};
21
22use bytes::Bytes;
23use nautilus_common::{
24 cache::{
25 CacheConfig,
26 database::{CacheDatabaseAdapter, CacheMap},
27 },
28 custom::CustomData,
29 enums::SerializationEncoding,
30 logging::{log_task_awaiting, log_task_started, log_task_stopped},
31 runtime::get_runtime,
32 signal::Signal,
33};
34use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
35use nautilus_cryptography::providers::install_cryptographic_provider;
36use nautilus_model::{
37 accounts::AccountAny,
38 data::{Bar, DataType, QuoteTick, TradeTick},
39 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
40 identifiers::{
41 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
42 TraderId, VenueOrderId,
43 },
44 instruments::{InstrumentAny, SyntheticInstrument},
45 orderbook::OrderBook,
46 orders::OrderAny,
47 position::Position,
48 types::Currency,
49};
50use redis::{Pipeline, aio::ConnectionManager};
51use tokio::try_join;
52use ustr::Ustr;
53
54use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
55use crate::redis::{create_redis_connection, queries::DatabaseQueries};
56
57const CACHE_READ: &str = "cache-read";
59const CACHE_WRITE: &str = "cache-write";
60const CACHE_PROCESS: &str = "cache-process";
61
62const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
64
65const INDEX: &str = "index";
67const GENERAL: &str = "general";
68const CURRENCIES: &str = "currencies";
69const INSTRUMENTS: &str = "instruments";
70const SYNTHETICS: &str = "synthetics";
71const ACCOUNTS: &str = "accounts";
72const ORDERS: &str = "orders";
73const POSITIONS: &str = "positions";
74const ACTORS: &str = "actors";
75const STRATEGIES: &str = "strategies";
76const SNAPSHOTS: &str = "snapshots";
77const HEALTH: &str = "health";
78
79const INDEX_ORDER_IDS: &str = "index:order_ids";
81const INDEX_ORDER_POSITION: &str = "index:order_position";
82const INDEX_ORDER_CLIENT: &str = "index:order_client";
83const INDEX_ORDERS: &str = "index:orders";
84const INDEX_ORDERS_OPEN: &str = "index:orders_open";
85const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
86const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
87const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
88const INDEX_POSITIONS: &str = "index:positions";
89const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
90const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
91
92#[derive(Clone, Debug)]
94pub enum DatabaseOperation {
95 Insert,
96 Update,
97 Delete,
98 DeleteFromList,
99 Close,
100}
101
102#[derive(Clone, Debug)]
104pub struct DatabaseCommand {
105 pub op_type: DatabaseOperation,
107 pub key: Option<String>,
109 pub payload: Option<Vec<Bytes>>,
111}
112
113impl DatabaseCommand {
114 #[must_use]
116 pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
117 Self {
118 op_type,
119 key: Some(key),
120 payload,
121 }
122 }
123
124 #[must_use]
126 pub const fn close() -> Self {
127 Self {
128 op_type: DatabaseOperation::Close,
129 key: None,
130 payload: None,
131 }
132 }
133}
134
135#[cfg_attr(
136 feature = "python",
137 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
138)]
139pub struct RedisCacheDatabase {
140 pub con: ConnectionManager,
141 pub trader_id: TraderId,
142 pub trader_key: String,
143 pub encoding: SerializationEncoding,
144 tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
145 handle: tokio::task::JoinHandle<()>,
146}
147
148impl Debug for RedisCacheDatabase {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct(stringify!(RedisCacheDatabase))
151 .field("trader_id", &self.trader_id)
152 .field("encoding", &self.encoding)
153 .finish()
154 }
155}
156
157impl RedisCacheDatabase {
158 pub async fn new(
167 trader_id: TraderId,
168 instance_id: UUID4,
169 config: CacheConfig,
170 ) -> anyhow::Result<Self> {
171 install_cryptographic_provider();
172
173 let db_config = config
174 .database
175 .as_ref()
176 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
177 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
178
179 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
180 let trader_key = get_trader_key(trader_id, instance_id, &config);
181 let trader_key_clone = trader_key.clone();
182 let encoding = config.encoding;
183 let handle = get_runtime().spawn(async move {
184 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
185 log::error!("Error in task '{CACHE_PROCESS}': {e}");
186 }
187 });
188
189 Ok(Self {
190 con,
191 trader_id,
192 trader_key,
193 encoding,
194 tx,
195 handle,
196 })
197 }
198
199 #[must_use]
200 pub const fn get_encoding(&self) -> SerializationEncoding {
201 self.encoding
202 }
203
204 #[must_use]
205 pub fn get_trader_key(&self) -> &str {
206 &self.trader_key
207 }
208
209 pub fn close(&mut self) {
210 log::debug!("Closing");
211
212 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
213 log::debug!("Error sending close command: {e:?}");
214 }
215
216 log_task_awaiting(CACHE_PROCESS);
217
218 tokio::task::block_in_place(|| {
219 if let Err(e) = get_runtime().block_on(&mut self.handle) {
220 log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
221 }
222 });
223
224 log::debug!("Closed");
225 }
226
227 pub async fn flushdb(&mut self) {
228 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
229 .query_async::<()>(&mut self.con)
230 .await
231 {
232 log::error!("Failed to flush database: {e:?}");
233 }
234 }
235
236 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
242 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
243 log::debug!("Querying keys: {pattern}");
244 DatabaseQueries::scan_keys(&mut self.con, pattern).await
245 }
246
247 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
253 DatabaseQueries::read(&self.con, &self.trader_key, key).await
254 }
255
256 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
262 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
263 match self.tx.send(op) {
264 Ok(()) => Ok(()),
265 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
266 }
267 }
268
269 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
275 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
276 match self.tx.send(op) {
277 Ok(()) => Ok(()),
278 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
279 }
280 }
281
282 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
288 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
289 match self.tx.send(op) {
290 Ok(()) => Ok(()),
291 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
292 }
293 }
294
295 pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
301 let order_id_bytes = Bytes::from(client_order_id.to_string());
302
303 log::debug!("Deleting order: {client_order_id} from Redis");
304 log::debug!("Trader key: {}", self.trader_key);
305
306 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
308 log::debug!("Deleting order key: {key}");
309 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
310 self.tx
311 .send(op)
312 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
313
314 let index_keys = [
316 INDEX_ORDER_IDS,
317 INDEX_ORDERS,
318 INDEX_ORDERS_OPEN,
319 INDEX_ORDERS_CLOSED,
320 INDEX_ORDERS_EMULATED,
321 INDEX_ORDERS_INFLIGHT,
322 ];
323
324 for index_key in &index_keys {
325 let key = index_key.to_string();
326 log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
327 let payload = vec![order_id_bytes.clone()];
328 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
329 self.tx
330 .send(op)
331 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
332 }
333
334 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
336 for index_key in &hash_indexes {
337 let key = index_key.to_string();
338 log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
339 let payload = vec![order_id_bytes.clone()];
340 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
341 self.tx.send(op).map_err(|e| {
342 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
343 })?;
344 }
345
346 log::debug!("Sent all delete commands for order: {client_order_id}");
347 Ok(())
348 }
349
350 pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
356 let position_id_bytes = Bytes::from(position_id.to_string());
357
358 log::debug!("Deleting position: {position_id} from Redis");
359 log::debug!("Trader key: {}", self.trader_key);
360
361 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
363 log::debug!("Deleting position key: {key}");
364 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
365 self.tx
366 .send(op)
367 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
368
369 let index_keys = [
371 INDEX_POSITIONS,
372 INDEX_POSITIONS_OPEN,
373 INDEX_POSITIONS_CLOSED,
374 ];
375
376 for index_key in &index_keys {
377 let key = index_key.to_string();
378 log::debug!("Deleting from index: {key} (position_id: {position_id})");
379 let payload = vec![position_id_bytes.clone()];
380 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
381 self.tx.send(op).map_err(|e| {
382 anyhow::anyhow!("Failed to send delete position index command: {e}")
383 })?;
384 }
385
386 log::debug!("Sent all delete commands for position: {position_id}");
387 Ok(())
388 }
389
390 pub fn delete_account_event(
396 &self,
397 account_id: &AccountId,
398 event_id: &str,
399 ) -> anyhow::Result<()> {
400 log::debug!("Deleting account event: {account_id}:{event_id}");
401 log::debug!("Trader key: {}", self.trader_key);
402
403 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
404 let payload = vec![Bytes::from(event_id.to_string())];
405 let op = DatabaseCommand::new(DatabaseOperation::DeleteFromList, key, Some(payload));
406 self.tx
407 .send(op)
408 .map_err(|e| anyhow::anyhow!("Failed to send delete account event command: {e}"))
409 }
410}
411
412async fn process_commands(
413 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
414 trader_key: String,
415 config: CacheConfig,
416) -> anyhow::Result<()> {
417 log_task_started(CACHE_PROCESS);
418
419 let db_config = config
420 .database
421 .as_ref()
422 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
423 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
424
425 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
427 let mut last_drain = Instant::now();
428 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
429
430 loop {
432 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
433 drain_buffer(&mut con, &trader_key, &mut buffer).await;
434 last_drain = Instant::now();
435 } else if let Some(cmd) = rx.recv().await {
436 tracing::trace!("Received {cmd:?}");
437
438 if matches!(cmd.op_type, DatabaseOperation::Close) {
439 break;
440 }
441 buffer.push_back(cmd);
442 } else {
443 tracing::debug!("Command channel closed");
444 break;
445 }
446 }
447
448 if !buffer.is_empty() {
450 drain_buffer(&mut con, &trader_key, &mut buffer).await;
451 }
452
453 log_task_stopped(CACHE_PROCESS);
454 Ok(())
455}
456
457async fn drain_buffer(
458 conn: &mut ConnectionManager,
459 trader_key: &str,
460 buffer: &mut VecDeque<DatabaseCommand>,
461) {
462 let mut pipe = redis::pipe();
463 pipe.atomic();
464
465 for msg in buffer.drain(..) {
466 let key = if let Some(key) = msg.key {
467 key
468 } else {
469 log::error!("Null key found for message: {msg:?}");
470 continue;
471 };
472 let collection = match get_collection_key(&key) {
473 Ok(collection) => collection,
474 Err(e) => {
475 tracing::error!("{e}");
476 continue; }
478 };
479
480 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
481
482 match msg.op_type {
483 DatabaseOperation::Insert => {
484 if let Some(payload) = msg.payload {
485 log::debug!("Processing INSERT for collection: {collection}, key: {key}");
486 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
487 tracing::error!("{e}");
488 }
489 } else {
490 tracing::error!("Null `payload` for `insert`");
491 }
492 }
493 DatabaseOperation::Update => {
494 if let Some(payload) = msg.payload {
495 log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
496 if let Err(e) = update(&mut pipe, collection, &key, payload) {
497 tracing::error!("{e}");
498 }
499 } else {
500 tracing::error!("Null `payload` for `update`");
501 }
502 }
503 DatabaseOperation::Delete => {
504 tracing::debug!(
505 "Processing DELETE for collection: {}, key: {}, payload: {:?}",
506 collection,
507 key,
508 msg.payload.as_ref().map(|p| p.len())
509 );
510 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
512 tracing::error!("{e}");
513 }
514 }
515 DatabaseOperation::DeleteFromList => {
516 log::debug!("Processing DELETE_FROM_LIST for collection: {collection}, key: {key}");
517 if let Some(payload) = &msg.payload {
519 if let Err(e) = delete_from_list(&mut pipe, collection, &key, payload) {
520 tracing::error!("{e}");
521 }
522 } else {
523 tracing::error!("Null `payload` for `delete_from_list`");
524 }
525 }
526 DatabaseOperation::Close => panic!("Close command should not be drained"),
527 }
528 }
529
530 if let Err(e) = pipe.query_async::<()>(conn).await {
531 tracing::error!("{e}");
532 }
533}
534
535fn insert(
536 pipe: &mut Pipeline,
537 collection: &str,
538 key: &str,
539 value: Vec<Bytes>,
540) -> anyhow::Result<()> {
541 check_slice_not_empty(value.as_slice(), stringify!(value))?;
542
543 match collection {
544 INDEX => insert_index(pipe, key, &value),
545 GENERAL => {
546 insert_string(pipe, key, value[0].as_ref());
547 Ok(())
548 }
549 CURRENCIES => {
550 insert_string(pipe, key, value[0].as_ref());
551 Ok(())
552 }
553 INSTRUMENTS => {
554 insert_string(pipe, key, value[0].as_ref());
555 Ok(())
556 }
557 SYNTHETICS => {
558 insert_string(pipe, key, value[0].as_ref());
559 Ok(())
560 }
561 ACCOUNTS => {
562 insert_list(pipe, key, value[0].as_ref());
563 Ok(())
564 }
565 ORDERS => {
566 insert_list(pipe, key, value[0].as_ref());
567 Ok(())
568 }
569 POSITIONS => {
570 insert_list(pipe, key, value[0].as_ref());
571 Ok(())
572 }
573 ACTORS => {
574 insert_string(pipe, key, value[0].as_ref());
575 Ok(())
576 }
577 STRATEGIES => {
578 insert_string(pipe, key, value[0].as_ref());
579 Ok(())
580 }
581 SNAPSHOTS => {
582 insert_list(pipe, key, value[0].as_ref());
583 Ok(())
584 }
585 HEALTH => {
586 insert_string(pipe, key, value[0].as_ref());
587 Ok(())
588 }
589 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
590 }
591}
592
593fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
594 let index_key = get_index_key(key)?;
595 match index_key {
596 INDEX_ORDER_IDS => {
597 insert_set(pipe, key, value[0].as_ref());
598 Ok(())
599 }
600 INDEX_ORDER_POSITION => {
601 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
602 Ok(())
603 }
604 INDEX_ORDER_CLIENT => {
605 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
606 Ok(())
607 }
608 INDEX_ORDERS => {
609 insert_set(pipe, key, value[0].as_ref());
610 Ok(())
611 }
612 INDEX_ORDERS_OPEN => {
613 insert_set(pipe, key, value[0].as_ref());
614 Ok(())
615 }
616 INDEX_ORDERS_CLOSED => {
617 insert_set(pipe, key, value[0].as_ref());
618 Ok(())
619 }
620 INDEX_ORDERS_EMULATED => {
621 insert_set(pipe, key, value[0].as_ref());
622 Ok(())
623 }
624 INDEX_ORDERS_INFLIGHT => {
625 insert_set(pipe, key, value[0].as_ref());
626 Ok(())
627 }
628 INDEX_POSITIONS => {
629 insert_set(pipe, key, value[0].as_ref());
630 Ok(())
631 }
632 INDEX_POSITIONS_OPEN => {
633 insert_set(pipe, key, value[0].as_ref());
634 Ok(())
635 }
636 INDEX_POSITIONS_CLOSED => {
637 insert_set(pipe, key, value[0].as_ref());
638 Ok(())
639 }
640 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
641 }
642}
643
644fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
645 pipe.set(key, value);
646}
647
648fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
649 pipe.sadd(key, value);
650}
651
652fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
653 pipe.hset(key, name, value);
654}
655
656fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
657 pipe.rpush(key, value);
658}
659
660fn update(
661 pipe: &mut Pipeline,
662 collection: &str,
663 key: &str,
664 value: Vec<Bytes>,
665) -> anyhow::Result<()> {
666 check_slice_not_empty(value.as_slice(), stringify!(value))?;
667
668 match collection {
669 ACCOUNTS => {
670 update_list(pipe, key, value[0].as_ref());
671 Ok(())
672 }
673 ORDERS => {
674 update_list(pipe, key, value[0].as_ref());
675 Ok(())
676 }
677 POSITIONS => {
678 update_list(pipe, key, value[0].as_ref());
679 Ok(())
680 }
681 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
682 }
683}
684
685fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
686 pipe.rpush_exists(key, value);
687}
688
689fn delete(
690 pipe: &mut Pipeline,
691 collection: &str,
692 key: &str,
693 value: Option<Vec<Bytes>>,
694) -> anyhow::Result<()> {
695 tracing::debug!(
696 "delete: collection={}, key={}, has_payload={}",
697 collection,
698 key,
699 value.is_some()
700 );
701
702 match collection {
703 INDEX => delete_from_index(pipe, key, value),
704 ORDERS => {
705 delete_string(pipe, key);
706 Ok(())
707 }
708 POSITIONS => {
709 delete_string(pipe, key);
710 Ok(())
711 }
712 ACCOUNTS => {
713 delete_string(pipe, key);
714 Ok(())
715 }
716 ACTORS => {
717 delete_string(pipe, key);
718 Ok(())
719 }
720 STRATEGIES => {
721 delete_string(pipe, key);
722 Ok(())
723 }
724 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
725 }
726}
727
728fn delete_from_index(
729 pipe: &mut Pipeline,
730 key: &str,
731 value: Option<Vec<Bytes>>,
732) -> anyhow::Result<()> {
733 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
734 let index_key = get_index_key(key)?;
735
736 match index_key {
737 INDEX_ORDER_IDS => {
738 remove_from_set(pipe, key, value[0].as_ref());
739 Ok(())
740 }
741 INDEX_ORDER_POSITION => {
742 remove_from_hash(pipe, key, value[0].as_ref());
743 Ok(())
744 }
745 INDEX_ORDER_CLIENT => {
746 remove_from_hash(pipe, key, value[0].as_ref());
747 Ok(())
748 }
749 INDEX_ORDERS => {
750 remove_from_set(pipe, key, value[0].as_ref());
751 Ok(())
752 }
753 INDEX_ORDERS_OPEN => {
754 remove_from_set(pipe, key, value[0].as_ref());
755 Ok(())
756 }
757 INDEX_ORDERS_CLOSED => {
758 remove_from_set(pipe, key, value[0].as_ref());
759 Ok(())
760 }
761 INDEX_ORDERS_EMULATED => {
762 remove_from_set(pipe, key, value[0].as_ref());
763 Ok(())
764 }
765 INDEX_ORDERS_INFLIGHT => {
766 remove_from_set(pipe, key, value[0].as_ref());
767 Ok(())
768 }
769 INDEX_POSITIONS => {
770 remove_from_set(pipe, key, value[0].as_ref());
771 Ok(())
772 }
773 INDEX_POSITIONS_OPEN => {
774 remove_from_set(pipe, key, value[0].as_ref());
775 Ok(())
776 }
777 INDEX_POSITIONS_CLOSED => {
778 remove_from_set(pipe, key, value[0].as_ref());
779 Ok(())
780 }
781 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
782 }
783}
784
785fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
786 pipe.srem(key, member);
787}
788
789fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
790 pipe.hdel(key, field);
791}
792
793fn delete_string(pipe: &mut Pipeline, key: &str) {
794 pipe.del(key);
795}
796
797fn delete_from_list(
798 pipe: &mut Pipeline,
799 collection: &str,
800 key: &str,
801 payload: &[Bytes],
802) -> anyhow::Result<()> {
803 match collection {
804 ACCOUNTS => {
805 let event_id = std::str::from_utf8(&payload[0])?;
807
808 let lua_script = r#"
813 local key = KEYS[1]
814 local event_id = ARGV[1]
815 local removed_count = 0
816
817 -- Check if the key exists first
818 if redis.call('EXISTS', key) == 0 then
819 return 0 -- Nothing to delete
820 end
821
822 local items = redis.call('LRANGE', key, 0, -1)
823 redis.call('DEL', key)
824
825 for i, item in ipairs(items) do
826 if not string.find(item, event_id, 1, true) then
827 redis.call('RPUSH', key, item)
828 else
829 removed_count = removed_count + 1
830 end
831 end
832
833 return removed_count
834 "#;
835
836 pipe.cmd("EVAL")
837 .arg(lua_script)
838 .arg(1)
839 .arg(key)
840 .arg(event_id);
841 Ok(())
842 }
843 _ => {
844 anyhow::bail!("Unsupported operation: `delete_from_list` for collection '{collection}'")
845 }
846 }
847}
848
849fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
850 let mut key = String::new();
851
852 if config.use_trader_prefix {
853 key.push_str("trader-");
854 }
855
856 key.push_str(trader_id.as_str());
857
858 if config.use_instance_id {
859 key.push(REDIS_DELIMITER);
860 key.push_str(&format!("{instance_id}"));
861 }
862
863 key
864}
865
866fn get_collection_key(key: &str) -> anyhow::Result<&str> {
867 key.split_once(REDIS_DELIMITER)
868 .map(|(collection, _)| collection)
869 .ok_or_else(|| {
870 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
871 })
872}
873
874fn get_index_key(key: &str) -> anyhow::Result<&str> {
875 key.split_once(REDIS_DELIMITER)
876 .map(|(_, index_key)| index_key)
877 .ok_or_else(|| {
878 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
879 })
880}
881
882#[allow(dead_code)] #[derive(Debug)]
884pub struct RedisCacheDatabaseAdapter {
885 pub encoding: SerializationEncoding,
886 pub database: RedisCacheDatabase,
887}
888
889#[allow(dead_code)] #[allow(unused)] #[async_trait::async_trait]
892impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
893 fn close(&mut self) -> anyhow::Result<()> {
894 self.database.close();
895 Ok(())
896 }
897
898 fn flush(&mut self) -> anyhow::Result<()> {
899 self.database.flushdb();
900 Ok(())
901 }
902
903 async fn load_all(&self) -> anyhow::Result<CacheMap> {
904 tracing::debug!("Loading all data");
905
906 let (
907 currencies,
908 instruments,
909 synthetics,
910 accounts,
911 orders,
912 positions,
913 greeks,
914 yield_curves,
915 ) = try_join!(
916 self.load_currencies(),
917 self.load_instruments(),
918 self.load_synthetics(),
919 self.load_accounts(),
920 self.load_orders(),
921 self.load_positions(),
922 self.load_greeks(),
923 self.load_yield_curves()
924 )
925 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
926
927 Ok(CacheMap {
928 currencies,
929 instruments,
930 synthetics,
931 accounts,
932 orders,
933 positions,
934 greeks,
935 yield_curves,
936 })
937 }
938
939 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
940 Ok(HashMap::new()) }
943
944 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
945 DatabaseQueries::load_currencies(
946 &self.database.con,
947 &self.database.trader_key,
948 self.encoding,
949 )
950 .await
951 }
952
953 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
954 DatabaseQueries::load_instruments(
955 &self.database.con,
956 &self.database.trader_key,
957 self.encoding,
958 )
959 .await
960 }
961
962 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
963 DatabaseQueries::load_synthetics(
964 &self.database.con,
965 &self.database.trader_key,
966 self.encoding,
967 )
968 .await
969 }
970
971 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
972 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
973 .await
974 }
975
976 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
977 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
978 .await
979 }
980
981 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
982 DatabaseQueries::load_positions(
983 &self.database.con,
984 &self.database.trader_key,
985 self.encoding,
986 )
987 .await
988 }
989
990 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
991 todo!()
992 }
993
994 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
995 todo!()
996 }
997
998 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
999 DatabaseQueries::load_currency(
1000 &self.database.con,
1001 &self.database.trader_key,
1002 code,
1003 self.encoding,
1004 )
1005 .await
1006 }
1007
1008 async fn load_instrument(
1009 &self,
1010 instrument_id: &InstrumentId,
1011 ) -> anyhow::Result<Option<InstrumentAny>> {
1012 DatabaseQueries::load_instrument(
1013 &self.database.con,
1014 &self.database.trader_key,
1015 instrument_id,
1016 self.encoding,
1017 )
1018 .await
1019 }
1020
1021 async fn load_synthetic(
1022 &self,
1023 instrument_id: &InstrumentId,
1024 ) -> anyhow::Result<Option<SyntheticInstrument>> {
1025 DatabaseQueries::load_synthetic(
1026 &self.database.con,
1027 &self.database.trader_key,
1028 instrument_id,
1029 self.encoding,
1030 )
1031 .await
1032 }
1033
1034 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
1035 DatabaseQueries::load_account(
1036 &self.database.con,
1037 &self.database.trader_key,
1038 account_id,
1039 self.encoding,
1040 )
1041 .await
1042 }
1043
1044 async fn load_order(
1045 &self,
1046 client_order_id: &ClientOrderId,
1047 ) -> anyhow::Result<Option<OrderAny>> {
1048 DatabaseQueries::load_order(
1049 &self.database.con,
1050 &self.database.trader_key,
1051 client_order_id,
1052 self.encoding,
1053 )
1054 .await
1055 }
1056
1057 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
1058 DatabaseQueries::load_position(
1059 &self.database.con,
1060 &self.database.trader_key,
1061 position_id,
1062 self.encoding,
1063 )
1064 .await
1065 }
1066
1067 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
1068 todo!()
1069 }
1070
1071 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
1072 todo!()
1073 }
1074
1075 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
1076 todo!()
1077 }
1078
1079 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
1080 todo!()
1081 }
1082
1083 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1084 let order_id_bytes = Bytes::from(client_order_id.to_string());
1085
1086 log::debug!("Deleting order: {client_order_id} from Redis");
1087 log::debug!("Trader key: {}", self.database.trader_key);
1088
1089 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1091 log::debug!("Deleting order key: {key}");
1092 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1093 self.database
1094 .tx
1095 .send(op)
1096 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1097
1098 let index_keys = [
1100 INDEX_ORDER_IDS,
1101 INDEX_ORDERS,
1102 INDEX_ORDERS_OPEN,
1103 INDEX_ORDERS_CLOSED,
1104 INDEX_ORDERS_EMULATED,
1105 INDEX_ORDERS_INFLIGHT,
1106 ];
1107
1108 for index_key in &index_keys {
1109 let key = index_key.to_string();
1110 log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1111 let payload = vec![order_id_bytes.clone()];
1112 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1113 self.database
1114 .tx
1115 .send(op)
1116 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1117 }
1118
1119 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1121 for index_key in &hash_indexes {
1122 let key = index_key.to_string();
1123 log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1124 let payload = vec![order_id_bytes.clone()];
1125 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1126 self.database.tx.send(op).map_err(|e| {
1127 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1128 })?;
1129 }
1130
1131 log::debug!("Sent all delete commands for order: {client_order_id}");
1132 Ok(())
1133 }
1134
1135 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1136 let position_id_bytes = Bytes::from(position_id.to_string());
1137
1138 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1140 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1141 self.database
1142 .tx
1143 .send(op)
1144 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1145
1146 let index_keys = [
1148 INDEX_POSITIONS,
1149 INDEX_POSITIONS_OPEN,
1150 INDEX_POSITIONS_CLOSED,
1151 ];
1152
1153 for index_key in &index_keys {
1154 let key = index_key.to_string();
1155 let payload = vec![position_id_bytes.clone()];
1156 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1157 self.database.tx.send(op).map_err(|e| {
1158 anyhow::anyhow!("Failed to send delete position index command: {e}")
1159 })?;
1160 }
1161
1162 Ok(())
1163 }
1164
1165 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
1166 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
1167 let payload = vec![Bytes::from(event_id.to_string())];
1168 let op = DatabaseCommand::new(DatabaseOperation::DeleteFromList, key, Some(payload));
1169 self.database
1170 .tx
1171 .send(op)
1172 .map_err(|e| anyhow::anyhow!("Failed to send delete account event command: {e}"))
1173 }
1174
1175 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
1176 todo!()
1177 }
1178
1179 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
1180 todo!()
1181 }
1182
1183 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1184 todo!()
1185 }
1186
1187 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
1188 todo!()
1189 }
1190
1191 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1192 todo!()
1193 }
1194
1195 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
1196 todo!()
1197 }
1198
1199 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
1200 todo!()
1201 }
1202
1203 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
1204 todo!()
1205 }
1206
1207 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
1208 todo!()
1209 }
1210
1211 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1212 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1213 }
1214
1215 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1216 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1217 }
1218
1219 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1220 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1221 }
1222
1223 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1224 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1225 }
1226
1227 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1228 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1229 }
1230
1231 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1232 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1233 }
1234
1235 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1236 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1237 }
1238
1239 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1240 anyhow::bail!("Saving signals for Redis cache adapter not supported")
1241 }
1242
1243 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1244 anyhow::bail!("Loading signals from Redis cache adapter not supported")
1245 }
1246
1247 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1248 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
1249 }
1250
1251 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
1252 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
1253 }
1254
1255 fn load_order_snapshot(
1256 &self,
1257 client_order_id: &ClientOrderId,
1258 ) -> anyhow::Result<Option<OrderSnapshot>> {
1259 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1260 }
1261
1262 fn load_position_snapshot(
1263 &self,
1264 position_id: &PositionId,
1265 ) -> anyhow::Result<Option<PositionSnapshot>> {
1266 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1267 }
1268
1269 fn index_venue_order_id(
1270 &self,
1271 client_order_id: ClientOrderId,
1272 venue_order_id: VenueOrderId,
1273 ) -> anyhow::Result<()> {
1274 todo!()
1275 }
1276
1277 fn index_order_position(
1278 &self,
1279 client_order_id: ClientOrderId,
1280 position_id: PositionId,
1281 ) -> anyhow::Result<()> {
1282 todo!()
1283 }
1284
1285 fn update_actor(&self) -> anyhow::Result<()> {
1286 todo!()
1287 }
1288
1289 fn update_strategy(&self) -> anyhow::Result<()> {
1290 todo!()
1291 }
1292
1293 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
1294 todo!()
1295 }
1296
1297 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
1298 todo!()
1299 }
1300
1301 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
1302 todo!()
1303 }
1304
1305 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
1306 todo!()
1307 }
1308
1309 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
1310 todo!()
1311 }
1312
1313 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
1314 todo!()
1315 }
1316}
1317
1318#[cfg(test)]
1322mod tests {
1323 use rstest::rstest;
1324
1325 use super::*;
1326
1327 #[rstest]
1328 fn test_get_trader_key_with_prefix_and_instance_id() {
1329 let trader_id = TraderId::from("tester-123");
1330 let instance_id = UUID4::new();
1331 let config = CacheConfig {
1332 use_instance_id: true,
1333 ..Default::default()
1334 };
1335
1336 let key = get_trader_key(trader_id, instance_id, &config);
1337 assert!(key.starts_with("trader-tester-123:"));
1338 assert!(key.ends_with(&instance_id.to_string()));
1339 }
1340
1341 #[rstest]
1342 fn test_get_collection_key_valid() {
1343 let key = "collection:123";
1344 assert_eq!(get_collection_key(key).unwrap(), "collection");
1345 }
1346
1347 #[rstest]
1348 fn test_get_collection_key_invalid() {
1349 let key = "no_delimiter";
1350 assert!(get_collection_key(key).is_err());
1351 }
1352
1353 #[rstest]
1354 fn test_get_index_key_valid() {
1355 let key = "index:123";
1356 assert_eq!(get_index_key(key).unwrap(), "123");
1357 }
1358
1359 #[rstest]
1360 fn test_get_index_key_invalid() {
1361 let key = "no_delimiter";
1362 assert!(get_index_key(key).is_err());
1363 }
1364}