node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    sync::Arc,
19    time::Duration,
20};
21
22use nautilus_blockchain::{
23    config::{BlockchainDataClientConfig, DexPoolFilters},
24    factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28    enums::{Environment, LogColor},
29    logging::log_info,
30    runtime::get_runtime,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_live::node::LiveNode;
35use nautilus_model::{
36    defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
37    identifiers::{ClientId, InstrumentId, TraderId},
38};
39
40// Requires capnp installed on the machine
41// Run with `cargo run -p nautilus-blockchain --bin node_test --features hypersync`
42// To see additional tracing logs `export RUST_LOG=debug,h2=off`
43
44// ================================================================================================
45// IMPORTANT: The actor definitions below are EXAMPLE CODE for demonstration purposes.
46// They should NOT be moved to the main library as they are specific to this test scenario.
47// If you need production-ready actors, create them in a separate production module.
48// ================================================================================================
49
50fn main() -> Result<(), Box<dyn std::error::Error>> {
51    dotenvy::dotenv().ok();
52
53    let environment = Environment::Live;
54    let trader_id = TraderId::default();
55    let node_name = "TESTER-001".to_string();
56
57    let chain = chains::ARBITRUM.clone();
58    let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
59    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
60    // let from_block = Some(22_735_000_u64); // Ethereum
61    let from_block = Some(350_000_000_u64); // Arbitrum
62    // let from_block = None; // No sync
63
64    let dex_pool_filter = DexPoolFilters::new(Some(true));
65
66    let client_factory = BlockchainDataClientFactory::new();
67    let client_config = BlockchainDataClientConfig::new(
68        Arc::new(chain.clone()),
69        vec![DexType::UniswapV3],
70        http_rpc_url,
71        None, // RPC requests per second
72        None, // Multicall calls per RPC request
73        Some(wss_rpc_url),
74        true, // Use HyperSync for live data
75        // Some(from_block), // from_block
76        from_block,
77        Some(dex_pool_filter),
78        Some(PostgresConnectOptions::default()),
79    );
80
81    let mut node = LiveNode::builder(node_name, trader_id, environment)?
82        .with_load_state(false)
83        .with_save_state(false)
84        .add_data_client(
85            None, // Use factory name
86            Box::new(client_factory),
87            Box::new(client_config),
88        )?
89        .build()?;
90
91    // Create and register a blockchain subscriber actor
92    let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
93
94    let pools = vec![InstrumentId::from(
95        "0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443.Arbitrum:UniswapV3",
96    )];
97
98    let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
99    let actor = BlockchainSubscriberActor::new(actor_config);
100
101    node.add_actor(actor)?;
102
103    Ok(get_runtime().block_on(async move { node.run().await })?)
104}
105
106/// Configuration for the blockchain subscriber actor.
107#[derive(Debug, Clone)]
108#[cfg_attr(
109    feature = "python",
110    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
111)]
112pub struct BlockchainSubscriberActorConfig {
113    /// Base data actor configuration.
114    pub base: DataActorConfig,
115    /// Client ID to use for subscriptions.
116    pub client_id: ClientId,
117    /// The blockchain to subscribe for.
118    pub chain: Blockchain,
119    /// Pool instrument IDs to monitor for swaps and liquidity updates.
120    pub pools: Vec<InstrumentId>,
121}
122
123impl BlockchainSubscriberActorConfig {
124    /// Creates a new [`BlockchainSubscriberActorConfig`] instance.
125    #[must_use]
126    pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
127        Self {
128            base: DataActorConfig::default(),
129            client_id,
130            chain,
131            pools,
132        }
133    }
134}
135
136#[cfg(feature = "python")]
137#[pyo3::pymethods]
138impl BlockchainSubscriberActorConfig {
139    /// Creates a new `BlockchainSubscriberActorConfig` instance.
140    #[new]
141    fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
142        Self::new(client_id, chain, pools)
143    }
144
145    /// Returns a string representation of the configuration.
146    fn __repr__(&self) -> String {
147        format!(
148            "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
149            self.client_id, self.chain, self.pools
150        )
151    }
152
153    /// Returns the client ID.
154    #[getter]
155    const fn client_id(&self) -> ClientId {
156        self.client_id
157    }
158
159    /// Returns the blockchain.
160    #[getter]
161    const fn chain(&self) -> Blockchain {
162        self.chain
163    }
164
165    /// Returns the pool instrument IDs.
166    #[getter]
167    fn pools(&self) -> Vec<InstrumentId> {
168        self.pools.clone()
169    }
170}
171
172/// A basic blockchain subscriber actor that monitors DeFi activities.
173///
174/// This actor demonstrates how to use the `DataActor` trait to monitor blockchain data
175/// from DEXs, pools, and other DeFi protocols. It logs received blocks and swaps
176/// to demonstrate the data flow.
177#[derive(Debug)]
178#[cfg_attr(
179    feature = "python",
180    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
181)]
182pub struct BlockchainSubscriberActor {
183    core: DataActorCore,
184    config: BlockchainSubscriberActorConfig,
185    pub received_blocks: Vec<Block>,
186    pub received_pool_swaps: Vec<PoolSwap>,
187    pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
188    pub received_pools: Vec<Pool>,
189}
190
191impl Deref for BlockchainSubscriberActor {
192    type Target = DataActorCore;
193
194    fn deref(&self) -> &Self::Target {
195        &self.core
196    }
197}
198
199impl DerefMut for BlockchainSubscriberActor {
200    fn deref_mut(&mut self) -> &mut Self::Target {
201        &mut self.core
202    }
203}
204
205impl DataActor for BlockchainSubscriberActor {
206    fn on_start(&mut self) -> anyhow::Result<()> {
207        let client_id = self.config.client_id;
208
209        self.subscribe_blocks(self.config.chain, Some(client_id), None);
210
211        let pool_instrument_ids = self.config.pools.clone();
212        for instrument_id in pool_instrument_ids {
213            self.subscribe_pool(instrument_id, Some(client_id), None);
214            self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
215            self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
216        }
217
218        self.clock().set_timer(
219            "TEST-TIMER-1-SECOND",
220            Duration::from_secs(1),
221            None,
222            None,
223            None,
224            Some(true),
225            Some(false),
226        )?;
227
228        self.clock().set_timer(
229            "TEST-TIMER-2-SECOND",
230            Duration::from_secs(2),
231            None,
232            None,
233            None,
234            Some(true),
235            Some(false),
236        )?;
237
238        Ok(())
239    }
240
241    fn on_stop(&mut self) -> anyhow::Result<()> {
242        let client_id = self.config.client_id;
243
244        self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
245
246        let pool_instrument_ids = self.config.pools.clone();
247        for instrument_id in pool_instrument_ids {
248            self.unsubscribe_pool(instrument_id, Some(client_id), None);
249            self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
250            self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
251        }
252
253        Ok(())
254    }
255
256    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
257        log_info!("Received {block}", color = LogColor::Cyan);
258
259        self.received_blocks.push(block.clone());
260
261        {
262            let cache = self.cache();
263
264            for pool_id in &self.config.pools {
265                let pool_profiler = cache.pool_profiler(pool_id);
266                log_info!("{pool_profiler:?}", color = LogColor::Magenta);
267            }
268        }
269
270        Ok(())
271    }
272
273    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
274        log_info!("Received {swap}", color = LogColor::Cyan);
275
276        self.received_pool_swaps.push(swap.clone());
277        Ok(())
278    }
279}
280
281impl BlockchainSubscriberActor {
282    /// Creates a new [`BlockchainSubscriberActor`] instance.
283    #[must_use]
284    pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
285        Self {
286            core: DataActorCore::new(config.base.clone()),
287            config,
288            received_blocks: Vec::new(),
289            received_pool_swaps: Vec::new(),
290            received_pool_liquidity_updates: Vec::new(),
291            received_pools: Vec::new(),
292        }
293    }
294
295    /// Returns the number of blocks received by this actor.
296    #[must_use]
297    pub const fn block_count(&self) -> usize {
298        self.received_blocks.len()
299    }
300
301    /// Returns the number of pools received by this actor.
302    #[must_use]
303    pub const fn pool_count(&self) -> usize {
304        self.received_pools.len()
305    }
306
307    /// Returns the number of swaps received by this actor.
308    #[must_use]
309    pub const fn pool_swap_count(&self) -> usize {
310        self.received_pool_swaps.len()
311    }
312
313    /// Returns the number of liquidity updates received by this actor.
314    #[must_use]
315    pub const fn pool_liquidity_update_count(&self) -> usize {
316        self.received_pool_liquidity_updates.len()
317    }
318}