1use nautilus_common::{
17 messages::{
18 DataEvent,
19 defi::{
20 DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
21 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
22 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
23 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
24 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
25 },
26 },
27 runtime::get_runtime,
28};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31 defi::{DefiData, SharedChain, validation::validate_address},
32 identifiers::{ClientId, Venue},
33};
34
35use crate::{
36 config::BlockchainDataClientConfig,
37 data::core::BlockchainDataClientCore,
38 exchanges::get_dex_extended,
39 rpc::{BlockchainRpcClient, types::BlockchainMessage},
40};
41
42#[derive(Debug)]
52pub struct BlockchainDataClient {
53 pub chain: SharedChain,
55 pub config: BlockchainDataClientConfig,
57 pub core_client: Option<BlockchainDataClientCore>,
60 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
62 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
64 command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
66 command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
68 process_task: Option<tokio::task::JoinHandle<()>>,
70 cancellation_token: tokio_util::sync::CancellationToken,
72}
73
74impl BlockchainDataClient {
75 #[must_use]
77 pub fn new(config: BlockchainDataClientConfig) -> Self {
78 let chain = config.chain.clone();
79 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
80 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
81 Self {
82 chain,
83 core_client: None,
84 config,
85 hypersync_rx: Some(hypersync_rx),
86 hypersync_tx: Some(hypersync_tx),
87 command_tx,
88 command_rx: Some(command_rx),
89 process_task: None,
90 cancellation_token: tokio_util::sync::CancellationToken::new(),
91 }
92 }
93
94 fn spawn_process_task(&mut self) {
102 let command_rx = if let Some(r) = self.command_rx.take() {
103 r
104 } else {
105 tracing::error!("Command receiver already taken, not spawning handler");
106 return;
107 };
108
109 let cancellation_token = self.cancellation_token.clone();
110
111 let data_tx = nautilus_common::runner::get_data_event_sender();
112
113 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
114 let hypersync_tx = self.hypersync_tx.take();
115
116 let mut core_client = BlockchainDataClientCore::new(
117 self.config.clone(),
118 hypersync_tx,
119 Some(data_tx),
120 cancellation_token.clone(),
121 );
122
123 let handle = get_runtime().spawn(async move {
124 tracing::debug!("Started task 'process'");
125
126 if let Err(e) = core_client.connect().await {
127 if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
130 tracing::warn!("Blockchain core client connection interrupted: {e}");
131 } else {
132 tracing::error!("Failed to connect blockchain core client: {e}");
133 }
134 return;
135 }
136
137 let mut command_rx = command_rx;
138
139 loop {
140 tokio::select! {
141 () = cancellation_token.cancelled() => {
142 tracing::debug!("Received cancellation signal in Blockchain data client process task");
143 core_client.disconnect().await;
144 break;
145 }
146 command = command_rx.recv() => {
147 if let Some(cmd) = command {
148 match cmd {
149 DefiDataCommand::Subscribe(cmd) => {
150 let chain = cmd.blockchain();
151 if chain != core_client.chain.name {
152 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
153 continue;
154 }
155
156 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
157 tracing::error!("Error processing subscribe command: {e}");
158 }
159 }
160 DefiDataCommand::Unsubscribe(cmd) => {
161 let chain = cmd.blockchain();
162 if chain != core_client.chain.name {
163 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
164 continue;
165 }
166
167 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
168 tracing::error!("Error processing subscribe command: {e}");
169 }
170 }
171 DefiDataCommand::Request(cmd) => {
172 if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
173 tracing::error!("Error processing request command: {e}");
174 }
175 }
176 }
177 } else {
178 tracing::debug!("Command channel closed");
179 break;
180 }
181 }
182 data = hypersync_rx.recv() => {
183 if let Some(msg) = data {
184 let data_event = match msg {
185 BlockchainMessage::Block(block) => {
186 for dex in core_client.cache.get_registered_dexes(){
188 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
189 if !addresses.is_empty() {
190 core_client.hypersync_client.process_block_dex_contract_events(
191 &dex,
192 block.number,
193 addresses,
194 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
195 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
196 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
197 );
198 }
199 }
200
201 Some(DataEvent::DeFi(DefiData::Block(block)))
202 }
203 BlockchainMessage::SwapEvent(swap_event) => {
204 match core_client.get_pool(&swap_event.pool_address) {
205 Ok(pool) => {
206 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
207 match core_client.process_pool_swap_event(
208 &swap_event,
209 pool,
210 dex_extended,
211 ){
212 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
213 Err(e) => {
214 tracing::error!("Error processing pool swap event: {e}");
215 None
216 }
217 }
218 }
219 Err(e) => {
220 tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
221 None
222 }
223 }
224 }
225 BlockchainMessage::BurnEvent(burn_event) => {
226 match core_client.get_pool(&burn_event.pool_address) {
227 Ok(pool) => {
228 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
229 match core_client.process_pool_burn_event(
230 &burn_event,
231 pool,
232 dex_extended,
233 ){
234 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
235 Err(e) => {
236 tracing::error!("Error processing pool burn event: {e}");
237 None
238 }
239 }
240 }
241 Err(e) => {
242 tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
243 None
244 }
245 }
246 }
247 BlockchainMessage::MintEvent(mint_event) => {
248 match core_client.get_pool(&mint_event.pool_address) {
249 Ok(pool) => {
250 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
251 match core_client.process_pool_mint_event(
252 &mint_event,
253 pool,
254 dex_extended,
255 ){
256 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
257 Err(e) => {
258 tracing::error!("Error processing pool mint event: {e}");
259 None
260 }
261 }
262 }
263 Err(e) => {
264 tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
265 None
266 }
267 }
268 }
269 BlockchainMessage::CollectEvent(collect_event) => {
270 match core_client.get_pool(&collect_event.pool_address) {
271 Ok(pool) => {
272 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
273 match core_client.process_pool_collect_event(
274 &collect_event,
275 pool,
276 dex_extended,
277 ){
278 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
279 Err(e) => {
280 tracing::error!("Error processing pool collect event: {e}");
281 None
282 }
283 }
284 }
285 Err(e) => {
286 tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
287 None
288 }
289 }
290 }
291 BlockchainMessage::FlashEvent(flash_event) => {
292 match core_client.get_pool(&flash_event.pool_address) {
293 Ok(pool) => {
294 match core_client.process_pool_flash_event(&flash_event,pool){
295 Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
296 Err(e) => {
297 tracing::error!("Error processing pool flash event: {e}");
298 None
299 }
300 }
301 }
302 Err(e) => {
303 tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_address, e);
304 None
305 }
306 }
307 }
308 };
309
310 if let Some(event) = data_event {
311 core_client.send_data(event);
312 }
313 } else {
314 tracing::debug!("HyperSync data channel closed");
315 break;
316 }
317 }
318 msg = async {
319 match core_client.rpc_client {
320 Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
321 None => std::future::pending().await, }
323 } => {
324 match msg {
326 Ok(BlockchainMessage::Block(block)) => {
327 let data = DataEvent::DeFi(DefiData::Block(block));
328 core_client.send_data(data);
329 },
330 Ok(BlockchainMessage::SwapEvent(_)) => {
331 tracing::warn!("RPC swap events are not yet supported");
332 }
333 Ok(BlockchainMessage::MintEvent(_)) => {
334 tracing::warn!("RPC mint events are not yet supported");
335 }
336 Ok(BlockchainMessage::BurnEvent(_)) => {
337 tracing::warn!("RPC burn events are not yet supported");
338 }
339 Ok(BlockchainMessage::CollectEvent(_)) => {
340 tracing::warn!("RPC collect events are not yet supported")
341 }
342 Ok(BlockchainMessage::FlashEvent(_)) => {
343 tracing::warn!("RPC flash events are not yet supported")
344 }
345 Err(e) => {
346 tracing::error!("Error processing RPC message: {e}");
347 }
348 }
349 }
350 }
351 }
352
353 tracing::debug!("Stopped task 'process'");
354 });
355
356 self.process_task = Some(handle);
357 }
358
359 async fn handle_subscribe_command(
361 command: DefiSubscribeCommand,
362 core_client: &mut BlockchainDataClientCore,
363 ) -> anyhow::Result<()> {
364 match command {
365 DefiSubscribeCommand::Blocks(_cmd) => {
366 tracing::info!("Processing subscribe blocks command");
367
368 if let Some(ref mut rpc) = core_client.rpc_client {
370 if let Err(e) = rpc.subscribe_blocks().await {
371 tracing::warn!(
372 "RPC blocks subscription failed: {e}, falling back to HyperSync"
373 );
374 core_client.hypersync_client.subscribe_blocks();
375 tokio::task::yield_now().await;
376 } else {
377 tracing::info!("Successfully subscribed to blocks via RPC");
378 }
379 } else {
380 tracing::info!("Subscribing to blocks via HyperSync");
381 core_client.hypersync_client.subscribe_blocks();
382 tokio::task::yield_now().await;
383 }
384
385 Ok(())
386 }
387 DefiSubscribeCommand::Pool(cmd) => {
388 tracing::info!(
389 "Processing subscribe pool command for {}",
390 cmd.instrument_id
391 );
392
393 if let Some(ref mut _rpc) = core_client.rpc_client {
394 tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
395 }
396
397 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
398 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
399 .map_err(|e| {
400 anyhow::anyhow!(
401 "Invalid pool address '{}' failed with error: {:?}",
402 cmd.instrument_id,
403 e
404 )
405 })?;
406
407 core_client
409 .subscription_manager
410 .subscribe_swaps(dex, pool_address);
411 core_client
412 .subscription_manager
413 .subscribe_burns(dex, pool_address);
414 core_client
415 .subscription_manager
416 .subscribe_mints(dex, pool_address);
417 core_client
418 .subscription_manager
419 .subscribe_collects(dex, pool_address);
420 core_client
421 .subscription_manager
422 .subscribe_flashes(dex, pool_address);
423
424 tracing::info!(
425 "Subscribed to all pool events for {} at address {}",
426 cmd.instrument_id,
427 pool_address
428 );
429 } else {
430 anyhow::bail!(
431 "Invalid venue {}, expected Blockchain DEX format",
432 cmd.instrument_id.venue
433 )
434 }
435
436 Ok(())
437 }
438 DefiSubscribeCommand::PoolSwaps(cmd) => {
439 tracing::info!(
440 "Processing subscribe pool swaps command for {}",
441 cmd.instrument_id
442 );
443
444 if let Some(ref mut _rpc) = core_client.rpc_client {
445 tracing::warn!(
446 "RPC pool swaps subscription not yet implemented, using HyperSync"
447 );
448 }
449
450 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
451 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
452 .map_err(|e| {
453 anyhow::anyhow!(
454 "Invalid pool swap address '{}' failed with error: {:?}",
455 cmd.instrument_id,
456 e
457 )
458 })?;
459 core_client
460 .subscription_manager
461 .subscribe_swaps(dex, pool_address);
462 } else {
463 anyhow::bail!(
464 "Invalid venue {}, expected Blockchain DEX format",
465 cmd.instrument_id.venue
466 )
467 }
468
469 Ok(())
470 }
471 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
472 tracing::info!(
473 "Processing subscribe pool liquidity updates command for address: {}",
474 cmd.instrument_id
475 );
476
477 if let Some(ref mut _rpc) = core_client.rpc_client {
478 tracing::warn!(
479 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
480 );
481 }
482
483 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
484 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
485 .map_err(|_| {
486 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
487 })?;
488 core_client
489 .subscription_manager
490 .subscribe_burns(dex, pool_address);
491 core_client
492 .subscription_manager
493 .subscribe_mints(dex, pool_address);
494 } else {
495 anyhow::bail!(
496 "Invalid venue {}, expected Blockchain DEX format",
497 cmd.instrument_id.venue
498 )
499 }
500
501 Ok(())
502 }
503 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
504 tracing::info!(
505 "Processing subscribe pool fee collects command for address: {}",
506 cmd.instrument_id
507 );
508
509 if let Some(ref mut _rpc) = core_client.rpc_client {
510 tracing::warn!(
511 "RPC pool fee collects subscription not yet implemented, using HyperSync"
512 );
513 }
514
515 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
516 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
517 .map_err(|_| {
518 anyhow::anyhow!(
519 "Invalid pool fee collect address: {}",
520 cmd.instrument_id
521 )
522 })?;
523 core_client
524 .subscription_manager
525 .subscribe_collects(dex, pool_address);
526 } else {
527 anyhow::bail!(
528 "Invalid venue {}, expected Blockchain DEX format",
529 cmd.instrument_id.venue
530 )
531 }
532
533 Ok(())
534 }
535 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
536 tracing::info!(
537 "Processing subscribe pool flash command for address: {}",
538 cmd.instrument_id
539 );
540
541 if let Some(ref mut _rpc) = core_client.rpc_client {
542 tracing::warn!(
543 "RPC pool fee collects subscription not yet implemented, using HyperSync"
544 );
545 }
546
547 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
548 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
549 .map_err(|_| {
550 anyhow::anyhow!(
551 "Invalid pool flash subscribe address: {}",
552 cmd.instrument_id
553 )
554 })?;
555 core_client
556 .subscription_manager
557 .subscribe_flashes(dex, pool_address);
558 } else {
559 anyhow::bail!(
560 "Invalid venue {}, expected Blockchain DEX format",
561 cmd.instrument_id.venue
562 )
563 }
564
565 Ok(())
566 }
567 }
568 }
569
570 async fn handle_unsubscribe_command(
572 command: DefiUnsubscribeCommand,
573 core_client: &mut BlockchainDataClientCore,
574 ) -> anyhow::Result<()> {
575 match command {
576 DefiUnsubscribeCommand::Blocks(_cmd) => {
577 tracing::info!("Processing unsubscribe blocks command");
578
579 if core_client.rpc_client.is_some() {
581 tracing::warn!("RPC blocks unsubscription not yet implemented");
582 }
583
584 core_client.hypersync_client.unsubscribe_blocks().await;
586 tracing::info!("Unsubscribed from blocks via HyperSync");
587
588 Ok(())
589 }
590 DefiUnsubscribeCommand::Pool(cmd) => {
591 tracing::info!(
592 "Processing unsubscribe pool command for {}",
593 cmd.instrument_id
594 );
595
596 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
597 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
598 .map_err(|_| {
599 anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
600 })?;
601
602 core_client
604 .subscription_manager
605 .unsubscribe_swaps(dex, pool_address);
606 core_client
607 .subscription_manager
608 .unsubscribe_burns(dex, pool_address);
609 core_client
610 .subscription_manager
611 .unsubscribe_mints(dex, pool_address);
612 core_client
613 .subscription_manager
614 .unsubscribe_collects(dex, pool_address);
615 core_client
616 .subscription_manager
617 .unsubscribe_flashes(dex, pool_address);
618
619 tracing::info!(
620 "Unsubscribed from all pool events for {} at address {}",
621 cmd.instrument_id,
622 pool_address
623 );
624 } else {
625 anyhow::bail!(
626 "Invalid venue {}, expected Blockchain DEX format",
627 cmd.instrument_id.venue
628 )
629 }
630
631 Ok(())
632 }
633 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
634 tracing::info!("Processing unsubscribe pool swaps command");
635
636 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
637 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
638 .map_err(|_| {
639 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
640 })?;
641 core_client
642 .subscription_manager
643 .unsubscribe_swaps(dex, pool_address);
644 } else {
645 anyhow::bail!(
646 "Invalid venue {}, expected Blockchain DEX format",
647 cmd.instrument_id.venue
648 )
649 }
650
651 Ok(())
652 }
653 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
654 tracing::info!(
655 "Processing unsubscribe pool liquidity updates command for {}",
656 cmd.instrument_id
657 );
658
659 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
660 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
661 .map_err(|_| {
662 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
663 })?;
664 core_client
665 .subscription_manager
666 .unsubscribe_burns(dex, pool_address);
667 core_client
668 .subscription_manager
669 .unsubscribe_mints(dex, pool_address);
670 } else {
671 anyhow::bail!(
672 "Invalid venue {}, expected Blockchain DEX format",
673 cmd.instrument_id.venue
674 )
675 }
676
677 Ok(())
678 }
679 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
680 tracing::info!(
681 "Processing unsubscribe pool fee collects command for {}",
682 cmd.instrument_id
683 );
684
685 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
686 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
687 .map_err(|_| {
688 anyhow::anyhow!(
689 "Invalid pool fee collect address: {}",
690 cmd.instrument_id
691 )
692 })?;
693 core_client
694 .subscription_manager
695 .unsubscribe_collects(dex, pool_address);
696 } else {
697 anyhow::bail!(
698 "Invalid venue {}, expected Blockchain DEX format",
699 cmd.instrument_id.venue
700 )
701 }
702
703 Ok(())
704 }
705 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
706 tracing::info!(
707 "Processing unsubscribe pool flash command for {}",
708 cmd.instrument_id
709 );
710
711 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
712 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
713 .map_err(|_| {
714 anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
715 })?;
716 core_client
717 .subscription_manager
718 .unsubscribe_flashes(dex, pool_address);
719 } else {
720 anyhow::bail!(
721 "Invalid venue {}, expected Blockchain DEX format",
722 cmd.instrument_id.venue
723 )
724 }
725
726 Ok(())
727 }
728 }
729 }
730
731 async fn handle_request_command(
733 command: DefiRequestCommand,
734 core_client: &mut BlockchainDataClientCore,
735 ) -> anyhow::Result<()> {
736 match command {
737 DefiRequestCommand::PoolSnapshot(cmd) => {
738 tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
739
740 let pool_address =
741 validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
742 anyhow::anyhow!(
743 "Invalid pool address '{}' failed with error: {:?}",
744 cmd.instrument_id,
745 e
746 )
747 })?;
748
749 match core_client.get_pool(&pool_address) {
750 Ok(pool) => {
751 let pool = pool.clone();
752 tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
753
754 let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
756 core_client.send_data(pool_data);
757
758 match core_client.bootstrap_latest_pool_profiler(&pool).await {
759 Ok((profiler, already_valid)) => {
760 let snapshot = profiler.extract_snapshot();
761
762 tracing::info!(
763 "Saving pool snapshot with {} positions and {} ticks to database...",
764 snapshot.positions.len(),
765 snapshot.ticks.len()
766 );
767 core_client
768 .cache
769 .add_pool_snapshot(&pool.address, &snapshot)
770 .await?;
771
772 if core_client
774 .check_snapshot_validity(&profiler, already_valid)
775 .await?
776 {
777 let snapshot_data =
778 DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
779 core_client.send_data(snapshot_data);
780 }
781 }
782 Err(e) => tracing::error!(
783 "Failed to bootstrap pool profiler for {} and extract snapshot with error {}",
784 cmd.instrument_id,
785 e.to_string()
786 ),
787 }
788 }
789 Err(e) => {
790 tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
791 }
792 }
793
794 Ok(())
795 }
796 }
797 }
798
799 pub async fn await_process_task_close(&mut self) {
804 if let Some(handle) = self.process_task.take()
805 && let Err(e) = handle.await
806 {
807 tracing::error!("Process task join error: {e}");
808 }
809 }
810}
811
812#[async_trait::async_trait]
813impl DataClient for BlockchainDataClient {
814 fn client_id(&self) -> ClientId {
815 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
816 }
817
818 fn venue(&self) -> Option<Venue> {
819 None
822 }
823
824 fn start(&mut self) -> anyhow::Result<()> {
825 tracing::info!(
826 "Starting blockchain data client for '{chain_name}'",
827 chain_name = self.chain.name
828 );
829 Ok(())
830 }
831
832 fn stop(&mut self) -> anyhow::Result<()> {
833 tracing::info!(
834 "Stopping blockchain data client for '{chain_name}'",
835 chain_name = self.chain.name
836 );
837 self.cancellation_token.cancel();
838
839 self.cancellation_token = tokio_util::sync::CancellationToken::new();
841 Ok(())
842 }
843
844 fn reset(&mut self) -> anyhow::Result<()> {
845 tracing::info!(
846 "Resetting blockchain data client for '{chain_name}'",
847 chain_name = self.chain.name
848 );
849 self.cancellation_token = tokio_util::sync::CancellationToken::new();
850 Ok(())
851 }
852
853 fn dispose(&mut self) -> anyhow::Result<()> {
854 tracing::info!(
855 "Disposing blockchain data client for '{chain_name}'",
856 chain_name = self.chain.name
857 );
858 Ok(())
859 }
860
861 async fn connect(&mut self) -> anyhow::Result<()> {
862 tracing::info!(
863 "Connecting blockchain data client for '{}'",
864 self.chain.name
865 );
866
867 if self.process_task.is_none() {
868 self.spawn_process_task();
869 }
870
871 Ok(())
872 }
873
874 async fn disconnect(&mut self) -> anyhow::Result<()> {
875 tracing::info!(
876 "Disconnecting blockchain data client for '{}'",
877 self.chain.name
878 );
879
880 self.cancellation_token.cancel();
881 self.await_process_task_close().await;
882
883 self.cancellation_token = tokio_util::sync::CancellationToken::new();
885 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
886 self.hypersync_tx = Some(hypersync_tx);
887 self.hypersync_rx = Some(hypersync_rx);
888 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
889 self.command_tx = command_tx;
890 self.command_rx = Some(command_rx);
891
892 Ok(())
893 }
894
895 fn is_connected(&self) -> bool {
896 true
899 }
900
901 fn is_disconnected(&self) -> bool {
902 !self.is_connected()
903 }
904
905 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
906 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
907 self.command_tx.send(command)?;
908 Ok(())
909 }
910
911 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
912 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
913 self.command_tx.send(command)?;
914 Ok(())
915 }
916
917 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
918 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
919 self.command_tx.send(command)?;
920 Ok(())
921 }
922
923 fn subscribe_pool_liquidity_updates(
924 &mut self,
925 cmd: &SubscribePoolLiquidityUpdates,
926 ) -> anyhow::Result<()> {
927 let command =
928 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
929 self.command_tx.send(command)?;
930 Ok(())
931 }
932
933 fn subscribe_pool_fee_collects(
934 &mut self,
935 cmd: &SubscribePoolFeeCollects,
936 ) -> anyhow::Result<()> {
937 let command =
938 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
939 self.command_tx.send(command)?;
940 Ok(())
941 }
942
943 fn subscribe_pool_flash_events(
944 &mut self,
945 cmd: &SubscribePoolFlashEvents,
946 ) -> anyhow::Result<()> {
947 let command =
948 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
949 self.command_tx.send(command)?;
950 Ok(())
951 }
952
953 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
954 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
955 self.command_tx.send(command)?;
956 Ok(())
957 }
958
959 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
960 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
961 self.command_tx.send(command)?;
962 Ok(())
963 }
964
965 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
966 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
967 self.command_tx.send(command)?;
968 Ok(())
969 }
970
971 fn unsubscribe_pool_liquidity_updates(
972 &mut self,
973 cmd: &UnsubscribePoolLiquidityUpdates,
974 ) -> anyhow::Result<()> {
975 let command =
976 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
977 self.command_tx.send(command)?;
978 Ok(())
979 }
980
981 fn unsubscribe_pool_fee_collects(
982 &mut self,
983 cmd: &UnsubscribePoolFeeCollects,
984 ) -> anyhow::Result<()> {
985 let command =
986 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
987 self.command_tx.send(command)?;
988 Ok(())
989 }
990
991 fn unsubscribe_pool_flash_events(
992 &mut self,
993 cmd: &UnsubscribePoolFlashEvents,
994 ) -> anyhow::Result<()> {
995 let command =
996 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
997 self.command_tx.send(command)?;
998 Ok(())
999 }
1000
1001 fn request_pool_snapshot(
1002 &self,
1003 cmd: &nautilus_common::messages::defi::RequestPoolSnapshot,
1004 ) -> anyhow::Result<()> {
1005 let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1006 self.command_tx.send(command)?;
1007 Ok(())
1008 }
1009}