nautilus_infrastructure/redis/
mod.rs1pub mod cache;
19pub mod msgbus;
20pub mod queries;
21
22use std::time::Duration;
23
24use nautilus_common::{
25 logging::log_task_awaiting,
26 msgbus::database::{DatabaseConfig, MessageBusConfig},
27};
28use nautilus_core::UUID4;
29use nautilus_model::identifiers::TraderId;
30use redis::RedisError;
31use semver::Version;
32
/// Minimum supported Redis server version; `create_redis_connection` logs an
/// error when the connected server reports an older version.
const REDIS_MIN_VERSION: &str = "6.2.0";
/// Separator appended after each optional component of a stream key.
const REDIS_DELIMITER: char = ':';
/// Name of the Redis `XTRIM` command.
// NOTE(review): not referenced in the visible part of this module — presumably
// used by the `cache`/`msgbus` submodules; confirm.
const REDIS_XTRIM: &str = "XTRIM";
/// The `MINID` keyword.
// NOTE(review): not referenced in the visible part of this module — presumably
// passed as the trimming strategy alongside `XTRIM`; confirm.
const REDIS_MINID: &str = "MINID";
/// Name of the Redis `FLUSHDB` command issued by `flush_redis`.
const REDIS_FLUSHDB: &str = "FLUSHDB";
38
39async fn await_handle(handle: Option<tokio::task::JoinHandle<()>>, task_name: &str) {
40 if let Some(handle) = handle {
41 log_task_awaiting(task_name);
42
43 let timeout = Duration::from_secs(2);
44 match tokio::time::timeout(timeout, handle).await {
45 Ok(result) => {
46 if let Err(e) = result {
47 log::error!("Error awaiting task '{task_name}': {e:?}");
48 }
49 }
50 Err(_) => {
51 log::error!("Timeout {timeout:?} awaiting task '{task_name}'");
52 }
53 }
54 }
55}
56
57#[must_use]
73pub fn get_redis_url(config: DatabaseConfig) -> (String, String) {
74 let host = config.host.unwrap_or("127.0.0.1".to_string());
75 let port = config.port.unwrap_or(6379);
76 let username = config.username.unwrap_or_default();
77 let password = config.password.unwrap_or_default();
78 let ssl = config.ssl;
79
80 let redact_pw = |pw: &str| {
82 if pw.len() > 4 {
83 format!("{}...{}", &pw[..2], &pw[pw.len() - 2..])
84 } else {
85 pw.to_owned()
86 }
87 };
88
89 let (auth, auth_redacted) = match (username.is_empty(), password.is_empty()) {
91 (false, false) => (
93 format!("{username}:{password}@"),
94 format!("{username}:{}@", redact_pw(&password)),
95 ),
96 (true, false) => (
98 format!(":{password}@"),
99 format!(":{}@", redact_pw(&password)),
100 ),
101 (false, true) => panic!(
103 "Redis config error: username supplied without password. \
104 Either supply a password or omit the username."
105 ),
106 (true, true) => (String::new(), String::new()),
108 };
109
110 let scheme = if ssl { "rediss" } else { "redis" };
111
112 let url = format!("{scheme}://{auth}{host}:{port}");
113 let redacted_url = format!("{scheme}://{auth_redacted}{host}:{port}");
114
115 (url, redacted_url)
116}
117pub async fn create_redis_connection(
132 con_name: &str,
133 config: DatabaseConfig,
134) -> anyhow::Result<redis::aio::ConnectionManager> {
135 tracing::debug!("Creating {con_name} redis connection");
136 let (redis_url, redacted_url) = get_redis_url(config.clone());
137 tracing::debug!("Connecting to {redacted_url}");
138
139 let connection_timeout = Duration::from_secs(u64::from(config.connection_timeout));
140 let response_timeout = Duration::from_secs(u64::from(config.response_timeout));
141 let number_of_retries = config.number_of_retries;
142 let exponent_base = config.exponent_base as f32;
143
144 let min_delay = Duration::from_millis(config.factor);
146 let max_delay = Duration::from_secs(config.max_delay);
147
148 let client = redis::Client::open(redis_url)?;
149
150 let connection_manager_config = redis::aio::ConnectionManagerConfig::new()
151 .set_exponent_base(exponent_base)
152 .set_number_of_retries(number_of_retries)
153 .set_response_timeout(Some(response_timeout))
154 .set_connection_timeout(Some(connection_timeout))
155 .set_min_delay(min_delay)
156 .set_max_delay(max_delay);
157
158 let mut con = client
159 .get_connection_manager_with_config(connection_manager_config)
160 .await?;
161
162 let version = get_redis_version(&mut con).await?;
163 let min_version = Version::parse(REDIS_MIN_VERSION)?;
164 let con_msg = format!("Connected to redis v{version}");
165
166 if version >= min_version {
167 tracing::info!(con_msg);
168 } else {
169 log::error!("{con_msg}, but minimum supported version is {REDIS_MIN_VERSION}");
172 }
173
174 Ok(con)
175}
176
177pub async fn flush_redis(
183 con: &mut redis::aio::ConnectionManager,
184) -> anyhow::Result<(), RedisError> {
185 redis::cmd(REDIS_FLUSHDB).exec_async(con).await
186}
187
188#[must_use]
190pub fn get_stream_key(
191 trader_id: TraderId,
192 instance_id: UUID4,
193 config: &MessageBusConfig,
194) -> String {
195 let mut stream_key = String::new();
196
197 if config.use_trader_prefix {
198 stream_key.push_str("trader-");
199 }
200
201 if config.use_trader_id {
202 stream_key.push_str(trader_id.as_str());
203 stream_key.push(REDIS_DELIMITER);
204 }
205
206 if config.use_instance_id {
207 stream_key.push_str(&format!("{instance_id}"));
208 stream_key.push(REDIS_DELIMITER);
209 }
210
211 stream_key.push_str(&config.streams_prefix);
212 stream_key
213}
214
215pub async fn get_redis_version(
221 conn: &mut redis::aio::ConnectionManager,
222) -> anyhow::Result<Version> {
223 let info: String = redis::cmd("INFO").query_async(conn).await?;
224 let version_str = match info.lines().find_map(|line| {
225 if line.starts_with("redis_version:") {
226 line.split(':').nth(1).map(|s| s.trim().to_string())
227 } else {
228 None
229 }
230 }) {
231 Some(info) => info,
232 None => {
233 anyhow::bail!("Redis version not available");
234 }
235 };
236
237 parse_redis_version(&version_str)
238}
239
240fn parse_redis_version(version_str: &str) -> anyhow::Result<Version> {
241 let mut components = version_str.split('.').map(str::parse::<u64>);
242
243 let major = components.next().unwrap_or(Ok(0))?;
244 let minor = components.next().unwrap_or(Ok(0))?;
245 let patch = components.next().unwrap_or(Ok(0))?;
246
247 Ok(Version::new(major, minor, patch))
248}
249
/// Unit tests for URL construction, stream key assembly and version parsing.
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    #[rstest]
    fn test_get_redis_url_default_values() {
        let config: DatabaseConfig = serde_json::from_value(json!({})).unwrap();
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://127.0.0.1:6379");
        assert_eq!(redacted_url, "redis://127.0.0.1:6379");
    }

    #[rstest]
    fn test_get_redis_url_password_only() {
        let config_json = json!({
            "host": "example.com",
            "port": 6380,
            "password": "secretpw",
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://:secretpw@example.com:6380");
        assert_eq!(redacted_url, "redis://:se...pw@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_full_config_with_ssl() {
        let config_json = json!({
            "host": "example.com",
            "port": 6380,
            "username": "user",
            "password": "pass",
            "ssl": true,
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "rediss://user:pass@example.com:6380");
        // Passwords of four or fewer characters are not shortened.
        assert_eq!(redacted_url, "rediss://user:pass@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_full_config_without_ssl() {
        let config_json = json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
            "password": "password",
            "ssl": false,
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://username:password@example.com:6380");
        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_missing_username_and_password() {
        let config_json = json!({
            "host": "example.com",
            "port": 6380,
            "ssl": false,
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://example.com:6380");
        assert_eq!(redacted_url, "redis://example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_ssl_default_false() {
        let config_json = json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
            "password": "password",
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://username:password@example.com:6380");
        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
    }

    #[rstest]
    #[should_panic(expected = "username supplied without password")]
    fn test_get_redis_url_username_without_password_panics() {
        let config_json = json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        let _ = get_redis_url(config);
    }

    #[rstest]
    fn test_get_stream_key_with_trader_prefix_and_instance_id() {
        let trader_id = TraderId::from("tester-123");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let key = get_stream_key(trader_id, instance_id, &config);
        assert_eq!(key, format!("trader-tester-123:{instance_id}:stream"));
    }

    #[rstest]
    fn test_get_stream_key_without_trader_prefix_or_instance_id() {
        let trader_id = TraderId::from("tester-123");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            use_trader_prefix: false,
            use_trader_id: false,
            ..Default::default()
        };

        let key = get_stream_key(trader_id, instance_id, &config);
        assert_eq!(key, "stream");
    }

    #[rstest]
    #[case("7.2.4", 7, 2, 4)]
    #[case("6.2", 6, 2, 0)]
    #[case("7", 7, 0, 0)]
    #[case("1.2.3.4", 1, 2, 3)]
    fn test_parse_redis_version_valid(
        #[case] input: &str,
        #[case] major: u64,
        #[case] minor: u64,
        #[case] patch: u64,
    ) {
        let version = parse_redis_version(input).unwrap();
        assert_eq!(version, Version::new(major, minor, patch));
    }

    #[rstest]
    #[case("")]
    #[case("abc")]
    #[case("7.x")]
    fn test_parse_redis_version_invalid(#[case] input: &str) {
        assert!(parse_redis_version(input).is_err());
    }
}