1use nautilus_common::{
17 defi::RequestPoolSnapshot,
18 live::runtime::get_runtime,
19 messages::{
20 DataEvent,
21 defi::{
22 DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
23 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
24 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
25 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
26 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
27 },
28 },
29};
30use nautilus_data::client::DataClient;
31use nautilus_model::{
32 defi::{DefiData, PoolIdentifier, SharedChain, validation::validate_address},
33 identifiers::{ClientId, Venue},
34};
35use ustr::Ustr;
36
37use crate::{
38 config::BlockchainDataClientConfig,
39 data::core::BlockchainDataClientCore,
40 exchanges::get_dex_extended,
41 rpc::{BlockchainRpcClient, types::BlockchainMessage},
42};
43
44#[derive(Debug)]
54pub struct BlockchainDataClient {
55 pub chain: SharedChain,
57 pub config: BlockchainDataClientConfig,
59 pub core_client: Option<BlockchainDataClientCore>,
62 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
64 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
66 command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
68 command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
70 process_task: Option<tokio::task::JoinHandle<()>>,
72 cancellation_token: tokio_util::sync::CancellationToken,
74}
75
76impl BlockchainDataClient {
77 #[must_use]
79 pub fn new(config: BlockchainDataClientConfig) -> Self {
80 let chain = config.chain.clone();
81 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
82 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
83 Self {
84 chain,
85 core_client: None,
86 config,
87 hypersync_rx: Some(hypersync_rx),
88 hypersync_tx: Some(hypersync_tx),
89 command_tx,
90 command_rx: Some(command_rx),
91 process_task: None,
92 cancellation_token: tokio_util::sync::CancellationToken::new(),
93 }
94 }
95
96 fn spawn_process_task(&mut self) {
104 let command_rx = if let Some(r) = self.command_rx.take() {
105 r
106 } else {
107 tracing::error!("Command receiver already taken, not spawning handler");
108 return;
109 };
110
111 let cancellation_token = self.cancellation_token.clone();
112
113 let data_tx = nautilus_common::live::runner::get_data_event_sender();
114
115 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
116 let hypersync_tx = self.hypersync_tx.take();
117
118 let mut core_client = BlockchainDataClientCore::new(
119 self.config.clone(),
120 hypersync_tx,
121 Some(data_tx),
122 cancellation_token.clone(),
123 );
124
125 let handle = get_runtime().spawn(async move {
126 tracing::debug!("Started task 'process'");
127
128 if let Err(e) = core_client.connect().await {
129 if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
132 tracing::warn!("Blockchain core client connection interrupted: {e}");
133 } else {
134 tracing::error!("Failed to connect blockchain core client: {e}");
135 }
136 return;
137 }
138
139 let mut command_rx = command_rx;
140
141 loop {
142 tokio::select! {
143 () = cancellation_token.cancelled() => {
144 tracing::debug!("Received cancellation signal in Blockchain data client process task");
145 core_client.disconnect().await;
146 break;
147 }
148 command = command_rx.recv() => {
149 if let Some(cmd) = command {
150 match cmd {
151 DefiDataCommand::Subscribe(cmd) => {
152 let chain = cmd.blockchain();
153 if chain != core_client.chain.name {
154 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
155 continue;
156 }
157
158 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
159 tracing::error!("Error processing subscribe command: {e}");
160 }
161 }
162 DefiDataCommand::Unsubscribe(cmd) => {
163 let chain = cmd.blockchain();
164 if chain != core_client.chain.name {
165 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
166 continue;
167 }
168
169 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
170 tracing::error!("Error processing subscribe command: {e}");
171 }
172 }
173 DefiDataCommand::Request(cmd) => {
174 if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
175 tracing::error!("Error processing request command: {e}");
176 }
177 }
178 }
179 } else {
180 tracing::debug!("Command channel closed");
181 break;
182 }
183 }
184 data = hypersync_rx.recv() => {
185 if let Some(msg) = data {
186 let data_event = match msg {
187 BlockchainMessage::Block(block) => {
188 for dex in core_client.cache.get_registered_dexes(){
190 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
191 if !addresses.is_empty() {
192 core_client.hypersync_client.process_block_dex_contract_events(
193 &dex,
194 block.number,
195 addresses,
196 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
197 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
198 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
199 );
200 }
201 }
202
203 Some(DataEvent::DeFi(DefiData::Block(block)))
204 }
205 BlockchainMessage::SwapEvent(swap_event) => {
206 match core_client.get_pool(&swap_event.pool_identifier) {
207 Ok(pool) => {
208 match core_client.process_pool_swap_event(&swap_event, pool){
209 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
210 Err(e) => {
211 tracing::error!("Error processing pool swap event: {e}");
212 None
213 }
214 }
215 }
216 Err(e) => {
217 tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_identifier, e);
218 None
219 }
220 }
221 }
222 BlockchainMessage::BurnEvent(burn_event) => {
223 match core_client.get_pool(&burn_event.pool_identifier) {
224 Ok(pool) => {
225 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
226 match core_client.process_pool_burn_event(
227 &burn_event,
228 pool,
229 dex_extended,
230 ){
231 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
232 Err(e) => {
233 tracing::error!("Error processing pool burn event: {e}");
234 None
235 }
236 }
237 }
238 Err(e) => {
239 tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_identifier, e);
240 None
241 }
242 }
243 }
244 BlockchainMessage::MintEvent(mint_event) => {
245 match core_client.get_pool(&mint_event.pool_identifier) {
246 Ok(pool) => {
247 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
248 match core_client.process_pool_mint_event(
249 &mint_event,
250 pool,
251 dex_extended,
252 ){
253 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
254 Err(e) => {
255 tracing::error!("Error processing pool mint event: {e}");
256 None
257 }
258 }
259 }
260 Err(e) => {
261 tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_identifier, e);
262 None
263 }
264 }
265 }
266 BlockchainMessage::CollectEvent(collect_event) => {
267 match core_client.get_pool(&collect_event.pool_identifier) {
268 Ok(pool) => {
269 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
270 match core_client.process_pool_collect_event(
271 &collect_event,
272 pool,
273 dex_extended,
274 ){
275 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
276 Err(e) => {
277 tracing::error!("Error processing pool collect event: {e}");
278 None
279 }
280 }
281 }
282 Err(e) => {
283 tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_identifier, e);
284 None
285 }
286 }
287 }
288 BlockchainMessage::FlashEvent(flash_event) => {
289 match core_client.get_pool(&flash_event.pool_identifier) {
290 Ok(pool) => {
291 match core_client.process_pool_flash_event(&flash_event,pool){
292 Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
293 Err(e) => {
294 tracing::error!("Error processing pool flash event: {e}");
295 None
296 }
297 }
298 }
299 Err(e) => {
300 tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_identifier, e);
301 None
302 }
303 }
304 }
305 };
306
307 if let Some(event) = data_event {
308 core_client.send_data(event);
309 }
310 } else {
311 tracing::debug!("HyperSync data channel closed");
312 break;
313 }
314 }
315 msg = async {
316 match core_client.rpc_client {
317 Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
318 None => std::future::pending().await, }
320 } => {
321 match msg {
323 Ok(BlockchainMessage::Block(block)) => {
324 let data = DataEvent::DeFi(DefiData::Block(block));
325 core_client.send_data(data);
326 },
327 Ok(BlockchainMessage::SwapEvent(_)) => {
328 tracing::warn!("RPC swap events are not yet supported");
329 }
330 Ok(BlockchainMessage::MintEvent(_)) => {
331 tracing::warn!("RPC mint events are not yet supported");
332 }
333 Ok(BlockchainMessage::BurnEvent(_)) => {
334 tracing::warn!("RPC burn events are not yet supported");
335 }
336 Ok(BlockchainMessage::CollectEvent(_)) => {
337 tracing::warn!("RPC collect events are not yet supported");
338 }
339 Ok(BlockchainMessage::FlashEvent(_)) => {
340 tracing::warn!("RPC flash events are not yet supported");
341 }
342 Err(e) => {
343 tracing::error!("Error processing RPC message: {e}");
344 }
345 }
346 }
347 }
348 }
349
350 tracing::debug!("Stopped task 'process'");
351 });
352
353 self.process_task = Some(handle);
354 }
355
356 async fn handle_subscribe_command(
358 command: DefiSubscribeCommand,
359 core_client: &mut BlockchainDataClientCore,
360 ) -> anyhow::Result<()> {
361 match command {
362 DefiSubscribeCommand::Blocks(_cmd) => {
363 tracing::info!("Processing subscribe blocks command");
364
365 if let Some(ref mut rpc) = core_client.rpc_client {
367 if let Err(e) = rpc.subscribe_blocks().await {
368 tracing::warn!(
369 "RPC blocks subscription failed: {e}, falling back to HyperSync"
370 );
371 core_client.hypersync_client.subscribe_blocks();
372 tokio::task::yield_now().await;
373 } else {
374 tracing::info!("Successfully subscribed to blocks via RPC");
375 }
376 } else {
377 tracing::info!("Subscribing to blocks via HyperSync");
378 core_client.hypersync_client.subscribe_blocks();
379 tokio::task::yield_now().await;
380 }
381
382 Ok(())
383 }
384 DefiSubscribeCommand::Pool(cmd) => {
385 tracing::info!(
386 "Processing subscribe pool command for {}",
387 cmd.instrument_id
388 );
389
390 if let Some(ref mut _rpc) = core_client.rpc_client {
391 tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
392 }
393
394 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
395 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
396 .map_err(|e| {
397 anyhow::anyhow!(
398 "Invalid pool address '{}' failed with error: {:?}",
399 cmd.instrument_id,
400 e
401 )
402 })?;
403
404 core_client
406 .subscription_manager
407 .subscribe_swaps(dex, pool_address);
408 core_client
409 .subscription_manager
410 .subscribe_burns(dex, pool_address);
411 core_client
412 .subscription_manager
413 .subscribe_mints(dex, pool_address);
414 core_client
415 .subscription_manager
416 .subscribe_collects(dex, pool_address);
417 core_client
418 .subscription_manager
419 .subscribe_flashes(dex, pool_address);
420
421 tracing::info!(
422 "Subscribed to all pool events for {} at address {}",
423 cmd.instrument_id,
424 pool_address
425 );
426 } else {
427 anyhow::bail!(
428 "Invalid venue {}, expected Blockchain DEX format",
429 cmd.instrument_id.venue
430 )
431 }
432
433 Ok(())
434 }
435 DefiSubscribeCommand::PoolSwaps(cmd) => {
436 tracing::info!(
437 "Processing subscribe pool swaps command for {}",
438 cmd.instrument_id
439 );
440
441 if let Some(ref mut _rpc) = core_client.rpc_client {
442 tracing::warn!(
443 "RPC pool swaps subscription not yet implemented, using HyperSync"
444 );
445 }
446
447 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
448 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
449 .map_err(|e| {
450 anyhow::anyhow!(
451 "Invalid pool swap address '{}' failed with error: {:?}",
452 cmd.instrument_id,
453 e
454 )
455 })?;
456 core_client
457 .subscription_manager
458 .subscribe_swaps(dex, pool_address);
459 } else {
460 anyhow::bail!(
461 "Invalid venue {}, expected Blockchain DEX format",
462 cmd.instrument_id.venue
463 )
464 }
465
466 Ok(())
467 }
468 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
469 tracing::info!(
470 "Processing subscribe pool liquidity updates command for address: {}",
471 cmd.instrument_id
472 );
473
474 if let Some(ref mut _rpc) = core_client.rpc_client {
475 tracing::warn!(
476 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
477 );
478 }
479
480 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
481 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
482 .map_err(|_| {
483 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
484 })?;
485 core_client
486 .subscription_manager
487 .subscribe_burns(dex, pool_address);
488 core_client
489 .subscription_manager
490 .subscribe_mints(dex, pool_address);
491 } else {
492 anyhow::bail!(
493 "Invalid venue {}, expected Blockchain DEX format",
494 cmd.instrument_id.venue
495 )
496 }
497
498 Ok(())
499 }
500 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
501 tracing::info!(
502 "Processing subscribe pool fee collects command for address: {}",
503 cmd.instrument_id
504 );
505
506 if let Some(ref mut _rpc) = core_client.rpc_client {
507 tracing::warn!(
508 "RPC pool fee collects subscription not yet implemented, using HyperSync"
509 );
510 }
511
512 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
513 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
514 .map_err(|_| {
515 anyhow::anyhow!(
516 "Invalid pool fee collect address: {}",
517 cmd.instrument_id
518 )
519 })?;
520 core_client
521 .subscription_manager
522 .subscribe_collects(dex, pool_address);
523 } else {
524 anyhow::bail!(
525 "Invalid venue {}, expected Blockchain DEX format",
526 cmd.instrument_id.venue
527 )
528 }
529
530 Ok(())
531 }
532 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
533 tracing::info!(
534 "Processing subscribe pool flash command for address: {}",
535 cmd.instrument_id
536 );
537
538 if let Some(ref mut _rpc) = core_client.rpc_client {
539 tracing::warn!(
540 "RPC pool fee collects subscription not yet implemented, using HyperSync"
541 );
542 }
543
544 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
545 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
546 .map_err(|_| {
547 anyhow::anyhow!(
548 "Invalid pool flash subscribe address: {}",
549 cmd.instrument_id
550 )
551 })?;
552 core_client
553 .subscription_manager
554 .subscribe_flashes(dex, pool_address);
555 } else {
556 anyhow::bail!(
557 "Invalid venue {}, expected Blockchain DEX format",
558 cmd.instrument_id.venue
559 )
560 }
561
562 Ok(())
563 }
564 }
565 }
566
567 async fn handle_unsubscribe_command(
569 command: DefiUnsubscribeCommand,
570 core_client: &mut BlockchainDataClientCore,
571 ) -> anyhow::Result<()> {
572 match command {
573 DefiUnsubscribeCommand::Blocks(_cmd) => {
574 tracing::info!("Processing unsubscribe blocks command");
575
576 if core_client.rpc_client.is_some() {
578 tracing::warn!("RPC blocks unsubscription not yet implemented");
579 }
580
581 core_client.hypersync_client.unsubscribe_blocks().await;
583 tracing::info!("Unsubscribed from blocks via HyperSync");
584
585 Ok(())
586 }
587 DefiUnsubscribeCommand::Pool(cmd) => {
588 tracing::info!(
589 "Processing unsubscribe pool command for {}",
590 cmd.instrument_id
591 );
592
593 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
594 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
595 .map_err(|_| {
596 anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
597 })?;
598
599 core_client
601 .subscription_manager
602 .unsubscribe_swaps(dex, pool_address);
603 core_client
604 .subscription_manager
605 .unsubscribe_burns(dex, pool_address);
606 core_client
607 .subscription_manager
608 .unsubscribe_mints(dex, pool_address);
609 core_client
610 .subscription_manager
611 .unsubscribe_collects(dex, pool_address);
612 core_client
613 .subscription_manager
614 .unsubscribe_flashes(dex, pool_address);
615
616 tracing::info!(
617 "Unsubscribed from all pool events for {} at address {}",
618 cmd.instrument_id,
619 pool_address
620 );
621 } else {
622 anyhow::bail!(
623 "Invalid venue {}, expected Blockchain DEX format",
624 cmd.instrument_id.venue
625 )
626 }
627
628 Ok(())
629 }
630 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
631 tracing::info!("Processing unsubscribe pool swaps command");
632
633 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
634 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
635 .map_err(|_| {
636 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
637 })?;
638 core_client
639 .subscription_manager
640 .unsubscribe_swaps(dex, pool_address);
641 } else {
642 anyhow::bail!(
643 "Invalid venue {}, expected Blockchain DEX format",
644 cmd.instrument_id.venue
645 )
646 }
647
648 Ok(())
649 }
650 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
651 tracing::info!(
652 "Processing unsubscribe pool liquidity updates command for {}",
653 cmd.instrument_id
654 );
655
656 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
657 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
658 .map_err(|_| {
659 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
660 })?;
661 core_client
662 .subscription_manager
663 .unsubscribe_burns(dex, pool_address);
664 core_client
665 .subscription_manager
666 .unsubscribe_mints(dex, pool_address);
667 } else {
668 anyhow::bail!(
669 "Invalid venue {}, expected Blockchain DEX format",
670 cmd.instrument_id.venue
671 )
672 }
673
674 Ok(())
675 }
676 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
677 tracing::info!(
678 "Processing unsubscribe pool fee collects command for {}",
679 cmd.instrument_id
680 );
681
682 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
683 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
684 .map_err(|_| {
685 anyhow::anyhow!(
686 "Invalid pool fee collect address: {}",
687 cmd.instrument_id
688 )
689 })?;
690 core_client
691 .subscription_manager
692 .unsubscribe_collects(dex, pool_address);
693 } else {
694 anyhow::bail!(
695 "Invalid venue {}, expected Blockchain DEX format",
696 cmd.instrument_id.venue
697 )
698 }
699
700 Ok(())
701 }
702 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
703 tracing::info!(
704 "Processing unsubscribe pool flash command for {}",
705 cmd.instrument_id
706 );
707
708 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
709 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
710 .map_err(|_| {
711 anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
712 })?;
713 core_client
714 .subscription_manager
715 .unsubscribe_flashes(dex, pool_address);
716 } else {
717 anyhow::bail!(
718 "Invalid venue {}, expected Blockchain DEX format",
719 cmd.instrument_id.venue
720 )
721 }
722
723 Ok(())
724 }
725 }
726 }
727
728 async fn handle_request_command(
730 command: DefiRequestCommand,
731 core_client: &mut BlockchainDataClientCore,
732 ) -> anyhow::Result<()> {
733 match command {
734 DefiRequestCommand::PoolSnapshot(cmd) => {
735 tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
736
737 let pool_address =
738 validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
739 anyhow::anyhow!(
740 "Invalid pool address '{}' failed with error: {:?}",
741 cmd.instrument_id,
742 e
743 )
744 })?;
745
746 let pool_identifier =
747 PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
748 match core_client.get_pool(&pool_identifier) {
749 Ok(pool) => {
750 let pool = pool.clone();
751 tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
752
753 let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
755 core_client.send_data(pool_data);
756
757 match core_client.bootstrap_latest_pool_profiler(&pool).await {
758 Ok((profiler, already_valid)) => {
759 let snapshot = profiler.extract_snapshot();
760
761 tracing::info!(
762 "Saving pool snapshot with {} positions and {} ticks to database...",
763 snapshot.positions.len(),
764 snapshot.ticks.len()
765 );
766 core_client
767 .cache
768 .add_pool_snapshot(
769 &pool.dex.name,
770 &pool.pool_identifier,
771 &snapshot,
772 )
773 .await?;
774
775 if core_client
777 .check_snapshot_validity(&profiler, already_valid)
778 .await?
779 {
780 let snapshot_data =
781 DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
782 core_client.send_data(snapshot_data);
783 }
784 }
785 Err(e) => tracing::error!(
786 "Failed to bootstrap pool profiler for {} and extract snapshot with error {}",
787 cmd.instrument_id,
788 e.to_string()
789 ),
790 }
791 }
792 Err(e) => {
793 tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
794 }
795 }
796
797 Ok(())
798 }
799 }
800 }
801
802 pub async fn await_process_task_close(&mut self) {
807 if let Some(handle) = self.process_task.take()
808 && let Err(e) = handle.await
809 {
810 tracing::error!("Process task join error: {e}");
811 }
812 }
813}
814
815#[async_trait::async_trait(?Send)]
816impl DataClient for BlockchainDataClient {
817 fn client_id(&self) -> ClientId {
818 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
819 }
820
821 fn venue(&self) -> Option<Venue> {
822 None
825 }
826
827 fn start(&mut self) -> anyhow::Result<()> {
828 tracing::info!(
829 chain_name = %self.chain.name,
830 dex_ids = ?self.config.dex_ids,
831 use_hypersync_for_live_data = self.config.use_hypersync_for_live_data,
832 http_proxy_url = ?self.config.http_proxy_url,
833 ws_proxy_url = ?self.config.ws_proxy_url,
834 "Starting blockchain data client"
835 );
836 Ok(())
837 }
838
839 fn stop(&mut self) -> anyhow::Result<()> {
840 tracing::info!(
841 "Stopping blockchain data client for '{chain_name}'",
842 chain_name = self.chain.name
843 );
844 self.cancellation_token.cancel();
845
846 self.cancellation_token = tokio_util::sync::CancellationToken::new();
848 Ok(())
849 }
850
851 fn reset(&mut self) -> anyhow::Result<()> {
852 tracing::info!(
853 "Resetting blockchain data client for '{chain_name}'",
854 chain_name = self.chain.name
855 );
856 self.cancellation_token = tokio_util::sync::CancellationToken::new();
857 Ok(())
858 }
859
860 fn dispose(&mut self) -> anyhow::Result<()> {
861 tracing::info!(
862 "Disposing blockchain data client for '{chain_name}'",
863 chain_name = self.chain.name
864 );
865 Ok(())
866 }
867
868 async fn connect(&mut self) -> anyhow::Result<()> {
869 tracing::info!(
870 "Connecting blockchain data client for '{}'",
871 self.chain.name
872 );
873
874 if self.process_task.is_none() {
875 self.spawn_process_task();
876 }
877
878 Ok(())
879 }
880
881 async fn disconnect(&mut self) -> anyhow::Result<()> {
882 tracing::info!(
883 "Disconnecting blockchain data client for '{}'",
884 self.chain.name
885 );
886
887 self.cancellation_token.cancel();
888 self.await_process_task_close().await;
889
890 self.cancellation_token = tokio_util::sync::CancellationToken::new();
892 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
893 self.hypersync_tx = Some(hypersync_tx);
894 self.hypersync_rx = Some(hypersync_rx);
895 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
896 self.command_tx = command_tx;
897 self.command_rx = Some(command_rx);
898
899 Ok(())
900 }
901
902 fn is_connected(&self) -> bool {
903 true
906 }
907
908 fn is_disconnected(&self) -> bool {
909 !self.is_connected()
910 }
911
912 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
913 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
914 self.command_tx.send(command)?;
915 Ok(())
916 }
917
918 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
919 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
920 self.command_tx.send(command)?;
921 Ok(())
922 }
923
924 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
925 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
926 self.command_tx.send(command)?;
927 Ok(())
928 }
929
930 fn subscribe_pool_liquidity_updates(
931 &mut self,
932 cmd: &SubscribePoolLiquidityUpdates,
933 ) -> anyhow::Result<()> {
934 let command =
935 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
936 self.command_tx.send(command)?;
937 Ok(())
938 }
939
940 fn subscribe_pool_fee_collects(
941 &mut self,
942 cmd: &SubscribePoolFeeCollects,
943 ) -> anyhow::Result<()> {
944 let command =
945 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
946 self.command_tx.send(command)?;
947 Ok(())
948 }
949
950 fn subscribe_pool_flash_events(
951 &mut self,
952 cmd: &SubscribePoolFlashEvents,
953 ) -> anyhow::Result<()> {
954 let command =
955 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
956 self.command_tx.send(command)?;
957 Ok(())
958 }
959
960 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
961 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
962 self.command_tx.send(command)?;
963 Ok(())
964 }
965
966 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
967 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
968 self.command_tx.send(command)?;
969 Ok(())
970 }
971
972 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
973 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
974 self.command_tx.send(command)?;
975 Ok(())
976 }
977
978 fn unsubscribe_pool_liquidity_updates(
979 &mut self,
980 cmd: &UnsubscribePoolLiquidityUpdates,
981 ) -> anyhow::Result<()> {
982 let command =
983 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
984 self.command_tx.send(command)?;
985 Ok(())
986 }
987
988 fn unsubscribe_pool_fee_collects(
989 &mut self,
990 cmd: &UnsubscribePoolFeeCollects,
991 ) -> anyhow::Result<()> {
992 let command =
993 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
994 self.command_tx.send(command)?;
995 Ok(())
996 }
997
998 fn unsubscribe_pool_flash_events(
999 &mut self,
1000 cmd: &UnsubscribePoolFlashEvents,
1001 ) -> anyhow::Result<()> {
1002 let command =
1003 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
1004 self.command_tx.send(command)?;
1005 Ok(())
1006 }
1007
1008 fn request_pool_snapshot(&self, cmd: &RequestPoolSnapshot) -> anyhow::Result<()> {
1009 let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1010 self.command_tx.send(command)?;
1011 Ok(())
1012 }
1013}