node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    sync::Arc,
19    time::Duration,
20};
21
22use nautilus_blockchain::{
23    config::{BlockchainDataClientConfig, DexPoolFilters},
24    factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28    enums::{Environment, LogColor},
29    logging::log_info,
30    runtime::get_runtime,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_live::node::LiveNode;
35use nautilus_model::{
36    defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
37    identifiers::{ClientId, InstrumentId, TraderId},
38};
39// Requires capnp installed on the machine
40// Run with `cargo run -p nautilus-blockchain --bin node_test --features hypersync`
41// To see additional tracing logs `export RUST_LOG=debug,h2=off`
42
43// ================================================================================================
44// IMPORTANT: The actor definitions below are EXAMPLE CODE for demonstration purposes.
45// They should NOT be moved to the main library as they are specific to this test scenario.
46// If you need production-ready actors, create them in a separate production module.
47// ================================================================================================
48
49fn main() -> Result<(), Box<dyn std::error::Error>> {
50    dotenvy::dotenv().ok();
51
52    let environment = Environment::Live;
53    let trader_id = TraderId::default();
54    let node_name = "TESTER-001".to_string();
55
56    let chain = chains::ARBITRUM.clone();
57    let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
58    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
59    // let from_block = Some(22_735_000_u64); // Ethereum
60    // let from_block = Some(348_860_000_u64); // Arbitrum
61    let from_block = None; // No sync
62
63    let dex_pool_filter = DexPoolFilters::new(Some(true));
64
65    let client_factory = BlockchainDataClientFactory::new();
66    let client_config = BlockchainDataClientConfig::new(
67        Arc::new(chain.clone()),
68        vec![DexType::UniswapV3],
69        http_rpc_url,
70        None, // RPC requests per second
71        Some(wss_rpc_url),
72        true, // Use HyperSync for live data
73        // Some(from_block), // from_block
74        from_block,
75        Some(dex_pool_filter),
76        Some(PostgresConnectOptions::default()),
77    );
78
79    let mut node = LiveNode::builder(node_name, trader_id, environment)?
80        .with_load_state(false)
81        .with_save_state(false)
82        .add_data_client(
83            None, // Use factory name
84            Box::new(client_factory),
85            Box::new(client_config),
86        )?
87        .build()?;
88
89    // Create and register a blockchain subscriber actor
90    let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
91
92    let pools = vec![InstrumentId::from(
93        "0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443.Arbitrum:UniswapV3",
94    )];
95
96    let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
97    let actor = BlockchainSubscriberActor::new(actor_config);
98
99    node.add_actor(actor)?;
100
101    Ok(get_runtime().block_on(async move { node.run().await })?)
102}
103
104/// Configuration for the blockchain subscriber actor.
105#[derive(Debug, Clone)]
106#[cfg_attr(
107    feature = "python",
108    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
109)]
110pub struct BlockchainSubscriberActorConfig {
111    /// Base data actor configuration.
112    pub base: DataActorConfig,
113    /// Client ID to use for subscriptions.
114    pub client_id: ClientId,
115    /// The blockchain to subscribe for.
116    pub chain: Blockchain,
117    /// Pool instrument IDs to monitor for swaps and liquidity updates.
118    pub pools: Vec<InstrumentId>,
119}
120
121impl BlockchainSubscriberActorConfig {
122    /// Creates a new [`BlockchainSubscriberActorConfig`] instance.
123    #[must_use]
124    pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
125        Self {
126            base: DataActorConfig::default(),
127            client_id,
128            chain,
129            pools,
130        }
131    }
132}
133
134#[cfg(feature = "python")]
135#[pyo3::pymethods]
136impl BlockchainSubscriberActorConfig {
137    /// Creates a new `BlockchainSubscriberActorConfig` instance.
138    #[new]
139    fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
140        Self::new(client_id, chain, pools)
141    }
142
143    /// Returns the client ID.
144    #[getter]
145    const fn client_id(&self) -> ClientId {
146        self.client_id
147    }
148
149    /// Returns the blockchain.
150    #[getter]
151    const fn chain(&self) -> Blockchain {
152        self.chain
153    }
154
155    /// Returns the pool instrument IDs.
156    #[getter]
157    fn pools(&self) -> Vec<InstrumentId> {
158        self.pools.clone()
159    }
160
161    /// Returns a string representation of the configuration.
162    fn __repr__(&self) -> String {
163        format!(
164            "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
165            self.client_id, self.chain, self.pools
166        )
167    }
168}
169
170/// A basic blockchain subscriber actor that monitors DeFi activities.
171///
172/// This actor demonstrates how to use the `DataActor` trait to monitor blockchain data
173/// from DEXs, pools, and other DeFi protocols. It logs received blocks and swaps
174/// to demonstrate the data flow.
175#[derive(Debug)]
176#[cfg_attr(
177    feature = "python",
178    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
179)]
180pub struct BlockchainSubscriberActor {
181    core: DataActorCore,
182    config: BlockchainSubscriberActorConfig,
183    pub received_blocks: Vec<Block>,
184    pub received_pool_swaps: Vec<PoolSwap>,
185    pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
186    pub received_pools: Vec<Pool>,
187}
188
189impl Deref for BlockchainSubscriberActor {
190    type Target = DataActorCore;
191
192    fn deref(&self) -> &Self::Target {
193        &self.core
194    }
195}
196
197impl DerefMut for BlockchainSubscriberActor {
198    fn deref_mut(&mut self) -> &mut Self::Target {
199        &mut self.core
200    }
201}
202
203impl DataActor for BlockchainSubscriberActor {
204    fn on_start(&mut self) -> anyhow::Result<()> {
205        let client_id = self.config.client_id;
206
207        self.subscribe_blocks(self.config.chain, Some(client_id), None);
208
209        let pool_instrument_ids = self.config.pools.clone();
210        for instrument_id in pool_instrument_ids {
211            self.subscribe_pool(instrument_id, Some(client_id), None);
212            self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
213            self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
214        }
215
216        self.clock().set_timer(
217            "TEST-TIMER-1-SECOND",
218            Duration::from_secs(1),
219            None,
220            None,
221            None,
222            Some(true),
223            Some(false),
224        )?;
225
226        self.clock().set_timer(
227            "TEST-TIMER-2-SECOND",
228            Duration::from_secs(2),
229            None,
230            None,
231            None,
232            Some(true),
233            Some(false),
234        )?;
235
236        Ok(())
237    }
238
239    fn on_stop(&mut self) -> anyhow::Result<()> {
240        let client_id = self.config.client_id;
241
242        self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
243
244        let pool_instrument_ids = self.config.pools.clone();
245        for instrument_id in pool_instrument_ids {
246            self.unsubscribe_pool(instrument_id, Some(client_id), None);
247            self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
248            self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
249        }
250
251        Ok(())
252    }
253
254    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
255        log_info!("Received {block}", color = LogColor::Cyan);
256
257        self.received_blocks.push(block.clone());
258        Ok(())
259    }
260
261    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
262        log_info!("Received {swap}", color = LogColor::Cyan);
263
264        self.received_pool_swaps.push(swap.clone());
265        Ok(())
266    }
267}
268
269impl BlockchainSubscriberActor {
270    /// Creates a new [`BlockchainSubscriberActor`] instance.
271    #[must_use]
272    pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
273        Self {
274            core: DataActorCore::new(config.base.clone()),
275            config,
276            received_blocks: Vec::new(),
277            received_pool_swaps: Vec::new(),
278            received_pool_liquidity_updates: Vec::new(),
279            received_pools: Vec::new(),
280        }
281    }
282
283    /// Returns the number of blocks received by this actor.
284    #[must_use]
285    pub const fn block_count(&self) -> usize {
286        self.received_blocks.len()
287    }
288
289    /// Returns the number of pools received by this actor.
290    #[must_use]
291    pub const fn pool_count(&self) -> usize {
292        self.received_pools.len()
293    }
294
295    /// Returns the number of swaps received by this actor.
296    #[must_use]
297    pub const fn pool_swap_count(&self) -> usize {
298        self.received_pool_swaps.len()
299    }
300
301    /// Returns the number of liquidity updates received by this actor.
302    #[must_use]
303    pub const fn pool_liquidity_update_count(&self) -> usize {
304        self.received_pool_liquidity_updates.len()
305    }
306}