// File: nautilus_live/builder.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Builder for constructing [`LiveNode`] instances.
17
18use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::{enums::Environment, logging::logger::LoggerConfig};
21use nautilus_core::UUID4;
22use nautilus_data::client::DataClientAdapter;
23use nautilus_model::identifiers::TraderId;
24use nautilus_system::{
25    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
26    kernel::NautilusKernel,
27};
28
29use crate::{
30    config::LiveNodeConfig,
31    manager::{ExecutionManager, ExecutionManagerConfig},
32    node::LiveNode,
33    runner::AsyncRunner,
34};
35
36/// Builder for constructing a [`LiveNode`] with a fluent API.
37///
38/// Provides configuration options specific to live nodes,
39/// including client factory registration and timeout settings.
40#[derive(Debug)]
41#[cfg_attr(
42    feature = "python",
43    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
44)]
45pub struct LiveNodeBuilder {
46    name: String,
47    config: LiveNodeConfig,
48    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
49    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
50    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
51    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
52}
53
54impl LiveNodeBuilder {
55    /// Creates a new [`LiveNodeBuilder`] with required parameters.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if `environment` is invalid (BACKTEST).
60    pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
61        match environment {
62            Environment::Sandbox | Environment::Live => {}
63            Environment::Backtest => {
64                anyhow::bail!("LiveNode cannot be used with Backtest environment");
65            }
66        }
67
68        let config = LiveNodeConfig {
69            environment,
70            trader_id,
71            ..Default::default()
72        };
73
74        Ok(Self {
75            name: "LiveNode".to_string(),
76            config,
77            data_client_factories: HashMap::new(),
78            exec_client_factories: HashMap::new(),
79            data_client_configs: HashMap::new(),
80            exec_client_configs: HashMap::new(),
81        })
82    }
83
84    /// Returns the name for the node.
85    #[must_use]
86    pub fn name(&self) -> &str {
87        &self.name
88    }
89
90    /// Set the name for the node.
91    #[must_use]
92    pub fn with_name(mut self, name: impl Into<String>) -> Self {
93        self.name = name.into();
94        self
95    }
96
97    /// Set the instance ID for the node.
98    #[must_use]
99    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
100        self.config.instance_id = Some(instance_id);
101        self
102    }
103
104    /// Configure whether to load state on startup.
105    #[must_use]
106    pub const fn with_load_state(mut self, load_state: bool) -> Self {
107        self.config.load_state = load_state;
108        self
109    }
110
111    /// Configure whether to save state on shutdown.
112    #[must_use]
113    pub const fn with_save_state(mut self, save_state: bool) -> Self {
114        self.config.save_state = save_state;
115        self
116    }
117
118    /// Set the connection timeout in seconds.
119    #[must_use]
120    pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
121        self.config.timeout_connection = Duration::from_secs(timeout_secs);
122        self
123    }
124
125    /// Set the reconciliation timeout in seconds.
126    #[must_use]
127    pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
128        self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
129        self
130    }
131
132    /// Configure whether to run startup reconciliation.
133    #[must_use]
134    pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
135        self.config.exec_engine.reconciliation = reconciliation;
136        self
137    }
138
139    /// Set the reconciliation lookback in minutes.
140    #[must_use]
141    pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
142        self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
143        self
144    }
145
146    /// Set the portfolio initialization timeout in seconds.
147    #[must_use]
148    pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
149        self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
150        self
151    }
152
153    /// Set the disconnection timeout in seconds.
154    #[must_use]
155    pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
156        self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
157        self
158    }
159
160    /// Set the post-stop delay in seconds.
161    #[must_use]
162    pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
163        self.config.delay_post_stop = Duration::from_secs(delay_secs);
164        self
165    }
166
167    /// Set the shutdown timeout in seconds.
168    #[must_use]
169    pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
170        self.config.timeout_shutdown = Duration::from_secs(delay_secs);
171        self
172    }
173
174    /// Set the logging configuration.
175    #[must_use]
176    pub fn with_logging(mut self, logging: LoggerConfig) -> Self {
177        self.config.logging = logging;
178        self
179    }
180
181    /// Adds a data client factory with configuration.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if a client with the same name is already registered.
186    pub fn add_data_client(
187        mut self,
188        name: Option<String>,
189        factory: Box<dyn DataClientFactory>,
190        config: Box<dyn ClientConfig>,
191    ) -> anyhow::Result<Self> {
192        let name = name.unwrap_or_else(|| factory.name().to_string());
193
194        if self.data_client_factories.contains_key(&name) {
195            anyhow::bail!("Data client '{name}' is already registered");
196        }
197
198        self.data_client_factories.insert(name.clone(), factory);
199        self.data_client_configs.insert(name, config);
200        Ok(self)
201    }
202
203    /// Adds an execution client factory with configuration.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if a client with the same name is already registered.
208    pub fn add_exec_client(
209        mut self,
210        name: Option<String>,
211        factory: Box<dyn ExecutionClientFactory>,
212        config: Box<dyn ClientConfig>,
213    ) -> anyhow::Result<Self> {
214        let name = name.unwrap_or_else(|| factory.name().to_string());
215
216        if self.exec_client_factories.contains_key(&name) {
217            anyhow::bail!("Execution client '{name}' is already registered");
218        }
219
220        self.exec_client_factories.insert(name.clone(), factory);
221        self.exec_client_configs.insert(name, config);
222        Ok(self)
223    }
224
225    /// Build the [`LiveNode`] with the configured settings.
226    ///
227    /// This will:
228    /// 1. Build the underlying kernel.
229    /// 2. Create clients using factories.
230    /// 3. Register clients with engines.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if node construction fails.
235    pub fn build(mut self) -> anyhow::Result<LiveNode> {
236        log::info!(
237            "Building LiveNode with {} data clients and {} execution clients",
238            self.data_client_factories.len(),
239            self.exec_client_factories.len()
240        );
241
242        let runner = AsyncRunner::new();
243        let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
244
245        for (name, factory) in self.data_client_factories {
246            if let Some(config) = self.data_client_configs.remove(&name) {
247                log::debug!("Creating data client {name}");
248
249                let client =
250                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
251                let client_id = client.client_id();
252                let venue = client.venue();
253
254                let adapter = DataClientAdapter::new(
255                    client_id, venue, true, // handles_order_book_deltas
256                    true, // handles_order_book_snapshots
257                    client,
258                );
259
260                kernel
261                    .data_engine
262                    .borrow_mut()
263                    .register_client(adapter, venue);
264
265                log::info!("Registered DataClient-{client_id}");
266            } else {
267                log::warn!("No config found for data client factory {name}");
268            }
269        }
270
271        for (name, factory) in self.exec_client_factories {
272            if let Some(config) = self.exec_client_configs.remove(&name) {
273                log::debug!("Creating execution client {name}");
274
275                let client = factory.create(&name, config.as_ref(), kernel.cache())?;
276                let client_id = client.client_id();
277
278                kernel.exec_engine.borrow_mut().register_client(client)?;
279
280                log::info!("Registered ExecutionClient-{client_id}");
281            } else {
282                log::warn!("No config found for execution client factory {name}");
283            }
284        }
285
286        let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
287            .with_trader_id(self.config.trader_id);
288        let exec_manager = ExecutionManager::new(
289            kernel.clock.clone(),
290            kernel.cache.clone(),
291            exec_manager_config,
292        );
293
294        log::info!("Built successfully");
295
296        Ok(LiveNode::new_from_builder(
297            kernel,
298            runner,
299            self.config,
300            exec_manager,
301        ))
302    }
303}