1use std::{
17 ops::{Deref, DerefMut},
18 sync::Arc,
19 time::Duration,
20};
21
22use nautilus_blockchain::{
23 config::{BlockchainDataClientConfig, DexPoolFilters},
24 factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28 enums::{Environment, LogColor},
29 logging::log_info,
30 runtime::get_runtime,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_live::node::LiveNode;
35use nautilus_model::{
36 defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
37 identifiers::{ClientId, InstrumentId, TraderId},
38};
39
40fn main() -> Result<(), Box<dyn std::error::Error>> {
51 dotenvy::dotenv().ok();
52
53 let environment = Environment::Live;
54 let trader_id = TraderId::default();
55 let node_name = "TESTER-001".to_string();
56
57 let chain = chains::ARBITRUM.clone();
58 let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
59 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
60 let from_block = Some(350_000_000_u64); let dex_pool_filter = DexPoolFilters::new(Some(true));
65
66 let client_factory = BlockchainDataClientFactory::new();
67 let client_config = BlockchainDataClientConfig::new(
68 Arc::new(chain.clone()),
69 vec![DexType::UniswapV3],
70 http_rpc_url,
71 None, None, Some(wss_rpc_url),
74 true, from_block,
77 Some(dex_pool_filter),
78 Some(PostgresConnectOptions::default()),
79 );
80
81 let mut node = LiveNode::builder(node_name, trader_id, environment)?
82 .with_load_state(false)
83 .with_save_state(false)
84 .add_data_client(
85 None, Box::new(client_factory),
87 Box::new(client_config),
88 )?
89 .build()?;
90
91 let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
93
94 let pools = vec![InstrumentId::from(
95 "0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443.Arbitrum:UniswapV3",
96 )];
97
98 let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
99 let actor = BlockchainSubscriberActor::new(actor_config);
100
101 node.add_actor(actor)?;
102
103 Ok(get_runtime().block_on(async move { node.run().await })?)
104}
105
106#[derive(Debug, Clone)]
108#[cfg_attr(
109 feature = "python",
110 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
111)]
112pub struct BlockchainSubscriberActorConfig {
113 pub base: DataActorConfig,
115 pub client_id: ClientId,
117 pub chain: Blockchain,
119 pub pools: Vec<InstrumentId>,
121}
122
123impl BlockchainSubscriberActorConfig {
124 #[must_use]
126 pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
127 Self {
128 base: DataActorConfig::default(),
129 client_id,
130 chain,
131 pools,
132 }
133 }
134}
135
136#[cfg(feature = "python")]
137#[pyo3::pymethods]
138impl BlockchainSubscriberActorConfig {
139 #[new]
141 fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
142 Self::new(client_id, chain, pools)
143 }
144
145 fn __repr__(&self) -> String {
147 format!(
148 "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
149 self.client_id, self.chain, self.pools
150 )
151 }
152
153 #[getter]
155 const fn client_id(&self) -> ClientId {
156 self.client_id
157 }
158
159 #[getter]
161 const fn chain(&self) -> Blockchain {
162 self.chain
163 }
164
165 #[getter]
167 fn pools(&self) -> Vec<InstrumentId> {
168 self.pools.clone()
169 }
170}
171
172#[derive(Debug)]
178#[cfg_attr(
179 feature = "python",
180 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
181)]
182pub struct BlockchainSubscriberActor {
183 core: DataActorCore,
184 config: BlockchainSubscriberActorConfig,
185 pub received_blocks: Vec<Block>,
186 pub received_pool_swaps: Vec<PoolSwap>,
187 pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
188 pub received_pools: Vec<Pool>,
189}
190
191impl Deref for BlockchainSubscriberActor {
192 type Target = DataActorCore;
193
194 fn deref(&self) -> &Self::Target {
195 &self.core
196 }
197}
198
199impl DerefMut for BlockchainSubscriberActor {
200 fn deref_mut(&mut self) -> &mut Self::Target {
201 &mut self.core
202 }
203}
204
205impl DataActor for BlockchainSubscriberActor {
206 fn on_start(&mut self) -> anyhow::Result<()> {
207 let client_id = self.config.client_id;
208
209 self.subscribe_blocks(self.config.chain, Some(client_id), None);
210
211 let pool_instrument_ids = self.config.pools.clone();
212 for instrument_id in pool_instrument_ids {
213 self.subscribe_pool(instrument_id, Some(client_id), None);
214 self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
215 self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
216 }
217
218 self.clock().set_timer(
219 "TEST-TIMER-1-SECOND",
220 Duration::from_secs(1),
221 None,
222 None,
223 None,
224 Some(true),
225 Some(false),
226 )?;
227
228 self.clock().set_timer(
229 "TEST-TIMER-2-SECOND",
230 Duration::from_secs(2),
231 None,
232 None,
233 None,
234 Some(true),
235 Some(false),
236 )?;
237
238 Ok(())
239 }
240
241 fn on_stop(&mut self) -> anyhow::Result<()> {
242 let client_id = self.config.client_id;
243
244 self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
245
246 let pool_instrument_ids = self.config.pools.clone();
247 for instrument_id in pool_instrument_ids {
248 self.unsubscribe_pool(instrument_id, Some(client_id), None);
249 self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
250 self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
251 }
252
253 Ok(())
254 }
255
256 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
257 log_info!("Received {block}", color = LogColor::Cyan);
258
259 self.received_blocks.push(block.clone());
260
261 {
262 let cache = self.cache();
263
264 for pool_id in &self.config.pools {
265 let pool_profiler = cache.pool_profiler(pool_id);
266 log_info!("{pool_profiler:?}", color = LogColor::Magenta);
267 }
268 }
269
270 Ok(())
271 }
272
273 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
274 log_info!("Received {swap}", color = LogColor::Cyan);
275
276 self.received_pool_swaps.push(swap.clone());
277 Ok(())
278 }
279}
280
281impl BlockchainSubscriberActor {
282 #[must_use]
284 pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
285 Self {
286 core: DataActorCore::new(config.base.clone()),
287 config,
288 received_blocks: Vec::new(),
289 received_pool_swaps: Vec::new(),
290 received_pool_liquidity_updates: Vec::new(),
291 received_pools: Vec::new(),
292 }
293 }
294
295 #[must_use]
297 pub const fn block_count(&self) -> usize {
298 self.received_blocks.len()
299 }
300
301 #[must_use]
303 pub const fn pool_count(&self) -> usize {
304 self.received_pools.len()
305 }
306
307 #[must_use]
309 pub const fn pool_swap_count(&self) -> usize {
310 self.received_pool_swaps.len()
311 }
312
313 #[must_use]
315 pub const fn pool_liquidity_update_count(&self) -> usize {
316 self.received_pool_liquidity_updates.len()
317 }
318}