1use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::enums::Environment;
21use nautilus_core::UUID4;
22use nautilus_data::client::DataClientAdapter;
23use nautilus_model::identifiers::TraderId;
24use nautilus_system::{
25 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
26 kernel::NautilusKernel,
27};
28
29use crate::{
30 config::LiveNodeConfig,
31 manager::{ExecutionManager, ExecutionManagerConfig},
32 node::LiveNode,
33 runner::AsyncRunner,
34};
35
36#[derive(Debug)]
41#[cfg_attr(
42 feature = "python",
43 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
44)]
45pub struct LiveNodeBuilder {
46 name: String,
47 config: LiveNodeConfig,
48 data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
49 exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
50 data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
51 exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
52}
53
54impl LiveNodeBuilder {
55 pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
61 match environment {
62 Environment::Sandbox | Environment::Live => {}
63 Environment::Backtest => {
64 anyhow::bail!("LiveNode cannot be used with Backtest environment");
65 }
66 }
67
68 let config = LiveNodeConfig {
69 environment,
70 trader_id,
71 ..Default::default()
72 };
73
74 Ok(Self {
75 name: "LiveNode".to_string(),
76 config,
77 data_client_factories: HashMap::new(),
78 exec_client_factories: HashMap::new(),
79 data_client_configs: HashMap::new(),
80 exec_client_configs: HashMap::new(),
81 })
82 }
83
84 #[must_use]
86 pub fn name(&self) -> &str {
87 &self.name
88 }
89
90 #[must_use]
92 pub fn with_name(mut self, name: impl Into<String>) -> Self {
93 self.name = name.into();
94 self
95 }
96
97 #[must_use]
99 pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
100 self.config.instance_id = Some(instance_id);
101 self
102 }
103
104 #[must_use]
106 pub const fn with_load_state(mut self, load_state: bool) -> Self {
107 self.config.load_state = load_state;
108 self
109 }
110
111 #[must_use]
113 pub const fn with_save_state(mut self, save_state: bool) -> Self {
114 self.config.save_state = save_state;
115 self
116 }
117
118 #[must_use]
120 pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
121 self.config.timeout_connection = Duration::from_secs(timeout_secs);
122 self
123 }
124
125 #[must_use]
127 pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
128 self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
129 self
130 }
131
132 #[must_use]
134 pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
135 self.config.exec_engine.reconciliation = reconciliation;
136 self
137 }
138
139 #[must_use]
141 pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
142 self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
143 self
144 }
145
146 #[must_use]
148 pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
149 self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
150 self
151 }
152
153 #[must_use]
155 pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
156 self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
157 self
158 }
159
160 #[must_use]
162 pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
163 self.config.delay_post_stop = Duration::from_secs(delay_secs);
164 self
165 }
166
167 #[must_use]
169 pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
170 self.config.timeout_shutdown = Duration::from_secs(delay_secs);
171 self
172 }
173
174 pub fn add_data_client(
180 mut self,
181 name: Option<String>,
182 factory: Box<dyn DataClientFactory>,
183 config: Box<dyn ClientConfig>,
184 ) -> anyhow::Result<Self> {
185 let name = name.unwrap_or_else(|| factory.name().to_string());
186
187 if self.data_client_factories.contains_key(&name) {
188 anyhow::bail!("Data client '{name}' is already registered");
189 }
190
191 self.data_client_factories.insert(name.clone(), factory);
192 self.data_client_configs.insert(name, config);
193 Ok(self)
194 }
195
196 pub fn add_exec_client(
202 mut self,
203 name: Option<String>,
204 factory: Box<dyn ExecutionClientFactory>,
205 config: Box<dyn ClientConfig>,
206 ) -> anyhow::Result<Self> {
207 let name = name.unwrap_or_else(|| factory.name().to_string());
208
209 if self.exec_client_factories.contains_key(&name) {
210 anyhow::bail!("Execution client '{name}' is already registered");
211 }
212
213 self.exec_client_factories.insert(name.clone(), factory);
214 self.exec_client_configs.insert(name, config);
215 Ok(self)
216 }
217
218 pub fn build(mut self) -> anyhow::Result<LiveNode> {
229 log::info!(
230 "Building LiveNode with {} data clients and {} execution clients",
231 self.data_client_factories.len(),
232 self.exec_client_factories.len()
233 );
234
235 let runner = AsyncRunner::new();
236 let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
237
238 for (name, factory) in self.data_client_factories {
239 if let Some(config) = self.data_client_configs.remove(&name) {
240 log::debug!("Creating data client {name}");
241
242 let client =
243 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
244 let client_id = client.client_id();
245 let venue = client.venue();
246
247 let adapter = DataClientAdapter::new(
248 client_id, venue, true, true, client,
251 );
252
253 kernel
254 .data_engine
255 .borrow_mut()
256 .register_client(adapter, venue);
257
258 log::info!("Registered DataClient-{client_id}");
259 } else {
260 log::warn!("No config found for data client factory {name}");
261 }
262 }
263
264 for (name, factory) in self.exec_client_factories {
265 if let Some(config) = self.exec_client_configs.remove(&name) {
266 log::debug!("Creating execution client {name}");
267
268 let client =
269 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
270 let client_id = client.client_id();
271
272 kernel.exec_engine.borrow_mut().register_client(client)?;
273
274 log::info!("Registered ExecutionClient-{client_id}");
275 } else {
276 log::warn!("No config found for execution client factory {name}");
277 }
278 }
279
280 let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
281 .with_trader_id(self.config.trader_id);
282 let exec_manager = ExecutionManager::new(
283 kernel.clock.clone(),
284 kernel.cache.clone(),
285 exec_manager_config,
286 );
287
288 log::info!("Built successfully");
289
290 Ok(LiveNode::new_from_builder(
291 kernel,
292 runner,
293 self.config,
294 exec_manager,
295 ))
296 }
297}