1use nautilus_common::{
17 messages::{
18 DataEvent,
19 defi::{
20 DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
21 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
22 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
23 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
24 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
25 },
26 },
27 runtime::get_runtime,
28};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31 defi::{DefiData, SharedChain, validation::validate_address},
32 identifiers::{ClientId, Venue},
33};
34
35use crate::{
36 config::BlockchainDataClientConfig,
37 data::core::BlockchainDataClientCore,
38 exchanges::get_dex_extended,
39 rpc::{BlockchainRpcClient, types::BlockchainMessage},
40};
41
42#[derive(Debug)]
52pub struct BlockchainDataClient {
53 pub chain: SharedChain,
55 pub config: BlockchainDataClientConfig,
57 pub core_client: Option<BlockchainDataClientCore>,
60 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
62 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
64 command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
66 command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
68 process_task: Option<tokio::task::JoinHandle<()>>,
70 cancellation_token: tokio_util::sync::CancellationToken,
72}
73
74impl BlockchainDataClient {
75 #[must_use]
77 pub fn new(config: BlockchainDataClientConfig) -> Self {
78 let chain = config.chain.clone();
79 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
80 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
81 Self {
82 chain,
83 core_client: None,
84 config,
85 hypersync_rx: Some(hypersync_rx),
86 hypersync_tx: Some(hypersync_tx),
87 command_tx,
88 command_rx: Some(command_rx),
89 process_task: None,
90 cancellation_token: tokio_util::sync::CancellationToken::new(),
91 }
92 }
93
94 fn spawn_process_task(&mut self) {
102 let command_rx = if let Some(r) = self.command_rx.take() {
103 r
104 } else {
105 tracing::error!("Command receiver already taken, not spawning handler");
106 return;
107 };
108
109 let cancellation_token = self.cancellation_token.clone();
110
111 let data_tx = nautilus_common::runner::get_data_event_sender();
112
113 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
114 let hypersync_tx = self.hypersync_tx.take();
115
116 let mut core_client = BlockchainDataClientCore::new(
117 self.config.clone(),
118 hypersync_tx,
119 Some(data_tx),
120 cancellation_token.clone(),
121 );
122
123 let handle = get_runtime().spawn(async move {
124 tracing::debug!("Started task 'process'");
125
126 if let Err(e) = core_client.connect().await {
127 if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
130 tracing::warn!("Blockchain core client connection interrupted: {e}");
131 } else {
132 tracing::error!("Failed to connect blockchain core client: {e}");
133 }
134 return;
135 }
136
137 let mut command_rx = command_rx;
138
139 loop {
140 tokio::select! {
141 () = cancellation_token.cancelled() => {
142 tracing::debug!("Received cancellation signal in Blockchain data client process task");
143 core_client.disconnect().await;
144 break;
145 }
146 command = command_rx.recv() => {
147 if let Some(cmd) = command {
148 match cmd {
149 DefiDataCommand::Subscribe(cmd) => {
150 let chain = cmd.blockchain();
151 if chain != core_client.chain.name {
152 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
153 continue;
154 }
155
156 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
157 tracing::error!("Error processing subscribe command: {e}");
158 }
159 }
160 DefiDataCommand::Unsubscribe(cmd) => {
161 let chain = cmd.blockchain();
162 if chain != core_client.chain.name {
163 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
164 continue;
165 }
166
167 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
168 tracing::error!("Error processing subscribe command: {e}");
169 }
170 }
171 DefiDataCommand::Request(cmd) => {
172 if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
173 tracing::error!("Error processing request command: {e}");
174 }
175 }
176 }
177 } else {
178 tracing::debug!("Command channel closed");
179 break;
180 }
181 }
182 data = hypersync_rx.recv() => {
183 if let Some(msg) = data {
184 let data_event = match msg {
185 BlockchainMessage::Block(block) => {
186 for dex in core_client.cache.get_registered_dexes(){
188 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
189 if !addresses.is_empty() {
190 core_client.hypersync_client.process_block_dex_contract_events(
191 &dex,
192 block.number,
193 addresses,
194 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
195 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
196 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
197 );
198 }
199 }
200
201 Some(DataEvent::DeFi(DefiData::Block(block)))
202 }
203 BlockchainMessage::SwapEvent(swap_event) => {
204 match core_client.get_pool(&swap_event.pool_address) {
205 Ok(pool) => {
206 match core_client.process_pool_swap_event(&swap_event, pool){
207 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
208 Err(e) => {
209 tracing::error!("Error processing pool swap event: {e}");
210 None
211 }
212 }
213 }
214 Err(e) => {
215 tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
216 None
217 }
218 }
219 }
220 BlockchainMessage::BurnEvent(burn_event) => {
221 match core_client.get_pool(&burn_event.pool_address) {
222 Ok(pool) => {
223 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
224 match core_client.process_pool_burn_event(
225 &burn_event,
226 pool,
227 dex_extended,
228 ){
229 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
230 Err(e) => {
231 tracing::error!("Error processing pool burn event: {e}");
232 None
233 }
234 }
235 }
236 Err(e) => {
237 tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
238 None
239 }
240 }
241 }
242 BlockchainMessage::MintEvent(mint_event) => {
243 match core_client.get_pool(&mint_event.pool_address) {
244 Ok(pool) => {
245 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
246 match core_client.process_pool_mint_event(
247 &mint_event,
248 pool,
249 dex_extended,
250 ){
251 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
252 Err(e) => {
253 tracing::error!("Error processing pool mint event: {e}");
254 None
255 }
256 }
257 }
258 Err(e) => {
259 tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
260 None
261 }
262 }
263 }
264 BlockchainMessage::CollectEvent(collect_event) => {
265 match core_client.get_pool(&collect_event.pool_address) {
266 Ok(pool) => {
267 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
268 match core_client.process_pool_collect_event(
269 &collect_event,
270 pool,
271 dex_extended,
272 ){
273 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
274 Err(e) => {
275 tracing::error!("Error processing pool collect event: {e}");
276 None
277 }
278 }
279 }
280 Err(e) => {
281 tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
282 None
283 }
284 }
285 }
286 BlockchainMessage::FlashEvent(flash_event) => {
287 match core_client.get_pool(&flash_event.pool_address) {
288 Ok(pool) => {
289 match core_client.process_pool_flash_event(&flash_event,pool){
290 Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
291 Err(e) => {
292 tracing::error!("Error processing pool flash event: {e}");
293 None
294 }
295 }
296 }
297 Err(e) => {
298 tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_address, e);
299 None
300 }
301 }
302 }
303 };
304
305 if let Some(event) = data_event {
306 core_client.send_data(event);
307 }
308 } else {
309 tracing::debug!("HyperSync data channel closed");
310 break;
311 }
312 }
313 msg = async {
314 match core_client.rpc_client {
315 Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
316 None => std::future::pending().await, }
318 } => {
319 match msg {
321 Ok(BlockchainMessage::Block(block)) => {
322 let data = DataEvent::DeFi(DefiData::Block(block));
323 core_client.send_data(data);
324 },
325 Ok(BlockchainMessage::SwapEvent(_)) => {
326 tracing::warn!("RPC swap events are not yet supported");
327 }
328 Ok(BlockchainMessage::MintEvent(_)) => {
329 tracing::warn!("RPC mint events are not yet supported");
330 }
331 Ok(BlockchainMessage::BurnEvent(_)) => {
332 tracing::warn!("RPC burn events are not yet supported");
333 }
334 Ok(BlockchainMessage::CollectEvent(_)) => {
335 tracing::warn!("RPC collect events are not yet supported");
336 }
337 Ok(BlockchainMessage::FlashEvent(_)) => {
338 tracing::warn!("RPC flash events are not yet supported");
339 }
340 Err(e) => {
341 tracing::error!("Error processing RPC message: {e}");
342 }
343 }
344 }
345 }
346 }
347
348 tracing::debug!("Stopped task 'process'");
349 });
350
351 self.process_task = Some(handle);
352 }
353
354 async fn handle_subscribe_command(
356 command: DefiSubscribeCommand,
357 core_client: &mut BlockchainDataClientCore,
358 ) -> anyhow::Result<()> {
359 match command {
360 DefiSubscribeCommand::Blocks(_cmd) => {
361 tracing::info!("Processing subscribe blocks command");
362
363 if let Some(ref mut rpc) = core_client.rpc_client {
365 if let Err(e) = rpc.subscribe_blocks().await {
366 tracing::warn!(
367 "RPC blocks subscription failed: {e}, falling back to HyperSync"
368 );
369 core_client.hypersync_client.subscribe_blocks();
370 tokio::task::yield_now().await;
371 } else {
372 tracing::info!("Successfully subscribed to blocks via RPC");
373 }
374 } else {
375 tracing::info!("Subscribing to blocks via HyperSync");
376 core_client.hypersync_client.subscribe_blocks();
377 tokio::task::yield_now().await;
378 }
379
380 Ok(())
381 }
382 DefiSubscribeCommand::Pool(cmd) => {
383 tracing::info!(
384 "Processing subscribe pool command for {}",
385 cmd.instrument_id
386 );
387
388 if let Some(ref mut _rpc) = core_client.rpc_client {
389 tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
390 }
391
392 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
393 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
394 .map_err(|e| {
395 anyhow::anyhow!(
396 "Invalid pool address '{}' failed with error: {:?}",
397 cmd.instrument_id,
398 e
399 )
400 })?;
401
402 core_client
404 .subscription_manager
405 .subscribe_swaps(dex, pool_address);
406 core_client
407 .subscription_manager
408 .subscribe_burns(dex, pool_address);
409 core_client
410 .subscription_manager
411 .subscribe_mints(dex, pool_address);
412 core_client
413 .subscription_manager
414 .subscribe_collects(dex, pool_address);
415 core_client
416 .subscription_manager
417 .subscribe_flashes(dex, pool_address);
418
419 tracing::info!(
420 "Subscribed to all pool events for {} at address {}",
421 cmd.instrument_id,
422 pool_address
423 );
424 } else {
425 anyhow::bail!(
426 "Invalid venue {}, expected Blockchain DEX format",
427 cmd.instrument_id.venue
428 )
429 }
430
431 Ok(())
432 }
433 DefiSubscribeCommand::PoolSwaps(cmd) => {
434 tracing::info!(
435 "Processing subscribe pool swaps command for {}",
436 cmd.instrument_id
437 );
438
439 if let Some(ref mut _rpc) = core_client.rpc_client {
440 tracing::warn!(
441 "RPC pool swaps subscription not yet implemented, using HyperSync"
442 );
443 }
444
445 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
446 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
447 .map_err(|e| {
448 anyhow::anyhow!(
449 "Invalid pool swap address '{}' failed with error: {:?}",
450 cmd.instrument_id,
451 e
452 )
453 })?;
454 core_client
455 .subscription_manager
456 .subscribe_swaps(dex, pool_address);
457 } else {
458 anyhow::bail!(
459 "Invalid venue {}, expected Blockchain DEX format",
460 cmd.instrument_id.venue
461 )
462 }
463
464 Ok(())
465 }
466 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
467 tracing::info!(
468 "Processing subscribe pool liquidity updates command for address: {}",
469 cmd.instrument_id
470 );
471
472 if let Some(ref mut _rpc) = core_client.rpc_client {
473 tracing::warn!(
474 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
475 );
476 }
477
478 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
479 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
480 .map_err(|_| {
481 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
482 })?;
483 core_client
484 .subscription_manager
485 .subscribe_burns(dex, pool_address);
486 core_client
487 .subscription_manager
488 .subscribe_mints(dex, pool_address);
489 } else {
490 anyhow::bail!(
491 "Invalid venue {}, expected Blockchain DEX format",
492 cmd.instrument_id.venue
493 )
494 }
495
496 Ok(())
497 }
498 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
499 tracing::info!(
500 "Processing subscribe pool fee collects command for address: {}",
501 cmd.instrument_id
502 );
503
504 if let Some(ref mut _rpc) = core_client.rpc_client {
505 tracing::warn!(
506 "RPC pool fee collects subscription not yet implemented, using HyperSync"
507 );
508 }
509
510 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
511 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
512 .map_err(|_| {
513 anyhow::anyhow!(
514 "Invalid pool fee collect address: {}",
515 cmd.instrument_id
516 )
517 })?;
518 core_client
519 .subscription_manager
520 .subscribe_collects(dex, pool_address);
521 } else {
522 anyhow::bail!(
523 "Invalid venue {}, expected Blockchain DEX format",
524 cmd.instrument_id.venue
525 )
526 }
527
528 Ok(())
529 }
530 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
531 tracing::info!(
532 "Processing subscribe pool flash command for address: {}",
533 cmd.instrument_id
534 );
535
536 if let Some(ref mut _rpc) = core_client.rpc_client {
537 tracing::warn!(
538 "RPC pool fee collects subscription not yet implemented, using HyperSync"
539 );
540 }
541
542 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
543 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
544 .map_err(|_| {
545 anyhow::anyhow!(
546 "Invalid pool flash subscribe address: {}",
547 cmd.instrument_id
548 )
549 })?;
550 core_client
551 .subscription_manager
552 .subscribe_flashes(dex, pool_address);
553 } else {
554 anyhow::bail!(
555 "Invalid venue {}, expected Blockchain DEX format",
556 cmd.instrument_id.venue
557 )
558 }
559
560 Ok(())
561 }
562 }
563 }
564
565 async fn handle_unsubscribe_command(
567 command: DefiUnsubscribeCommand,
568 core_client: &mut BlockchainDataClientCore,
569 ) -> anyhow::Result<()> {
570 match command {
571 DefiUnsubscribeCommand::Blocks(_cmd) => {
572 tracing::info!("Processing unsubscribe blocks command");
573
574 if core_client.rpc_client.is_some() {
576 tracing::warn!("RPC blocks unsubscription not yet implemented");
577 }
578
579 core_client.hypersync_client.unsubscribe_blocks().await;
581 tracing::info!("Unsubscribed from blocks via HyperSync");
582
583 Ok(())
584 }
585 DefiUnsubscribeCommand::Pool(cmd) => {
586 tracing::info!(
587 "Processing unsubscribe pool command for {}",
588 cmd.instrument_id
589 );
590
591 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
592 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
593 .map_err(|_| {
594 anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
595 })?;
596
597 core_client
599 .subscription_manager
600 .unsubscribe_swaps(dex, pool_address);
601 core_client
602 .subscription_manager
603 .unsubscribe_burns(dex, pool_address);
604 core_client
605 .subscription_manager
606 .unsubscribe_mints(dex, pool_address);
607 core_client
608 .subscription_manager
609 .unsubscribe_collects(dex, pool_address);
610 core_client
611 .subscription_manager
612 .unsubscribe_flashes(dex, pool_address);
613
614 tracing::info!(
615 "Unsubscribed from all pool events for {} at address {}",
616 cmd.instrument_id,
617 pool_address
618 );
619 } else {
620 anyhow::bail!(
621 "Invalid venue {}, expected Blockchain DEX format",
622 cmd.instrument_id.venue
623 )
624 }
625
626 Ok(())
627 }
628 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
629 tracing::info!("Processing unsubscribe pool swaps command");
630
631 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
632 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
633 .map_err(|_| {
634 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
635 })?;
636 core_client
637 .subscription_manager
638 .unsubscribe_swaps(dex, pool_address);
639 } else {
640 anyhow::bail!(
641 "Invalid venue {}, expected Blockchain DEX format",
642 cmd.instrument_id.venue
643 )
644 }
645
646 Ok(())
647 }
648 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
649 tracing::info!(
650 "Processing unsubscribe pool liquidity updates command for {}",
651 cmd.instrument_id
652 );
653
654 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
655 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
656 .map_err(|_| {
657 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
658 })?;
659 core_client
660 .subscription_manager
661 .unsubscribe_burns(dex, pool_address);
662 core_client
663 .subscription_manager
664 .unsubscribe_mints(dex, pool_address);
665 } else {
666 anyhow::bail!(
667 "Invalid venue {}, expected Blockchain DEX format",
668 cmd.instrument_id.venue
669 )
670 }
671
672 Ok(())
673 }
674 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
675 tracing::info!(
676 "Processing unsubscribe pool fee collects command for {}",
677 cmd.instrument_id
678 );
679
680 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
681 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
682 .map_err(|_| {
683 anyhow::anyhow!(
684 "Invalid pool fee collect address: {}",
685 cmd.instrument_id
686 )
687 })?;
688 core_client
689 .subscription_manager
690 .unsubscribe_collects(dex, pool_address);
691 } else {
692 anyhow::bail!(
693 "Invalid venue {}, expected Blockchain DEX format",
694 cmd.instrument_id.venue
695 )
696 }
697
698 Ok(())
699 }
700 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
701 tracing::info!(
702 "Processing unsubscribe pool flash command for {}",
703 cmd.instrument_id
704 );
705
706 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
707 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
708 .map_err(|_| {
709 anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
710 })?;
711 core_client
712 .subscription_manager
713 .unsubscribe_flashes(dex, pool_address);
714 } else {
715 anyhow::bail!(
716 "Invalid venue {}, expected Blockchain DEX format",
717 cmd.instrument_id.venue
718 )
719 }
720
721 Ok(())
722 }
723 }
724 }
725
726 async fn handle_request_command(
728 command: DefiRequestCommand,
729 core_client: &mut BlockchainDataClientCore,
730 ) -> anyhow::Result<()> {
731 match command {
732 DefiRequestCommand::PoolSnapshot(cmd) => {
733 tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
734
735 let pool_address =
736 validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
737 anyhow::anyhow!(
738 "Invalid pool address '{}' failed with error: {:?}",
739 cmd.instrument_id,
740 e
741 )
742 })?;
743
744 match core_client.get_pool(&pool_address) {
745 Ok(pool) => {
746 let pool = pool.clone();
747 tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
748
749 let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
751 core_client.send_data(pool_data);
752
753 match core_client.bootstrap_latest_pool_profiler(&pool).await {
754 Ok((profiler, already_valid)) => {
755 let snapshot = profiler.extract_snapshot();
756
757 tracing::info!(
758 "Saving pool snapshot with {} positions and {} ticks to database...",
759 snapshot.positions.len(),
760 snapshot.ticks.len()
761 );
762 core_client
763 .cache
764 .add_pool_snapshot(&pool.address, &snapshot)
765 .await?;
766
767 if core_client
769 .check_snapshot_validity(&profiler, already_valid)
770 .await?
771 {
772 let snapshot_data =
773 DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
774 core_client.send_data(snapshot_data);
775 }
776 }
777 Err(e) => tracing::error!(
778 "Failed to bootstrap pool profiler for {} and extract snapshot with error {}",
779 cmd.instrument_id,
780 e.to_string()
781 ),
782 }
783 }
784 Err(e) => {
785 tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
786 }
787 }
788
789 Ok(())
790 }
791 }
792 }
793
794 pub async fn await_process_task_close(&mut self) {
799 if let Some(handle) = self.process_task.take()
800 && let Err(e) = handle.await
801 {
802 tracing::error!("Process task join error: {e}");
803 }
804 }
805}
806
807#[async_trait::async_trait]
808impl DataClient for BlockchainDataClient {
809 fn client_id(&self) -> ClientId {
810 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
811 }
812
813 fn venue(&self) -> Option<Venue> {
814 None
817 }
818
819 fn start(&mut self) -> anyhow::Result<()> {
820 tracing::info!(
821 chain_name = %self.chain.name,
822 dex_ids = ?self.config.dex_ids,
823 use_hypersync_for_live_data = self.config.use_hypersync_for_live_data,
824 http_proxy_url = ?self.config.http_proxy_url,
825 ws_proxy_url = ?self.config.ws_proxy_url,
826 "Starting blockchain data client"
827 );
828 Ok(())
829 }
830
831 fn stop(&mut self) -> anyhow::Result<()> {
832 tracing::info!(
833 "Stopping blockchain data client for '{chain_name}'",
834 chain_name = self.chain.name
835 );
836 self.cancellation_token.cancel();
837
838 self.cancellation_token = tokio_util::sync::CancellationToken::new();
840 Ok(())
841 }
842
843 fn reset(&mut self) -> anyhow::Result<()> {
844 tracing::info!(
845 "Resetting blockchain data client for '{chain_name}'",
846 chain_name = self.chain.name
847 );
848 self.cancellation_token = tokio_util::sync::CancellationToken::new();
849 Ok(())
850 }
851
852 fn dispose(&mut self) -> anyhow::Result<()> {
853 tracing::info!(
854 "Disposing blockchain data client for '{chain_name}'",
855 chain_name = self.chain.name
856 );
857 Ok(())
858 }
859
860 async fn connect(&mut self) -> anyhow::Result<()> {
861 tracing::info!(
862 "Connecting blockchain data client for '{}'",
863 self.chain.name
864 );
865
866 if self.process_task.is_none() {
867 self.spawn_process_task();
868 }
869
870 Ok(())
871 }
872
873 async fn disconnect(&mut self) -> anyhow::Result<()> {
874 tracing::info!(
875 "Disconnecting blockchain data client for '{}'",
876 self.chain.name
877 );
878
879 self.cancellation_token.cancel();
880 self.await_process_task_close().await;
881
882 self.cancellation_token = tokio_util::sync::CancellationToken::new();
884 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
885 self.hypersync_tx = Some(hypersync_tx);
886 self.hypersync_rx = Some(hypersync_rx);
887 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
888 self.command_tx = command_tx;
889 self.command_rx = Some(command_rx);
890
891 Ok(())
892 }
893
894 fn is_connected(&self) -> bool {
895 true
898 }
899
900 fn is_disconnected(&self) -> bool {
901 !self.is_connected()
902 }
903
904 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
905 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
906 self.command_tx.send(command)?;
907 Ok(())
908 }
909
910 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
911 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
912 self.command_tx.send(command)?;
913 Ok(())
914 }
915
916 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
917 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
918 self.command_tx.send(command)?;
919 Ok(())
920 }
921
922 fn subscribe_pool_liquidity_updates(
923 &mut self,
924 cmd: &SubscribePoolLiquidityUpdates,
925 ) -> anyhow::Result<()> {
926 let command =
927 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
928 self.command_tx.send(command)?;
929 Ok(())
930 }
931
932 fn subscribe_pool_fee_collects(
933 &mut self,
934 cmd: &SubscribePoolFeeCollects,
935 ) -> anyhow::Result<()> {
936 let command =
937 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
938 self.command_tx.send(command)?;
939 Ok(())
940 }
941
942 fn subscribe_pool_flash_events(
943 &mut self,
944 cmd: &SubscribePoolFlashEvents,
945 ) -> anyhow::Result<()> {
946 let command =
947 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
948 self.command_tx.send(command)?;
949 Ok(())
950 }
951
952 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
953 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
954 self.command_tx.send(command)?;
955 Ok(())
956 }
957
958 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
959 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
960 self.command_tx.send(command)?;
961 Ok(())
962 }
963
964 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
965 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
966 self.command_tx.send(command)?;
967 Ok(())
968 }
969
970 fn unsubscribe_pool_liquidity_updates(
971 &mut self,
972 cmd: &UnsubscribePoolLiquidityUpdates,
973 ) -> anyhow::Result<()> {
974 let command =
975 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
976 self.command_tx.send(command)?;
977 Ok(())
978 }
979
980 fn unsubscribe_pool_fee_collects(
981 &mut self,
982 cmd: &UnsubscribePoolFeeCollects,
983 ) -> anyhow::Result<()> {
984 let command =
985 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
986 self.command_tx.send(command)?;
987 Ok(())
988 }
989
990 fn unsubscribe_pool_flash_events(
991 &mut self,
992 cmd: &UnsubscribePoolFlashEvents,
993 ) -> anyhow::Result<()> {
994 let command =
995 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
996 self.command_tx.send(command)?;
997 Ok(())
998 }
999
1000 fn request_pool_snapshot(
1001 &self,
1002 cmd: &nautilus_common::messages::defi::RequestPoolSnapshot,
1003 ) -> anyhow::Result<()> {
1004 let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1005 self.command_tx.send(command)?;
1006 Ok(())
1007 }
1008}