nautilus_infrastructure/redis/
mod.rs1pub mod cache;
19pub mod msgbus;
20pub mod queries;
21
22use std::time::Duration;
23
24use nautilus_common::{
25 logging::log_task_awaiting,
26 msgbus::database::{DatabaseConfig, MessageBusConfig},
27};
28use nautilus_core::UUID4;
29use nautilus_model::identifiers::TraderId;
30use redis::RedisError;
31use semver::Version;
32
/// Oldest Redis server version that `create_redis_connection` accepts without
/// logging an error.
const REDIS_MIN_VERSION: &str = "6.2.0";
/// Separator appended after each optional segment of a stream key.
const REDIS_DELIMITER: char = ':';
/// Marker used to locate the `index:` segment inside a prefixed key.
const REDIS_INDEX_PATTERN: &str = ":index:";
/// Name of the Redis `XTRIM` command.
// NOTE(review): not referenced in this file; presumably used by a sibling module.
const REDIS_XTRIM: &str = "XTRIM";
/// The `MINID` keyword.
// NOTE(review): not referenced in this file; presumably passed alongside `XTRIM`.
const REDIS_MINID: &str = "MINID";
/// Name of the Redis `FLUSHDB` command issued by `flush_redis`.
const REDIS_FLUSHDB: &str = "FLUSHDB";
39
40pub(crate) fn get_index_key(key: &str) -> anyhow::Result<&str> {
45 if let Some(pos) = key.find(REDIS_INDEX_PATTERN) {
46 return Ok(&key[pos + 1..]);
47 }
48
49 if key.starts_with("index:") {
50 return Ok(key);
51 }
52
53 anyhow::bail!("Invalid index key format: {key}")
54}
55
56async fn await_handle(handle: Option<tokio::task::JoinHandle<()>>, task_name: &str) {
57 if let Some(handle) = handle {
58 log_task_awaiting(task_name);
59
60 let timeout = Duration::from_secs(2);
61 match tokio::time::timeout(timeout, handle).await {
62 Ok(result) => {
63 if let Err(e) = result {
64 log::error!("Error awaiting task '{task_name}': {e:?}");
65 }
66 }
67 Err(_) => {
68 log::error!("Timeout {timeout:?} awaiting task '{task_name}'");
69 }
70 }
71 }
72}
73
74#[must_use]
90pub fn get_redis_url(config: DatabaseConfig) -> (String, String) {
91 let host = config.host.unwrap_or("127.0.0.1".to_string());
92 let port = config.port.unwrap_or(6379);
93 let username = config.username.unwrap_or_default();
94 let password = config.password.unwrap_or_default();
95 let ssl = config.ssl;
96
97 let redact_pw = |pw: &str| {
99 if pw.len() > 4 {
100 format!("{}...{}", &pw[..2], &pw[pw.len() - 2..])
101 } else {
102 pw.to_owned()
103 }
104 };
105
106 let (auth, auth_redacted) = match (username.is_empty(), password.is_empty()) {
108 (false, false) => (
110 format!("{username}:{password}@"),
111 format!("{username}:{}@", redact_pw(&password)),
112 ),
113 (true, false) => (
115 format!(":{password}@"),
116 format!(":{}@", redact_pw(&password)),
117 ),
118 (false, true) => panic!(
120 "Redis config error: username supplied without password. \
121 Either supply a password or omit the username."
122 ),
123 (true, true) => (String::new(), String::new()),
125 };
126
127 let scheme = if ssl { "rediss" } else { "redis" };
128
129 let url = format!("{scheme}://{auth}{host}:{port}");
130 let redacted_url = format!("{scheme}://{auth_redacted}{host}:{port}");
131
132 (url, redacted_url)
133}
134
135pub async fn create_redis_connection(
150 con_name: &str,
151 config: DatabaseConfig,
152) -> anyhow::Result<redis::aio::ConnectionManager> {
153 tracing::debug!("Creating {con_name} redis connection");
154 let (redis_url, redacted_url) = get_redis_url(config.clone());
155 tracing::debug!("Connecting to {redacted_url}");
156
157 let connection_timeout = Duration::from_secs(u64::from(config.connection_timeout));
158 let response_timeout = Duration::from_secs(u64::from(config.response_timeout));
159 let number_of_retries = config.number_of_retries;
160 let exponent_base = config.exponent_base as f32;
161
162 let min_delay = Duration::from_millis(config.factor);
164 let max_delay = Duration::from_secs(config.max_delay);
165
166 let client = redis::Client::open(redis_url)?;
167
168 let connection_manager_config = redis::aio::ConnectionManagerConfig::new()
169 .set_exponent_base(exponent_base)
170 .set_number_of_retries(number_of_retries)
171 .set_response_timeout(Some(response_timeout))
172 .set_connection_timeout(Some(connection_timeout))
173 .set_min_delay(min_delay)
174 .set_max_delay(max_delay);
175
176 let mut con = client
177 .get_connection_manager_with_config(connection_manager_config)
178 .await?;
179
180 let version = get_redis_version(&mut con).await?;
181 let min_version = Version::parse(REDIS_MIN_VERSION)?;
182 let con_msg = format!("Connected to redis v{version}");
183
184 if version >= min_version {
185 tracing::info!(con_msg);
186 } else {
187 log::error!("{con_msg}, but minimum supported version is {REDIS_MIN_VERSION}");
190 }
191
192 Ok(con)
193}
194
195pub async fn flush_redis(
201 con: &mut redis::aio::ConnectionManager,
202) -> anyhow::Result<(), RedisError> {
203 redis::cmd(REDIS_FLUSHDB).exec_async(con).await
204}
205
206#[must_use]
208pub fn get_stream_key(
209 trader_id: TraderId,
210 instance_id: UUID4,
211 config: &MessageBusConfig,
212) -> String {
213 let mut stream_key = String::new();
214
215 if config.use_trader_prefix {
216 stream_key.push_str("trader-");
217 }
218
219 if config.use_trader_id {
220 stream_key.push_str(trader_id.as_str());
221 stream_key.push(REDIS_DELIMITER);
222 }
223
224 if config.use_instance_id {
225 stream_key.push_str(&format!("{instance_id}"));
226 stream_key.push(REDIS_DELIMITER);
227 }
228
229 stream_key.push_str(&config.streams_prefix);
230 stream_key
231}
232
233pub async fn get_redis_version(
239 conn: &mut redis::aio::ConnectionManager,
240) -> anyhow::Result<Version> {
241 let info: String = redis::cmd("INFO").query_async(conn).await?;
242 let version_str = match info.lines().find_map(|line| {
243 if line.starts_with("redis_version:") {
244 line.split(':').nth(1).map(|s| s.trim().to_string())
245 } else {
246 None
247 }
248 }) {
249 Some(info) => info,
250 None => {
251 anyhow::bail!("Redis version not available");
252 }
253 };
254
255 parse_redis_version(&version_str)
256}
257
258fn parse_redis_version(version_str: &str) -> anyhow::Result<Version> {
259 let mut components = version_str.split('.').map(str::parse::<u64>);
260
261 let major = components.next().unwrap_or(Ok(0))?;
262 let minor = components.next().unwrap_or(Ok(0))?;
263 let patch = components.next().unwrap_or(Ok(0))?;
264
265 Ok(Version::new(major, minor, patch))
266}
267
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Deserializes `config_json` into a `DatabaseConfig` and returns the
    /// `(url, redacted_url)` pair built from it.
    fn urls_for(config_json: serde_json::Value) -> (String, String) {
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        get_redis_url(config)
    }

    #[rstest]
    fn test_get_redis_url_default_values() {
        let (url, redacted_url) = urls_for(json!({}));

        assert_eq!(url, "redis://127.0.0.1:6379");
        assert_eq!(redacted_url, "redis://127.0.0.1:6379");
    }

    #[rstest]
    fn test_get_redis_url_password_only() {
        let (url, redacted_url) = urls_for(json!({
            "host": "example.com",
            "port": 6380,
            "password": "secretpw",
        }));

        assert_eq!(url, "redis://:secretpw@example.com:6380");
        assert_eq!(redacted_url, "redis://:se...pw@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_full_config_with_ssl() {
        let (url, redacted_url) = urls_for(json!({
            "host": "example.com",
            "port": 6380,
            "username": "user",
            "password": "pass",
            "ssl": true,
        }));

        assert_eq!(url, "rediss://user:pass@example.com:6380");
        // A four-character password is not masked.
        assert_eq!(redacted_url, "rediss://user:pass@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_full_config_without_ssl() {
        let (url, redacted_url) = urls_for(json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
            "password": "password",
            "ssl": false,
        }));

        assert_eq!(url, "redis://username:password@example.com:6380");
        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_missing_username_and_password() {
        let (url, redacted_url) = urls_for(json!({
            "host": "example.com",
            "port": 6380,
            "ssl": false,
        }));

        assert_eq!(url, "redis://example.com:6380");
        assert_eq!(redacted_url, "redis://example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_ssl_default_false() {
        let (url, redacted_url) = urls_for(json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
            "password": "password",
        }));

        assert_eq!(url, "redis://username:password@example.com:6380");
        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
    }

    #[rstest]
    fn test_get_stream_key_with_trader_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let key = get_stream_key(TraderId::from("tester-123"), instance_id, &config);

        assert_eq!(key, format!("trader-tester-123:{instance_id}:stream"));
    }

    #[rstest]
    fn test_get_stream_key_without_trader_prefix_or_instance_id() {
        let config = MessageBusConfig {
            use_trader_prefix: false,
            use_trader_id: false,
            ..Default::default()
        };

        let key = get_stream_key(TraderId::from("tester-123"), UUID4::new(), &config);

        assert_eq!(key, "stream");
    }

    #[rstest]
    fn test_get_index_key_without_prefix() {
        let index_key = get_index_key("index:order_position").unwrap();
        assert_eq!(index_key, "index:order_position");
    }

    #[rstest]
    fn test_get_index_key_with_trader_prefix() {
        let index_key = get_index_key("trader-tester-123:index:order_position").unwrap();
        assert_eq!(index_key, "index:order_position");
    }

    #[rstest]
    fn test_get_index_key_with_instance_id() {
        let index_key =
            get_index_key("trader-tester-123:abc-uuid-123:index:order_position").unwrap();
        assert_eq!(index_key, "index:order_position");
    }

    #[rstest]
    fn test_get_index_key_invalid() {
        let result = get_index_key("no_index_pattern");
        assert!(result.is_err());
    }
}