databento_node_test/
node_test.rs1use std::{
17 ops::{Deref, DerefMut},
18 path::PathBuf,
19 time::Duration,
20};
21
22use nautilus_common::{
23 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
24 enums::{Environment, LogColor},
25 log_info,
26 timer::TimeEvent,
27};
28use nautilus_core::env::get_env_var;
29use nautilus_databento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
30use nautilus_live::node::LiveNode;
31use nautilus_model::{
32 data::{QuoteTick, TradeTick},
33 identifiers::{ClientId, InstrumentId, TraderId},
34};
35
36#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40 dotenvy::dotenv().ok();
41
42 let environment = Environment::Live;
43 let trader_id = TraderId::default();
44 let node_name = "DATABENTO-TESTER-001".to_string();
45
46 let api_key = get_env_var("DATABENTO_API_KEY").unwrap_or_else(|_| {
48 println!("WARNING: DATABENTO_API_KEY not found, using placeholder");
49 "db-placeholder-key".to_string()
50 });
51
52 let publishers_filepath = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
54 if !publishers_filepath.exists() {
55 println!(
56 "WARNING: Publishers file not found at: {}",
57 publishers_filepath.display()
58 );
59 }
60
61 let databento_config = DatabentoLiveClientConfig::new(
63 api_key,
64 publishers_filepath,
65 true, true, );
68
69 let client_factory = DatabentoDataClientFactory::new();
70
71 let client_id = ClientId::new("DATABENTO");
73 let instrument_ids = vec![
74 InstrumentId::from("ESZ5.XCME"),
75 ];
77
78 let mut node = LiveNode::builder(trader_id, environment)?
80 .with_name(node_name)
81 .with_load_state(false)
82 .with_save_state(false)
83 .add_data_client(None, Box::new(client_factory), Box::new(databento_config))?
84 .build()?;
85
86 let actor_config = DatabentoSubscriberActorConfig::new(client_id, instrument_ids);
87 let actor = DatabentoSubscriberActor::new(actor_config);
88
89 node.add_actor(actor)?;
90
91 node.run().await?;
92
93 Ok(())
94}
95
96#[derive(Debug, Clone)]
98pub struct DatabentoSubscriberActorConfig {
99 pub base: DataActorConfig,
101 pub client_id: ClientId,
103 pub instrument_ids: Vec<InstrumentId>,
105}
106
107impl DatabentoSubscriberActorConfig {
108 #[must_use]
110 pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
111 Self {
112 base: DataActorConfig::default(),
113 client_id,
114 instrument_ids,
115 }
116 }
117}
118
119#[derive(Debug)]
125pub struct DatabentoSubscriberActor {
126 core: DataActorCore,
127 config: DatabentoSubscriberActorConfig,
128 pub received_quotes: Vec<QuoteTick>,
129 pub received_trades: Vec<TradeTick>,
130}
131
132impl Deref for DatabentoSubscriberActor {
133 type Target = DataActorCore;
134
135 fn deref(&self) -> &Self::Target {
136 &self.core
137 }
138}
139
140impl DerefMut for DatabentoSubscriberActor {
141 fn deref_mut(&mut self) -> &mut Self::Target {
142 &mut self.core
143 }
144}
145
146impl DataActor for DatabentoSubscriberActor {
147 fn on_start(&mut self) -> anyhow::Result<()> {
148 let instrument_ids = self.config.instrument_ids.clone();
149 let client_id = self.config.client_id;
150
151 for instrument_id in instrument_ids {
152 self.subscribe_quotes(instrument_id, Some(client_id), None);
153 self.subscribe_trades(instrument_id, Some(client_id), None);
154 }
155
156 self.clock().set_timer(
157 "TEST-TIMER-1-SECOND",
158 Duration::from_secs(1),
159 None,
160 None,
161 None,
162 Some(true),
163 Some(false),
164 )?;
165
166 self.clock().set_timer(
167 "TEST-TIMER-2-SECOND",
168 Duration::from_secs(2),
169 None,
170 None,
171 None,
172 Some(true),
173 Some(false),
174 )?;
175
176 Ok(())
177 }
178
179 fn on_stop(&mut self) -> anyhow::Result<()> {
180 let instrument_ids = self.config.instrument_ids.clone();
181 let client_id = self.config.client_id;
182
183 for instrument_id in instrument_ids {
184 self.unsubscribe_quotes(instrument_id, Some(client_id), None);
185 self.unsubscribe_trades(instrument_id, Some(client_id), None);
186 }
187
188 Ok(())
189 }
190
191 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
192 log_info!("Received {event:?}", color = LogColor::Blue);
193 Ok(())
194 }
195
196 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
197 log_info!("Received {quote:?}", color = LogColor::Cyan);
198 self.received_quotes.push(*quote);
199 Ok(())
200 }
201
202 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
203 log_info!("Received {trade:?}", color = LogColor::Cyan);
204 self.received_trades.push(*trade);
205 Ok(())
206 }
207}
208
209impl DatabentoSubscriberActor {
210 #[must_use]
212 pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
213 Self {
214 core: DataActorCore::new(config.base.clone()),
215 config,
216 received_quotes: Vec::new(),
217 received_trades: Vec::new(),
218 }
219 }
220
221 #[must_use]
223 pub const fn quote_count(&self) -> usize {
224 self.received_quotes.len()
225 }
226
227 #[must_use]
229 pub const fn trade_count(&self) -> usize {
230 self.received_trades.len()
231 }
232}