// databento_node_test/node_test.rs
use std::{
17 ops::{Deref, DerefMut},
18 path::PathBuf,
19 time::Duration,
20};
21
22use nautilus_common::{
23 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
24 enums::{Environment, LogColor},
25 log_info,
26 timer::TimeEvent,
27};
28use nautilus_core::env::get_env_var;
29use nautilus_databento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
30use nautilus_live::node::LiveNode;
31use nautilus_model::{
32 data::{QuoteTick, TradeTick},
33 identifiers::{ClientId, InstrumentId, TraderId},
34};
35
36#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40 dotenvy::dotenv().ok();
41
42 let environment = Environment::Live;
43 let trader_id = TraderId::default();
44 let node_name = "DATABENTO-TESTER-001".to_string();
45
46 let api_key = get_env_var("DATABENTO_API_KEY").unwrap_or_else(|_| {
48 println!("⚠️ DATABENTO_API_KEY not found, using placeholder");
49 "db-placeholder-key".to_string()
50 });
51
52 let publishers_filepath = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
54 if !publishers_filepath.exists() {
55 println!(
56 "⚠️ Publishers file not found at: {}",
57 publishers_filepath.display()
58 );
59 }
60
61 let databento_config = DatabentoLiveClientConfig::new(
63 api_key,
64 publishers_filepath,
65 true, true, );
68
69 let client_factory = DatabentoDataClientFactory::new();
70
71 let client_id = ClientId::new("DATABENTO");
73 let instrument_ids = vec![
74 InstrumentId::from("ESZ5.XCME"),
75 ];
77
78 let mut node = LiveNode::builder(node_name, trader_id, environment)?
80 .with_load_state(false)
81 .with_save_state(false)
82 .add_data_client(None, Box::new(client_factory), Box::new(databento_config))?
83 .build()?;
84
85 let actor_config = DatabentoSubscriberActorConfig::new(client_id, instrument_ids);
86 let actor = DatabentoSubscriberActor::new(actor_config);
87
88 node.add_actor(actor)?;
89
90 node.run().await?;
91
92 Ok(())
93}
94
95#[derive(Debug, Clone)]
97pub struct DatabentoSubscriberActorConfig {
98 pub base: DataActorConfig,
100 pub client_id: ClientId,
102 pub instrument_ids: Vec<InstrumentId>,
104}
105
106impl DatabentoSubscriberActorConfig {
107 #[must_use]
109 pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
110 Self {
111 base: DataActorConfig::default(),
112 client_id,
113 instrument_ids,
114 }
115 }
116}
117
118#[derive(Debug)]
124pub struct DatabentoSubscriberActor {
125 core: DataActorCore,
126 config: DatabentoSubscriberActorConfig,
127 pub received_quotes: Vec<QuoteTick>,
128 pub received_trades: Vec<TradeTick>,
129}
130
131impl Deref for DatabentoSubscriberActor {
132 type Target = DataActorCore;
133
134 fn deref(&self) -> &Self::Target {
135 &self.core
136 }
137}
138
139impl DerefMut for DatabentoSubscriberActor {
140 fn deref_mut(&mut self) -> &mut Self::Target {
141 &mut self.core
142 }
143}
144
145impl DataActor for DatabentoSubscriberActor {
146 fn on_start(&mut self) -> anyhow::Result<()> {
147 let instrument_ids = self.config.instrument_ids.clone();
148 let client_id = self.config.client_id;
149
150 for instrument_id in instrument_ids {
151 self.subscribe_quotes(instrument_id, Some(client_id), None);
152 self.subscribe_trades(instrument_id, Some(client_id), None);
153 }
154
155 self.clock().set_timer(
156 "TEST-TIMER-1-SECOND",
157 Duration::from_secs(1),
158 None,
159 None,
160 None,
161 Some(true),
162 Some(false),
163 )?;
164
165 self.clock().set_timer(
166 "TEST-TIMER-2-SECOND",
167 Duration::from_secs(2),
168 None,
169 None,
170 None,
171 Some(true),
172 Some(false),
173 )?;
174
175 Ok(())
176 }
177
178 fn on_stop(&mut self) -> anyhow::Result<()> {
179 let instrument_ids = self.config.instrument_ids.clone();
180 let client_id = self.config.client_id;
181
182 for instrument_id in instrument_ids {
183 self.unsubscribe_quotes(instrument_id, Some(client_id), None);
184 self.unsubscribe_trades(instrument_id, Some(client_id), None);
185 }
186
187 Ok(())
188 }
189
190 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
191 log_info!("Received {event:?}", color = LogColor::Blue);
192 Ok(())
193 }
194
195 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
196 log_info!("Received {quote:?}", color = LogColor::Cyan);
197 self.received_quotes.push(*quote);
198 Ok(())
199 }
200
201 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
202 log_info!("Received {trade:?}", color = LogColor::Cyan);
203 self.received_trades.push(*trade);
204 Ok(())
205 }
206}
207
208impl DatabentoSubscriberActor {
209 #[must_use]
211 pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
212 Self {
213 core: DataActorCore::new(config.base.clone()),
214 config,
215 received_quotes: Vec::new(),
216 received_trades: Vec::new(),
217 }
218 }
219
220 #[must_use]
222 pub const fn quote_count(&self) -> usize {
223 self.received_quotes.len()
224 }
225
226 #[must_use]
228 pub const fn trade_count(&self) -> usize {
229 self.received_trades.len()
230 }
231}