nautilus_common/live/runtime.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The centralized Tokio runtime for a running Nautilus system.
17//!
18//! # Design rationale
19//!
20//! NautilusTrader uses a single global Tokio runtime because:
21//! - A single long-lived runtime avoids repeated startup/shutdown overhead.
22//! - The runtime is lazily initialized on first call to `get_runtime()` via `OnceLock`.
23//! - Worker thread count is configurable via the `NAUTILUS_WORKER_THREADS` environment variable.
24//!
25//! # Python support
26//!
27//! When the `python` feature is enabled, the runtime initializes the Python interpreter
28//! before starting worker threads. The PyO3 module registers an `atexit` handler via
29//! `shutdown_runtime()` to cleanly shut down when Python exits.
30//!
31//! # Testing considerations
32//!
33//! The global runtime pattern makes it harder to inject test doubles. For testing:
34//! - Unit tests can use `#[tokio::test]` which creates its own runtime.
35//! - Integration tests should be aware they share the global runtime state.
36
37use std::{sync::OnceLock, time::Duration};
38
39use tokio::{runtime::Builder, task, time::timeout};
40
41static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
42
43/// Environment variable name to configure the number of OS threads for the common runtime.
44/// If not set or if the value cannot be parsed as a positive integer, the default value is used.
45const NAUTILUS_WORKER_THREADS: &str = "NAUTILUS_WORKER_THREADS";
46
47/// The default number of OS threads to use if the environment variable is not set.
48///
49/// 0 means Tokio will use the default (number of logical CPUs).
50const DEFAULT_OS_THREADS: usize = 0;
51
52/// Creates and configures a new multi-threaded Tokio runtime.
53///
54/// The number of OS threads is configured using the `NAUTILUS_WORKER_THREADS`
55/// environment variable. If not set, all available logical CPUs will be used.
56///
57/// # Panics
58///
59/// Panics if the runtime could not be created, which typically indicates
60/// an inability to spawn threads or allocate necessary resources.
61fn initialize_runtime() -> tokio::runtime::Runtime {
62 // Initialize Python if running as a Python extension module
63 #[cfg(feature = "python")]
64 {
65 crate::python::runtime::initialize_python();
66 }
67
68 let worker_threads = std::env::var(NAUTILUS_WORKER_THREADS)
69 .ok()
70 .and_then(|val| val.parse::<usize>().ok())
71 .unwrap_or(DEFAULT_OS_THREADS);
72
73 let mut builder = Builder::new_multi_thread();
74
75 let builder = if worker_threads > 0 {
76 builder.worker_threads(worker_threads)
77 } else {
78 &mut builder
79 };
80
81 builder
82 .enable_all()
83 .build()
84 .expect("Failed to create tokio runtime")
85}
86
87/// Returns a reference to the global Nautilus Tokio runtime.
88///
89/// The runtime is lazily initialized on the first call and reused thereafter.
90/// Intended for use cases where passing a runtime around is impractical.
91pub fn get_runtime() -> &'static tokio::runtime::Runtime {
92 RUNTIME.get_or_init(initialize_runtime)
93}
94
95/// Provides a best-effort flush for runtime tasks during shutdown.
96///
97/// The function yields once to the Tokio scheduler and gives outstanding tasks a chance
98/// to observe shutdown signals before Python finalizes the interpreter, which calls this via
99/// an `atexit` hook.
100pub fn shutdown_runtime(wait: Duration) {
101 if let Some(runtime) = RUNTIME.get() {
102 runtime.block_on(async {
103 let _ = timeout(wait, async {
104 task::yield_now().await;
105 })
106 .await;
107 });
108 }
109}