1use std::{
17 collections::{HashMap, VecDeque},
18 time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23 cache::{
24 CacheConfig,
25 database::{CacheDatabaseAdapter, CacheMap},
26 },
27 custom::CustomData,
28 enums::SerializationEncoding,
29 runtime::get_runtime,
30 signal::Signal,
31};
32use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
33use nautilus_cryptography::providers::install_cryptographic_provider;
34use nautilus_model::{
35 accounts::AccountAny,
36 data::{Bar, DataType, QuoteTick, TradeTick},
37 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
38 identifiers::{
39 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
40 TraderId, VenueOrderId,
41 },
42 instruments::{InstrumentAny, SyntheticInstrument},
43 orderbook::OrderBook,
44 orders::OrderAny,
45 position::Position,
46 types::Currency,
47};
48use redis::{Pipeline, aio::ConnectionManager};
49use tokio::try_join;
50use ustr::Ustr;
51
52use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
53use crate::redis::{create_redis_connection, queries::DatabaseQueries};
54
// Connection name passed to `create_redis_connection` for the connection held by
// `RedisCacheDatabase` (the one used by `flushdb`, `keys` and `read`).
const CACHE_READ: &str = "cache-read";
// Connection name passed to `create_redis_connection` by the background command
// processing task; also used to identify that task in log messages.
const CACHE_WRITE: &str = "cache-write";

// Error message prefix used when a command cannot be sent to the processing task.
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection names: the part of a command key before the first delimiter
// (see `get_collection_key`), matched by `insert`, `update` and `delete`.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";

// Index names matched by `insert_index` and `remove_index`.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
88
/// The kind of operation a [`DatabaseCommand`] asks the background writer to perform.
///
/// Derives `PartialEq`/`Eq` so operations can be compared directly
/// (e.g. `op == DatabaseOperation::Close`) in addition to being pattern matched.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DatabaseOperation {
    /// Insert the payload under the command key.
    Insert,
    /// Update the existing entry under the command key with the payload.
    Update,
    /// Delete the command key, or remove the payload member from an index.
    Delete,
    /// Stop the command processing task.
    Close,
}
97
/// A command sent over the channel to the background command processing task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The operation to perform.
    pub op_type: DatabaseOperation,
    /// The key to operate on (`None` for the close command).
    pub key: Option<String>,
    /// The data for the operation, if any.
    pub payload: Option<Vec<Bytes>>,
}
108
109impl DatabaseCommand {
110 #[must_use]
112 pub fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
113 Self {
114 op_type,
115 key: Some(key),
116 payload,
117 }
118 }
119
120 #[must_use]
122 pub fn close() -> Self {
123 Self {
124 op_type: DatabaseOperation::Close,
125 key: None,
126 payload: None,
127 }
128 }
129}
130
/// A Redis-backed cache database.
///
/// Reads go through the connection held in `con`; writes are sent as
/// [`DatabaseCommand`]s over `tx` to a background task spawned in `new`.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// Connection used by `flushdb`, `keys` and `read`.
    pub con: ConnectionManager,
    /// The trader ID this database was created for.
    pub trader_id: TraderId,
    /// Serialization encoding taken from the cache config.
    encoding: SerializationEncoding,
    /// Handle of the background command processing task (awaited in `close`).
    handle: tokio::task::JoinHandle<()>,
    /// Key prefix built by `get_trader_key`.
    trader_key: String,
    /// Sender side of the channel feeding the background task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
}
143
144impl RedisCacheDatabase {
145 pub async fn new(
148 trader_id: TraderId,
149 instance_id: UUID4,
150 config: CacheConfig,
151 ) -> anyhow::Result<RedisCacheDatabase> {
152 install_cryptographic_provider();
153
154 let db_config = config
155 .database
156 .as_ref()
157 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
158 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
159
160 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
161 let trader_key = get_trader_key(trader_id, instance_id, &config);
162 let trader_key_clone = trader_key.clone();
163 let encoding = config.encoding;
164 let handle = get_runtime().spawn(async move {
165 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
166 log::error!("Failed to spawn task '{CACHE_WRITE}': {e}");
167 }
168 });
169
170 Ok(RedisCacheDatabase {
171 trader_id,
172 trader_key,
173 con,
174 tx,
175 handle,
176 encoding,
177 })
178 }
179
180 pub fn get_encoding(&self) -> SerializationEncoding {
181 self.encoding
182 }
183
184 pub fn get_trader_key(&self) -> &str {
185 &self.trader_key
186 }
187
188 pub fn close(&mut self) {
189 log::debug!("Closing");
190
191 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
192 log::debug!("Error sending close message: {e:?}")
193 }
194
195 log::debug!("Awaiting task '{CACHE_WRITE}'");
196 tokio::task::block_in_place(|| {
197 if let Err(e) = get_runtime().block_on(&mut self.handle) {
198 log::error!("Error awaiting task '{CACHE_WRITE}': {e:?}");
199 }
200 });
201
202 log::debug!("Closed");
203 }
204
205 pub async fn flushdb(&mut self) {
206 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
207 .query_async::<()>(&mut self.con)
208 .await
209 {
210 log::error!("Failed to flush database: {e:?}");
211 }
212 }
213
214 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
215 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
216 log::debug!("Querying keys: {pattern}");
217 DatabaseQueries::scan_keys(&mut self.con, pattern).await
218 }
219
220 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
221 DatabaseQueries::read(&self.con, &self.trader_key, key).await
222 }
223
224 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
225 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
226 match self.tx.send(op) {
227 Ok(_) => Ok(()),
228 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
229 }
230 }
231
232 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
233 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
234 match self.tx.send(op) {
235 Ok(_) => Ok(()),
236 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
237 }
238 }
239
240 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
241 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
242 match self.tx.send(op) {
243 Ok(_) => Ok(()),
244 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
245 }
246 }
247}
248
249async fn process_commands(
250 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
251 trader_key: String,
252 config: CacheConfig,
253) -> anyhow::Result<()> {
254 tracing::debug!("Starting cache processing");
255
256 let db_config = config
257 .database
258 .as_ref()
259 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
260 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
261
262 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
264 let mut last_drain = Instant::now();
265 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
266
267 loop {
269 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
270 drain_buffer(&mut con, &trader_key, &mut buffer).await;
271 last_drain = Instant::now();
272 } else {
273 match rx.recv().await {
274 Some(msg) => {
275 if let DatabaseOperation::Close = msg.op_type {
276 break;
277 }
278 buffer.push_back(msg)
279 }
280 None => break, }
282 }
283 }
284
285 if !buffer.is_empty() {
287 drain_buffer(&mut con, &trader_key, &mut buffer).await;
288 }
289
290 tracing::debug!("Stopped cache processing");
291 Ok(())
292}
293
294async fn drain_buffer(
295 conn: &mut ConnectionManager,
296 trader_key: &str,
297 buffer: &mut VecDeque<DatabaseCommand>,
298) {
299 let mut pipe = redis::pipe();
300 pipe.atomic();
301
302 for msg in buffer.drain(..) {
303 let key = match msg.key {
304 Some(key) => key,
305 None => {
306 log::error!("Null key found for message: {msg:?}");
307 continue;
308 }
309 };
310 let collection = match get_collection_key(&key) {
311 Ok(collection) => collection,
312 Err(e) => {
313 tracing::error!("{e}");
314 continue; }
316 };
317
318 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
319
320 match msg.op_type {
321 DatabaseOperation::Insert => {
322 if let Some(payload) = msg.payload {
323 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
324 tracing::error!("{e}");
325 }
326 } else {
327 tracing::error!("Null `payload` for `insert`");
328 }
329 }
330 DatabaseOperation::Update => {
331 if let Some(payload) = msg.payload {
332 if let Err(e) = update(&mut pipe, collection, &key, payload) {
333 tracing::error!("{e}");
334 }
335 } else {
336 tracing::error!("Null `payload` for `update`");
337 };
338 }
339 DatabaseOperation::Delete => {
340 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
342 tracing::error!("{e}");
343 }
344 }
345 DatabaseOperation::Close => panic!("Close command should not be drained"),
346 }
347 }
348
349 if let Err(e) = pipe.query_async::<()>(conn).await {
350 tracing::error!("{e}");
351 }
352}
353
354fn insert(
355 pipe: &mut Pipeline,
356 collection: &str,
357 key: &str,
358 value: Vec<Bytes>,
359) -> anyhow::Result<()> {
360 check_slice_not_empty(value.as_slice(), stringify!(value))?;
361
362 match collection {
363 INDEX => insert_index(pipe, key, &value),
364 GENERAL => {
365 insert_string(pipe, key, value[0].as_ref());
366 Ok(())
367 }
368 CURRENCIES => {
369 insert_string(pipe, key, value[0].as_ref());
370 Ok(())
371 }
372 INSTRUMENTS => {
373 insert_string(pipe, key, value[0].as_ref());
374 Ok(())
375 }
376 SYNTHETICS => {
377 insert_string(pipe, key, value[0].as_ref());
378 Ok(())
379 }
380 ACCOUNTS => {
381 insert_list(pipe, key, value[0].as_ref());
382 Ok(())
383 }
384 ORDERS => {
385 insert_list(pipe, key, value[0].as_ref());
386 Ok(())
387 }
388 POSITIONS => {
389 insert_list(pipe, key, value[0].as_ref());
390 Ok(())
391 }
392 ACTORS => {
393 insert_string(pipe, key, value[0].as_ref());
394 Ok(())
395 }
396 STRATEGIES => {
397 insert_string(pipe, key, value[0].as_ref());
398 Ok(())
399 }
400 SNAPSHOTS => {
401 insert_list(pipe, key, value[0].as_ref());
402 Ok(())
403 }
404 HEALTH => {
405 insert_string(pipe, key, value[0].as_ref());
406 Ok(())
407 }
408 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
409 }
410}
411
412fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
413 let index_key = get_index_key(key)?;
414 match index_key {
415 INDEX_ORDER_IDS => {
416 insert_set(pipe, key, value[0].as_ref());
417 Ok(())
418 }
419 INDEX_ORDER_POSITION => {
420 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
421 Ok(())
422 }
423 INDEX_ORDER_CLIENT => {
424 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
425 Ok(())
426 }
427 INDEX_ORDERS => {
428 insert_set(pipe, key, value[0].as_ref());
429 Ok(())
430 }
431 INDEX_ORDERS_OPEN => {
432 insert_set(pipe, key, value[0].as_ref());
433 Ok(())
434 }
435 INDEX_ORDERS_CLOSED => {
436 insert_set(pipe, key, value[0].as_ref());
437 Ok(())
438 }
439 INDEX_ORDERS_EMULATED => {
440 insert_set(pipe, key, value[0].as_ref());
441 Ok(())
442 }
443 INDEX_ORDERS_INFLIGHT => {
444 insert_set(pipe, key, value[0].as_ref());
445 Ok(())
446 }
447 INDEX_POSITIONS => {
448 insert_set(pipe, key, value[0].as_ref());
449 Ok(())
450 }
451 INDEX_POSITIONS_OPEN => {
452 insert_set(pipe, key, value[0].as_ref());
453 Ok(())
454 }
455 INDEX_POSITIONS_CLOSED => {
456 insert_set(pipe, key, value[0].as_ref());
457 Ok(())
458 }
459 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
460 }
461}
462
463fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
464 pipe.set(key, value);
465}
466
467fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
468 pipe.sadd(key, value);
469}
470
471fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
472 pipe.hset(key, name, value);
473}
474
475fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
476 pipe.rpush(key, value);
477}
478
479fn update(
480 pipe: &mut Pipeline,
481 collection: &str,
482 key: &str,
483 value: Vec<Bytes>,
484) -> anyhow::Result<()> {
485 check_slice_not_empty(value.as_slice(), stringify!(value))?;
486
487 match collection {
488 ACCOUNTS => {
489 update_list(pipe, key, value[0].as_ref());
490 Ok(())
491 }
492 ORDERS => {
493 update_list(pipe, key, value[0].as_ref());
494 Ok(())
495 }
496 POSITIONS => {
497 update_list(pipe, key, value[0].as_ref());
498 Ok(())
499 }
500 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
501 }
502}
503
504fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
505 pipe.rpush_exists(key, value);
506}
507
508fn delete(
509 pipe: &mut Pipeline,
510 collection: &str,
511 key: &str,
512 value: Option<Vec<Bytes>>,
513) -> anyhow::Result<()> {
514 match collection {
515 INDEX => remove_index(pipe, key, value),
516 ACTORS => {
517 delete_string(pipe, key);
518 Ok(())
519 }
520 STRATEGIES => {
521 delete_string(pipe, key);
522 Ok(())
523 }
524 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
525 }
526}
527
528fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
529 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
530 let index_key = get_index_key(key)?;
531
532 match index_key {
533 INDEX_ORDERS_OPEN => {
534 remove_from_set(pipe, key, value[0].as_ref());
535 Ok(())
536 }
537 INDEX_ORDERS_CLOSED => {
538 remove_from_set(pipe, key, value[0].as_ref());
539 Ok(())
540 }
541 INDEX_ORDERS_EMULATED => {
542 remove_from_set(pipe, key, value[0].as_ref());
543 Ok(())
544 }
545 INDEX_ORDERS_INFLIGHT => {
546 remove_from_set(pipe, key, value[0].as_ref());
547 Ok(())
548 }
549 INDEX_POSITIONS_OPEN => {
550 remove_from_set(pipe, key, value[0].as_ref());
551 Ok(())
552 }
553 INDEX_POSITIONS_CLOSED => {
554 remove_from_set(pipe, key, value[0].as_ref());
555 Ok(())
556 }
557 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
558 }
559}
560
561fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
562 pipe.srem(key, member);
563}
564
565fn delete_string(pipe: &mut Pipeline, key: &str) {
566 pipe.del(key);
567}
568
569fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
570 let mut key = String::new();
571
572 if config.use_trader_prefix {
573 key.push_str("trader-");
574 }
575
576 key.push_str(trader_id.as_str());
577
578 if config.use_instance_id {
579 key.push(REDIS_DELIMITER);
580 key.push_str(&format!("{instance_id}"));
581 }
582
583 key
584}
585
586fn get_collection_key(key: &str) -> anyhow::Result<&str> {
587 key.split_once(REDIS_DELIMITER)
588 .map(|(collection, _)| collection)
589 .ok_or_else(|| {
590 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
591 })
592}
593
594fn get_index_key(key: &str) -> anyhow::Result<&str> {
595 key.split_once(REDIS_DELIMITER)
596 .map(|(_, index_key)| index_key)
597 .ok_or_else(|| {
598 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
599 })
600}
601
/// Adapter exposing a [`RedisCacheDatabase`] through the `CacheDatabaseAdapter` trait.
#[allow(dead_code)]
pub struct RedisCacheDatabaseAdapter {
    /// Serialization encoding passed to the `DatabaseQueries` load functions.
    pub encoding: SerializationEncoding,
    /// The wrapped database providing the connection and trader key.
    database: RedisCacheDatabase,
}
607
608#[allow(dead_code)] #[allow(unused)] #[async_trait::async_trait]
611impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
612 fn close(&mut self) -> anyhow::Result<()> {
613 self.database.close();
614 Ok(())
615 }
616
617 fn flush(&mut self) -> anyhow::Result<()> {
618 self.database.flushdb();
619 Ok(())
620 }
621
622 async fn load_all(&self) -> anyhow::Result<CacheMap> {
623 tracing::debug!("Loading all data");
624
625 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
626 self.load_currencies(),
627 self.load_instruments(),
628 self.load_synthetics(),
629 self.load_accounts(),
630 self.load_orders(),
631 self.load_positions()
632 )
633 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
634
635 Ok(CacheMap {
636 currencies,
637 instruments,
638 synthetics,
639 accounts,
640 orders,
641 positions,
642 })
643 }
644
645 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
646 Ok(HashMap::new()) }
649
650 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
651 DatabaseQueries::load_currencies(
652 &self.database.con,
653 &self.database.trader_key,
654 self.encoding,
655 )
656 .await
657 }
658
659 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
660 DatabaseQueries::load_instruments(
661 &self.database.con,
662 &self.database.trader_key,
663 self.encoding,
664 )
665 .await
666 }
667
668 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
669 DatabaseQueries::load_synthetics(
670 &self.database.con,
671 &self.database.trader_key,
672 self.encoding,
673 )
674 .await
675 }
676
677 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
678 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
679 .await
680 }
681
682 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
683 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
684 .await
685 }
686
687 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
688 DatabaseQueries::load_positions(
689 &self.database.con,
690 &self.database.trader_key,
691 self.encoding,
692 )
693 .await
694 }
695
696 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
697 todo!()
698 }
699
700 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
701 todo!()
702 }
703
704 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
705 DatabaseQueries::load_currency(
706 &self.database.con,
707 &self.database.trader_key,
708 code,
709 self.encoding,
710 )
711 .await
712 }
713
714 async fn load_instrument(
715 &self,
716 instrument_id: &InstrumentId,
717 ) -> anyhow::Result<Option<InstrumentAny>> {
718 DatabaseQueries::load_instrument(
719 &self.database.con,
720 &self.database.trader_key,
721 instrument_id,
722 self.encoding,
723 )
724 .await
725 }
726
727 async fn load_synthetic(
728 &self,
729 instrument_id: &InstrumentId,
730 ) -> anyhow::Result<Option<SyntheticInstrument>> {
731 DatabaseQueries::load_synthetic(
732 &self.database.con,
733 &self.database.trader_key,
734 instrument_id,
735 self.encoding,
736 )
737 .await
738 }
739
740 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
741 DatabaseQueries::load_account(
742 &self.database.con,
743 &self.database.trader_key,
744 account_id,
745 self.encoding,
746 )
747 .await
748 }
749
750 async fn load_order(
751 &self,
752 client_order_id: &ClientOrderId,
753 ) -> anyhow::Result<Option<OrderAny>> {
754 DatabaseQueries::load_order(
755 &self.database.con,
756 &self.database.trader_key,
757 client_order_id,
758 self.encoding,
759 )
760 .await
761 }
762
763 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
764 DatabaseQueries::load_position(
765 &self.database.con,
766 &self.database.trader_key,
767 position_id,
768 self.encoding,
769 )
770 .await
771 }
772
773 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
774 todo!()
775 }
776
777 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
778 todo!()
779 }
780
781 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
782 todo!()
783 }
784
785 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
786 todo!()
787 }
788
789 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
790 todo!()
791 }
792
793 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
794 todo!()
795 }
796
797 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
798 todo!()
799 }
800
801 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
802 todo!()
803 }
804
805 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
806 todo!()
807 }
808
809 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
810 todo!()
811 }
812
813 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
814 todo!()
815 }
816
817 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
818 todo!()
819 }
820
821 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
822 todo!()
823 }
824
825 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
826 anyhow::bail!("Saving market data for Redis cache adapter not supported")
827 }
828
829 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
830 anyhow::bail!("Saving market data for Redis cache adapter not supported")
831 }
832
833 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
834 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
835 }
836
837 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
838 anyhow::bail!("Saving market data for Redis cache adapter not supported")
839 }
840
841 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
842 anyhow::bail!("Loading market data for Redis cache adapter not supported")
843 }
844
845 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
846 anyhow::bail!("Saving market data for Redis cache adapter not supported")
847 }
848
849 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
850 anyhow::bail!("Loading market data for Redis cache adapter not supported")
851 }
852
853 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
854 anyhow::bail!("Saving signals for Redis cache adapter not supported")
855 }
856
857 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
858 anyhow::bail!("Loading signals from Redis cache adapter not supported")
859 }
860
861 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
862 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
863 }
864
865 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
866 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
867 }
868
869 fn load_order_snapshot(
870 &self,
871 client_order_id: &ClientOrderId,
872 ) -> anyhow::Result<Option<OrderSnapshot>> {
873 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
874 }
875
876 fn load_position_snapshot(
877 &self,
878 position_id: &PositionId,
879 ) -> anyhow::Result<Option<PositionSnapshot>> {
880 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
881 }
882
883 fn index_venue_order_id(
884 &self,
885 client_order_id: ClientOrderId,
886 venue_order_id: VenueOrderId,
887 ) -> anyhow::Result<()> {
888 todo!()
889 }
890
891 fn index_order_position(
892 &self,
893 client_order_id: ClientOrderId,
894 position_id: PositionId,
895 ) -> anyhow::Result<()> {
896 todo!()
897 }
898
899 fn update_actor(&self) -> anyhow::Result<()> {
900 todo!()
901 }
902
903 fn update_strategy(&self) -> anyhow::Result<()> {
904 todo!()
905 }
906
907 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
908 todo!()
909 }
910
911 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
912 todo!()
913 }
914
915 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
916 todo!()
917 }
918
919 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
920 todo!()
921 }
922
923 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
924 todo!()
925 }
926
927 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
928 todo!()
929 }
930}
931
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// The key is the prefixed trader ID followed by the delimiter and the instance ID.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let mut config = CacheConfig::default();
        config.use_instance_id = true;
        let instance_id = UUID4::new();

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&instance_id.to_string()));
    }

    /// The collection is the part before the first delimiter.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    /// A key without a delimiter has no collection.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let result = get_collection_key("no_delimiter");
        assert!(result.is_err());
    }

    /// The index key is the part after the first delimiter.
    #[rstest]
    fn test_get_index_key_valid() {
        let index_key = get_index_key("index:123").unwrap();
        assert_eq!(index_key, "123");
    }

    /// A key without a delimiter has no index key.
    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_delimiter");
        assert!(result.is_err());
    }
}