nautilus_live/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for live node.
17
18use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21    actor::data_actor::{DataActorConfig, ImportableActorConfig},
22    component::{Component, register_component_actor_by_ref},
23    enums::Environment,
24    python::actor::PyDataActor,
25    runtime::get_runtime,
26};
27use nautilus_core::{UUID4, python::to_pyruntime_err};
28use nautilus_model::identifiers::{ActorId, TraderId};
29use nautilus_system::get_global_pyo3_registry;
30use pyo3::{
31    exceptions::{PyRuntimeError, PyValueError},
32    prelude::*,
33    types::{PyDict, PyTuple},
34};
35use serde_json;
36
37use crate::node::{LiveNode, LiveNodeBuilder};
38
39#[pymethods]
40impl LiveNode {
41    /// Creates a new `LiveNode` builder.
42    #[staticmethod]
43    #[pyo3(name = "builder")]
44    fn py_builder(
45        name: String,
46        trader_id: TraderId,
47        environment: Environment,
48    ) -> PyResult<LiveNodeBuilderPy> {
49        match Self::builder(name, trader_id, environment) {
50            Ok(builder) => Ok(LiveNodeBuilderPy {
51                inner: Rc::new(RefCell::new(Some(builder))),
52            }),
53            Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
54        }
55    }
56
57    /// Returns the node's environment.
58    #[getter]
59    #[pyo3(name = "environment")]
60    fn py_environment(&self) -> Environment {
61        self.environment()
62    }
63
64    /// Returns the node's trader ID.
65    #[getter]
66    #[pyo3(name = "trader_id")]
67    fn py_trader_id(&self) -> TraderId {
68        self.trader_id()
69    }
70
71    /// Returns the node's instance ID.
72    #[getter]
73    #[pyo3(name = "instance_id")]
74    const fn py_instance_id(&self) -> UUID4 {
75        self.instance_id()
76    }
77
78    /// Returns whether the node is running.
79    #[getter]
80    #[pyo3(name = "is_running")]
81    const fn py_is_running(&self) -> bool {
82        self.is_running()
83    }
84
85    #[pyo3(name = "start")]
86    fn py_start(&mut self) -> PyResult<()> {
87        if self.is_running() {
88            return Err(PyRuntimeError::new_err("LiveNode is already running"));
89        }
90
91        // Non-blocking start - just start the node in the background
92        get_runtime().block_on(async {
93            self.start()
94                .await
95                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
96        })
97    }
98
99    #[pyo3(name = "run")]
100    fn py_run(&mut self, py: Python) -> PyResult<()> {
101        if self.is_running() {
102            return Err(PyRuntimeError::new_err("LiveNode is already running"));
103        }
104
105        // Get a handle for coordinating with the signal checker
106        let handle = self.handle();
107
108        // Import signal module
109        let signal_module = py.import("signal")?;
110        let original_handler =
111            signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; // Save original SIGINT handler (signal 2)
112
113        // Set up a custom signal handler that uses our handle
114        let handle_for_signal = handle;
115        let signal_callback = pyo3::types::PyCFunction::new_closure(
116            py,
117            None,
118            None,
119            move |_args: &pyo3::Bound<'_, PyTuple>,
120                  _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
121                  -> PyResult<()> {
122                log::info!("Python signal handler called");
123                handle_for_signal.stop();
124                Ok(())
125            },
126        )?;
127
128        // Install our signal handler
129        signal_module.call_method1("signal", (2, signal_callback))?;
130
131        // Run the node and restore signal handler afterward
132        let result = {
133            get_runtime().block_on(async {
134                self.run()
135                    .await
136                    .map_err(|e| PyRuntimeError::new_err(e.to_string()))
137            })
138        };
139
140        // Restore original signal handler
141        signal_module.call_method1("signal", (2, original_handler))?;
142
143        result
144    }
145
146    #[pyo3(name = "stop")]
147    fn py_stop(&self) -> PyResult<()> {
148        if !self.is_running() {
149            return Err(PyRuntimeError::new_err("LiveNode is not running"));
150        }
151
152        // Use the handle to signal stop - this is thread-safe and doesn't require async
153        self.handle().stop();
154        Ok(())
155    }
156
157    #[allow(
158        unsafe_code,
159        reason = "Required for Python actor component registration"
160    )]
161    #[pyo3(name = "add_actor_from_config")]
162    fn py_add_actor_from_config(
163        &mut self,
164        _py: Python,
165        config: ImportableActorConfig,
166    ) -> PyResult<()> {
167        log::debug!("`add_actor_from_config` with: {config:?}");
168
169        // Extract module and class name from actor_path
170        let parts: Vec<&str> = config.actor_path.split(':').collect();
171        if parts.len() != 2 {
172            return Err(PyValueError::new_err(
173                "actor_path must be in format 'module.path:ClassName'",
174            ));
175        }
176        let (module_name, class_name) = (parts[0], parts[1]);
177
178        log::info!("Importing actor from module: {module_name} class: {class_name}");
179
180        // Import the Python class to verify it exists and get it for method dispatch
181        let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
182            let actor_module = py.import(module_name)?;
183            let actor_class = actor_module.getattr(class_name)?;
184            Ok(actor_class.unbind())
185        })
186        .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
187
188        // Create default DataActorConfig for Rust PyDataActor
189        // Inherited config attributes will be extracted and wired in after Python actor creation
190        let basic_data_actor_config = DataActorConfig::default();
191
192        log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
193
194        // Create the Python actor and register the internal PyDataActor
195        let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
196            // Import the Python class
197            let actor_module = py
198                .import(module_name)
199                .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
200            let actor_class = actor_module
201                .getattr(class_name)
202                .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
203
204            // Create config instance if config_path and config are provided
205            let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
206                // Parse the config_path to get module and class
207                let config_parts: Vec<&str> = config.config_path.split(':').collect();
208                if config_parts.len() != 2 {
209                    anyhow::bail!(
210                        "config_path must be in format 'module.path:ClassName', was {}",
211                        config.config_path
212                    );
213                }
214                let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
215
216                log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
217
218                // Import the config class
219                let config_module = py
220                    .import(config_module_name)
221                    .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
222                let config_class = config_module
223                    .getattr(config_class_name)
224                    .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
225
226                // Convert the serde_json::Value config dict to a Python dict
227                let py_dict = PyDict::new(py);
228                for (key, value) in &config.config {
229                    // Convert serde_json::Value back to Python object via JSON
230                    let json_str = serde_json::to_string(value)
231                        .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
232                    let py_value = PyModule::import(py, "json")?
233                        .call_method("loads", (json_str,), None)?;
234                    py_dict.set_item(key, py_value)?;
235                }
236
237                log::debug!("Created config dict: {py_dict:?}");
238
239                // Try multiple approaches to create the config instance
240                let config_instance = {
241                    // First, try calling the config class with **kwargs (this works if the dataclass handles string conversion)
242                    match config_class.call((), Some(&py_dict)) {
243                        Ok(instance) => {
244                            log::debug!("Successfully created config instance with kwargs");
245
246                            // Manually call __post_init__ if it exists
247                            if let Err(e) = instance.call_method0("__post_init__") {
248                                log::error!("Failed to call __post_init__ on config instance: {e}");
249                                anyhow::bail!("__post_init__ failed: {e}");
250                            }
251                            log::debug!("Successfully called __post_init__ on config instance");
252
253                            instance
254                        },
255                        Err(kwargs_err) => {
256                            log::debug!("Failed to create config with kwargs: {kwargs_err}");
257
258                            // Second approach: try to create with default constructor and set attributes
259                            match config_class.call0() {
260                                Ok(instance) => {
261                                    log::debug!("Created default config instance, setting attributes");
262                                    for (key, value) in &config.config {
263                                        // Convert serde_json::Value to Python object
264                                        let json_str = serde_json::to_string(value)
265                                            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
266                                        let py_value = PyModule::import(py, "json")?
267                                            .call_method("loads", (json_str,), None)?;
268                                        if let Err(setattr_err) = instance.setattr(key, py_value) {
269                                            log::warn!("Failed to set attribute {key}: {setattr_err}");
270                                        }
271                                    }
272
273                                    // Manually call __post_init__ if it exists
274                                    if let Err(e) = instance.call_method0("__post_init__") {
275                                        log::error!("Failed to call __post_init__ on config instance: {e}");
276                                        anyhow::bail!("__post_init__ failed: {e}");
277                                    }
278                                    log::debug!("Called __post_init__ on config instance");
279
280                                    instance
281                                },
282                                Err(default_err) => {
283                                    log::debug!("Failed to create default config: {default_err}");
284
285                                    // If both approaches fail, return the original error
286                                    anyhow::bail!(
287                                        "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
288                                    );
289                                }
290                            }
291                        }
292                    }
293                };
294
295                log::debug!("Created config instance: {config_instance:?}");
296
297                Some(config_instance)
298            } else {
299                log::debug!("No config_path or empty config, using None");
300                None
301            };
302
303            // Create the Python actor instance with the config
304            let python_actor = if let Some(config_obj) = config_instance.clone() {
305                actor_class.call1((config_obj,))?
306            } else {
307                actor_class.call0()?
308            };
309
310            log::debug!("Created Python actor instance: {python_actor:?}");
311
312            // Get a mutable reference to the internal PyDataActor for registration
313            let mut py_data_actor_ref = python_actor.extract::<PyRefMut<PyDataActor>>()?;
314
315            log::debug!(
316                "Internal PyDataActor mem_addr: {}, registered: {}",
317                &py_data_actor_ref.mem_address(),
318                py_data_actor_ref.is_registered()
319            );
320
321            // Extract inherited DataActorConfig fields from the Python actor instance
322            // and wire them into the PyDataActor's core config
323            if let Some(config_obj) = config_instance.as_ref() {
324                log::debug!("Extracting inherited config fields from Python actor config");
325
326                // Extract actor_id if present
327                if let Ok(actor_id) = config_obj.getattr("actor_id")
328                    && !actor_id.is_none() {
329                        // Try to extract as ActorId first, then as string
330                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
331                            actor_id_val
332                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
333                            ActorId::from(actor_id_str.as_str())
334                        } else {
335                            log::warn!("Failed to extract actor_id as ActorId or String");
336                            anyhow::bail!("Invalid `actor_id` type");
337                        };
338
339                        log::debug!("Extracted actor_id: {actor_id_val}");
340                        py_data_actor_ref.set_actor_id(actor_id_val);
341                    }
342
343                // Extract log_events if present
344                if let Ok(log_events) = config_obj.getattr("log_events")
345                    && let Ok(log_events_val) = log_events.extract::<bool>() {
346                        log::debug!("Extracted log_events: {log_events_val}");
347                        py_data_actor_ref.set_log_events(log_events_val);
348                    }
349
350                // Extract log_commands if present
351                if let Ok(log_commands) = config_obj.getattr("log_commands")
352                    && let Ok(log_commands_val) = log_commands.extract::<bool>() {
353                        log::debug!("Extracted log_commands: {log_commands_val}");
354                        py_data_actor_ref.set_log_commands(log_commands_val);
355                    }
356
357                log::debug!("Successfully updated PyDataActor config from Python actor instance");
358            }
359
360            // Set the Python instance reference for method dispatch on the original
361            py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
362
363            log::debug!("Set Python instance reference for method dispatch");
364
365            // Register the internal PyDataActor
366            let trader_id = self.trader_id();
367            let clock = self.kernel().clock();
368            let cache = self.kernel().cache();
369
370            py_data_actor_ref
371                .register(trader_id, clock, cache)
372                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
373
374            log::debug!(
375                "Internal PyDataActor registered: {}, state: {:?}",
376                py_data_actor_ref.is_registered(),
377                py_data_actor_ref.state()
378            );
379
380            Ok(python_actor.unbind())
381        })
382        .map_err(to_pyruntime_err)?;
383
384        // Add the actor to the trader's lifecycle management without consuming it
385        let actor_id = Python::attach(
386            |py| -> anyhow::Result<nautilus_model::identifiers::ActorId> {
387                let py_actor = python_actor.bind(py);
388                let py_data_actor_ref = py_actor
389                    .downcast::<PyDataActor>()
390                    .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
391                let py_data_actor = py_data_actor_ref.borrow();
392
393                // Register the component in the global registry using the unsafe method
394                // SAFETY: The Python instance will remain alive, keeping the PyDataActor valid
395                unsafe {
396                    register_component_actor_by_ref(&*py_data_actor);
397                }
398
399                Ok(py_data_actor.actor_id())
400            },
401        )
402        .map_err(to_pyruntime_err)?;
403
404        // TODO: Add the actor ID to the trader for lifecycle management; clean up approach
405        self.kernel_mut()
406            .trader
407            .add_actor_id_for_lifecycle(actor_id)
408            .map_err(to_pyruntime_err)?;
409
410        // Store the Python actor reference to prevent garbage collection
411        // TODO: Add to a proper LiveNode registry for Python actors
412        std::mem::forget(python_actor); // Prevent dropping - we'll manage lifecycle manually
413
414        log::info!("Registered Python actor {actor_id}");
415        Ok(())
416    }
417
418    /// Returns a string representation of the node.
419    fn __repr__(&self) -> String {
420        format!(
421            "LiveNode(trader_id={}, environment={:?}, running={})",
422            self.trader_id(),
423            self.environment(),
424            self.is_running()
425        )
426    }
427}
428
/// Python wrapper for `LiveNodeBuilder` that uses interior mutability
/// to work around PyO3's shared ownership model.
///
/// The wrapped builder lives in a shared `Rc<RefCell<Option<_>>>`: the methods on this
/// type take the builder out of the cell, apply a by-value builder call, and store the
/// result back, returning a new wrapper that shares the same cell. Building takes the
/// builder out for good, after which further calls report "Builder already consumed".
///
/// The class is declared `unsendable` (its state is held in an `Rc`).
#[derive(Debug)]
#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
pub struct LiveNodeBuilderPy {
    // `Some` while the builder is still available, `None` once it has been taken
    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
}
436
437#[pymethods]
438impl LiveNodeBuilderPy {
439    /// Sets the instance ID for the node.
440    #[pyo3(name = "with_instance_id")]
441    fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
442        let mut inner_ref = self.inner.borrow_mut();
443        if let Some(builder) = inner_ref.take() {
444            *inner_ref = Some(builder.with_instance_id(instance_id));
445            Ok(Self {
446                inner: self.inner.clone(),
447            })
448        } else {
449            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
450                "Builder already consumed",
451            ))
452        }
453    }
454
455    /// Sets whether to load state on startup.
456    #[pyo3(name = "with_load_state")]
457    fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
458        let mut inner_ref = self.inner.borrow_mut();
459        if let Some(builder) = inner_ref.take() {
460            *inner_ref = Some(builder.with_load_state(load_state));
461            Ok(Self {
462                inner: self.inner.clone(),
463            })
464        } else {
465            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
466                "Builder already consumed",
467            ))
468        }
469    }
470
471    /// Sets whether to save state on shutdown.
472    #[pyo3(name = "with_save_state")]
473    fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
474        let mut inner_ref = self.inner.borrow_mut();
475        if let Some(builder) = inner_ref.take() {
476            *inner_ref = Some(builder.with_save_state(save_state));
477            Ok(Self {
478                inner: self.inner.clone(),
479            })
480        } else {
481            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
482                "Builder already consumed",
483            ))
484        }
485    }
486
487    /// Adds a data client with factory and configuration.
488    #[pyo3(name = "add_data_client")]
489    fn py_add_data_client(
490        &self,
491        name: Option<String>,
492        factory: Py<PyAny>,
493        config: Py<PyAny>,
494    ) -> PyResult<Self> {
495        let mut inner_ref = self.inner.borrow_mut();
496        if let Some(builder) = inner_ref.take() {
497            Python::attach(|py| -> PyResult<Self> {
498                // Use the global registry to extract Py<PyAny>s to trait objects
499                let registry = get_global_pyo3_registry();
500
501                let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
502                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
503
504                // Use the factory name from the original factory for the client name
505                let factory_name = factory
506                    .getattr(py, "name")?
507                    .call0(py)?
508                    .extract::<String>(py)?;
509                let client_name = name.unwrap_or(factory_name);
510
511                // Add the data client to the builder using boxed trait objects
512                match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
513                    Ok(updated_builder) => {
514                        *inner_ref = Some(updated_builder);
515                        Ok(Self {
516                            inner: self.inner.clone(),
517                        })
518                    }
519                    Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
520                        "Failed to add data client: {e}"
521                    ))),
522                }
523            })
524        } else {
525            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
526                "Builder already consumed",
527            ))
528        }
529    }
530
531    /// Builds the node.
532    #[pyo3(name = "build")]
533    fn py_build(&self) -> PyResult<LiveNode> {
534        let mut inner_ref = self.inner.borrow_mut();
535        if let Some(builder) = inner_ref.take() {
536            match builder.build() {
537                Ok(node) => Ok(node),
538                Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
539                    e.to_string(),
540                )),
541            }
542        } else {
543            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
544                "Builder already consumed",
545            ))
546        }
547    }
548
549    /// Returns a string representation of the builder.
550    fn __repr__(&self) -> String {
551        format!("{self:?}")
552    }
553}