1use std::{any::Any, cell::RefCell, fmt::Debug, path::PathBuf, rc::Rc};
19
20use nautilus_common::{cache::Cache, clients::DataClient, clock::Clock};
21use nautilus_core::time::{AtomicTime, get_atomic_clock_realtime};
22use nautilus_model::identifiers::ClientId;
23use nautilus_system::factories::{ClientConfig, DataClientFactory};
24
25use crate::{
26 common::Credential,
27 data::{DatabentoDataClient, DatabentoDataClientConfig},
28 historical::DatabentoHistoricalClient,
29};
30
31#[derive(Clone)]
33pub struct DatabentoLiveClientConfig {
34 credential: Credential,
36 pub publishers_filepath: PathBuf,
38 pub use_exchange_as_venue: bool,
40 pub bars_timestamp_on_close: bool,
42}
43
44impl Debug for DatabentoLiveClientConfig {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct(stringify!(DatabentoLiveClientConfig))
47 .field("credential", &"<redacted>")
48 .field("publishers_filepath", &self.publishers_filepath)
49 .field("use_exchange_as_venue", &self.use_exchange_as_venue)
50 .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
51 .finish()
52 }
53}
54
55impl DatabentoLiveClientConfig {
56 #[must_use]
58 pub fn new(
59 api_key: impl Into<String>,
60 publishers_filepath: PathBuf,
61 use_exchange_as_venue: bool,
62 bars_timestamp_on_close: bool,
63 ) -> Self {
64 Self {
65 credential: Credential::new(api_key),
66 publishers_filepath,
67 use_exchange_as_venue,
68 bars_timestamp_on_close,
69 }
70 }
71
72 #[must_use]
74 pub fn api_key(&self) -> &str {
75 self.credential.api_key()
76 }
77
78 #[must_use]
80 pub fn api_key_masked(&self) -> String {
81 self.credential.api_key_masked()
82 }
83}
84
85impl ClientConfig for DatabentoLiveClientConfig {
86 fn as_any(&self) -> &dyn Any {
87 self
88 }
89}
90
91#[derive(Debug)]
93pub struct DatabentoDataClientFactory;
94
95impl DatabentoDataClientFactory {
96 #[must_use]
98 pub const fn new() -> Self {
99 Self
100 }
101
102 pub fn create_live_data_client(
108 client_id: ClientId,
109 api_key: impl Into<String>,
110 publishers_filepath: PathBuf,
111 use_exchange_as_venue: bool,
112 bars_timestamp_on_close: bool,
113 clock: &'static AtomicTime,
114 ) -> anyhow::Result<DatabentoDataClient> {
115 let config = DatabentoDataClientConfig::new(
116 api_key,
117 publishers_filepath,
118 use_exchange_as_venue,
119 bars_timestamp_on_close,
120 );
121
122 DatabentoDataClient::new(client_id, config, clock)
123 }
124
125 pub fn create_live_data_client_with_config(
131 client_id: ClientId,
132 config: DatabentoDataClientConfig,
133 clock: &'static AtomicTime,
134 ) -> anyhow::Result<DatabentoDataClient> {
135 DatabentoDataClient::new(client_id, config, clock)
136 }
137}
138
139impl Default for DatabentoDataClientFactory {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145impl DataClientFactory for DatabentoDataClientFactory {
146 fn create(
147 &self,
148 name: &str,
149 config: &dyn ClientConfig,
150 _cache: Rc<RefCell<Cache>>,
151 _clock: Rc<RefCell<dyn Clock>>,
152 ) -> anyhow::Result<Box<dyn DataClient>> {
153 let databento_config = config
154 .as_any()
155 .downcast_ref::<DatabentoLiveClientConfig>()
156 .ok_or_else(|| {
157 anyhow::anyhow!(
158 "Invalid config type for DatabentoDataClientFactory. Expected DatabentoLiveClientConfig, was {config:?}"
159 )
160 })?;
161
162 let client_id = ClientId::from(name);
163 let config = DatabentoDataClientConfig::new(
164 databento_config.api_key(),
165 databento_config.publishers_filepath.clone(),
166 databento_config.use_exchange_as_venue,
167 databento_config.bars_timestamp_on_close,
168 );
169
170 let client = DatabentoDataClient::new(client_id, config, get_atomic_clock_realtime())?;
171 Ok(Box::new(client))
172 }
173
174 fn name(&self) -> &'static str {
175 "DATABENTO"
176 }
177
178 fn config_type(&self) -> &'static str {
179 "DatabentoLiveClientConfig"
180 }
181}
182
183#[derive(Debug)]
185pub struct DatabentoHistoricalClientFactory;
186
187impl DatabentoHistoricalClientFactory {
188 pub fn create(
194 api_key: String,
195 publishers_filepath: PathBuf,
196 use_exchange_as_venue: bool,
197 clock: &'static AtomicTime,
198 ) -> anyhow::Result<DatabentoHistoricalClient> {
199 DatabentoHistoricalClient::new(api_key, publishers_filepath, clock, use_exchange_as_venue)
200 }
201}
202
203#[derive(Debug, Default)]
205pub struct DatabentoDataClientConfigBuilder {
206 api_key: Option<String>,
207 dataset: Option<String>,
208 publishers_filepath: Option<PathBuf>,
209 use_exchange_as_venue: bool,
210 bars_timestamp_on_close: bool,
211}
212
213impl DatabentoDataClientConfigBuilder {
214 #[must_use]
216 pub fn new() -> Self {
217 Self::default()
218 }
219
220 #[must_use]
222 pub fn api_key(mut self, api_key: String) -> Self {
223 self.api_key = Some(api_key);
224 self
225 }
226
227 #[must_use]
229 pub fn dataset(mut self, dataset: String) -> Self {
230 self.dataset = Some(dataset);
231 self
232 }
233
234 #[must_use]
236 pub fn publishers_filepath(mut self, filepath: PathBuf) -> Self {
237 self.publishers_filepath = Some(filepath);
238 self
239 }
240
241 #[must_use]
243 pub const fn use_exchange_as_venue(mut self, use_exchange: bool) -> Self {
244 self.use_exchange_as_venue = use_exchange;
245 self
246 }
247
248 #[must_use]
250 pub const fn bars_timestamp_on_close(mut self, timestamp_on_close: bool) -> Self {
251 self.bars_timestamp_on_close = timestamp_on_close;
252 self
253 }
254
255 pub fn build(self) -> anyhow::Result<DatabentoDataClientConfig> {
261 let api_key = self
262 .api_key
263 .ok_or_else(|| anyhow::anyhow!("API key is required"))?;
264 let publishers_filepath = self
265 .publishers_filepath
266 .ok_or_else(|| anyhow::anyhow!("Publishers filepath is required"))?;
267
268 Ok(DatabentoDataClientConfig::new(
269 api_key,
270 publishers_filepath,
271 self.use_exchange_as_venue,
272 self.bars_timestamp_on_close,
273 ))
274 }
275}
276
#[cfg(test)]
mod tests {
    use std::env;

    use nautilus_core::time::get_atomic_clock_realtime;
    use rstest::rstest;

    use super::*;

    /// A fully populated builder yields a config carrying the supplied values.
    #[rstest]
    fn test_config_builder() {
        let config = DatabentoDataClientConfigBuilder::new()
            .api_key("test_key".to_string())
            .dataset("GLBX.MDP3".to_string())
            .publishers_filepath(PathBuf::from("test_publishers.json"))
            .use_exchange_as_venue(true)
            .bars_timestamp_on_close(false)
            .build()
            .expect("builder with all required fields set should succeed");

        assert_eq!(config.api_key(), "test_key");
        assert!(config.use_exchange_as_venue);
        assert!(!config.bars_timestamp_on_close);
    }

    /// Omitting the publishers file path fails with the matching message.
    #[rstest]
    fn test_config_builder_missing_required_fields() {
        let result = DatabentoDataClientConfigBuilder::new()
            .api_key("test_key".to_string())
            .build();

        match result {
            Ok(_) => panic!("expected build to fail without a publishers filepath"),
            Err(e) => assert_eq!(e.to_string(), "Publishers filepath is required"),
        }
    }

    /// Omitting the API key fails with the matching message.
    #[rstest]
    fn test_config_builder_missing_api_key() {
        let result = DatabentoDataClientConfigBuilder::new()
            .publishers_filepath(PathBuf::from("test_publishers.json"))
            .build();

        match result {
            Ok(_) => panic!("expected build to fail without an API key"),
            Err(e) => assert_eq!(e.to_string(), "API key is required"),
        }
    }

    /// Smoke test: constructing a historical client must not panic.
    #[rstest]
    fn test_historical_client_factory() {
        let api_key = env::var("DATABENTO_API_KEY").unwrap_or_else(|_| "test_key".to_string());
        let publishers_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        let clock = get_atomic_clock_realtime();

        // The outcome is environment-dependent (presumably on the publishers
        // file being present), so neither `Ok` nor `Err` is asserted here; the
        // call only has to return without panicking.
        let _ = DatabentoHistoricalClientFactory::create(api_key, publishers_path, false, clock);
    }

    /// Smoke test: constructing a live data client must not panic.
    #[rstest]
    fn test_live_data_client_factory() {
        let client_id = ClientId::from("DATABENTO-001");
        let api_key = "test_key".to_string();
        let publishers_path = PathBuf::from("test_publishers.json");
        let clock = get_atomic_clock_realtime();

        // As above: the result is environment-dependent, so only the absence
        // of a panic is checked.
        let _ = DatabentoDataClientFactory::create_live_data_client(
            client_id,
            api_key,
            publishers_path,
            false,
            true,
            clock,
        );
    }
}