1use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21 actor::data_actor::{DataActorConfig, ImportableActorConfig},
22 component::{Component, register_component_actor_by_ref},
23 enums::Environment,
24 python::actor::PyDataActor,
25 runtime::get_runtime,
26};
27use nautilus_core::{UUID4, python::to_pyruntime_err};
28use nautilus_model::identifiers::{ActorId, TraderId};
29use nautilus_system::get_global_pyo3_registry;
30use pyo3::{
31 exceptions::{PyRuntimeError, PyValueError},
32 prelude::*,
33 types::{PyDict, PyTuple},
34};
35use serde_json;
36
37use crate::node::{LiveNode, LiveNodeBuilder};
38
39#[pymethods]
40impl LiveNode {
41 #[staticmethod]
43 #[pyo3(name = "builder")]
44 fn py_builder(
45 name: String,
46 trader_id: TraderId,
47 environment: Environment,
48 ) -> PyResult<LiveNodeBuilderPy> {
49 match Self::builder(name, trader_id, environment) {
50 Ok(builder) => Ok(LiveNodeBuilderPy {
51 inner: Rc::new(RefCell::new(Some(builder))),
52 }),
53 Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
54 }
55 }
56
    /// Python getter `environment`: returns the value of `self.environment()`.
    #[getter]
    #[pyo3(name = "environment")]
    fn py_environment(&self) -> Environment {
        self.environment()
    }
63
    /// Python getter `trader_id`: returns the value of `self.trader_id()`.
    #[getter]
    #[pyo3(name = "trader_id")]
    fn py_trader_id(&self) -> TraderId {
        self.trader_id()
    }
70
    /// Python getter `instance_id`: returns the value of `self.instance_id()`.
    #[getter]
    #[pyo3(name = "instance_id")]
    const fn py_instance_id(&self) -> UUID4 {
        self.instance_id()
    }
77
    /// Python getter `is_running`: returns the value of `self.is_running()`.
    #[getter]
    #[pyo3(name = "is_running")]
    const fn py_is_running(&self) -> bool {
        self.is_running()
    }
84
85 #[pyo3(name = "start")]
86 fn py_start(&mut self) -> PyResult<()> {
87 if self.is_running() {
88 return Err(PyRuntimeError::new_err("LiveNode is already running"));
89 }
90
91 get_runtime().block_on(async {
93 self.start()
94 .await
95 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
96 })
97 }
98
99 #[pyo3(name = "run")]
100 fn py_run(&mut self, py: Python) -> PyResult<()> {
101 if self.is_running() {
102 return Err(PyRuntimeError::new_err("LiveNode is already running"));
103 }
104
105 let handle = self.handle();
107
108 let signal_module = py.import("signal")?;
110 let original_handler =
111 signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; let handle_for_signal = handle;
115 let signal_callback = pyo3::types::PyCFunction::new_closure(
116 py,
117 None,
118 None,
119 move |_args: &pyo3::Bound<'_, PyTuple>,
120 _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
121 -> PyResult<()> {
122 log::info!("Python signal handler called");
123 handle_for_signal.stop();
124 Ok(())
125 },
126 )?;
127
128 signal_module.call_method1("signal", (2, signal_callback))?;
130
131 let result = {
133 get_runtime().block_on(async {
134 self.run()
135 .await
136 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
137 })
138 };
139
140 signal_module.call_method1("signal", (2, original_handler))?;
142
143 result
144 }
145
146 #[pyo3(name = "stop")]
147 fn py_stop(&self) -> PyResult<()> {
148 if !self.is_running() {
149 return Err(PyRuntimeError::new_err("LiveNode is not running"));
150 }
151
152 self.handle().stop();
154 Ok(())
155 }
156
157 #[allow(
158 unsafe_code,
159 reason = "Required for Python actor component registration"
160 )]
161 #[pyo3(name = "add_actor_from_config")]
162 fn py_add_actor_from_config(
163 &mut self,
164 _py: Python,
165 config: ImportableActorConfig,
166 ) -> PyResult<()> {
167 log::debug!("`add_actor_from_config` with: {config:?}");
168
169 let parts: Vec<&str> = config.actor_path.split(':').collect();
171 if parts.len() != 2 {
172 return Err(PyValueError::new_err(
173 "actor_path must be in format 'module.path:ClassName'",
174 ));
175 }
176 let (module_name, class_name) = (parts[0], parts[1]);
177
178 log::info!("Importing actor from module: {module_name} class: {class_name}");
179
180 let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
182 let actor_module = py.import(module_name)?;
183 let actor_class = actor_module.getattr(class_name)?;
184 Ok(actor_class.unbind())
185 })
186 .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
187
188 let basic_data_actor_config = DataActorConfig::default();
191
192 log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
193
194 let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
196 let actor_module = py
198 .import(module_name)
199 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
200 let actor_class = actor_module
201 .getattr(class_name)
202 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
203
204 let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
206 let config_parts: Vec<&str> = config.config_path.split(':').collect();
208 if config_parts.len() != 2 {
209 anyhow::bail!(
210 "config_path must be in format 'module.path:ClassName', was {}",
211 config.config_path
212 );
213 }
214 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
215
216 log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
217
218 let config_module = py
220 .import(config_module_name)
221 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
222 let config_class = config_module
223 .getattr(config_class_name)
224 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
225
226 let py_dict = PyDict::new(py);
228 for (key, value) in &config.config {
229 let json_str = serde_json::to_string(value)
231 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
232 let py_value = PyModule::import(py, "json")?
233 .call_method("loads", (json_str,), None)?;
234 py_dict.set_item(key, py_value)?;
235 }
236
237 log::debug!("Created config dict: {py_dict:?}");
238
239 let config_instance = {
241 match config_class.call((), Some(&py_dict)) {
243 Ok(instance) => {
244 log::debug!("Successfully created config instance with kwargs");
245
246 if let Err(e) = instance.call_method0("__post_init__") {
248 log::error!("Failed to call __post_init__ on config instance: {e}");
249 anyhow::bail!("__post_init__ failed: {e}");
250 }
251 log::debug!("Successfully called __post_init__ on config instance");
252
253 instance
254 },
255 Err(kwargs_err) => {
256 log::debug!("Failed to create config with kwargs: {kwargs_err}");
257
258 match config_class.call0() {
260 Ok(instance) => {
261 log::debug!("Created default config instance, setting attributes");
262 for (key, value) in &config.config {
263 let json_str = serde_json::to_string(value)
265 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
266 let py_value = PyModule::import(py, "json")?
267 .call_method("loads", (json_str,), None)?;
268 if let Err(setattr_err) = instance.setattr(key, py_value) {
269 log::warn!("Failed to set attribute {key}: {setattr_err}");
270 }
271 }
272
273 if let Err(e) = instance.call_method0("__post_init__") {
275 log::error!("Failed to call __post_init__ on config instance: {e}");
276 anyhow::bail!("__post_init__ failed: {e}");
277 }
278 log::debug!("Called __post_init__ on config instance");
279
280 instance
281 },
282 Err(default_err) => {
283 log::debug!("Failed to create default config: {default_err}");
284
285 anyhow::bail!(
287 "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
288 );
289 }
290 }
291 }
292 }
293 };
294
295 log::debug!("Created config instance: {config_instance:?}");
296
297 Some(config_instance)
298 } else {
299 log::debug!("No config_path or empty config, using None");
300 None
301 };
302
303 let python_actor = if let Some(config_obj) = config_instance.clone() {
305 actor_class.call1((config_obj,))?
306 } else {
307 actor_class.call0()?
308 };
309
310 log::debug!("Created Python actor instance: {python_actor:?}");
311
312 let mut py_data_actor_ref = python_actor
314 .extract::<PyRefMut<PyDataActor>>()
315 .map_err(Into::<PyErr>::into)
316 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
317
318 log::debug!(
319 "Internal PyDataActor mem_addr: {}, registered: {}",
320 &py_data_actor_ref.mem_address(),
321 py_data_actor_ref.is_registered()
322 );
323
324 if let Some(config_obj) = config_instance.as_ref() {
327 log::debug!("Extracting inherited config fields from Python actor config");
328
329 if let Ok(actor_id) = config_obj.getattr("actor_id")
331 && !actor_id.is_none() {
332 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
334 actor_id_val
335 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
336 ActorId::from(actor_id_str.as_str())
337 } else {
338 log::warn!("Failed to extract actor_id as ActorId or String");
339 anyhow::bail!("Invalid `actor_id` type");
340 };
341
342 log::debug!("Extracted actor_id: {actor_id_val}");
343 py_data_actor_ref.set_actor_id(actor_id_val);
344 }
345
346 if let Ok(log_events) = config_obj.getattr("log_events")
348 && let Ok(log_events_val) = log_events.extract::<bool>() {
349 log::debug!("Extracted log_events: {log_events_val}");
350 py_data_actor_ref.set_log_events(log_events_val);
351 }
352
353 if let Ok(log_commands) = config_obj.getattr("log_commands")
355 && let Ok(log_commands_val) = log_commands.extract::<bool>() {
356 log::debug!("Extracted log_commands: {log_commands_val}");
357 py_data_actor_ref.set_log_commands(log_commands_val);
358 }
359
360 log::debug!("Successfully updated PyDataActor config from Python actor instance");
361 }
362
363 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
365
366 log::debug!("Set Python instance reference for method dispatch");
367
368 let trader_id = self.trader_id();
370 let clock = self.kernel().clock();
371 let cache = self.kernel().cache();
372
373 py_data_actor_ref
374 .register(trader_id, clock, cache)
375 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
376
377 log::debug!(
378 "Internal PyDataActor registered: {}, state: {:?}",
379 py_data_actor_ref.is_registered(),
380 py_data_actor_ref.state()
381 );
382
383 Ok(python_actor.unbind())
384 })
385 .map_err(to_pyruntime_err)?;
386
387 let actor_id = Python::attach(
389 |py| -> anyhow::Result<nautilus_model::identifiers::ActorId> {
390 let py_actor = python_actor.bind(py);
391 let py_data_actor_ref = py_actor
392 .cast::<PyDataActor>()
393 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
394 let py_data_actor = py_data_actor_ref.borrow();
395
396 unsafe {
399 register_component_actor_by_ref(&*py_data_actor);
400 }
401
402 Ok(py_data_actor.actor_id())
403 },
404 )
405 .map_err(to_pyruntime_err)?;
406
407 self.kernel_mut()
409 .trader
410 .add_actor_id_for_lifecycle(actor_id)
411 .map_err(to_pyruntime_err)?;
412
413 std::mem::forget(python_actor); log::info!("Registered Python actor {actor_id}");
418 Ok(())
419 }
420
421 fn __repr__(&self) -> String {
423 format!(
424 "LiveNode(trader_id={}, environment={:?}, running={})",
425 self.trader_id(),
426 self.environment(),
427 self.is_running()
428 )
429 }
430}
431
/// Python-facing wrapper around [`LiveNodeBuilder`], exposed to Python as `LiveNodeBuilder`
/// in module `nautilus_trader.live`.
///
/// The wrapped builder lives in an `Option` inside a shared `Rc<RefCell<..>>`, so wrapper
/// methods taking `&self` can move the builder out, transform it, and store the result back.
/// Each chaining method returns a new wrapper that shares the same slot.
#[derive(Debug)]
#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
pub struct LiveNodeBuilderPy {
    /// Shared slot for the builder; `None` after the builder has been taken and not put back.
    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
}
439
440#[pymethods]
441impl LiveNodeBuilderPy {
442 #[pyo3(name = "with_instance_id")]
444 fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
445 let mut inner_ref = self.inner.borrow_mut();
446 if let Some(builder) = inner_ref.take() {
447 *inner_ref = Some(builder.with_instance_id(instance_id));
448 Ok(Self {
449 inner: self.inner.clone(),
450 })
451 } else {
452 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
453 "Builder already consumed",
454 ))
455 }
456 }
457
458 #[pyo3(name = "with_load_state")]
460 fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
461 let mut inner_ref = self.inner.borrow_mut();
462 if let Some(builder) = inner_ref.take() {
463 *inner_ref = Some(builder.with_load_state(load_state));
464 Ok(Self {
465 inner: self.inner.clone(),
466 })
467 } else {
468 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
469 "Builder already consumed",
470 ))
471 }
472 }
473
474 #[pyo3(name = "with_save_state")]
476 fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
477 let mut inner_ref = self.inner.borrow_mut();
478 if let Some(builder) = inner_ref.take() {
479 *inner_ref = Some(builder.with_save_state(save_state));
480 Ok(Self {
481 inner: self.inner.clone(),
482 })
483 } else {
484 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
485 "Builder already consumed",
486 ))
487 }
488 }
489
490 #[pyo3(name = "add_data_client")]
492 fn py_add_data_client(
493 &self,
494 name: Option<String>,
495 factory: Py<PyAny>,
496 config: Py<PyAny>,
497 ) -> PyResult<Self> {
498 let mut inner_ref = self.inner.borrow_mut();
499 if let Some(builder) = inner_ref.take() {
500 Python::attach(|py| -> PyResult<Self> {
501 let registry = get_global_pyo3_registry();
503
504 let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
505 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
506
507 let factory_name = factory
509 .getattr(py, "name")?
510 .call0(py)?
511 .extract::<String>(py)?;
512 let client_name = name.unwrap_or(factory_name);
513
514 match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
516 Ok(updated_builder) => {
517 *inner_ref = Some(updated_builder);
518 Ok(Self {
519 inner: self.inner.clone(),
520 })
521 }
522 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
523 "Failed to add data client: {e}"
524 ))),
525 }
526 })
527 } else {
528 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
529 "Builder already consumed",
530 ))
531 }
532 }
533
534 #[pyo3(name = "build")]
536 fn py_build(&self) -> PyResult<LiveNode> {
537 let mut inner_ref = self.inner.borrow_mut();
538 if let Some(builder) = inner_ref.take() {
539 match builder.build() {
540 Ok(node) => Ok(node),
541 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
542 e.to_string(),
543 )),
544 }
545 } else {
546 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
547 "Builder already consumed",
548 ))
549 }
550 }
551
    /// Returns the Python `repr` string, which is this wrapper's derived `Debug` output.
    fn __repr__(&self) -> String {
        format!("{self:?}")
    }
556}