nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    defi::RequestPoolSnapshot,
18    live::runtime::get_runtime,
19    messages::{
20        DataEvent,
21        defi::{
22            DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
23            SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
24            SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
25            UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
26            UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
27        },
28    },
29};
30use nautilus_data::client::DataClient;
31use nautilus_model::{
32    defi::{DefiData, PoolIdentifier, SharedChain, validation::validate_address},
33    identifiers::{ClientId, Venue},
34};
35use ustr::Ustr;
36
37use crate::{
38    config::BlockchainDataClientConfig,
39    data::core::BlockchainDataClientCore,
40    exchanges::get_dex_extended,
41    rpc::{BlockchainRpcClient, types::BlockchainMessage},
42};
43
44/// A comprehensive client for interacting with blockchain data from multiple sources.
45///
46/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
47/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
48/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
49///
50/// This client supports two primary data sources:
51/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
52/// 2. HyperSync API for efficient historical data queries.
53#[derive(Debug)]
54pub struct BlockchainDataClient {
55    /// The blockchain being targeted by this client instance.
56    pub chain: SharedChain,
57    /// Configuration parameters for the blockchain data client.
58    pub config: BlockchainDataClientConfig,
59    /// The core client instance that handles blockchain operations.
60    /// Wrapped in Option to allow moving it into the background processing task.
61    pub core_client: Option<BlockchainDataClientCore>,
62    /// Channel receiver for messages from the HyperSync client.
63    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
64    /// Channel sender for messages to the HyperSync client.
65    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
66    /// Channel sender for commands to be processed asynchronously.
67    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
68    /// Channel receiver for commands to be processed asynchronously.
69    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
70    /// Background task for processing messages.
71    process_task: Option<tokio::task::JoinHandle<()>>,
72    /// Cancellation token for graceful shutdown of background tasks.
73    cancellation_token: tokio_util::sync::CancellationToken,
74}
75
76impl BlockchainDataClient {
77    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
78    #[must_use]
79    pub fn new(config: BlockchainDataClientConfig) -> Self {
80        let chain = config.chain.clone();
81        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
82        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
83        Self {
84            chain,
85            core_client: None,
86            config,
87            hypersync_rx: Some(hypersync_rx),
88            hypersync_tx: Some(hypersync_tx),
89            command_tx,
90            command_rx: Some(command_rx),
91            process_task: None,
92            cancellation_token: tokio_util::sync::CancellationToken::new(),
93        }
94    }
95
96    /// Spawns the main processing task that handles commands and blockchain data.
97    ///
98    /// This method creates a background task that:
99    /// 1. Processes subscription/unsubscription commands from the command channel
100    /// 2. Handles incoming blockchain data from HyperSync
101    /// 3. Processes RPC messages if RPC client is configured
102    /// 4. Routes processed data to subscribers
103    fn spawn_process_task(&mut self) {
104        let command_rx = if let Some(r) = self.command_rx.take() {
105            r
106        } else {
107            tracing::error!("Command receiver already taken, not spawning handler");
108            return;
109        };
110
111        let cancellation_token = self.cancellation_token.clone();
112
113        let data_tx = nautilus_common::live::runner::get_data_event_sender();
114
115        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
116        let hypersync_tx = self.hypersync_tx.take();
117
118        let mut core_client = BlockchainDataClientCore::new(
119            self.config.clone(),
120            hypersync_tx,
121            Some(data_tx),
122            cancellation_token.clone(),
123        );
124
125        let handle = get_runtime().spawn(async move {
126            tracing::debug!("Started task 'process'");
127
128            if let Err(e) = core_client.connect().await {
129                // TODO: connect() could return more granular error types to distinguish
130                // cancellation from actual failures without string matching
131                if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
132                    tracing::warn!("Blockchain core client connection interrupted: {e}");
133                } else {
134                    tracing::error!("Failed to connect blockchain core client: {e}");
135                }
136                return;
137            }
138
139            let mut command_rx = command_rx;
140
141            loop {
142                tokio::select! {
143                    () = cancellation_token.cancelled() => {
144                        tracing::debug!("Received cancellation signal in Blockchain data client process task");
145                        core_client.disconnect().await;
146                        break;
147                    }
148                    command = command_rx.recv() => {
149                        if let Some(cmd) = command {
150                            match cmd {
151                                DefiDataCommand::Subscribe(cmd) => {
152                                    let chain = cmd.blockchain();
153                                    if chain != core_client.chain.name {
154                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
155                                        continue;
156                                    }
157
158                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
159                                        tracing::error!("Error processing subscribe command: {e}");
160                                    }
161                                }
162                                DefiDataCommand::Unsubscribe(cmd) => {
163                                    let chain = cmd.blockchain();
164                                    if chain != core_client.chain.name {
165                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
166                                        continue;
167                                    }
168
169                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
170                                        tracing::error!("Error processing subscribe command: {e}");
171                                    }
172                                }
173                                DefiDataCommand::Request(cmd) => {
174                                    if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
175                                        tracing::error!("Error processing request command: {e}");
176                                    }
177                                }
178                            }
179                        } else {
180                            tracing::debug!("Command channel closed");
181                            break;
182                        }
183                    }
184                    data = hypersync_rx.recv() => {
185                        if let Some(msg) = data {
186                            let data_event = match msg {
187                                BlockchainMessage::Block(block) => {
188                                    // Fetch and process all subscribed events per DEX
189                                    for dex in core_client.cache.get_registered_dexes(){
190                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
191                                        if !addresses.is_empty() {
192                                            core_client.hypersync_client.process_block_dex_contract_events(
193                                                &dex,
194                                                block.number,
195                                                addresses,
196                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
197                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
198                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
199                                            );
200                                        }
201                                    }
202
203                                    Some(DataEvent::DeFi(DefiData::Block(block)))
204                                }
205                                BlockchainMessage::SwapEvent(swap_event) => {
206                                    match core_client.get_pool(&swap_event.pool_identifier) {
207                                        Ok(pool) => {
208                                            match core_client.process_pool_swap_event(&swap_event, pool){
209                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
210                                                Err(e) => {
211                                                    tracing::error!("Error processing pool swap event: {e}");
212                                                    None
213                                                }
214                                            }
215                                        }
216                                        Err(e) => {
217                                            tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_identifier, e);
218                                            None
219                                        }
220                                    }
221                                }
222                                BlockchainMessage::BurnEvent(burn_event) => {
223                                    match core_client.get_pool(&burn_event.pool_identifier) {
224                                        Ok(pool) => {
225                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
226                                            match core_client.process_pool_burn_event(
227                                                &burn_event,
228                                                pool,
229                                                dex_extended,
230                                            ){
231                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
232                                                Err(e) => {
233                                                    tracing::error!("Error processing pool burn event: {e}");
234                                                    None
235                                                }
236                                            }
237                                        }
238                                        Err(e) => {
239                                            tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_identifier, e);
240                                            None
241                                        }
242                                    }
243                                }
244                                BlockchainMessage::MintEvent(mint_event) => {
245                                    match core_client.get_pool(&mint_event.pool_identifier) {
246                                        Ok(pool) => {
247                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
248                                            match core_client.process_pool_mint_event(
249                                                &mint_event,
250                                                pool,
251                                                dex_extended,
252                                            ){
253                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
254                                                Err(e) => {
255                                                    tracing::error!("Error processing pool mint event: {e}");
256                                                    None
257                                                }
258                                            }
259                                        }
260                                        Err(e) => {
261                                            tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_identifier, e);
262                                            None
263                                        }
264                                    }
265                                }
266                                BlockchainMessage::CollectEvent(collect_event) => {
267                                    match core_client.get_pool(&collect_event.pool_identifier) {
268                                        Ok(pool) => {
269                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
270                                            match core_client.process_pool_collect_event(
271                                                &collect_event,
272                                                pool,
273                                                dex_extended,
274                                            ){
275                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
276                                                Err(e) => {
277                                                    tracing::error!("Error processing pool collect event: {e}");
278                                                    None
279                                                }
280                                            }
281                                        }
282                                        Err(e) => {
283                                            tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_identifier, e);
284                                            None
285                                        }
286                                    }
287                                }
288                            BlockchainMessage::FlashEvent(flash_event) => {
289                                    match core_client.get_pool(&flash_event.pool_identifier) {
290                                        Ok(pool) => {
291                                            match core_client.process_pool_flash_event(&flash_event,pool){
292                                                Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
293                                                Err(e) => {
294                                                    tracing::error!("Error processing pool flash event: {e}");
295                                                    None
296                                                }
297                                            }
298                                        }
299                                        Err(e) => {
300                                            tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_identifier, e);
301                                            None
302                                        }
303                                    }
304                                }
305                            };
306
307                            if let Some(event) = data_event {
308                                core_client.send_data(event);
309                            }
310                        } else {
311                            tracing::debug!("HyperSync data channel closed");
312                            break;
313                        }
314                    }
315                    msg = async {
316                        match core_client.rpc_client {
317                            Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
318                            None => std::future::pending().await,  // Never resolves
319                        }
320                    } => {
321                        // This branch only fires when we actually receive a message
322                        match msg {
323                            Ok(BlockchainMessage::Block(block)) => {
324                                let data = DataEvent::DeFi(DefiData::Block(block));
325                                core_client.send_data(data);
326                            },
327                            Ok(BlockchainMessage::SwapEvent(_)) => {
328                                tracing::warn!("RPC swap events are not yet supported");
329                            }
330                            Ok(BlockchainMessage::MintEvent(_)) => {
331                                tracing::warn!("RPC mint events are not yet supported");
332                            }
333                            Ok(BlockchainMessage::BurnEvent(_)) => {
334                                tracing::warn!("RPC burn events are not yet supported");
335                            }
336                            Ok(BlockchainMessage::CollectEvent(_)) => {
337                                tracing::warn!("RPC collect events are not yet supported");
338                            }
339                            Ok(BlockchainMessage::FlashEvent(_)) => {
340                                tracing::warn!("RPC flash events are not yet supported");
341                            }
342                            Err(e) => {
343                                tracing::error!("Error processing RPC message: {e}");
344                            }
345                        }
346                    }
347                }
348            }
349
350            tracing::debug!("Stopped task 'process'");
351        });
352
353        self.process_task = Some(handle);
354    }
355
356    /// Processes DeFi subscription commands to start receiving specific blockchain data.
357    async fn handle_subscribe_command(
358        command: DefiSubscribeCommand,
359        core_client: &mut BlockchainDataClientCore,
360    ) -> anyhow::Result<()> {
361        match command {
362            DefiSubscribeCommand::Blocks(_cmd) => {
363                tracing::info!("Processing subscribe blocks command");
364
365                // Try RPC client first if available, otherwise use HyperSync
366                if let Some(ref mut rpc) = core_client.rpc_client {
367                    if let Err(e) = rpc.subscribe_blocks().await {
368                        tracing::warn!(
369                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
370                        );
371                        core_client.hypersync_client.subscribe_blocks();
372                        tokio::task::yield_now().await;
373                    } else {
374                        tracing::info!("Successfully subscribed to blocks via RPC");
375                    }
376                } else {
377                    tracing::info!("Subscribing to blocks via HyperSync");
378                    core_client.hypersync_client.subscribe_blocks();
379                    tokio::task::yield_now().await;
380                }
381
382                Ok(())
383            }
384            DefiSubscribeCommand::Pool(cmd) => {
385                tracing::info!(
386                    "Processing subscribe pool command for {}",
387                    cmd.instrument_id
388                );
389
390                if let Some(ref mut _rpc) = core_client.rpc_client {
391                    tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
392                }
393
394                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
395                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
396                        .map_err(|e| {
397                            anyhow::anyhow!(
398                                "Invalid pool address '{}' failed with error: {:?}",
399                                cmd.instrument_id,
400                                e
401                            )
402                        })?;
403
404                    // Subscribe to all pool event types
405                    core_client
406                        .subscription_manager
407                        .subscribe_swaps(dex, pool_address);
408                    core_client
409                        .subscription_manager
410                        .subscribe_burns(dex, pool_address);
411                    core_client
412                        .subscription_manager
413                        .subscribe_mints(dex, pool_address);
414                    core_client
415                        .subscription_manager
416                        .subscribe_collects(dex, pool_address);
417                    core_client
418                        .subscription_manager
419                        .subscribe_flashes(dex, pool_address);
420
421                    tracing::info!(
422                        "Subscribed to all pool events for {} at address {}",
423                        cmd.instrument_id,
424                        pool_address
425                    );
426                } else {
427                    anyhow::bail!(
428                        "Invalid venue {}, expected Blockchain DEX format",
429                        cmd.instrument_id.venue
430                    )
431                }
432
433                Ok(())
434            }
435            DefiSubscribeCommand::PoolSwaps(cmd) => {
436                tracing::info!(
437                    "Processing subscribe pool swaps command for {}",
438                    cmd.instrument_id
439                );
440
441                if let Some(ref mut _rpc) = core_client.rpc_client {
442                    tracing::warn!(
443                        "RPC pool swaps subscription not yet implemented, using HyperSync"
444                    );
445                }
446
447                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
448                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
449                        .map_err(|e| {
450                            anyhow::anyhow!(
451                                "Invalid pool swap address '{}' failed with error: {:?}",
452                                cmd.instrument_id,
453                                e
454                            )
455                        })?;
456                    core_client
457                        .subscription_manager
458                        .subscribe_swaps(dex, pool_address);
459                } else {
460                    anyhow::bail!(
461                        "Invalid venue {}, expected Blockchain DEX format",
462                        cmd.instrument_id.venue
463                    )
464                }
465
466                Ok(())
467            }
468            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
469                tracing::info!(
470                    "Processing subscribe pool liquidity updates command for address: {}",
471                    cmd.instrument_id
472                );
473
474                if let Some(ref mut _rpc) = core_client.rpc_client {
475                    tracing::warn!(
476                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
477                    );
478                }
479
480                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
481                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
482                        .map_err(|_| {
483                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
484                        })?;
485                    core_client
486                        .subscription_manager
487                        .subscribe_burns(dex, pool_address);
488                    core_client
489                        .subscription_manager
490                        .subscribe_mints(dex, pool_address);
491                } else {
492                    anyhow::bail!(
493                        "Invalid venue {}, expected Blockchain DEX format",
494                        cmd.instrument_id.venue
495                    )
496                }
497
498                Ok(())
499            }
500            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
501                tracing::info!(
502                    "Processing subscribe pool fee collects command for address: {}",
503                    cmd.instrument_id
504                );
505
506                if let Some(ref mut _rpc) = core_client.rpc_client {
507                    tracing::warn!(
508                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
509                    );
510                }
511
512                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
513                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
514                        .map_err(|_| {
515                            anyhow::anyhow!(
516                                "Invalid pool fee collect address: {}",
517                                cmd.instrument_id
518                            )
519                        })?;
520                    core_client
521                        .subscription_manager
522                        .subscribe_collects(dex, pool_address);
523                } else {
524                    anyhow::bail!(
525                        "Invalid venue {}, expected Blockchain DEX format",
526                        cmd.instrument_id.venue
527                    )
528                }
529
530                Ok(())
531            }
532            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
533                tracing::info!(
534                    "Processing subscribe pool flash command for address: {}",
535                    cmd.instrument_id
536                );
537
538                if let Some(ref mut _rpc) = core_client.rpc_client {
539                    tracing::warn!(
540                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
541                    );
542                }
543
544                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
545                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
546                        .map_err(|_| {
547                            anyhow::anyhow!(
548                                "Invalid pool flash subscribe address: {}",
549                                cmd.instrument_id
550                            )
551                        })?;
552                    core_client
553                        .subscription_manager
554                        .subscribe_flashes(dex, pool_address);
555                } else {
556                    anyhow::bail!(
557                        "Invalid venue {}, expected Blockchain DEX format",
558                        cmd.instrument_id.venue
559                    )
560                }
561
562                Ok(())
563            }
564        }
565    }
566
567    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
568    async fn handle_unsubscribe_command(
569        command: DefiUnsubscribeCommand,
570        core_client: &mut BlockchainDataClientCore,
571    ) -> anyhow::Result<()> {
572        match command {
573            DefiUnsubscribeCommand::Blocks(_cmd) => {
574                tracing::info!("Processing unsubscribe blocks command");
575
576                // TODO: Implement RPC unsubscription when available
577                if core_client.rpc_client.is_some() {
578                    tracing::warn!("RPC blocks unsubscription not yet implemented");
579                }
580
581                // Use HyperSync client for unsubscription
582                core_client.hypersync_client.unsubscribe_blocks().await;
583                tracing::info!("Unsubscribed from blocks via HyperSync");
584
585                Ok(())
586            }
587            DefiUnsubscribeCommand::Pool(cmd) => {
588                tracing::info!(
589                    "Processing unsubscribe pool command for {}",
590                    cmd.instrument_id
591                );
592
593                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
594                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
595                        .map_err(|_| {
596                            anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
597                        })?;
598
599                    // Unsubscribe from all pool event types
600                    core_client
601                        .subscription_manager
602                        .unsubscribe_swaps(dex, pool_address);
603                    core_client
604                        .subscription_manager
605                        .unsubscribe_burns(dex, pool_address);
606                    core_client
607                        .subscription_manager
608                        .unsubscribe_mints(dex, pool_address);
609                    core_client
610                        .subscription_manager
611                        .unsubscribe_collects(dex, pool_address);
612                    core_client
613                        .subscription_manager
614                        .unsubscribe_flashes(dex, pool_address);
615
616                    tracing::info!(
617                        "Unsubscribed from all pool events for {} at address {}",
618                        cmd.instrument_id,
619                        pool_address
620                    );
621                } else {
622                    anyhow::bail!(
623                        "Invalid venue {}, expected Blockchain DEX format",
624                        cmd.instrument_id.venue
625                    )
626                }
627
628                Ok(())
629            }
630            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
631                tracing::info!("Processing unsubscribe pool swaps command");
632
633                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
634                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
635                        .map_err(|_| {
636                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
637                        })?;
638                    core_client
639                        .subscription_manager
640                        .unsubscribe_swaps(dex, pool_address);
641                } else {
642                    anyhow::bail!(
643                        "Invalid venue {}, expected Blockchain DEX format",
644                        cmd.instrument_id.venue
645                    )
646                }
647
648                Ok(())
649            }
650            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
651                tracing::info!(
652                    "Processing unsubscribe pool liquidity updates command for {}",
653                    cmd.instrument_id
654                );
655
656                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
657                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
658                        .map_err(|_| {
659                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
660                        })?;
661                    core_client
662                        .subscription_manager
663                        .unsubscribe_burns(dex, pool_address);
664                    core_client
665                        .subscription_manager
666                        .unsubscribe_mints(dex, pool_address);
667                } else {
668                    anyhow::bail!(
669                        "Invalid venue {}, expected Blockchain DEX format",
670                        cmd.instrument_id.venue
671                    )
672                }
673
674                Ok(())
675            }
676            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
677                tracing::info!(
678                    "Processing unsubscribe pool fee collects command for {}",
679                    cmd.instrument_id
680                );
681
682                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
683                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
684                        .map_err(|_| {
685                            anyhow::anyhow!(
686                                "Invalid pool fee collect address: {}",
687                                cmd.instrument_id
688                            )
689                        })?;
690                    core_client
691                        .subscription_manager
692                        .unsubscribe_collects(dex, pool_address);
693                } else {
694                    anyhow::bail!(
695                        "Invalid venue {}, expected Blockchain DEX format",
696                        cmd.instrument_id.venue
697                    )
698                }
699
700                Ok(())
701            }
702            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
703                tracing::info!(
704                    "Processing unsubscribe pool flash command for {}",
705                    cmd.instrument_id
706                );
707
708                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
709                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
710                        .map_err(|_| {
711                            anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
712                        })?;
713                    core_client
714                        .subscription_manager
715                        .unsubscribe_flashes(dex, pool_address);
716                } else {
717                    anyhow::bail!(
718                        "Invalid venue {}, expected Blockchain DEX format",
719                        cmd.instrument_id.venue
720                    )
721                }
722
723                Ok(())
724            }
725        }
726    }
727
728    /// Processes DeFi request commands to fetch specific blockchain data.
729    async fn handle_request_command(
730        command: DefiRequestCommand,
731        core_client: &mut BlockchainDataClientCore,
732    ) -> anyhow::Result<()> {
733        match command {
734            DefiRequestCommand::PoolSnapshot(cmd) => {
735                tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
736
737                let pool_address =
738                    validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
739                        anyhow::anyhow!(
740                            "Invalid pool address '{}' failed with error: {:?}",
741                            cmd.instrument_id,
742                            e
743                        )
744                    })?;
745
746                let pool_identifier =
747                    PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
748                match core_client.get_pool(&pool_identifier) {
749                    Ok(pool) => {
750                        let pool = pool.clone();
751                        tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
752
753                        // Send the pool definition
754                        let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
755                        core_client.send_data(pool_data);
756
757                        match core_client.bootstrap_latest_pool_profiler(&pool).await {
758                            Ok((profiler, already_valid)) => {
759                                let snapshot = profiler.extract_snapshot();
760
761                                tracing::info!(
762                                    "Saving pool snapshot with {} positions and {} ticks to database...",
763                                    snapshot.positions.len(),
764                                    snapshot.ticks.len()
765                                );
766                                core_client
767                                    .cache
768                                    .add_pool_snapshot(
769                                        &pool.dex.name,
770                                        &pool.pool_identifier,
771                                        &snapshot,
772                                    )
773                                    .await?;
774
775                                // If snapshot is valid, send it back to the data engine.
776                                if core_client
777                                    .check_snapshot_validity(&profiler, already_valid)
778                                    .await?
779                                {
780                                    let snapshot_data =
781                                        DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
782                                    core_client.send_data(snapshot_data);
783                                }
784                            }
785                            Err(e) => tracing::error!(
786                                "Failed to bootstrap pool profiler for {} and extract snapshot with error {}",
787                                cmd.instrument_id,
788                                e.to_string()
789                            ),
790                        }
791                    }
792                    Err(e) => {
793                        tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
794                    }
795                }
796
797                Ok(())
798            }
799        }
800    }
801
802    /// Waits for the background processing task to complete.
803    ///
804    /// This method blocks until the spawned process task finishes execution,
805    /// which typically happens after a shutdown signal is sent.
806    pub async fn await_process_task_close(&mut self) {
807        if let Some(handle) = self.process_task.take()
808            && let Err(e) = handle.await
809        {
810            tracing::error!("Process task join error: {e}");
811        }
812    }
813}
814
815#[async_trait::async_trait(?Send)]
816impl DataClient for BlockchainDataClient {
817    fn client_id(&self) -> ClientId {
818        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
819    }
820
821    fn venue(&self) -> Option<Venue> {
822        // Blockchain data clients don't map to a single venue since they can provide
823        // data for multiple DEXs across the blockchain
824        None
825    }
826
827    fn start(&mut self) -> anyhow::Result<()> {
828        tracing::info!(
829            chain_name = %self.chain.name,
830            dex_ids = ?self.config.dex_ids,
831            use_hypersync_for_live_data = self.config.use_hypersync_for_live_data,
832            http_proxy_url = ?self.config.http_proxy_url,
833            ws_proxy_url = ?self.config.ws_proxy_url,
834            "Starting blockchain data client"
835        );
836        Ok(())
837    }
838
839    fn stop(&mut self) -> anyhow::Result<()> {
840        tracing::info!(
841            "Stopping blockchain data client for '{chain_name}'",
842            chain_name = self.chain.name
843        );
844        self.cancellation_token.cancel();
845
846        // Create fresh token for next start cycle
847        self.cancellation_token = tokio_util::sync::CancellationToken::new();
848        Ok(())
849    }
850
851    fn reset(&mut self) -> anyhow::Result<()> {
852        tracing::info!(
853            "Resetting blockchain data client for '{chain_name}'",
854            chain_name = self.chain.name
855        );
856        self.cancellation_token = tokio_util::sync::CancellationToken::new();
857        Ok(())
858    }
859
860    fn dispose(&mut self) -> anyhow::Result<()> {
861        tracing::info!(
862            "Disposing blockchain data client for '{chain_name}'",
863            chain_name = self.chain.name
864        );
865        Ok(())
866    }
867
868    async fn connect(&mut self) -> anyhow::Result<()> {
869        tracing::info!(
870            "Connecting blockchain data client for '{}'",
871            self.chain.name
872        );
873
874        if self.process_task.is_none() {
875            self.spawn_process_task();
876        }
877
878        Ok(())
879    }
880
881    async fn disconnect(&mut self) -> anyhow::Result<()> {
882        tracing::info!(
883            "Disconnecting blockchain data client for '{}'",
884            self.chain.name
885        );
886
887        self.cancellation_token.cancel();
888        self.await_process_task_close().await;
889
890        // Create fresh token and channels for next connect cycle
891        self.cancellation_token = tokio_util::sync::CancellationToken::new();
892        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
893        self.hypersync_tx = Some(hypersync_tx);
894        self.hypersync_rx = Some(hypersync_rx);
895        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
896        self.command_tx = command_tx;
897        self.command_rx = Some(command_rx);
898
899        Ok(())
900    }
901
902    fn is_connected(&self) -> bool {
903        // TODO: Improve connection detection
904        // For now, we'll assume connected if we have either RPC or HyperSync configured
905        true
906    }
907
908    fn is_disconnected(&self) -> bool {
909        !self.is_connected()
910    }
911
912    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
913        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
914        self.command_tx.send(command)?;
915        Ok(())
916    }
917
918    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
919        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
920        self.command_tx.send(command)?;
921        Ok(())
922    }
923
924    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
925        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
926        self.command_tx.send(command)?;
927        Ok(())
928    }
929
930    fn subscribe_pool_liquidity_updates(
931        &mut self,
932        cmd: &SubscribePoolLiquidityUpdates,
933    ) -> anyhow::Result<()> {
934        let command =
935            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
936        self.command_tx.send(command)?;
937        Ok(())
938    }
939
940    fn subscribe_pool_fee_collects(
941        &mut self,
942        cmd: &SubscribePoolFeeCollects,
943    ) -> anyhow::Result<()> {
944        let command =
945            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
946        self.command_tx.send(command)?;
947        Ok(())
948    }
949
950    fn subscribe_pool_flash_events(
951        &mut self,
952        cmd: &SubscribePoolFlashEvents,
953    ) -> anyhow::Result<()> {
954        let command =
955            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
956        self.command_tx.send(command)?;
957        Ok(())
958    }
959
960    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
961        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
962        self.command_tx.send(command)?;
963        Ok(())
964    }
965
966    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
967        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
968        self.command_tx.send(command)?;
969        Ok(())
970    }
971
972    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
973        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
974        self.command_tx.send(command)?;
975        Ok(())
976    }
977
978    fn unsubscribe_pool_liquidity_updates(
979        &mut self,
980        cmd: &UnsubscribePoolLiquidityUpdates,
981    ) -> anyhow::Result<()> {
982        let command =
983            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
984        self.command_tx.send(command)?;
985        Ok(())
986    }
987
988    fn unsubscribe_pool_fee_collects(
989        &mut self,
990        cmd: &UnsubscribePoolFeeCollects,
991    ) -> anyhow::Result<()> {
992        let command =
993            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
994        self.command_tx.send(command)?;
995        Ok(())
996    }
997
998    fn unsubscribe_pool_flash_events(
999        &mut self,
1000        cmd: &UnsubscribePoolFlashEvents,
1001    ) -> anyhow::Result<()> {
1002        let command =
1003            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
1004        self.command_tx.send(command)?;
1005        Ok(())
1006    }
1007
1008    fn request_pool_snapshot(&self, cmd: &RequestPoolSnapshot) -> anyhow::Result<()> {
1009        let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1010        self.command_tx.send(command)?;
1011        Ok(())
1012    }
1013}