nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    messages::{
18        DataEvent,
19        defi::{
20            DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
21            SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
22            SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
23            UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
24            UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
25        },
26    },
27    runtime::get_runtime,
28};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31    defi::{DefiData, SharedChain, validation::validate_address},
32    identifiers::{ClientId, Venue},
33};
34
35use crate::{
36    config::BlockchainDataClientConfig,
37    data::core::BlockchainDataClientCore,
38    exchanges::get_dex_extended,
39    rpc::{BlockchainRpcClient, types::BlockchainMessage},
40};
41
/// A comprehensive client for interacting with blockchain data from multiple sources.
///
/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
///
/// This client supports two primary data sources:
/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
/// 2. HyperSync API for efficient historical data queries.
///
/// The actual work is performed by a background task which owns the receiving halves of the
/// channels below; the `Option` wrappers exist so those halves can be moved out of the struct
/// when that task is spawned.
#[derive(Debug)]
pub struct BlockchainDataClient {
    /// The blockchain being targeted by this client instance (cloned from `config.chain`).
    pub chain: SharedChain,
    /// Configuration parameters for the blockchain data client.
    pub config: BlockchainDataClientConfig,
    /// The core client instance that handles blockchain operations.
    /// Wrapped in Option to allow moving it into the background processing task.
    ///
    /// NOTE(review): `new` initializes this to `None` and the processing task builds its own
    /// local core client, so nothing visible here populates this field; confirm whether it
    /// is set elsewhere.
    pub core_client: Option<BlockchainDataClientCore>,
    /// Receiving half of the HyperSync message channel.
    /// Moved into the background processing task when it is spawned (leaving `None`).
    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
    /// Sending half of the HyperSync message channel.
    /// Handed to the core client when the background processing task is spawned (leaving `None`).
    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Channel sender for commands to be processed asynchronously.
    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
    /// Channel receiver for commands to be processed asynchronously.
    /// Moved into the background processing task when it is spawned (leaving `None`).
    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
    /// Join handle of the background processing task; `None` until the task has been spawned.
    process_task: Option<tokio::task::JoinHandle<()>>,
    /// Cancellation token for graceful shutdown of background tasks.
    /// A clone is given to the processing task, which disconnects and exits once it is cancelled.
    cancellation_token: tokio_util::sync::CancellationToken,
}
73
74impl BlockchainDataClient {
75    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
76    #[must_use]
77    pub fn new(config: BlockchainDataClientConfig) -> Self {
78        let chain = config.chain.clone();
79        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
80        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
81        Self {
82            chain,
83            core_client: None,
84            config,
85            hypersync_rx: Some(hypersync_rx),
86            hypersync_tx: Some(hypersync_tx),
87            command_tx,
88            command_rx: Some(command_rx),
89            process_task: None,
90            cancellation_token: tokio_util::sync::CancellationToken::new(),
91        }
92    }
93
94    /// Spawns the main processing task that handles commands and blockchain data.
95    ///
96    /// This method creates a background task that:
97    /// 1. Processes subscription/unsubscription commands from the command channel
98    /// 2. Handles incoming blockchain data from HyperSync
99    /// 3. Processes RPC messages if RPC client is configured
100    /// 4. Routes processed data to subscribers
101    fn spawn_process_task(&mut self) {
102        let command_rx = if let Some(r) = self.command_rx.take() {
103            r
104        } else {
105            tracing::error!("Command receiver already taken, not spawning handler");
106            return;
107        };
108
109        let cancellation_token = self.cancellation_token.clone();
110
111        let data_tx = nautilus_common::runner::get_data_event_sender();
112
113        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
114        let hypersync_tx = self.hypersync_tx.take();
115
116        let mut core_client = BlockchainDataClientCore::new(
117            self.config.clone(),
118            hypersync_tx,
119            Some(data_tx),
120            cancellation_token.clone(),
121        );
122
123        let handle = get_runtime().spawn(async move {
124            tracing::debug!("Started task 'process'");
125
126            if let Err(e) = core_client.connect().await {
127                // TODO: connect() could return more granular error types to distinguish
128                // cancellation from actual failures without string matching
129                if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
130                    tracing::warn!("Blockchain core client connection interrupted: {e}");
131                } else {
132                    tracing::error!("Failed to connect blockchain core client: {e}");
133                }
134                return;
135            }
136
137            let mut command_rx = command_rx;
138
139            loop {
140                tokio::select! {
141                    () = cancellation_token.cancelled() => {
142                        tracing::debug!("Received cancellation signal in Blockchain data client process task");
143                        core_client.disconnect().await;
144                        break;
145                    }
146                    command = command_rx.recv() => {
147                        if let Some(cmd) = command {
148                            match cmd {
149                                DefiDataCommand::Subscribe(cmd) => {
150                                    let chain = cmd.blockchain();
151                                    if chain != core_client.chain.name {
152                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
153                                        continue;
154                                    }
155
156                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
157                                        tracing::error!("Error processing subscribe command: {e}");
158                                    }
159                                }
160                                DefiDataCommand::Unsubscribe(cmd) => {
161                                    let chain = cmd.blockchain();
162                                    if chain != core_client.chain.name {
163                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
164                                        continue;
165                                    }
166
167                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
168                                        tracing::error!("Error processing subscribe command: {e}");
169                                    }
170                                }
171                                DefiDataCommand::Request(cmd) => {
172                                    if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
173                                        tracing::error!("Error processing request command: {e}");
174                                    }
175                                }
176                            }
177                        } else {
178                            tracing::debug!("Command channel closed");
179                            break;
180                        }
181                    }
182                    data = hypersync_rx.recv() => {
183                        if let Some(msg) = data {
184                            let data_event = match msg {
185                                BlockchainMessage::Block(block) => {
186                                    // Fetch and process all subscribed events per DEX
187                                    for dex in core_client.cache.get_registered_dexes(){
188                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
189                                        if !addresses.is_empty() {
190                                            core_client.hypersync_client.process_block_dex_contract_events(
191                                                &dex,
192                                                block.number,
193                                                addresses,
194                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
195                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
196                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
197                                            );
198                                        }
199                                    }
200
201                                    Some(DataEvent::DeFi(DefiData::Block(block)))
202                                }
203                                BlockchainMessage::SwapEvent(swap_event) => {
204                                    match core_client.get_pool(&swap_event.pool_address) {
205                                        Ok(pool) => {
206                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
207                                            match core_client.process_pool_swap_event(
208                                                &swap_event,
209                                                pool,
210                                                dex_extended,
211                                            ){
212                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
213                                                Err(e) => {
214                                                    tracing::error!("Error processing pool swap event: {e}");
215                                                    None
216                                                }
217                                            }
218                                        }
219                                        Err(e) => {
220                                            tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
221                                            None
222                                        }
223                                    }
224                                }
225                                BlockchainMessage::BurnEvent(burn_event) => {
226                                    match core_client.get_pool(&burn_event.pool_address) {
227                                        Ok(pool) => {
228                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
229                                            match core_client.process_pool_burn_event(
230                                                &burn_event,
231                                                pool,
232                                                dex_extended,
233                                            ){
234                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
235                                                Err(e) => {
236                                                    tracing::error!("Error processing pool burn event: {e}");
237                                                    None
238                                                }
239                                            }
240                                        }
241                                        Err(e) => {
242                                            tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
243                                            None
244                                        }
245                                    }
246                                }
247                                BlockchainMessage::MintEvent(mint_event) => {
248                                    match core_client.get_pool(&mint_event.pool_address) {
249                                        Ok(pool) => {
250                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
251                                            match core_client.process_pool_mint_event(
252                                                &mint_event,
253                                                pool,
254                                                dex_extended,
255                                            ){
256                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
257                                                Err(e) => {
258                                                    tracing::error!("Error processing pool mint event: {e}");
259                                                    None
260                                                }
261                                            }
262                                        }
263                                        Err(e) => {
264                                            tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
265                                            None
266                                        }
267                                    }
268                                }
269                                BlockchainMessage::CollectEvent(collect_event) => {
270                                    match core_client.get_pool(&collect_event.pool_address) {
271                                        Ok(pool) => {
272                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
273                                            match core_client.process_pool_collect_event(
274                                                &collect_event,
275                                                pool,
276                                                dex_extended,
277                                            ){
278                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
279                                                Err(e) => {
280                                                    tracing::error!("Error processing pool collect event: {e}");
281                                                    None
282                                                }
283                                            }
284                                        }
285                                        Err(e) => {
286                                            tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
287                                            None
288                                        }
289                                    }
290                                }
291                            BlockchainMessage::FlashEvent(flash_event) => {
292                                    match core_client.get_pool(&flash_event.pool_address) {
293                                        Ok(pool) => {
294                                            match core_client.process_pool_flash_event(&flash_event,pool){
295                                                Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
296                                                Err(e) => {
297                                                    tracing::error!("Error processing pool flash event: {e}");
298                                                    None
299                                                }
300                                            }
301                                        }
302                                        Err(e) => {
303                                            tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_address, e);
304                                            None
305                                        }
306                                    }
307                                }
308                            };
309
310                            if let Some(event) = data_event {
311                                core_client.send_data(event);
312                            }
313                        } else {
314                            tracing::debug!("HyperSync data channel closed");
315                            break;
316                        }
317                    }
318                    msg = async {
319                        match core_client.rpc_client {
320                            Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
321                            None => std::future::pending().await,  // Never resolves
322                        }
323                    } => {
324                        // This branch only fires when we actually receive a message
325                        match msg {
326                            Ok(BlockchainMessage::Block(block)) => {
327                                let data = DataEvent::DeFi(DefiData::Block(block));
328                                core_client.send_data(data);
329                            },
330                            Ok(BlockchainMessage::SwapEvent(_)) => {
331                                tracing::warn!("RPC swap events are not yet supported");
332                            }
333                            Ok(BlockchainMessage::MintEvent(_)) => {
334                                tracing::warn!("RPC mint events are not yet supported");
335                            }
336                            Ok(BlockchainMessage::BurnEvent(_)) => {
337                                tracing::warn!("RPC burn events are not yet supported");
338                            }
339                            Ok(BlockchainMessage::CollectEvent(_)) => {
340                                tracing::warn!("RPC collect events are not yet supported")
341                            }
342                            Ok(BlockchainMessage::FlashEvent(_)) => {
343                                tracing::warn!("RPC flash events are not yet supported")
344                            }
345                            Err(e) => {
346                                tracing::error!("Error processing RPC message: {e}");
347                            }
348                        }
349                    }
350                }
351            }
352
353            tracing::debug!("Stopped task 'process'");
354        });
355
356        self.process_task = Some(handle);
357    }
358
359    /// Processes DeFi subscription commands to start receiving specific blockchain data.
360    async fn handle_subscribe_command(
361        command: DefiSubscribeCommand,
362        core_client: &mut BlockchainDataClientCore,
363    ) -> anyhow::Result<()> {
364        match command {
365            DefiSubscribeCommand::Blocks(_cmd) => {
366                tracing::info!("Processing subscribe blocks command");
367
368                // Try RPC client first if available, otherwise use HyperSync
369                if let Some(ref mut rpc) = core_client.rpc_client {
370                    if let Err(e) = rpc.subscribe_blocks().await {
371                        tracing::warn!(
372                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
373                        );
374                        core_client.hypersync_client.subscribe_blocks();
375                        tokio::task::yield_now().await;
376                    } else {
377                        tracing::info!("Successfully subscribed to blocks via RPC");
378                    }
379                } else {
380                    tracing::info!("Subscribing to blocks via HyperSync");
381                    core_client.hypersync_client.subscribe_blocks();
382                    tokio::task::yield_now().await;
383                }
384
385                Ok(())
386            }
387            DefiSubscribeCommand::Pool(cmd) => {
388                tracing::info!(
389                    "Processing subscribe pool command for {}",
390                    cmd.instrument_id
391                );
392
393                if let Some(ref mut _rpc) = core_client.rpc_client {
394                    tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
395                }
396
397                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
398                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
399                        .map_err(|e| {
400                            anyhow::anyhow!(
401                                "Invalid pool address '{}' failed with error: {:?}",
402                                cmd.instrument_id,
403                                e
404                            )
405                        })?;
406
407                    // Subscribe to all pool event types
408                    core_client
409                        .subscription_manager
410                        .subscribe_swaps(dex, pool_address);
411                    core_client
412                        .subscription_manager
413                        .subscribe_burns(dex, pool_address);
414                    core_client
415                        .subscription_manager
416                        .subscribe_mints(dex, pool_address);
417                    core_client
418                        .subscription_manager
419                        .subscribe_collects(dex, pool_address);
420                    core_client
421                        .subscription_manager
422                        .subscribe_flashes(dex, pool_address);
423
424                    tracing::info!(
425                        "Subscribed to all pool events for {} at address {}",
426                        cmd.instrument_id,
427                        pool_address
428                    );
429                } else {
430                    anyhow::bail!(
431                        "Invalid venue {}, expected Blockchain DEX format",
432                        cmd.instrument_id.venue
433                    )
434                }
435
436                Ok(())
437            }
438            DefiSubscribeCommand::PoolSwaps(cmd) => {
439                tracing::info!(
440                    "Processing subscribe pool swaps command for {}",
441                    cmd.instrument_id
442                );
443
444                if let Some(ref mut _rpc) = core_client.rpc_client {
445                    tracing::warn!(
446                        "RPC pool swaps subscription not yet implemented, using HyperSync"
447                    );
448                }
449
450                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
451                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
452                        .map_err(|e| {
453                            anyhow::anyhow!(
454                                "Invalid pool swap address '{}' failed with error: {:?}",
455                                cmd.instrument_id,
456                                e
457                            )
458                        })?;
459                    core_client
460                        .subscription_manager
461                        .subscribe_swaps(dex, pool_address);
462                } else {
463                    anyhow::bail!(
464                        "Invalid venue {}, expected Blockchain DEX format",
465                        cmd.instrument_id.venue
466                    )
467                }
468
469                Ok(())
470            }
471            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
472                tracing::info!(
473                    "Processing subscribe pool liquidity updates command for address: {}",
474                    cmd.instrument_id
475                );
476
477                if let Some(ref mut _rpc) = core_client.rpc_client {
478                    tracing::warn!(
479                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
480                    );
481                }
482
483                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
484                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
485                        .map_err(|_| {
486                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
487                        })?;
488                    core_client
489                        .subscription_manager
490                        .subscribe_burns(dex, pool_address);
491                    core_client
492                        .subscription_manager
493                        .subscribe_mints(dex, pool_address);
494                } else {
495                    anyhow::bail!(
496                        "Invalid venue {}, expected Blockchain DEX format",
497                        cmd.instrument_id.venue
498                    )
499                }
500
501                Ok(())
502            }
503            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
504                tracing::info!(
505                    "Processing subscribe pool fee collects command for address: {}",
506                    cmd.instrument_id
507                );
508
509                if let Some(ref mut _rpc) = core_client.rpc_client {
510                    tracing::warn!(
511                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
512                    );
513                }
514
515                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
516                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
517                        .map_err(|_| {
518                            anyhow::anyhow!(
519                                "Invalid pool fee collect address: {}",
520                                cmd.instrument_id
521                            )
522                        })?;
523                    core_client
524                        .subscription_manager
525                        .subscribe_collects(dex, pool_address);
526                } else {
527                    anyhow::bail!(
528                        "Invalid venue {}, expected Blockchain DEX format",
529                        cmd.instrument_id.venue
530                    )
531                }
532
533                Ok(())
534            }
535            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
536                tracing::info!(
537                    "Processing subscribe pool flash command for address: {}",
538                    cmd.instrument_id
539                );
540
541                if let Some(ref mut _rpc) = core_client.rpc_client {
542                    tracing::warn!(
543                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
544                    );
545                }
546
547                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
548                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
549                        .map_err(|_| {
550                            anyhow::anyhow!(
551                                "Invalid pool flash subscribe address: {}",
552                                cmd.instrument_id
553                            )
554                        })?;
555                    core_client
556                        .subscription_manager
557                        .subscribe_flashes(dex, pool_address);
558                } else {
559                    anyhow::bail!(
560                        "Invalid venue {}, expected Blockchain DEX format",
561                        cmd.instrument_id.venue
562                    )
563                }
564
565                Ok(())
566            }
567        }
568    }
569
570    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
571    async fn handle_unsubscribe_command(
572        command: DefiUnsubscribeCommand,
573        core_client: &mut BlockchainDataClientCore,
574    ) -> anyhow::Result<()> {
575        match command {
576            DefiUnsubscribeCommand::Blocks(_cmd) => {
577                tracing::info!("Processing unsubscribe blocks command");
578
579                // TODO: Implement RPC unsubscription when available
580                if core_client.rpc_client.is_some() {
581                    tracing::warn!("RPC blocks unsubscription not yet implemented");
582                }
583
584                // Use HyperSync client for unsubscription
585                core_client.hypersync_client.unsubscribe_blocks().await;
586                tracing::info!("Unsubscribed from blocks via HyperSync");
587
588                Ok(())
589            }
590            DefiUnsubscribeCommand::Pool(cmd) => {
591                tracing::info!(
592                    "Processing unsubscribe pool command for {}",
593                    cmd.instrument_id
594                );
595
596                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
597                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
598                        .map_err(|_| {
599                            anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
600                        })?;
601
602                    // Unsubscribe from all pool event types
603                    core_client
604                        .subscription_manager
605                        .unsubscribe_swaps(dex, pool_address);
606                    core_client
607                        .subscription_manager
608                        .unsubscribe_burns(dex, pool_address);
609                    core_client
610                        .subscription_manager
611                        .unsubscribe_mints(dex, pool_address);
612                    core_client
613                        .subscription_manager
614                        .unsubscribe_collects(dex, pool_address);
615                    core_client
616                        .subscription_manager
617                        .unsubscribe_flashes(dex, pool_address);
618
619                    tracing::info!(
620                        "Unsubscribed from all pool events for {} at address {}",
621                        cmd.instrument_id,
622                        pool_address
623                    );
624                } else {
625                    anyhow::bail!(
626                        "Invalid venue {}, expected Blockchain DEX format",
627                        cmd.instrument_id.venue
628                    )
629                }
630
631                Ok(())
632            }
633            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
634                tracing::info!("Processing unsubscribe pool swaps command");
635
636                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
637                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
638                        .map_err(|_| {
639                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
640                        })?;
641                    core_client
642                        .subscription_manager
643                        .unsubscribe_swaps(dex, pool_address);
644                } else {
645                    anyhow::bail!(
646                        "Invalid venue {}, expected Blockchain DEX format",
647                        cmd.instrument_id.venue
648                    )
649                }
650
651                Ok(())
652            }
653            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
654                tracing::info!(
655                    "Processing unsubscribe pool liquidity updates command for {}",
656                    cmd.instrument_id
657                );
658
659                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
660                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
661                        .map_err(|_| {
662                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
663                        })?;
664                    core_client
665                        .subscription_manager
666                        .unsubscribe_burns(dex, pool_address);
667                    core_client
668                        .subscription_manager
669                        .unsubscribe_mints(dex, pool_address);
670                } else {
671                    anyhow::bail!(
672                        "Invalid venue {}, expected Blockchain DEX format",
673                        cmd.instrument_id.venue
674                    )
675                }
676
677                Ok(())
678            }
679            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
680                tracing::info!(
681                    "Processing unsubscribe pool fee collects command for {}",
682                    cmd.instrument_id
683                );
684
685                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
686                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
687                        .map_err(|_| {
688                            anyhow::anyhow!(
689                                "Invalid pool fee collect address: {}",
690                                cmd.instrument_id
691                            )
692                        })?;
693                    core_client
694                        .subscription_manager
695                        .unsubscribe_collects(dex, pool_address);
696                } else {
697                    anyhow::bail!(
698                        "Invalid venue {}, expected Blockchain DEX format",
699                        cmd.instrument_id.venue
700                    )
701                }
702
703                Ok(())
704            }
705            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
706                tracing::info!(
707                    "Processing unsubscribe pool flash command for {}",
708                    cmd.instrument_id
709                );
710
711                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
712                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
713                        .map_err(|_| {
714                            anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
715                        })?;
716                    core_client
717                        .subscription_manager
718                        .unsubscribe_flashes(dex, pool_address);
719                } else {
720                    anyhow::bail!(
721                        "Invalid venue {}, expected Blockchain DEX format",
722                        cmd.instrument_id.venue
723                    )
724                }
725
726                Ok(())
727            }
728        }
729    }
730
731    /// Processes DeFi request commands to fetch specific blockchain data.
732    async fn handle_request_command(
733        command: DefiRequestCommand,
734        core_client: &mut BlockchainDataClientCore,
735    ) -> anyhow::Result<()> {
736        match command {
737            DefiRequestCommand::PoolSnapshot(cmd) => {
738                tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
739
740                let pool_address =
741                    validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
742                        anyhow::anyhow!(
743                            "Invalid pool address '{}' failed with error: {:?}",
744                            cmd.instrument_id,
745                            e
746                        )
747                    })?;
748
749                match core_client.get_pool(&pool_address) {
750                    Ok(pool) => {
751                        let pool = pool.clone();
752                        tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
753
754                        // Send the pool definition
755                        let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
756                        core_client.send_data(pool_data);
757
758                        match core_client.bootstrap_latest_pool_profiler(&pool).await {
759                            Ok((profiler, already_valid)) => {
760                                let snapshot = profiler.extract_snapshot();
761
762                                tracing::info!(
763                                    "Saving pool snapshot with {} positions and {} ticks to database...",
764                                    snapshot.positions.len(),
765                                    snapshot.ticks.len()
766                                );
767                                core_client
768                                    .cache
769                                    .add_pool_snapshot(&pool.address, &snapshot)
770                                    .await?;
771
772                                // If snapshot is valid, send it back to the data engine.
773                                if core_client
774                                    .check_snapshot_validity(&profiler, already_valid)
775                                    .await?
776                                {
777                                    let snapshot_data =
778                                        DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
779                                    core_client.send_data(snapshot_data);
780                                }
781                            }
782                            Err(e) => tracing::error!(
783                                "Failed to bootstrap pool profiler for {} and extract snapshot with error {}",
784                                cmd.instrument_id,
785                                e.to_string()
786                            ),
787                        }
788                    }
789                    Err(e) => {
790                        tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
791                    }
792                }
793
794                Ok(())
795            }
796        }
797    }
798
799    /// Waits for the background processing task to complete.
800    ///
801    /// This method blocks until the spawned process task finishes execution,
802    /// which typically happens after a shutdown signal is sent.
803    pub async fn await_process_task_close(&mut self) {
804        if let Some(handle) = self.process_task.take()
805            && let Err(e) = handle.await
806        {
807            tracing::error!("Process task join error: {e}");
808        }
809    }
810}
811
812#[async_trait::async_trait]
813impl DataClient for BlockchainDataClient {
814    fn client_id(&self) -> ClientId {
815        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
816    }
817
818    fn venue(&self) -> Option<Venue> {
819        // Blockchain data clients don't map to a single venue since they can provide
820        // data for multiple DEXs across the blockchain
821        None
822    }
823
824    fn start(&mut self) -> anyhow::Result<()> {
825        tracing::info!(
826            "Starting blockchain data client for '{chain_name}'",
827            chain_name = self.chain.name
828        );
829        Ok(())
830    }
831
832    fn stop(&mut self) -> anyhow::Result<()> {
833        tracing::info!(
834            "Stopping blockchain data client for '{chain_name}'",
835            chain_name = self.chain.name
836        );
837        self.cancellation_token.cancel();
838
839        // Create fresh token for next start cycle
840        self.cancellation_token = tokio_util::sync::CancellationToken::new();
841        Ok(())
842    }
843
844    fn reset(&mut self) -> anyhow::Result<()> {
845        tracing::info!(
846            "Resetting blockchain data client for '{chain_name}'",
847            chain_name = self.chain.name
848        );
849        self.cancellation_token = tokio_util::sync::CancellationToken::new();
850        Ok(())
851    }
852
853    fn dispose(&mut self) -> anyhow::Result<()> {
854        tracing::info!(
855            "Disposing blockchain data client for '{chain_name}'",
856            chain_name = self.chain.name
857        );
858        Ok(())
859    }
860
861    async fn connect(&mut self) -> anyhow::Result<()> {
862        tracing::info!(
863            "Connecting blockchain data client for '{}'",
864            self.chain.name
865        );
866
867        if self.process_task.is_none() {
868            self.spawn_process_task();
869        }
870
871        Ok(())
872    }
873
874    async fn disconnect(&mut self) -> anyhow::Result<()> {
875        tracing::info!(
876            "Disconnecting blockchain data client for '{}'",
877            self.chain.name
878        );
879
880        self.cancellation_token.cancel();
881        self.await_process_task_close().await;
882
883        // Create fresh token and channels for next connect cycle
884        self.cancellation_token = tokio_util::sync::CancellationToken::new();
885        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
886        self.hypersync_tx = Some(hypersync_tx);
887        self.hypersync_rx = Some(hypersync_rx);
888        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
889        self.command_tx = command_tx;
890        self.command_rx = Some(command_rx);
891
892        Ok(())
893    }
894
895    fn is_connected(&self) -> bool {
896        // TODO: Improve connection detection
897        // For now, we'll assume connected if we have either RPC or HyperSync configured
898        true
899    }
900
901    fn is_disconnected(&self) -> bool {
902        !self.is_connected()
903    }
904
905    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
906        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
907        self.command_tx.send(command)?;
908        Ok(())
909    }
910
911    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
912        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
913        self.command_tx.send(command)?;
914        Ok(())
915    }
916
917    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
918        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
919        self.command_tx.send(command)?;
920        Ok(())
921    }
922
923    fn subscribe_pool_liquidity_updates(
924        &mut self,
925        cmd: &SubscribePoolLiquidityUpdates,
926    ) -> anyhow::Result<()> {
927        let command =
928            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
929        self.command_tx.send(command)?;
930        Ok(())
931    }
932
933    fn subscribe_pool_fee_collects(
934        &mut self,
935        cmd: &SubscribePoolFeeCollects,
936    ) -> anyhow::Result<()> {
937        let command =
938            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
939        self.command_tx.send(command)?;
940        Ok(())
941    }
942
943    fn subscribe_pool_flash_events(
944        &mut self,
945        cmd: &SubscribePoolFlashEvents,
946    ) -> anyhow::Result<()> {
947        let command =
948            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
949        self.command_tx.send(command)?;
950        Ok(())
951    }
952
953    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
954        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
955        self.command_tx.send(command)?;
956        Ok(())
957    }
958
959    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
960        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
961        self.command_tx.send(command)?;
962        Ok(())
963    }
964
965    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
966        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
967        self.command_tx.send(command)?;
968        Ok(())
969    }
970
971    fn unsubscribe_pool_liquidity_updates(
972        &mut self,
973        cmd: &UnsubscribePoolLiquidityUpdates,
974    ) -> anyhow::Result<()> {
975        let command =
976            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
977        self.command_tx.send(command)?;
978        Ok(())
979    }
980
981    fn unsubscribe_pool_fee_collects(
982        &mut self,
983        cmd: &UnsubscribePoolFeeCollects,
984    ) -> anyhow::Result<()> {
985        let command =
986            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
987        self.command_tx.send(command)?;
988        Ok(())
989    }
990
991    fn unsubscribe_pool_flash_events(
992        &mut self,
993        cmd: &UnsubscribePoolFlashEvents,
994    ) -> anyhow::Result<()> {
995        let command =
996            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
997        self.command_tx.send(command)?;
998        Ok(())
999    }
1000
1001    fn request_pool_snapshot(
1002        &self,
1003        cmd: &nautilus_common::messages::defi::RequestPoolSnapshot,
1004    ) -> anyhow::Result<()> {
1005        let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1006        self.command_tx.send(command)?;
1007        Ok(())
1008    }
1009}