1use std::{
17 collections::{HashMap, VecDeque},
18 time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23 cache::{
24 CacheConfig,
25 database::{CacheDatabaseAdapter, CacheMap},
26 },
27 custom::CustomData,
28 enums::SerializationEncoding,
29 runtime::get_runtime,
30 signal::Signal,
31};
32use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
33use nautilus_cryptography::providers::install_cryptographic_provider;
34use nautilus_model::{
35 accounts::AccountAny,
36 data::{Bar, DataType, QuoteTick, TradeTick},
37 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
38 identifiers::{
39 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
40 TraderId, VenueOrderId,
41 },
42 instruments::{InstrumentAny, SyntheticInstrument},
43 orderbook::OrderBook,
44 orders::OrderAny,
45 position::Position,
46 types::Currency,
47};
48use redis::{Pipeline, aio::ConnectionManager};
49use tokio::try_join;
50use ustr::Ustr;
51
52use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
53use crate::redis::{create_redis_connection, queries::DatabaseQueries};
54
/// Label passed to `create_redis_connection` for the read-side connection.
const CACHE_READ: &str = "cache-read";
/// Label passed to `create_redis_connection` for the write-side connection,
/// also used as the background write task's name in log messages.
const CACHE_WRITE: &str = "cache-write";

/// Error message prefix used when a command cannot be sent to the write task.
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
61
// Collection names.
//
// A command key has the form `<collection><delimiter><rest>`; the collection
// segment is matched against these constants to select the Redis operation.

/// Collection holding the index structures.
const INDEX: &str = "index";
/// Collection for general key/value entries.
const GENERAL: &str = "general";
/// Collection for currencies.
const CURRENCIES: &str = "currencies";
/// Collection for instruments.
const INSTRUMENTS: &str = "instruments";
/// Collection for synthetic instruments.
const SYNTHETICS: &str = "synthetics";
/// Collection for accounts.
const ACCOUNTS: &str = "accounts";
/// Collection for orders.
const ORDERS: &str = "orders";
/// Collection for positions.
const POSITIONS: &str = "positions";
/// Collection for actors.
const ACTORS: &str = "actors";
/// Collection for strategies.
const STRATEGIES: &str = "strategies";
/// Collection for snapshots.
const SNAPSHOTS: &str = "snapshots";
/// Collection for health entries.
const HEALTH: &str = "health";
75
// Index keys.
//
// These are compared against the index portion of a key when inserting into
// or removing from an index.

/// Index key for order IDs.
const INDEX_ORDER_IDS: &str = "index:order_ids";
/// Index key for the order -> position mapping.
const INDEX_ORDER_POSITION: &str = "index:order_position";
/// Index key for the order -> client mapping.
const INDEX_ORDER_CLIENT: &str = "index:order_client";
/// Index key for all orders.
const INDEX_ORDERS: &str = "index:orders";
/// Index key for open orders.
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
/// Index key for closed orders.
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
/// Index key for emulated orders.
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
/// Index key for in-flight orders.
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
/// Index key for all positions.
const INDEX_POSITIONS: &str = "index:positions";
/// Index key for open positions.
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
/// Index key for closed positions.
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
/// The kind of operation carried by a [`DatabaseCommand`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DatabaseOperation {
    /// Insert a new value.
    Insert,
    /// Update an existing value.
    Update,
    /// Delete a value.
    Delete,
    /// Stop the command-processing task.
    Close,
}
97
98#[derive(Clone, Debug)]
100pub struct DatabaseCommand {
101 pub op_type: DatabaseOperation,
103 pub key: Option<String>,
105 pub payload: Option<Vec<Bytes>>,
107}
108
109impl DatabaseCommand {
110 #[must_use]
112 pub fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113 Self {
114 op_type,
115 key: Some(key),
116 payload,
117 }
118 }
119
120 #[must_use]
122 pub fn close() -> Self {
123 Self {
124 op_type: DatabaseOperation::Close,
125 key: None,
126 payload: None,
127 }
128 }
129}
130
131#[cfg_attr(
132 feature = "python",
133 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
134)]
135pub struct RedisCacheDatabase {
136 pub con: ConnectionManager,
137 pub trader_id: TraderId,
138 encoding: SerializationEncoding,
139 handle: tokio::task::JoinHandle<()>,
140 trader_key: String,
141 tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
142}
143
144impl RedisCacheDatabase {
145 pub async fn new(
148 trader_id: TraderId,
149 instance_id: UUID4,
150 config: CacheConfig,
151 ) -> anyhow::Result<RedisCacheDatabase> {
152 install_cryptographic_provider();
153
154 let db_config = config
155 .database
156 .as_ref()
157 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
158 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
159
160 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
161 let trader_key = get_trader_key(trader_id, instance_id, &config);
162 let trader_key_clone = trader_key.clone();
163 let encoding = config.encoding;
164 let handle = get_runtime().spawn(async move {
165 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
166 log::error!("Error in task '{CACHE_WRITE}': {e}");
167 }
168 });
169
170 Ok(RedisCacheDatabase {
171 trader_id,
172 trader_key,
173 con,
174 tx,
175 handle,
176 encoding,
177 })
178 }
179
180 pub fn get_encoding(&self) -> SerializationEncoding {
181 self.encoding
182 }
183
184 pub fn get_trader_key(&self) -> &str {
185 &self.trader_key
186 }
187
188 pub fn close(&mut self) {
189 log::debug!("Closing");
190
191 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
192 log::debug!("Error sending close command: {e:?}")
193 }
194
195 log::debug!("Awaiting task '{CACHE_WRITE}'");
196 tokio::task::block_in_place(|| {
197 if let Err(e) = get_runtime().block_on(&mut self.handle) {
198 log::error!("Error awaiting task '{CACHE_WRITE}': {e:?}");
199 }
200 });
201
202 log::debug!("Closed");
203 }
204
205 pub async fn flushdb(&mut self) {
206 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
207 .query_async::<()>(&mut self.con)
208 .await
209 {
210 log::error!("Failed to flush database: {e:?}");
211 }
212 }
213
214 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
215 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
216 log::debug!("Querying keys: {pattern}");
217 DatabaseQueries::scan_keys(&mut self.con, pattern).await
218 }
219
220 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
221 DatabaseQueries::read(&self.con, &self.trader_key, key).await
222 }
223
224 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
225 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
226 match self.tx.send(op) {
227 Ok(_) => Ok(()),
228 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
229 }
230 }
231
232 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
233 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
234 match self.tx.send(op) {
235 Ok(_) => Ok(()),
236 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
237 }
238 }
239
240 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
241 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
242 match self.tx.send(op) {
243 Ok(_) => Ok(()),
244 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
245 }
246 }
247}
248
249async fn process_commands(
250 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
251 trader_key: String,
252 config: CacheConfig,
253) -> anyhow::Result<()> {
254 tracing::debug!("Starting cache processing");
255
256 let db_config = config
257 .database
258 .as_ref()
259 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
260 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
261
262 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
264 let mut last_drain = Instant::now();
265 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
266
267 loop {
269 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
270 drain_buffer(&mut con, &trader_key, &mut buffer).await;
271 last_drain = Instant::now();
272 } else {
273 match rx.recv().await {
274 Some(cmd) => {
275 tracing::debug!("Received {cmd:?}");
276 if let DatabaseOperation::Close = cmd.op_type {
277 break;
278 }
279 buffer.push_back(cmd)
280 }
281 None => {
282 tracing::debug!("Command channel closed");
283 break;
284 }
285 }
286 }
287 }
288
289 if !buffer.is_empty() {
291 drain_buffer(&mut con, &trader_key, &mut buffer).await;
292 }
293
294 tracing::debug!("Stopped cache processing");
295 Ok(())
296}
297
298async fn drain_buffer(
299 conn: &mut ConnectionManager,
300 trader_key: &str,
301 buffer: &mut VecDeque<DatabaseCommand>,
302) {
303 let mut pipe = redis::pipe();
304 pipe.atomic();
305
306 for msg in buffer.drain(..) {
307 let key = match msg.key {
308 Some(key) => key,
309 None => {
310 log::error!("Null key found for message: {msg:?}");
311 continue;
312 }
313 };
314 let collection = match get_collection_key(&key) {
315 Ok(collection) => collection,
316 Err(e) => {
317 tracing::error!("{e}");
318 continue; }
320 };
321
322 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
323
324 match msg.op_type {
325 DatabaseOperation::Insert => {
326 if let Some(payload) = msg.payload {
327 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
328 tracing::error!("{e}");
329 }
330 } else {
331 tracing::error!("Null `payload` for `insert`");
332 }
333 }
334 DatabaseOperation::Update => {
335 if let Some(payload) = msg.payload {
336 if let Err(e) = update(&mut pipe, collection, &key, payload) {
337 tracing::error!("{e}");
338 }
339 } else {
340 tracing::error!("Null `payload` for `update`");
341 };
342 }
343 DatabaseOperation::Delete => {
344 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
346 tracing::error!("{e}");
347 }
348 }
349 DatabaseOperation::Close => panic!("Close command should not be drained"),
350 }
351 }
352
353 if let Err(e) = pipe.query_async::<()>(conn).await {
354 tracing::error!("{e}");
355 }
356}
357
358fn insert(
359 pipe: &mut Pipeline,
360 collection: &str,
361 key: &str,
362 value: Vec<Bytes>,
363) -> anyhow::Result<()> {
364 check_slice_not_empty(value.as_slice(), stringify!(value))?;
365
366 match collection {
367 INDEX => insert_index(pipe, key, &value),
368 GENERAL => {
369 insert_string(pipe, key, value[0].as_ref());
370 Ok(())
371 }
372 CURRENCIES => {
373 insert_string(pipe, key, value[0].as_ref());
374 Ok(())
375 }
376 INSTRUMENTS => {
377 insert_string(pipe, key, value[0].as_ref());
378 Ok(())
379 }
380 SYNTHETICS => {
381 insert_string(pipe, key, value[0].as_ref());
382 Ok(())
383 }
384 ACCOUNTS => {
385 insert_list(pipe, key, value[0].as_ref());
386 Ok(())
387 }
388 ORDERS => {
389 insert_list(pipe, key, value[0].as_ref());
390 Ok(())
391 }
392 POSITIONS => {
393 insert_list(pipe, key, value[0].as_ref());
394 Ok(())
395 }
396 ACTORS => {
397 insert_string(pipe, key, value[0].as_ref());
398 Ok(())
399 }
400 STRATEGIES => {
401 insert_string(pipe, key, value[0].as_ref());
402 Ok(())
403 }
404 SNAPSHOTS => {
405 insert_list(pipe, key, value[0].as_ref());
406 Ok(())
407 }
408 HEALTH => {
409 insert_string(pipe, key, value[0].as_ref());
410 Ok(())
411 }
412 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
413 }
414}
415
416fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
417 let index_key = get_index_key(key)?;
418 match index_key {
419 INDEX_ORDER_IDS => {
420 insert_set(pipe, key, value[0].as_ref());
421 Ok(())
422 }
423 INDEX_ORDER_POSITION => {
424 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
425 Ok(())
426 }
427 INDEX_ORDER_CLIENT => {
428 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
429 Ok(())
430 }
431 INDEX_ORDERS => {
432 insert_set(pipe, key, value[0].as_ref());
433 Ok(())
434 }
435 INDEX_ORDERS_OPEN => {
436 insert_set(pipe, key, value[0].as_ref());
437 Ok(())
438 }
439 INDEX_ORDERS_CLOSED => {
440 insert_set(pipe, key, value[0].as_ref());
441 Ok(())
442 }
443 INDEX_ORDERS_EMULATED => {
444 insert_set(pipe, key, value[0].as_ref());
445 Ok(())
446 }
447 INDEX_ORDERS_INFLIGHT => {
448 insert_set(pipe, key, value[0].as_ref());
449 Ok(())
450 }
451 INDEX_POSITIONS => {
452 insert_set(pipe, key, value[0].as_ref());
453 Ok(())
454 }
455 INDEX_POSITIONS_OPEN => {
456 insert_set(pipe, key, value[0].as_ref());
457 Ok(())
458 }
459 INDEX_POSITIONS_CLOSED => {
460 insert_set(pipe, key, value[0].as_ref());
461 Ok(())
462 }
463 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
464 }
465}
466
467fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
468 pipe.set(key, value);
469}
470
471fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
472 pipe.sadd(key, value);
473}
474
475fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
476 pipe.hset(key, name, value);
477}
478
479fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
480 pipe.rpush(key, value);
481}
482
483fn update(
484 pipe: &mut Pipeline,
485 collection: &str,
486 key: &str,
487 value: Vec<Bytes>,
488) -> anyhow::Result<()> {
489 check_slice_not_empty(value.as_slice(), stringify!(value))?;
490
491 match collection {
492 ACCOUNTS => {
493 update_list(pipe, key, value[0].as_ref());
494 Ok(())
495 }
496 ORDERS => {
497 update_list(pipe, key, value[0].as_ref());
498 Ok(())
499 }
500 POSITIONS => {
501 update_list(pipe, key, value[0].as_ref());
502 Ok(())
503 }
504 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
505 }
506}
507
508fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
509 pipe.rpush_exists(key, value);
510}
511
512fn delete(
513 pipe: &mut Pipeline,
514 collection: &str,
515 key: &str,
516 value: Option<Vec<Bytes>>,
517) -> anyhow::Result<()> {
518 match collection {
519 INDEX => remove_index(pipe, key, value),
520 ACTORS => {
521 delete_string(pipe, key);
522 Ok(())
523 }
524 STRATEGIES => {
525 delete_string(pipe, key);
526 Ok(())
527 }
528 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
529 }
530}
531
532fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
533 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
534 let index_key = get_index_key(key)?;
535
536 match index_key {
537 INDEX_ORDERS_OPEN => {
538 remove_from_set(pipe, key, value[0].as_ref());
539 Ok(())
540 }
541 INDEX_ORDERS_CLOSED => {
542 remove_from_set(pipe, key, value[0].as_ref());
543 Ok(())
544 }
545 INDEX_ORDERS_EMULATED => {
546 remove_from_set(pipe, key, value[0].as_ref());
547 Ok(())
548 }
549 INDEX_ORDERS_INFLIGHT => {
550 remove_from_set(pipe, key, value[0].as_ref());
551 Ok(())
552 }
553 INDEX_POSITIONS_OPEN => {
554 remove_from_set(pipe, key, value[0].as_ref());
555 Ok(())
556 }
557 INDEX_POSITIONS_CLOSED => {
558 remove_from_set(pipe, key, value[0].as_ref());
559 Ok(())
560 }
561 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
562 }
563}
564
565fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
566 pipe.srem(key, member);
567}
568
569fn delete_string(pipe: &mut Pipeline, key: &str) {
570 pipe.del(key);
571}
572
573fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
574 let mut key = String::new();
575
576 if config.use_trader_prefix {
577 key.push_str("trader-");
578 }
579
580 key.push_str(trader_id.as_str());
581
582 if config.use_instance_id {
583 key.push(REDIS_DELIMITER);
584 key.push_str(&format!("{instance_id}"));
585 }
586
587 key
588}
589
590fn get_collection_key(key: &str) -> anyhow::Result<&str> {
591 key.split_once(REDIS_DELIMITER)
592 .map(|(collection, _)| collection)
593 .ok_or_else(|| {
594 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
595 })
596}
597
598fn get_index_key(key: &str) -> anyhow::Result<&str> {
599 key.split_once(REDIS_DELIMITER)
600 .map(|(_, index_key)| index_key)
601 .ok_or_else(|| {
602 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
603 })
604}
605
606#[allow(dead_code)] pub struct RedisCacheDatabaseAdapter {
608 pub encoding: SerializationEncoding,
609 database: RedisCacheDatabase,
610}
611
612#[allow(dead_code)] #[allow(unused)] #[async_trait::async_trait]
615impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
616 fn close(&mut self) -> anyhow::Result<()> {
617 self.database.close();
618 Ok(())
619 }
620
621 fn flush(&mut self) -> anyhow::Result<()> {
622 self.database.flushdb();
623 Ok(())
624 }
625
626 async fn load_all(&self) -> anyhow::Result<CacheMap> {
627 tracing::debug!("Loading all data");
628
629 let (
630 currencies,
631 instruments,
632 synthetics,
633 accounts,
634 orders,
635 positions,
636 greeks,
637 yield_curves,
638 ) = try_join!(
639 self.load_currencies(),
640 self.load_instruments(),
641 self.load_synthetics(),
642 self.load_accounts(),
643 self.load_orders(),
644 self.load_positions(),
645 self.load_greeks(),
646 self.load_yield_curves()
647 )
648 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
649
650 Ok(CacheMap {
651 currencies,
652 instruments,
653 synthetics,
654 accounts,
655 orders,
656 positions,
657 greeks,
658 yield_curves,
659 })
660 }
661
662 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
663 Ok(HashMap::new()) }
666
667 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
668 DatabaseQueries::load_currencies(
669 &self.database.con,
670 &self.database.trader_key,
671 self.encoding,
672 )
673 .await
674 }
675
676 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
677 DatabaseQueries::load_instruments(
678 &self.database.con,
679 &self.database.trader_key,
680 self.encoding,
681 )
682 .await
683 }
684
685 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
686 DatabaseQueries::load_synthetics(
687 &self.database.con,
688 &self.database.trader_key,
689 self.encoding,
690 )
691 .await
692 }
693
694 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
695 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
696 .await
697 }
698
699 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
700 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
701 .await
702 }
703
704 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
705 DatabaseQueries::load_positions(
706 &self.database.con,
707 &self.database.trader_key,
708 self.encoding,
709 )
710 .await
711 }
712
713 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
714 todo!()
715 }
716
717 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
718 todo!()
719 }
720
721 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
722 DatabaseQueries::load_currency(
723 &self.database.con,
724 &self.database.trader_key,
725 code,
726 self.encoding,
727 )
728 .await
729 }
730
731 async fn load_instrument(
732 &self,
733 instrument_id: &InstrumentId,
734 ) -> anyhow::Result<Option<InstrumentAny>> {
735 DatabaseQueries::load_instrument(
736 &self.database.con,
737 &self.database.trader_key,
738 instrument_id,
739 self.encoding,
740 )
741 .await
742 }
743
744 async fn load_synthetic(
745 &self,
746 instrument_id: &InstrumentId,
747 ) -> anyhow::Result<Option<SyntheticInstrument>> {
748 DatabaseQueries::load_synthetic(
749 &self.database.con,
750 &self.database.trader_key,
751 instrument_id,
752 self.encoding,
753 )
754 .await
755 }
756
757 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
758 DatabaseQueries::load_account(
759 &self.database.con,
760 &self.database.trader_key,
761 account_id,
762 self.encoding,
763 )
764 .await
765 }
766
767 async fn load_order(
768 &self,
769 client_order_id: &ClientOrderId,
770 ) -> anyhow::Result<Option<OrderAny>> {
771 DatabaseQueries::load_order(
772 &self.database.con,
773 &self.database.trader_key,
774 client_order_id,
775 self.encoding,
776 )
777 .await
778 }
779
780 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
781 DatabaseQueries::load_position(
782 &self.database.con,
783 &self.database.trader_key,
784 position_id,
785 self.encoding,
786 )
787 .await
788 }
789
790 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
791 todo!()
792 }
793
794 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
795 todo!()
796 }
797
798 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
799 todo!()
800 }
801
802 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
803 todo!()
804 }
805
806 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
807 todo!()
808 }
809
810 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
811 todo!()
812 }
813
814 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
815 todo!()
816 }
817
818 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
819 todo!()
820 }
821
822 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
823 todo!()
824 }
825
826 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
827 todo!()
828 }
829
830 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
831 todo!()
832 }
833
834 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
835 todo!()
836 }
837
838 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
839 todo!()
840 }
841
842 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
843 anyhow::bail!("Saving market data for Redis cache adapter not supported")
844 }
845
846 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
847 anyhow::bail!("Saving market data for Redis cache adapter not supported")
848 }
849
850 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
851 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
852 }
853
854 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
855 anyhow::bail!("Saving market data for Redis cache adapter not supported")
856 }
857
858 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
859 anyhow::bail!("Loading market data for Redis cache adapter not supported")
860 }
861
862 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
863 anyhow::bail!("Saving market data for Redis cache adapter not supported")
864 }
865
866 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
867 anyhow::bail!("Loading market data for Redis cache adapter not supported")
868 }
869
870 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
871 anyhow::bail!("Saving signals for Redis cache adapter not supported")
872 }
873
874 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
875 anyhow::bail!("Loading signals from Redis cache adapter not supported")
876 }
877
878 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
879 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
880 }
881
882 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
883 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
884 }
885
886 fn load_order_snapshot(
887 &self,
888 client_order_id: &ClientOrderId,
889 ) -> anyhow::Result<Option<OrderSnapshot>> {
890 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
891 }
892
893 fn load_position_snapshot(
894 &self,
895 position_id: &PositionId,
896 ) -> anyhow::Result<Option<PositionSnapshot>> {
897 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
898 }
899
900 fn index_venue_order_id(
901 &self,
902 client_order_id: ClientOrderId,
903 venue_order_id: VenueOrderId,
904 ) -> anyhow::Result<()> {
905 todo!()
906 }
907
908 fn index_order_position(
909 &self,
910 client_order_id: ClientOrderId,
911 position_id: PositionId,
912 ) -> anyhow::Result<()> {
913 todo!()
914 }
915
916 fn update_actor(&self) -> anyhow::Result<()> {
917 todo!()
918 }
919
920 fn update_strategy(&self) -> anyhow::Result<()> {
921 todo!()
922 }
923
924 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
925 todo!()
926 }
927
928 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
929 todo!()
930 }
931
932 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
933 todo!()
934 }
935
936 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
937 todo!()
938 }
939
940 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
941 todo!()
942 }
943
944 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
945 todo!()
946 }
947}
948
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// With the default config plus `use_instance_id`, the key is expected to
    /// carry both the `trader-` prefix and the instance ID suffix.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let mut config = CacheConfig::default();
        config.use_instance_id = true;

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&instance_id.to_string()));
    }

    /// The collection is the segment before the first delimiter.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    /// A key without a delimiter is rejected.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }

    /// The index key is the remainder after the first delimiter.
    #[rstest]
    fn test_get_index_key_valid() {
        let index_key = get_index_key("index:123").unwrap();
        assert_eq!(index_key, "123");
    }

    /// A key without a delimiter is rejected.
    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_delimiter");
        assert!(result.is_err());
    }
}