nautilus_blockchain/data/
client.rs1use nautilus_common::{
17 messages::{
18 DataEvent,
19 defi::{
20 DefiDataCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks,
21 SubscribePool, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
22 UnsubscribePool, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
23 },
24 },
25 runtime::get_runtime,
26};
27use nautilus_data::client::DataClient;
28use nautilus_model::{
29 defi::{DefiData, SharedChain, validation::validate_address},
30 identifiers::{ClientId, Venue},
31};
32
33use crate::{
34 config::BlockchainDataClientConfig,
35 data::core::BlockchainDataClientCore,
36 exchanges::get_dex_extended,
37 rpc::{BlockchainRpcClient, types::BlockchainMessage},
38};
39
40#[derive(Debug)]
50pub struct BlockchainDataClient {
51 pub chain: SharedChain,
53 pub config: BlockchainDataClientConfig,
55 pub core_client: Option<BlockchainDataClientCore>,
58 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
60 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
62 command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
64 command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
66 process_task: Option<tokio::task::JoinHandle<()>>,
68 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
70}
71
72impl BlockchainDataClient {
73 #[must_use]
75 pub fn new(config: BlockchainDataClientConfig) -> Self {
76 let chain = config.chain.clone();
77 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
78 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
79 Self {
80 chain,
81 core_client: None,
82 config,
83 hypersync_rx: Some(hypersync_rx),
84 hypersync_tx: Some(hypersync_tx),
85 command_tx,
86 command_rx: Some(command_rx),
87 process_task: None,
88 shutdown_tx: None,
89 }
90 }
91
92 fn spawn_process_task(&mut self) {
100 let command_rx = if let Some(r) = self.command_rx.take() {
101 r
102 } else {
103 tracing::error!("Command receiver already taken, not spawning handler");
104 return;
105 };
106
107 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
108 self.shutdown_tx = Some(shutdown_tx);
109
110 let data_tx = nautilus_common::runner::get_data_event_sender();
111
112 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
113 let hypersync_tx = self.hypersync_tx.take();
114
115 let mut core_client =
116 BlockchainDataClientCore::new(self.config.clone(), hypersync_tx, Some(data_tx));
117
118 let handle = get_runtime().spawn(async move {
119 tracing::debug!("Started task 'process'");
120
121 if let Err(e) = core_client.connect().await {
122 tracing::error!("Failed to connect blockchain core client: {e}");
123 return;
124 }
125
126 let mut command_rx = command_rx;
127 let mut shutdown_rx = shutdown_rx;
128
129 loop {
130 tokio::select! {
131 _ = &mut shutdown_rx => {
132 tracing::debug!("Received shutdown signal in Blockchain data client process task");
133 core_client.disconnect();
134 break;
135 }
136 command = command_rx.recv() => {
137 if let Some(cmd) = command {
138 match cmd {
139 DefiDataCommand::Subscribe(cmd) => {
140 let chain = cmd.blockchain();
141 if chain != core_client.chain.name {
142 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
143 continue;
144 }
145
146 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
147 tracing::error!("Error processing subscribe command: {e}");
148 }
149 }
150 DefiDataCommand::Unsubscribe(cmd) => {
151 let chain = cmd.blockchain();
152 if chain != core_client.chain.name {
153 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
154 continue;
155 }
156
157 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
158 tracing::error!("Error processing subscribe command: {e}");
159 }
160 }
161 }
162 } else {
163 tracing::debug!("Command channel closed");
164 break;
165 }
166 }
167 data = hypersync_rx.recv() => {
168 if let Some(msg) = data {
169 let data_event = match msg {
170 BlockchainMessage::Block(block) => {
171 for dex in core_client.cache.get_registered_dexes(){
173 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
174 if !addresses.is_empty() {
175 core_client.hypersync_client.process_block_dex_contract_events(
176 &dex,
177 block.number,
178 addresses,
179 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
180 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
181 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
182 ).await;
183 }
184 }
185
186 Some(DataEvent::DeFi(DefiData::Block(block)))
187 }
188 BlockchainMessage::SwapEvent(swap_event) => {
189 match core_client.get_pool(&swap_event.pool_address) {
190 Ok(pool) => {
191 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
192 match core_client.process_pool_swap_event(
193 &swap_event,
194 pool,
195 dex_extended,
196 ){
197 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
198 Err(e) => {
199 tracing::error!("Error processing pool swap event: {e}");
200 None
201 }
202 }
203 }
204 Err(e) => {
205 tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
206 None
207 }
208 }
209 }
210 BlockchainMessage::BurnEvent(burn_event) => {
211 match core_client.get_pool(&burn_event.pool_address) {
212 Ok(pool) => {
213 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
214 match core_client.process_pool_burn_event(
215 &burn_event,
216 pool,
217 dex_extended,
218 ){
219 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
220 Err(e) => {
221 tracing::error!("Error processing pool burn event: {e}");
222 None
223 }
224 }
225 }
226 Err(e) => {
227 tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
228 None
229 }
230 }
231 }
232 BlockchainMessage::MintEvent(mint_event) => {
233 match core_client.get_pool(&mint_event.pool_address) {
234 Ok(pool) => {
235 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
236 match core_client.process_pool_mint_event(
237 &mint_event,
238 pool,
239 dex_extended,
240 ){
241 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
242 Err(e) => {
243 tracing::error!("Error processing pool mint event: {e}");
244 None
245 }
246 }
247 }
248 Err(e) => {
249 tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
250 None
251 }
252 }
253 }
254 BlockchainMessage::CollectEvent(collect_event) => {
255 match core_client.get_pool(&collect_event.pool_address) {
256 Ok(pool) => {
257 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
258 match core_client.process_pool_collect_event(
259 &collect_event,
260 pool,
261 dex_extended,
262 ){
263 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
264 Err(e) => {
265 tracing::error!("Error processing pool collect event: {e}");
266 None
267 }
268 }
269 }
270 Err(e) => {
271 tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
272 None
273 }
274 }
275 }
276 };
277
278 if let Some(event) = data_event {
279 core_client.send_data(event);
280 }
281 } else {
282 tracing::debug!("HyperSync data channel closed");
283 break;
284 }
285 }
286 msg = async {
287 if let Some(ref mut rpc_client) = core_client.rpc_client {
288 Some(rpc_client.next_rpc_message().await)
289 } else {
290 None
291 }
292 } => {
293 if let Some(msg) = msg {
294 match msg {
295 Ok(BlockchainMessage::Block(block)) => {
296 let data = DataEvent::DeFi(DefiData::Block(block));
297 core_client.send_data(data);
298 },
299 Ok(BlockchainMessage::SwapEvent(_)) => {
300 tracing::warn!("RPC swap events are not yet supported");
301 }
302 Ok(BlockchainMessage::MintEvent(_)) => {
303 tracing::warn!("RPC mint events are not yet supported");
304 }
305 Ok(BlockchainMessage::BurnEvent(_)) => {
306 tracing::warn!("RPC burn events are not yet supported");
307 }
308 Ok(BlockchainMessage::CollectEvent(_)) => {
309 tracing::warn!("RPC collect events are not yet supported")
310 }
311 Err(e) => {
312 tracing::error!("Error processing RPC message: {e}");
313 }
314 }
315 }
316 }
317 }
318 }
319
320 tracing::debug!("Stopped task 'process'");
321 });
322
323 self.process_task = Some(handle);
324 }
325
326 async fn handle_subscribe_command(
328 command: DefiSubscribeCommand,
329 core_client: &mut BlockchainDataClientCore,
330 ) -> anyhow::Result<()> {
331 match command {
332 DefiSubscribeCommand::Blocks(_cmd) => {
333 tracing::info!("Processing subscribe blocks command");
334
335 if let Some(ref mut rpc) = core_client.rpc_client {
337 if let Err(e) = rpc.subscribe_blocks().await {
338 tracing::warn!(
339 "RPC blocks subscription failed: {e}, falling back to HyperSync"
340 );
341 core_client.hypersync_client.subscribe_blocks();
342 tokio::task::yield_now().await;
343 } else {
344 tracing::info!("Successfully subscribed to blocks via RPC");
345 }
346 } else {
347 tracing::info!("Subscribing to blocks via HyperSync");
348 core_client.hypersync_client.subscribe_blocks();
349 tokio::task::yield_now().await;
350 }
351
352 Ok(())
353 }
354 DefiSubscribeCommand::Pool(_cmd) => {
355 tracing::info!("Processing subscribe pool command");
356 tracing::warn!("Pool subscriptions are handled at application level");
359 Ok(())
360 }
361 DefiSubscribeCommand::PoolSwaps(cmd) => {
362 tracing::info!(
363 "Processing subscribe pool swaps command for {}",
364 cmd.instrument_id
365 );
366
367 if let Some(ref mut _rpc) = core_client.rpc_client {
368 tracing::warn!(
369 "RPC pool swaps subscription not yet implemented, using HyperSync"
370 );
371 }
372
373 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
374 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
375 .map_err(|e| {
376 anyhow::anyhow!(
377 "Invalid pool swap address '{}' failed with error: {:?}",
378 cmd.instrument_id,
379 e
380 )
381 })?;
382 core_client
383 .subscription_manager
384 .subscribe_swaps(dex, pool_address);
385 } else {
386 anyhow::bail!(
387 "Invalid venue {}, expected Blockchain DEX format",
388 cmd.instrument_id.venue
389 )
390 }
391
392 Ok(())
393 }
394 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
395 tracing::info!(
396 "Processing subscribe pool liquidity updates command for address: {}",
397 cmd.instrument_id
398 );
399
400 if let Some(ref mut _rpc) = core_client.rpc_client {
401 tracing::warn!(
402 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
403 );
404 }
405
406 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
407 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
408 .map_err(|_| {
409 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
410 })?;
411 core_client
412 .subscription_manager
413 .subscribe_burns(dex, pool_address);
414 core_client
415 .subscription_manager
416 .subscribe_mints(dex, pool_address);
417 } else {
418 anyhow::bail!(
419 "Invalid venue {}, expected Blockchain DEX format",
420 cmd.instrument_id.venue
421 )
422 }
423
424 Ok(())
425 }
426 }
427 }
428
429 async fn handle_unsubscribe_command(
431 command: DefiUnsubscribeCommand,
432 core_client: &mut BlockchainDataClientCore,
433 ) -> anyhow::Result<()> {
434 match command {
435 DefiUnsubscribeCommand::Blocks(_cmd) => {
436 tracing::info!("Processing unsubscribe blocks command");
437
438 if core_client.rpc_client.is_some() {
440 tracing::warn!("RPC blocks unsubscription not yet implemented");
441 }
442
443 core_client.hypersync_client.unsubscribe_blocks();
445 tracing::info!("Unsubscribed from blocks via HyperSync");
446
447 Ok(())
448 }
449 DefiUnsubscribeCommand::Pool(_cmd) => {
450 tracing::info!("Processing unsubscribe pool command");
451 tracing::warn!("Pool unsubscriptions are handled at application level");
453 Ok(())
454 }
455 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
456 tracing::info!("Processing unsubscribe pool swaps command");
457
458 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
459 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
460 .map_err(|_| {
461 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
462 })?;
463 core_client
464 .subscription_manager
465 .unsubscribe_swaps(dex, pool_address);
466 } else {
467 anyhow::bail!(
468 "Invalid venue {}, expected Blockchain DEX format",
469 cmd.instrument_id.venue
470 )
471 }
472
473 Ok(())
474 }
475 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
476 tracing::info!(
477 "Processing unsubscribe pool liquidity updates command for {}",
478 cmd.instrument_id
479 );
480
481 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
482 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
483 .map_err(|_| {
484 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
485 })?;
486 core_client
487 .subscription_manager
488 .unsubscribe_burns(dex, pool_address);
489 core_client
490 .subscription_manager
491 .unsubscribe_mints(dex, pool_address);
492 } else {
493 anyhow::bail!(
494 "Invalid venue {}, expected Blockchain DEX format",
495 cmd.instrument_id.venue
496 )
497 }
498
499 Ok(())
500 }
501 }
502 }
503
504 pub async fn await_process_task_close(&mut self) {
509 if let Some(handle) = self.process_task.take()
510 && let Err(e) = handle.await
511 {
512 tracing::error!("Process task join error: {e}");
513 }
514 }
515}
516
517#[async_trait::async_trait]
518impl DataClient for BlockchainDataClient {
519 fn client_id(&self) -> ClientId {
520 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
521 }
522
523 fn venue(&self) -> Option<Venue> {
524 None
527 }
528
529 fn start(&mut self) -> anyhow::Result<()> {
530 tracing::info!(
531 "Starting blockchain data client for '{chain_name}'",
532 chain_name = self.chain.name
533 );
534 Ok(())
535 }
536
537 fn stop(&mut self) -> anyhow::Result<()> {
538 tracing::info!(
539 "Stopping blockchain data client for '{chain_name}'",
540 chain_name = self.chain.name
541 );
542 Ok(())
543 }
544
545 fn reset(&mut self) -> anyhow::Result<()> {
546 tracing::info!(
547 "Resetting blockchain data client for '{chain_name}'",
548 chain_name = self.chain.name
549 );
550 Ok(())
551 }
552
553 fn dispose(&mut self) -> anyhow::Result<()> {
554 tracing::info!(
555 "Disposing blockchain data client for '{chain_name}'",
556 chain_name = self.chain.name
557 );
558 Ok(())
559 }
560
561 async fn connect(&mut self) -> anyhow::Result<()> {
562 tracing::info!(
563 "Connecting blockchain data client for '{}'",
564 self.chain.name
565 );
566
567 if self.process_task.is_none() {
568 self.spawn_process_task();
569 }
570
571 Ok(())
572 }
573
574 async fn disconnect(&mut self) -> anyhow::Result<()> {
575 tracing::info!(
576 "Disconnecting blockchain data client for '{}'",
577 self.chain.name
578 );
579
580 if let Some(shutdown_tx) = self.shutdown_tx.take() {
581 let _ = shutdown_tx.send(());
582 }
583 self.await_process_task_close().await;
584
585 Ok(())
586 }
587
588 fn is_connected(&self) -> bool {
589 true
592 }
593
594 fn is_disconnected(&self) -> bool {
595 !self.is_connected()
596 }
597
598 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
599 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
600 self.command_tx.send(command)?;
601 Ok(())
602 }
603
604 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
605 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
606 self.command_tx.send(command)?;
607 Ok(())
608 }
609
610 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
611 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
612 self.command_tx.send(command)?;
613 Ok(())
614 }
615
616 fn subscribe_pool_liquidity_updates(
617 &mut self,
618 cmd: &SubscribePoolLiquidityUpdates,
619 ) -> anyhow::Result<()> {
620 let command =
621 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
622 self.command_tx.send(command)?;
623 Ok(())
624 }
625
626 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
627 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
628 self.command_tx.send(command)?;
629 Ok(())
630 }
631
632 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
633 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
634 self.command_tx.send(command)?;
635 Ok(())
636 }
637
638 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
639 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
640 self.command_tx.send(command)?;
641 Ok(())
642 }
643
644 fn unsubscribe_pool_liquidity_updates(
645 &mut self,
646 cmd: &UnsubscribePoolLiquidityUpdates,
647 ) -> anyhow::Result<()> {
648 let command =
649 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
650 self.command_tx.send(command)?;
651 Ok(())
652 }
653}