1use std::{
17 ops::{Deref, DerefMut},
18 sync::Arc,
19 time::Duration,
20};
21
22use nautilus_blockchain::{
23 config::{BlockchainDataClientConfig, DexPoolFilters},
24 factories::BlockchainDataClientFactory,
25};
26use nautilus_common::{
27 actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
28 enums::{Environment, LogColor},
29 logging::log_info,
30 runtime::get_runtime,
31};
32use nautilus_core::env::get_env_var;
33use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
34use nautilus_live::node::LiveNode;
35use nautilus_model::{
36 defi::{Block, Blockchain, DexType, Pool, PoolLiquidityUpdate, PoolSwap, chain::chains},
37 identifiers::{ClientId, InstrumentId, TraderId},
38};
39fn main() -> Result<(), Box<dyn std::error::Error>> {
50 dotenvy::dotenv().ok();
51
52 let environment = Environment::Live;
53 let trader_id = TraderId::default();
54 let node_name = "TESTER-001".to_string();
55
56 let chain = chains::ARBITRUM.clone();
57 let wss_rpc_url = get_env_var("RPC_WSS_URL")?;
58 let http_rpc_url = get_env_var("RPC_HTTP_URL")?;
59 let from_block = None; let dex_pool_filter = DexPoolFilters::new(Some(true));
64
65 let client_factory = BlockchainDataClientFactory::new();
66 let client_config = BlockchainDataClientConfig::new(
67 Arc::new(chain.clone()),
68 vec![DexType::UniswapV3],
69 http_rpc_url,
70 None, Some(wss_rpc_url),
72 true, from_block,
75 Some(dex_pool_filter),
76 Some(PostgresConnectOptions::default()),
77 );
78
79 let mut node = LiveNode::builder(node_name, trader_id, environment)?
80 .with_load_state(false)
81 .with_save_state(false)
82 .add_data_client(
83 None, Box::new(client_factory),
85 Box::new(client_config),
86 )?
87 .build()?;
88
89 let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
91
92 let pools = vec![InstrumentId::from(
93 "0xC31E54c7a869B9FcBEcc14363CF510d1c41fa443.Arbitrum:UniswapV3",
94 )];
95
96 let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
97 let actor = BlockchainSubscriberActor::new(actor_config);
98
99 node.add_actor(actor)?;
100
101 Ok(get_runtime().block_on(async move { node.run().await })?)
102}
103
104#[derive(Debug, Clone)]
106#[cfg_attr(
107 feature = "python",
108 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain")
109)]
110pub struct BlockchainSubscriberActorConfig {
111 pub base: DataActorConfig,
113 pub client_id: ClientId,
115 pub chain: Blockchain,
117 pub pools: Vec<InstrumentId>,
119}
120
121impl BlockchainSubscriberActorConfig {
122 #[must_use]
124 pub fn new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
125 Self {
126 base: DataActorConfig::default(),
127 client_id,
128 chain,
129 pools,
130 }
131 }
132}
133
134#[cfg(feature = "python")]
135#[pyo3::pymethods]
136impl BlockchainSubscriberActorConfig {
137 #[new]
139 fn py_new(client_id: ClientId, chain: Blockchain, pools: Vec<InstrumentId>) -> Self {
140 Self::new(client_id, chain, pools)
141 }
142
143 #[getter]
145 const fn client_id(&self) -> ClientId {
146 self.client_id
147 }
148
149 #[getter]
151 const fn chain(&self) -> Blockchain {
152 self.chain
153 }
154
155 #[getter]
157 fn pools(&self) -> Vec<InstrumentId> {
158 self.pools.clone()
159 }
160
161 fn __repr__(&self) -> String {
163 format!(
164 "BlockchainSubscriberActorConfig(client_id={}, chain={:?}, pools={:?})",
165 self.client_id, self.chain, self.pools
166 )
167 }
168}
169
170#[derive(Debug)]
176#[cfg_attr(
177 feature = "python",
178 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.blockchain", unsendable)
179)]
180pub struct BlockchainSubscriberActor {
181 core: DataActorCore,
182 config: BlockchainSubscriberActorConfig,
183 pub received_blocks: Vec<Block>,
184 pub received_pool_swaps: Vec<PoolSwap>,
185 pub received_pool_liquidity_updates: Vec<PoolLiquidityUpdate>,
186 pub received_pools: Vec<Pool>,
187}
188
189impl Deref for BlockchainSubscriberActor {
190 type Target = DataActorCore;
191
192 fn deref(&self) -> &Self::Target {
193 &self.core
194 }
195}
196
197impl DerefMut for BlockchainSubscriberActor {
198 fn deref_mut(&mut self) -> &mut Self::Target {
199 &mut self.core
200 }
201}
202
203impl DataActor for BlockchainSubscriberActor {
204 fn on_start(&mut self) -> anyhow::Result<()> {
205 let client_id = self.config.client_id;
206
207 self.subscribe_blocks(self.config.chain, Some(client_id), None);
208
209 let pool_instrument_ids = self.config.pools.clone();
210 for instrument_id in pool_instrument_ids {
211 self.subscribe_pool(instrument_id, Some(client_id), None);
212 self.subscribe_pool_swaps(instrument_id, Some(client_id), None);
213 self.subscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
214 }
215
216 self.clock().set_timer(
217 "TEST-TIMER-1-SECOND",
218 Duration::from_secs(1),
219 None,
220 None,
221 None,
222 Some(true),
223 Some(false),
224 )?;
225
226 self.clock().set_timer(
227 "TEST-TIMER-2-SECOND",
228 Duration::from_secs(2),
229 None,
230 None,
231 None,
232 Some(true),
233 Some(false),
234 )?;
235
236 Ok(())
237 }
238
239 fn on_stop(&mut self) -> anyhow::Result<()> {
240 let client_id = self.config.client_id;
241
242 self.unsubscribe_blocks(self.config.chain, Some(client_id), None);
243
244 let pool_instrument_ids = self.config.pools.clone();
245 for instrument_id in pool_instrument_ids {
246 self.unsubscribe_pool(instrument_id, Some(client_id), None);
247 self.unsubscribe_pool_swaps(instrument_id, Some(client_id), None);
248 self.unsubscribe_pool_liquidity_updates(instrument_id, Some(client_id), None);
249 }
250
251 Ok(())
252 }
253
254 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
255 log_info!("Received {block}", color = LogColor::Cyan);
256
257 self.received_blocks.push(block.clone());
258 Ok(())
259 }
260
261 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
262 log_info!("Received {swap}", color = LogColor::Cyan);
263
264 self.received_pool_swaps.push(swap.clone());
265 Ok(())
266 }
267}
268
269impl BlockchainSubscriberActor {
270 #[must_use]
272 pub fn new(config: BlockchainSubscriberActorConfig) -> Self {
273 Self {
274 core: DataActorCore::new(config.base.clone()),
275 config,
276 received_blocks: Vec::new(),
277 received_pool_swaps: Vec::new(),
278 received_pool_liquidity_updates: Vec::new(),
279 received_pools: Vec::new(),
280 }
281 }
282
283 #[must_use]
285 pub const fn block_count(&self) -> usize {
286 self.received_blocks.len()
287 }
288
289 #[must_use]
291 pub const fn pool_count(&self) -> usize {
292 self.received_pools.len()
293 }
294
295 #[must_use]
297 pub const fn pool_swap_count(&self) -> usize {
298 self.received_pool_swaps.len()
299 }
300
301 #[must_use]
303 pub const fn pool_liquidity_update_count(&self) -> usize {
304 self.received_pool_liquidity_updates.len()
305 }
306}