1use std::{any::Any, cell::RefCell, fmt::Debug, path::PathBuf, rc::Rc};
19
20use nautilus_common::{cache::Cache, clock::Clock};
21use nautilus_core::time::{AtomicTime, get_atomic_clock_realtime};
22use nautilus_data::client::DataClient;
23use nautilus_model::identifiers::ClientId;
24use nautilus_system::factories::{ClientConfig, DataClientFactory};
25
26use crate::{
27 common::Credential,
28 data::{DatabentoDataClient, DatabentoDataClientConfig},
29 historical::DatabentoHistoricalClient,
30};
31
32#[derive(Clone)]
34pub struct DatabentoLiveClientConfig {
35 credential: Credential,
37 pub publishers_filepath: PathBuf,
39 pub use_exchange_as_venue: bool,
41 pub bars_timestamp_on_close: bool,
43}
44
45impl Debug for DatabentoLiveClientConfig {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("DatabentoLiveClientConfig")
48 .field("credential", &"<redacted>")
49 .field("publishers_filepath", &self.publishers_filepath)
50 .field("use_exchange_as_venue", &self.use_exchange_as_venue)
51 .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
52 .finish()
53 }
54}
55
56impl DatabentoLiveClientConfig {
57 #[must_use]
59 pub fn new(
60 api_key: impl Into<String>,
61 publishers_filepath: PathBuf,
62 use_exchange_as_venue: bool,
63 bars_timestamp_on_close: bool,
64 ) -> Self {
65 Self {
66 credential: Credential::new(api_key),
67 publishers_filepath,
68 use_exchange_as_venue,
69 bars_timestamp_on_close,
70 }
71 }
72
73 #[must_use]
75 pub fn api_key(&self) -> &str {
76 self.credential.api_key()
77 }
78
79 #[must_use]
81 pub fn api_key_masked(&self) -> String {
82 self.credential.api_key_masked()
83 }
84}
85
86impl ClientConfig for DatabentoLiveClientConfig {
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90}
91
92#[derive(Debug)]
94pub struct DatabentoDataClientFactory;
95
96impl DatabentoDataClientFactory {
97 #[must_use]
99 pub const fn new() -> Self {
100 Self
101 }
102
103 pub fn create_live_data_client(
109 client_id: ClientId,
110 api_key: impl Into<String>,
111 publishers_filepath: PathBuf,
112 use_exchange_as_venue: bool,
113 bars_timestamp_on_close: bool,
114 clock: &'static AtomicTime,
115 ) -> anyhow::Result<DatabentoDataClient> {
116 let config = DatabentoDataClientConfig::new(
117 api_key,
118 publishers_filepath,
119 use_exchange_as_venue,
120 bars_timestamp_on_close,
121 );
122
123 DatabentoDataClient::new(client_id, config, clock)
124 }
125
126 pub fn create_live_data_client_with_config(
132 client_id: ClientId,
133 config: DatabentoDataClientConfig,
134 clock: &'static AtomicTime,
135 ) -> anyhow::Result<DatabentoDataClient> {
136 DatabentoDataClient::new(client_id, config, clock)
137 }
138}
139
140impl Default for DatabentoDataClientFactory {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146impl DataClientFactory for DatabentoDataClientFactory {
147 fn create(
148 &self,
149 name: &str,
150 config: &dyn ClientConfig,
151 _cache: Rc<RefCell<Cache>>,
152 _clock: Rc<RefCell<dyn Clock>>,
153 ) -> anyhow::Result<Box<dyn DataClient>> {
154 let databento_config = config
155 .as_any()
156 .downcast_ref::<DatabentoLiveClientConfig>()
157 .ok_or_else(|| {
158 anyhow::anyhow!(
159 "Invalid config type for DatabentoDataClientFactory. Expected DatabentoLiveClientConfig, was {:?}",
160 config
161 )
162 })?;
163
164 let client_id = ClientId::from(name);
165 let config = DatabentoDataClientConfig::new(
166 databento_config.api_key(),
167 databento_config.publishers_filepath.clone(),
168 databento_config.use_exchange_as_venue,
169 databento_config.bars_timestamp_on_close,
170 );
171
172 let client = DatabentoDataClient::new(client_id, config, get_atomic_clock_realtime())?;
173 Ok(Box::new(client))
174 }
175
176 fn name(&self) -> &'static str {
177 "DATABENTO"
178 }
179
180 fn config_type(&self) -> &'static str {
181 "DatabentoLiveClientConfig"
182 }
183}
184
185#[derive(Debug)]
187pub struct DatabentoHistoricalClientFactory;
188
189impl DatabentoHistoricalClientFactory {
190 pub fn create(
196 api_key: String,
197 publishers_filepath: PathBuf,
198 use_exchange_as_venue: bool,
199 clock: &'static AtomicTime,
200 ) -> anyhow::Result<DatabentoHistoricalClient> {
201 DatabentoHistoricalClient::new(api_key, publishers_filepath, clock, use_exchange_as_venue)
202 }
203}
204
205#[derive(Debug, Default)]
207pub struct DatabentoDataClientConfigBuilder {
208 api_key: Option<String>,
209 dataset: Option<String>,
210 publishers_filepath: Option<PathBuf>,
211 use_exchange_as_venue: bool,
212 bars_timestamp_on_close: bool,
213}
214
215impl DatabentoDataClientConfigBuilder {
216 #[must_use]
218 pub fn new() -> Self {
219 Self::default()
220 }
221
222 #[must_use]
224 pub fn api_key(mut self, api_key: String) -> Self {
225 self.api_key = Some(api_key);
226 self
227 }
228
229 #[must_use]
231 pub fn dataset(mut self, dataset: String) -> Self {
232 self.dataset = Some(dataset);
233 self
234 }
235
236 #[must_use]
238 pub fn publishers_filepath(mut self, filepath: PathBuf) -> Self {
239 self.publishers_filepath = Some(filepath);
240 self
241 }
242
243 #[must_use]
245 pub const fn use_exchange_as_venue(mut self, use_exchange: bool) -> Self {
246 self.use_exchange_as_venue = use_exchange;
247 self
248 }
249
250 #[must_use]
252 pub const fn bars_timestamp_on_close(mut self, timestamp_on_close: bool) -> Self {
253 self.bars_timestamp_on_close = timestamp_on_close;
254 self
255 }
256
257 pub fn build(self) -> anyhow::Result<DatabentoDataClientConfig> {
263 let api_key = self
264 .api_key
265 .ok_or_else(|| anyhow::anyhow!("API key is required"))?;
266 let publishers_filepath = self
267 .publishers_filepath
268 .ok_or_else(|| anyhow::anyhow!("Publishers filepath is required"))?;
269
270 Ok(DatabentoDataClientConfig::new(
271 api_key,
272 publishers_filepath,
273 self.use_exchange_as_venue,
274 self.bars_timestamp_on_close,
275 ))
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use std::env;
282
283 use nautilus_core::time::get_atomic_clock_realtime;
284 use rstest::rstest;
285
286 use super::*;
287
288 #[rstest]
289 fn test_config_builder() {
290 let config = DatabentoDataClientConfigBuilder::new()
291 .api_key("test_key".to_string())
292 .dataset("GLBX.MDP3".to_string())
293 .publishers_filepath(PathBuf::from("test_publishers.json"))
294 .use_exchange_as_venue(true)
295 .bars_timestamp_on_close(false)
296 .build();
297
298 assert!(config.is_ok());
299 let config = config.unwrap();
300 assert_eq!(config.api_key(), "test_key");
301 assert!(config.use_exchange_as_venue);
302 assert!(!config.bars_timestamp_on_close);
303 }
304
305 #[rstest]
306 fn test_config_builder_missing_required_fields() {
307 let config = DatabentoDataClientConfigBuilder::new()
308 .api_key("test_key".to_string())
309 .build();
311
312 assert!(config.is_err());
313 }
314
315 #[rstest]
316 fn test_historical_client_factory() {
317 let api_key = env::var("DATABENTO_API_KEY").unwrap_or_else(|_| "test_key".to_string());
318 let publishers_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
319 let clock = get_atomic_clock_realtime();
320
321 let result =
323 DatabentoHistoricalClientFactory::create(api_key, publishers_path, false, clock);
324
325 assert!(result.is_err() || result.is_ok());
328 }
329
330 #[rstest]
331 fn test_live_data_client_factory() {
332 let client_id = ClientId::from("DATABENTO-001");
333 let api_key = "test_key".to_string();
334 let publishers_path = PathBuf::from("test_publishers.json");
335 let clock = get_atomic_clock_realtime();
336
337 let result = DatabentoDataClientFactory::create_live_data_client(
339 client_id,
340 api_key,
341 publishers_path,
342 false,
343 true,
344 clock,
345 );
346
347 assert!(result.is_err() || result.is_ok());
350 }
351}