nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    clients::DataClient,
18    defi::RequestPoolSnapshot,
19    live::get_runtime,
20    messages::{
21        DataEvent,
22        defi::{
23            DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
24            SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
25            SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
26            UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
27            UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
28        },
29    },
30};
31use nautilus_model::{
32    defi::{DefiData, PoolIdentifier, SharedChain, validation::validate_address},
33    identifiers::{ClientId, Venue},
34};
35use ustr::Ustr;
36
37use crate::{
38    config::BlockchainDataClientConfig,
39    data::core::BlockchainDataClientCore,
40    exchanges::get_dex_extended,
41    rpc::{BlockchainRpcClient, types::BlockchainMessage},
42};
43
44/// A comprehensive client for interacting with blockchain data from multiple sources.
45///
46/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
47/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
48/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
49///
50/// This client supports two primary data sources:
51/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
52/// 2. HyperSync API for efficient historical data queries.
53#[derive(Debug)]
54pub struct BlockchainDataClient {
55    /// The blockchain being targeted by this client instance.
56    pub chain: SharedChain,
57    /// Configuration parameters for the blockchain data client.
58    pub config: BlockchainDataClientConfig,
59    /// The core client instance that handles blockchain operations.
60    /// Wrapped in Option to allow moving it into the background processing task.
61    pub core_client: Option<BlockchainDataClientCore>,
62    /// Channel receiver for messages from the HyperSync client.
63    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
64    /// Channel sender for messages to the HyperSync client.
65    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
66    /// Channel sender for commands to be processed asynchronously.
67    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
68    /// Channel receiver for commands to be processed asynchronously.
69    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
70    /// Background task for processing messages.
71    process_task: Option<tokio::task::JoinHandle<()>>,
72    /// Cancellation token for graceful shutdown of background tasks.
73    cancellation_token: tokio_util::sync::CancellationToken,
74}
75
76impl BlockchainDataClient {
77    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
78    #[must_use]
79    pub fn new(config: BlockchainDataClientConfig) -> Self {
80        let chain = config.chain.clone();
81        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
82        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
83        Self {
84            chain,
85            core_client: None,
86            config,
87            hypersync_rx: Some(hypersync_rx),
88            hypersync_tx: Some(hypersync_tx),
89            command_tx,
90            command_rx: Some(command_rx),
91            process_task: None,
92            cancellation_token: tokio_util::sync::CancellationToken::new(),
93        }
94    }
95
96    /// Spawns the main processing task that handles commands and blockchain data.
97    ///
98    /// This method creates a background task that:
99    /// 1. Processes subscription/unsubscription commands from the command channel
100    /// 2. Handles incoming blockchain data from HyperSync
101    /// 3. Processes RPC messages if RPC client is configured
102    /// 4. Routes processed data to subscribers
103    fn spawn_process_task(&mut self) {
104        let command_rx = if let Some(r) = self.command_rx.take() {
105            r
106        } else {
107            log::error!("Command receiver already taken, not spawning handler");
108            return;
109        };
110
111        let cancellation_token = self.cancellation_token.clone();
112
113        let data_tx = nautilus_common::live::runner::get_data_event_sender();
114
115        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
116        let hypersync_tx = self.hypersync_tx.take();
117
118        let mut core_client = BlockchainDataClientCore::new(
119            self.config.clone(),
120            hypersync_tx,
121            Some(data_tx),
122            cancellation_token.clone(),
123        );
124
125        let handle = get_runtime().spawn(async move {
126            log::debug!("Started task 'process'");
127
128            if let Err(e) = core_client.connect().await {
129                // TODO: connect() could return more granular error types to distinguish
130                // cancellation from actual failures without string matching
131                if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
132                    log::warn!("Blockchain core client connection interrupted: {e}");
133                } else {
134                    log::error!("Failed to connect blockchain core client: {e}");
135                }
136                return;
137            }
138
139            let mut command_rx = command_rx;
140
141            loop {
142                tokio::select! {
143                    () = cancellation_token.cancelled() => {
144                        log::debug!("Received cancellation signal in Blockchain data client process task");
145                        core_client.disconnect().await;
146                        break;
147                    }
148                    command = command_rx.recv() => {
149                        if let Some(cmd) = command {
150                            match cmd {
151                                DefiDataCommand::Subscribe(cmd) => {
152                                    let chain = cmd.blockchain();
153                                    if chain != core_client.chain.name {
154                                        log::error!("Incorrect blockchain for subscribe command: {chain}");
155                                        continue;
156                                    }
157
158                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
159                                        log::error!("Error processing subscribe command: {e}");
160                                    }
161                                }
162                                DefiDataCommand::Unsubscribe(cmd) => {
163                                    let chain = cmd.blockchain();
164                                    if chain != core_client.chain.name {
165                                        log::error!("Incorrect blockchain for subscribe command: {chain}");
166                                        continue;
167                                    }
168
169                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
170                                        log::error!("Error processing subscribe command: {e}");
171                                    }
172                                }
173                                DefiDataCommand::Request(cmd) => {
174                                    if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
175                                        log::error!("Error processing request command: {e}");
176                                    }
177                                }
178                            }
179                        } else {
180                            log::debug!("Command channel closed");
181                            break;
182                        }
183                    }
184                    data = hypersync_rx.recv() => {
185                        if let Some(msg) = data {
186                            let data_event = match msg {
187                                BlockchainMessage::Block(block) => {
188                                    // Fetch and process all subscribed events per DEX
189                                    for dex in core_client.cache.get_registered_dexes(){
190                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
191                                        if !addresses.is_empty() {
192                                            core_client.hypersync_client.process_block_dex_contract_events(
193                                                &dex,
194                                                block.number,
195                                                addresses,
196                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
197                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
198                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
199                                            );
200                                        }
201                                    }
202
203                                    Some(DataEvent::DeFi(DefiData::Block(block)))
204                                }
205                                BlockchainMessage::SwapEvent(swap_event) => {
206                                    match core_client.get_pool(&swap_event.pool_identifier) {
207                                        Ok(pool) => {
208                                            match core_client.process_pool_swap_event(&swap_event, pool){
209                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
210                                                Err(e) => {
211                                                    log::error!("Error processing pool swap event: {e}");
212                                                    None
213                                                }
214                                            }
215                                        }
216                                        Err(e) => {
217                                            log::error!("Failed to get pool {} with error {:?}", swap_event.pool_identifier, e);
218                                            None
219                                        }
220                                    }
221                                }
222                                BlockchainMessage::BurnEvent(burn_event) => {
223                                    match core_client.get_pool(&burn_event.pool_identifier) {
224                                        Ok(pool) => {
225                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
226                                            match core_client.process_pool_burn_event(
227                                                &burn_event,
228                                                pool,
229                                                dex_extended,
230                                            ){
231                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
232                                                Err(e) => {
233                                                    log::error!("Error processing pool burn event: {e}");
234                                                    None
235                                                }
236                                            }
237                                        }
238                                        Err(e) => {
239                                            log::error!("Failed to get pool {} with error {:?}", burn_event.pool_identifier, e);
240                                            None
241                                        }
242                                    }
243                                }
244                                BlockchainMessage::MintEvent(mint_event) => {
245                                    match core_client.get_pool(&mint_event.pool_identifier) {
246                                        Ok(pool) => {
247                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
248                                            match core_client.process_pool_mint_event(
249                                                &mint_event,
250                                                pool,
251                                                dex_extended,
252                                            ){
253                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
254                                                Err(e) => {
255                                                    log::error!("Error processing pool mint event: {e}");
256                                                    None
257                                                }
258                                            }
259                                        }
260                                        Err(e) => {
261                                            log::error!("Failed to get pool {} with error {:?}", mint_event.pool_identifier, e);
262                                            None
263                                        }
264                                    }
265                                }
266                                BlockchainMessage::CollectEvent(collect_event) => {
267                                    match core_client.get_pool(&collect_event.pool_identifier) {
268                                        Ok(pool) => {
269                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
270                                            match core_client.process_pool_collect_event(
271                                                &collect_event,
272                                                pool,
273                                                dex_extended,
274                                            ){
275                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
276                                                Err(e) => {
277                                                    log::error!("Error processing pool collect event: {e}");
278                                                    None
279                                                }
280                                            }
281                                        }
282                                        Err(e) => {
283                                            log::error!("Failed to get pool {} with error {:?}", collect_event.pool_identifier, e);
284                                            None
285                                        }
286                                    }
287                                }
288                            BlockchainMessage::FlashEvent(flash_event) => {
289                                    match core_client.get_pool(&flash_event.pool_identifier) {
290                                        Ok(pool) => {
291                                            match core_client.process_pool_flash_event(&flash_event,pool){
292                                                Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
293                                                Err(e) => {
294                                                    log::error!("Error processing pool flash event: {e}");
295                                                    None
296                                                }
297                                            }
298                                        }
299                                        Err(e) => {
300                                            log::error!("Failed to get pool {} with error {:?}", flash_event.pool_identifier, e);
301                                            None
302                                        }
303                                    }
304                                }
305                            };
306
307                            if let Some(event) = data_event {
308                                core_client.send_data(event);
309                            }
310                        } else {
311                            log::debug!("HyperSync data channel closed");
312                            break;
313                        }
314                    }
315                    msg = async {
316                        match core_client.rpc_client {
317                            Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
318                            None => std::future::pending().await,  // Never resolves
319                        }
320                    } => {
321                        // This branch only fires when we actually receive a message
322                        match msg {
323                            Ok(BlockchainMessage::Block(block)) => {
324                                let data = DataEvent::DeFi(DefiData::Block(block));
325                                core_client.send_data(data);
326                            },
327                            Ok(BlockchainMessage::SwapEvent(_)) => {
328                                log::warn!("RPC swap events are not yet supported");
329                            }
330                            Ok(BlockchainMessage::MintEvent(_)) => {
331                                log::warn!("RPC mint events are not yet supported");
332                            }
333                            Ok(BlockchainMessage::BurnEvent(_)) => {
334                                log::warn!("RPC burn events are not yet supported");
335                            }
336                            Ok(BlockchainMessage::CollectEvent(_)) => {
337                                log::warn!("RPC collect events are not yet supported");
338                            }
339                            Ok(BlockchainMessage::FlashEvent(_)) => {
340                                log::warn!("RPC flash events are not yet supported");
341                            }
342                            Err(e) => {
343                                log::error!("Error processing RPC message: {e}");
344                            }
345                        }
346                    }
347                }
348            }
349
350            log::debug!("Stopped task 'process'");
351        });
352
353        self.process_task = Some(handle);
354    }
355
356    /// Processes DeFi subscription commands to start receiving specific blockchain data.
357    async fn handle_subscribe_command(
358        command: DefiSubscribeCommand,
359        core_client: &mut BlockchainDataClientCore,
360    ) -> anyhow::Result<()> {
361        match command {
362            DefiSubscribeCommand::Blocks(_cmd) => {
363                log::info!("Processing subscribe blocks command");
364
365                // Try RPC client first if available, otherwise use HyperSync
366                if let Some(ref mut rpc) = core_client.rpc_client {
367                    if let Err(e) = rpc.subscribe_blocks().await {
368                        log::warn!(
369                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
370                        );
371                        core_client.hypersync_client.subscribe_blocks();
372                        tokio::task::yield_now().await;
373                    } else {
374                        log::info!("Successfully subscribed to blocks via RPC");
375                    }
376                } else {
377                    log::info!("Subscribing to blocks via HyperSync");
378                    core_client.hypersync_client.subscribe_blocks();
379                    tokio::task::yield_now().await;
380                }
381
382                Ok(())
383            }
384            DefiSubscribeCommand::Pool(cmd) => {
385                log::info!(
386                    "Processing subscribe pool command for {}",
387                    cmd.instrument_id
388                );
389
390                if let Some(ref mut _rpc) = core_client.rpc_client {
391                    log::warn!("RPC pool subscription not yet implemented, using HyperSync");
392                }
393
394                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
395                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
396                        .map_err(|e| {
397                            anyhow::anyhow!(
398                                "Invalid pool address '{}' failed with error: {:?}",
399                                cmd.instrument_id,
400                                e
401                            )
402                        })?;
403
404                    // Subscribe to all pool event types
405                    core_client
406                        .subscription_manager
407                        .subscribe_swaps(dex, pool_address);
408                    core_client
409                        .subscription_manager
410                        .subscribe_burns(dex, pool_address);
411                    core_client
412                        .subscription_manager
413                        .subscribe_mints(dex, pool_address);
414                    core_client
415                        .subscription_manager
416                        .subscribe_collects(dex, pool_address);
417                    core_client
418                        .subscription_manager
419                        .subscribe_flashes(dex, pool_address);
420
421                    log::info!(
422                        "Subscribed to all pool events for {} at address {}",
423                        cmd.instrument_id,
424                        pool_address
425                    );
426                } else {
427                    anyhow::bail!(
428                        "Invalid venue {}, expected Blockchain DEX format",
429                        cmd.instrument_id.venue
430                    )
431                }
432
433                Ok(())
434            }
435            DefiSubscribeCommand::PoolSwaps(cmd) => {
436                log::info!(
437                    "Processing subscribe pool swaps command for {}",
438                    cmd.instrument_id
439                );
440
441                if let Some(ref mut _rpc) = core_client.rpc_client {
442                    log::warn!("RPC pool swaps subscription not yet implemented, using HyperSync");
443                }
444
445                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
446                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
447                        .map_err(|e| {
448                            anyhow::anyhow!(
449                                "Invalid pool swap address '{}' failed with error: {:?}",
450                                cmd.instrument_id,
451                                e
452                            )
453                        })?;
454                    core_client
455                        .subscription_manager
456                        .subscribe_swaps(dex, pool_address);
457                } else {
458                    anyhow::bail!(
459                        "Invalid venue {}, expected Blockchain DEX format",
460                        cmd.instrument_id.venue
461                    )
462                }
463
464                Ok(())
465            }
466            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
467                log::info!(
468                    "Processing subscribe pool liquidity updates command for address: {}",
469                    cmd.instrument_id
470                );
471
472                if let Some(ref mut _rpc) = core_client.rpc_client {
473                    log::warn!(
474                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
475                    );
476                }
477
478                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
479                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
480                        .map_err(|_| {
481                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
482                        })?;
483                    core_client
484                        .subscription_manager
485                        .subscribe_burns(dex, pool_address);
486                    core_client
487                        .subscription_manager
488                        .subscribe_mints(dex, pool_address);
489                } else {
490                    anyhow::bail!(
491                        "Invalid venue {}, expected Blockchain DEX format",
492                        cmd.instrument_id.venue
493                    )
494                }
495
496                Ok(())
497            }
498            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
499                log::info!(
500                    "Processing subscribe pool fee collects command for address: {}",
501                    cmd.instrument_id
502                );
503
504                if let Some(ref mut _rpc) = core_client.rpc_client {
505                    log::warn!(
506                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
507                    );
508                }
509
510                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
511                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
512                        .map_err(|_| {
513                            anyhow::anyhow!(
514                                "Invalid pool fee collect address: {}",
515                                cmd.instrument_id
516                            )
517                        })?;
518                    core_client
519                        .subscription_manager
520                        .subscribe_collects(dex, pool_address);
521                } else {
522                    anyhow::bail!(
523                        "Invalid venue {}, expected Blockchain DEX format",
524                        cmd.instrument_id.venue
525                    )
526                }
527
528                Ok(())
529            }
530            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
531                log::info!(
532                    "Processing subscribe pool flash command for address: {}",
533                    cmd.instrument_id
534                );
535
536                if let Some(ref mut _rpc) = core_client.rpc_client {
537                    log::warn!(
538                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
539                    );
540                }
541
542                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
543                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
544                        .map_err(|_| {
545                            anyhow::anyhow!(
546                                "Invalid pool flash subscribe address: {}",
547                                cmd.instrument_id
548                            )
549                        })?;
550                    core_client
551                        .subscription_manager
552                        .subscribe_flashes(dex, pool_address);
553                } else {
554                    anyhow::bail!(
555                        "Invalid venue {}, expected Blockchain DEX format",
556                        cmd.instrument_id.venue
557                    )
558                }
559
560                Ok(())
561            }
562        }
563    }
564
565    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
566    async fn handle_unsubscribe_command(
567        command: DefiUnsubscribeCommand,
568        core_client: &mut BlockchainDataClientCore,
569    ) -> anyhow::Result<()> {
570        match command {
571            DefiUnsubscribeCommand::Blocks(_cmd) => {
572                log::info!("Processing unsubscribe blocks command");
573
574                // TODO: Implement RPC unsubscription when available
575                if core_client.rpc_client.is_some() {
576                    log::warn!("RPC blocks unsubscription not yet implemented");
577                }
578
579                // Use HyperSync client for unsubscription
580                core_client.hypersync_client.unsubscribe_blocks().await;
581                log::info!("Unsubscribed from blocks via HyperSync");
582
583                Ok(())
584            }
585            DefiUnsubscribeCommand::Pool(cmd) => {
586                log::info!(
587                    "Processing unsubscribe pool command for {}",
588                    cmd.instrument_id
589                );
590
591                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
592                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
593                        .map_err(|_| {
594                            anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
595                        })?;
596
597                    // Unsubscribe from all pool event types
598                    core_client
599                        .subscription_manager
600                        .unsubscribe_swaps(dex, pool_address);
601                    core_client
602                        .subscription_manager
603                        .unsubscribe_burns(dex, pool_address);
604                    core_client
605                        .subscription_manager
606                        .unsubscribe_mints(dex, pool_address);
607                    core_client
608                        .subscription_manager
609                        .unsubscribe_collects(dex, pool_address);
610                    core_client
611                        .subscription_manager
612                        .unsubscribe_flashes(dex, pool_address);
613
614                    log::info!(
615                        "Unsubscribed from all pool events for {} at address {}",
616                        cmd.instrument_id,
617                        pool_address
618                    );
619                } else {
620                    anyhow::bail!(
621                        "Invalid venue {}, expected Blockchain DEX format",
622                        cmd.instrument_id.venue
623                    )
624                }
625
626                Ok(())
627            }
628            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
629                log::info!("Processing unsubscribe pool swaps command");
630
631                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
632                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
633                        .map_err(|_| {
634                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
635                        })?;
636                    core_client
637                        .subscription_manager
638                        .unsubscribe_swaps(dex, pool_address);
639                } else {
640                    anyhow::bail!(
641                        "Invalid venue {}, expected Blockchain DEX format",
642                        cmd.instrument_id.venue
643                    )
644                }
645
646                Ok(())
647            }
648            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
649                log::info!(
650                    "Processing unsubscribe pool liquidity updates command for {}",
651                    cmd.instrument_id
652                );
653
654                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
655                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
656                        .map_err(|_| {
657                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
658                        })?;
659                    core_client
660                        .subscription_manager
661                        .unsubscribe_burns(dex, pool_address);
662                    core_client
663                        .subscription_manager
664                        .unsubscribe_mints(dex, pool_address);
665                } else {
666                    anyhow::bail!(
667                        "Invalid venue {}, expected Blockchain DEX format",
668                        cmd.instrument_id.venue
669                    )
670                }
671
672                Ok(())
673            }
674            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
675                log::info!(
676                    "Processing unsubscribe pool fee collects command for {}",
677                    cmd.instrument_id
678                );
679
680                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
681                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
682                        .map_err(|_| {
683                            anyhow::anyhow!(
684                                "Invalid pool fee collect address: {}",
685                                cmd.instrument_id
686                            )
687                        })?;
688                    core_client
689                        .subscription_manager
690                        .unsubscribe_collects(dex, pool_address);
691                } else {
692                    anyhow::bail!(
693                        "Invalid venue {}, expected Blockchain DEX format",
694                        cmd.instrument_id.venue
695                    )
696                }
697
698                Ok(())
699            }
700            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
701                log::info!(
702                    "Processing unsubscribe pool flash command for {}",
703                    cmd.instrument_id
704                );
705
706                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
707                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
708                        .map_err(|_| {
709                            anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
710                        })?;
711                    core_client
712                        .subscription_manager
713                        .unsubscribe_flashes(dex, pool_address);
714                } else {
715                    anyhow::bail!(
716                        "Invalid venue {}, expected Blockchain DEX format",
717                        cmd.instrument_id.venue
718                    )
719                }
720
721                Ok(())
722            }
723        }
724    }
725
726    /// Processes DeFi request commands to fetch specific blockchain data.
727    async fn handle_request_command(
728        command: DefiRequestCommand,
729        core_client: &mut BlockchainDataClientCore,
730    ) -> anyhow::Result<()> {
731        match command {
732            DefiRequestCommand::PoolSnapshot(cmd) => {
733                log::info!("Processing pool snapshot request for {}", cmd.instrument_id);
734
735                let pool_address =
736                    validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
737                        anyhow::anyhow!(
738                            "Invalid pool address '{}' failed with error: {:?}",
739                            cmd.instrument_id,
740                            e
741                        )
742                    })?;
743
744                let pool_identifier =
745                    PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
746                match core_client.get_pool(&pool_identifier) {
747                    Ok(pool) => {
748                        let pool = pool.clone();
749                        log::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
750
751                        // Send the pool definition
752                        let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
753                        core_client.send_data(pool_data);
754
755                        match core_client.bootstrap_latest_pool_profiler(&pool).await {
756                            Ok((profiler, already_valid)) => {
757                                let snapshot = profiler.extract_snapshot();
758
759                                log::info!(
760                                    "Saving pool snapshot with {} positions and {} ticks to database...",
761                                    snapshot.positions.len(),
762                                    snapshot.ticks.len()
763                                );
764                                core_client
765                                    .cache
766                                    .add_pool_snapshot(
767                                        &pool.dex.name,
768                                        &pool.pool_identifier,
769                                        &snapshot,
770                                    )
771                                    .await?;
772
773                                // If snapshot is valid, send it back to the data engine.
774                                if core_client
775                                    .check_snapshot_validity(&profiler, already_valid)
776                                    .await?
777                                {
778                                    let snapshot_data =
779                                        DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
780                                    core_client.send_data(snapshot_data);
781                                }
782                            }
783                            Err(e) => log::error!(
784                                "Failed to bootstrap pool profiler for {} and extract snapshot with error {e}",
785                                cmd.instrument_id
786                            ),
787                        }
788                    }
789                    Err(e) => {
790                        log::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
791                    }
792                }
793
794                Ok(())
795            }
796        }
797    }
798
799    /// Waits for the background processing task to complete.
800    ///
801    /// This method blocks until the spawned process task finishes execution,
802    /// which typically happens after a shutdown signal is sent.
803    pub async fn await_process_task_close(&mut self) {
804        if let Some(handle) = self.process_task.take()
805            && let Err(e) = handle.await
806        {
807            log::error!("Process task join error: {e}");
808        }
809    }
810}
811
812#[async_trait::async_trait(?Send)]
813impl DataClient for BlockchainDataClient {
814    fn client_id(&self) -> ClientId {
815        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
816    }
817
818    fn venue(&self) -> Option<Venue> {
819        // Blockchain data clients don't map to a single venue since they can provide
820        // data for multiple DEXs across the blockchain
821        None
822    }
823
824    fn start(&mut self) -> anyhow::Result<()> {
825        log::info!(
826            "Starting blockchain data client: chain_name={}, dex_ids={:?}, use_hypersync_for_live_data={}, http_proxy_url={:?}, ws_proxy_url={:?}",
827            self.chain.name,
828            self.config.dex_ids,
829            self.config.use_hypersync_for_live_data,
830            self.config.http_proxy_url,
831            self.config.ws_proxy_url
832        );
833        Ok(())
834    }
835
836    fn stop(&mut self) -> anyhow::Result<()> {
837        log::info!(
838            "Stopping blockchain data client for '{chain_name}'",
839            chain_name = self.chain.name
840        );
841        self.cancellation_token.cancel();
842
843        // Create fresh token for next start cycle
844        self.cancellation_token = tokio_util::sync::CancellationToken::new();
845        Ok(())
846    }
847
848    fn reset(&mut self) -> anyhow::Result<()> {
849        log::info!(
850            "Resetting blockchain data client for '{chain_name}'",
851            chain_name = self.chain.name
852        );
853        self.cancellation_token = tokio_util::sync::CancellationToken::new();
854        Ok(())
855    }
856
857    fn dispose(&mut self) -> anyhow::Result<()> {
858        log::info!(
859            "Disposing blockchain data client for '{chain_name}'",
860            chain_name = self.chain.name
861        );
862        Ok(())
863    }
864
865    async fn connect(&mut self) -> anyhow::Result<()> {
866        log::info!(
867            "Connecting blockchain data client for '{}'",
868            self.chain.name
869        );
870
871        if self.process_task.is_none() {
872            self.spawn_process_task();
873        }
874
875        Ok(())
876    }
877
878    async fn disconnect(&mut self) -> anyhow::Result<()> {
879        log::info!(
880            "Disconnecting blockchain data client for '{}'",
881            self.chain.name
882        );
883
884        self.cancellation_token.cancel();
885        self.await_process_task_close().await;
886
887        // Create fresh token and channels for next connect cycle
888        self.cancellation_token = tokio_util::sync::CancellationToken::new();
889        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
890        self.hypersync_tx = Some(hypersync_tx);
891        self.hypersync_rx = Some(hypersync_rx);
892        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
893        self.command_tx = command_tx;
894        self.command_rx = Some(command_rx);
895
896        Ok(())
897    }
898
899    fn is_connected(&self) -> bool {
900        // TODO: Improve connection detection
901        // For now, we'll assume connected if we have either RPC or HyperSync configured
902        true
903    }
904
905    fn is_disconnected(&self) -> bool {
906        !self.is_connected()
907    }
908
909    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
910        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
911        self.command_tx.send(command)?;
912        Ok(())
913    }
914
915    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
916        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
917        self.command_tx.send(command)?;
918        Ok(())
919    }
920
921    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
922        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
923        self.command_tx.send(command)?;
924        Ok(())
925    }
926
927    fn subscribe_pool_liquidity_updates(
928        &mut self,
929        cmd: &SubscribePoolLiquidityUpdates,
930    ) -> anyhow::Result<()> {
931        let command =
932            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
933        self.command_tx.send(command)?;
934        Ok(())
935    }
936
937    fn subscribe_pool_fee_collects(
938        &mut self,
939        cmd: &SubscribePoolFeeCollects,
940    ) -> anyhow::Result<()> {
941        let command =
942            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
943        self.command_tx.send(command)?;
944        Ok(())
945    }
946
947    fn subscribe_pool_flash_events(
948        &mut self,
949        cmd: &SubscribePoolFlashEvents,
950    ) -> anyhow::Result<()> {
951        let command =
952            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
953        self.command_tx.send(command)?;
954        Ok(())
955    }
956
957    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
958        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
959        self.command_tx.send(command)?;
960        Ok(())
961    }
962
963    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
964        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
965        self.command_tx.send(command)?;
966        Ok(())
967    }
968
969    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
970        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
971        self.command_tx.send(command)?;
972        Ok(())
973    }
974
975    fn unsubscribe_pool_liquidity_updates(
976        &mut self,
977        cmd: &UnsubscribePoolLiquidityUpdates,
978    ) -> anyhow::Result<()> {
979        let command =
980            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
981        self.command_tx.send(command)?;
982        Ok(())
983    }
984
985    fn unsubscribe_pool_fee_collects(
986        &mut self,
987        cmd: &UnsubscribePoolFeeCollects,
988    ) -> anyhow::Result<()> {
989        let command =
990            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
991        self.command_tx.send(command)?;
992        Ok(())
993    }
994
995    fn unsubscribe_pool_flash_events(
996        &mut self,
997        cmd: &UnsubscribePoolFlashEvents,
998    ) -> anyhow::Result<()> {
999        let command =
1000            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
1001        self.command_tx.send(command)?;
1002        Ok(())
1003    }
1004
1005    fn request_pool_snapshot(&self, cmd: &RequestPoolSnapshot) -> anyhow::Result<()> {
1006        let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1007        self.command_tx.send(command)?;
1008        Ok(())
1009    }
1010}