databento_node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    path::PathBuf,
19    time::Duration,
20};
21
22use nautilus_common::{
23    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
24    enums::{Environment, LogColor},
25    log_info,
26    timer::TimeEvent,
27};
28use nautilus_core::env::get_env_var;
29use nautilus_databento::factories::{DatabentoDataClientFactory, DatabentoLiveClientConfig};
30use nautilus_live::node::LiveNode;
31use nautilus_model::{
32    data::{QuoteTick, TradeTick},
33    identifiers::{ClientId, InstrumentId, TraderId},
34};
35
36// Run with `cargo run --bin databento-node-test --features high-precision`
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40    dotenvy::dotenv().ok();
41
42    let environment = Environment::Live;
43    let trader_id = TraderId::default();
44    let node_name = "DATABENTO-TESTER-001".to_string();
45
46    // Get Databento API key from environment
47    let api_key = get_env_var("DATABENTO_API_KEY").unwrap_or_else(|_| {
48        println!("⚠️  DATABENTO_API_KEY not found, using placeholder");
49        "db-placeholder-key".to_string()
50    });
51
52    // Determine publishers file path
53    let publishers_filepath = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
54    if !publishers_filepath.exists() {
55        println!(
56            "⚠️  Publishers file not found at: {}",
57            publishers_filepath.display()
58        );
59    }
60
61    // Configure Databento client
62    let databento_config = DatabentoLiveClientConfig::new(
63        api_key,
64        publishers_filepath,
65        true, // use_exchange_as_venue
66        true, // bars_timestamp_on_close
67    );
68
69    let client_factory = DatabentoDataClientFactory::new();
70
71    // Create and register a Databento subscriber actor
72    let client_id = ClientId::new("DATABENTO");
73    let instrument_ids = vec![
74        InstrumentId::from("ESZ5.XCME"),
75        // Add more instruments as needed
76    ];
77
78    // Build the live node with Databento data client
79    let mut node = LiveNode::builder(node_name, trader_id, environment)?
80        .with_load_state(false)
81        .with_save_state(false)
82        .add_data_client(None, Box::new(client_factory), Box::new(databento_config))?
83        .build()?;
84
85    let actor_config = DatabentoSubscriberActorConfig::new(client_id, instrument_ids);
86    let actor = DatabentoSubscriberActor::new(actor_config);
87
88    node.add_actor(actor)?;
89
90    node.run().await?;
91
92    Ok(())
93}
94
95/// Configuration for the Databento subscriber actor.
96#[derive(Debug, Clone)]
97pub struct DatabentoSubscriberActorConfig {
98    /// Base data actor configuration.
99    pub base: DataActorConfig,
100    /// Client ID to use for subscriptions.
101    pub client_id: ClientId,
102    /// Instrument IDs to subscribe to.
103    pub instrument_ids: Vec<InstrumentId>,
104}
105
106impl DatabentoSubscriberActorConfig {
107    /// Creates a new [`DatabentoSubscriberActorConfig`] instance.
108    #[must_use]
109    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
110        Self {
111            base: DataActorConfig::default(),
112            client_id,
113            instrument_ids,
114        }
115    }
116}
117
118/// A basic Databento subscriber actor that subscribes to quotes and trades.
119///
120/// This actor demonstrates how to use the `DataActor` trait to subscribe to market data
121/// from Databento for specified instruments. It logs received quotes and trades to
122/// demonstrate the data flow.
123#[derive(Debug)]
124pub struct DatabentoSubscriberActor {
125    core: DataActorCore,
126    config: DatabentoSubscriberActorConfig,
127    pub received_quotes: Vec<QuoteTick>,
128    pub received_trades: Vec<TradeTick>,
129}
130
131impl Deref for DatabentoSubscriberActor {
132    type Target = DataActorCore;
133
134    fn deref(&self) -> &Self::Target {
135        &self.core
136    }
137}
138
139impl DerefMut for DatabentoSubscriberActor {
140    fn deref_mut(&mut self) -> &mut Self::Target {
141        &mut self.core
142    }
143}
144
145impl DataActor for DatabentoSubscriberActor {
146    fn on_start(&mut self) -> anyhow::Result<()> {
147        let instrument_ids = self.config.instrument_ids.clone();
148        let client_id = self.config.client_id;
149
150        for instrument_id in instrument_ids {
151            self.subscribe_quotes(instrument_id, Some(client_id), None);
152            self.subscribe_trades(instrument_id, Some(client_id), None);
153        }
154
155        self.clock().set_timer(
156            "TEST-TIMER-1-SECOND",
157            Duration::from_secs(1),
158            None,
159            None,
160            None,
161            Some(true),
162            Some(false),
163        )?;
164
165        self.clock().set_timer(
166            "TEST-TIMER-2-SECOND",
167            Duration::from_secs(2),
168            None,
169            None,
170            None,
171            Some(true),
172            Some(false),
173        )?;
174
175        Ok(())
176    }
177
178    fn on_stop(&mut self) -> anyhow::Result<()> {
179        let instrument_ids = self.config.instrument_ids.clone();
180        let client_id = self.config.client_id;
181
182        for instrument_id in instrument_ids {
183            self.unsubscribe_quotes(instrument_id, Some(client_id), None);
184            self.unsubscribe_trades(instrument_id, Some(client_id), None);
185        }
186
187        Ok(())
188    }
189
190    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
191        log_info!("Received {event:?}", color = LogColor::Blue);
192        Ok(())
193    }
194
195    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
196        log_info!("Received {quote:?}", color = LogColor::Cyan);
197        self.received_quotes.push(*quote);
198        Ok(())
199    }
200
201    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
202        log_info!("Received {trade:?}", color = LogColor::Cyan);
203        self.received_trades.push(*trade);
204        Ok(())
205    }
206}
207
208impl DatabentoSubscriberActor {
209    /// Creates a new [`DatabentoSubscriberActor`] instance.
210    #[must_use]
211    pub fn new(config: DatabentoSubscriberActorConfig) -> Self {
212        Self {
213            core: DataActorCore::new(config.base.clone()),
214            config,
215            received_quotes: Vec::new(),
216            received_trades: Vec::new(),
217        }
218    }
219
220    /// Returns the number of quotes received by this actor.
221    #[must_use]
222    pub const fn quote_count(&self) -> usize {
223        self.received_quotes.len()
224    }
225
226    /// Returns the number of trades received by this actor.
227    #[must_use]
228    pub const fn trade_count(&self) -> usize {
229        self.received_trades.len()
230    }
231}