nautilus_common/live/
runtime.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The centralized Tokio runtime for a running Nautilus system.
17//!
18//! # Design rationale
19//!
//! NautilusTrader uses a single global Tokio runtime with the following properties:
//! - A single long-lived runtime avoids repeated startup/shutdown overhead.
//! - The runtime is lazily initialized on the first call to `get_runtime()` via `OnceLock`.
//! - Worker thread count is configurable via the `NAUTILUS_WORKER_THREADS` environment variable.
24//!
25//! # Python support
26//!
//! When the `python` feature is enabled, the runtime initializes the Python interpreter
//! before starting worker threads. The PyO3 module registers an `atexit` handler that calls
//! `shutdown_runtime()` to cleanly shut down when Python exits.
30//!
31//! # Testing considerations
32//!
33//! The global runtime pattern makes it harder to inject test doubles. For testing:
34//! - Unit tests can use `#[tokio::test]` which creates its own runtime.
35//! - Integration tests should be aware they share the global runtime state.
36
37use std::{sync::OnceLock, time::Duration};
38
39use tokio::{runtime::Builder, task, time::timeout};
40
41static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
42
43/// Environment variable name to configure the number of OS threads for the common runtime.
44/// If not set or if the value cannot be parsed as a positive integer, the default value is used.
45const NAUTILUS_WORKER_THREADS: &str = "NAUTILUS_WORKER_THREADS";
46
47/// The default number of OS threads to use if the environment variable is not set.
48///
49/// 0 means Tokio will use the default (number of logical CPUs).
50const DEFAULT_OS_THREADS: usize = 0;
51
52/// Creates and configures a new multi-threaded Tokio runtime.
53///
54/// The number of OS threads is configured using the `NAUTILUS_WORKER_THREADS`
55/// environment variable. If not set, all available logical CPUs will be used.
56///
57/// # Panics
58///
59/// Panics if the runtime could not be created, which typically indicates
60/// an inability to spawn threads or allocate necessary resources.
61fn initialize_runtime() -> tokio::runtime::Runtime {
62    // Initialize Python if running as a Python extension module
63    #[cfg(feature = "python")]
64    {
65        crate::python::runtime::initialize_python();
66    }
67
68    let worker_threads = std::env::var(NAUTILUS_WORKER_THREADS)
69        .ok()
70        .and_then(|val| val.parse::<usize>().ok())
71        .unwrap_or(DEFAULT_OS_THREADS);
72
73    let mut builder = Builder::new_multi_thread();
74
75    let builder = if worker_threads > 0 {
76        builder.worker_threads(worker_threads)
77    } else {
78        &mut builder
79    };
80
81    builder
82        .enable_all()
83        .build()
84        .expect("Failed to create tokio runtime")
85}
86
87/// Returns a reference to the global Nautilus Tokio runtime.
88///
89/// The runtime is lazily initialized on the first call and reused thereafter.
90/// Intended for use cases where passing a runtime around is impractical.
91pub fn get_runtime() -> &'static tokio::runtime::Runtime {
92    RUNTIME.get_or_init(initialize_runtime)
93}
94
95/// Provides a best-effort flush for runtime tasks during shutdown.
96///
97/// The function yields once to the Tokio scheduler and gives outstanding tasks a chance
98/// to observe shutdown signals before Python finalizes the interpreter, which calls this via
99/// an `atexit` hook.
100pub fn shutdown_runtime(wait: Duration) {
101    if let Some(runtime) = RUNTIME.get() {
102        runtime.block_on(async {
103            let _ = timeout(wait, async {
104                task::yield_now().await;
105            })
106            .await;
107        });
108    }
109}