databento_node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    path::PathBuf,
19    time::Duration,
20};
21
22use nautilus_common::{
23    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
24    enums::{Environment, LogColor},
25    log_info,
26    timer::TimeEvent,
27};
28use nautilus_core::env::get_env_var;
29use nautilus_databento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
30use nautilus_live::node::LiveNode;
31use nautilus_model::{
32    data::{QuoteTick, TradeTick},
33    identifiers::{ClientId, InstrumentId, TraderId},
34};
35
36// Run with `cargo run --bin databento-node-test --features high-precision`
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40    dotenvy::dotenv().ok();
41
42    let environment = Environment::Live;
43    let trader_id = TraderId::default();
44    let node_name = "DATABENTO-TESTER-001".to_string();
45
46    // Get Databento API key from environment
47    let api_key = get_env_var("DATABENTO_API_KEY").unwrap_or_else(|_| {
48        println!("WARNING: DATABENTO_API_KEY not found, using placeholder");
49        "db-placeholder-key".to_string()
50    });
51
52    // Determine publishers file path
53    let publishers_filepath = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
54    if !publishers_filepath.exists() {
55        println!(
56            "WARNING: Publishers file not found at: {}",
57            publishers_filepath.display()
58        );
59    }
60
61    // Configure Databento client
62    let databento_config = DatabentoLiveClientConfig::new(
63        api_key,
64        publishers_filepath,
65        true, // use_exchange_as_venue
66        true, // bars_timestamp_on_close
67    );
68
69    let client_factory = DatabentoDataClientFactory::new();
70
71    // Create and register a Databento subscriber actor
72    let client_id = ClientId::new("DATABENTO");
73    let instrument_ids = vec![
74        InstrumentId::from("ESZ5.XCME"),
75        // Add more instruments as needed
76    ];
77
78    // Build the live node with Databento data client
79    let mut node = LiveNode::builder(trader_id, environment)?
80        .with_name(node_name)
81        .with_load_state(false)
82        .with_save_state(false)
83        .add_data_client(None, Box::new(client_factory), Box::new(databento_config))?
84        .build()?;
85
86    let actor_config = DatabentoSubscriberActorConfig::new(client_id, instrument_ids);
87    let actor = DatabentoSubscriberActor::new(actor_config);
88
89    node.add_actor(actor)?;
90
91    node.run().await?;
92
93    Ok(())
94}
95
96/// Configuration for the Databento subscriber actor.
97#[derive(Debug, Clone)]
98pub struct DatabentoSubscriberActorConfig {
99    /// Base data actor configuration.
100    pub base: DataActorConfig,
101    /// Client ID to use for subscriptions.
102    pub client_id: ClientId,
103    /// Instrument IDs to subscribe to.
104    pub instrument_ids: Vec<InstrumentId>,
105}
106
107impl DatabentoSubscriberActorConfig {
108    /// Creates a new [`DatabentoSubscriberActorConfig`] instance.
109    #[must_use]
110    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
111        Self {
112            base: DataActorConfig::default(),
113            client_id,
114            instrument_ids,
115        }
116    }
117}
118
119/// A basic Databento subscriber actor that subscribes to quotes and trades.
120///
121/// This actor demonstrates how to use the `DataActor` trait to subscribe to market data
122/// from Databento for specified instruments. It logs received quotes and trades to
123/// demonstrate the data flow.
124#[derive(Debug)]
125pub struct DatabentoSubscriberActor {
126    core: DataActorCore,
127    config: DatabentoSubscriberActorConfig,
128    pub received_quotes: Vec<QuoteTick>,
129    pub received_trades: Vec<TradeTick>,
130}
131
132impl Deref for DatabentoSubscriberActor {
133    type Target = DataActorCore;
134
135    fn deref(&self) -> &Self::Target {
136        &self.core
137    }
138}
139
140impl DerefMut for DatabentoSubscriberActor {
141    fn deref_mut(&mut self) -> &mut Self::Target {
142        &mut self.core
143    }
144}
145
146impl DataActor for DatabentoSubscriberActor {
147    fn on_start(&mut self) -> anyhow::Result<()> {
148        let instrument_ids = self.config.instrument_ids.clone();
149        let client_id = self.config.client_id;
150
151        for instrument_id in instrument_ids {
152            self.subscribe_quotes(instrument_id, Some(client_id), None);
153            self.subscribe_trades(instrument_id, Some(client_id), None);
154        }
155
156        self.clock().set_timer(
157            "TEST-TIMER-1-SECOND",
158            Duration::from_secs(1),
159            None,
160            None,
161            None,
162            Some(true),
163            Some(false),
164        )?;
165
166        self.clock().set_timer(
167            "TEST-TIMER-2-SECOND",
168            Duration::from_secs(2),
169            None,
170            None,
171            None,
172            Some(true),
173            Some(false),
174        )?;
175
176        Ok(())
177    }
178
179    fn on_stop(&mut self) -> anyhow::Result<()> {
180        let instrument_ids = self.config.instrument_ids.clone();
181        let client_id = self.config.client_id;
182
183        for instrument_id in instrument_ids {
184            self.unsubscribe_quotes(instrument_id, Some(client_id), None);
185            self.unsubscribe_trades(instrument_id, Some(client_id), None);
186        }
187
188        Ok(())
189    }
190
191    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
192        log_info!("Received {event:?}", color = LogColor::Blue);
193        Ok(())
194    }
195
196    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
197        log_info!("Received {quote:?}", color = LogColor::Cyan);
198        self.received_quotes.push(*quote);
199        Ok(())
200    }
201
202    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
203        log_info!("Received {trade:?}", color = LogColor::Cyan);
204        self.received_trades.push(*trade);
205        Ok(())
206    }
207}
208
209impl DatabentoSubscriberActor {
210    /// Creates a new [`DatabentoSubscriberActor`] instance.
211    #[must_use]
212    pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
213        Self {
214            core: DataActorCore::new(config.base.clone()),
215            config,
216            received_quotes: Vec::new(),
217            received_trades: Vec::new(),
218        }
219    }
220
221    /// Returns the number of quotes received by this actor.
222    #[must_use]
223    pub const fn quote_count(&self) -> usize {
224        self.received_quotes.len()
225    }
226
227    /// Returns the number of trades received by this actor.
228    #[must_use]
229    pub const fn trade_count(&self) -> usize {
230        self.received_trades.len()
231    }
232}