1use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21 actor::data_actor::{DataActorConfig, ImportableActorConfig},
22 enums::Environment,
23 live::runtime::get_runtime,
24 python::actor::PyDataActor,
25};
26use nautilus_core::{UUID4, python::to_pyruntime_err};
27use nautilus_model::identifiers::{ActorId, TraderId};
28use nautilus_system::get_global_pyo3_registry;
29use pyo3::{
30 exceptions::{PyRuntimeError, PyValueError},
31 prelude::*,
32 types::{PyDict, PyTuple},
33};
34use serde_json;
35
36use crate::node::{LiveNode, LiveNodeBuilder};
37
38#[pymethods]
39impl LiveNode {
40 #[staticmethod]
41 #[pyo3(name = "builder")]
42 fn py_builder(
43 name: String,
44 trader_id: TraderId,
45 environment: Environment,
46 ) -> PyResult<LiveNodeBuilderPy> {
47 match Self::builder(trader_id, environment) {
48 Ok(builder) => Ok(LiveNodeBuilderPy {
49 inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
50 }),
51 Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
52 }
53 }
54
55 #[getter]
56 #[pyo3(name = "environment")]
57 fn py_environment(&self) -> Environment {
58 self.environment()
59 }
60
61 #[getter]
62 #[pyo3(name = "trader_id")]
63 fn py_trader_id(&self) -> TraderId {
64 self.trader_id()
65 }
66
67 #[getter]
68 #[pyo3(name = "instance_id")]
69 const fn py_instance_id(&self) -> UUID4 {
70 self.instance_id()
71 }
72
73 #[getter]
74 #[pyo3(name = "is_running")]
75 const fn py_is_running(&self) -> bool {
76 self.is_running()
77 }
78
79 #[pyo3(name = "start")]
80 fn py_start(&mut self) -> PyResult<()> {
81 if self.is_running() {
82 return Err(PyRuntimeError::new_err("LiveNode is already running"));
83 }
84
85 get_runtime().block_on(async {
87 self.start()
88 .await
89 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
90 })
91 }
92
93 #[pyo3(name = "run")]
94 fn py_run(&mut self, py: Python) -> PyResult<()> {
95 if self.is_running() {
96 return Err(PyRuntimeError::new_err("LiveNode is already running"));
97 }
98
99 let handle = self.handle();
101
102 let signal_module = py.import("signal")?;
104 let original_handler =
105 signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; let handle_for_signal = handle;
109 let signal_callback = pyo3::types::PyCFunction::new_closure(
110 py,
111 None,
112 None,
113 move |_args: &pyo3::Bound<'_, PyTuple>,
114 _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
115 -> PyResult<()> {
116 log::info!("Python signal handler called");
117 handle_for_signal.stop();
118 Ok(())
119 },
120 )?;
121
122 signal_module.call_method1("signal", (2, signal_callback))?;
124
125 let result = {
127 get_runtime().block_on(async {
128 self.run()
129 .await
130 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
131 })
132 };
133
134 signal_module.call_method1("signal", (2, original_handler))?;
136
137 result
138 }
139
140 #[pyo3(name = "stop")]
141 fn py_stop(&self) -> PyResult<()> {
142 if !self.is_running() {
143 return Err(PyRuntimeError::new_err("LiveNode is not running"));
144 }
145
146 self.handle().stop();
148 Ok(())
149 }
150
151 #[allow(
152 unsafe_code,
153 reason = "Required for Python actor component registration"
154 )]
155 #[pyo3(name = "add_actor_from_config")]
156 fn py_add_actor_from_config(
157 &mut self,
158 _py: Python,
159 config: ImportableActorConfig,
160 ) -> PyResult<()> {
161 log::debug!("`add_actor_from_config` with: {config:?}");
162
163 let parts: Vec<&str> = config.actor_path.split(':').collect();
165 if parts.len() != 2 {
166 return Err(PyValueError::new_err(
167 "actor_path must be in format 'module.path:ClassName'",
168 ));
169 }
170 let (module_name, class_name) = (parts[0], parts[1]);
171
172 log::info!("Importing actor from module: {module_name} class: {class_name}");
173
174 let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
176 let actor_module = py.import(module_name)?;
177 let actor_class = actor_module.getattr(class_name)?;
178 Ok(actor_class.unbind())
179 })
180 .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
181
182 let basic_data_actor_config = DataActorConfig::default();
185
186 log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
187
188 let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
190 let actor_module = py
192 .import(module_name)
193 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
194 let actor_class = actor_module
195 .getattr(class_name)
196 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
197
198 let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
200 let config_parts: Vec<&str> = config.config_path.split(':').collect();
202 if config_parts.len() != 2 {
203 anyhow::bail!(
204 "config_path must be in format 'module.path:ClassName', was {}",
205 config.config_path
206 );
207 }
208 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
209
210 log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
211
212 let config_module = py
214 .import(config_module_name)
215 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
216 let config_class = config_module
217 .getattr(config_class_name)
218 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
219
220 let py_dict = PyDict::new(py);
222 for (key, value) in &config.config {
223 let json_str = serde_json::to_string(value)
225 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
226 let py_value = PyModule::import(py, "json")?
227 .call_method("loads", (json_str,), None)?;
228 py_dict.set_item(key, py_value)?;
229 }
230
231 log::debug!("Created config dict: {py_dict:?}");
232
233 let config_instance = {
235 match config_class.call((), Some(&py_dict)) {
237 Ok(instance) => {
238 log::debug!("Successfully created config instance with kwargs");
239
240 if let Err(e) = instance.call_method0("__post_init__") {
242 log::error!("Failed to call __post_init__ on config instance: {e}");
243 anyhow::bail!("__post_init__ failed: {e}");
244 }
245 log::debug!("Successfully called __post_init__ on config instance");
246
247 instance
248 },
249 Err(kwargs_err) => {
250 log::debug!("Failed to create config with kwargs: {kwargs_err}");
251
252 match config_class.call0() {
254 Ok(instance) => {
255 log::debug!("Created default config instance, setting attributes");
256 for (key, value) in &config.config {
257 let json_str = serde_json::to_string(value)
259 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
260 let py_value = PyModule::import(py, "json")?
261 .call_method("loads", (json_str,), None)?;
262 if let Err(setattr_err) = instance.setattr(key, py_value) {
263 log::warn!("Failed to set attribute {key}: {setattr_err}");
264 }
265 }
266
267 if let Err(e) = instance.call_method0("__post_init__") {
269 log::error!("Failed to call __post_init__ on config instance: {e}");
270 anyhow::bail!("__post_init__ failed: {e}");
271 }
272 log::debug!("Called __post_init__ on config instance");
273
274 instance
275 },
276 Err(default_err) => {
277 log::debug!("Failed to create default config: {default_err}");
278
279 anyhow::bail!(
281 "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
282 );
283 }
284 }
285 }
286 }
287 };
288
289 log::debug!("Created config instance: {config_instance:?}");
290
291 Some(config_instance)
292 } else {
293 log::debug!("No config_path or empty config, using None");
294 None
295 };
296
297 let python_actor = if let Some(config_obj) = config_instance.clone() {
299 actor_class.call1((config_obj,))?
300 } else {
301 actor_class.call0()?
302 };
303
304 log::debug!("Created Python actor instance: {python_actor:?}");
305
306 let mut py_data_actor_ref = python_actor
308 .extract::<PyRefMut<PyDataActor>>()
309 .map_err(Into::<PyErr>::into)
310 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
311
312 log::debug!(
313 "Internal PyDataActor mem_addr: {}, registered: {}",
314 &py_data_actor_ref.mem_address(),
315 py_data_actor_ref.is_registered()
316 );
317
318 if let Some(config_obj) = config_instance.as_ref() {
321 log::debug!("Extracting inherited config fields from Python actor config");
322
323 if let Ok(actor_id) = config_obj.getattr("actor_id")
325 && !actor_id.is_none() {
326 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
328 actor_id_val
329 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
330 ActorId::from(actor_id_str.as_str())
331 } else {
332 log::warn!("Failed to extract actor_id as ActorId or String");
333 anyhow::bail!("Invalid `actor_id` type");
334 };
335
336 log::debug!("Extracted actor_id: {actor_id_val}");
337 py_data_actor_ref.set_actor_id(actor_id_val);
338 }
339
340 if let Ok(log_events) = config_obj.getattr("log_events")
342 && let Ok(log_events_val) = log_events.extract::<bool>() {
343 log::debug!("Extracted log_events: {log_events_val}");
344 py_data_actor_ref.set_log_events(log_events_val);
345 }
346
347 if let Ok(log_commands) = config_obj.getattr("log_commands")
349 && let Ok(log_commands_val) = log_commands.extract::<bool>() {
350 log::debug!("Extracted log_commands: {log_commands_val}");
351 py_data_actor_ref.set_log_commands(log_commands_val);
352 }
353
354 log::debug!("Successfully updated PyDataActor config from Python actor instance");
355 }
356
357 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
359
360 log::debug!("Set Python instance reference for method dispatch");
361
362 let trader_id = self.trader_id();
364 let clock = self.kernel().clock();
365 let cache = self.kernel().cache();
366
367 py_data_actor_ref
368 .register(trader_id, clock, cache)
369 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
370
371 log::debug!(
372 "Internal PyDataActor registered: {}, state: {:?}",
373 py_data_actor_ref.is_registered(),
374 py_data_actor_ref.state()
375 );
376
377 Ok(python_actor.unbind())
378 })
379 .map_err(to_pyruntime_err)?;
380
381 let actor_id = Python::attach(|py| -> anyhow::Result<ActorId> {
382 let py_actor = python_actor.bind(py);
383 let py_data_actor_ref = py_actor
384 .cast::<PyDataActor>()
385 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
386 let py_data_actor = py_data_actor_ref.borrow();
387 py_data_actor.register_in_global_registries();
388
389 Ok(py_data_actor.actor_id())
390 })
391 .map_err(to_pyruntime_err)?;
392
393 self.kernel_mut()
394 .trader
395 .add_actor_id_for_lifecycle(actor_id)
396 .map_err(to_pyruntime_err)?;
397
398 log::info!("Registered Python actor {actor_id}");
402 Ok(())
403 }
404
405 fn __repr__(&self) -> String {
407 format!(
408 "LiveNode(trader_id={}, environment={:?}, running={})",
409 self.trader_id(),
410 self.environment(),
411 self.is_running()
412 )
413 }
414}
415
416#[derive(Debug)]
419#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
420pub struct LiveNodeBuilderPy {
421 inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
422}
423
424#[pymethods]
425impl LiveNodeBuilderPy {
426 #[pyo3(name = "with_instance_id")]
428 fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
429 let mut inner_ref = self.inner.borrow_mut();
430 if let Some(builder) = inner_ref.take() {
431 *inner_ref = Some(builder.with_instance_id(instance_id));
432 Ok(Self {
433 inner: self.inner.clone(),
434 })
435 } else {
436 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
437 "Builder already consumed",
438 ))
439 }
440 }
441
442 #[pyo3(name = "with_load_state")]
444 fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
445 let mut inner_ref = self.inner.borrow_mut();
446 if let Some(builder) = inner_ref.take() {
447 *inner_ref = Some(builder.with_load_state(load_state));
448 Ok(Self {
449 inner: self.inner.clone(),
450 })
451 } else {
452 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
453 "Builder already consumed",
454 ))
455 }
456 }
457
458 #[pyo3(name = "with_save_state")]
460 fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
461 let mut inner_ref = self.inner.borrow_mut();
462 if let Some(builder) = inner_ref.take() {
463 *inner_ref = Some(builder.with_save_state(save_state));
464 Ok(Self {
465 inner: self.inner.clone(),
466 })
467 } else {
468 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
469 "Builder already consumed",
470 ))
471 }
472 }
473
474 #[pyo3(name = "add_data_client")]
476 fn py_add_data_client(
477 &self,
478 name: Option<String>,
479 factory: Py<PyAny>,
480 config: Py<PyAny>,
481 ) -> PyResult<Self> {
482 let mut inner_ref = self.inner.borrow_mut();
483 if let Some(builder) = inner_ref.take() {
484 Python::attach(|py| -> PyResult<Self> {
485 let registry = get_global_pyo3_registry();
487
488 let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
489 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
490
491 let factory_name = factory
493 .getattr(py, "name")?
494 .call0(py)?
495 .extract::<String>(py)?;
496 let client_name = name.unwrap_or(factory_name);
497
498 match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
500 Ok(updated_builder) => {
501 *inner_ref = Some(updated_builder);
502 Ok(Self {
503 inner: self.inner.clone(),
504 })
505 }
506 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
507 "Failed to add data client: {e}"
508 ))),
509 }
510 })
511 } else {
512 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
513 "Builder already consumed",
514 ))
515 }
516 }
517
518 #[pyo3(name = "build")]
520 fn py_build(&self) -> PyResult<LiveNode> {
521 let mut inner_ref = self.inner.borrow_mut();
522 if let Some(builder) = inner_ref.take() {
523 match builder.build() {
524 Ok(node) => Ok(node),
525 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
526 e.to_string(),
527 )),
528 }
529 } else {
530 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
531 "Builder already consumed",
532 ))
533 }
534 }
535
536 fn __repr__(&self) -> String {
538 format!("{self:?}")
539 }
540}