node_test/
node_test.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    ops::{Deref, DerefMut},
18    sync::Arc,
19    time::Duration,
20};
21
22use nautilus_blockchain::{
23    config::{BlockchainDataClientConfig, DexPoolFilters},
24    factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27    actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28    enums::{Environment, LogColor},
29    log_warn,
30    logging::log_info,
31    runtime::get_runtime,
32};
33use nautilus_core::env::get_env_var;
34use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
35use nautilus_live::node::LiveNode;
36use nautilus_model::{
37    defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
38    identifiers::{ClientId, InstrumentId, TraderId},
39};
40
41// Requires capnp installed on the machine
42// Run with `cargo run -p nautilus-blockchain --bin node_test --features hypersync`
43// To see additional tracing logs `export RUST_LOG=debug,h2=off`
44
45// ================================================================================================
46// IMPORTANT: The actor definitions below are EXAMPLE CODE for demonstration purposes.
47// They should NOT be moved to the main library as they are specific to this test scenario.
48// If you need production-ready actors, create them in a separate production module.
49// ================================================================================================
50
51fn main() -> Result<(), Box<dyn std::error::Error>> {
52    dotenvy::dotenv().ok();
53
54    let environment = Environment::Live;
55    let trader_id = TraderId::default();
56    let node_name = "TESTER-001".to_string();
57
58    let chain = chains::ARBITRUM.clone();
59    let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
60    let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
61
62    let dex_pool_filter = DexPoolFilters::new(Some(true));
63
64    let client_factory = BlockchainDataClientFactory::new();
65    let client_config = BlockchainDataClientConfig::new(
66        Arc::new(chain.clone()),
67        vec![DexType::UniswapV3],
68        http_rpc_url,
69        None, // RPC requests per second
70        None, // Multicall calls per RPC request
71        Some(wss_rpc_url),
72        true, // Use HyperSync for live data
73        None,
74        Some(dex_pool_filter),
75        Some(PostgresConnectOptions::default()),
76    );
77
78    let mut node = LiveNode::builder(node_name, trader_id, environment)?
79        .with_load_state(false)
80        .with_save_state(false)
81        .add_data_client(
82            None, // Use factory name
83            Box::new(client_factory),
84            Box::new(client_config),
85        )?
86        .build()?;
87
88    // Create and register a blockchain subscriber actor
89    let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
90
91    let pools = vec![InstrumentId::from(
92        "0x4CEf551255EC96d89feC975446301b5C4e164C59.Arbitrum:UniswapV3",
93    )];
94
95    let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
96    let actor = BlockchainSubscriberActor::new(actor_config);
97
98    node.add_actor(actor)?;
99
100    Ok(get_runtime().block_on(async move { node.run().await })?)
101}
102
103/// Configuration for the blockchain subscriber actor.
104#[derive(Debug, Clone)]
105#[cfg_attr(
106    feature = "python",
107    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
108)]
109pub struct BlockchainSubscriberActorConfig {
110    /// Base data actor configuration.
111    pub base: DataActorConfig,
112    /// Client ID to use for subscriptions.
113    pub client_id: ClientId,
114    /// The blockchain to subscribe for.
115    pub chain: Blockchain,
116    /// Pool instrument IDs to monitor for swaps and liquidity updates.
117    pub pools: Vec<InstrumentId>,
118}
119
120impl BlockchainSubscriberActorConfig {
121    /// Creates a new [`BlockchainSubscriberActorConfig`] instance.
122    #[must_use]
123    pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
124        Self {
125            base: DataActorConfig::default(),
126            client_id,
127            chain,
128            pools,
129        }
130    }
131}
132
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl BlockchainSubscriberActorConfig {
    /// Python constructor; delegates to the Rust `new` constructor.
    #[new]
    fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
        Self::new(client_id, chain, pools)
    }

    /// Renders the client ID, chain, and pools as a constructor-like string.
    fn __repr__(&self) -> String {
        format!(
            "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
            self.client_id, self.chain, self.pools
        )
    }

    /// The client ID used for subscriptions.
    #[getter]
    const fn client_id(&self) -> ClientId {
        self.client_id
    }

    /// The blockchain being subscribed to.
    #[getter]
    const fn chain(&self) -> Blockchain {
        self.chain
    }

    /// A copy of the configured pool instrument IDs.
    #[getter]
    fn pools(&self) -> Vec<InstrumentId> {
        self.pools.to_vec()
    }
}
168
169/// A basic blockchain subscriber actor that monitors DeFi activities.
170///
171/// This actor demonstrates how to use the `DataActor` trait to monitor blockchain data
172/// from DEXs, pools, and other DeFi protocols. It logs received blocks and swaps
173/// to demonstrate the data flow.
174#[derive(Debug)]
175#[cfg_attr(
176    feature = "python",
177    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
178)]
179pub struct BlockchainSubscriberActor {
180    core: DataActorCore,
181    config: BlockchainSubscriberActorConfig,
182    pub received_blocks: Vec<Block>,
183    pub received_pool_swaps: Vec<PoolSwap>,
184    pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
185    pub received_pools: Vec<Pool>,
186}
187
188impl Deref for BlockchainSubscriberActor {
189    type Target = DataActorCore;
190
191    fn deref(&self) -> &Self::Target {
192        &self.core
193    }
194}
195
196impl DerefMut for BlockchainSubscriberActor {
197    fn deref_mut(&mut self) -> &mut Self::Target {
198        &mut self.core
199    }
200}
201
202impl DataActor for BlockchainSubscriberActor {
203    fn on_start(&mut self) -> anyhow::Result<()> {
204        let client_id = self.config.client_id;
205
206        self.subscribe_blocks(self.config.chain, Some(client_id), None);
207
208        let pool_instrument_ids = self.config.pools.clone();
209        for instrument_id in pool_instrument_ids {
210            self.subscribe_pool(instrument_id, Some(client_id), None);
211            self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
212            self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
213        }
214
215        self.clock().set_timer(
216            "TEST-TIMER-1-SECOND",
217            Duration::from_secs(1),
218            None,
219            None,
220            None,
221            Some(true),
222            Some(false),
223        )?;
224
225        self.clock().set_timer(
226            "TEST-TIMER-2-SECOND",
227            Duration::from_secs(2),
228            None,
229            None,
230            None,
231            Some(true),
232            Some(false),
233        )?;
234
235        Ok(())
236    }
237
238    fn on_stop(&mut self) -> anyhow::Result<()> {
239        let client_id = self.config.client_id;
240
241        self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
242
243        let pool_instrument_ids = self.config.pools.clone();
244        for instrument_id in pool_instrument_ids {
245            self.unsubscribe_pool(instrument_id, Some(client_id), None);
246            self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
247            self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
248        }
249
250        Ok(())
251    }
252
253    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
254        log_info!("Received {block}", color = LogColor::Cyan);
255
256        self.received_blocks.push(block.clone());
257
258        {
259            let cache = self.cache();
260
261            for pool_id in &self.config.pools {
262                if let Some(pool_profiler) = cache.pool_profiler(pool_id) {
263                    let total_ticks = pool_profiler.get_active_tick_count();
264                    let total_positions = pool_profiler.get_total_active_positions();
265                    let liquidity = pool_profiler.get_active_liquidity();
266                    let liquidity_utilization_rate = pool_profiler.liquidity_utilization_rate();
267                    log_info!(
268                        "Pool {pool_id} contains {total_ticks} active ticks and {total_positions} active positions with liquidity of {liquidity}",
269                        color = LogColor::Magenta
270                    );
271                    log_info!(
272                        "Pool {pool_id} has a liquidity utilization rate of {:.4}%",
273                        liquidity_utilization_rate * 100.0,
274                        color = LogColor::Magenta
275                    );
276                } else {
277                    log_warn!(
278                        "Pool profiler {} not found",
279                        pool_id,
280                        color = LogColor::Magenta
281                    );
282                }
283            }
284        }
285
286        Ok(())
287    }
288
289    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
290        log_info!("Received {swap}", color = LogColor::Cyan);
291
292        self.received_pool_swaps.push(swap.clone());
293        Ok(())
294    }
295}
296
297impl BlockchainSubscriberActor {
298    /// Creates a new [`BlockchainSubscriberActor`] instance.
299    #[must_use]
300    pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
301        Self {
302            core: DataActorCore::new(config.base.clone()),
303            config,
304            received_blocks: Vec::new(),
305            received_pool_swaps: Vec::new(),
306            received_pool_liquidity_updates: Vec::new(),
307            received_pools: Vec::new(),
308        }
309    }
310
311    /// Returns the number of blocks received by this actor.
312    #[must_use]
313    pub const fn block_count(&self) -> usize {
314        self.received_blocks.len()
315    }
316
317    /// Returns the number of pools received by this actor.
318    #[must_use]
319    pub const fn pool_count(&self) -> usize {
320        self.received_pools.len()
321    }
322
323    /// Returns the number of swaps received by this actor.
324    #[must_use]
325    pub const fn pool_swap_count(&self) -> usize {
326        self.received_pool_swaps.len()
327    }
328
329    /// Returns the number of liquidity updates received by this actor.
330    #[must_use]
331    pub const fn pool_liquidity_update_count(&self) -> usize {
332        self.received_pool_liquidity_updates.len()
333    }
334}