nautilus_infrastructure/redis/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a Redis backed `CacheDatabase` and `MessageBusDatabase` implementation.
17
18pub mod cache;
19pub mod msgbus;
20pub mod queries;
21
22use std::time::Duration;
23
24use nautilus_common::{
25    logging::log_task_awaiting,
26    msgbus::database::{DatabaseConfig, MessageBusConfig},
27};
28use nautilus_core::UUID4;
29use nautilus_model::identifiers::TraderId;
30use redis::RedisError;
31use semver::Version;
32
33const REDIS_MIN_VERSION: &str = "6.2.0";
34const REDIS_DELIMITER: char = ':';
35const REDIS_INDEX_PATTERN: &str = ":index:";
36const REDIS_XTRIM: &str = "XTRIM";
37const REDIS_MINID: &str = "MINID";
38const REDIS_FLUSHDB: &str = "FLUSHDB";
39
40/// Extracts the index key from a full Redis key.
41///
42/// Handles keys with instance_id prefix by finding the `:index:` pattern.
43/// e.g., "trader-id:uuid:index:order_position" -> "index:order_position"
44pub(crate) fn get_index_key(key: &str) -> anyhow::Result<&str> {
45    if let Some(pos) = key.find(REDIS_INDEX_PATTERN) {
46        return Ok(&key[pos + 1..]);
47    }
48
49    if key.starts_with("index:") {
50        return Ok(key);
51    }
52
53    anyhow::bail!("Invalid index key format: {key}")
54}
55
56async fn await_handle(handle: Option<tokio::task::JoinHandle<()>>, task_name: &str) {
57    if let Some(handle) = handle {
58        log_task_awaiting(task_name);
59
60        let timeout = Duration::from_secs(2);
61        match tokio::time::timeout(timeout, handle).await {
62            Ok(result) => {
63                if let Err(e) = result {
64                    log::error!("Error awaiting task '{task_name}': {e:?}");
65                }
66            }
67            Err(_) => {
68                log::error!("Timeout {timeout:?} awaiting task '{task_name}'");
69            }
70        }
71    }
72}
73
74/// Parses a Redis connection URL from the given database config, returning the
75/// full URL and a redacted version with the password obfuscated.
76///
77/// Authentication matrix handled:
78/// ┌───────────┬───────────┬────────────────────────────┐
79/// │ Username  │ Password  │ Resulting user-info part   │
80/// ├───────────┼───────────┼────────────────────────────┤
81/// │ non-empty │ non-empty │ user:pass@                 │
82/// │ empty     │ non-empty │ :pass@                     │
83/// │ empty     │ empty     │ (omitted)                  │
84/// └───────────┴───────────┴────────────────────────────┘
85///
86/// # Panics
87///
88/// Panics if a username is provided without a corresponding password.
89#[must_use]
90pub fn get_redis_url(config: DatabaseConfig) -> (String, String) {
91    let host = config.host.unwrap_or("127.0.0.1".to_string());
92    let port = config.port.unwrap_or(6379);
93    let username = config.username.unwrap_or_default();
94    let password = config.password.unwrap_or_default();
95    let ssl = config.ssl;
96
97    // Redact the password for logging/metrics: keep the first & last two chars.
98    let redact_pw = |pw: &str| {
99        if pw.len() > 4 {
100            format!("{}...{}", &pw[..2], &pw[pw.len() - 2..])
101        } else {
102            pw.to_owned()
103        }
104    };
105
106    // Build the `userinfo@` portion for both the real and redacted URLs.
107    let (auth, auth_redacted) = match (username.is_empty(), password.is_empty()) {
108        // user:pass@
109        (false, false) => (
110            format!("{username}:{password}@"),
111            format!("{username}:{}@", redact_pw(&password)),
112        ),
113        // :pass@
114        (true, false) => (
115            format!(":{password}@"),
116            format!(":{}@", redact_pw(&password)),
117        ),
118        // username but no password ⇒  configuration error
119        (false, true) => panic!(
120            "Redis config error: username supplied without password. \
121            Either supply a password or omit the username."
122        ),
123        // no credentials
124        (true, true) => (String::new(), String::new()),
125    };
126
127    let scheme = if ssl { "rediss" } else { "redis" };
128
129    let url = format!("{scheme}://{auth}{host}:{port}");
130    let redacted_url = format!("{scheme}://{auth_redacted}{host}:{port}");
131
132    (url, redacted_url)
133}
134
135/// Creates a new Redis connection manager based on the provided database `config` and connection name.
136///
137/// # Errors
138///
139/// Returns an error if:
140/// - Constructing the Redis client fails.
141/// - Establishing or configuring the connection manager fails.
142///
143/// In case of reconnection issues, the connection will retry reconnection
144/// `number_of_retries` times, with an exponentially increasing delay, calculated as
145/// `factor * (exponent_base ^ current-try)`, bounded by `max_delay`.
146///
147/// The new connection will time out operations after `response_timeout` has passed.
148/// Each connection attempt to the server will time out after `connection_timeout`.
149pub async fn create_redis_connection(
150    con_name: &str,
151    config: DatabaseConfig,
152) -> anyhow::Result<redis::aio::ConnectionManager> {
153    log::debug!("Creating {con_name} redis connection");
154    let (redis_url, redacted_url) = get_redis_url(config.clone());
155    log::debug!("Connecting to {redacted_url}");
156
157    let connection_timeout = Duration::from_secs(u64::from(config.connection_timeout));
158    let response_timeout = Duration::from_secs(u64::from(config.response_timeout));
159    let number_of_retries = config.number_of_retries;
160    let exponent_base = config.exponent_base as f32;
161
162    // Use factor as min_delay base for backoff: factor * (exponent_base ^ tries)
163    let min_delay = Duration::from_millis(config.factor);
164    let max_delay = Duration::from_secs(config.max_delay);
165
166    let client = redis::Client::open(redis_url)?;
167
168    let connection_manager_config = redis::aio::ConnectionManagerConfig::new()
169        .set_exponent_base(exponent_base)
170        .set_number_of_retries(number_of_retries)
171        .set_response_timeout(Some(response_timeout))
172        .set_connection_timeout(Some(connection_timeout))
173        .set_min_delay(min_delay)
174        .set_max_delay(max_delay);
175
176    let mut con = client
177        .get_connection_manager_with_config(connection_manager_config)
178        .await?;
179
180    let version = get_redis_version(&mut con).await?;
181    let min_version = Version::parse(REDIS_MIN_VERSION)?;
182    let con_msg = format!("Connected to redis v{version}");
183
184    if version >= min_version {
185        log::info!("{con_msg}");
186    } else {
187        log::error!("{con_msg}, but minimum supported version is {REDIS_MIN_VERSION}");
188    }
189
190    Ok(con)
191}
192
193/// Flushes the entire Redis database for the specified connection.
194///
195/// # Errors
196///
197/// Returns an error if the FLUSHDB command fails.
198pub async fn flush_redis(
199    con: &mut redis::aio::ConnectionManager,
200) -> anyhow::Result<(), RedisError> {
201    redis::cmd(REDIS_FLUSHDB).exec_async(con).await
202}
203
204/// Parse the stream key from the given identifiers and config.
205#[must_use]
206pub fn get_stream_key(
207    trader_id: TraderId,
208    instance_id: UUID4,
209    config: &MessageBusConfig,
210) -> String {
211    let mut stream_key = String::new();
212
213    if config.use_trader_prefix {
214        stream_key.push_str("trader-");
215    }
216
217    if config.use_trader_id {
218        stream_key.push_str(trader_id.as_str());
219        stream_key.push(REDIS_DELIMITER);
220    }
221
222    if config.use_instance_id {
223        stream_key.push_str(&format!("{instance_id}"));
224        stream_key.push(REDIS_DELIMITER);
225    }
226
227    stream_key.push_str(&config.streams_prefix);
228    stream_key
229}
230
231/// Retrieves and parses the Redis server version via the INFO command.
232///
233/// # Errors
234///
235/// Returns an error if the INFO command fails or version parsing fails.
236pub async fn get_redis_version(
237    conn: &mut redis::aio::ConnectionManager,
238) -> anyhow::Result<Version> {
239    let info: String = redis::cmd("INFO").query_async(conn).await?;
240    let version_str = match info.lines().find_map(|line| {
241        if line.starts_with("redis_version:") {
242            line.split(':').nth(1).map(|s| s.trim().to_string())
243        } else {
244            None
245        }
246    }) {
247        Some(info) => info,
248        None => {
249            anyhow::bail!("Redis version not available");
250        }
251    };
252
253    parse_redis_version(&version_str)
254}
255
256fn parse_redis_version(version_str: &str) -> anyhow::Result<Version> {
257    let mut components = version_str.split('.').map(str::parse::<u64>);
258
259    let major = components.next().unwrap_or(Ok(0))?;
260    let minor = components.next().unwrap_or(Ok(0))?;
261    let patch = components.next().unwrap_or(Ok(0))?;
262
263    Ok(Version::new(major, minor, patch))
264}
265
266#[cfg(test)]
267mod tests {
268    use rstest::rstest;
269    use serde_json::json;
270
271    use super::*;
272
273    #[rstest]
274    fn test_get_redis_url_default_values() {
275        let config: DatabaseConfig = serde_json::from_value(json!({})).unwrap();
276        let (url, redacted_url) = get_redis_url(config);
277        assert_eq!(url, "redis://127.0.0.1:6379");
278        assert_eq!(redacted_url, "redis://127.0.0.1:6379");
279    }
280
281    #[rstest]
282    fn test_get_redis_url_password_only() {
283        // Username omitted, but password present
284        let config_json = json!({
285            "host": "example.com",
286            "port": 6380,
287            "password": "secretpw",   // >4 chars ⇒ will be redacted
288        });
289        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
290        let (url, redacted_url) = get_redis_url(config);
291        assert_eq!(url, "redis://:secretpw@example.com:6380");
292        assert_eq!(redacted_url, "redis://:se...pw@example.com:6380");
293    }
294
295    #[rstest]
296    fn test_get_redis_url_full_config_with_ssl() {
297        let config_json = json!({
298            "host": "example.com",
299            "port": 6380,
300            "username": "user",
301            "password": "pass",
302            "ssl": true,
303        });
304        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
305        let (url, redacted_url) = get_redis_url(config);
306        assert_eq!(url, "rediss://user:pass@example.com:6380");
307        assert_eq!(redacted_url, "rediss://user:pass@example.com:6380");
308    }
309
310    #[rstest]
311    fn test_get_redis_url_full_config_without_ssl() {
312        let config_json = json!({
313            "host": "example.com",
314            "port": 6380,
315            "username": "username",
316            "password": "password",
317            "ssl": false,
318        });
319        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
320        let (url, redacted_url) = get_redis_url(config);
321        assert_eq!(url, "redis://username:password@example.com:6380");
322        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
323    }
324
325    #[rstest]
326    fn test_get_redis_url_missing_username_and_password() {
327        let config_json = json!({
328            "host": "example.com",
329            "port": 6380,
330            "ssl": false,
331        });
332        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
333        let (url, redacted_url) = get_redis_url(config);
334        assert_eq!(url, "redis://example.com:6380");
335        assert_eq!(redacted_url, "redis://example.com:6380");
336    }
337
338    #[rstest]
339    fn test_get_redis_url_ssl_default_false() {
340        let config_json = json!({
341            "host": "example.com",
342            "port": 6380,
343            "username": "username",
344            "password": "password",
345            // "ssl" is intentionally omitted to test default behavior
346        });
347        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
348        let (url, redacted_url) = get_redis_url(config);
349        assert_eq!(url, "redis://username:password@example.com:6380");
350        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
351    }
352
353    #[rstest]
354    fn test_get_stream_key_with_trader_prefix_and_instance_id() {
355        let trader_id = TraderId::from("tester-123");
356        let instance_id = UUID4::new();
357        let config = MessageBusConfig {
358            use_instance_id: true,
359            ..Default::default()
360        };
361
362        let key = get_stream_key(trader_id, instance_id, &config);
363        assert_eq!(key, format!("trader-tester-123:{instance_id}:stream"));
364    }
365
366    #[rstest]
367    fn test_get_stream_key_without_trader_prefix_or_instance_id() {
368        let trader_id = TraderId::from("tester-123");
369        let instance_id = UUID4::new();
370        let config = MessageBusConfig {
371            use_trader_prefix: false,
372            use_trader_id: false,
373            ..Default::default()
374        };
375
376        let key = get_stream_key(trader_id, instance_id, &config);
377        assert_eq!(key, format!("stream"));
378    }
379
380    #[rstest]
381    fn test_get_index_key_without_prefix() {
382        let key = "index:order_position";
383        assert_eq!(get_index_key(key).unwrap(), "index:order_position");
384    }
385
386    #[rstest]
387    fn test_get_index_key_with_trader_prefix() {
388        let key = "trader-tester-123:index:order_position";
389        assert_eq!(get_index_key(key).unwrap(), "index:order_position");
390    }
391
392    #[rstest]
393    fn test_get_index_key_with_instance_id() {
394        let key = "trader-tester-123:abc-uuid-123:index:order_position";
395        assert_eq!(get_index_key(key).unwrap(), "index:order_position");
396    }
397
398    #[rstest]
399    fn test_get_index_key_invalid() {
400        let key = "no_index_pattern";
401        assert!(get_index_key(key).is_err());
402    }
403}