1use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21 actor::data_actor::{DataActorConfig, ImportableActorConfig},
22 component::{Component, register_component_actor_by_ref},
23 enums::Environment,
24 python::actor::PyDataActor,
25 runtime::get_runtime,
26};
27use nautilus_core::{UUID4, python::to_pyruntime_err};
28use nautilus_model::identifiers::{ActorId, TraderId};
29use nautilus_system::get_global_pyo3_registry;
30use pyo3::{
31 exceptions::{PyRuntimeError, PyValueError},
32 prelude::*,
33 types::{PyDict, PyTuple},
34};
35use serde_json;
36
37use crate::node::{LiveNode, LiveNodeBuilder};
38
39#[pymethods]
40impl LiveNode {
41 #[staticmethod]
43 #[pyo3(name = "builder")]
44 fn py_builder(
45 name: String,
46 trader_id: TraderId,
47 environment: Environment,
48 ) -> PyResult<LiveNodeBuilderPy> {
49 match Self::builder(name, trader_id, environment) {
50 Ok(builder) => Ok(LiveNodeBuilderPy {
51 inner: Rc::new(RefCell::new(Some(builder))),
52 }),
53 Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
54 }
55 }
56
57 #[getter]
59 #[pyo3(name = "environment")]
60 fn py_environment(&self) -> Environment {
61 self.environment()
62 }
63
64 #[getter]
66 #[pyo3(name = "trader_id")]
67 fn py_trader_id(&self) -> TraderId {
68 self.trader_id()
69 }
70
71 #[getter]
73 #[pyo3(name = "instance_id")]
74 const fn py_instance_id(&self) -> UUID4 {
75 self.instance_id()
76 }
77
78 #[getter]
80 #[pyo3(name = "is_running")]
81 const fn py_is_running(&self) -> bool {
82 self.is_running()
83 }
84
85 #[pyo3(name = "start")]
86 fn py_start(&mut self) -> PyResult<()> {
87 if self.is_running() {
88 return Err(PyRuntimeError::new_err("LiveNode is already running"));
89 }
90
91 get_runtime().block_on(async {
93 self.start()
94 .await
95 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
96 })
97 }
98
99 #[pyo3(name = "run")]
100 fn py_run(&mut self, py: Python) -> PyResult<()> {
101 if self.is_running() {
102 return Err(PyRuntimeError::new_err("LiveNode is already running"));
103 }
104
105 let handle = self.handle();
107
108 let signal_module = py.import("signal")?;
110 let original_handler =
111 signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; let handle_for_signal = handle;
115 let signal_callback = pyo3::types::PyCFunction::new_closure(
116 py,
117 None,
118 None,
119 move |_args: &pyo3::Bound<'_, PyTuple>,
120 _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
121 -> PyResult<()> {
122 log::info!("Python signal handler called");
123 handle_for_signal.stop();
124 Ok(())
125 },
126 )?;
127
128 signal_module.call_method1("signal", (2, signal_callback))?;
130
131 let result = {
133 get_runtime().block_on(async {
134 self.run()
135 .await
136 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
137 })
138 };
139
140 signal_module.call_method1("signal", (2, original_handler))?;
142
143 result
144 }
145
146 #[pyo3(name = "stop")]
147 fn py_stop(&self) -> PyResult<()> {
148 if !self.is_running() {
149 return Err(PyRuntimeError::new_err("LiveNode is not running"));
150 }
151
152 self.handle().stop();
154 Ok(())
155 }
156
157 #[allow(unsafe_code)] #[pyo3(name = "add_actor_from_config")]
159 fn py_add_actor_from_config(
160 &mut self,
161 _py: Python,
162 config: ImportableActorConfig,
163 ) -> PyResult<()> {
164 log::debug!("`add_actor_from_config` with: {config:?}");
165
166 let parts: Vec<&str> = config.actor_path.split(':').collect();
168 if parts.len() != 2 {
169 return Err(PyValueError::new_err(
170 "actor_path must be in format 'module.path:ClassName'",
171 ));
172 }
173 let (module_name, class_name) = (parts[0], parts[1]);
174
175 log::info!("Importing actor from module: {module_name} class: {class_name}");
176
177 let _python_class = Python::with_gil(|py| -> PyResult<PyObject> {
179 let actor_module = py.import(module_name)?;
180 let actor_class = actor_module.getattr(class_name)?;
181 Ok(actor_class.unbind())
182 })
183 .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
184
185 let basic_data_actor_config = DataActorConfig::default();
188
189 log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
190
191 let python_actor = Python::with_gil(|py| -> anyhow::Result<PyObject> {
193 let actor_module = py
195 .import(module_name)
196 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
197 let actor_class = actor_module
198 .getattr(class_name)
199 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
200
201 let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
203 let config_parts: Vec<&str> = config.config_path.split(':').collect();
205 if config_parts.len() != 2 {
206 anyhow::bail!(
207 "config_path must be in format 'module.path:ClassName', was {}",
208 config.config_path
209 );
210 }
211 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
212
213 log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
214
215 let config_module = py
217 .import(config_module_name)
218 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
219 let config_class = config_module
220 .getattr(config_class_name)
221 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
222
223 let py_dict = PyDict::new(py);
225 for (key, value) in &config.config {
226 let json_str = serde_json::to_string(value)
228 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
229 let py_value = PyModule::import(py, "json")?
230 .call_method("loads", (json_str,), None)?;
231 py_dict.set_item(key, py_value)?;
232 }
233
234 log::debug!("Created config dict: {py_dict:?}");
235
236 let config_instance = {
238 match config_class.call((), Some(&py_dict)) {
240 Ok(instance) => {
241 log::debug!("Successfully created config instance with kwargs");
242
243 if let Err(e) = instance.call_method0("__post_init__") {
245 log::error!("Failed to call __post_init__ on config instance: {e}");
246 anyhow::bail!("__post_init__ failed: {e}");
247 }
248 log::debug!("Successfully called __post_init__ on config instance");
249
250 instance
251 },
252 Err(kwargs_err) => {
253 log::debug!("Failed to create config with kwargs: {kwargs_err}");
254
255 match config_class.call0() {
257 Ok(instance) => {
258 log::debug!("Created default config instance, setting attributes");
259 for (key, value) in &config.config {
260 let json_str = serde_json::to_string(value)
262 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
263 let py_value = PyModule::import(py, "json")?
264 .call_method("loads", (json_str,), None)?;
265 if let Err(setattr_err) = instance.setattr(key, py_value) {
266 log::warn!("Failed to set attribute {key}: {setattr_err}");
267 }
268 }
269
270 if let Err(e) = instance.call_method0("__post_init__") {
272 log::error!("Failed to call __post_init__ on config instance: {e}");
273 anyhow::bail!("__post_init__ failed: {e}");
274 }
275 log::debug!("Called __post_init__ on config instance");
276
277 instance
278 },
279 Err(default_err) => {
280 log::debug!("Failed to create default config: {default_err}");
281
282 anyhow::bail!(
284 "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
285 );
286 }
287 }
288 }
289 }
290 };
291
292 log::debug!("Created config instance: {config_instance:?}");
293
294 Some(config_instance)
295 } else {
296 log::debug!("No config_path or empty config, using None");
297 None
298 };
299
300 let python_actor = if let Some(config_obj) = config_instance.clone() {
302 actor_class.call1((config_obj,))?
303 } else {
304 actor_class.call0()?
305 };
306
307 log::debug!("Created Python actor instance: {python_actor:?}");
308
309 let mut py_data_actor_ref = python_actor.extract::<PyRefMut<PyDataActor>>()?;
311
312 log::debug!(
313 "Internal PyDataActor mem_addr: {}, registered: {}",
314 &py_data_actor_ref.mem_address(),
315 py_data_actor_ref.is_registered()
316 );
317
318 if let Some(config_obj) = config_instance.as_ref() {
321 log::debug!("Extracting inherited config fields from Python actor config");
322
323 if let Ok(actor_id) = config_obj.getattr("actor_id")
325 && !actor_id.is_none() {
326 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
328 actor_id_val
329 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
330 ActorId::from(actor_id_str.as_str())
331 } else {
332 log::warn!("Failed to extract actor_id as ActorId or String");
333 anyhow::bail!("Invalid `actor_id` type");
334 };
335
336 log::debug!("Extracted actor_id: {actor_id_val}");
337 py_data_actor_ref.set_actor_id(actor_id_val);
338 }
339
340 if let Ok(log_events) = config_obj.getattr("log_events")
342 && let Ok(log_events_val) = log_events.extract::<bool>() {
343 log::debug!("Extracted log_events: {log_events_val}");
344 py_data_actor_ref.set_log_events(log_events_val);
345 }
346
347 if let Ok(log_commands) = config_obj.getattr("log_commands")
349 && let Ok(log_commands_val) = log_commands.extract::<bool>() {
350 log::debug!("Extracted log_commands: {log_commands_val}");
351 py_data_actor_ref.set_log_commands(log_commands_val);
352 }
353
354 log::debug!("Successfully updated PyDataActor config from Python actor instance");
355 }
356
357 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
359
360 log::debug!("Set Python instance reference for method dispatch");
361
362 let trader_id = self.trader_id();
364 let clock = self.kernel().clock();
365 let cache = self.kernel().cache();
366
367 py_data_actor_ref
368 .register(trader_id, clock, cache)
369 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
370
371 log::debug!(
372 "Internal PyDataActor registered: {}, state: {:?}",
373 py_data_actor_ref.is_registered(),
374 py_data_actor_ref.state()
375 );
376
377 Ok(python_actor.unbind())
378 })
379 .map_err(to_pyruntime_err)?;
380
381 let actor_id = Python::with_gil(
383 |py| -> anyhow::Result<nautilus_model::identifiers::ActorId> {
384 let py_actor = python_actor.bind(py);
385 let py_data_actor_ref = py_actor
386 .downcast::<PyDataActor>()
387 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
388 let py_data_actor = py_data_actor_ref.borrow();
389
390 unsafe {
393 register_component_actor_by_ref(&*py_data_actor);
394 }
395
396 Ok(py_data_actor.actor_id())
397 },
398 )
399 .map_err(to_pyruntime_err)?;
400
401 self.kernel_mut()
403 .trader
404 .add_actor_id_for_lifecycle(actor_id)
405 .map_err(to_pyruntime_err)?;
406
407 std::mem::forget(python_actor); log::info!("Registered Python actor {actor_id}");
412 Ok(())
413 }
414
415 fn __repr__(&self) -> String {
417 format!(
418 "LiveNode(trader_id={}, environment={:?}, running={})",
419 self.trader_id(),
420 self.environment(),
421 self.is_running()
422 )
423 }
424}
425
426#[derive(Debug)]
429#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
430pub struct LiveNodeBuilderPy {
431 inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
432}
433
434#[pymethods]
435impl LiveNodeBuilderPy {
436 #[pyo3(name = "with_instance_id")]
438 fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
439 let mut inner_ref = self.inner.borrow_mut();
440 if let Some(builder) = inner_ref.take() {
441 *inner_ref = Some(builder.with_instance_id(instance_id));
442 Ok(Self {
443 inner: self.inner.clone(),
444 })
445 } else {
446 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
447 "Builder already consumed",
448 ))
449 }
450 }
451
452 #[pyo3(name = "with_load_state")]
454 fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
455 let mut inner_ref = self.inner.borrow_mut();
456 if let Some(builder) = inner_ref.take() {
457 *inner_ref = Some(builder.with_load_state(load_state));
458 Ok(Self {
459 inner: self.inner.clone(),
460 })
461 } else {
462 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
463 "Builder already consumed",
464 ))
465 }
466 }
467
468 #[pyo3(name = "with_save_state")]
470 fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
471 let mut inner_ref = self.inner.borrow_mut();
472 if let Some(builder) = inner_ref.take() {
473 *inner_ref = Some(builder.with_save_state(save_state));
474 Ok(Self {
475 inner: self.inner.clone(),
476 })
477 } else {
478 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
479 "Builder already consumed",
480 ))
481 }
482 }
483
484 #[pyo3(name = "add_data_client")]
486 fn py_add_data_client(
487 &self,
488 name: Option<String>,
489 factory: PyObject,
490 config: PyObject,
491 ) -> PyResult<Self> {
492 let mut inner_ref = self.inner.borrow_mut();
493 if let Some(builder) = inner_ref.take() {
494 Python::with_gil(|py| -> PyResult<Self> {
495 let registry = get_global_pyo3_registry();
497
498 let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
499 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
500
501 let factory_name = factory
503 .getattr(py, "name")?
504 .call0(py)?
505 .extract::<String>(py)?;
506 let client_name = name.unwrap_or(factory_name);
507
508 match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
510 Ok(updated_builder) => {
511 *inner_ref = Some(updated_builder);
512 Ok(Self {
513 inner: self.inner.clone(),
514 })
515 }
516 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
517 "Failed to add data client: {e}"
518 ))),
519 }
520 })
521 } else {
522 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
523 "Builder already consumed",
524 ))
525 }
526 }
527
528 #[pyo3(name = "build")]
530 fn py_build(&self) -> PyResult<LiveNode> {
531 let mut inner_ref = self.inner.borrow_mut();
532 if let Some(builder) = inner_ref.take() {
533 match builder.build() {
534 Ok(node) => Ok(node),
535 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
536 e.to_string(),
537 )),
538 }
539 } else {
540 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
541 "Builder already consumed",
542 ))
543 }
544 }
545
546 fn __repr__(&self) -> String {
548 format!("{self:?}")
549 }
550}