nautilus_live/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for live node.
17
18use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21    actor::data_actor::{DataActorConfig, ImportableActorConfig},
22    component::{Component, register_component_actor_by_ref},
23    enums::Environment,
24    python::actor::PyDataActor,
25    runtime::get_runtime,
26};
27use nautilus_core::{UUID4, python::to_pyruntime_err};
28use nautilus_model::identifiers::{ActorId, TraderId};
29use nautilus_system::get_global_pyo3_registry;
30use pyo3::{
31    exceptions::{PyRuntimeError, PyValueError},
32    prelude::*,
33    types::{PyDict, PyTuple},
34};
35use serde_json;
36
37use crate::node::{LiveNode, LiveNodeBuilder};
38
39#[pymethods]
40impl LiveNode {
41    /// Creates a new `LiveNode` builder.
42    #[staticmethod]
43    #[pyo3(name = "builder")]
44    fn py_builder(
45        name: String,
46        trader_id: TraderId,
47        environment: Environment,
48    ) -> PyResult<LiveNodeBuilderPy> {
49        match Self::builder(name, trader_id, environment) {
50            Ok(builder) => Ok(LiveNodeBuilderPy {
51                inner: Rc::new(RefCell::new(Some(builder))),
52            }),
53            Err(e) => Err(PyErr::new::<PyRuntimeError, _>(e.to_string())),
54        }
55    }
56
57    /// Returns the node's environment.
58    #[getter]
59    #[pyo3(name = "environment")]
60    fn py_environment(&self) -> Environment {
61        self.environment()
62    }
63
64    /// Returns the node's trader ID.
65    #[getter]
66    #[pyo3(name = "trader_id")]
67    fn py_trader_id(&self) -> TraderId {
68        self.trader_id()
69    }
70
71    /// Returns the node's instance ID.
72    #[getter]
73    #[pyo3(name = "instance_id")]
74    const fn py_instance_id(&self) -> UUID4 {
75        self.instance_id()
76    }
77
78    /// Returns whether the node is running.
79    #[getter]
80    #[pyo3(name = "is_running")]
81    const fn py_is_running(&self) -> bool {
82        self.is_running()
83    }
84
85    #[pyo3(name = "start")]
86    fn py_start(&mut self) -> PyResult<()> {
87        if self.is_running() {
88            return Err(PyRuntimeError::new_err("LiveNode is already running"));
89        }
90
91        // Non-blocking start - just start the node in the background
92        get_runtime().block_on(async {
93            self.start()
94                .await
95                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
96        })
97    }
98
99    #[pyo3(name = "run")]
100    fn py_run(&mut self, py: Python) -> PyResult<()> {
101        if self.is_running() {
102            return Err(PyRuntimeError::new_err("LiveNode is already running"));
103        }
104
105        // Get a handle for coordinating with the signal checker
106        let handle = self.handle();
107
108        // Import signal module
109        let signal_module = py.import("signal")?;
110        let original_handler =
111            signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; // Save original SIGINT handler (signal 2)
112
113        // Set up a custom signal handler that uses our handle
114        let handle_for_signal = handle;
115        let signal_callback = pyo3::types::PyCFunction::new_closure(
116            py,
117            None,
118            None,
119            move |_args: &pyo3::Bound<'_, PyTuple>,
120                  _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
121                  -> PyResult<()> {
122                log::info!("Python signal handler called");
123                handle_for_signal.stop();
124                Ok(())
125            },
126        )?;
127
128        // Install our signal handler
129        signal_module.call_method1("signal", (2, signal_callback))?;
130
131        // Run the node and restore signal handler afterward
132        let result = {
133            get_runtime().block_on(async {
134                self.run()
135                    .await
136                    .map_err(|e| PyRuntimeError::new_err(e.to_string()))
137            })
138        };
139
140        // Restore original signal handler
141        signal_module.call_method1("signal", (2, original_handler))?;
142
143        result
144    }
145
146    #[pyo3(name = "stop")]
147    fn py_stop(&self) -> PyResult<()> {
148        if !self.is_running() {
149            return Err(PyRuntimeError::new_err("LiveNode is not running"));
150        }
151
152        // Use the handle to signal stop - this is thread-safe and doesn't require async
153        self.handle().stop();
154        Ok(())
155    }
156
157    #[allow(
158        unsafe_code,
159        reason = "Required for Python actor component registration"
160    )]
161    #[pyo3(name = "add_actor_from_config")]
162    fn py_add_actor_from_config(
163        &mut self,
164        _py: Python,
165        config: ImportableActorConfig,
166    ) -> PyResult<()> {
167        log::debug!("`add_actor_from_config` with: {config:?}");
168
169        // Extract module and class name from actor_path
170        let parts: Vec<&str> = config.actor_path.split(':').collect();
171        if parts.len() != 2 {
172            return Err(PyValueError::new_err(
173                "actor_path must be in format 'module.path:ClassName'",
174            ));
175        }
176        let (module_name, class_name) = (parts[0], parts[1]);
177
178        log::info!("Importing actor from module: {module_name} class: {class_name}");
179
180        // Import the Python class to verify it exists and get it for method dispatch
181        let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
182            let actor_module = py.import(module_name)?;
183            let actor_class = actor_module.getattr(class_name)?;
184            Ok(actor_class.unbind())
185        })
186        .map_err(|e| PyRuntimeError::new_err(format!("Failed to import Python class: {e}")))?;
187
188        // Create default DataActorConfig for Rust PyDataActor
189        // Inherited config attributes will be extracted and wired in after Python actor creation
190        let basic_data_actor_config = DataActorConfig::default();
191
192        log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
193
194        // Create the Python actor and register the internal PyDataActor
195        let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
196            // Import the Python class
197            let actor_module = py
198                .import(module_name)
199                .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
200            let actor_class = actor_module
201                .getattr(class_name)
202                .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
203
204            // Create config instance if config_path and config are provided
205            let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
206                // Parse the config_path to get module and class
207                let config_parts: Vec<&str> = config.config_path.split(':').collect();
208                if config_parts.len() != 2 {
209                    anyhow::bail!(
210                        "config_path must be in format 'module.path:ClassName', was {}",
211                        config.config_path
212                    );
213                }
214                let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
215
216                log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
217
218                // Import the config class
219                let config_module = py
220                    .import(config_module_name)
221                    .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
222                let config_class = config_module
223                    .getattr(config_class_name)
224                    .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
225
226                // Convert the serde_json::Value config dict to a Python dict
227                let py_dict = PyDict::new(py);
228                for (key, value) in &config.config {
229                    // Convert serde_json::Value back to Python object via JSON
230                    let json_str = serde_json::to_string(value)
231                        .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
232                    let py_value = PyModule::import(py, "json")?
233                        .call_method("loads", (json_str,), None)?;
234                    py_dict.set_item(key, py_value)?;
235                }
236
237                log::debug!("Created config dict: {py_dict:?}");
238
239                // Try multiple approaches to create the config instance
240                let config_instance = {
241                    // First, try calling the config class with **kwargs (this works if the dataclass handles string conversion)
242                    match config_class.call((), Some(&py_dict)) {
243                        Ok(instance) => {
244                            log::debug!("Successfully created config instance with kwargs");
245
246                            // Manually call __post_init__ if it exists
247                            if let Err(e) = instance.call_method0("__post_init__") {
248                                log::error!("Failed to call __post_init__ on config instance: {e}");
249                                anyhow::bail!("__post_init__ failed: {e}");
250                            }
251                            log::debug!("Successfully called __post_init__ on config instance");
252
253                            instance
254                        },
255                        Err(kwargs_err) => {
256                            log::debug!("Failed to create config with kwargs: {kwargs_err}");
257
258                            // Second approach: try to create with default constructor and set attributes
259                            match config_class.call0() {
260                                Ok(instance) => {
261                                    log::debug!("Created default config instance, setting attributes");
262                                    for (key, value) in &config.config {
263                                        // Convert serde_json::Value to Python object
264                                        let json_str = serde_json::to_string(value)
265                                            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
266                                        let py_value = PyModule::import(py, "json")?
267                                            .call_method("loads", (json_str,), None)?;
268                                        if let Err(setattr_err) = instance.setattr(key, py_value) {
269                                            log::warn!("Failed to set attribute {key}: {setattr_err}");
270                                        }
271                                    }
272
273                                    // Manually call __post_init__ if it exists
274                                    if let Err(e) = instance.call_method0("__post_init__") {
275                                        log::error!("Failed to call __post_init__ on config instance: {e}");
276                                        anyhow::bail!("__post_init__ failed: {e}");
277                                    }
278                                    log::debug!("Called __post_init__ on config instance");
279
280                                    instance
281                                },
282                                Err(default_err) => {
283                                    log::debug!("Failed to create default config: {default_err}");
284
285                                    // If both approaches fail, return the original error
286                                    anyhow::bail!(
287                                        "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
288                                    );
289                                }
290                            }
291                        }
292                    }
293                };
294
295                log::debug!("Created config instance: {config_instance:?}");
296
297                Some(config_instance)
298            } else {
299                log::debug!("No config_path or empty config, using None");
300                None
301            };
302
303            // Create the Python actor instance with the config
304            let python_actor = if let Some(config_obj) = config_instance.clone() {
305                actor_class.call1((config_obj,))?
306            } else {
307                actor_class.call0()?
308            };
309
310            log::debug!("Created Python actor instance: {python_actor:?}");
311
312            // Get a mutable reference to the internal PyDataActor for registration
313            let mut py_data_actor_ref = python_actor
314                .extract::<PyRefMut<PyDataActor>>()
315                .map_err(Into::<PyErr>::into)
316                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
317
318            log::debug!(
319                "Internal PyDataActor mem_addr: {}, registered: {}",
320                &py_data_actor_ref.mem_address(),
321                py_data_actor_ref.is_registered()
322            );
323
324            // Extract inherited DataActorConfig fields from the Python actor instance
325            // and wire them into the PyDataActor's core config
326            if let Some(config_obj) = config_instance.as_ref() {
327                log::debug!("Extracting inherited config fields from Python actor config");
328
329                // Extract actor_id if present
330                if let Ok(actor_id) = config_obj.getattr("actor_id")
331                    && !actor_id.is_none() {
332                        // Try to extract as ActorId first, then as string
333                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
334                            actor_id_val
335                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
336                            ActorId::from(actor_id_str.as_str())
337                        } else {
338                            log::warn!("Failed to extract actor_id as ActorId or String");
339                            anyhow::bail!("Invalid `actor_id` type");
340                        };
341
342                        log::debug!("Extracted actor_id: {actor_id_val}");
343                        py_data_actor_ref.set_actor_id(actor_id_val);
344                    }
345
346                // Extract log_events if present
347                if let Ok(log_events) = config_obj.getattr("log_events")
348                    && let Ok(log_events_val) = log_events.extract::<bool>() {
349                        log::debug!("Extracted log_events: {log_events_val}");
350                        py_data_actor_ref.set_log_events(log_events_val);
351                    }
352
353                // Extract log_commands if present
354                if let Ok(log_commands) = config_obj.getattr("log_commands")
355                    && let Ok(log_commands_val) = log_commands.extract::<bool>() {
356                        log::debug!("Extracted log_commands: {log_commands_val}");
357                        py_data_actor_ref.set_log_commands(log_commands_val);
358                    }
359
360                log::debug!("Successfully updated PyDataActor config from Python actor instance");
361            }
362
363            // Set the Python instance reference for method dispatch on the original
364            py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
365
366            log::debug!("Set Python instance reference for method dispatch");
367
368            // Register the internal PyDataActor
369            let trader_id = self.trader_id();
370            let clock = self.kernel().clock();
371            let cache = self.kernel().cache();
372
373            py_data_actor_ref
374                .register(trader_id, clock, cache)
375                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
376
377            log::debug!(
378                "Internal PyDataActor registered: {}, state: {:?}",
379                py_data_actor_ref.is_registered(),
380                py_data_actor_ref.state()
381            );
382
383            Ok(python_actor.unbind())
384        })
385        .map_err(to_pyruntime_err)?;
386
387        // Add the actor to the trader's lifecycle management without consuming it
388        let actor_id = Python::attach(
389            |py| -> anyhow::Result<nautilus_model::identifiers::ActorId> {
390                let py_actor = python_actor.bind(py);
391                let py_data_actor_ref = py_actor
392                    .cast::<PyDataActor>()
393                    .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
394                let py_data_actor = py_data_actor_ref.borrow();
395
396                // Register the component in the global registry using the unsafe method
397                // SAFETY: The Python instance will remain alive, keeping the PyDataActor valid
398                unsafe {
399                    register_component_actor_by_ref(&*py_data_actor);
400                }
401
402                Ok(py_data_actor.actor_id())
403            },
404        )
405        .map_err(to_pyruntime_err)?;
406
407        // TODO: Add the actor ID to the trader for lifecycle management; clean up approach
408        self.kernel_mut()
409            .trader
410            .add_actor_id_for_lifecycle(actor_id)
411            .map_err(to_pyruntime_err)?;
412
413        // Store the Python actor reference to prevent garbage collection
414        // TODO: Add to a proper LiveNode registry for Python actors
415        std::mem::forget(python_actor); // Prevent dropping - we'll manage lifecycle manually
416
417        log::info!("Registered Python actor {actor_id}");
418        Ok(())
419    }
420
421    /// Returns a string representation of the node.
422    fn __repr__(&self) -> String {
423        format!(
424            "LiveNode(trader_id={}, environment={:?}, running={})",
425            self.trader_id(),
426            self.environment(),
427            self.is_running()
428        )
429    }
430}
431
432/// Python wrapper for `LiveNodeBuilder` that uses interior mutability
433/// to work around PyO3's shared ownership model.
434#[derive(Debug)]
435#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
436pub struct LiveNodeBuilderPy {
437    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
438}
439
440#[pymethods]
441impl LiveNodeBuilderPy {
442    /// Sets the instance ID for the node.
443    #[pyo3(name = "with_instance_id")]
444    fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
445        let mut inner_ref = self.inner.borrow_mut();
446        if let Some(builder) = inner_ref.take() {
447            *inner_ref = Some(builder.with_instance_id(instance_id));
448            Ok(Self {
449                inner: self.inner.clone(),
450            })
451        } else {
452            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
453                "Builder already consumed",
454            ))
455        }
456    }
457
458    /// Sets whether to load state on startup.
459    #[pyo3(name = "with_load_state")]
460    fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
461        let mut inner_ref = self.inner.borrow_mut();
462        if let Some(builder) = inner_ref.take() {
463            *inner_ref = Some(builder.with_load_state(load_state));
464            Ok(Self {
465                inner: self.inner.clone(),
466            })
467        } else {
468            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
469                "Builder already consumed",
470            ))
471        }
472    }
473
474    /// Sets whether to save state on shutdown.
475    #[pyo3(name = "with_save_state")]
476    fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
477        let mut inner_ref = self.inner.borrow_mut();
478        if let Some(builder) = inner_ref.take() {
479            *inner_ref = Some(builder.with_save_state(save_state));
480            Ok(Self {
481                inner: self.inner.clone(),
482            })
483        } else {
484            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
485                "Builder already consumed",
486            ))
487        }
488    }
489
490    /// Adds a data client with factory and configuration.
491    #[pyo3(name = "add_data_client")]
492    fn py_add_data_client(
493        &self,
494        name: Option<String>,
495        factory: Py<PyAny>,
496        config: Py<PyAny>,
497    ) -> PyResult<Self> {
498        let mut inner_ref = self.inner.borrow_mut();
499        if let Some(builder) = inner_ref.take() {
500            Python::attach(|py| -> PyResult<Self> {
501                // Use the global registry to extract Py<PyAny>s to trait objects
502                let registry = get_global_pyo3_registry();
503
504                let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
505                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
506
507                // Use the factory name from the original factory for the client name
508                let factory_name = factory
509                    .getattr(py, "name")?
510                    .call0(py)?
511                    .extract::<String>(py)?;
512                let client_name = name.unwrap_or(factory_name);
513
514                // Add the data client to the builder using boxed trait objects
515                match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
516                    Ok(updated_builder) => {
517                        *inner_ref = Some(updated_builder);
518                        Ok(Self {
519                            inner: self.inner.clone(),
520                        })
521                    }
522                    Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
523                        "Failed to add data client: {e}"
524                    ))),
525                }
526            })
527        } else {
528            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
529                "Builder already consumed",
530            ))
531        }
532    }
533
534    /// Builds the node.
535    #[pyo3(name = "build")]
536    fn py_build(&self) -> PyResult<LiveNode> {
537        let mut inner_ref = self.inner.borrow_mut();
538        if let Some(builder) = inner_ref.take() {
539            match builder.build() {
540                Ok(node) => Ok(node),
541                Err(e) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
542                    e.to_string(),
543                )),
544            }
545        } else {
546            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
547                "Builder already consumed",
548            ))
549        }
550    }
551
552    /// Returns a string representation of the builder.
553    fn __repr__(&self) -> String {
554        format!("{self:?}")
555    }
556}