1use nautilus_common::{
17 messages::{
18 DataEvent,
19 defi::{
20 DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
21 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
22 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
23 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
24 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
25 },
26 },
27 runtime::get_runtime,
28};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31 defi::{DefiData, SharedChain, validation::validate_address},
32 identifiers::{ClientId, Venue},
33};
34
35use crate::{
36 config::BlockchainDataClientConfig,
37 data::core::BlockchainDataClientCore,
38 exchanges::get_dex_extended,
39 rpc::{BlockchainRpcClient, types::BlockchainMessage},
40};
41
42#[derive(Debug)]
52pub struct BlockchainDataClient {
53 pub chain: SharedChain,
55 pub config: BlockchainDataClientConfig,
57 pub core_client: Option<BlockchainDataClientCore>,
60 hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
62 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
64 command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
66 command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
68 process_task: Option<tokio::task::JoinHandle<()>>,
70 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
72}
73
74impl BlockchainDataClient {
75 #[must_use]
77 pub fn new(config: BlockchainDataClientConfig) -> Self {
78 let chain = config.chain.clone();
79 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
80 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
81 Self {
82 chain,
83 core_client: None,
84 config,
85 hypersync_rx: Some(hypersync_rx),
86 hypersync_tx: Some(hypersync_tx),
87 command_tx,
88 command_rx: Some(command_rx),
89 process_task: None,
90 shutdown_tx: None,
91 }
92 }
93
94 fn spawn_process_task(&mut self) {
102 let command_rx = if let Some(r) = self.command_rx.take() {
103 r
104 } else {
105 tracing::error!("Command receiver already taken, not spawning handler");
106 return;
107 };
108
109 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
110 self.shutdown_tx = Some(shutdown_tx);
111
112 let data_tx = nautilus_common::runner::get_data_event_sender();
113
114 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
115 let hypersync_tx = self.hypersync_tx.take();
116
117 let mut core_client =
118 BlockchainDataClientCore::new(self.config.clone(), hypersync_tx, Some(data_tx));
119
120 let handle = get_runtime().spawn(async move {
121 tracing::debug!("Started task 'process'");
122
123 if let Err(e) = core_client.connect().await {
124 tracing::error!("Failed to connect blockchain core client: {e}");
125 return;
126 }
127
128 let mut command_rx = command_rx;
129 let mut shutdown_rx = shutdown_rx;
130
131 loop {
132 tokio::select! {
133 _ = &mut shutdown_rx => {
134 tracing::debug!("Received shutdown signal in Blockchain data client process task");
135 core_client.disconnect();
136 break;
137 }
138 command = command_rx.recv() => {
139 if let Some(cmd) = command {
140 match cmd {
141 DefiDataCommand::Subscribe(cmd) => {
142 let chain = cmd.blockchain();
143 if chain != core_client.chain.name {
144 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
145 continue;
146 }
147
148 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
149 tracing::error!("Error processing subscribe command: {e}");
150 }
151 }
152 DefiDataCommand::Unsubscribe(cmd) => {
153 let chain = cmd.blockchain();
154 if chain != core_client.chain.name {
155 tracing::error!("Incorrect blockchain for subscribe command: {chain}");
156 continue;
157 }
158
159 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
160 tracing::error!("Error processing subscribe command: {e}");
161 }
162 }
163 DefiDataCommand::Request(cmd) => {
164 if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
165 tracing::error!("Error processing request command: {e}");
166 }
167 }
168 }
169 } else {
170 tracing::debug!("Command channel closed");
171 break;
172 }
173 }
174 data = hypersync_rx.recv() => {
175 if let Some(msg) = data {
176 let data_event = match msg {
177 BlockchainMessage::Block(block) => {
178 for dex in core_client.cache.get_registered_dexes(){
180 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
181 if !addresses.is_empty() {
182 core_client.hypersync_client.process_block_dex_contract_events(
183 &dex,
184 block.number,
185 addresses,
186 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
187 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
188 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
189 ).await;
190 }
191 }
192
193 Some(DataEvent::DeFi(DefiData::Block(block)))
194 }
195 BlockchainMessage::SwapEvent(swap_event) => {
196 match core_client.get_pool(&swap_event.pool_address) {
197 Ok(pool) => {
198 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
199 match core_client.process_pool_swap_event(
200 &swap_event,
201 pool,
202 dex_extended,
203 ){
204 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
205 Err(e) => {
206 tracing::error!("Error processing pool swap event: {e}");
207 None
208 }
209 }
210 }
211 Err(e) => {
212 tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
213 None
214 }
215 }
216 }
217 BlockchainMessage::BurnEvent(burn_event) => {
218 match core_client.get_pool(&burn_event.pool_address) {
219 Ok(pool) => {
220 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
221 match core_client.process_pool_burn_event(
222 &burn_event,
223 pool,
224 dex_extended,
225 ){
226 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
227 Err(e) => {
228 tracing::error!("Error processing pool burn event: {e}");
229 None
230 }
231 }
232 }
233 Err(e) => {
234 tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
235 None
236 }
237 }
238 }
239 BlockchainMessage::MintEvent(mint_event) => {
240 match core_client.get_pool(&mint_event.pool_address) {
241 Ok(pool) => {
242 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
243 match core_client.process_pool_mint_event(
244 &mint_event,
245 pool,
246 dex_extended,
247 ){
248 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
249 Err(e) => {
250 tracing::error!("Error processing pool mint event: {e}");
251 None
252 }
253 }
254 }
255 Err(e) => {
256 tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
257 None
258 }
259 }
260 }
261 BlockchainMessage::CollectEvent(collect_event) => {
262 match core_client.get_pool(&collect_event.pool_address) {
263 Ok(pool) => {
264 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
265 match core_client.process_pool_collect_event(
266 &collect_event,
267 pool,
268 dex_extended,
269 ){
270 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
271 Err(e) => {
272 tracing::error!("Error processing pool collect event: {e}");
273 None
274 }
275 }
276 }
277 Err(e) => {
278 tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
279 None
280 }
281 }
282 }
283 BlockchainMessage::FlashEvent(flash_event) => {
284 match core_client.get_pool(&flash_event.pool_address) {
285 Ok(pool) => {
286 match core_client.process_pool_flash_event(&flash_event,pool){
287 Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
288 Err(e) => {
289 tracing::error!("Error processing pool flash event: {e}");
290 None
291 }
292 }
293 }
294 Err(e) => {
295 tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_address, e);
296 None
297 }
298 }
299 }
300 };
301
302 if let Some(event) = data_event {
303 core_client.send_data(event);
304 }
305 } else {
306 tracing::debug!("HyperSync data channel closed");
307 break;
308 }
309 }
310 msg = async {
311 if let Some(ref mut rpc_client) = core_client.rpc_client {
312 Some(rpc_client.next_rpc_message().await)
313 } else {
314 None
315 }
316 } => {
317 if let Some(msg) = msg {
318 match msg {
319 Ok(BlockchainMessage::Block(block)) => {
320 let data = DataEvent::DeFi(DefiData::Block(block));
321 core_client.send_data(data);
322 },
323 Ok(BlockchainMessage::SwapEvent(_)) => {
324 tracing::warn!("RPC swap events are not yet supported");
325 }
326 Ok(BlockchainMessage::MintEvent(_)) => {
327 tracing::warn!("RPC mint events are not yet supported");
328 }
329 Ok(BlockchainMessage::BurnEvent(_)) => {
330 tracing::warn!("RPC burn events are not yet supported");
331 }
332 Ok(BlockchainMessage::CollectEvent(_)) => {
333 tracing::warn!("RPC collect events are not yet supported")
334 }
335 Ok(BlockchainMessage::FlashEvent(_)) => {
336 tracing::warn!("RPC flash events are not yet supported")
337 }
338 Err(e) => {
339 tracing::error!("Error processing RPC message: {e}");
340 }
341 }
342 }
343 }
344 }
345 }
346
347 tracing::debug!("Stopped task 'process'");
348 });
349
350 self.process_task = Some(handle);
351 }
352
353 async fn handle_subscribe_command(
355 command: DefiSubscribeCommand,
356 core_client: &mut BlockchainDataClientCore,
357 ) -> anyhow::Result<()> {
358 match command {
359 DefiSubscribeCommand::Blocks(_cmd) => {
360 tracing::info!("Processing subscribe blocks command");
361
362 if let Some(ref mut rpc) = core_client.rpc_client {
364 if let Err(e) = rpc.subscribe_blocks().await {
365 tracing::warn!(
366 "RPC blocks subscription failed: {e}, falling back to HyperSync"
367 );
368 core_client.hypersync_client.subscribe_blocks();
369 tokio::task::yield_now().await;
370 } else {
371 tracing::info!("Successfully subscribed to blocks via RPC");
372 }
373 } else {
374 tracing::info!("Subscribing to blocks via HyperSync");
375 core_client.hypersync_client.subscribe_blocks();
376 tokio::task::yield_now().await;
377 }
378
379 Ok(())
380 }
381 DefiSubscribeCommand::Pool(cmd) => {
382 tracing::info!(
383 "Processing subscribe pool command for {}",
384 cmd.instrument_id
385 );
386
387 if let Some(ref mut _rpc) = core_client.rpc_client {
388 tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
389 }
390
391 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
392 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
393 .map_err(|e| {
394 anyhow::anyhow!(
395 "Invalid pool address '{}' failed with error: {:?}",
396 cmd.instrument_id,
397 e
398 )
399 })?;
400
401 core_client
403 .subscription_manager
404 .subscribe_swaps(dex, pool_address);
405 core_client
406 .subscription_manager
407 .subscribe_burns(dex, pool_address);
408 core_client
409 .subscription_manager
410 .subscribe_mints(dex, pool_address);
411 core_client
412 .subscription_manager
413 .subscribe_collects(dex, pool_address);
414 core_client
415 .subscription_manager
416 .subscribe_flashes(dex, pool_address);
417
418 tracing::info!(
419 "Subscribed to all pool events for {} at address {}",
420 cmd.instrument_id,
421 pool_address
422 );
423 } else {
424 anyhow::bail!(
425 "Invalid venue {}, expected Blockchain DEX format",
426 cmd.instrument_id.venue
427 )
428 }
429
430 Ok(())
431 }
432 DefiSubscribeCommand::PoolSwaps(cmd) => {
433 tracing::info!(
434 "Processing subscribe pool swaps command for {}",
435 cmd.instrument_id
436 );
437
438 if let Some(ref mut _rpc) = core_client.rpc_client {
439 tracing::warn!(
440 "RPC pool swaps subscription not yet implemented, using HyperSync"
441 );
442 }
443
444 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
445 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
446 .map_err(|e| {
447 anyhow::anyhow!(
448 "Invalid pool swap address '{}' failed with error: {:?}",
449 cmd.instrument_id,
450 e
451 )
452 })?;
453 core_client
454 .subscription_manager
455 .subscribe_swaps(dex, pool_address);
456 } else {
457 anyhow::bail!(
458 "Invalid venue {}, expected Blockchain DEX format",
459 cmd.instrument_id.venue
460 )
461 }
462
463 Ok(())
464 }
465 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
466 tracing::info!(
467 "Processing subscribe pool liquidity updates command for address: {}",
468 cmd.instrument_id
469 );
470
471 if let Some(ref mut _rpc) = core_client.rpc_client {
472 tracing::warn!(
473 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
474 );
475 }
476
477 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
478 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
479 .map_err(|_| {
480 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
481 })?;
482 core_client
483 .subscription_manager
484 .subscribe_burns(dex, pool_address);
485 core_client
486 .subscription_manager
487 .subscribe_mints(dex, pool_address);
488 } else {
489 anyhow::bail!(
490 "Invalid venue {}, expected Blockchain DEX format",
491 cmd.instrument_id.venue
492 )
493 }
494
495 Ok(())
496 }
497 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
498 tracing::info!(
499 "Processing subscribe pool fee collects command for address: {}",
500 cmd.instrument_id
501 );
502
503 if let Some(ref mut _rpc) = core_client.rpc_client {
504 tracing::warn!(
505 "RPC pool fee collects subscription not yet implemented, using HyperSync"
506 );
507 }
508
509 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
510 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
511 .map_err(|_| {
512 anyhow::anyhow!(
513 "Invalid pool fee collect address: {}",
514 cmd.instrument_id
515 )
516 })?;
517 core_client
518 .subscription_manager
519 .subscribe_collects(dex, pool_address);
520 } else {
521 anyhow::bail!(
522 "Invalid venue {}, expected Blockchain DEX format",
523 cmd.instrument_id.venue
524 )
525 }
526
527 Ok(())
528 }
529 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
530 tracing::info!(
531 "Processing subscribe pool flash command for address: {}",
532 cmd.instrument_id
533 );
534
535 if let Some(ref mut _rpc) = core_client.rpc_client {
536 tracing::warn!(
537 "RPC pool fee collects subscription not yet implemented, using HyperSync"
538 );
539 }
540
541 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
542 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
543 .map_err(|_| {
544 anyhow::anyhow!(
545 "Invalid pool flash subscribe address: {}",
546 cmd.instrument_id
547 )
548 })?;
549 core_client
550 .subscription_manager
551 .subscribe_flashes(dex, pool_address);
552 } else {
553 anyhow::bail!(
554 "Invalid venue {}, expected Blockchain DEX format",
555 cmd.instrument_id.venue
556 )
557 }
558
559 Ok(())
560 }
561 }
562 }
563
564 async fn handle_unsubscribe_command(
566 command: DefiUnsubscribeCommand,
567 core_client: &mut BlockchainDataClientCore,
568 ) -> anyhow::Result<()> {
569 match command {
570 DefiUnsubscribeCommand::Blocks(_cmd) => {
571 tracing::info!("Processing unsubscribe blocks command");
572
573 if core_client.rpc_client.is_some() {
575 tracing::warn!("RPC blocks unsubscription not yet implemented");
576 }
577
578 core_client.hypersync_client.unsubscribe_blocks();
580 tracing::info!("Unsubscribed from blocks via HyperSync");
581
582 Ok(())
583 }
584 DefiUnsubscribeCommand::Pool(cmd) => {
585 tracing::info!(
586 "Processing unsubscribe pool command for {}",
587 cmd.instrument_id
588 );
589
590 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
591 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
592 .map_err(|_| {
593 anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
594 })?;
595
596 core_client
598 .subscription_manager
599 .unsubscribe_swaps(dex, pool_address);
600 core_client
601 .subscription_manager
602 .unsubscribe_burns(dex, pool_address);
603 core_client
604 .subscription_manager
605 .unsubscribe_mints(dex, pool_address);
606 core_client
607 .subscription_manager
608 .unsubscribe_collects(dex, pool_address);
609 core_client
610 .subscription_manager
611 .unsubscribe_flashes(dex, pool_address);
612
613 tracing::info!(
614 "Unsubscribed from all pool events for {} at address {}",
615 cmd.instrument_id,
616 pool_address
617 );
618 } else {
619 anyhow::bail!(
620 "Invalid venue {}, expected Blockchain DEX format",
621 cmd.instrument_id.venue
622 )
623 }
624
625 Ok(())
626 }
627 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
628 tracing::info!("Processing unsubscribe pool swaps command");
629
630 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
631 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
632 .map_err(|_| {
633 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
634 })?;
635 core_client
636 .subscription_manager
637 .unsubscribe_swaps(dex, pool_address);
638 } else {
639 anyhow::bail!(
640 "Invalid venue {}, expected Blockchain DEX format",
641 cmd.instrument_id.venue
642 )
643 }
644
645 Ok(())
646 }
647 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
648 tracing::info!(
649 "Processing unsubscribe pool liquidity updates command for {}",
650 cmd.instrument_id
651 );
652
653 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
654 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
655 .map_err(|_| {
656 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
657 })?;
658 core_client
659 .subscription_manager
660 .unsubscribe_burns(dex, pool_address);
661 core_client
662 .subscription_manager
663 .unsubscribe_mints(dex, pool_address);
664 } else {
665 anyhow::bail!(
666 "Invalid venue {}, expected Blockchain DEX format",
667 cmd.instrument_id.venue
668 )
669 }
670
671 Ok(())
672 }
673 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
674 tracing::info!(
675 "Processing unsubscribe pool fee collects command for {}",
676 cmd.instrument_id
677 );
678
679 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
680 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
681 .map_err(|_| {
682 anyhow::anyhow!(
683 "Invalid pool fee collect address: {}",
684 cmd.instrument_id
685 )
686 })?;
687 core_client
688 .subscription_manager
689 .unsubscribe_collects(dex, pool_address);
690 } else {
691 anyhow::bail!(
692 "Invalid venue {}, expected Blockchain DEX format",
693 cmd.instrument_id.venue
694 )
695 }
696
697 Ok(())
698 }
699 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
700 tracing::info!(
701 "Processing unsubscribe pool flash command for {}",
702 cmd.instrument_id
703 );
704
705 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
706 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
707 .map_err(|_| {
708 anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
709 })?;
710 core_client
711 .subscription_manager
712 .unsubscribe_flashes(dex, pool_address);
713 } else {
714 anyhow::bail!(
715 "Invalid venue {}, expected Blockchain DEX format",
716 cmd.instrument_id.venue
717 )
718 }
719
720 Ok(())
721 }
722 }
723 }
724
725 async fn handle_request_command(
727 command: DefiRequestCommand,
728 core_client: &mut BlockchainDataClientCore,
729 ) -> anyhow::Result<()> {
730 match command {
731 DefiRequestCommand::PoolSnapshot(cmd) => {
732 tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
733
734 let pool_address =
735 validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
736 anyhow::anyhow!(
737 "Invalid pool address '{}' failed with error: {:?}",
738 cmd.instrument_id,
739 e
740 )
741 })?;
742
743 match core_client.get_pool(&pool_address) {
744 Ok(pool) => {
745 tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
746
747 let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
749 core_client.send_data(pool_data);
750
751 if let Some(database) = &core_client.cache.database {
753 match database
754 .load_latest_valid_pool_snapshot(
755 core_client.chain.chain_id,
756 &pool_address,
757 )
758 .await
759 {
760 Ok(Some(snapshot)) => {
761 tracing::info!(
762 "Loaded pool snapshot for {} at block {} with {} positions and {} ticks",
763 cmd.instrument_id,
764 snapshot.block_position.number,
765 snapshot.positions.len(),
766 snapshot.ticks.len()
767 );
768 let snapshot_data =
769 DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
770 core_client.send_data(snapshot_data);
771 }
772 Ok(None) => {
773 tracing::warn!(
774 "No valid pool snapshot found in database for {}",
775 cmd.instrument_id
776 );
777 }
778 Err(e) => {
779 tracing::error!(
780 "Failed to load pool snapshot for {}: {e}",
781 cmd.instrument_id
782 );
783 }
784 }
785 }
786 }
787 Err(e) => {
788 tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
789 }
790 }
791
792 Ok(())
793 }
794 }
795 }
796
797 pub async fn await_process_task_close(&mut self) {
802 if let Some(handle) = self.process_task.take()
803 && let Err(e) = handle.await
804 {
805 tracing::error!("Process task join error: {e}");
806 }
807 }
808}
809
810#[async_trait::async_trait]
811impl DataClient for BlockchainDataClient {
812 fn client_id(&self) -> ClientId {
813 ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
814 }
815
816 fn venue(&self) -> Option<Venue> {
817 None
820 }
821
822 fn start(&mut self) -> anyhow::Result<()> {
823 tracing::info!(
824 "Starting blockchain data client for '{chain_name}'",
825 chain_name = self.chain.name
826 );
827 Ok(())
828 }
829
830 fn stop(&mut self) -> anyhow::Result<()> {
831 tracing::info!(
832 "Stopping blockchain data client for '{chain_name}'",
833 chain_name = self.chain.name
834 );
835 Ok(())
836 }
837
838 fn reset(&mut self) -> anyhow::Result<()> {
839 tracing::info!(
840 "Resetting blockchain data client for '{chain_name}'",
841 chain_name = self.chain.name
842 );
843 Ok(())
844 }
845
846 fn dispose(&mut self) -> anyhow::Result<()> {
847 tracing::info!(
848 "Disposing blockchain data client for '{chain_name}'",
849 chain_name = self.chain.name
850 );
851 Ok(())
852 }
853
854 async fn connect(&mut self) -> anyhow::Result<()> {
855 tracing::info!(
856 "Connecting blockchain data client for '{}'",
857 self.chain.name
858 );
859
860 if self.process_task.is_none() {
861 self.spawn_process_task();
862 }
863
864 Ok(())
865 }
866
867 async fn disconnect(&mut self) -> anyhow::Result<()> {
868 tracing::info!(
869 "Disconnecting blockchain data client for '{}'",
870 self.chain.name
871 );
872
873 if let Some(shutdown_tx) = self.shutdown_tx.take() {
874 let _ = shutdown_tx.send(());
875 }
876 self.await_process_task_close().await;
877
878 Ok(())
879 }
880
881 fn is_connected(&self) -> bool {
882 true
885 }
886
887 fn is_disconnected(&self) -> bool {
888 !self.is_connected()
889 }
890
891 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
892 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
893 self.command_tx.send(command)?;
894 Ok(())
895 }
896
897 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
898 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
899 self.command_tx.send(command)?;
900 Ok(())
901 }
902
903 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
904 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
905 self.command_tx.send(command)?;
906 Ok(())
907 }
908
909 fn subscribe_pool_liquidity_updates(
910 &mut self,
911 cmd: &SubscribePoolLiquidityUpdates,
912 ) -> anyhow::Result<()> {
913 let command =
914 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
915 self.command_tx.send(command)?;
916 Ok(())
917 }
918
919 fn subscribe_pool_fee_collects(
920 &mut self,
921 cmd: &SubscribePoolFeeCollects,
922 ) -> anyhow::Result<()> {
923 let command =
924 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
925 self.command_tx.send(command)?;
926 Ok(())
927 }
928
929 fn subscribe_pool_flash_events(
930 &mut self,
931 cmd: &SubscribePoolFlashEvents,
932 ) -> anyhow::Result<()> {
933 let command =
934 DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
935 self.command_tx.send(command)?;
936 Ok(())
937 }
938
939 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
940 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
941 self.command_tx.send(command)?;
942 Ok(())
943 }
944
945 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
946 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
947 self.command_tx.send(command)?;
948 Ok(())
949 }
950
951 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
952 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
953 self.command_tx.send(command)?;
954 Ok(())
955 }
956
957 fn unsubscribe_pool_liquidity_updates(
958 &mut self,
959 cmd: &UnsubscribePoolLiquidityUpdates,
960 ) -> anyhow::Result<()> {
961 let command =
962 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
963 self.command_tx.send(command)?;
964 Ok(())
965 }
966
967 fn unsubscribe_pool_fee_collects(
968 &mut self,
969 cmd: &UnsubscribePoolFeeCollects,
970 ) -> anyhow::Result<()> {
971 let command =
972 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
973 self.command_tx.send(command)?;
974 Ok(())
975 }
976
977 fn unsubscribe_pool_flash_events(
978 &mut self,
979 cmd: &UnsubscribePoolFlashEvents,
980 ) -> anyhow::Result<()> {
981 let command =
982 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
983 self.command_tx.send(command)?;
984 Ok(())
985 }
986
987 fn request_pool_snapshot(
988 &self,
989 cmd: &nautilus_common::messages::defi::RequestPoolSnapshot,
990 ) -> anyhow::Result<()> {
991 let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
992 self.command_tx.send(command)?;
993 Ok(())
994 }
995}