nautilus_common/python/msgbus.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 2Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::rc::Rc;
17
18use pyo3::{PyObject, pymethods};
19use ustr::Ustr;
20
21use super::handler::PythonMessageHandler;
22use crate::msgbus::{
23 MessageBus, database::BusMessage, deregister, handler::ShareableMessageHandler, register,
24 subscribe, unsubscribe,
25};
26
27#[pymethods]
28impl BusMessage {
29 #[getter]
30 #[pyo3(name = "topic")]
31 fn py_close(&mut self) -> String {
32 self.topic.clone()
33 }
34
35 #[getter]
36 #[pyo3(name = "payload")]
37 fn py_payload(&mut self) -> &[u8] {
38 self.payload.as_ref()
39 }
40}
41
42#[pymethods]
43impl MessageBus {
44 /// Sends a message to a an endpoint.
45 #[pyo3(name = "send")]
46 pub fn py_send(&self, endpoint: &str, message: PyObject) {
47 if let Some(handler) = self.get_endpoint(endpoint) {
48 handler.0.handle(&message);
49 }
50 }
51
52 /// Publish a message to a topic.
53 #[pyo3(name = "publish")]
54 pub fn py_publish(&self, topic: &str, message: PyObject) {
55 let topic = Ustr::from(topic);
56 let matching_subs = self.matching_subscriptions(&topic);
57
58 for sub in matching_subs {
59 sub.handler.0.handle(&message);
60 }
61 }
62
63 /// Registers the given `handler` for the `endpoint` address.
64 #[pyo3(name = "register")]
65 #[staticmethod]
66 pub fn py_register(endpoint: &str, handler: PythonMessageHandler) {
67 // Updates value if key already exists
68 let handler = ShareableMessageHandler(Rc::new(handler));
69 register(endpoint, handler);
70 }
71
72 /// Subscribes the given `handler` to the `topic`.
73 ///
74 /// The priority for the subscription determines the ordering of
75 /// handlers receiving messages being processed, higher priority
76 /// handlers will receive messages before lower priority handlers.
77 ///
78 /// Safety: Priority should be between 0 and 255
79 ///
80 /// # Warnings
81 ///
82 /// Assigning priority handling is an advanced feature which *shouldn't
83 /// normally be needed by most users*. **Only assign a higher priority to the
84 /// subscription if you are certain of what you're doing**. If an inappropriate
85 /// priority is assigned then the handler may receive messages before core
86 /// system components have been able to process necessary calculations and
87 /// produce potential side effects for logically sound behavior.
88 #[pyo3(name = "subscribe")]
89 #[pyo3(signature = (topic, handler, priority=None))]
90 #[staticmethod]
91 pub fn py_subscribe(topic: &str, handler: PythonMessageHandler, priority: Option<u8>) {
92 // Updates value if key already exists
93 let handler = ShareableMessageHandler(Rc::new(handler));
94 subscribe(topic, handler, priority);
95 }
96
97 /// Returns whether there are subscribers for the given `pattern`.
98 #[must_use]
99 #[pyo3(name = "is_subscribed")]
100 pub fn py_is_subscribed(&self, topic: &str, handler: PythonMessageHandler) -> bool {
101 let handler = ShareableMessageHandler(Rc::new(handler));
102 self.is_subscribed(topic, handler)
103 }
104
105 /// Unsubscribes the given `handler` from the `topic`.
106 #[pyo3(name = "unsubscribe")]
107 #[staticmethod]
108 pub fn py_unsubscribe(topic: &str, handler: PythonMessageHandler) {
109 let handler = ShareableMessageHandler(Rc::new(handler));
110 unsubscribe(topic, handler);
111 }
112
113 /// Returns whether there are subscribers for the given `pattern`.
114 #[must_use]
115 #[pyo3(name = "is_registered")]
116 pub fn py_is_registered(&self, endpoint: &str) -> bool {
117 self.is_registered(endpoint)
118 }
119
120 /// Deregisters the given `handler` for the `endpoint` address.
121 #[pyo3(name = "deregister")]
122 #[staticmethod]
123 pub fn py_deregister(endpoint: &str) {
124 // Removes entry if it exists for endpoint
125 deregister(&Ustr::from(endpoint));
126 }
127}