1use nautilus_common::{
17 clients::DataClient,
18 defi::RequestPoolSnapshot,
19 live::get_runtime,
20 messages::{
21 DataEvent,
22 defi::{
23 DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
24 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
25 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
26 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
27 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
28 },
29 },
30};
31use nautilus_model::{
32 defi::{DefiData, PoolIdentifier, SharedChain, validation::validate_address},
33 identifiers::{ClientId, Venue},
34};
35use ustr::Ustr;
36
37use crate::{
38 config::BlockchainDataClientConfig,
39 data::core::BlockchainDataClientCore,
40 exchanges::get_dex_extended,
41 rpc::{BlockchainRpcClient, types::BlockchainMessage},
42};
43
44#[derive(Debug)]
54pub struct BlockchainDataClient {
55 pub chain: SharedChain,
57 pub config: BlockchainDataClientConfig,
59 pub core_client: Option<BlockchainDataClientCore>,
62 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
64 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
66 command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
68 command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
70 process_task: Option<tokio::task::JoinHandle<()>>,
72 cancellation_token: tokio_util::sync::CancellationToken,
74}
75
76impl BlockchainDataClient {
77 #[must_use]
79 pub fn new(config: BlockchainDataClientConfig) -> Self {
80 let chain = config.chain.clone();
81 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
82 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
83 Self {
84 chain,
85 core_client: None,
86 config,
87 hypersync_rx: Some(hypersync_rx),
88 hypersync_tx: Some(hypersync_tx),
89 command_tx,
90 command_rx: Some(command_rx),
91 process_task: None,
92 cancellation_token: tokio_util::sync::CancellationToken::new(),
93 }
94 }
95
96 fn spawn_process_task(&mut self) {
104 let command_rx = if let Some(r) = self.command_rx.take() {
105 r
106 } else {
107 log::error!("Command receiver already taken, not spawning handler");
108 return;
109 };
110
111 let cancellation_token = self.cancellation_token.clone();
112
113 let data_tx = nautilus_common::live::runner::get_data_event_sender();
114
115 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
116 let hypersync_tx = self.hypersync_tx.take();
117
118 let mut core_client = BlockchainDataClientCore::new(
119 self.config.clone(),
120 hypersync_tx,
121 Some(data_tx),
122 cancellation_token.clone(),
123 );
124
125 let handle = get_runtime().spawn(async move {
126 log::debug!("Started task 'process'");
127
128 if let Err(e) = core_client.connect().await {
129 if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
132 log::warn!("Blockchain core client connection interrupted: {e}");
133 } else {
134 log::error!("Failed to connect blockchain core client: {e}");
135 }
136 return;
137 }
138
139 let mut command_rx = command_rx;
140
141 loop {
142 tokio::select! {
143 () = cancellation_token.cancelled() => {
144 log::debug!("Received cancellation signal in Blockchain data client process task");
145 core_client.disconnect().await;
146 break;
147 }
148 command = command_rx.recv() => {
149 if let Some(cmd) = command {
150 match cmd {
151 DefiDataCommand::Subscribe(cmd) => {
152 let chain = cmd.blockchain();
153 if chain != core_client.chain.name {
154 log::error!("Incorrect blockchain for subscribe command: {chain}");
155 continue;
156 }
157
158 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
159 log::error!("Error processing subscribe command: {e}");
160 }
161 }
162 DefiDataCommand::Unsubscribe(cmd) => {
163 let chain = cmd.blockchain();
164 if chain != core_client.chain.name {
165 log::error!("Incorrect blockchain for subscribe command: {chain}");
166 continue;
167 }
168
169 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
170 log::error!("Error processing subscribe command: {e}");
171 }
172 }
173 DefiDataCommand::Request(cmd) => {
174 if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
175 log::error!("Error processing request command: {e}");
176 }
177 }
178 }
179 } else {
180 log::debug!("Command channel closed");
181 break;
182 }
183 }
184 data = hypersync_rx.recv() => {
185 if let Some(msg) = data {
186 let data_event = match msg {
187 BlockchainMessage::Block(block) => {
188 for dex in core_client.cache.get_registered_dexes(){
190 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
191 if !addresses.is_empty() {
192 core_client.hypersync_client.process_block_dex_contract_events(
193 &dex,
194 block.number,
195 addresses,
196 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
197 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
198 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
199 );
200 }
201 }
202
203 Some(DataEvent::DeFi(DefiData::Block(block)))
204 }
205 BlockchainMessage::SwapEvent(swap_event) => {
206 match core_client.get_pool(&swap_event.pool_identifier) {
207 Ok(pool) => {
208 match core_client.process_pool_swap_event(&swap_event, pool){
209 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
210 Err(e) => {
211 log::error!("Error processing pool swap event: {e}");
212 None
213 }
214 }
215 }
216 Err(e) => {
217 log::error!("Failed to get pool {} with error {:?}", swap_event.pool_identifier, e);
218 None
219 }
220 }
221 }
222 BlockchainMessage::BurnEvent(burn_event) => {
223 match core_client.get_pool(&burn_event.pool_identifier) {
224 Ok(pool) => {
225 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
226 match core_client.process_pool_burn_event(
227 &burn_event,
228 pool,
229 dex_extended,
230 ){
231 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
232 Err(e) => {
233 log::error!("Error processing pool burn event: {e}");
234 None
235 }
236 }
237 }
238 Err(e) => {
239 log::error!("Failed to get pool {} with error {:?}", burn_event.pool_identifier, e);
240 None
241 }
242 }
243 }
244 BlockchainMessage::MintEvent(mint_event) => {
245 match core_client.get_pool(&mint_event.pool_identifier) {
246 Ok(pool) => {
247 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
248 match core_client.process_pool_mint_event(
249 &mint_event,
250 pool,
251 dex_extended,
252 ){
253 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
254 Err(e) => {
255 log::error!("Error processing pool mint event: {e}");
256 None
257 }
258 }
259 }
260 Err(e) => {
261 log::error!("Failed to get pool {} with error {:?}", mint_event.pool_identifier, e);
262 None
263 }
264 }
265 }
266 BlockchainMessage::CollectEvent(collect_event) => {
267 match core_client.get_pool(&collect_event.pool_identifier) {
268 Ok(pool) => {
269 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
270 match core_client.process_pool_collect_event(
271 &collect_event,
272 pool,
273 dex_extended,
274 ){
275 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
276 Err(e) => {
277 log::error!("Error processing pool collect event: {e}");
278 None
279 }
280 }
281 }
282 Err(e) => {
283 log::error!("Failed to get pool {} with error {:?}", collect_event.pool_identifier, e);
284 None
285 }
286 }
287 }
288 BlockchainMessage::FlashEvent(flash_event) => {
289 match core_client.get_pool(&flash_event.pool_identifier) {
290 Ok(pool) => {
291 match core_client.process_pool_flash_event(&flash_event,pool){
292 Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
293 Err(e) => {
294 log::error!("Error processing pool flash event: {e}");
295 None
296 }
297 }
298 }
299 Err(e) => {
300 log::error!("Failed to get pool {} with error {:?}", flash_event.pool_identifier, e);
301 None
302 }
303 }
304 }
305 };
306
307 if let Some(event) = data_event {
308 core_client.send_data(event);
309 }
310 } else {
311 log::debug!("HyperSync data channel closed");
312 break;
313 }
314 }
315 msg = async {
316 match core_client.rpc_client {
317 Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
318 None => std::future::pending().await, }
320 } => {
321 match msg {
323 Ok(BlockchainMessage::Block(block)) => {
324 let data = DataEvent::DeFi(DefiData::Block(block));
325 core_client.send_data(data);
326 },
327 Ok(BlockchainMessage::SwapEvent(_)) => {
328 log::warn!("RPC swap events are not yet supported");
329 }
330 Ok(BlockchainMessage::MintEvent(_)) => {
331 log::warn!("RPC mint events are not yet supported");
332 }
333 Ok(BlockchainMessage::BurnEvent(_)) => {
334 log::warn!("RPC burn events are not yet supported");
335 }
336 Ok(BlockchainMessage::CollectEvent(_)) => {
337 log::warn!("RPC collect events are not yet supported");
338 }
339 Ok(BlockchainMessage::FlashEvent(_)) => {
340 log::warn!("RPC flash events are not yet supported");
341 }
342 Err(e) => {
343 log::error!("Error processing RPC message: {e}");
344 }
345 }
346 }
347 }
348 }
349
350 log::debug!("Stopped task 'process'");
351 });
352
353 self.process_task = Some(handle);
354 }
355
356 async fn handle_subscribe_command(
358 command: DefiSubscribeCommand,
359 core_client: &mut BlockchainDataClientCore,
360 ) -> anyhow::Result<()> {
361 match command {
362 DefiSubscribeCommand::Blocks(_cmd) => {
363 log::info!("Processing subscribe blocks command");
364
365 if let Some(ref mut rpc) = core_client.rpc_client {
367 if let Err(e) = rpc.subscribe_blocks().await {
368 log::warn!(
369 "RPC blocks subscription failed: {e}, falling back to HyperSync"
370 );
371 core_client.hypersync_client.subscribe_blocks();
372 tokio::task::yield_now().await;
373 } else {
374 log::info!("Successfully subscribed to blocks via RPC");
375 }
376 } else {
377 log::info!("Subscribing to blocks via HyperSync");
378 core_client.hypersync_client.subscribe_blocks();
379 tokio::task::yield_now().await;
380 }
381
382 Ok(())
383 }
384 DefiSubscribeCommand::Pool(cmd) => {
385 log::info!(
386 "Processing subscribe pool command for {}",
387 cmd.instrument_id
388 );
389
390 if let Some(ref mut _rpc) = core_client.rpc_client {
391 log::warn!("RPC pool subscription not yet implemented, using HyperSync");
392 }
393
394 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
395 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
396 .map_err(|e| {
397 anyhow::anyhow!(
398 "Invalid pool address '{}' failed with error: {:?}",
399 cmd.instrument_id,
400 e
401 )
402 })?;
403
404 core_client
406 .subscription_manager
407 .subscribe_swaps(dex, pool_address);
408 core_client
409 .subscription_manager
410 .subscribe_burns(dex, pool_address);
411 core_client
412 .subscription_manager
413 .subscribe_mints(dex, pool_address);
414 core_client
415 .subscription_manager
416 .subscribe_collects(dex, pool_address);
417 core_client
418 .subscription_manager
419 .subscribe_flashes(dex, pool_address);
420
421 log::info!(
422 "Subscribed to all pool events for {} at address {}",
423 cmd.instrument_id,
424 pool_address
425 );
426 } else {
427 anyhow::bail!(
428 "Invalid venue {}, expected Blockchain DEX format",
429 cmd.instrument_id.venue
430 )
431 }
432
433 Ok(())
434 }
435 DefiSubscribeCommand::PoolSwaps(cmd) => {
436 log::info!(
437 "Processing subscribe pool swaps command for {}",
438 cmd.instrument_id
439 );
440
441 if let Some(ref mut _rpc) = core_client.rpc_client {
442 log::warn!("RPC pool swaps subscription not yet implemented, using HyperSync");
443 }
444
445 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
446 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
447 .map_err(|e| {
448 anyhow::anyhow!(
449 "Invalid pool swap address '{}' failed with error: {:?}",
450 cmd.instrument_id,
451 e
452 )
453 })?;
454 core_client
455 .subscription_manager
456 .subscribe_swaps(dex, pool_address);
457 } else {
458 anyhow::bail!(
459 "Invalid venue {}, expected Blockchain DEX format",
460 cmd.instrument_id.venue
461 )
462 }
463
464 Ok(())
465 }
466 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
467 log::info!(
468 "Processing subscribe pool liquidity updates command for address: {}",
469 cmd.instrument_id
470 );
471
472 if let Some(ref mut _rpc) = core_client.rpc_client {
473 log::warn!(
474 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
475 );
476 }
477
478 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
479 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
480 .map_err(|_| {
481 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
482 })?;
483 core_client
484 .subscription_manager
485 .subscribe_burns(dex, pool_address);
486 core_client
487 .subscription_manager
488 .subscribe_mints(dex, pool_address);
489 } else {
490 anyhow::bail!(
491 "Invalid venue {}, expected Blockchain DEX format",
492 cmd.instrument_id.venue
493 )
494 }
495
496 Ok(())
497 }
498 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
499 log::info!(
500 "Processing subscribe pool fee collects command for address: {}",
501 cmd.instrument_id
502 );
503
504 if let Some(ref mut _rpc) = core_client.rpc_client {
505 log::warn!(
506 "RPC pool fee collects subscription not yet implemented, using HyperSync"
507 );
508 }
509
510 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
511 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
512 .map_err(|_| {
513 anyhow::anyhow!(
514 "Invalid pool fee collect address: {}",
515 cmd.instrument_id
516 )
517 })?;
518 core_client
519 .subscription_manager
520 .subscribe_collects(dex, pool_address);
521 } else {
522 anyhow::bail!(
523 "Invalid venue {}, expected Blockchain DEX format",
524 cmd.instrument_id.venue
525 )
526 }
527
528 Ok(())
529 }
530 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
531 log::info!(
532 "Processing subscribe pool flash command for address: {}",
533 cmd.instrument_id
534 );
535
536 if let Some(ref mut _rpc) = core_client.rpc_client {
537 log::warn!(
538 "RPC pool fee collects subscription not yet implemented, using HyperSync"
539 );
540 }
541
542 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
543 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
544 .map_err(|_| {
545 anyhow::anyhow!(
546 "Invalid pool flash subscribe address: {}",
547 cmd.instrument_id
548 )
549 })?;
550 core_client
551 .subscription_manager
552 .subscribe_flashes(dex, pool_address);
553 } else {
554 anyhow::bail!(
555 "Invalid venue {}, expected Blockchain DEX format",
556 cmd.instrument_id.venue
557 )
558 }
559
560 Ok(())
561 }
562 }
563 }
564
565 async fn handle_unsubscribe_command(
567 command: DefiUnsubscribeCommand,
568 core_client: &mut BlockchainDataClientCore,
569 ) -> anyhow::Result<()> {
570 match command {
571 DefiUnsubscribeCommand::Blocks(_cmd) => {
572 log::info!("Processing unsubscribe blocks command");
573
574 if core_client.rpc_client.is_some() {
576 log::warn!("RPC blocks unsubscription not yet implemented");
577 }
578
579 core_client.hypersync_client.unsubscribe_blocks().await;
581 log::info!("Unsubscribed from blocks via HyperSync");
582
583 Ok(())
584 }
585 DefiUnsubscribeCommand::Pool(cmd) => {
586 log::info!(
587 "Processing unsubscribe pool command for {}",
588 cmd.instrument_id
589 );
590
591 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
592 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
593 .map_err(|_| {
594 anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
595 })?;
596
597 core_client
599 .subscription_manager
600 .unsubscribe_swaps(dex, pool_address);
601 core_client
602 .subscription_manager
603 .unsubscribe_burns(dex, pool_address);
604 core_client
605 .subscription_manager
606 .unsubscribe_mints(dex, pool_address);
607 core_client
608 .subscription_manager
609 .unsubscribe_collects(dex, pool_address);
610 core_client
611 .subscription_manager
612 .unsubscribe_flashes(dex, pool_address);
613
614 log::info!(
615 "Unsubscribed from all pool events for {} at address {}",
616 cmd.instrument_id,
617 pool_address
618 );
619 } else {
620 anyhow::bail!(
621 "Invalid venue {}, expected Blockchain DEX format",
622 cmd.instrument_id.venue
623 )
624 }
625
626 Ok(())
627 }
628 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
629 log::info!("Processing unsubscribe pool swaps command");
630
631 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
632 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
633 .map_err(|_| {
634 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
635 })?;
636 core_client
637 .subscription_manager
638 .unsubscribe_swaps(dex, pool_address);
639 } else {
640 anyhow::bail!(
641 "Invalid venue {}, expected Blockchain DEX format",
642 cmd.instrument_id.venue
643 )
644 }
645
646 Ok(())
647 }
648 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
649 log::info!(
650 "Processing unsubscribe pool liquidity updates command for {}",
651 cmd.instrument_id
652 );
653
654 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
655 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
656 .map_err(|_| {
657 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
658 })?;
659 core_client
660 .subscription_manager
661 .unsubscribe_burns(dex, pool_address);
662 core_client
663 .subscription_manager
664 .unsubscribe_mints(dex, pool_address);
665 } else {
666 anyhow::bail!(
667 "Invalid venue {}, expected Blockchain DEX format",
668 cmd.instrument_id.venue
669 )
670 }
671
672 Ok(())
673 }
674 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
675 log::info!(
676 "Processing unsubscribe pool fee collects command for {}",
677 cmd.instrument_id
678 );
679
680 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
681 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
682 .map_err(|_| {
683 anyhow::anyhow!(
684 "Invalid pool fee collect address: {}",
685 cmd.instrument_id
686 )
687 })?;
688 core_client
689 .subscription_manager
690 .unsubscribe_collects(dex, pool_address);
691 } else {
692 anyhow::bail!(
693 "Invalid venue {}, expected Blockchain DEX format",
694 cmd.instrument_id.venue
695 )
696 }
697
698 Ok(())
699 }
700 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
701 log::info!(
702 "Processing unsubscribe pool flash command for {}",
703 cmd.instrument_id
704 );
705
706 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
707 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
708 .map_err(|_| {
709 anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
710 })?;
711 core_client
712 .subscription_manager
713 .unsubscribe_flashes(dex, pool_address);
714 } else {
715 anyhow::bail!(
716 "Invalid venue {}, expected Blockchain DEX format",
717 cmd.instrument_id.venue
718 )
719 }
720
721 Ok(())
722 }
723 }
724 }
725
726 async fn handle_request_command(
728 command: DefiRequestCommand,
729 core_client: &mut BlockchainDataClientCore,
730 ) -> anyhow::Result<()> {
731 match command {
732 DefiRequestCommand::PoolSnapshot(cmd) => {
733 log::info!("Processing pool snapshot request for {}", cmd.instrument_id);
734
735 let pool_address =
736 validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
737 anyhow::anyhow!(
738 "Invalid pool address '{}' failed with error: {:?}",
739 cmd.instrument_id,
740 e
741 )
742 })?;
743
744 let pool_identifier =
745 PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
746 match core_client.get_pool(&pool_identifier) {
747 Ok(pool) => {
748 let pool = pool.clone();
749 log::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
750
751 let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
753 core_client.send_data(pool_data);
754
755 match core_client.bootstrap_latest_pool_profiler(&pool).await {
756 Ok((profiler, already_valid)) => {
757 let snapshot = profiler.extract_snapshot();
758
759 log::info!(
760 "Saving pool snapshot with {} positions and {} ticks to database...",
761 snapshot.positions.len(),
762 snapshot.ticks.len()
763 );
764 core_client
765 .cache
766 .add_pool_snapshot(
767 &pool.dex.name,
768 &pool.pool_identifier,
769 &snapshot,
770 )
771 .await?;
772
773 if core_client
775 .check_snapshot_validity(&profiler, already_valid)
776 .await?
777 {
778 let snapshot_data =
779 DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
780 core_client.send_data(snapshot_data);
781 }
782 }
783 Err(e) => log::error!(
784 "Failed to bootstrap pool profiler for {} and extract snapshot with error {e}",
785 cmd.instrument_id
786 ),
787 }
788 }
789 Err(e) => {
790 log::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
791 }
792 }
793
794 Ok(())
795 }
796 }
797 }
798
799 pub async fn await_process_task_close(&mut self) {
804 if let Some(handle) = self.process_task.take()
805 && let Err(e) = handle.await
806 {
807 log::error!("Process task join error: {e}");
808 }
809 }
810}
811
812#[async_trait::async_trait(?Send)]
813impl DataClient for BlockchainDataClient {
814 fn client_id(&self) -> ClientId {
815 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
816 }
817
818 fn venue(&self) -> Option<Venue> {
819 None
822 }
823
824 fn start(&mut self) -> anyhow::Result<()> {
825 log::info!(
826 "Starting blockchain data client: chain_name={}, dex_ids={:?}, use_hypersync_for_live_data={}, http_proxy_url={:?}, ws_proxy_url={:?}",
827 self.chain.name,
828 self.config.dex_ids,
829 self.config.use_hypersync_for_live_data,
830 self.config.http_proxy_url,
831 self.config.ws_proxy_url
832 );
833 Ok(())
834 }
835
836 fn stop(&mut self) -> anyhow::Result<()> {
837 log::info!(
838 "Stopping blockchain data client for '{chain_name}'",
839 chain_name = self.chain.name
840 );
841 self.cancellation_token.cancel();
842
843 self.cancellation_token = tokio_util::sync::CancellationToken::new();
845 Ok(())
846 }
847
848 fn reset(&mut self) -> anyhow::Result<()> {
849 log::info!(
850 "Resetting blockchain data client for '{chain_name}'",
851 chain_name = self.chain.name
852 );
853 self.cancellation_token = tokio_util::sync::CancellationToken::new();
854 Ok(())
855 }
856
857 fn dispose(&mut self) -> anyhow::Result<()> {
858 log::info!(
859 "Disposing blockchain data client for '{chain_name}'",
860 chain_name = self.chain.name
861 );
862 Ok(())
863 }
864
865 async fn connect(&mut self) -> anyhow::Result<()> {
866 log::info!(
867 "Connecting blockchain data client for '{}'",
868 self.chain.name
869 );
870
871 if self.process_task.is_none() {
872 self.spawn_process_task();
873 }
874
875 Ok(())
876 }
877
878 async fn disconnect(&mut self) -> anyhow::Result<()> {
879 log::info!(
880 "Disconnecting blockchain data client for '{}'",
881 self.chain.name
882 );
883
884 self.cancellation_token.cancel();
885 self.await_process_task_close().await;
886
887 self.cancellation_token = tokio_util::sync::CancellationToken::new();
889 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
890 self.hypersync_tx = Some(hypersync_tx);
891 self.hypersync_rx = Some(hypersync_rx);
892 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
893 self.command_tx = command_tx;
894 self.command_rx = Some(command_rx);
895
896 Ok(())
897 }
898
899 fn is_connected(&self) -> bool {
900 true
903 }
904
905 fn is_disconnected(&self) -> bool {
906 !self.is_connected()
907 }
908
909 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
910 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
911 self.command_tx.send(command)?;
912 Ok(())
913 }
914
915 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
916 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
917 self.command_tx.send(command)?;
918 Ok(())
919 }
920
921 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
922 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
923 self.command_tx.send(command)?;
924 Ok(())
925 }
926
927 fn subscribe_pool_liquidity_updates(
928 &mut self,
929 cmd: &SubscribePoolLiquidityUpdates,
930 ) -> anyhow::Result<()> {
931 let command =
932 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
933 self.command_tx.send(command)?;
934 Ok(())
935 }
936
937 fn subscribe_pool_fee_collects(
938 &mut self,
939 cmd: &SubscribePoolFeeCollects,
940 ) -> anyhow::Result<()> {
941 let command =
942 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
943 self.command_tx.send(command)?;
944 Ok(())
945 }
946
947 fn subscribe_pool_flash_events(
948 &mut self,
949 cmd: &SubscribePoolFlashEvents,
950 ) -> anyhow::Result<()> {
951 let command =
952 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
953 self.command_tx.send(command)?;
954 Ok(())
955 }
956
957 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
958 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
959 self.command_tx.send(command)?;
960 Ok(())
961 }
962
963 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
964 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
965 self.command_tx.send(command)?;
966 Ok(())
967 }
968
969 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
970 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
971 self.command_tx.send(command)?;
972 Ok(())
973 }
974
975 fn unsubscribe_pool_liquidity_updates(
976 &mut self,
977 cmd: &UnsubscribePoolLiquidityUpdates,
978 ) -> anyhow::Result<()> {
979 let command =
980 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
981 self.command_tx.send(command)?;
982 Ok(())
983 }
984
985 fn unsubscribe_pool_fee_collects(
986 &mut self,
987 cmd: &UnsubscribePoolFeeCollects,
988 ) -> anyhow::Result<()> {
989 let command =
990 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
991 self.command_tx.send(command)?;
992 Ok(())
993 }
994
995 fn unsubscribe_pool_flash_events(
996 &mut self,
997 cmd: &UnsubscribePoolFlashEvents,
998 ) -> anyhow::Result<()> {
999 let command =
1000 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
1001 self.command_tx.send(command)?;
1002 Ok(())
1003 }
1004
1005 fn request_pool_snapshot(&self, cmd: &RequestPoolSnapshot) -> anyhow::Result<()> {
1006 let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1007 self.command_tx.send(command)?;
1008 Ok(())
1009 }
1010}