1use std::{
17 collections::{HashMap, VecDeque},
18 str::FromStr,
19 time::{Duration, Instant},
20};
21
22use bytes::Bytes;
23use nautilus_common::{
24 cache::{database::CacheDatabaseAdapter, CacheConfig},
25 custom::CustomData,
26 enums::SerializationEncoding,
27 runtime::get_runtime,
28 signal::Signal,
29};
30use nautilus_core::{correctness::check_slice_not_empty, UnixNanos, UUID4};
31use nautilus_cryptography::providers::install_cryptographic_provider;
32use nautilus_model::{
33 accounts::AccountAny,
34 data::{Bar, DataType, QuoteTick, TradeTick},
35 events::{position::snapshot::PositionSnapshot, OrderEventAny, OrderSnapshot},
36 identifiers::{
37 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
38 TraderId, VenueOrderId,
39 },
40 instruments::{InstrumentAny, SyntheticInstrument},
41 orderbook::OrderBook,
42 orders::OrderAny,
43 position::Position,
44 types::Currency,
45};
46use redis::{Commands, Connection, Pipeline, RedisError};
47use ustr::Ustr;
48
49use super::{REDIS_DELIMITER, REDIS_FLUSHDB};
50use crate::redis::create_redis_connection;
51
52const CACHE_READ: &str = "cache-read";
54const CACHE_WRITE: &str = "cache-write";
55
56const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
58
59const INDEX: &str = "index";
61const GENERAL: &str = "general";
62const CURRENCIES: &str = "currencies";
63const INSTRUMENTS: &str = "instruments";
64const SYNTHETICS: &str = "synthetics";
65const ACCOUNTS: &str = "accounts";
66const ORDERS: &str = "orders";
67const POSITIONS: &str = "positions";
68const ACTORS: &str = "actors";
69const STRATEGIES: &str = "strategies";
70const SNAPSHOTS: &str = "snapshots";
71const HEALTH: &str = "health";
72
73const INDEX_ORDER_IDS: &str = "index:order_ids";
75const INDEX_ORDER_POSITION: &str = "index:order_position";
76const INDEX_ORDER_CLIENT: &str = "index:order_client";
77const INDEX_ORDERS: &str = "index:orders";
78const INDEX_ORDERS_OPEN: &str = "index:orders_open";
79const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
80const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
81const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
82const INDEX_POSITIONS: &str = "index:positions";
83const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
84const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
85
86#[derive(Clone, Debug)]
88pub enum DatabaseOperation {
89 Insert,
90 Update,
91 Delete,
92 Close,
93}
94
95#[derive(Clone, Debug)]
97pub struct DatabaseCommand {
98 pub op_type: DatabaseOperation,
100 pub key: Option<String>,
102 pub payload: Option<Vec<Bytes>>,
104}
105
106impl DatabaseCommand {
107 #[must_use]
109 pub fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
110 Self {
111 op_type,
112 key: Some(key),
113 payload,
114 }
115 }
116
117 #[must_use]
119 pub fn close() -> Self {
120 Self {
121 op_type: DatabaseOperation::Close,
122 key: None,
123 payload: None,
124 }
125 }
126}
127
128#[cfg_attr(
129 feature = "python",
130 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
131)]
132pub struct RedisCacheDatabase {
133 pub trader_id: TraderId,
134 trader_key: String,
135 con: Connection,
136 tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
137 handle: tokio::task::JoinHandle<()>,
138}
139
140impl RedisCacheDatabase {
141 pub fn new(
143 trader_id: TraderId,
144 instance_id: UUID4,
145 config: CacheConfig,
146 ) -> anyhow::Result<RedisCacheDatabase> {
147 install_cryptographic_provider();
148
149 let db_config = config
150 .database
151 .as_ref()
152 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
153 let con = create_redis_connection(CACHE_READ, db_config.clone())?;
154
155 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
156 let trader_key = get_trader_key(trader_id, instance_id, &config);
157 let trader_key_clone = trader_key.clone();
158
159 let handle = get_runtime().spawn(async move {
160 process_commands(rx, trader_key_clone, config.clone())
161 .await
162 .expect("Error spawning task '{CACHE_WRITE}'")
163 });
164
165 Ok(RedisCacheDatabase {
166 trader_id,
167 trader_key,
168 con,
169 tx,
170 handle,
171 })
172 }
173
174 pub fn close(&mut self) {
175 log::debug!("Closing");
176
177 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
178 log::debug!("Error sending close message: {e:?}")
179 }
180
181 log::debug!("Awaiting task '{CACHE_WRITE}'");
182 tokio::task::block_in_place(|| {
183 if let Err(e) = get_runtime().block_on(&mut self.handle) {
184 log::error!("Error awaiting task '{CACHE_WRITE}': {e:?}");
185 }
186 });
187
188 log::debug!("Closed");
189 }
190
191 pub fn flushdb(&mut self) {
192 if let Err(e) = redis::cmd(REDIS_FLUSHDB).query::<()>(&mut self.con) {
193 log::error!("Failed to flush database: {e:?}");
194 }
195 }
196
197 pub fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
198 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
199 log::debug!("Querying keys: {pattern}");
200 Ok(scan_keys(&mut self.con, pattern)?)
201 }
202
203 pub fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
204 let collection = get_collection_key(key)?;
205 let key = format!("{}{REDIS_DELIMITER}{}", self.trader_key, key);
206
207 match collection {
208 INDEX => read_index(&mut self.con, &key),
209 GENERAL => read_string(&mut self.con, &key),
210 CURRENCIES => read_string(&mut self.con, &key),
211 INSTRUMENTS => read_string(&mut self.con, &key),
212 SYNTHETICS => read_string(&mut self.con, &key),
213 ACCOUNTS => read_list(&mut self.con, &key),
214 ORDERS => read_list(&mut self.con, &key),
215 POSITIONS => read_list(&mut self.con, &key),
216 ACTORS => read_string(&mut self.con, &key),
217 STRATEGIES => read_string(&mut self.con, &key),
218 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
219 }
220 }
221
222 pub fn insert(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
223 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
224 match self.tx.send(op) {
225 Ok(_) => Ok(()),
226 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
227 }
228 }
229
230 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
231 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
232 match self.tx.send(op) {
233 Ok(_) => Ok(()),
234 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
235 }
236 }
237
238 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
239 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
240 match self.tx.send(op) {
241 Ok(_) => Ok(()),
242 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
243 }
244 }
245}
246
247async fn process_commands(
248 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
249 trader_key: String,
250 config: CacheConfig,
251) -> anyhow::Result<()> {
252 tracing::debug!("Starting cache processing");
253
254 let db_config = config
255 .database
256 .as_ref()
257 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
258 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone())?;
259
260 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
262 let mut last_drain = Instant::now();
263 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
264
265 loop {
267 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
268 drain_buffer(&mut con, &trader_key, &mut buffer);
269 last_drain = Instant::now();
270 } else {
271 match rx.recv().await {
272 Some(msg) => {
273 if let DatabaseOperation::Close = msg.op_type {
274 break;
275 }
276 buffer.push_back(msg)
277 }
278 None => break, }
280 }
281 }
282
283 if !buffer.is_empty() {
285 drain_buffer(&mut con, &trader_key, &mut buffer);
286 }
287
288 tracing::debug!("Stopped cache processing");
289 Ok(())
290}
291
292fn drain_buffer(conn: &mut Connection, trader_key: &str, buffer: &mut VecDeque<DatabaseCommand>) {
293 let mut pipe = redis::pipe();
294 pipe.atomic();
295
296 for msg in buffer.drain(..) {
297 let key = msg.key.expect("Null command `key`");
298 let collection = match get_collection_key(&key) {
299 Ok(collection) => collection,
300 Err(e) => {
301 tracing::error!("{e}");
302 continue; }
304 };
305
306 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
307
308 match msg.op_type {
309 DatabaseOperation::Insert => {
310 if let Some(payload) = msg.payload {
311 if let Err(e) = insert(&mut pipe, collection, &key, payload) {
312 tracing::error!("{e}");
313 }
314 } else {
315 tracing::error!("Null `payload` for `insert`");
316 }
317 }
318 DatabaseOperation::Update => {
319 if let Some(payload) = msg.payload {
320 if let Err(e) = update(&mut pipe, collection, &key, payload) {
321 tracing::error!("{e}");
322 }
323 } else {
324 tracing::error!("Null `payload` for `update`");
325 };
326 }
327 DatabaseOperation::Delete => {
328 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
330 tracing::error!("{e}");
331 }
332 }
333 DatabaseOperation::Close => panic!("Close command should not be drained"),
334 }
335 }
336
337 if let Err(e) = pipe.query::<()>(conn) {
338 tracing::error!("{e}");
339 }
340}
341
342fn scan_keys(con: &mut Connection, pattern: String) -> Result<Vec<String>, RedisError> {
343 Ok(con.scan_match::<String, String>(pattern)?.collect())
344}
345
346fn read_index(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
347 let index_key = get_index_key(key)?;
348 match index_key {
349 INDEX_ORDER_IDS => read_set(conn, key),
350 INDEX_ORDER_POSITION => read_hset(conn, key),
351 INDEX_ORDER_CLIENT => read_hset(conn, key),
352 INDEX_ORDERS => read_set(conn, key),
353 INDEX_ORDERS_OPEN => read_set(conn, key),
354 INDEX_ORDERS_CLOSED => read_set(conn, key),
355 INDEX_ORDERS_EMULATED => read_set(conn, key),
356 INDEX_ORDERS_INFLIGHT => read_set(conn, key),
357 INDEX_POSITIONS => read_set(conn, key),
358 INDEX_POSITIONS_OPEN => read_set(conn, key),
359 INDEX_POSITIONS_CLOSED => read_set(conn, key),
360 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
361 }
362}
363
364fn read_string(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
365 let result: Vec<u8> = conn.get(key)?;
366
367 if result.is_empty() {
368 Ok(vec![])
369 } else {
370 Ok(vec![Bytes::from(result)])
371 }
372}
373
374fn read_set(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
375 let result: Vec<Bytes> = conn.smembers(key)?;
376 Ok(result)
377}
378
379fn read_hset(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
380 let result: HashMap<String, String> = conn.hgetall(key)?;
381 let json = serde_json::to_string(&result)?;
382 Ok(vec![Bytes::from(json.into_bytes())])
383}
384
385fn read_list(conn: &mut Connection, key: &str) -> anyhow::Result<Vec<Bytes>> {
386 let result: Vec<Bytes> = conn.lrange(key, 0, -1)?;
387 Ok(result)
388}
389
390fn insert(
391 pipe: &mut Pipeline,
392 collection: &str,
393 key: &str,
394 value: Vec<Bytes>,
395) -> anyhow::Result<()> {
396 check_slice_not_empty(value.as_slice(), stringify!(value))?;
397
398 match collection {
399 INDEX => insert_index(pipe, key, &value),
400 GENERAL => {
401 insert_string(pipe, key, value[0].as_ref());
402 Ok(())
403 }
404 CURRENCIES => {
405 insert_string(pipe, key, value[0].as_ref());
406 Ok(())
407 }
408 INSTRUMENTS => {
409 insert_string(pipe, key, value[0].as_ref());
410 Ok(())
411 }
412 SYNTHETICS => {
413 insert_string(pipe, key, value[0].as_ref());
414 Ok(())
415 }
416 ACCOUNTS => {
417 insert_list(pipe, key, value[0].as_ref());
418 Ok(())
419 }
420 ORDERS => {
421 insert_list(pipe, key, value[0].as_ref());
422 Ok(())
423 }
424 POSITIONS => {
425 insert_list(pipe, key, value[0].as_ref());
426 Ok(())
427 }
428 ACTORS => {
429 insert_string(pipe, key, value[0].as_ref());
430 Ok(())
431 }
432 STRATEGIES => {
433 insert_string(pipe, key, value[0].as_ref());
434 Ok(())
435 }
436 SNAPSHOTS => {
437 insert_list(pipe, key, value[0].as_ref());
438 Ok(())
439 }
440 HEALTH => {
441 insert_string(pipe, key, value[0].as_ref());
442 Ok(())
443 }
444 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
445 }
446}
447
448fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
449 let index_key = get_index_key(key)?;
450 match index_key {
451 INDEX_ORDER_IDS => {
452 insert_set(pipe, key, value[0].as_ref());
453 Ok(())
454 }
455 INDEX_ORDER_POSITION => {
456 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
457 Ok(())
458 }
459 INDEX_ORDER_CLIENT => {
460 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
461 Ok(())
462 }
463 INDEX_ORDERS => {
464 insert_set(pipe, key, value[0].as_ref());
465 Ok(())
466 }
467 INDEX_ORDERS_OPEN => {
468 insert_set(pipe, key, value[0].as_ref());
469 Ok(())
470 }
471 INDEX_ORDERS_CLOSED => {
472 insert_set(pipe, key, value[0].as_ref());
473 Ok(())
474 }
475 INDEX_ORDERS_EMULATED => {
476 insert_set(pipe, key, value[0].as_ref());
477 Ok(())
478 }
479 INDEX_ORDERS_INFLIGHT => {
480 insert_set(pipe, key, value[0].as_ref());
481 Ok(())
482 }
483 INDEX_POSITIONS => {
484 insert_set(pipe, key, value[0].as_ref());
485 Ok(())
486 }
487 INDEX_POSITIONS_OPEN => {
488 insert_set(pipe, key, value[0].as_ref());
489 Ok(())
490 }
491 INDEX_POSITIONS_CLOSED => {
492 insert_set(pipe, key, value[0].as_ref());
493 Ok(())
494 }
495 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
496 }
497}
498
499fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
500 pipe.set(key, value);
501}
502
503fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
504 pipe.sadd(key, value);
505}
506
507fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
508 pipe.hset(key, name, value);
509}
510
511fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
512 pipe.rpush(key, value);
513}
514
515fn update(
516 pipe: &mut Pipeline,
517 collection: &str,
518 key: &str,
519 value: Vec<Bytes>,
520) -> anyhow::Result<()> {
521 check_slice_not_empty(value.as_slice(), stringify!(value))?;
522
523 match collection {
524 ACCOUNTS => {
525 update_list(pipe, key, value[0].as_ref());
526 Ok(())
527 }
528 ORDERS => {
529 update_list(pipe, key, value[0].as_ref());
530 Ok(())
531 }
532 POSITIONS => {
533 update_list(pipe, key, value[0].as_ref());
534 Ok(())
535 }
536 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
537 }
538}
539
540fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
541 pipe.rpush_exists(key, value);
542}
543
544fn delete(
545 pipe: &mut Pipeline,
546 collection: &str,
547 key: &str,
548 value: Option<Vec<Bytes>>,
549) -> anyhow::Result<()> {
550 match collection {
551 INDEX => remove_index(pipe, key, value),
552 ACTORS => {
553 delete_string(pipe, key);
554 Ok(())
555 }
556 STRATEGIES => {
557 delete_string(pipe, key);
558 Ok(())
559 }
560 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
561 }
562}
563
564fn remove_index(pipe: &mut Pipeline, key: &str, value: Option<Vec<Bytes>>) -> anyhow::Result<()> {
565 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
566 let index_key = get_index_key(key)?;
567
568 match index_key {
569 INDEX_ORDERS_OPEN => {
570 remove_from_set(pipe, key, value[0].as_ref());
571 Ok(())
572 }
573 INDEX_ORDERS_CLOSED => {
574 remove_from_set(pipe, key, value[0].as_ref());
575 Ok(())
576 }
577 INDEX_ORDERS_EMULATED => {
578 remove_from_set(pipe, key, value[0].as_ref());
579 Ok(())
580 }
581 INDEX_ORDERS_INFLIGHT => {
582 remove_from_set(pipe, key, value[0].as_ref());
583 Ok(())
584 }
585 INDEX_POSITIONS_OPEN => {
586 remove_from_set(pipe, key, value[0].as_ref());
587 Ok(())
588 }
589 INDEX_POSITIONS_CLOSED => {
590 remove_from_set(pipe, key, value[0].as_ref());
591 Ok(())
592 }
593 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
594 }
595}
596
597fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
598 pipe.srem(key, member);
599}
600
601fn delete_string(pipe: &mut Pipeline, key: &str) {
602 pipe.del(key);
603}
604
605fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
606 let mut key = String::new();
607
608 if config.use_trader_prefix {
609 key.push_str("trader-");
610 }
611
612 key.push_str(trader_id.as_str());
613
614 if config.use_instance_id {
615 key.push(REDIS_DELIMITER);
616 key.push_str(&format!("{instance_id}"));
617 }
618
619 key
620}
621
622fn get_collection_key(key: &str) -> anyhow::Result<&str> {
623 key.split_once(REDIS_DELIMITER)
624 .map(|(collection, _)| collection)
625 .ok_or_else(|| {
626 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
627 })
628}
629
630fn get_index_key(key: &str) -> anyhow::Result<&str> {
631 key.split_once(REDIS_DELIMITER)
632 .map(|(_, index_key)| index_key)
633 .ok_or_else(|| {
634 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
635 })
636}
637
638#[allow(dead_code)]
640fn get_encoding(config: &HashMap<String, serde_json::Value>) -> String {
641 config
642 .get("encoding")
643 .and_then(|v| v.as_str())
644 .unwrap_or("msgpack")
645 .to_string()
646}
647
648#[allow(dead_code)]
650fn deserialize_payload(
651 encoding: &str,
652 payload: &[u8],
653) -> anyhow::Result<HashMap<String, serde_json::Value>> {
654 match encoding {
655 "msgpack" => rmp_serde::from_slice(payload)
656 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}")),
657 "json" => serde_json::from_slice(payload)
658 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}")),
659 _ => Err(anyhow::anyhow!("Unsupported encoding: {encoding}")),
660 }
661}
662
663#[allow(dead_code)] pub struct RedisCacheDatabaseAdapter {
665 pub encoding: SerializationEncoding,
666 database: RedisCacheDatabase,
667}
668
669#[allow(dead_code)] #[allow(unused)] impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
672 fn close(&mut self) -> anyhow::Result<()> {
673 self.database.close();
674 Ok(())
675 }
676
677 fn flush(&mut self) -> anyhow::Result<()> {
678 self.database.flushdb();
679 Ok(())
680 }
681
682 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
683 Ok(HashMap::new()) }
686
687 fn load_currencies(&mut self) -> anyhow::Result<HashMap<Ustr, Currency>> {
688 let mut currencies = HashMap::new();
689 let pattern = format!("{CURRENCIES}*");
690
691 for key in scan_keys(&mut self.database.con, pattern)? {
692 let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
693 let currency_code = Ustr::from(parts.first().unwrap());
694 let result = self.load_currency(¤cy_code)?;
695 match result {
696 Some(currency) => {
697 currencies.insert(currency_code, currency);
698 }
699 None => {
700 log::error!("Currency not found: {currency_code}");
701 }
702 }
703 }
704 Ok(currencies)
705 }
706
707 fn load_instruments(&mut self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
708 let mut instruments = HashMap::new();
709 let pattern = format!("{INSTRUMENTS}*");
710
711 for key in scan_keys(&mut self.database.con, pattern)? {
712 let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
713 let instrument_id = InstrumentId::from_str(parts.first().unwrap())?;
714 let result = self.load_instrument(&instrument_id)?;
715 match result {
716 Some(instrument) => {
717 instruments.insert(instrument_id, instrument);
718 }
719 None => {
720 log::error!("Instrument not found: {instrument_id}");
721 }
722 }
723 }
724
725 Ok(instruments)
726 }
727
728 fn load_synthetics(&mut self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
729 let mut synthetics = HashMap::new();
730 let pattern = format!("{SYNTHETICS}*");
731
732 for key in scan_keys(&mut self.database.con, pattern)? {
733 let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
734 let instrument_id = InstrumentId::from_str(parts.first().unwrap())?;
735 let synthetic = self.load_synthetic(&instrument_id)?;
736 synthetics.insert(instrument_id, synthetic);
737 }
738
739 Ok(synthetics)
740 }
741
742 fn load_accounts(&mut self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
743 let mut accounts = HashMap::new();
744 let pattern = format!("{ACCOUNTS}*");
745
746 for key in scan_keys(&mut self.database.con, pattern)? {
747 let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
748 let account_id = AccountId::from(*parts.first().unwrap());
749 let result = self.load_account(&account_id)?;
750 match result {
751 Some(account) => {
752 accounts.insert(account_id, account);
753 }
754 None => {
755 log::error!("Account not found: {account_id}");
756 }
757 }
758 }
759
760 Ok(accounts)
761 }
762
763 fn load_orders(&mut self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
764 let mut orders = HashMap::new();
765 let pattern = format!("{ORDERS}*");
766
767 for key in scan_keys(&mut self.database.con, pattern)? {
768 let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
769 let client_order_id = ClientOrderId::from(*parts.first().unwrap());
770 let result = self.load_order(&client_order_id)?;
771 match result {
772 Some(order) => {
773 orders.insert(client_order_id, order);
774 }
775 None => {
776 log::error!("Order not found: {client_order_id}");
777 }
778 }
779 }
780 Ok(orders)
781 }
782
783 fn load_positions(&mut self) -> anyhow::Result<HashMap<PositionId, Position>> {
784 let mut positions = HashMap::new();
785 let pattern = format!("{POSITIONS}*");
786
787 for key in scan_keys(&mut self.database.con, pattern)? {
788 let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
789 let position_id = PositionId::from(*parts.first().unwrap());
790 let position = self.load_position(&position_id)?;
791 positions.insert(position_id, position);
792 }
793
794 Ok(positions)
795 }
796
797 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
798 todo!()
799 }
800
801 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
802 todo!()
803 }
804
805 fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
806 todo!()
807 }
808
809 fn load_instrument(
810 &self,
811 instrument_id: &InstrumentId,
812 ) -> anyhow::Result<Option<InstrumentAny>> {
813 todo!()
814 }
815
816 fn load_synthetic(&self, instrument_id: &InstrumentId) -> anyhow::Result<SyntheticInstrument> {
817 todo!()
818 }
819
820 fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
821 todo!()
822 }
823
824 fn load_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<Option<OrderAny>> {
825 todo!()
826 }
827
828 fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Position> {
829 todo!()
830 }
831
832 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
833 todo!()
834 }
835
836 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
837 todo!()
838 }
839
840 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
841 todo!()
842 }
843
844 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
845 todo!()
846 }
847
848 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
849 todo!()
850 }
851
852 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
853 todo!()
854 }
855
856 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
857 todo!()
858 }
859
860 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
861 todo!()
862 }
863
864 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
865 todo!()
866 }
867
868 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
869 todo!()
870 }
871
872 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
873 todo!()
874 }
875
876 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
877 todo!()
878 }
879
880 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
881 todo!()
882 }
883
884 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
885 anyhow::bail!("Saving market data for Redis cache adapter not supported")
886 }
887
888 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
889 anyhow::bail!("Saving market data for Redis cache adapter not supported")
890 }
891
892 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
893 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
894 }
895
896 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
897 anyhow::bail!("Saving market data for Redis cache adapter not supported")
898 }
899
900 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
901 anyhow::bail!("Loading market data for Redis cache adapter not supported")
902 }
903
904 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
905 anyhow::bail!("Saving market data for Redis cache adapter not supported")
906 }
907
908 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
909 anyhow::bail!("Loading market data for Redis cache adapter not supported")
910 }
911
912 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
913 anyhow::bail!("Saving signals for Redis cache adapter not supported")
914 }
915
916 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
917 anyhow::bail!("Loading signals from Redis cache adapter not supported")
918 }
919
920 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
921 anyhow::bail!("Saving custom data for Redis cache adapter not supported")
922 }
923
924 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
925 anyhow::bail!("Loading custom data from Redis cache adapter not supported")
926 }
927
928 fn load_order_snapshot(
929 &self,
930 client_order_id: &ClientOrderId,
931 ) -> anyhow::Result<Option<OrderSnapshot>> {
932 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
933 }
934
935 fn load_position_snapshot(
936 &self,
937 position_id: &PositionId,
938 ) -> anyhow::Result<Option<PositionSnapshot>> {
939 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
940 }
941
942 fn index_venue_order_id(
943 &self,
944 client_order_id: ClientOrderId,
945 venue_order_id: VenueOrderId,
946 ) -> anyhow::Result<()> {
947 todo!()
948 }
949
950 fn index_order_position(
951 &self,
952 client_order_id: ClientOrderId,
953 position_id: PositionId,
954 ) -> anyhow::Result<()> {
955 todo!()
956 }
957
958 fn update_actor(&self) -> anyhow::Result<()> {
959 todo!()
960 }
961
962 fn update_strategy(&self) -> anyhow::Result<()> {
963 todo!()
964 }
965
966 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
967 todo!()
968 }
969
970 fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
971 todo!()
972 }
973
974 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
975 todo!()
976 }
977
978 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
979 todo!()
980 }
981
982 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
983 todo!()
984 }
985
986 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
987 todo!()
988 }
989}
990
991#[cfg(test)]
995mod tests {
996 use rstest::rstest;
997
998 use super::*;
999
1000 #[rstest]
1001 fn test_get_trader_key_with_prefix_and_instance_id() {
1002 let trader_id = TraderId::from("tester-123");
1003 let instance_id = UUID4::new();
1004 let mut config = CacheConfig::default();
1005 config.use_instance_id = true;
1006
1007 let key = get_trader_key(trader_id, instance_id, &config);
1008 assert!(key.starts_with("trader-tester-123:"));
1009 assert!(key.ends_with(&instance_id.to_string()));
1010 }
1011
1012 #[rstest]
1013 fn test_get_collection_key_valid() {
1014 let key = "collection:123";
1015 assert_eq!(get_collection_key(key).unwrap(), "collection");
1016 }
1017
1018 #[rstest]
1019 fn test_get_collection_key_invalid() {
1020 let key = "no_delimiter";
1021 assert!(get_collection_key(key).is_err());
1022 }
1023
1024 #[rstest]
1025 fn test_get_index_key_valid() {
1026 let key = "index:123";
1027 assert_eq!(get_index_key(key).unwrap(), "123");
1028 }
1029
1030 #[rstest]
1031 fn test_get_index_key_invalid() {
1032 let key = "no_delimiter";
1033 assert!(get_index_key(key).is_err());
1034 }
1035}