1use std::{
17 ops::{Deref, DerefMut},
18 sync::Arc,
19 time::Duration,
20};
21
22use nautilus_blockchain::{
23 config::{BlockchainDataClientConfig, DexPoolFilters},
24 factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28 enums::{Environment, LogColor},
29 log_warn,
30 logging::log_info,
31 runtime::get_runtime,
32};
33use nautilus_core::env::get_env_var;
34use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
35use nautilus_live::node::LiveNode;
36use nautilus_model::{
37 defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
38 identifiers::{ClientId, InstrumentId, TraderId},
39};
40
41fn main() -> Result<(), Box<dyn std::error::Error>> {
52 dotenvy::dotenv().ok();
53
54 let environment = Environment::Live;
55 let trader_id = TraderId::default();
56 let node_name = "TESTER-001".to_string();
57
58 let chain = chains::ARBITRUM.clone();
59 let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
60 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
61
62 let dex_pool_filter = DexPoolFilters::new(Some(true));
63
64 let client_factory = BlockchainDataClientFactory::new();
65 let client_config = BlockchainDataClientConfig::new(
66 Arc::new(chain.clone()),
67 vec![DexType::UniswapV3],
68 http_rpc_url,
69 None, None, Some(wss_rpc_url),
72 true, None,
74 Some(dex_pool_filter),
75 Some(PostgresConnectOptions::default()),
76 );
77
78 let mut node = LiveNode::builder(node_name, trader_id, environment)?
79 .with_load_state(false)
80 .with_save_state(false)
81 .add_data_client(
82 None, Box::new(client_factory),
84 Box::new(client_config),
85 )?
86 .build()?;
87
88 let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
90
91 let pools = vec![InstrumentId::from(
92 "0x4CEf551255EC96d89feC975446301b5C4e164C59.Arbitrum:UniswapV3",
93 )];
94
95 let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
96 let actor = BlockchainSubscriberActor::new(actor_config);
97
98 node.add_actor(actor)?;
99
100 Ok(get_runtime().block_on(async move { node.run().await })?)
101}
102
103#[derive(Debug, Clone)]
105#[cfg_attr(
106 feature = "python",
107 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
108)]
109pub struct BlockchainSubscriberActorConfig {
110 pub base: DataActorConfig,
112 pub client_id: ClientId,
114 pub chain: Blockchain,
116 pub pools: Vec<InstrumentId>,
118}
119
120impl BlockchainSubscriberActorConfig {
121 #[must_use]
123 pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
124 Self {
125 base: DataActorConfig::default(),
126 client_id,
127 chain,
128 pools,
129 }
130 }
131}
132
133#[cfg(feature = "python")]
134#[pyo3::pymethods]
135impl BlockchainSubscriberActorConfig {
136 #[new]
138 fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
139 Self::new(client_id, chain, pools)
140 }
141
142 fn __repr__(&self) -> String {
144 format!(
145 "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
146 self.client_id, self.chain, self.pools
147 )
148 }
149
150 #[getter]
152 const fn client_id(&self) -> ClientId {
153 self.client_id
154 }
155
156 #[getter]
158 const fn chain(&self) -> Blockchain {
159 self.chain
160 }
161
162 #[getter]
164 fn pools(&self) -> Vec<InstrumentId> {
165 self.pools.clone()
166 }
167}
168
169#[derive(Debug)]
175#[cfg_attr(
176 feature = "python",
177 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
178)]
179pub struct BlockchainSubscriberActor {
180 core: DataActorCore,
181 config: BlockchainSubscriberActorConfig,
182 pub received_blocks: Vec<Block>,
183 pub received_pool_swaps: Vec<PoolSwap>,
184 pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
185 pub received_pools: Vec<Pool>,
186}
187
188impl Deref for BlockchainSubscriberActor {
189 type Target = DataActorCore;
190
191 fn deref(&self) -> &Self::Target {
192 &self.core
193 }
194}
195
196impl DerefMut for BlockchainSubscriberActor {
197 fn deref_mut(&mut self) -> &mut Self::Target {
198 &mut self.core
199 }
200}
201
202impl DataActor for BlockchainSubscriberActor {
203 fn on_start(&mut self) -> anyhow::Result<()> {
204 let client_id = self.config.client_id;
205
206 self.subscribe_blocks(self.config.chain, Some(client_id), None);
207
208 let pool_instrument_ids = self.config.pools.clone();
209 for instrument_id in pool_instrument_ids {
210 self.subscribe_pool(instrument_id, Some(client_id), None);
211 self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
212 self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
213 }
214
215 self.clock().set_timer(
216 "TEST-TIMER-1-SECOND",
217 Duration::from_secs(1),
218 None,
219 None,
220 None,
221 Some(true),
222 Some(false),
223 )?;
224
225 self.clock().set_timer(
226 "TEST-TIMER-2-SECOND",
227 Duration::from_secs(2),
228 None,
229 None,
230 None,
231 Some(true),
232 Some(false),
233 )?;
234
235 Ok(())
236 }
237
238 fn on_stop(&mut self) -> anyhow::Result<()> {
239 let client_id = self.config.client_id;
240
241 self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
242
243 let pool_instrument_ids = self.config.pools.clone();
244 for instrument_id in pool_instrument_ids {
245 self.unsubscribe_pool(instrument_id, Some(client_id), None);
246 self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
247 self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
248 }
249
250 Ok(())
251 }
252
253 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
254 log_info!("Received {block}", color = LogColor::Cyan);
255
256 self.received_blocks.push(block.clone());
257
258 {
259 let cache = self.cache();
260
261 for pool_id in &self.config.pools {
262 if let Some(pool_profiler) = cache.pool_profiler(pool_id) {
263 let total_ticks = pool_profiler.get_active_tick_count();
264 let total_positions = pool_profiler.get_total_active_positions();
265 let liquidity = pool_profiler.get_active_liquidity();
266 let liquidity_utilization_rate = pool_profiler.liquidity_utilization_rate();
267 log_info!(
268 "Pool {pool_id} contains {total_ticks} active ticks and {total_positions} active positions with liquidity of {liquidity}",
269 color = LogColor::Magenta
270 );
271 log_info!(
272 "Pool {pool_id} has a liquidity utilization rate of {:.4}%",
273 liquidity_utilization_rate * 100.0,
274 color = LogColor::Magenta
275 );
276 } else {
277 log_warn!(
278 "Pool profiler {} not found",
279 pool_id,
280 color = LogColor::Magenta
281 );
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
290 log_info!("Received {swap}", color = LogColor::Cyan);
291
292 self.received_pool_swaps.push(swap.clone());
293 Ok(())
294 }
295}
296
297impl BlockchainSubscriberActor {
298 #[must_use]
300 pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
301 Self {
302 core: DataActorCore::new(config.base.clone()),
303 config,
304 received_blocks: Vec::new(),
305 received_pool_swaps: Vec::new(),
306 received_pool_liquidity_updates: Vec::new(),
307 received_pools: Vec::new(),
308 }
309 }
310
311 #[must_use]
313 pub const fn block_count(&self) -> usize {
314 self.received_blocks.len()
315 }
316
317 #[must_use]
319 pub const fn pool_count(&self) -> usize {
320 self.received_pools.len()
321 }
322
323 #[must_use]
325 pub const fn pool_swap_count(&self) -> usize {
326 self.received_pool_swaps.len()
327 }
328
329 #[must_use]
331 pub const fn pool_liquidity_update_count(&self) -> usize {
332 self.received_pool_liquidity_updates.len()
333 }
334}