nautilus_common/
runtime.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The centralized Tokio runtime for a running Nautilus system.
17
18#[cfg(feature = "python")]
19use std::sync::Once;
20use std::{sync::OnceLock, time::Duration};
21
22#[cfg(feature = "python")]
23use pyo3::Python;
24use tokio::{runtime::Builder, task, time::timeout};
25
26static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
27#[cfg(feature = "python")]
28static PYTHON_INIT: Once = Once::new();
29
30/// Environment variable name to configure the number of OS threads for the common runtime.
31/// If not set or if the value cannot be parsed as a positive integer, the default value is used.
32const NAUTILUS_WORKER_THREADS: &str = "NAUTILUS_WORKER_THREADS";
33
34/// The default number of OS threads to use if the environment variable is not set.
35///
36/// 0 means Tokio will use the default (number of logical CPUs).
37const DEFAULT_OS_THREADS: usize = 0;
38
39/// Creates and configures a new multi-threaded Tokio runtime.
40///
41/// The number of OS threads is configured using the `NAUTILUS_WORKER_THREADS`
42/// environment variable. If not set, all available logical CPUs will be used.
43///
44/// # Panics
45///
46/// Panics if the runtime could not be created, which typically indicates
47/// an inability to spawn threads or allocate necessary resources.
48fn initialize_runtime() -> tokio::runtime::Runtime {
49    #[cfg(feature = "python")]
50    {
51        // Python hosts the process when we build as an extension module. Initializing
52        // here keeps the interpreter alive for the lifetime of the shared Tokio runtime
53        // so every worker thread sees a prepared PyO3 environment before using it.
54        PYTHON_INIT.call_once(|| {
55            Python::initialize();
56        });
57    }
58
59    let worker_threads = std::env::var(NAUTILUS_WORKER_THREADS)
60        .ok()
61        .and_then(|val| val.parse::<usize>().ok())
62        .unwrap_or(DEFAULT_OS_THREADS);
63
64    let mut builder = Builder::new_multi_thread();
65
66    let builder = if worker_threads > 0 {
67        builder.worker_threads(worker_threads)
68    } else {
69        &mut builder
70    };
71
72    builder
73        .enable_all()
74        .build()
75        .expect("Failed to create tokio runtime")
76}
77
78/// Returns a reference to the global Nautilus Tokio runtime.
79///
80/// The runtime is lazily initialized on the first call and reused thereafter.
81/// Intended for use cases where passing a runtime around is impractical.
82pub fn get_runtime() -> &'static tokio::runtime::Runtime {
83    RUNTIME.get_or_init(initialize_runtime)
84}
85
86/// Provides a best-effort flush for runtime tasks during shutdown.
87///
88/// The function yields once to the Tokio scheduler and gives outstanding tasks a chance
89/// to observe shutdown signals before Python finalizes the interpreter, which calls this via
90/// an `atexit` hook.
91pub fn shutdown_runtime(wait: Duration) {
92    if let Some(runtime) = RUNTIME.get() {
93        runtime.block_on(async {
94            let _ = timeout(wait, async {
95                task::yield_now().await;
96            })
97            .await;
98        });
99    }
100}