nautilus_live/
config.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Configuration types for live Nautilus system nodes.
17
18use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::{
21    cache::CacheConfig, enums::Environment, logging::logger::LoggerConfig,
22    msgbus::database::MessageBusConfig,
23};
24use nautilus_core::UUID4;
25use nautilus_data::engine::config::DataEngineConfig;
26use nautilus_execution::engine::config::ExecutionEngineConfig;
27use nautilus_model::identifiers::TraderId;
28#[cfg(feature = "streaming")]
29use nautilus_persistence::config::StreamingConfig;
30use nautilus_portfolio::config::PortfolioConfig;
31use nautilus_risk::engine::config::RiskEngineConfig;
32use nautilus_system::config::NautilusKernelConfig;
33use serde::{Deserialize, Serialize};
34
/// Configuration for live data engines.
///
/// The [`Default`] implementation in this module sets `qsize` to `100_000`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LiveDataEngineConfig {
    /// The queue size for the engine's internal queue buffers.
    pub qsize: u32,
}
41
42impl Default for LiveDataEngineConfig {
43    fn default() -> Self {
44        Self { qsize: 100_000 }
45    }
46}
47
48impl From<LiveDataEngineConfig> for DataEngineConfig {
49    fn from(_config: LiveDataEngineConfig) -> Self {
50        Self::default()
51    }
52}
53
/// Configuration for live risk engines.
///
/// The [`Default`] implementation in this module sets `qsize` to `100_000`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LiveRiskEngineConfig {
    /// The queue size for the engine's internal queue buffers.
    pub qsize: u32,
}
60
61impl Default for LiveRiskEngineConfig {
62    fn default() -> Self {
63        Self { qsize: 100_000 }
64    }
65}
66
67impl From<LiveRiskEngineConfig> for RiskEngineConfig {
68    fn from(_config: LiveRiskEngineConfig) -> Self {
69        Self::default()
70    }
71}
72
/// Configuration for live execution engines.
///
/// Groups the settings for startup reconciliation, in-flight order checks,
/// open-order and position consistency checks, cache purging, own-book
/// auditing and queue sizing. Default values are supplied by the [`Default`]
/// implementation in this module.
///
/// Only `PartialEq` is derived (not `Eq`) because several fields are `f64`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct LiveExecEngineConfig {
    /// If reconciliation is active at start-up.
    pub reconciliation: bool,
    /// The delay (seconds) before starting reconciliation at startup.
    pub reconciliation_startup_delay_secs: f64,
    /// The maximum lookback (minutes) to reconcile state for.
    pub reconciliation_lookback_mins: Option<u32>,
    /// Specific instrument IDs to reconcile (if `None`, reconciles all).
    pub reconciliation_instrument_ids: Option<Vec<String>>,
    /// If unclaimed order events with an EXTERNAL strategy ID should be filtered/dropped.
    pub filter_unclaimed_external_orders: bool,
    /// If position status reports are filtered from reconciliation.
    pub filter_position_reports: bool,
    /// Client order IDs to filter from reconciliation.
    pub filtered_client_order_ids: Option<Vec<String>>,
    /// If MARKET order events will be generated during reconciliation to align discrepancies.
    pub generate_missing_orders: bool,
    /// The interval (milliseconds) between checking whether in-flight orders have exceeded their threshold.
    pub inflight_check_interval_ms: u32,
    /// The threshold (milliseconds) beyond which an in-flight order's status is checked with the venue.
    pub inflight_check_threshold_ms: u32,
    /// The number of retry attempts for verifying in-flight order status.
    pub inflight_check_retries: u32,
    /// The interval (seconds) between checks for open orders at the venue.
    pub open_check_interval_secs: Option<f64>,
    /// The lookback (minutes) for open order checks.
    pub open_check_lookback_mins: Option<u32>,
    /// The minimum elapsed time (milliseconds) since an order update before acting on discrepancies.
    pub open_check_threshold_ms: u32,
    /// The number of retries for missing open orders.
    pub open_check_missing_retries: u32,
    /// If the `check_open_orders` requests only currently open orders from the venue.
    pub open_check_open_only: bool,
    /// The maximum number of single-order queries per consistency check cycle.
    pub max_single_order_queries_per_cycle: u32,
    /// The delay (milliseconds) between consecutive single-order queries.
    pub single_order_query_delay_ms: u32,
    /// The interval (seconds) between checks for open positions at the venue.
    pub position_check_interval_secs: Option<f64>,
    /// The lookback (minutes) for position consistency checks.
    pub position_check_lookback_mins: u32,
    /// The minimum elapsed time (milliseconds) since a position update before acting on discrepancies.
    pub position_check_threshold_ms: u32,
    /// The interval (minutes) between purging closed orders from the in-memory cache.
    pub purge_closed_orders_interval_mins: Option<u32>,
    /// The time buffer (minutes) before closed orders can be purged.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// The interval (minutes) between purging closed positions from the in-memory cache.
    pub purge_closed_positions_interval_mins: Option<u32>,
    /// The time buffer (minutes) before closed positions can be purged.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// The interval (minutes) between purging account events from the in-memory cache.
    pub purge_account_events_interval_mins: Option<u32>,
    /// The time buffer (minutes) before account events can be purged.
    pub purge_account_events_lookback_mins: Option<u32>,
    /// If purge operations should also delete from the backing database.
    pub purge_from_database: bool,
    /// The interval (seconds) between auditing own books against public order books.
    pub own_books_audit_interval_secs: Option<f64>,
    /// If the engine should gracefully shut down when queue processing encounters unexpected errors.
    pub graceful_shutdown_on_error: bool,
    /// The queue size for the engine's internal queue buffers.
    pub qsize: u32,
}
139
140impl Default for LiveExecEngineConfig {
141    fn default() -> Self {
142        Self {
143            reconciliation: true,
144            reconciliation_startup_delay_secs: 10.0,
145            reconciliation_lookback_mins: None,
146            reconciliation_instrument_ids: None,
147            filter_unclaimed_external_orders: false,
148            filter_position_reports: false,
149            filtered_client_order_ids: None,
150            generate_missing_orders: true,
151            inflight_check_interval_ms: 2_000,
152            inflight_check_threshold_ms: 5_000,
153            inflight_check_retries: 5,
154            open_check_interval_secs: None,
155            open_check_lookback_mins: Some(60),
156            open_check_threshold_ms: 5_000,
157            open_check_missing_retries: 5,
158            open_check_open_only: true,
159            max_single_order_queries_per_cycle: 5,
160            single_order_query_delay_ms: 100,
161            position_check_interval_secs: None,
162            position_check_lookback_mins: 60,
163            position_check_threshold_ms: 60_000,
164            purge_closed_orders_interval_mins: None,
165            purge_closed_orders_buffer_mins: None,
166            purge_closed_positions_interval_mins: None,
167            purge_closed_positions_buffer_mins: None,
168            purge_account_events_interval_mins: None,
169            purge_account_events_lookback_mins: None,
170            purge_from_database: false,
171            own_books_audit_interval_secs: None,
172            graceful_shutdown_on_error: false,
173            qsize: 100_000,
174        }
175    }
176}
177
178impl From<LiveExecEngineConfig> for ExecutionEngineConfig {
179    fn from(_config: LiveExecEngineConfig) -> Self {
180        Self::default()
181    }
182}
183
/// Configuration for live client message routing.
///
/// `Default` is derived, so a default instance has `default` set to `false`
/// and `venues` set to `None`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct RoutingConfig {
    /// If the client should be registered as the default routing client.
    pub default: bool,
    /// The venues to register for routing.
    pub venues: Option<Vec<String>>,
}
192
/// Configuration for instrument providers.
///
/// The [`Default`] implementation in this module sets `load_all` to `false`,
/// `load_ids` to `true` and `filters` to an empty map.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InstrumentProviderConfig {
    /// Whether to load all instruments on startup.
    pub load_all: bool,
    /// Whether to load instrument IDs only.
    pub load_ids: bool,
    /// Filters for loading specific instruments, as string key/value pairs.
    pub filters: HashMap<String, String>,
}
203
204impl Default for InstrumentProviderConfig {
205    fn default() -> Self {
206        Self {
207            load_all: false,
208            load_ids: true,
209            filters: HashMap::new(),
210        }
211    }
212}
213
/// Configuration for live data clients.
///
/// `Default` is derived, so a default instance has `handle_revised_bars` set
/// to `false` and uses the defaults of [`InstrumentProviderConfig`] and
/// [`RoutingConfig`] for the nested configurations.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct LiveDataClientConfig {
    /// If `DataClient` will emit bar updates when a new bar opens.
    pub handle_revised_bars: bool,
    /// The client's instrument provider configuration.
    pub instrument_provider: InstrumentProviderConfig,
    /// The client's message routing configuration.
    pub routing: RoutingConfig,
}
224
/// Configuration for live execution clients.
///
/// `Default` is derived, so a default instance uses the defaults of
/// [`InstrumentProviderConfig`] and [`RoutingConfig`] for its fields.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct LiveExecClientConfig {
    /// The client's instrument provider configuration.
    pub instrument_provider: InstrumentProviderConfig,
    /// The client's message routing configuration.
    pub routing: RoutingConfig,
}
233
/// Configuration for live Nautilus system nodes.
///
/// Bundles the kernel-level settings (environment, identifiers, timeouts,
/// cache / message bus / portfolio configuration) with the live engine and
/// client configurations. This type implements [`NautilusKernelConfig`] in
/// this module, and its default values are supplied by the [`Default`]
/// implementation.
#[derive(Debug, Clone)]
pub struct LiveNodeConfig {
    /// The trading environment.
    pub environment: Environment,
    /// The trader ID for the node.
    pub trader_id: TraderId,
    /// If trading strategy state should be loaded from the database on start.
    pub load_state: bool,
    /// If trading strategy state should be saved to the database on stop.
    pub save_state: bool,
    /// The logging configuration for the kernel.
    pub logging: LoggerConfig,
    /// The unique instance identifier for the kernel.
    pub instance_id: Option<UUID4>,
    /// The timeout for all clients to connect and initialize.
    pub timeout_connection: Duration,
    /// The timeout for execution state to reconcile.
    pub timeout_reconciliation: Duration,
    /// The timeout for portfolio to initialize margins and unrealized pnls.
    pub timeout_portfolio: Duration,
    /// The timeout for all engine clients to disconnect.
    pub timeout_disconnection: Duration,
    /// The delay after stopping the node to await residual events before final shutdown.
    pub delay_post_stop: Duration,
    /// The timeout to await pending tasks cancellation during shutdown.
    pub timeout_shutdown: Duration,
    /// The cache configuration.
    pub cache: Option<CacheConfig>,
    /// The message bus configuration.
    pub msgbus: Option<MessageBusConfig>,
    /// The portfolio configuration.
    pub portfolio: Option<PortfolioConfig>,
    /// The configuration for streaming to feather files.
    ///
    /// Only present when the `streaming` feature is enabled.
    #[cfg(feature = "streaming")]
    pub streaming: Option<StreamingConfig>,
    /// The live data engine configuration.
    pub data_engine: LiveDataEngineConfig,
    /// The live risk engine configuration.
    pub risk_engine: LiveRiskEngineConfig,
    /// The live execution engine configuration.
    pub exec_engine: LiveExecEngineConfig,
    /// The data client configurations, keyed by a string identifier.
    pub data_clients: HashMap<String, LiveDataClientConfig>,
    /// The execution client configurations, keyed by a string identifier.
    pub exec_clients: HashMap<String, LiveExecClientConfig>,
}
281
282impl Default for LiveNodeConfig {
283    fn default() -> Self {
284        Self {
285            environment: Environment::Live,
286            trader_id: TraderId::from("TRADER-001"),
287            load_state: false,
288            save_state: false,
289            logging: LoggerConfig::default(),
290            instance_id: None,
291            timeout_connection: Duration::from_secs(60),
292            timeout_reconciliation: Duration::from_secs(30),
293            timeout_portfolio: Duration::from_secs(10),
294            timeout_disconnection: Duration::from_secs(10),
295            delay_post_stop: Duration::from_secs(10),
296            timeout_shutdown: Duration::from_secs(5),
297            cache: None,
298            msgbus: None,
299            portfolio: None,
300            #[cfg(feature = "streaming")]
301            streaming: None,
302            data_engine: LiveDataEngineConfig::default(),
303            risk_engine: LiveRiskEngineConfig::default(),
304            exec_engine: LiveExecEngineConfig::default(),
305            data_clients: HashMap::new(),
306            exec_clients: HashMap::new(),
307        }
308    }
309}
310
311impl NautilusKernelConfig for LiveNodeConfig {
312    fn environment(&self) -> Environment {
313        self.environment
314    }
315
316    fn trader_id(&self) -> TraderId {
317        self.trader_id
318    }
319
320    fn load_state(&self) -> bool {
321        self.load_state
322    }
323
324    fn save_state(&self) -> bool {
325        self.save_state
326    }
327
328    fn logging(&self) -> LoggerConfig {
329        self.logging.clone()
330    }
331
332    fn instance_id(&self) -> Option<UUID4> {
333        self.instance_id
334    }
335
336    fn timeout_connection(&self) -> Duration {
337        self.timeout_connection
338    }
339
340    fn timeout_reconciliation(&self) -> Duration {
341        self.timeout_reconciliation
342    }
343
344    fn timeout_portfolio(&self) -> Duration {
345        self.timeout_portfolio
346    }
347
348    fn timeout_disconnection(&self) -> Duration {
349        self.timeout_disconnection
350    }
351
352    fn delay_post_stop(&self) -> Duration {
353        self.delay_post_stop
354    }
355
356    fn timeout_shutdown(&self) -> Duration {
357        self.timeout_shutdown
358    }
359
360    fn cache(&self) -> Option<CacheConfig> {
361        self.cache.clone()
362    }
363
364    fn msgbus(&self) -> Option<MessageBusConfig> {
365        self.msgbus.clone()
366    }
367
368    fn data_engine(&self) -> Option<DataEngineConfig> {
369        Some(self.data_engine.clone().into())
370    }
371
372    fn risk_engine(&self) -> Option<RiskEngineConfig> {
373        Some(self.risk_engine.clone().into())
374    }
375
376    fn exec_engine(&self) -> Option<ExecutionEngineConfig> {
377        Some(self.exec_engine.clone().into())
378    }
379
380    fn portfolio(&self) -> Option<PortfolioConfig> {
381        self.portfolio.clone()
382    }
383
384    #[cfg(feature = "streaming")]
385    fn streaming(&self) -> Option<StreamingConfig> {
386        self.streaming.clone()
387    }
388}
389
#[cfg(test)]
mod tests {
    //! Unit tests pinning the default values of the live configuration types.

    use rstest::rstest;

    use super::*;

    /// The default node configuration targets the live environment, uses the
    /// default engine queue sizes and registers no clients.
    #[rstest]
    fn test_trading_node_config_default() {
        let config = LiveNodeConfig::default();

        assert_eq!(config.environment, Environment::Live);
        assert_eq!(config.trader_id, TraderId::from("TRADER-001"));
        assert_eq!(config.data_engine.qsize, 100_000);
        assert_eq!(config.risk_engine.qsize, 100_000);
        assert_eq!(config.exec_engine.qsize, 100_000);
        assert!(config.exec_engine.reconciliation);
        assert!(!config.exec_engine.filter_unclaimed_external_orders);
        assert!(config.data_clients.is_empty());
        assert!(config.exec_clients.is_empty());
    }

    /// The kernel-config trait accessors expose the same defaults and always
    /// yield engine configurations.
    #[rstest]
    fn test_trading_node_config_as_kernel_config() {
        let config = LiveNodeConfig::default();

        assert_eq!(config.environment(), Environment::Live);
        assert_eq!(config.trader_id(), TraderId::from("TRADER-001"));
        assert!(config.data_engine().is_some());
        assert!(config.risk_engine().is_some());
        assert!(config.exec_engine().is_some());
        assert!(!config.load_state());
        assert!(!config.save_state());
    }

    /// Every field of the default execution engine configuration is checked
    /// exactly once, in declaration order.
    #[rstest]
    fn test_live_exec_engine_config_defaults() {
        let config = LiveExecEngineConfig::default();

        // Startup reconciliation
        assert!(config.reconciliation);
        assert_eq!(config.reconciliation_startup_delay_secs, 10.0);
        assert_eq!(config.reconciliation_lookback_mins, None);
        assert_eq!(config.reconciliation_instrument_ids, None);

        // Reconciliation filters and order generation
        assert!(!config.filter_unclaimed_external_orders);
        assert!(!config.filter_position_reports);
        assert_eq!(config.filtered_client_order_ids, None);
        assert!(config.generate_missing_orders);

        // In-flight order checks
        assert_eq!(config.inflight_check_interval_ms, 2_000);
        assert_eq!(config.inflight_check_threshold_ms, 5_000);
        assert_eq!(config.inflight_check_retries, 5);

        // Open-order consistency checks
        assert_eq!(config.open_check_interval_secs, None);
        assert_eq!(config.open_check_lookback_mins, Some(60));
        assert_eq!(config.open_check_threshold_ms, 5_000);
        assert_eq!(config.open_check_missing_retries, 5);
        assert!(config.open_check_open_only);
        assert_eq!(config.max_single_order_queries_per_cycle, 5);
        assert_eq!(config.single_order_query_delay_ms, 100);

        // Position consistency checks
        assert_eq!(config.position_check_interval_secs, None);
        assert_eq!(config.position_check_lookback_mins, 60);
        assert_eq!(config.position_check_threshold_ms, 60_000);

        // Cache purging
        assert_eq!(config.purge_closed_orders_interval_mins, None);
        assert_eq!(config.purge_closed_orders_buffer_mins, None);
        assert_eq!(config.purge_closed_positions_interval_mins, None);
        assert_eq!(config.purge_closed_positions_buffer_mins, None);
        assert_eq!(config.purge_account_events_interval_mins, None);
        assert_eq!(config.purge_account_events_lookback_mins, None);
        assert!(!config.purge_from_database);

        // Auditing, error handling and queue sizing
        assert_eq!(config.own_books_audit_interval_secs, None);
        assert!(!config.graceful_shutdown_on_error);
        assert_eq!(config.qsize, 100_000);
    }

    /// The derived routing default registers no default client and no venues.
    #[rstest]
    fn test_routing_config_default() {
        let config = RoutingConfig::default();

        assert!(!config.default);
        assert_eq!(config.venues, None);
    }

    /// The derived data client default composes the nested defaults.
    #[rstest]
    fn test_live_data_client_config_default() {
        let config = LiveDataClientConfig::default();

        assert!(!config.handle_revised_bars);
        assert!(!config.instrument_provider.load_all);
        assert!(config.instrument_provider.load_ids);
        assert!(config.instrument_provider.filters.is_empty());
        assert!(!config.routing.default);
        assert_eq!(config.routing.venues, None);
    }
}
466}