Skip to main content

nautilus_live/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for live node.
17
18use std::{cell::RefCell, rc::Rc};
19
20use nautilus_common::{
21    actor::data_actor::{DataActorConfig, ImportableActorConfig},
22    enums::Environment,
23    live::get_runtime,
24    python::actor::PyDataActor,
25};
26use nautilus_core::{
27    UUID4,
28    python::{to_pyruntime_err, to_pyvalue_err},
29};
30use nautilus_model::identifiers::{ActorId, TraderId};
31use nautilus_system::get_global_pyo3_registry;
32use pyo3::{
33    prelude::*,
34    types::{PyDict, PyTuple},
35};
36use serde_json;
37
38use crate::{builder::LiveNodeBuilder, node::LiveNode};
39
40#[pymethods]
41impl LiveNode {
42    #[staticmethod]
43    #[pyo3(name = "builder")]
44    fn py_builder(
45        name: String,
46        trader_id: TraderId,
47        environment: Environment,
48    ) -> PyResult<LiveNodeBuilderPy> {
49        match Self::builder(trader_id, environment) {
50            Ok(builder) => Ok(LiveNodeBuilderPy {
51                inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
52            }),
53            Err(e) => Err(to_pyruntime_err(e)),
54        }
55    }
56
    /// Returns the environment reported by the node (exposed to Python as `environment`).
    #[getter]
    #[pyo3(name = "environment")]
    fn py_environment(&self) -> Environment {
        self.environment()
    }
62
    /// Returns the trader ID reported by the node (exposed to Python as `trader_id`).
    #[getter]
    #[pyo3(name = "trader_id")]
    fn py_trader_id(&self) -> TraderId {
        self.trader_id()
    }
68
    /// Returns the instance ID reported by the node (exposed to Python as `instance_id`).
    #[getter]
    #[pyo3(name = "instance_id")]
    const fn py_instance_id(&self) -> UUID4 {
        self.instance_id()
    }
74
    /// Returns whether the node reports itself as running (exposed to Python as `is_running`).
    #[getter]
    #[pyo3(name = "is_running")]
    fn py_is_running(&self) -> bool {
        self.is_running()
    }
80
81    #[pyo3(name = "start")]
82    fn py_start(&mut self) -> PyResult<()> {
83        if self.is_running() {
84            return Err(to_pyruntime_err("LiveNode is already running"));
85        }
86
87        // Non-blocking start - just start the node in the background
88        get_runtime().block_on(async { self.start().await.map_err(to_pyruntime_err) })
89    }
90
91    #[pyo3(name = "run")]
92    fn py_run(&mut self, py: Python) -> PyResult<()> {
93        if self.is_running() {
94            return Err(to_pyruntime_err("LiveNode is already running"));
95        }
96
97        // Get a handle for coordinating with the signal checker
98        let handle = self.handle();
99
100        // Import signal module
101        let signal_module = py.import("signal")?;
102        let original_handler =
103            signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; // Save original SIGINT handler (signal 2)
104
105        // Set up a custom signal handler that uses our handle
106        let handle_for_signal = handle;
107        let signal_callback = pyo3::types::PyCFunction::new_closure(
108            py,
109            None,
110            None,
111            move |_args: &pyo3::Bound<'_, PyTuple>,
112                  _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
113                  -> PyResult<()> {
114                log::info!("Python signal handler called");
115                handle_for_signal.stop();
116                Ok(())
117            },
118        )?;
119
120        // Install our signal handler
121        signal_module.call_method1("signal", (2, signal_callback))?;
122
123        // Run the node and restore signal handler afterward
124        let result =
125            { get_runtime().block_on(async { self.run().await.map_err(to_pyruntime_err) }) };
126
127        // Restore original signal handler
128        signal_module.call_method1("signal", (2, original_handler))?;
129
130        result
131    }
132
133    #[pyo3(name = "stop")]
134    fn py_stop(&self) -> PyResult<()> {
135        if !self.is_running() {
136            return Err(to_pyruntime_err("LiveNode is not running"));
137        }
138
139        // Use the handle to signal stop - this is thread-safe and doesn't require async
140        self.handle().stop();
141        Ok(())
142    }
143
144    #[allow(
145        unsafe_code,
146        reason = "Required for Python actor component registration"
147    )]
148    #[pyo3(name = "add_actor_from_config")]
149    fn py_add_actor_from_config(
150        &mut self,
151        _py: Python,
152        config: ImportableActorConfig,
153    ) -> PyResult<()> {
154        log::debug!("`add_actor_from_config` with: {config:?}");
155
156        // Extract module and class name from actor_path
157        let parts: Vec<&str> = config.actor_path.split(':').collect();
158        if parts.len() != 2 {
159            return Err(to_pyvalue_err(
160                "actor_path must be in format 'module.path:ClassName'",
161            ));
162        }
163        let (module_name, class_name) = (parts[0], parts[1]);
164
165        log::info!("Importing actor from module: {module_name} class: {class_name}");
166
167        // Import the Python class to verify it exists and get it for method dispatch
168        let _python_class = Python::attach(|py| -> PyResult<Py<PyAny>> {
169            let actor_module = py.import(module_name)?;
170            let actor_class = actor_module.getattr(class_name)?;
171            Ok(actor_class.unbind())
172        })
173        .map_err(|e| to_pyruntime_err(format!("Failed to import Python class: {e}")))?;
174
175        // Create default DataActorConfig for Rust PyDataActor
176        // Inherited config attributes will be extracted and wired in after Python actor creation
177        let basic_data_actor_config = DataActorConfig::default();
178
179        log::debug!("Created basic DataActorConfig for Rust: {basic_data_actor_config:?}");
180
181        // Create the Python actor and register the internal PyDataActor
182        let python_actor = Python::attach(|py| -> anyhow::Result<Py<PyAny>> {
183            // Import the Python class
184            let actor_module = py
185                .import(module_name)
186                .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
187            let actor_class = actor_module
188                .getattr(class_name)
189                .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
190
191            // Create config instance if config_path and config are provided
192            let config_instance = if !config.config_path.is_empty() && !config.config.is_empty() {
193                // Parse the config_path to get module and class
194                let config_parts: Vec<&str> = config.config_path.split(':').collect();
195                if config_parts.len() != 2 {
196                    anyhow::bail!(
197                        "config_path must be in format 'module.path:ClassName', was {}",
198                        config.config_path
199                    );
200                }
201                let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
202
203                log::debug!("Importing config class from module: {config_module_name} class: {config_class_name}");
204
205                // Import the config class
206                let config_module = py
207                    .import(config_module_name)
208                    .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
209                let config_class = config_module
210                    .getattr(config_class_name)
211                    .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
212
213                // Convert the serde_json::Value config dict to a Python dict
214                let py_dict = PyDict::new(py);
215                for (key, value) in &config.config {
216                    // Convert serde_json::Value back to Python object via JSON
217                    let json_str = serde_json::to_string(value)
218                        .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
219                    let py_value = PyModule::import(py, "json")?
220                        .call_method("loads", (json_str,), None)?;
221                    py_dict.set_item(key, py_value)?;
222                }
223
224                log::debug!("Created config dict: {py_dict:?}");
225
226                // Try multiple approaches to create the config instance
227                let config_instance = {
228                    // First, try calling the config class with **kwargs (this works if the dataclass handles string conversion)
229                    match config_class.call((), Some(&py_dict)) {
230                        Ok(instance) => {
231                            log::debug!("Successfully created config instance with kwargs");
232
233                            // Manually call __post_init__ if it exists
234                            if let Err(e) = instance.call_method0("__post_init__") {
235                                log::error!("Failed to call __post_init__ on config instance: {e}");
236                                anyhow::bail!("__post_init__ failed: {e}");
237                            }
238                            log::debug!("Successfully called __post_init__ on config instance");
239
240                            instance
241                        },
242                        Err(kwargs_err) => {
243                            log::debug!("Failed to create config with kwargs: {kwargs_err}");
244
245                            // Second approach: try to create with default constructor and set attributes
246                            match config_class.call0() {
247                                Ok(instance) => {
248                                    log::debug!("Created default config instance, setting attributes");
249                                    for (key, value) in &config.config {
250                                        // Convert serde_json::Value to Python object
251                                        let json_str = serde_json::to_string(value)
252                                            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
253                                        let py_value = PyModule::import(py, "json")?
254                                            .call_method("loads", (json_str,), None)?;
255                                        if let Err(setattr_err) = instance.setattr(key, py_value) {
256                                            log::warn!("Failed to set attribute {key}: {setattr_err}");
257                                        }
258                                    }
259
260                                    // Manually call __post_init__ if it exists
261                                    if let Err(e) = instance.call_method0("__post_init__") {
262                                        log::error!("Failed to call __post_init__ on config instance: {e}");
263                                        anyhow::bail!("__post_init__ failed: {e}");
264                                    }
265                                    log::debug!("Called __post_init__ on config instance");
266
267                                    instance
268                                },
269                                Err(default_err) => {
270                                    log::debug!("Failed to create default config: {default_err}");
271
272                                    // If both approaches fail, return the original error
273                                    anyhow::bail!(
274                                        "Failed to create config instance. Tried kwargs approach: {kwargs_err}, default constructor: {default_err}"
275                                    );
276                                }
277                            }
278                        }
279                    }
280                };
281
282                log::debug!("Created config instance: {config_instance:?}");
283
284                Some(config_instance)
285            } else {
286                log::debug!("No config_path or empty config, using None");
287                None
288            };
289
290            // Create the Python actor instance with the config
291            let python_actor = if let Some(config_obj) = config_instance.clone() {
292                actor_class.call1((config_obj,))?
293            } else {
294                actor_class.call0()?
295            };
296
297            log::debug!("Created Python actor instance: {python_actor:?}");
298
299            // Get a mutable reference to the internal PyDataActor for registration
300            let mut py_data_actor_ref = python_actor
301                .extract::<PyRefMut<PyDataActor>>()
302                .map_err(Into::<PyErr>::into)
303                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
304
305            log::debug!(
306                "Internal PyDataActor mem_addr: {}, registered: {}",
307                &py_data_actor_ref.mem_address(),
308                py_data_actor_ref.is_registered()
309            );
310
311            // Extract inherited DataActorConfig fields from the Python actor instance
312            // and wire them into the PyDataActor's core config
313            if let Some(config_obj) = config_instance.as_ref() {
314                log::debug!("Extracting inherited config fields from Python actor config");
315
316                // Extract actor_id if present
317                if let Ok(actor_id) = config_obj.getattr("actor_id")
318                    && !actor_id.is_none() {
319                        // Try to extract as ActorId first, then as string
320                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
321                            actor_id_val
322                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
323                            ActorId::from(actor_id_str.as_str())
324                        } else {
325                            log::warn!("Failed to extract actor_id as ActorId or String");
326                            anyhow::bail!("Invalid `actor_id` type");
327                        };
328
329                        log::debug!("Extracted actor_id: {actor_id_val}");
330                        py_data_actor_ref.set_actor_id(actor_id_val);
331                    }
332
333                // Extract log_events if present
334                if let Ok(log_events) = config_obj.getattr("log_events")
335                    && let Ok(log_events_val) = log_events.extract::<bool>() {
336                        log::debug!("Extracted log_events: {log_events_val}");
337                        py_data_actor_ref.set_log_events(log_events_val);
338                    }
339
340                // Extract log_commands if present
341                if let Ok(log_commands) = config_obj.getattr("log_commands")
342                    && let Ok(log_commands_val) = log_commands.extract::<bool>() {
343                        log::debug!("Extracted log_commands: {log_commands_val}");
344                        py_data_actor_ref.set_log_commands(log_commands_val);
345                    }
346
347                log::debug!("Successfully updated PyDataActor config from Python actor instance");
348            }
349
350            // Set the Python instance reference for method dispatch on the original
351            py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
352
353            log::debug!("Set Python instance reference for method dispatch");
354
355            // Register the internal PyDataActor
356            let trader_id = self.trader_id();
357            let clock = self.kernel().clock();
358            let cache = self.kernel().cache();
359
360            py_data_actor_ref
361                .register(trader_id, clock, cache)
362                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
363
364            log::debug!(
365                "Internal PyDataActor registered: {}, state: {:?}",
366                py_data_actor_ref.is_registered(),
367                py_data_actor_ref.state()
368            );
369
370            Ok(python_actor.unbind())
371        })
372        .map_err(to_pyruntime_err)?;
373
374        let actor_id = Python::attach(|py| -> anyhow::Result<ActorId> {
375            let py_actor = python_actor.bind(py);
376            let py_data_actor_ref = py_actor
377                .cast::<PyDataActor>()
378                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
379            let py_data_actor = py_data_actor_ref.borrow();
380            py_data_actor.register_in_global_registries();
381
382            Ok(py_data_actor.actor_id())
383        })
384        .map_err(to_pyruntime_err)?;
385
386        self.kernel_mut()
387            .trader
388            .add_actor_id_for_lifecycle(actor_id)
389            .map_err(to_pyruntime_err)?;
390
391        // Note: No mem::forget needed - the actor's py_self field holds a Py<PyAny>
392        // that keeps the Python instance alive, and registries share the inner via Rc::clone()
393
394        log::info!("Registered Python actor {actor_id}");
395        Ok(())
396    }
397
398    /// Returns a string representation of the node.
399    fn __repr__(&self) -> String {
400        format!(
401            "LiveNode(trader_id={}, environment={:?}, running={})",
402            self.trader_id(),
403            self.environment(),
404            self.is_running()
405        )
406    }
407}
408
/// Python wrapper for `LiveNodeBuilder` that uses interior mutability
/// to work around PyO3's shared ownership model.
///
/// The wrapped builder's methods take it by value, while Python only ever holds shared
/// references to this wrapper. Each wrapper method therefore takes the builder out of the
/// shared slot, applies the change and stores the result back. Once the slot is empty
/// (after `build`), further calls fail with "Builder already consumed".
#[derive(Debug)]
#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
pub struct LiveNodeBuilderPy {
    // Shared slot holding the builder; `None` once the builder has been consumed.
    // Wrappers returned from the builder methods clone this `Rc`, so they all see the
    // same slot.
    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
}
416
417#[pymethods]
418impl LiveNodeBuilderPy {
419    /// Sets the instance ID for the node.
420    #[pyo3(name = "with_instance_id")]
421    fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
422        let mut inner_ref = self.inner.borrow_mut();
423        if let Some(builder) = inner_ref.take() {
424            *inner_ref = Some(builder.with_instance_id(instance_id));
425            Ok(Self {
426                inner: self.inner.clone(),
427            })
428        } else {
429            Err(to_pyruntime_err("Builder already consumed"))
430        }
431    }
432
433    /// Sets whether to load state on startup.
434    #[pyo3(name = "with_load_state")]
435    fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
436        let mut inner_ref = self.inner.borrow_mut();
437        if let Some(builder) = inner_ref.take() {
438            *inner_ref = Some(builder.with_load_state(load_state));
439            Ok(Self {
440                inner: self.inner.clone(),
441            })
442        } else {
443            Err(to_pyruntime_err("Builder already consumed"))
444        }
445    }
446
447    /// Sets whether to save state on shutdown.
448    #[pyo3(name = "with_save_state")]
449    fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
450        let mut inner_ref = self.inner.borrow_mut();
451        if let Some(builder) = inner_ref.take() {
452            *inner_ref = Some(builder.with_save_state(save_state));
453            Ok(Self {
454                inner: self.inner.clone(),
455            })
456        } else {
457            Err(to_pyruntime_err("Builder already consumed"))
458        }
459    }
460
461    /// Adds a data client with factory and configuration.
462    #[pyo3(name = "add_data_client")]
463    fn py_add_data_client(
464        &self,
465        name: Option<String>,
466        factory: Py<PyAny>,
467        config: Py<PyAny>,
468    ) -> PyResult<Self> {
469        let mut inner_ref = self.inner.borrow_mut();
470        if let Some(builder) = inner_ref.take() {
471            Python::attach(|py| -> PyResult<Self> {
472                // Use the global registry to extract Py<PyAny>s to trait objects
473                let registry = get_global_pyo3_registry();
474
475                let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
476                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
477
478                // Use the factory name from the original factory for the client name
479                let factory_name = factory
480                    .getattr(py, "name")?
481                    .call0(py)?
482                    .extract::<String>(py)?;
483                let client_name = name.unwrap_or(factory_name);
484
485                // Add the data client to the builder using boxed trait objects
486                match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
487                    Ok(updated_builder) => {
488                        *inner_ref = Some(updated_builder);
489                        Ok(Self {
490                            inner: self.inner.clone(),
491                        })
492                    }
493                    Err(e) => Err(to_pyruntime_err(format!("Failed to add data client: {e}"))),
494                }
495            })
496        } else {
497            Err(to_pyruntime_err("Builder already consumed"))
498        }
499    }
500
501    /// Builds the node.
502    #[pyo3(name = "build")]
503    fn py_build(&self) -> PyResult<LiveNode> {
504        let mut inner_ref = self.inner.borrow_mut();
505        if let Some(builder) = inner_ref.take() {
506            match builder.build() {
507                Ok(node) => Ok(node),
508                Err(e) => Err(to_pyruntime_err(e)),
509            }
510        } else {
511            Err(to_pyruntime_err("Builder already consumed"))
512        }
513    }
514
    /// Returns a string representation of the builder.
    ///
    /// The text is the wrapper's derived `Debug` output.
    fn __repr__(&self) -> String {
        format!("{self:?}")
    }
519}