nautilus_infrastructure/python/redis/
cache.rs
1use bytes::Bytes;
17use nautilus_common::runtime::get_runtime;
18use nautilus_core::{
19 UUID4,
20 python::{to_pyruntime_err, to_pyvalue_err},
21};
22use nautilus_model::{
23 identifiers::TraderId,
24 python::{
25 account::account_any_to_pyobject, instruments::instrument_any_to_pyobject,
26 orders::order_any_to_pyobject,
27 },
28};
29use pyo3::{
30 IntoPyObjectExt,
31 prelude::*,
32 types::{PyBytes, PyDict},
33};
34
35use crate::redis::{cache::RedisCacheDatabase, queries::DatabaseQueries};
36
37#[pymethods]
38impl RedisCacheDatabase {
39 #[new]
40 fn py_new(trader_id: TraderId, instance_id: UUID4, config_json: Vec<u8>) -> PyResult<Self> {
41 let config = serde_json::from_slice(&config_json).map_err(to_pyvalue_err)?;
42 let result = get_runtime()
43 .block_on(async { RedisCacheDatabase::new(trader_id, instance_id, config).await });
44 result.map_err(to_pyruntime_err)
45 }
46
47 #[pyo3(name = "close")]
48 fn py_close(&mut self) {
49 self.close()
50 }
51
52 #[pyo3(name = "flushdb")]
53 fn py_flushdb(&mut self) {
54 get_runtime().block_on(async { self.flushdb().await });
55 }
56
57 #[pyo3(name = "keys")]
58 fn py_keys(&mut self, pattern: &str) -> PyResult<Vec<String>> {
59 let result = get_runtime().block_on(async { self.keys(pattern).await });
60 result.map_err(to_pyruntime_err)
61 }
62
63 #[pyo3(name = "load_all")]
64 fn py_load_all(&mut self) -> PyResult<PyObject> {
65 let result = get_runtime().block_on(async {
66 DatabaseQueries::load_all(&self.con, self.get_encoding(), self.get_trader_key()).await
67 });
68 match result {
69 Ok(cache_map) => Python::with_gil(|py| {
70 let dict = PyDict::new(py);
71
72 let currencies_dict = PyDict::new(py);
74 for (key, value) in cache_map.currencies {
75 currencies_dict
76 .set_item(key.to_string(), value)
77 .map_err(to_pyvalue_err)?;
78 }
79 dict.set_item("currencies", currencies_dict)
80 .map_err(to_pyvalue_err)?;
81
82 let instruments_dict = PyDict::new(py);
84 for (key, value) in cache_map.instruments {
85 let py_object = instrument_any_to_pyobject(py, value)?;
86 instruments_dict
87 .set_item(key, py_object)
88 .map_err(to_pyvalue_err)?;
89 }
90 dict.set_item("instruments", instruments_dict)
91 .map_err(to_pyvalue_err)?;
92
93 let synthetics_dict = PyDict::new(py);
95 for (key, value) in cache_map.synthetics {
96 synthetics_dict
97 .set_item(key, value)
98 .map_err(to_pyvalue_err)?;
99 }
100 dict.set_item("synthetics", synthetics_dict)
101 .map_err(to_pyvalue_err)?;
102
103 let accounts_dict = PyDict::new(py);
105 for (key, value) in cache_map.accounts {
106 let py_object = account_any_to_pyobject(py, value)?;
107 accounts_dict
108 .set_item(key, py_object)
109 .map_err(to_pyvalue_err)?;
110 }
111 dict.set_item("accounts", accounts_dict)
112 .map_err(to_pyvalue_err)?;
113
114 let orders_dict = PyDict::new(py);
116 for (key, value) in cache_map.orders {
117 let py_object = order_any_to_pyobject(py, value)?;
118 orders_dict
119 .set_item(key, py_object)
120 .map_err(to_pyvalue_err)?;
121 }
122 dict.set_item("orders", orders_dict)
123 .map_err(to_pyvalue_err)?;
124
125 let positions_dict = PyDict::new(py);
127 for (key, value) in cache_map.positions {
128 positions_dict
129 .set_item(key, value)
130 .map_err(to_pyvalue_err)?;
131 }
132 dict.set_item("positions", positions_dict)
133 .map_err(to_pyvalue_err)?;
134
135 dict.into_py_any(py)
136 }),
137 Err(e) => Err(to_pyruntime_err(e)),
138 }
139 }
140
141 #[pyo3(name = "read")]
142 fn py_read(&mut self, py: Python, key: &str) -> PyResult<Vec<PyObject>> {
143 let result = get_runtime().block_on(async { self.read(key).await });
144 match result {
145 Ok(result) => {
146 let vec_py_bytes = result
147 .into_iter()
148 .map(|r| PyBytes::new(py, r.as_ref()).into())
149 .collect::<Vec<PyObject>>();
150 Ok(vec_py_bytes)
151 }
152 Err(e) => Err(to_pyruntime_err(e)),
153 }
154 }
155
156 #[pyo3(name = "insert")]
157 fn py_insert(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
158 let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
159 self.insert(key, Some(payload)).map_err(to_pyvalue_err)
160 }
161
162 #[pyo3(name = "update")]
163 fn py_update(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
164 let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
165 self.update(key, Some(payload)).map_err(to_pyvalue_err)
166 }
167
168 #[pyo3(name = "delete")]
169 #[pyo3(signature = (key, payload=None))]
170 fn py_delete(&mut self, key: String, payload: Option<Vec<Vec<u8>>>) -> PyResult<()> {
171 let payload: Option<Vec<Bytes>> =
172 payload.map(|vec| vec.into_iter().map(Bytes::from).collect());
173 self.delete(key, payload).map_err(to_pyvalue_err)
174 }
175}