nautilus_common/msgbus/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler functionality for the message bus system.
17//!
18//! This module provides a trait and implementations for handling messages
19//! in a type-safe manner, enabling both typed and untyped message processing.
20
21use std::{
22    any::{Any, type_name},
23    fmt::Debug,
24    marker::PhantomData,
25    rc::Rc,
26};
27
28use nautilus_core::UUID4;
29use ustr::Ustr;
30
31pub trait MessageHandler: Any {
32    /// Returns the unique identifier for this handler.
33    fn id(&self) -> Ustr;
34    /// Handles a message of any type.
35    fn handle(&self, message: &dyn Any);
36    /// Returns this handler as a trait object.
37    fn as_any(&self) -> &dyn Any;
38}
39
40impl PartialEq for dyn MessageHandler {
41    fn eq(&self, other: &Self) -> bool {
42        self.id() == other.id()
43    }
44}
45
46impl Eq for dyn MessageHandler {}
47
48#[derive(Debug)]
49pub struct TypedMessageHandler<T: 'static + ?Sized, F: Fn(&T) + 'static> {
50    id: Ustr,
51    callback: F,
52    _phantom: PhantomData<T>,
53}
54
55impl<T: 'static, F: Fn(&T) + 'static> TypedMessageHandler<T, F> {
56    /// Creates a new handler with an optional custom ID.
57    pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
58        let id_ustr = id.map_or_else(
59            || generate_handler_id(&callback),
60            |s| Ustr::from(s.as_ref()),
61        );
62
63        Self {
64            id: id_ustr,
65            callback,
66            _phantom: PhantomData,
67        }
68    }
69
70    /// Creates a new handler with an auto-generated ID.
71    pub fn from(callback: F) -> Self {
72        Self::new::<Ustr>(None, callback)
73    }
74}
75
76impl<T: 'static, F: Fn(&T) + 'static> MessageHandler for TypedMessageHandler<T, F> {
77    fn id(&self) -> Ustr {
78        self.id
79    }
80
81    fn handle(&self, message: &dyn Any) {
82        if let Some(typed_msg) = message.downcast_ref::<T>() {
83            (self.callback)(typed_msg);
84        } else {
85            log::error!("Expected message of type {}", type_name::<T>());
86        }
87    }
88
89    fn as_any(&self) -> &dyn Any {
90        self
91    }
92}
93
94impl<F: Fn(&dyn Any) + 'static> TypedMessageHandler<dyn Any, F> {
95    /// Creates a new handler for dynamic Any messages with an optional custom ID.
96    pub fn new_any<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
97        let id_ustr = id.map_or_else(
98            || generate_handler_id(&callback),
99            |s| Ustr::from(s.as_ref()),
100        );
101
102        Self {
103            id: id_ustr,
104            callback,
105            _phantom: PhantomData,
106        }
107    }
108
109    /// Creates a handler for Any messages with an optional ID.
110    pub fn from_any<S: AsRef<str>>(id_opt: Option<S>, callback: F) -> Self {
111        Self::new_any(id_opt, callback)
112    }
113
114    /// Creates a handler for Any messages with an auto-generated ID.
115    pub fn with_any(callback: F) -> Self {
116        Self::new_any::<&str>(None, callback)
117    }
118}
119
120impl<F: Fn(&dyn Any) + 'static> MessageHandler for TypedMessageHandler<dyn Any, F> {
121    fn id(&self) -> Ustr {
122        self.id
123    }
124
125    fn handle(&self, message: &dyn Any) {
126        (self.callback)(message);
127    }
128
129    fn as_any(&self) -> &dyn Any {
130        self
131    }
132}
133
134fn generate_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(callback: &F) -> Ustr {
135    let callback_ptr = std::ptr::from_ref(callback);
136    let uuid = UUID4::new();
137    Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
138}
139
140// ShareableMessageHandler contains Rc<dyn MessageHandler> which is not Send/Sync.
141// This is intentional - message handlers are designed for single-threaded use within
142// each async runtime. The MessageBus uses thread-local storage to ensure each thread
143// gets its own handlers, eliminating the need for unsafe Send/Sync implementations.
144#[repr(transparent)]
145#[derive(Clone)]
146pub struct ShareableMessageHandler(pub Rc<dyn MessageHandler>);
147
148impl ShareableMessageHandler {
149    pub fn id(&self) -> Ustr {
150        self.0.id()
151    }
152}
153
154impl Debug for ShareableMessageHandler {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct(stringify!(ShareableMessageHandler))
157            .field("id", &self.0.id())
158            .field("type", &std::any::type_name::<Self>().to_string())
159            .finish()
160    }
161}
162
163impl From<Rc<dyn MessageHandler>> for ShareableMessageHandler {
164    fn from(value: Rc<dyn MessageHandler>) -> Self {
165        Self(value)
166    }
167}