nautilus_common/python/msgbus.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::rc::Rc;
17
18use pyo3::{pymethods, PyObject, PyRefMut};
19use ustr::Ustr;
20
21use super::handler::PythonMessageHandler;
22use crate::msgbus::{database::BusMessage, handler::ShareableMessageHandler, MessageBus};
23
24#[pymethods]
25impl BusMessage {
26 #[getter]
27 #[pyo3(name = "topic")]
28 fn py_close(&mut self) -> String {
29 self.topic.clone()
30 }
31
32 #[getter]
33 #[pyo3(name = "payload")]
34 fn py_payload(&mut self) -> &[u8] {
35 self.payload.as_ref()
36 }
37}
38
39#[pymethods]
40impl MessageBus {
41 /// Sends a message to a an endpoint.
42 #[pyo3(name = "send")]
43 pub fn send_py(&self, endpoint: &str, message: PyObject) {
44 if let Some(handler) = self.get_endpoint(endpoint) {
45 handler.0.handle(&message);
46 }
47 }
48
49 /// Publish a message to a topic.
50 #[pyo3(name = "publish")]
51 pub fn publish_py(&self, topic: &str, message: PyObject) {
52 let topic = Ustr::from(topic);
53 let matching_subs = self.matching_subscriptions(&topic);
54
55 for sub in matching_subs {
56 sub.handler.0.handle(&message);
57 }
58 }
59
60 /// Registers the given `handler` for the `endpoint` address.
61 #[pyo3(name = "register")]
62 pub fn register_py(&mut self, endpoint: &str, handler: PythonMessageHandler) {
63 // Updates value if key already exists
64 let handler = ShareableMessageHandler(Rc::new(handler));
65 self.register(endpoint, handler);
66 }
67
68 /// Subscribes the given `handler` to the `topic`.
69 ///
70 /// The priority for the subscription determines the ordering of
71 /// handlers receiving messages being processed, higher priority
72 /// handlers will receive messages before lower priority handlers.
73 ///
74 /// Safety: Priority should be between 0 and 255
75 ///
76 /// # Warnings
77 ///
78 /// Assigning priority handling is an advanced feature which *shouldn't
79 /// normally be needed by most users*. **Only assign a higher priority to the
80 /// subscription if you are certain of what you're doing**. If an inappropriate
81 /// priority is assigned then the handler may receive messages before core
82 /// system components have been able to process necessary calculations and
83 /// produce potential side effects for logically sound behavior.
84 #[pyo3(name = "subscribe")]
85 #[pyo3(signature = (topic, handler, priority=None))]
86 pub fn subscribe_py(
87 mut slf: PyRefMut<'_, Self>,
88 topic: &str,
89 handler: PythonMessageHandler,
90 priority: Option<u8>,
91 ) {
92 // Updates value if key already exists
93 let handler = ShareableMessageHandler(Rc::new(handler));
94 slf.subscribe(topic, handler, priority);
95 }
96
97 /// Returns whether there are subscribers for the given `pattern`.
98 #[must_use]
99 #[pyo3(name = "is_subscribed")]
100 pub fn is_subscribed_py(&self, topic: &str, handler: PythonMessageHandler) -> bool {
101 let handler = ShareableMessageHandler(Rc::new(handler));
102 self.is_subscribed(topic, handler)
103 }
104
105 /// Unsubscribes the given `handler` from the `topic`.
106 #[pyo3(name = "unsubscribe")]
107 pub fn unsubscribe_py(&mut self, topic: &str, handler: PythonMessageHandler) {
108 let handler = ShareableMessageHandler(Rc::new(handler));
109 self.unsubscribe(topic, handler);
110 }
111
112 /// Returns whether there are subscribers for the given `pattern`.
113 #[must_use]
114 #[pyo3(name = "is_registered")]
115 pub fn is_registered_py(&self, endpoint: &str) -> bool {
116 self.is_registered(endpoint)
117 }
118
119 /// Deregisters the given `handler` for the `endpoint` address.
120 #[pyo3(name = "deregister")]
121 pub fn deregister_py(&mut self, endpoint: &str) {
122 // Removes entry if it exists for endpoint
123 self.deregister(&Ustr::from(endpoint));
124 }
125}