nautilus_live/
builder.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Builder for constructing [`LiveNode`] instances.
17
18use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::enums::Environment;
21use nautilus_core::UUID4;
22use nautilus_data::client::DataClientAdapter;
23use nautilus_model::identifiers::TraderId;
24use nautilus_system::{
25    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
26    kernel::NautilusKernel,
27};
28
29use crate::{
30    config::LiveNodeConfig,
31    manager::{ExecutionManager, ExecutionManagerConfig},
32    node::LiveNode,
33    runner::AsyncRunner,
34};
35
36/// Builder for constructing a [`LiveNode`] with a fluent API.
37///
38/// Provides configuration options specific to live nodes,
39/// including client factory registration and timeout settings.
40#[derive(Debug)]
41#[cfg_attr(
42    feature = "python",
43    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
44)]
45pub struct LiveNodeBuilder {
46    name: String,
47    config: LiveNodeConfig,
48    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
49    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
50    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
51    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
52}
53
54impl LiveNodeBuilder {
55    /// Creates a new [`LiveNodeBuilder`] with required parameters.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if `environment` is invalid (BACKTEST).
60    pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
61        match environment {
62            Environment::Sandbox | Environment::Live => {}
63            Environment::Backtest => {
64                anyhow::bail!("LiveNode cannot be used with Backtest environment");
65            }
66        }
67
68        let config = LiveNodeConfig {
69            environment,
70            trader_id,
71            ..Default::default()
72        };
73
74        Ok(Self {
75            name: "LiveNode".to_string(),
76            config,
77            data_client_factories: HashMap::new(),
78            exec_client_factories: HashMap::new(),
79            data_client_configs: HashMap::new(),
80            exec_client_configs: HashMap::new(),
81        })
82    }
83
84    /// Returns the name for the node.
85    #[must_use]
86    pub fn name(&self) -> &str {
87        &self.name
88    }
89
90    /// Set the name for the node.
91    #[must_use]
92    pub fn with_name(mut self, name: impl Into<String>) -> Self {
93        self.name = name.into();
94        self
95    }
96
97    /// Set the instance ID for the node.
98    #[must_use]
99    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
100        self.config.instance_id = Some(instance_id);
101        self
102    }
103
104    /// Configure whether to load state on startup.
105    #[must_use]
106    pub const fn with_load_state(mut self, load_state: bool) -> Self {
107        self.config.load_state = load_state;
108        self
109    }
110
111    /// Configure whether to save state on shutdown.
112    #[must_use]
113    pub const fn with_save_state(mut self, save_state: bool) -> Self {
114        self.config.save_state = save_state;
115        self
116    }
117
118    /// Set the connection timeout in seconds.
119    #[must_use]
120    pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
121        self.config.timeout_connection = Duration::from_secs(timeout_secs);
122        self
123    }
124
125    /// Set the reconciliation timeout in seconds.
126    #[must_use]
127    pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
128        self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
129        self
130    }
131
132    /// Configure whether to run startup reconciliation.
133    #[must_use]
134    pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
135        self.config.exec_engine.reconciliation = reconciliation;
136        self
137    }
138
139    /// Set the reconciliation lookback in minutes.
140    #[must_use]
141    pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
142        self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
143        self
144    }
145
146    /// Set the portfolio initialization timeout in seconds.
147    #[must_use]
148    pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
149        self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
150        self
151    }
152
153    /// Set the disconnection timeout in seconds.
154    #[must_use]
155    pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
156        self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
157        self
158    }
159
160    /// Set the post-stop delay in seconds.
161    #[must_use]
162    pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
163        self.config.delay_post_stop = Duration::from_secs(delay_secs);
164        self
165    }
166
167    /// Set the shutdown timeout in seconds.
168    #[must_use]
169    pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
170        self.config.timeout_shutdown = Duration::from_secs(delay_secs);
171        self
172    }
173
174    /// Adds a data client factory with configuration.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if a client with the same name is already registered.
179    pub fn add_data_client(
180        mut self,
181        name: Option<String>,
182        factory: Box<dyn DataClientFactory>,
183        config: Box<dyn ClientConfig>,
184    ) -> anyhow::Result<Self> {
185        let name = name.unwrap_or_else(|| factory.name().to_string());
186
187        if self.data_client_factories.contains_key(&name) {
188            anyhow::bail!("Data client '{name}' is already registered");
189        }
190
191        self.data_client_factories.insert(name.clone(), factory);
192        self.data_client_configs.insert(name, config);
193        Ok(self)
194    }
195
196    /// Adds an execution client factory with configuration.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if a client with the same name is already registered.
201    pub fn add_exec_client(
202        mut self,
203        name: Option<String>,
204        factory: Box<dyn ExecutionClientFactory>,
205        config: Box<dyn ClientConfig>,
206    ) -> anyhow::Result<Self> {
207        let name = name.unwrap_or_else(|| factory.name().to_string());
208
209        if self.exec_client_factories.contains_key(&name) {
210            anyhow::bail!("Execution client '{name}' is already registered");
211        }
212
213        self.exec_client_factories.insert(name.clone(), factory);
214        self.exec_client_configs.insert(name, config);
215        Ok(self)
216    }
217
218    /// Build the [`LiveNode`] with the configured settings.
219    ///
220    /// This will:
221    /// 1. Build the underlying kernel.
222    /// 2. Create clients using factories.
223    /// 3. Register clients with engines.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if node construction fails.
228    pub fn build(mut self) -> anyhow::Result<LiveNode> {
229        log::info!(
230            "Building LiveNode with {} data clients and {} execution clients",
231            self.data_client_factories.len(),
232            self.exec_client_factories.len()
233        );
234
235        let runner = AsyncRunner::new();
236        let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
237
238        for (name, factory) in self.data_client_factories {
239            if let Some(config) = self.data_client_configs.remove(&name) {
240                log::debug!("Creating data client {name}");
241
242                let client =
243                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
244                let client_id = client.client_id();
245                let venue = client.venue();
246
247                let adapter = DataClientAdapter::new(
248                    client_id, venue, true, // handles_order_book_deltas
249                    true, // handles_order_book_snapshots
250                    client,
251                );
252
253                kernel
254                    .data_engine
255                    .borrow_mut()
256                    .register_client(adapter, venue);
257
258                log::info!("Registered DataClient-{client_id}");
259            } else {
260                log::warn!("No config found for data client factory {name}");
261            }
262        }
263
264        for (name, factory) in self.exec_client_factories {
265            if let Some(config) = self.exec_client_configs.remove(&name) {
266                log::debug!("Creating execution client {name}");
267
268                let client =
269                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
270                let client_id = client.client_id();
271
272                kernel.exec_engine.borrow_mut().register_client(client)?;
273
274                log::info!("Registered ExecutionClient-{client_id}");
275            } else {
276                log::warn!("No config found for execution client factory {name}");
277            }
278        }
279
280        let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
281            .with_trader_id(self.config.trader_id);
282        let exec_manager = ExecutionManager::new(
283            kernel.clock.clone(),
284            kernel.cache.clone(),
285            exec_manager_config,
286        );
287
288        log::info!("Built successfully");
289
290        Ok(LiveNode::new_from_builder(
291            kernel,
292            runner,
293            self.config,
294            exec_manager,
295        ))
296    }
297}