nautilus_live/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for live node.
17
18use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21    actor::data_actor::{DataActorConfig, ImportableActorConfig},
22    component::{Component, register_component_actor_by_ref},
23    enums::Environment,
24    python::actor::PyDataActor,
25    runtime::get_runtime,
26};
27use nautilus_core::{UUID4, python::to_pyruntime_err};
28use nautilus_model::identifiers::{ActorId, TraderId};
29use nautilus_system::get_global_pyo3_registry;
30use pyo3::{
31    exceptions::{PyRuntimeError, PyValueError},
32    prelude::*,
33    types::{PyDict, PyTuple},
34};
35use serde_json;
36
37use crate::node::{LiveNode, LiveNodeBuilder};
38
39#[pymethods]
40impl LiveNode {
41    /// Creates a new `LiveNode` builder.
42    #[staticmethod]
43    #[pyo3(name = "builder")]
44    fn py_builder(
45        name: String,
46        trader_id: TraderId,
47        environment: Environment,
48    ) -> PyResult<LiveNodeBuilderPy> {
49        match Self::builder(name, trader_id, environment) {
50            Ok(builder) => Ok(LiveNodeBuilderPy {
51                inner: Rc::new(RefCell::new(Some(builder))),
52            }),
53            Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
54        }
55    }
56
57    /// Returns the node's environment.
58    #[getter]
59    #[pyo3(name = "environment")]
60    fn py_environment(&self) -> Environment {
61        self.environment()
62    }
63
64    /// Returns the node's trader ID.
65    #[getter]
66    #[pyo3(name = "trader_id")]
67    fn py_trader_id(&self) -> TraderId {
68        self.trader_id()
69    }
70
71    /// Returns the node's instance ID.
72    #[getter]
73    #[pyo3(name = "instance_id")]
74    const fn py_instance_id(&self) -> UUID4 {
75        self.instance_id()
76    }
77
78    /// Returns whether the node is running.
79    #[getter]
80    #[pyo3(name = "is_running")]
81    const fn py_is_running(&self) -> bool {
82        self.is_running()
83    }
84
85    #[pyo3(name = "start")]
86    fn py_start(&mut self) -> PyResult<()> {
87        if self.is_running() {
88            return Err(PyRuntimeError::new_err("LiveNode is already running"));
89        }
90
91        // Non-blocking start - just start the node in the background
92        get_runtime().block_on(async {
93            self.start()
94                .await
95                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
96        })
97    }
98
99    #[pyo3(name = "run")]
100    fn py_run(&mut self, py: Python) -> PyResult<()> {
101        if self.is_running() {
102            return Err(PyRuntimeError::new_err("LiveNode is already running"));
103        }
104
105        // Get a handle for coordinating with the signal checker
106        let handle = self.handle();
107
108        // Import signal module
109        let signal_module = py.import("signal")?;
110        let original_handler =
111            signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; // Save original SIGINT handler (signal 2)
112
113        // Set up a custom signal handler that uses our handle
114        let handle_for_signal = handle;
115        let signal_callback = pyo3::types::PyCFunction::new_closure(
116            py,
117            None,
118            None,
119            move |_args: &pyo3::Bound<'_, PyTuple>,
120                  _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
121                  -> PyResult<()> {
122                log::info!("Python signal handler called");
123                handle_for_signal.stop();
124                Ok(())
125            },
126        )?;
127
128        // Install our signal handler
129        signal_module.call_method1("signal", (2, signal_callback))?;
130
131        // Run the node and restore signal handler afterward
132        let result = {
133            get_runtime().block_on(async {
134                self.run()
135                    .await
136                    .map_err(|e| PyRuntimeError::new_err(e.to_string()))
137            })
138        };
139
140        // Restore original signal handler
141        signal_module.call_method1("signal", (2, original_handler))?;
142
143        result
144    }
145
146    #[pyo3(name = "stop")]
147    fn py_stop(&self) -> PyResult<()> {
148        if !self.is_running() {
149            return Err(PyRuntimeError::new_err("LiveNode is not running"));
150        }
151
152        // Use the handle to signal stop - this is thread-safe and doesn't require async
153        self.handle().stop();
154        Ok(())
155    }
156
157    #[allow(unsafe_code)] // Required for Python actor component registration
158    #[pyo3(name = "add_actor_from_config")]
159    fn py_add_actor_from_config(
160        &mut self,
161        _py: Python,
162        config: ImportableActorConfig,
163    ) -> PyResult<()> {
164        log::debug!("`add_actor_from_config` with: {config:?}");
165
166        // Extract module and class name from actor_path
167        let parts: Vec<&str> = config.actor_path.split(':').collect();
168        if parts.len() != 2 {
169            return Err(PyValueError::new_err(
170                "actor_path must be in format 'module.path:ClassName'",
171            ));
172        }
173        let (module_name, class_name) = (parts[0], parts[1]);
174
175        log::info!("Importing actor from module: {module_name} class: {class_name}");
176
177        // Import the Python class to verify it exists and get it for method dispatch
178        let _python_class = Python::with_gil(|py| -> PyResult<PyObject> {
179            let actor_module = py.import(module_name)?;
180            let actor_class = actor_module.getattr(class_name)?;
181            Ok(actor_class.unbind())
182        })
183        .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
184
185        // Create default DataActorConfig for Rust PyDataActor
186        // Inherited config attributes will be extracted and wired in after Python actor creation
187        let basic_data_actor_config = DataActorConfig::default();
188
189        log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
190
191        // Create the Python actor and register the internal PyDataActor
192        let python_actor = Python::with_gil(|py| -> anyhow::Result<PyObject> {
193            // Import the Python class
194            let actor_module = py
195                .import(module_name)
196                .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
197            let actor_class = actor_module
198                .getattr(class_name)
199                .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
200
201            // Create config instance if config_path and config are provided
202            let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
203                // Parse the config_path to get module and class
204                let config_parts: Vec<&str> = config.config_path.split(':').collect();
205                if config_parts.len() != 2 {
206                    anyhow::bail!(
207                        "config_path must be in format 'module.path:ClassName', was {}",
208                        config.config_path
209                    );
210                }
211                let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
212
213                log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
214
215                // Import the config class
216                let config_module = py
217                    .import(config_module_name)
218                    .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
219                let config_class = config_module
220                    .getattr(config_class_name)
221                    .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
222
223                // Convert the serde_json::Value config dict to a Python dict
224                let py_dict = PyDict::new(py);
225                for (key, value) in &config.config {
226                    // Convert serde_json::Value back to Python object via JSON
227                    let json_str = serde_json::to_string(value)
228                        .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
229                    let py_value = PyModule::import(py, "json")?
230                        .call_method("loads", (json_str,), None)?;
231                    py_dict.set_item(key, py_value)?;
232                }
233
234                log::debug!("Created config dict: {py_dict:?}");
235
236                // Try multiple approaches to create the config instance
237                let config_instance = {
238                    // First, try calling the config class with **kwargs (this works if the dataclass handles string conversion)
239                    match config_class.call((), Some(&py_dict)) {
240                        Ok(instance) => {
241                            log::debug!("Successfully created config instance with kwargs");
242
243                            // Manually call __post_init__ if it exists
244                            if let Err(e) = instance.call_method0("__post_init__") {
245                                log::error!("Failed to call __post_init__ on config instance: {e}");
246                                anyhow::bail!("__post_init__ failed: {e}");
247                            }
248                            log::debug!("Successfully called __post_init__ on config instance");
249
250                            instance
251                        },
252                        Err(kwargs_err) => {
253                            log::debug!("Failed to create config with kwargs: {kwargs_err}");
254
255                            // Second approach: try to create with default constructor and set attributes
256                            match config_class.call0() {
257                                Ok(instance) => {
258                                    log::debug!("Created default config instance, setting attributes");
259                                    for (key, value) in &config.config {
260                                        // Convert serde_json::Value to Python object
261                                        let json_str = serde_json::to_string(value)
262                                            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
263                                        let py_value = PyModule::import(py, "json")?
264                                            .call_method("loads", (json_str,), None)?;
265                                        if let Err(setattr_err) = instance.setattr(key, py_value) {
266                                            log::warn!("Failed to set attribute {key}: {setattr_err}");
267                                        }
268                                    }
269
270                                    // Manually call __post_init__ if it exists
271                                    if let Err(e) = instance.call_method0("__post_init__") {
272                                        log::error!("Failed to call __post_init__ on config instance: {e}");
273                                        anyhow::bail!("__post_init__ failed: {e}");
274                                    }
275                                    log::debug!("Called __post_init__ on config instance");
276
277                                    instance
278                                },
279                                Err(default_err) => {
280                                    log::debug!("Failed to create default config: {default_err}");
281
282                                    // If both approaches fail, return the original error
283                                    anyhow::bail!(
284                                        "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
285                                    );
286                                }
287                            }
288                        }
289                    }
290                };
291
292                log::debug!("Created config instance: {config_instance:?}");
293
294                Some(config_instance)
295            } else {
296                log::debug!("No config_path or empty config, using None");
297                None
298            };
299
300            // Create the Python actor instance with the config
301            let python_actor = if let Some(config_obj) = config_instance.clone() {
302                actor_class.call1((config_obj,))?
303            } else {
304                actor_class.call0()?
305            };
306
307            log::debug!("Created Python actor instance: {python_actor:?}");
308
309            // Get a mutable reference to the internal PyDataActor for registration
310            let mut py_data_actor_ref = python_actor.extract::<PyRefMut<PyDataActor>>()?;
311
312            log::debug!(
313                "Internal PyDataActor mem_addr: {}, registered: {}",
314                &py_data_actor_ref.mem_address(),
315                py_data_actor_ref.is_registered()
316            );
317
318            // Extract inherited DataActorConfig fields from the Python actor instance
319            // and wire them into the PyDataActor's core config
320            if let Some(config_obj) = config_instance.as_ref() {
321                log::debug!("Extracting inherited config fields from Python actor config");
322
323                // Extract actor_id if present
324                if let Ok(actor_id) = config_obj.getattr("actor_id")
325                    && !actor_id.is_none() {
326                        // Try to extract as ActorId first, then as string
327                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
328                            actor_id_val
329                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
330                            ActorId::from(actor_id_str.as_str())
331                        } else {
332                            log::warn!("Failed to extract actor_id as ActorId or String");
333                            anyhow::bail!("Invalid `actor_id` type");
334                        };
335
336                        log::debug!("Extracted actor_id: {actor_id_val}");
337                        py_data_actor_ref.set_actor_id(actor_id_val);
338                    }
339
340                // Extract log_events if present
341                if let Ok(log_events) = config_obj.getattr("log_events")
342                    && let Ok(log_events_val) = log_events.extract::<bool>() {
343                        log::debug!("Extracted log_events: {log_events_val}");
344                        py_data_actor_ref.set_log_events(log_events_val);
345                    }
346
347                // Extract log_commands if present
348                if let Ok(log_commands) = config_obj.getattr("log_commands")
349                    && let Ok(log_commands_val) = log_commands.extract::<bool>() {
350                        log::debug!("Extracted log_commands: {log_commands_val}");
351                        py_data_actor_ref.set_log_commands(log_commands_val);
352                    }
353
354                log::debug!("Successfully updated PyDataActor config from Python actor instance");
355            }
356
357            // Set the Python instance reference for method dispatch on the original
358            py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
359
360            log::debug!("Set Python instance reference for method dispatch");
361
362            // Register the internal PyDataActor
363            let trader_id = self.trader_id();
364            let clock = self.kernel().clock();
365            let cache = self.kernel().cache();
366
367            py_data_actor_ref
368                .register(trader_id, clock, cache)
369                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
370
371            log::debug!(
372                "Internal PyDataActor registered: {}, state: {:?}",
373                py_data_actor_ref.is_registered(),
374                py_data_actor_ref.state()
375            );
376
377            Ok(python_actor.unbind())
378        })
379        .map_err(to_pyruntime_err)?;
380
381        // Add the actor to the trader's lifecycle management without consuming it
382        let actor_id = Python::with_gil(
383            |py| -> anyhow::Result<nautilus_model::identifiers::ActorId> {
384                let py_actor = python_actor.bind(py);
385                let py_data_actor_ref = py_actor
386                    .downcast::<PyDataActor>()
387                    .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
388                let py_data_actor = py_data_actor_ref.borrow();
389
390                // Register the component in the global registry using the unsafe method
391                // SAFETY: The Python instance will remain alive, keeping the PyDataActor valid
392                unsafe {
393                    register_component_actor_by_ref(&*py_data_actor);
394                }
395
396                Ok(py_data_actor.actor_id())
397            },
398        )
399        .map_err(to_pyruntime_err)?;
400
401        // TODO: Add the actor ID to the trader for lifecycle management; clean up approach
402        self.kernel_mut()
403            .trader
404            .add_actor_id_for_lifecycle(actor_id)
405            .map_err(to_pyruntime_err)?;
406
407        // Store the Python actor reference to prevent garbage collection
408        // TODO: Add to a proper LiveNode registry for Python actors
409        std::mem::forget(python_actor); // Prevent dropping - we'll manage lifecycle manually
410
411        log::info!("Registered Python actor {actor_id}");
412        Ok(())
413    }
414
415    /// Returns a string representation of the node.
416    fn __repr__(&self) -> String {
417        format!(
418            "LiveNode(trader_id={}, environment={:?}, running={})",
419            self.trader_id(),
420            self.environment(),
421            self.is_running()
422        )
423    }
424}
425
426/// Python wrapper for `LiveNodeBuilder` that uses interior mutability
427/// to work around PyO3's shared ownership model.
428#[derive(Debug)]
429#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
430pub struct LiveNodeBuilderPy {
431    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
432}
433
434#[pymethods]
435impl LiveNodeBuilderPy {
436    /// Sets the instance ID for the node.
437    #[pyo3(name = "with_instance_id")]
438    fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
439        let mut inner_ref = self.inner.borrow_mut();
440        if let Some(builder) = inner_ref.take() {
441            *inner_ref = Some(builder.with_instance_id(instance_id));
442            Ok(Self {
443                inner: self.inner.clone(),
444            })
445        } else {
446            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
447                "Builder already consumed",
448            ))
449        }
450    }
451
452    /// Sets whether to load state on startup.
453    #[pyo3(name = "with_load_state")]
454    fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
455        let mut inner_ref = self.inner.borrow_mut();
456        if let Some(builder) = inner_ref.take() {
457            *inner_ref = Some(builder.with_load_state(load_state));
458            Ok(Self {
459                inner: self.inner.clone(),
460            })
461        } else {
462            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
463                "Builder already consumed",
464            ))
465        }
466    }
467
468    /// Sets whether to save state on shutdown.
469    #[pyo3(name = "with_save_state")]
470    fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
471        let mut inner_ref = self.inner.borrow_mut();
472        if let Some(builder) = inner_ref.take() {
473            *inner_ref = Some(builder.with_save_state(save_state));
474            Ok(Self {
475                inner: self.inner.clone(),
476            })
477        } else {
478            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
479                "Builder already consumed",
480            ))
481        }
482    }
483
484    /// Adds a data client with factory and configuration.
485    #[pyo3(name = "add_data_client")]
486    fn py_add_data_client(
487        &self,
488        name: Option<String>,
489        factory: PyObject,
490        config: PyObject,
491    ) -> PyResult<Self> {
492        let mut inner_ref = self.inner.borrow_mut();
493        if let Some(builder) = inner_ref.take() {
494            Python::with_gil(|py| -> PyResult<Self> {
495                // Use the global registry to extract PyObjects to trait objects
496                let registry = get_global_pyo3_registry();
497
498                let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
499                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
500
501                // Use the factory name from the original factory for the client name
502                let factory_name = factory
503                    .getattr(py, "name")?
504                    .call0(py)?
505                    .extract::<String>(py)?;
506                let client_name = name.unwrap_or(factory_name);
507
508                // Add the data client to the builder using boxed trait objects
509                match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
510                    Ok(updated_builder) => {
511                        *inner_ref = Some(updated_builder);
512                        Ok(Self {
513                            inner: self.inner.clone(),
514                        })
515                    }
516                    Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
517                        "Failed to add data client: {e}"
518                    ))),
519                }
520            })
521        } else {
522            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
523                "Builder already consumed",
524            ))
525        }
526    }
527
528    /// Builds the node.
529    #[pyo3(name = "build")]
530    fn py_build(&self) -> PyResult<LiveNode> {
531        let mut inner_ref = self.inner.borrow_mut();
532        if let Some(builder) = inner_ref.take() {
533            match builder.build() {
534                Ok(node) => Ok(node),
535                Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
536                    e.to_string(),
537                )),
538            }
539        } else {
540            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
541                "Builder already consumed",
542            ))
543        }
544    }
545
546    /// Returns a string representation of the builder.
547    fn __repr__(&self) -> String {
548        format!("{self:?}")
549    }
550}