1use std::fmt::Display;
17
18use bytes::Bytes;
19use nautilus_core::UUID4;
20use nautilus_model::identifiers::TraderId;
21use serde::{Deserialize, Serialize};
22
23use crate::enums::SerializationEncoding;
24
25#[derive(Clone, Debug, Serialize, Deserialize)]
27#[cfg_attr(
28 feature = "python",
29 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
30)]
31pub struct BusMessage {
32 pub topic: String,
34 pub payload: Bytes,
36}
37
38impl Display for BusMessage {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(
41 f,
42 "[{}] {}",
43 self.topic,
44 String::from_utf8_lossy(&self.payload)
45 )
46 }
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(default)]
56pub struct DatabaseConfig {
57 #[serde(alias = "type")]
59 pub database_type: String,
60 pub host: Option<String>,
62 pub port: Option<u16>,
64 pub username: Option<String>,
66 pub password: Option<String>,
68 pub ssl: bool,
70 pub timeout: u16,
72}
73
74impl Default for DatabaseConfig {
75 fn default() -> Self {
77 Self {
78 database_type: "redis".to_string(),
79 host: None,
80 port: None,
81 username: None,
82 password: None,
83 ssl: false,
84 timeout: 20,
85 }
86 }
87}
88
89#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
91#[serde(default)]
92pub struct MessageBusConfig {
93 pub database: Option<DatabaseConfig>,
95 pub encoding: SerializationEncoding,
97 pub timestamps_as_iso8601: bool,
100 pub buffer_interval_ms: Option<u32>,
104 pub autotrim_mins: Option<u32>,
108 pub use_trader_prefix: bool,
110 pub use_trader_id: bool,
112 pub use_instance_id: bool,
114 pub streams_prefix: String,
116 pub stream_per_topic: bool,
119 pub external_streams: Option<Vec<String>>,
121 pub types_filter: Option<Vec<String>>,
123 pub heartbeat_interval_secs: Option<u16>,
125}
126
127impl Default for MessageBusConfig {
128 fn default() -> Self {
130 Self {
131 database: None,
132 encoding: SerializationEncoding::MsgPack,
133 timestamps_as_iso8601: false,
134 buffer_interval_ms: None,
135 autotrim_mins: None,
136 use_trader_prefix: true,
137 use_trader_id: true,
138 use_instance_id: false,
139 streams_prefix: "stream".to_string(),
140 stream_per_topic: true,
141 external_streams: None,
142 types_filter: None,
143 heartbeat_interval_secs: None,
144 }
145 }
146}
147
148pub trait MessageBusDatabaseAdapter {
155 type DatabaseType;
156
157 fn new(
158 trader_id: TraderId,
159 instance_id: UUID4,
160 config: MessageBusConfig,
161 ) -> anyhow::Result<Self::DatabaseType>;
162 fn is_closed(&self) -> bool;
163 fn publish(&self, topic: String, payload: Bytes);
164 fn close(&mut self);
165}
166
167#[cfg(test)]
171mod tests {
172 use rstest::*;
173 use serde_json::json;
174
175 use super::*;
176
177 #[rstest]
178 fn test_default_database_config() {
179 let config = DatabaseConfig::default();
180 assert_eq!(config.database_type, "redis");
181 assert_eq!(config.host, None);
182 assert_eq!(config.port, None);
183 assert_eq!(config.username, None);
184 assert_eq!(config.password, None);
185 assert!(!config.ssl);
186 assert_eq!(config.timeout, 20);
187 }
188
189 #[rstest]
190 fn test_deserialize_database_config() {
191 let config_json = json!({
192 "type": "redis",
193 "host": "localhost",
194 "port": 6379,
195 "username": "user",
196 "password": "pass",
197 "ssl": true,
198 "timeout": 30
199 });
200 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
201 assert_eq!(config.database_type, "redis");
202 assert_eq!(config.host, Some("localhost".to_string()));
203 assert_eq!(config.port, Some(6379));
204 assert_eq!(config.username, Some("user".to_string()));
205 assert_eq!(config.password, Some("pass".to_string()));
206 assert!(config.ssl);
207 assert_eq!(config.timeout, 30);
208 }
209
210 #[rstest]
211 fn test_default_message_bus_config() {
212 let config = MessageBusConfig::default();
213 assert_eq!(config.encoding, SerializationEncoding::MsgPack);
214 assert!(!config.timestamps_as_iso8601);
215 assert_eq!(config.buffer_interval_ms, None);
216 assert_eq!(config.autotrim_mins, None);
217 assert!(config.use_trader_prefix);
218 assert!(config.use_trader_id);
219 assert!(!config.use_instance_id);
220 assert_eq!(config.streams_prefix, "stream");
221 assert!(config.stream_per_topic);
222 assert_eq!(config.external_streams, None);
223 assert_eq!(config.types_filter, None);
224 }
225
226 #[test]
227 fn test_deserialize_message_bus_config() {
228 let config_json = json!({
229 "database": {
230 "type": "redis",
231 "host": "localhost",
232 "port": 6379,
233 "username": "user",
234 "password": "pass",
235 "ssl": true,
236 "timeout": 30
237 },
238 "encoding": "json",
239 "timestamps_as_iso8601": true,
240 "buffer_interval_ms": 100,
241 "autotrim_mins": 60,
242 "use_trader_prefix": false,
243 "use_trader_id": false,
244 "use_instance_id": true,
245 "streams_prefix": "data_streams",
246 "stream_per_topic": false,
247 "external_streams": ["stream1", "stream2"],
248 "types_filter": ["type1", "type2"]
249 });
250 let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();
251 assert_eq!(config.encoding, SerializationEncoding::Json);
252 assert!(config.timestamps_as_iso8601);
253 assert_eq!(config.buffer_interval_ms, Some(100));
254 assert_eq!(config.autotrim_mins, Some(60));
255 assert!(!config.use_trader_prefix);
256 assert!(!config.use_trader_id);
257 assert!(config.use_instance_id);
258 assert_eq!(config.streams_prefix, "data_streams");
259 assert!(!config.stream_per_topic);
260 assert_eq!(
261 config.external_streams,
262 Some(vec!["stream1".to_string(), "stream2".to_string()])
263 );
264 assert_eq!(
265 config.types_filter,
266 Some(vec!["type1".to_string(), "type2".to_string()])
267 );
268 }
269}