1use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21 actor::data_actor::{DataActorConfig, ImportableActorConfig},
22 component::{Component, register_component_actor_by_ref},
23 enums::Environment,
24 python::actor::PyDataActor,
25 runtime::get_runtime,
26};
27use nautilus_core::{UUID4, python::to_pyruntime_err};
28use nautilus_model::identifiers::{ActorId, TraderId};
29use nautilus_system::get_global_pyo3_registry;
30use pyo3::{
31 exceptions::{PyRuntimeError, PyValueError},
32 prelude::*,
33 types::{PyDict, PyTuple},
34};
35use serde_json;
36
37use crate::node::{LiveNode, LiveNodeBuilder};
38
39#[pymethods]
40impl LiveNode {
41 #[staticmethod]
43 #[pyo3(name = "builder")]
44 fn py_builder(
45 name: String,
46 trader_id: TraderId,
47 environment: Environment,
48 ) -> PyResult<LiveNodeBuilderPy> {
49 match Self::builder(name, trader_id, environment) {
50 Ok(builder) => Ok(LiveNodeBuilderPy {
51 inner: Rc::new(RefCell::new(Some(builder))),
52 }),
53 Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
54 }
55 }
56
57 #[getter]
59 #[pyo3(name = "environment")]
60 fn py_environment(&self) -> Environment {
61 self.environment()
62 }
63
64 #[getter]
66 #[pyo3(name = "trader_id")]
67 fn py_trader_id(&self) -> TraderId {
68 self.trader_id()
69 }
70
71 #[getter]
73 #[pyo3(name = "instance_id")]
74 const fn py_instance_id(&self) -> UUID4 {
75 self.instance_id()
76 }
77
78 #[getter]
80 #[pyo3(name = "is_running")]
81 const fn py_is_running(&self) -> bool {
82 self.is_running()
83 }
84
85 #[pyo3(name = "start")]
86 fn py_start(&mut self) -> PyResult<()> {
87 if self.is_running() {
88 return Err(PyRuntimeError::new_err("LiveNode is already running"));
89 }
90
91 get_runtime().block_on(async {
93 self.start()
94 .await
95 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
96 })
97 }
98
99 #[pyo3(name = "run")]
100 fn py_run(&mut self, py: Python) -> PyResult<()> {
101 if self.is_running() {
102 return Err(PyRuntimeError::new_err("LiveNode is already running"));
103 }
104
105 let handle = self.handle();
107
108 let signal_module = py.import("signal")?;
110 let original_handler =
111 signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; let handle_for_signal = handle;
115 let signal_callback = pyo3::types::PyCFunction::new_closure(
116 py,
117 None,
118 None,
119 move |_args: &pyo3::Bound<'_, PyTuple>,
120 _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
121 -> PyResult<()> {
122 log::info!("Python signal handler called");
123 handle_for_signal.stop();
124 Ok(())
125 },
126 )?;
127
128 signal_module.call_method1("signal", (2, signal_callback))?;
130
131 let result = {
133 get_runtime().block_on(async {
134 self.run()
135 .await
136 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
137 })
138 };
139
140 signal_module.call_method1("signal", (2, original_handler))?;
142
143 result
144 }
145
146 #[pyo3(name = "stop")]
147 fn py_stop(&self) -> PyResult<()> {
148 if !self.is_running() {
149 return Err(PyRuntimeError::new_err("LiveNode is not running"));
150 }
151
152 self.handle().stop();
154 Ok(())
155 }
156
157 #[allow(
158 unsafe_code,
159 reason = "Required for Python actor component registration"
160 )]
161 #[pyo3(name = "add_actor_from_config")]
162 fn py_add_actor_from_config(
163 &mut self,
164 _py: Python,
165 config: ImportableActorConfig,
166 ) -> PyResult<()> {
167 log::debug!("`add_actor_from_config` with: {config:?}");
168
169 let parts: Vec<&str> = config.actor_path.split(':').collect();
171 if parts.len() != 2 {
172 return Err(PyValueError::new_err(
173 "actor_path must be in format 'module.path:ClassName'",
174 ));
175 }
176 let (module_name, class_name) = (parts[0], parts[1]);
177
178 log::info!("Importing actor from module: {module_name} class: {class_name}");
179
180 let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
182 let actor_module = py.import(module_name)?;
183 let actor_class = actor_module.getattr(class_name)?;
184 Ok(actor_class.unbind())
185 })
186 .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
187
188 let basic_data_actor_config = DataActorConfig::default();
191
192 log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
193
194 let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
196 let actor_module = py
198 .import(module_name)
199 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
200 let actor_class = actor_module
201 .getattr(class_name)
202 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
203
204 let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
206 let config_parts: Vec<&str> = config.config_path.split(':').collect();
208 if config_parts.len() != 2 {
209 anyhow::bail!(
210 "config_path must be in format 'module.path:ClassName', was {}",
211 config.config_path
212 );
213 }
214 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
215
216 log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
217
218 let config_module = py
220 .import(config_module_name)
221 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
222 let config_class = config_module
223 .getattr(config_class_name)
224 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
225
226 let py_dict = PyDict::new(py);
228 for (key, value) in &config.config {
229 let json_str = serde_json::to_string(value)
231 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
232 let py_value = PyModule::import(py, "json")?
233 .call_method("loads", (json_str,), None)?;
234 py_dict.set_item(key, py_value)?;
235 }
236
237 log::debug!("Created config dict: {py_dict:?}");
238
239 let config_instance = {
241 match config_class.call((), Some(&py_dict)) {
243 Ok(instance) => {
244 log::debug!("Successfully created config instance with kwargs");
245
246 if let Err(e) = instance.call_method0("__post_init__") {
248 log::error!("Failed to call __post_init__ on config instance: {e}");
249 anyhow::bail!("__post_init__ failed: {e}");
250 }
251 log::debug!("Successfully called __post_init__ on config instance");
252
253 instance
254 },
255 Err(kwargs_err) => {
256 log::debug!("Failed to create config with kwargs: {kwargs_err}");
257
258 match config_class.call0() {
260 Ok(instance) => {
261 log::debug!("Created default config instance, setting attributes");
262 for (key, value) in &config.config {
263 let json_str = serde_json::to_string(value)
265 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
266 let py_value = PyModule::import(py, "json")?
267 .call_method("loads", (json_str,), None)?;
268 if let Err(setattr_err) = instance.setattr(key, py_value) {
269 log::warn!("Failed to set attribute {key}: {setattr_err}");
270 }
271 }
272
273 if let Err(e) = instance.call_method0("__post_init__") {
275 log::error!("Failed to call __post_init__ on config instance: {e}");
276 anyhow::bail!("__post_init__ failed: {e}");
277 }
278 log::debug!("Called __post_init__ on config instance");
279
280 instance
281 },
282 Err(default_err) => {
283 log::debug!("Failed to create default config: {default_err}");
284
285 anyhow::bail!(
287 "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
288 );
289 }
290 }
291 }
292 }
293 };
294
295 log::debug!("Created config instance: {config_instance:?}");
296
297 Some(config_instance)
298 } else {
299 log::debug!("No config_path or empty config, using None");
300 None
301 };
302
303 let python_actor = if let Some(config_obj) = config_instance.clone() {
305 actor_class.call1((config_obj,))?
306 } else {
307 actor_class.call0()?
308 };
309
310 log::debug!("Created Python actor instance: {python_actor:?}");
311
312 let mut py_data_actor_ref = python_actor.extract::<PyRefMut<PyDataActor>>()?;
314
315 log::debug!(
316 "Internal PyDataActor mem_addr: {}, registered: {}",
317 &py_data_actor_ref.mem_address(),
318 py_data_actor_ref.is_registered()
319 );
320
321 if let Some(config_obj) = config_instance.as_ref() {
324 log::debug!("Extracting inherited config fields from Python actor config");
325
326 if let Ok(actor_id) = config_obj.getattr("actor_id")
328 && !actor_id.is_none() {
329 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
331 actor_id_val
332 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
333 ActorId::from(actor_id_str.as_str())
334 } else {
335 log::warn!("Failed to extract actor_id as ActorId or String");
336 anyhow::bail!("Invalid `actor_id` type");
337 };
338
339 log::debug!("Extracted actor_id: {actor_id_val}");
340 py_data_actor_ref.set_actor_id(actor_id_val);
341 }
342
343 if let Ok(log_events) = config_obj.getattr("log_events")
345 && let Ok(log_events_val) = log_events.extract::<bool>() {
346 log::debug!("Extracted log_events: {log_events_val}");
347 py_data_actor_ref.set_log_events(log_events_val);
348 }
349
350 if let Ok(log_commands) = config_obj.getattr("log_commands")
352 && let Ok(log_commands_val) = log_commands.extract::<bool>() {
353 log::debug!("Extracted log_commands: {log_commands_val}");
354 py_data_actor_ref.set_log_commands(log_commands_val);
355 }
356
357 log::debug!("Successfully updated PyDataActor config from Python actor instance");
358 }
359
360 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
362
363 log::debug!("Set Python instance reference for method dispatch");
364
365 let trader_id = self.trader_id();
367 let clock = self.kernel().clock();
368 let cache = self.kernel().cache();
369
370 py_data_actor_ref
371 .register(trader_id, clock, cache)
372 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
373
374 log::debug!(
375 "Internal PyDataActor registered: {}, state: {:?}",
376 py_data_actor_ref.is_registered(),
377 py_data_actor_ref.state()
378 );
379
380 Ok(python_actor.unbind())
381 })
382 .map_err(to_pyruntime_err)?;
383
384 let actor_id = Python::attach(
386 |py| -> anyhow::Result<nautilus_model::identifiers::ActorId> {
387 let py_actor = python_actor.bind(py);
388 let py_data_actor_ref = py_actor
389 .downcast::<PyDataActor>()
390 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
391 let py_data_actor = py_data_actor_ref.borrow();
392
393 unsafe {
396 register_component_actor_by_ref(&*py_data_actor);
397 }
398
399 Ok(py_data_actor.actor_id())
400 },
401 )
402 .map_err(to_pyruntime_err)?;
403
404 self.kernel_mut()
406 .trader
407 .add_actor_id_for_lifecycle(actor_id)
408 .map_err(to_pyruntime_err)?;
409
410 std::mem::forget(python_actor); log::info!("Registered Python actor {actor_id}");
415 Ok(())
416 }
417
418 fn __repr__(&self) -> String {
420 format!(
421 "LiveNode(trader_id={}, environment={:?}, running={})",
422 self.trader_id(),
423 self.environment(),
424 self.is_running()
425 )
426 }
427}
428
429#[derive(Debug)]
432#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
433pub struct LiveNodeBuilderPy {
434 inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
435}
436
437#[pymethods]
438impl LiveNodeBuilderPy {
439 #[pyo3(name = "with_instance_id")]
441 fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
442 let mut inner_ref = self.inner.borrow_mut();
443 if let Some(builder) = inner_ref.take() {
444 *inner_ref = Some(builder.with_instance_id(instance_id));
445 Ok(Self {
446 inner: self.inner.clone(),
447 })
448 } else {
449 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
450 "Builder already consumed",
451 ))
452 }
453 }
454
455 #[pyo3(name = "with_load_state")]
457 fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
458 let mut inner_ref = self.inner.borrow_mut();
459 if let Some(builder) = inner_ref.take() {
460 *inner_ref = Some(builder.with_load_state(load_state));
461 Ok(Self {
462 inner: self.inner.clone(),
463 })
464 } else {
465 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
466 "Builder already consumed",
467 ))
468 }
469 }
470
471 #[pyo3(name = "with_save_state")]
473 fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
474 let mut inner_ref = self.inner.borrow_mut();
475 if let Some(builder) = inner_ref.take() {
476 *inner_ref = Some(builder.with_save_state(save_state));
477 Ok(Self {
478 inner: self.inner.clone(),
479 })
480 } else {
481 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
482 "Builder already consumed",
483 ))
484 }
485 }
486
487 #[pyo3(name = "add_data_client")]
489 fn py_add_data_client(
490 &self,
491 name: Option<String>,
492 factory: Py<PyAny>,
493 config: Py<PyAny>,
494 ) -> PyResult<Self> {
495 let mut inner_ref = self.inner.borrow_mut();
496 if let Some(builder) = inner_ref.take() {
497 Python::attach(|py| -> PyResult<Self> {
498 let registry = get_global_pyo3_registry();
500
501 let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
502 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
503
504 let factory_name = factory
506 .getattr(py, "name")?
507 .call0(py)?
508 .extract::<String>(py)?;
509 let client_name = name.unwrap_or(factory_name);
510
511 match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
513 Ok(updated_builder) => {
514 *inner_ref = Some(updated_builder);
515 Ok(Self {
516 inner: self.inner.clone(),
517 })
518 }
519 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
520 "Failed to add data client: {e}"
521 ))),
522 }
523 })
524 } else {
525 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
526 "Builder already consumed",
527 ))
528 }
529 }
530
531 #[pyo3(name = "build")]
533 fn py_build(&self) -> PyResult<LiveNode> {
534 let mut inner_ref = self.inner.borrow_mut();
535 if let Some(builder) = inner_ref.take() {
536 match builder.build() {
537 Ok(node) => Ok(node),
538 Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
539 e.to_string(),
540 )),
541 }
542 } else {
543 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
544 "Builder already consumed",
545 ))
546 }
547 }
548
549 fn __repr__(&self) -> String {
551 format!("{self:?}")
552 }
553}