nautilus_infrastructure/python/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use bytes::Bytes;
17use nautilus_common::runtime::get_runtime;
18use nautilus_core::{
19    UUID4,
20    python::{to_pyruntime_err, to_pyvalue_err},
21};
22use nautilus_model::{
23    identifiers::TraderId,
24    python::{
25        account::account_any_to_pyobject, instruments::instrument_any_to_pyobject,
26        orders::order_any_to_pyobject,
27    },
28};
29use pyo3::{
30    IntoPyObjectExt,
31    prelude::*,
32    types::{PyBytes, PyDict},
33};
34
35use crate::redis::{cache::RedisCacheDatabase, queries::DatabaseQueries};
36
37#[pymethods]
38impl RedisCacheDatabase {
39    #[new]
40    fn py_new(trader_id: TraderId, instance_id: UUID4, config_json: Vec<u8>) -> PyResult<Self> {
41        let config = serde_json::from_slice(&config_json).map_err(to_pyvalue_err)?;
42        let result = get_runtime()
43            .block_on(async { RedisCacheDatabase::new(trader_id, instance_id, config).await });
44        result.map_err(to_pyruntime_err)
45    }
46
47    #[pyo3(name = "close")]
48    fn py_close(&mut self) {
49        self.close()
50    }
51
52    #[pyo3(name = "flushdb")]
53    fn py_flushdb(&mut self) {
54        get_runtime().block_on(async { self.flushdb().await });
55    }
56
57    #[pyo3(name = "keys")]
58    fn py_keys(&mut self, pattern: &str) -> PyResult<Vec<String>> {
59        let result = get_runtime().block_on(async { self.keys(pattern).await });
60        result.map_err(to_pyruntime_err)
61    }
62
63    #[pyo3(name = "load_all")]
64    fn py_load_all(&mut self) -> PyResult<PyObject> {
65        let result = get_runtime().block_on(async {
66            DatabaseQueries::load_all(&self.con, self.get_encoding(), self.get_trader_key()).await
67        });
68        match result {
69            Ok(cache_map) => Python::with_gil(|py| {
70                let dict = PyDict::new(py);
71
72                // Load currencies
73                let currencies_dict = PyDict::new(py);
74                for (key, value) in cache_map.currencies {
75                    currencies_dict
76                        .set_item(key.to_string(), value)
77                        .map_err(to_pyvalue_err)?;
78                }
79                dict.set_item("currencies", currencies_dict)
80                    .map_err(to_pyvalue_err)?;
81
82                // Load instruments
83                let instruments_dict = PyDict::new(py);
84                for (key, value) in cache_map.instruments {
85                    let py_object = instrument_any_to_pyobject(py, value)?;
86                    instruments_dict
87                        .set_item(key, py_object)
88                        .map_err(to_pyvalue_err)?;
89                }
90                dict.set_item("instruments", instruments_dict)
91                    .map_err(to_pyvalue_err)?;
92
93                // Load synthetics
94                let synthetics_dict = PyDict::new(py);
95                for (key, value) in cache_map.synthetics {
96                    synthetics_dict
97                        .set_item(key, value)
98                        .map_err(to_pyvalue_err)?;
99                }
100                dict.set_item("synthetics", synthetics_dict)
101                    .map_err(to_pyvalue_err)?;
102
103                // Load accounts
104                let accounts_dict = PyDict::new(py);
105                for (key, value) in cache_map.accounts {
106                    let py_object = account_any_to_pyobject(py, value)?;
107                    accounts_dict
108                        .set_item(key, py_object)
109                        .map_err(to_pyvalue_err)?;
110                }
111                dict.set_item("accounts", accounts_dict)
112                    .map_err(to_pyvalue_err)?;
113
114                // Load orders
115                let orders_dict = PyDict::new(py);
116                for (key, value) in cache_map.orders {
117                    let py_object = order_any_to_pyobject(py, value)?;
118                    orders_dict
119                        .set_item(key, py_object)
120                        .map_err(to_pyvalue_err)?;
121                }
122                dict.set_item("orders", orders_dict)
123                    .map_err(to_pyvalue_err)?;
124
125                // Load positions
126                let positions_dict = PyDict::new(py);
127                for (key, value) in cache_map.positions {
128                    positions_dict
129                        .set_item(key, value)
130                        .map_err(to_pyvalue_err)?;
131                }
132                dict.set_item("positions", positions_dict)
133                    .map_err(to_pyvalue_err)?;
134
135                dict.into_py_any(py)
136            }),
137            Err(e) => Err(to_pyruntime_err(e)),
138        }
139    }
140
141    #[pyo3(name = "read")]
142    fn py_read(&mut self, py: Python, key: &str) -> PyResult<Vec<PyObject>> {
143        let result = get_runtime().block_on(async { self.read(key).await });
144        match result {
145            Ok(result) => {
146                let vec_py_bytes = result
147                    .into_iter()
148                    .map(|r| PyBytes::new(py, r.as_ref()).into())
149                    .collect::<Vec<PyObject>>();
150                Ok(vec_py_bytes)
151            }
152            Err(e) => Err(to_pyruntime_err(e)),
153        }
154    }
155
156    #[pyo3(name = "insert")]
157    fn py_insert(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
158        let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
159        self.insert(key, Some(payload)).map_err(to_pyvalue_err)
160    }
161
162    #[pyo3(name = "update")]
163    fn py_update(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
164        let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
165        self.update(key, Some(payload)).map_err(to_pyvalue_err)
166    }
167
168    #[pyo3(name = "delete")]
169    #[pyo3(signature = (key, payload=None))]
170    fn py_delete(&mut self, key: String, payload: Option<Vec<Vec<u8>>>) -> PyResult<()> {
171        let payload: Option<Vec<Bytes>> =
172            payload.map(|vec| vec.into_iter().map(Bytes::from).collect());
173        self.delete(key, payload).map_err(to_pyvalue_err)
174    }
175}