nautilus_live/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for live node.
17
18use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21    actor::data_actor::{DataActorConfig, ImportableActorConfig},
22    enums::Environment,
23    live::runtime::get_runtime,
24    python::actor::PyDataActor,
25};
26use nautilus_core::{UUID4, python::to_pyruntime_err};
27use nautilus_model::identifiers::{ActorId, TraderId};
28use nautilus_system::get_global_pyo3_registry;
29use pyo3::{
30    exceptions::{PyRuntimeError, PyValueError},
31    prelude::*,
32    types::{PyDict, PyTuple},
33};
34use serde_json;
35
36use crate::node::{LiveNode, LiveNodeBuilder};
37
38#[pymethods]
39impl LiveNode {
40    #[staticmethod]
41    #[pyo3(name = "builder")]
42    fn py_builder(
43        name: String,
44        trader_id: TraderId,
45        environment: Environment,
46    ) -> PyResult<LiveNodeBuilderPy> {
47        match Self::builder(trader_id, environment) {
48            Ok(builder) => Ok(LiveNodeBuilderPy {
49                inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
50            }),
51            Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
52        }
53    }
54
55    #[getter]
56    #[pyo3(name = "environment")]
57    fn py_environment(&self) -> Environment {
58        self.environment()
59    }
60
61    #[getter]
62    #[pyo3(name = "trader_id")]
63    fn py_trader_id(&self) -> TraderId {
64        self.trader_id()
65    }
66
67    #[getter]
68    #[pyo3(name = "instance_id")]
69    const fn py_instance_id(&self) -> UUID4 {
70        self.instance_id()
71    }
72
73    #[getter]
74    #[pyo3(name = "is_running")]
75    const fn py_is_running(&self) -> bool {
76        self.is_running()
77    }
78
79    #[pyo3(name = "start")]
80    fn py_start(&mut self) -> PyResult<()> {
81        if self.is_running() {
82            return Err(PyRuntimeError::new_err("LiveNode is already running"));
83        }
84
85        // Non-blocking start - just start the node in the background
86        get_runtime().block_on(async {
87            self.start()
88                .await
89                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
90        })
91    }
92
93    #[pyo3(name = "run")]
94    fn py_run(&mut self, py: Python) -> PyResult<()> {
95        if self.is_running() {
96            return Err(PyRuntimeError::new_err("LiveNode is already running"));
97        }
98
99        // Get a handle for coordinating with the signal checker
100        let handle = self.handle();
101
102        // Import signal module
103        let signal_module = py.import("signal")?;
104        let original_handler =
105            signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; // Save original SIGINT handler (signal 2)
106
107        // Set up a custom signal handler that uses our handle
108        let handle_for_signal = handle;
109        let signal_callback = pyo3::types::PyCFunction::new_closure(
110            py,
111            None,
112            None,
113            move |_args: &pyo3::Bound<'_, PyTuple>,
114                  _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
115                  -> PyResult<()> {
116                log::info!("Python signal handler called");
117                handle_for_signal.stop();
118                Ok(())
119            },
120        )?;
121
122        // Install our signal handler
123        signal_module.call_method1("signal", (2, signal_callback))?;
124
125        // Run the node and restore signal handler afterward
126        let result = {
127            get_runtime().block_on(async {
128                self.run()
129                    .await
130                    .map_err(|e| PyRuntimeError::new_err(e.to_string()))
131            })
132        };
133
134        // Restore original signal handler
135        signal_module.call_method1("signal", (2, original_handler))?;
136
137        result
138    }
139
140    #[pyo3(name = "stop")]
141    fn py_stop(&self) -> PyResult<()> {
142        if !self.is_running() {
143            return Err(PyRuntimeError::new_err("LiveNode is not running"));
144        }
145
146        // Use the handle to signal stop - this is thread-safe and doesn't require async
147        self.handle().stop();
148        Ok(())
149    }
150
151    #[allow(
152        unsafe_code,
153        reason = "Required for Python actor component registration"
154    )]
155    #[pyo3(name = "add_actor_from_config")]
156    fn py_add_actor_from_config(
157        &mut self,
158        _py: Python,
159        config: ImportableActorConfig,
160    ) -> PyResult<()> {
161        log::debug!("`add_actor_from_config` with: {config:?}");
162
163        // Extract module and class name from actor_path
164        let parts: Vec<&str> = config.actor_path.split(':').collect();
165        if parts.len() != 2 {
166            return Err(PyValueError::new_err(
167                "actor_path must be in format 'module.path:ClassName'",
168            ));
169        }
170        let (module_name, class_name) = (parts[0], parts[1]);
171
172        log::info!("Importing actor from module: {module_name} class: {class_name}");
173
174        // Import the Python class to verify it exists and get it for method dispatch
175        let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
176            let actor_module = py.import(module_name)?;
177            let actor_class = actor_module.getattr(class_name)?;
178            Ok(actor_class.unbind())
179        })
180        .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
181
182        // Create default DataActorConfig for Rust PyDataActor
183        // Inherited config attributes will be extracted and wired in after Python actor creation
184        let basic_data_actor_config = DataActorConfig::default();
185
186        log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
187
188        // Create the Python actor and register the internal PyDataActor
189        let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
190            // Import the Python class
191            let actor_module = py
192                .import(module_name)
193                .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
194            let actor_class = actor_module
195                .getattr(class_name)
196                .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
197
198            // Create config instance if config_path and config are provided
199            let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
200                // Parse the config_path to get module and class
201                let config_parts: Vec<&str> = config.config_path.split(':').collect();
202                if config_parts.len() != 2 {
203                    anyhow::bail!(
204                        "config_path must be in format 'module.path:ClassName', was {}",
205                        config.config_path
206                    );
207                }
208                let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
209
210                log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
211
212                // Import the config class
213                let config_module = py
214                    .import(config_module_name)
215                    .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
216                let config_class = config_module
217                    .getattr(config_class_name)
218                    .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
219
220                // Convert the serde_json::Value config dict to a Python dict
221                let py_dict = PyDict::new(py);
222                for (key, value) in &config.config {
223                    // Convert serde_json::Value back to Python object via JSON
224                    let json_str = serde_json::to_string(value)
225                        .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
226                    let py_value = PyModule::import(py, "json")?
227                        .call_method("loads", (json_str,), None)?;
228                    py_dict.set_item(key, py_value)?;
229                }
230
231                log::debug!("Created config dict: {py_dict:?}");
232
233                // Try multiple approaches to create the config instance
234                let config_instance = {
235                    // First, try calling the config class with **kwargs (this works if the dataclass handles string conversion)
236                    match config_class.call((), Some(&py_dict)) {
237                        Ok(instance) => {
238                            log::debug!("Successfully created config instance with kwargs");
239
240                            // Manually call __post_init__ if it exists
241                            if let Err(e) = instance.call_method0("__post_init__") {
242                                log::error!("Failed to call __post_init__ on config instance: {e}");
243                                anyhow::bail!("__post_init__ failed: {e}");
244                            }
245                            log::debug!("Successfully called __post_init__ on config instance");
246
247                            instance
248                        },
249                        Err(kwargs_err) => {
250                            log::debug!("Failed to create config with kwargs: {kwargs_err}");
251
252                            // Second approach: try to create with default constructor and set attributes
253                            match config_class.call0() {
254                                Ok(instance) => {
255                                    log::debug!("Created default config instance, setting attributes");
256                                    for (key, value) in &config.config {
257                                        // Convert serde_json::Value to Python object
258                                        let json_str = serde_json::to_string(value)
259                                            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
260                                        let py_value = PyModule::import(py, "json")?
261                                            .call_method("loads", (json_str,), None)?;
262                                        if let Err(setattr_err) = instance.setattr(key, py_value) {
263                                            log::warn!("Failed to set attribute {key}: {setattr_err}");
264                                        }
265                                    }
266
267                                    // Manually call __post_init__ if it exists
268                                    if let Err(e) = instance.call_method0("__post_init__") {
269                                        log::error!("Failed to call __post_init__ on config instance: {e}");
270                                        anyhow::bail!("__post_init__ failed: {e}");
271                                    }
272                                    log::debug!("Called __post_init__ on config instance");
273
274                                    instance
275                                },
276                                Err(default_err) => {
277                                    log::debug!("Failed to create default config: {default_err}");
278
279                                    // If both approaches fail, return the original error
280                                    anyhow::bail!(
281                                        "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
282                                    );
283                                }
284                            }
285                        }
286                    }
287                };
288
289                log::debug!("Created config instance: {config_instance:?}");
290
291                Some(config_instance)
292            } else {
293                log::debug!("No config_path or empty config, using None");
294                None
295            };
296
297            // Create the Python actor instance with the config
298            let python_actor = if let Some(config_obj) = config_instance.clone() {
299                actor_class.call1((config_obj,))?
300            } else {
301                actor_class.call0()?
302            };
303
304            log::debug!("Created Python actor instance: {python_actor:?}");
305
306            // Get a mutable reference to the internal PyDataActor for registration
307            let mut py_data_actor_ref = python_actor
308                .extract::<PyRefMut<PyDataActor>>()
309                .map_err(Into::<PyErr>::into)
310                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
311
312            log::debug!(
313                "Internal PyDataActor mem_addr: {}, registered: {}",
314                &py_data_actor_ref.mem_address(),
315                py_data_actor_ref.is_registered()
316            );
317
318            // Extract inherited DataActorConfig fields from the Python actor instance
319            // and wire them into the PyDataActor's core config
320            if let Some(config_obj) = config_instance.as_ref() {
321                log::debug!("Extracting inherited config fields from Python actor config");
322
323                // Extract actor_id if present
324                if let Ok(actor_id) = config_obj.getattr("actor_id")
325                    && !actor_id.is_none() {
326                        // Try to extract as ActorId first, then as string
327                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
328                            actor_id_val
329                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
330                            ActorId::from(actor_id_str.as_str())
331                        } else {
332                            log::warn!("Failed to extract actor_id as ActorId or String");
333                            anyhow::bail!("Invalid `actor_id` type");
334                        };
335
336                        log::debug!("Extracted actor_id: {actor_id_val}");
337                        py_data_actor_ref.set_actor_id(actor_id_val);
338                    }
339
340                // Extract log_events if present
341                if let Ok(log_events) = config_obj.getattr("log_events")
342                    && let Ok(log_events_val) = log_events.extract::<bool>() {
343                        log::debug!("Extracted log_events: {log_events_val}");
344                        py_data_actor_ref.set_log_events(log_events_val);
345                    }
346
347                // Extract log_commands if present
348                if let Ok(log_commands) = config_obj.getattr("log_commands")
349                    && let Ok(log_commands_val) = log_commands.extract::<bool>() {
350                        log::debug!("Extracted log_commands: {log_commands_val}");
351                        py_data_actor_ref.set_log_commands(log_commands_val);
352                    }
353
354                log::debug!("Successfully updated PyDataActor config from Python actor instance");
355            }
356
357            // Set the Python instance reference for method dispatch on the original
358            py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
359
360            log::debug!("Set Python instance reference for method dispatch");
361
362            // Register the internal PyDataActor
363            let trader_id = self.trader_id();
364            let clock = self.kernel().clock();
365            let cache = self.kernel().cache();
366
367            py_data_actor_ref
368                .register(trader_id, clock, cache)
369                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
370
371            log::debug!(
372                "Internal PyDataActor registered: {}, state: {:?}",
373                py_data_actor_ref.is_registered(),
374                py_data_actor_ref.state()
375            );
376
377            Ok(python_actor.unbind())
378        })
379        .map_err(to_pyruntime_err)?;
380
381        let actor_id = Python::attach(|py| -> anyhow::Result<ActorId> {
382            let py_actor = python_actor.bind(py);
383            let py_data_actor_ref = py_actor
384                .cast::<PyDataActor>()
385                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
386            let py_data_actor = py_data_actor_ref.borrow();
387            py_data_actor.register_in_global_registries();
388
389            Ok(py_data_actor.actor_id())
390        })
391        .map_err(to_pyruntime_err)?;
392
393        self.kernel_mut()
394            .trader
395            .add_actor_id_for_lifecycle(actor_id)
396            .map_err(to_pyruntime_err)?;
397
398        // Note: No mem::forget needed - the actor's py_self field holds a Py<PyAny>
399        // that keeps the Python instance alive, and registries share the inner via Rc::clone()
400
401        log::info!("Registered Python actor {actor_id}");
402        Ok(())
403    }
404
405    /// Returns a string representation of the node.
406    fn __repr__(&self) -> String {
407        format!(
408            "LiveNode(trader_id={}, environment={:?}, running={})",
409            self.trader_id(),
410            self.environment(),
411            self.is_running()
412        )
413    }
414}
415
416/// Python wrapper for `LiveNodeBuilder` that uses interior mutability
417/// to work around PyO3's shared ownership model.
418#[derive(Debug)]
419#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
420pub struct LiveNodeBuilderPy {
421    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
422}
423
424#[pymethods]
425impl LiveNodeBuilderPy {
426    /// Sets the instance ID for the node.
427    #[pyo3(name = "with_instance_id")]
428    fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
429        let mut inner_ref = self.inner.borrow_mut();
430        if let Some(builder) = inner_ref.take() {
431            *inner_ref = Some(builder.with_instance_id(instance_id));
432            Ok(Self {
433                inner: self.inner.clone(),
434            })
435        } else {
436            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
437                "Builder already consumed",
438            ))
439        }
440    }
441
442    /// Sets whether to load state on startup.
443    #[pyo3(name = "with_load_state")]
444    fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
445        let mut inner_ref = self.inner.borrow_mut();
446        if let Some(builder) = inner_ref.take() {
447            *inner_ref = Some(builder.with_load_state(load_state));
448            Ok(Self {
449                inner: self.inner.clone(),
450            })
451        } else {
452            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
453                "Builder already consumed",
454            ))
455        }
456    }
457
458    /// Sets whether to save state on shutdown.
459    #[pyo3(name = "with_save_state")]
460    fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
461        let mut inner_ref = self.inner.borrow_mut();
462        if let Some(builder) = inner_ref.take() {
463            *inner_ref = Some(builder.with_save_state(save_state));
464            Ok(Self {
465                inner: self.inner.clone(),
466            })
467        } else {
468            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
469                "Builder already consumed",
470            ))
471        }
472    }
473
474    /// Adds a data client with factory and configuration.
475    #[pyo3(name = "add_data_client")]
476    fn py_add_data_client(
477        &self,
478        name: Option<String>,
479        factory: Py<PyAny>,
480        config: Py<PyAny>,
481    ) -> PyResult<Self> {
482        let mut inner_ref = self.inner.borrow_mut();
483        if let Some(builder) = inner_ref.take() {
484            Python::attach(|py| -> PyResult<Self> {
485                // Use the global registry to extract Py<PyAny>s to trait objects
486                let registry = get_global_pyo3_registry();
487
488                let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
489                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
490
491                // Use the factory name from the original factory for the client name
492                let factory_name = factory
493                    .getattr(py, "name")?
494                    .call0(py)?
495                    .extract::<String>(py)?;
496                let client_name = name.unwrap_or(factory_name);
497
498                // Add the data client to the builder using boxed trait objects
499                match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
500                    Ok(updated_builder) => {
501                        *inner_ref = Some(updated_builder);
502                        Ok(Self {
503                            inner: self.inner.clone(),
504                        })
505                    }
506                    Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
507                        "Failed to add data client: {e}"
508                    ))),
509                }
510            })
511        } else {
512            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
513                "Builder already consumed",
514            ))
515        }
516    }
517
518    /// Builds the node.
519    #[pyo3(name = "build")]
520    fn py_build(&self) -> PyResult<LiveNode> {
521        let mut inner_ref = self.inner.borrow_mut();
522        if let Some(builder) = inner_ref.take() {
523            match builder.build() {
524                Ok(node) => Ok(node),
525                Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
526                    e.to_string(),
527                )),
528            }
529        } else {
530            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
531                "Builder already consumed",
532            ))
533        }
534    }
535
536    /// Returns a string representation of the builder.
537    fn __repr__(&self) -> String {
538        format!("{self:?}")
539    }
540}