1use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::{enums::Environment, logging::logger::LoggerConfig};
21use nautilus_core::UUID4;
22use nautilus_data::client::DataClientAdapter;
23use nautilus_model::identifiers::TraderId;
24use nautilus_system::{
25 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
26 kernel::NautilusKernel,
27};
28
29use crate::{
30 config::LiveNodeConfig,
31 manager::{ExecutionManager, ExecutionManagerConfig},
32 node::LiveNode,
33 runner::AsyncRunner,
34};
35
36#[derive(Debug)]
41#[cfg_attr(
42 feature = "python",
43 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
44)]
45pub struct LiveNodeBuilder {
46 name: String,
47 config: LiveNodeConfig,
48 data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
49 exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
50 data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
51 exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
52}
53
54impl LiveNodeBuilder {
55 pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
61 match environment {
62 Environment::Sandbox | Environment::Live => {}
63 Environment::Backtest => {
64 anyhow::bail!("LiveNode cannot be used with Backtest environment");
65 }
66 }
67
68 let config = LiveNodeConfig {
69 environment,
70 trader_id,
71 ..Default::default()
72 };
73
74 Ok(Self {
75 name: "LiveNode".to_string(),
76 config,
77 data_client_factories: HashMap::new(),
78 exec_client_factories: HashMap::new(),
79 data_client_configs: HashMap::new(),
80 exec_client_configs: HashMap::new(),
81 })
82 }
83
84 #[must_use]
86 pub fn name(&self) -> &str {
87 &self.name
88 }
89
90 #[must_use]
92 pub fn with_name(mut self, name: impl Into<String>) -> Self {
93 self.name = name.into();
94 self
95 }
96
97 #[must_use]
99 pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
100 self.config.instance_id = Some(instance_id);
101 self
102 }
103
104 #[must_use]
106 pub const fn with_load_state(mut self, load_state: bool) -> Self {
107 self.config.load_state = load_state;
108 self
109 }
110
111 #[must_use]
113 pub const fn with_save_state(mut self, save_state: bool) -> Self {
114 self.config.save_state = save_state;
115 self
116 }
117
118 #[must_use]
120 pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
121 self.config.timeout_connection = Duration::from_secs(timeout_secs);
122 self
123 }
124
125 #[must_use]
127 pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
128 self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
129 self
130 }
131
132 #[must_use]
134 pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
135 self.config.exec_engine.reconciliation = reconciliation;
136 self
137 }
138
139 #[must_use]
141 pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
142 self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
143 self
144 }
145
146 #[must_use]
148 pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
149 self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
150 self
151 }
152
153 #[must_use]
155 pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
156 self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
157 self
158 }
159
160 #[must_use]
162 pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
163 self.config.delay_post_stop = Duration::from_secs(delay_secs);
164 self
165 }
166
167 #[must_use]
169 pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
170 self.config.timeout_shutdown = Duration::from_secs(delay_secs);
171 self
172 }
173
174 #[must_use]
176 pub fn with_logging(mut self, logging: LoggerConfig) -> Self {
177 self.config.logging = logging;
178 self
179 }
180
181 pub fn add_data_client(
187 mut self,
188 name: Option<String>,
189 factory: Box<dyn DataClientFactory>,
190 config: Box<dyn ClientConfig>,
191 ) -> anyhow::Result<Self> {
192 let name = name.unwrap_or_else(|| factory.name().to_string());
193
194 if self.data_client_factories.contains_key(&name) {
195 anyhow::bail!("Data client '{name}' is already registered");
196 }
197
198 self.data_client_factories.insert(name.clone(), factory);
199 self.data_client_configs.insert(name, config);
200 Ok(self)
201 }
202
203 pub fn add_exec_client(
209 mut self,
210 name: Option<String>,
211 factory: Box<dyn ExecutionClientFactory>,
212 config: Box<dyn ClientConfig>,
213 ) -> anyhow::Result<Self> {
214 let name = name.unwrap_or_else(|| factory.name().to_string());
215
216 if self.exec_client_factories.contains_key(&name) {
217 anyhow::bail!("Execution client '{name}' is already registered");
218 }
219
220 self.exec_client_factories.insert(name.clone(), factory);
221 self.exec_client_configs.insert(name, config);
222 Ok(self)
223 }
224
225 pub fn build(mut self) -> anyhow::Result<LiveNode> {
236 log::info!(
237 "Building LiveNode with {} data clients and {} execution clients",
238 self.data_client_factories.len(),
239 self.exec_client_factories.len()
240 );
241
242 let runner = AsyncRunner::new();
243 let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
244
245 for (name, factory) in self.data_client_factories {
246 if let Some(config) = self.data_client_configs.remove(&name) {
247 log::debug!("Creating data client {name}");
248
249 let client =
250 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
251 let client_id = client.client_id();
252 let venue = client.venue();
253
254 let adapter = DataClientAdapter::new(
255 client_id, venue, true, true, client,
258 );
259
260 kernel
261 .data_engine
262 .borrow_mut()
263 .register_client(adapter, venue);
264
265 log::info!("Registered DataClient-{client_id}");
266 } else {
267 log::warn!("No config found for data client factory {name}");
268 }
269 }
270
271 for (name, factory) in self.exec_client_factories {
272 if let Some(config) = self.exec_client_configs.remove(&name) {
273 log::debug!("Creating execution client {name}");
274
275 let client = factory.create(&name, config.as_ref(), kernel.cache())?;
276 let client_id = client.client_id();
277
278 kernel.exec_engine.borrow_mut().register_client(client)?;
279
280 log::info!("Registered ExecutionClient-{client_id}");
281 } else {
282 log::warn!("No config found for execution client factory {name}");
283 }
284 }
285
286 let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
287 .with_trader_id(self.config.trader_id);
288 let exec_manager = ExecutionManager::new(
289 kernel.clock.clone(),
290 kernel.cache.clone(),
291 exec_manager_config,
292 );
293
294 log::info!("Built successfully");
295
296 Ok(LiveNode::new_from_builder(
297 kernel,
298 runner,
299 self.config,
300 exec_manager,
301 ))
302 }
303}