1use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21 actor::data_actor::{DataActorConfig, ImportableActorConfig},
22 enums::Environment,
23 live::get_runtime,
24 python::actor::PyDataActor,
25};
26use nautilus_core::{
27 UUID4,
28 python::{to_pyruntime_err, to_pyvalue_err},
29};
30use nautilus_model::identifiers::{ActorId, TraderId};
31use nautilus_system::get_global_pyo3_registry;
32use pyo3::{
33 prelude::*,
34 types::{PyDict, PyTuple},
35};
36use serde_json;
37
38use crate::{builder::LiveNodeBuilder, node::LiveNode};
39
40#[pymethods]
41impl LiveNode {
42 #[staticmethod]
43 #[pyo3(name = "builder")]
44 fn py_builder(
45 name: String,
46 trader_id: TraderId,
47 environment: Environment,
48 ) -> PyResult<LiveNodeBuilderPy> {
49 match Self::builder(trader_id, environment) {
50 Ok(builder) => Ok(LiveNodeBuilderPy {
51 inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
52 }),
53 Err(e) => Err(to_pyruntime_err(e)),
54 }
55 }
56
    /// Returns the node's environment, delegating to `environment()`.
    #[getter]
    #[pyo3(name = "environment")]
    fn py_environment(&self) -> Environment {
        self.environment()
    }
62
    /// Returns the node's trader ID, delegating to `trader_id()`.
    #[getter]
    #[pyo3(name = "trader_id")]
    fn py_trader_id(&self) -> TraderId {
        self.trader_id()
    }
68
    /// Returns the node's instance ID, delegating to `instance_id()`.
    #[getter]
    #[pyo3(name = "instance_id")]
    const fn py_instance_id(&self) -> UUID4 {
        self.instance_id()
    }
74
    /// Returns whether the node is currently running, delegating to `is_running()`.
    #[getter]
    #[pyo3(name = "is_running")]
    fn py_is_running(&self) -> bool {
        self.is_running()
    }
80
81 #[pyo3(name = "start")]
82 fn py_start(&mut self) -> PyResult<()> {
83 if self.is_running() {
84 return Err(to_pyruntime_err("LiveNode is already running"));
85 }
86
87 get_runtime().block_on(async { self.start().await.map_err(to_pyruntime_err) })
89 }
90
91 #[pyo3(name = "run")]
92 fn py_run(&mut self, py: Python) -> PyResult<()> {
93 if self.is_running() {
94 return Err(to_pyruntime_err("LiveNode is already running"));
95 }
96
97 let handle = self.handle();
99
100 let signal_module = py.import("signal")?;
102 let original_handler =
103 signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; let handle_for_signal = handle;
107 let signal_callback = pyo3::types::PyCFunction::new_closure(
108 py,
109 None,
110 None,
111 move |_args: &pyo3::Bound<'_, PyTuple>,
112 _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
113 -> PyResult<()> {
114 log::info!("Python signal handler called");
115 handle_for_signal.stop();
116 Ok(())
117 },
118 )?;
119
120 signal_module.call_method1("signal", (2, signal_callback))?;
122
123 let result =
125 { get_runtime().block_on(async { self.run().await.map_err(to_pyruntime_err) }) };
126
127 signal_module.call_method1("signal", (2, original_handler))?;
129
130 result
131 }
132
133 #[pyo3(name = "stop")]
134 fn py_stop(&self) -> PyResult<()> {
135 if !self.is_running() {
136 return Err(to_pyruntime_err("LiveNode is not running"));
137 }
138
139 self.handle().stop();
141 Ok(())
142 }
143
144 #[allow(
145 unsafe_code,
146 reason = "Required for Python actor component registration"
147 )]
148 #[pyo3(name = "add_actor_from_config")]
149 fn py_add_actor_from_config(
150 &mut self,
151 _py: Python,
152 config: ImportableActorConfig,
153 ) -> PyResult<()> {
154 log::debug!("`add_actor_from_config` with: {config:?}");
155
156 let parts: Vec<&str> = config.actor_path.split(':').collect();
158 if parts.len() != 2 {
159 return Err(to_pyvalue_err(
160 "actor_path must be in format 'module.path:ClassName'",
161 ));
162 }
163 let (module_name, class_name) = (parts[0], parts[1]);
164
165 log::info!("Importing actor from module: {module_name} class: {class_name}");
166
167 let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
169 let actor_module = py.import(module_name)?;
170 let actor_class = actor_module.getattr(class_name)?;
171 Ok(actor_class.unbind())
172 })
173 .map_err(|e| to_pyruntime_err(format!("Failed to import Python class: {e}")))?;
174
175 let basic_data_actor_config = DataActorConfig::default();
178
179 log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
180
181 let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
183 let actor_module = py
185 .import(module_name)
186 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
187 let actor_class = actor_module
188 .getattr(class_name)
189 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
190
191 let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
193 let config_parts: Vec<&str> = config.config_path.split(':').collect();
195 if config_parts.len() != 2 {
196 anyhow::bail!(
197 "config_path must be in format 'module.path:ClassName', was {}",
198 config.config_path
199 );
200 }
201 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
202
203 log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
204
205 let config_module = py
207 .import(config_module_name)
208 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
209 let config_class = config_module
210 .getattr(config_class_name)
211 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
212
213 let py_dict = PyDict::new(py);
215 for (key, value) in &config.config {
216 let json_str = serde_json::to_string(value)
218 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
219 let py_value = PyModule::import(py, "json")?
220 .call_method("loads", (json_str,), None)?;
221 py_dict.set_item(key, py_value)?;
222 }
223
224 log::debug!("Created config dict: {py_dict:?}");
225
226 let config_instance = {
228 match config_class.call((), Some(&py_dict)) {
230 Ok(instance) => {
231 log::debug!("Successfully created config instance with kwargs");
232
233 if let Err(e) = instance.call_method0("__post_init__") {
235 log::error!("Failed to call __post_init__ on config instance: {e}");
236 anyhow::bail!("__post_init__ failed: {e}");
237 }
238 log::debug!("Successfully called __post_init__ on config instance");
239
240 instance
241 },
242 Err(kwargs_err) => {
243 log::debug!("Failed to create config with kwargs: {kwargs_err}");
244
245 match config_class.call0() {
247 Ok(instance) => {
248 log::debug!("Created default config instance, setting attributes");
249 for (key, value) in &config.config {
250 let json_str = serde_json::to_string(value)
252 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
253 let py_value = PyModule::import(py, "json")?
254 .call_method("loads", (json_str,), None)?;
255 if let Err(setattr_err) = instance.setattr(key, py_value) {
256 log::warn!("Failed to set attribute {key}: {setattr_err}");
257 }
258 }
259
260 if let Err(e) = instance.call_method0("__post_init__") {
262 log::error!("Failed to call __post_init__ on config instance: {e}");
263 anyhow::bail!("__post_init__ failed: {e}");
264 }
265 log::debug!("Called __post_init__ on config instance");
266
267 instance
268 },
269 Err(default_err) => {
270 log::debug!("Failed to create default config: {default_err}");
271
272 anyhow::bail!(
274 "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
275 );
276 }
277 }
278 }
279 }
280 };
281
282 log::debug!("Created config instance: {config_instance:?}");
283
284 Some(config_instance)
285 } else {
286 log::debug!("No config_path or empty config, using None");
287 None
288 };
289
290 let python_actor = if let Some(config_obj) = config_instance.clone() {
292 actor_class.call1((config_obj,))?
293 } else {
294 actor_class.call0()?
295 };
296
297 log::debug!("Created Python actor instance: {python_actor:?}");
298
299 let mut py_data_actor_ref = python_actor
301 .extract::<PyRefMut<PyDataActor>>()
302 .map_err(Into::<PyErr>::into)
303 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
304
305 log::debug!(
306 "Internal PyDataActor mem_addr: {}, registered: {}",
307 &py_data_actor_ref.mem_address(),
308 py_data_actor_ref.is_registered()
309 );
310
311 if let Some(config_obj) = config_instance.as_ref() {
314 log::debug!("Extracting inherited config fields from Python actor config");
315
316 if let Ok(actor_id) = config_obj.getattr("actor_id")
318 && !actor_id.is_none() {
319 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
321 actor_id_val
322 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
323 ActorId::from(actor_id_str.as_str())
324 } else {
325 log::warn!("Failed to extract actor_id as ActorId or String");
326 anyhow::bail!("Invalid `actor_id` type");
327 };
328
329 log::debug!("Extracted actor_id: {actor_id_val}");
330 py_data_actor_ref.set_actor_id(actor_id_val);
331 }
332
333 if let Ok(log_events) = config_obj.getattr("log_events")
335 && let Ok(log_events_val) = log_events.extract::<bool>() {
336 log::debug!("Extracted log_events: {log_events_val}");
337 py_data_actor_ref.set_log_events(log_events_val);
338 }
339
340 if let Ok(log_commands) = config_obj.getattr("log_commands")
342 && let Ok(log_commands_val) = log_commands.extract::<bool>() {
343 log::debug!("Extracted log_commands: {log_commands_val}");
344 py_data_actor_ref.set_log_commands(log_commands_val);
345 }
346
347 log::debug!("Successfully updated PyDataActor config from Python actor instance");
348 }
349
350 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
352
353 log::debug!("Set Python instance reference for method dispatch");
354
355 let trader_id = self.trader_id();
357 let clock = self.kernel().clock();
358 let cache = self.kernel().cache();
359
360 py_data_actor_ref
361 .register(trader_id, clock, cache)
362 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
363
364 log::debug!(
365 "Internal PyDataActor registered: {}, state: {:?}",
366 py_data_actor_ref.is_registered(),
367 py_data_actor_ref.state()
368 );
369
370 Ok(python_actor.unbind())
371 })
372 .map_err(to_pyruntime_err)?;
373
374 let actor_id = Python::attach(|py| -> anyhow::Result<ActorId> {
375 let py_actor = python_actor.bind(py);
376 let py_data_actor_ref = py_actor
377 .cast::<PyDataActor>()
378 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
379 let py_data_actor = py_data_actor_ref.borrow();
380 py_data_actor.register_in_global_registries();
381
382 Ok(py_data_actor.actor_id())
383 })
384 .map_err(to_pyruntime_err)?;
385
386 self.kernel_mut()
387 .trader
388 .add_actor_id_for_lifecycle(actor_id)
389 .map_err(to_pyruntime_err)?;
390
391 log::info!("Registered Python actor {actor_id}");
395 Ok(())
396 }
397
398 fn __repr__(&self) -> String {
400 format!(
401 "LiveNode(trader_id={}, environment={:?}, running={})",
402 self.trader_id(),
403 self.environment(),
404 self.is_running()
405 )
406 }
407}
408
/// Python-facing wrapper around [`LiveNodeBuilder`], exposed to Python as `LiveNodeBuilder`.
///
/// The wrapped builder lives in a shared, interior-mutable slot so that the chaining
/// methods can take it out, transform it, and return a new handle pointing at the same slot.
#[derive(Debug)]
#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
pub struct LiveNodeBuilderPy {
    // `None` once the builder has been taken out of the slot and not put back.
    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
}
416
417#[pymethods]
418impl LiveNodeBuilderPy {
419 #[pyo3(name = "with_instance_id")]
421 fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
422 let mut inner_ref = self.inner.borrow_mut();
423 if let Some(builder) = inner_ref.take() {
424 *inner_ref = Some(builder.with_instance_id(instance_id));
425 Ok(Self {
426 inner: self.inner.clone(),
427 })
428 } else {
429 Err(to_pyruntime_err("Builder already consumed"))
430 }
431 }
432
433 #[pyo3(name = "with_load_state")]
435 fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
436 let mut inner_ref = self.inner.borrow_mut();
437 if let Some(builder) = inner_ref.take() {
438 *inner_ref = Some(builder.with_load_state(load_state));
439 Ok(Self {
440 inner: self.inner.clone(),
441 })
442 } else {
443 Err(to_pyruntime_err("Builder already consumed"))
444 }
445 }
446
447 #[pyo3(name = "with_save_state")]
449 fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
450 let mut inner_ref = self.inner.borrow_mut();
451 if let Some(builder) = inner_ref.take() {
452 *inner_ref = Some(builder.with_save_state(save_state));
453 Ok(Self {
454 inner: self.inner.clone(),
455 })
456 } else {
457 Err(to_pyruntime_err("Builder already consumed"))
458 }
459 }
460
461 #[pyo3(name = "add_data_client")]
463 fn py_add_data_client(
464 &self,
465 name: Option<String>,
466 factory: Py<PyAny>,
467 config: Py<PyAny>,
468 ) -> PyResult<Self> {
469 let mut inner_ref = self.inner.borrow_mut();
470 if let Some(builder) = inner_ref.take() {
471 Python::attach(|py| -> PyResult<Self> {
472 let registry = get_global_pyo3_registry();
474
475 let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
476 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
477
478 let factory_name = factory
480 .getattr(py, "name")?
481 .call0(py)?
482 .extract::<String>(py)?;
483 let client_name = name.unwrap_or(factory_name);
484
485 match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
487 Ok(updated_builder) => {
488 *inner_ref = Some(updated_builder);
489 Ok(Self {
490 inner: self.inner.clone(),
491 })
492 }
493 Err(e) => Err(to_pyruntime_err(format!("Failed to add data client: {e}"))),
494 }
495 })
496 } else {
497 Err(to_pyruntime_err("Builder already consumed"))
498 }
499 }
500
501 #[pyo3(name = "build")]
503 fn py_build(&self) -> PyResult<LiveNode> {
504 let mut inner_ref = self.inner.borrow_mut();
505 if let Some(builder) = inner_ref.take() {
506 match builder.build() {
507 Ok(node) => Ok(node),
508 Err(e) => Err(to_pyruntime_err(e)),
509 }
510 } else {
511 Err(to_pyruntime_err("Builder already consumed"))
512 }
513 }
514
515 fn __repr__(&self) -> String {
517 format!("{self:?}")
518 }
519}