1use bytes::Bytes;
17use nautilus_core::UUID4;
18use nautilus_model::identifiers::TraderId;
19use serde::{Deserialize, Serialize};
20use ustr::Ustr;
21
22use crate::enums::SerializationEncoding;
23
24#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(default)]
31pub struct DatabaseConfig {
32 #[serde(alias = "type")]
34 pub database_type: String,
35 pub host: Option<String>,
37 pub port: Option<u16>,
39 pub username: Option<String>,
41 pub password: Option<String>,
43 pub ssl: bool,
45 pub connection_timeout: u16,
47 pub response_timeout: u16,
49 pub number_of_retries: usize,
51 pub exponent_base: u64,
53 pub max_delay: u64,
55 pub factor: u64,
57}
58
59impl Default for DatabaseConfig {
60 fn default() -> Self {
62 Self {
63 database_type: "redis".to_string(),
64 host: None,
65 port: None,
66 username: None,
67 password: None,
68 ssl: false,
69 connection_timeout: 20,
70 response_timeout: 20,
71 number_of_retries: 100,
72 exponent_base: 2,
73 max_delay: 1000,
74 factor: 2,
75 }
76 }
77}
78
79#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(default)]
82pub struct MessageBusConfig {
83 pub database: Option<DatabaseConfig>,
85 pub encoding: SerializationEncoding,
87 pub timestamps_as_iso8601: bool,
90 pub buffer_interval_ms: Option<u32>,
94 pub autotrim_mins: Option<u32>,
98 pub use_trader_prefix: bool,
100 pub use_trader_id: bool,
102 pub use_instance_id: bool,
104 pub streams_prefix: String,
106 pub stream_per_topic: bool,
109 pub external_streams: Option<Vec<String>>,
111 pub types_filter: Option<Vec<String>>,
113 pub heartbeat_interval_secs: Option<u16>,
115}
116
117impl Default for MessageBusConfig {
118 fn default() -> Self {
120 Self {
121 database: None,
122 encoding: SerializationEncoding::MsgPack,
123 timestamps_as_iso8601: false,
124 buffer_interval_ms: None,
125 autotrim_mins: None,
126 use_trader_prefix: true,
127 use_trader_id: true,
128 use_instance_id: false,
129 streams_prefix: "stream".to_string(),
130 stream_per_topic: true,
131 external_streams: None,
132 types_filter: None,
133 heartbeat_interval_secs: None,
134 }
135 }
136}
137
138pub trait MessageBusDatabaseAdapter {
145 type DatabaseType;
146
147 fn new(
151 trader_id: TraderId,
152 instance_id: UUID4,
153 config: MessageBusConfig,
154 ) -> anyhow::Result<Self::DatabaseType>;
155 fn is_closed(&self) -> bool;
156 fn publish(&self, topic: Ustr, payload: Bytes);
157 fn close(&mut self);
158}
159
160#[cfg(test)]
161mod tests {
162 use rstest::*;
163 use serde_json::json;
164
165 use super::*;
166
167 #[rstest]
168 fn test_default_database_config() {
169 let config = DatabaseConfig::default();
170 assert_eq!(config.database_type, "redis");
171 assert_eq!(config.host, None);
172 assert_eq!(config.port, None);
173 assert_eq!(config.username, None);
174 assert_eq!(config.password, None);
175 assert!(!config.ssl);
176 assert_eq!(config.connection_timeout, 20);
177 assert_eq!(config.response_timeout, 20);
178 assert_eq!(config.number_of_retries, 100);
179 assert_eq!(config.exponent_base, 2);
180 assert_eq!(config.max_delay, 1000);
181 assert_eq!(config.factor, 2);
182 }
183
184 #[rstest]
185 fn test_deserialize_database_config() {
186 let config_json = json!({
187 "type": "redis",
188 "host": "localhost",
189 "port": 6379,
190 "username": "user",
191 "password": "pass",
192 "ssl": true,
193 "connection_timeout": 30,
194 "response_timeout": 10,
195 "number_of_retries": 3,
196 "exponent_base": 2,
197 "max_delay": 10,
198 "factor": 2
199 });
200 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
201 assert_eq!(config.database_type, "redis");
202 assert_eq!(config.host, Some("localhost".to_string()));
203 assert_eq!(config.port, Some(6379));
204 assert_eq!(config.username, Some("user".to_string()));
205 assert_eq!(config.password, Some("pass".to_string()));
206 assert!(config.ssl);
207 assert_eq!(config.connection_timeout, 30);
208 assert_eq!(config.response_timeout, 10);
209 assert_eq!(config.number_of_retries, 3);
210 assert_eq!(config.exponent_base, 2);
211 assert_eq!(config.max_delay, 10);
212 assert_eq!(config.factor, 2);
213 }
214
215 #[rstest]
216 fn test_default_message_bus_config() {
217 let config = MessageBusConfig::default();
218 assert_eq!(config.encoding, SerializationEncoding::MsgPack);
219 assert!(!config.timestamps_as_iso8601);
220 assert_eq!(config.buffer_interval_ms, None);
221 assert_eq!(config.autotrim_mins, None);
222 assert!(config.use_trader_prefix);
223 assert!(config.use_trader_id);
224 assert!(!config.use_instance_id);
225 assert_eq!(config.streams_prefix, "stream");
226 assert!(config.stream_per_topic);
227 assert_eq!(config.external_streams, None);
228 assert_eq!(config.types_filter, None);
229 }
230
231 #[rstest]
232 fn test_deserialize_message_bus_config() {
233 let config_json = json!({
234 "database": {
235 "type": "redis",
236 "host": "localhost",
237 "port": 6379,
238 "username": "user",
239 "password": "pass",
240 "ssl": true,
241 "connection_timeout": 30,
242 "response_timeout": 10,
243 "number_of_retries": 3,
244 "exponent_base": 2,
245 "max_delay": 10,
246 "factor": 2
247 },
248 "encoding": "json",
249 "timestamps_as_iso8601": true,
250 "buffer_interval_ms": 100,
251 "autotrim_mins": 60,
252 "use_trader_prefix": false,
253 "use_trader_id": false,
254 "use_instance_id": true,
255 "streams_prefix": "data_streams",
256 "stream_per_topic": false,
257 "external_streams": ["stream1", "stream2"],
258 "types_filter": ["type1", "type2"]
259 });
260 let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();
261 assert_eq!(config.encoding, SerializationEncoding::Json);
262 assert!(config.timestamps_as_iso8601);
263 assert_eq!(config.buffer_interval_ms, Some(100));
264 assert_eq!(config.autotrim_mins, Some(60));
265 assert!(!config.use_trader_prefix);
266 assert!(!config.use_trader_id);
267 assert!(config.use_instance_id);
268 assert_eq!(config.streams_prefix, "data_streams");
269 assert!(!config.stream_per_topic);
270 assert_eq!(
271 config.external_streams,
272 Some(vec!["stream1".to_string(), "stream2".to_string()])
273 );
274 assert_eq!(
275 config.types_filter,
276 Some(vec!["type1".to_string(), "type2".to_string()])
277 );
278 }
279}