node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    sync::Arc,
19    time::Duration,
20};
21
22use nautilus_blockchain::{
23    config::{BlockchainDataClientConfig, DexPoolFilters},
24    factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28    enums::{Environment, LogColor},
29    logging::log_info,
30    runtime::get_runtime,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_live::node::LiveNode;
35use nautilus_model::{
36    defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
37    identifiers::{ClientId, InstrumentId, TraderId},
38};
39// Requires capnp installed on the machine
40// Run with `cargo run -p nautilus-blockchain --bin node_test --features hypersync`
41// To see additional tracing logs `export RUST_LOG=debug,h2=off`
42
43// ================================================================================================
44// IMPORTANT: The actor definitions below are EXAMPLE CODE for demonstration purposes.
45// They should NOT be moved to the main library as they are specific to this test scenario.
46// If you need production-ready actors, create them in a separate production module.
47// ================================================================================================
48
49fn main() -> Result<(), Box<dyn std::error::Error>> {
50    dotenvy::dotenv().ok();
51
52    let environment = Environment::Live;
53    let trader_id = TraderId::default();
54    let node_name = "TESTER-001".to_string();
55
56    let chain = chains::ARBITRUM.clone();
57    let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
58    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
59    // let from_block = Some(22_735_000_u64); // Ethereum
60    // let from_block = Some(348_860_000_u64); // Arbitrum
61    let from_block = None; // No sync
62
63    let dex_pool_filter = DexPoolFilters::new(Some(true));
64
65    let client_factory = BlockchainDataClientFactory::new();
66    let client_config = BlockchainDataClientConfig::new(
67        Arc::new(chain.clone()),
68        vec![DexType::UniswapV3],
69        http_rpc_url,
70        None, // RPC requests per second
71        None, // Multicall calls per RPC request
72        Some(wss_rpc_url),
73        true, // Use HyperSync for live data
74        // Some(from_block), // from_block
75        from_block,
76        Some(dex_pool_filter),
77        Some(PostgresConnectOptions::default()),
78    );
79
80    let mut node = LiveNode::builder(node_name, trader_id, environment)?
81        .with_load_state(false)
82        .with_save_state(false)
83        .add_data_client(
84            None, // Use factory name
85            Box::new(client_factory),
86            Box::new(client_config),
87        )?
88        .build()?;
89
90    // Create and register a blockchain subscriber actor
91    let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
92
93    let pools = vec![InstrumentId::from(
94        "0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443.Arbitrum:UniswapV3",
95    )];
96
97    let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
98    let actor = BlockchainSubscriberActor::new(actor_config);
99
100    node.add_actor(actor)?;
101
102    Ok(get_runtime().block_on(async move { node.run().await })?)
103}
104
105/// Configuration for the blockchain subscriber actor.
106#[derive(Debug, Clone)]
107#[cfg_attr(
108    feature = "python",
109    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
110)]
111pub struct BlockchainSubscriberActorConfig {
112    /// Base data actor configuration.
113    pub base: DataActorConfig,
114    /// Client ID to use for subscriptions.
115    pub client_id: ClientId,
116    /// The blockchain to subscribe for.
117    pub chain: Blockchain,
118    /// Pool instrument IDs to monitor for swaps and liquidity updates.
119    pub pools: Vec<InstrumentId>,
120}
121
122impl BlockchainSubscriberActorConfig {
123    /// Creates a new [`BlockchainSubscriberActorConfig`] instance.
124    #[must_use]
125    pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
126        Self {
127            base: DataActorConfig::default(),
128            client_id,
129            chain,
130            pools,
131        }
132    }
133}
134
135#[cfg(feature = "python")]
136#[pyo3::pymethods]
137impl BlockchainSubscriberActorConfig {
138    /// Creates a new `BlockchainSubscriberActorConfig` instance.
139    #[new]
140    fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
141        Self::new(client_id, chain, pools)
142    }
143
144    /// Returns the client ID.
145    #[getter]
146    const fn client_id(&self) -> ClientId {
147        self.client_id
148    }
149
150    /// Returns the blockchain.
151    #[getter]
152    const fn chain(&self) -> Blockchain {
153        self.chain
154    }
155
156    /// Returns the pool instrument IDs.
157    #[getter]
158    fn pools(&self) -> Vec<InstrumentId> {
159        self.pools.clone()
160    }
161
162    /// Returns a string representation of the configuration.
163    fn __repr__(&self) -> String {
164        format!(
165            "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
166            self.client_id, self.chain, self.pools
167        )
168    }
169}
170
171/// A basic blockchain subscriber actor that monitors DeFi activities.
172///
173/// This actor demonstrates how to use the `DataActor` trait to monitor blockchain data
174/// from DEXs, pools, and other DeFi protocols. It logs received blocks and swaps
175/// to demonstrate the data flow.
176#[derive(Debug)]
177#[cfg_attr(
178    feature = "python",
179    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
180)]
181pub struct BlockchainSubscriberActor {
182    core: DataActorCore,
183    config: BlockchainSubscriberActorConfig,
184    pub received_blocks: Vec<Block>,
185    pub received_pool_swaps: Vec<PoolSwap>,
186    pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
187    pub received_pools: Vec<Pool>,
188}
189
190impl Deref for BlockchainSubscriberActor {
191    type Target = DataActorCore;
192
193    fn deref(&self) -> &Self::Target {
194        &self.core
195    }
196}
197
198impl DerefMut for BlockchainSubscriberActor {
199    fn deref_mut(&mut self) -> &mut Self::Target {
200        &mut self.core
201    }
202}
203
204impl DataActor for BlockchainSubscriberActor {
205    fn on_start(&mut self) -> anyhow::Result<()> {
206        let client_id = self.config.client_id;
207
208        self.subscribe_blocks(self.config.chain, Some(client_id), None);
209
210        let pool_instrument_ids = self.config.pools.clone();
211        for instrument_id in pool_instrument_ids {
212            self.subscribe_pool(instrument_id, Some(client_id), None);
213            self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
214            self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
215        }
216
217        self.clock().set_timer(
218            "TEST-TIMER-1-SECOND",
219            Duration::from_secs(1),
220            None,
221            None,
222            None,
223            Some(true),
224            Some(false),
225        )?;
226
227        self.clock().set_timer(
228            "TEST-TIMER-2-SECOND",
229            Duration::from_secs(2),
230            None,
231            None,
232            None,
233            Some(true),
234            Some(false),
235        )?;
236
237        Ok(())
238    }
239
240    fn on_stop(&mut self) -> anyhow::Result<()> {
241        let client_id = self.config.client_id;
242
243        self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
244
245        let pool_instrument_ids = self.config.pools.clone();
246        for instrument_id in pool_instrument_ids {
247            self.unsubscribe_pool(instrument_id, Some(client_id), None);
248            self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
249            self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
250        }
251
252        Ok(())
253    }
254
255    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
256        log_info!("Received {block}", color = LogColor::Cyan);
257
258        self.received_blocks.push(block.clone());
259        Ok(())
260    }
261
262    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
263        log_info!("Received {swap}", color = LogColor::Cyan);
264
265        self.received_pool_swaps.push(swap.clone());
266        Ok(())
267    }
268}
269
270impl BlockchainSubscriberActor {
271    /// Creates a new [`BlockchainSubscriberActor`] instance.
272    #[must_use]
273    pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
274        Self {
275            core: DataActorCore::new(config.base.clone()),
276            config,
277            received_blocks: Vec::new(),
278            received_pool_swaps: Vec::new(),
279            received_pool_liquidity_updates: Vec::new(),
280            received_pools: Vec::new(),
281        }
282    }
283
284    /// Returns the number of blocks received by this actor.
285    #[must_use]
286    pub const fn block_count(&self) -> usize {
287        self.received_blocks.len()
288    }
289
290    /// Returns the number of pools received by this actor.
291    #[must_use]
292    pub const fn pool_count(&self) -> usize {
293        self.received_pools.len()
294    }
295
296    /// Returns the number of swaps received by this actor.
297    #[must_use]
298    pub const fn pool_swap_count(&self) -> usize {
299        self.received_pool_swaps.len()
300    }
301
302    /// Returns the number of liquidity updates received by this actor.
303    #[must_use]
304    pub const fn pool_liquidity_update_count(&self) -> usize {
305        self.received_pool_liquidity_updates.len()
306    }
307}