nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    messages::{
18        DataEvent,
19        defi::{
20            DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
21            SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
22            SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
23            UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
24            UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
25        },
26    },
27    runtime::get_runtime,
28};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31    defi::{DefiData, SharedChain, validation::validate_address},
32    identifiers::{ClientId, Venue},
33};
34
35use crate::{
36    config::BlockchainDataClientConfig,
37    data::core::BlockchainDataClientCore,
38    exchanges::get_dex_extended,
39    rpc::{BlockchainRpcClient, types::BlockchainMessage},
40};
41
/// A comprehensive client for interacting with blockchain data from multiple sources.
///
/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
///
/// This client supports two primary data sources:
/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
/// 2. HyperSync API for efficient historical data queries.
///
/// The channel halves and the command receiver are stored as `Option`s because
/// `spawn_process_task` takes them out of the struct and moves them into the background task.
#[derive(Debug)]
pub struct BlockchainDataClient {
    /// The blockchain being targeted by this client instance (cloned from `config.chain`).
    pub chain: SharedChain,
    /// Configuration parameters for the blockchain data client.
    pub config: BlockchainDataClientConfig,
    /// The core client instance that handles blockchain operations.
    /// Wrapped in Option to allow moving it into the background processing task.
    /// `new` leaves this as `None`.
    pub core_client: Option<BlockchainDataClientCore>,
    /// Receiving half of the HyperSync message channel.
    /// Taken by `spawn_process_task` and polled inside the background task.
    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
    /// Sending half of the HyperSync message channel.
    /// Taken by `spawn_process_task` and handed to the core client it constructs.
    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Channel sender for commands to be processed asynchronously.
    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
    /// Channel receiver for commands to be processed asynchronously.
    /// Taken by `spawn_process_task`; `None` afterwards.
    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
    /// Join handle of the background processing task, set by `spawn_process_task`.
    process_task: Option<tokio::task::JoinHandle<()>>,
    /// Cancellation token for graceful shutdown of background tasks.
    /// A clone is passed to the core client and to the processing task.
    cancellation_token: tokio_util::sync::CancellationToken,
}
73
74impl BlockchainDataClient {
75    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
76    #[must_use]
77    pub fn new(config: BlockchainDataClientConfig) -> Self {
78        let chain = config.chain.clone();
79        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
80        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
81        Self {
82            chain,
83            core_client: None,
84            config,
85            hypersync_rx: Some(hypersync_rx),
86            hypersync_tx: Some(hypersync_tx),
87            command_tx,
88            command_rx: Some(command_rx),
89            process_task: None,
90            cancellation_token: tokio_util::sync::CancellationToken::new(),
91        }
92    }
93
94    /// Spawns the main processing task that handles commands and blockchain data.
95    ///
96    /// This method creates a background task that:
97    /// 1. Processes subscription/unsubscription commands from the command channel
98    /// 2. Handles incoming blockchain data from HyperSync
99    /// 3. Processes RPC messages if RPC client is configured
100    /// 4. Routes processed data to subscribers
101    fn spawn_process_task(&mut self) {
102        let command_rx = if let Some(r) = self.command_rx.take() {
103            r
104        } else {
105            tracing::error!("Command receiver already taken, not spawning handler");
106            return;
107        };
108
109        let cancellation_token = self.cancellation_token.clone();
110
111        let data_tx = nautilus_common::runner::get_data_event_sender();
112
113        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
114        let hypersync_tx = self.hypersync_tx.take();
115
116        let mut core_client = BlockchainDataClientCore::new(
117            self.config.clone(),
118            hypersync_tx,
119            Some(data_tx),
120            cancellation_token.clone(),
121        );
122
123        let handle = get_runtime().spawn(async move {
124            tracing::debug!("Started task 'process'");
125
126            if let Err(e) = core_client.connect().await {
127                // TODO: connect() could return more granular error types to distinguish
128                // cancellation from actual failures without string matching
129                if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
130                    tracing::warn!("Blockchain core client connection interrupted: {e}");
131                } else {
132                    tracing::error!("Failed to connect blockchain core client: {e}");
133                }
134                return;
135            }
136
137            let mut command_rx = command_rx;
138
139            loop {
140                tokio::select! {
141                    () = cancellation_token.cancelled() => {
142                        tracing::debug!("Received cancellation signal in Blockchain data client process task");
143                        core_client.disconnect().await;
144                        break;
145                    }
146                    command = command_rx.recv() => {
147                        if let Some(cmd) = command {
148                            match cmd {
149                                DefiDataCommand::Subscribe(cmd) => {
150                                    let chain = cmd.blockchain();
151                                    if chain != core_client.chain.name {
152                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
153                                        continue;
154                                    }
155
156                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
157                                        tracing::error!("Error processing subscribe command: {e}");
158                                    }
159                                }
160                                DefiDataCommand::Unsubscribe(cmd) => {
161                                    let chain = cmd.blockchain();
162                                    if chain != core_client.chain.name {
163                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
164                                        continue;
165                                    }
166
167                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
168                                        tracing::error!("Error processing subscribe command: {e}");
169                                    }
170                                }
171                                DefiDataCommand::Request(cmd) => {
172                                    if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
173                                        tracing::error!("Error processing request command: {e}");
174                                    }
175                                }
176                            }
177                        } else {
178                            tracing::debug!("Command channel closed");
179                            break;
180                        }
181                    }
182                    data = hypersync_rx.recv() => {
183                        if let Some(msg) = data {
184                            let data_event = match msg {
185                                BlockchainMessage::Block(block) => {
186                                    // Fetch and process all subscribed events per DEX
187                                    for dex in core_client.cache.get_registered_dexes(){
188                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
189                                        if !addresses.is_empty() {
190                                            core_client.hypersync_client.process_block_dex_contract_events(
191                                                &dex,
192                                                block.number,
193                                                addresses,
194                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
195                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
196                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
197                                            );
198                                        }
199                                    }
200
201                                    Some(DataEvent::DeFi(DefiData::Block(block)))
202                                }
203                                BlockchainMessage::SwapEvent(swap_event) => {
204                                    match core_client.get_pool(&swap_event.pool_address) {
205                                        Ok(pool) => {
206                                            match core_client.process_pool_swap_event(&swap_event, pool){
207                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
208                                                Err(e) => {
209                                                    tracing::error!("Error processing pool swap event: {e}");
210                                                    None
211                                                }
212                                            }
213                                        }
214                                        Err(e) => {
215                                            tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
216                                            None
217                                        }
218                                    }
219                                }
220                                BlockchainMessage::BurnEvent(burn_event) => {
221                                    match core_client.get_pool(&burn_event.pool_address) {
222                                        Ok(pool) => {
223                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
224                                            match core_client.process_pool_burn_event(
225                                                &burn_event,
226                                                pool,
227                                                dex_extended,
228                                            ){
229                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
230                                                Err(e) => {
231                                                    tracing::error!("Error processing pool burn event: {e}");
232                                                    None
233                                                }
234                                            }
235                                        }
236                                        Err(e) => {
237                                            tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
238                                            None
239                                        }
240                                    }
241                                }
242                                BlockchainMessage::MintEvent(mint_event) => {
243                                    match core_client.get_pool(&mint_event.pool_address) {
244                                        Ok(pool) => {
245                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
246                                            match core_client.process_pool_mint_event(
247                                                &mint_event,
248                                                pool,
249                                                dex_extended,
250                                            ){
251                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
252                                                Err(e) => {
253                                                    tracing::error!("Error processing pool mint event: {e}");
254                                                    None
255                                                }
256                                            }
257                                        }
258                                        Err(e) => {
259                                            tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
260                                            None
261                                        }
262                                    }
263                                }
264                                BlockchainMessage::CollectEvent(collect_event) => {
265                                    match core_client.get_pool(&collect_event.pool_address) {
266                                        Ok(pool) => {
267                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
268                                            match core_client.process_pool_collect_event(
269                                                &collect_event,
270                                                pool,
271                                                dex_extended,
272                                            ){
273                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
274                                                Err(e) => {
275                                                    tracing::error!("Error processing pool collect event: {e}");
276                                                    None
277                                                }
278                                            }
279                                        }
280                                        Err(e) => {
281                                            tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
282                                            None
283                                        }
284                                    }
285                                }
286                            BlockchainMessage::FlashEvent(flash_event) => {
287                                    match core_client.get_pool(&flash_event.pool_address) {
288                                        Ok(pool) => {
289                                            match core_client.process_pool_flash_event(&flash_event,pool){
290                                                Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
291                                                Err(e) => {
292                                                    tracing::error!("Error processing pool flash event: {e}");
293                                                    None
294                                                }
295                                            }
296                                        }
297                                        Err(e) => {
298                                            tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_address, e);
299                                            None
300                                        }
301                                    }
302                                }
303                            };
304
305                            if let Some(event) = data_event {
306                                core_client.send_data(event);
307                            }
308                        } else {
309                            tracing::debug!("HyperSync data channel closed");
310                            break;
311                        }
312                    }
313                    msg = async {
314                        match core_client.rpc_client {
315                            Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
316                            None => std::future::pending().await,  // Never resolves
317                        }
318                    } => {
319                        // This branch only fires when we actually receive a message
320                        match msg {
321                            Ok(BlockchainMessage::Block(block)) => {
322                                let data = DataEvent::DeFi(DefiData::Block(block));
323                                core_client.send_data(data);
324                            },
325                            Ok(BlockchainMessage::SwapEvent(_)) => {
326                                tracing::warn!("RPC swap events are not yet supported");
327                            }
328                            Ok(BlockchainMessage::MintEvent(_)) => {
329                                tracing::warn!("RPC mint events are not yet supported");
330                            }
331                            Ok(BlockchainMessage::BurnEvent(_)) => {
332                                tracing::warn!("RPC burn events are not yet supported");
333                            }
334                            Ok(BlockchainMessage::CollectEvent(_)) => {
335                                tracing::warn!("RPC collect events are not yet supported");
336                            }
337                            Ok(BlockchainMessage::FlashEvent(_)) => {
338                                tracing::warn!("RPC flash events are not yet supported");
339                            }
340                            Err(e) => {
341                                tracing::error!("Error processing RPC message: {e}");
342                            }
343                        }
344                    }
345                }
346            }
347
348            tracing::debug!("Stopped task 'process'");
349        });
350
351        self.process_task = Some(handle);
352    }
353
354    /// Processes DeFi subscription commands to start receiving specific blockchain data.
355    async fn handle_subscribe_command(
356        command: DefiSubscribeCommand,
357        core_client: &mut BlockchainDataClientCore,
358    ) -> anyhow::Result<()> {
359        match command {
360            DefiSubscribeCommand::Blocks(_cmd) => {
361                tracing::info!("Processing subscribe blocks command");
362
363                // Try RPC client first if available, otherwise use HyperSync
364                if let Some(ref mut rpc) = core_client.rpc_client {
365                    if let Err(e) = rpc.subscribe_blocks().await {
366                        tracing::warn!(
367                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
368                        );
369                        core_client.hypersync_client.subscribe_blocks();
370                        tokio::task::yield_now().await;
371                    } else {
372                        tracing::info!("Successfully subscribed to blocks via RPC");
373                    }
374                } else {
375                    tracing::info!("Subscribing to blocks via HyperSync");
376                    core_client.hypersync_client.subscribe_blocks();
377                    tokio::task::yield_now().await;
378                }
379
380                Ok(())
381            }
382            DefiSubscribeCommand::Pool(cmd) => {
383                tracing::info!(
384                    "Processing subscribe pool command for {}",
385                    cmd.instrument_id
386                );
387
388                if let Some(ref mut _rpc) = core_client.rpc_client {
389                    tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
390                }
391
392                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
393                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
394                        .map_err(|e| {
395                            anyhow::anyhow!(
396                                "Invalid pool address '{}' failed with error: {:?}",
397                                cmd.instrument_id,
398                                e
399                            )
400                        })?;
401
402                    // Subscribe to all pool event types
403                    core_client
404                        .subscription_manager
405                        .subscribe_swaps(dex, pool_address);
406                    core_client
407                        .subscription_manager
408                        .subscribe_burns(dex, pool_address);
409                    core_client
410                        .subscription_manager
411                        .subscribe_mints(dex, pool_address);
412                    core_client
413                        .subscription_manager
414                        .subscribe_collects(dex, pool_address);
415                    core_client
416                        .subscription_manager
417                        .subscribe_flashes(dex, pool_address);
418
419                    tracing::info!(
420                        "Subscribed to all pool events for {} at address {}",
421                        cmd.instrument_id,
422                        pool_address
423                    );
424                } else {
425                    anyhow::bail!(
426                        "Invalid venue {}, expected Blockchain DEX format",
427                        cmd.instrument_id.venue
428                    )
429                }
430
431                Ok(())
432            }
433            DefiSubscribeCommand::PoolSwaps(cmd) => {
434                tracing::info!(
435                    "Processing subscribe pool swaps command for {}",
436                    cmd.instrument_id
437                );
438
439                if let Some(ref mut _rpc) = core_client.rpc_client {
440                    tracing::warn!(
441                        "RPC pool swaps subscription not yet implemented, using HyperSync"
442                    );
443                }
444
445                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
446                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
447                        .map_err(|e| {
448                            anyhow::anyhow!(
449                                "Invalid pool swap address '{}' failed with error: {:?}",
450                                cmd.instrument_id,
451                                e
452                            )
453                        })?;
454                    core_client
455                        .subscription_manager
456                        .subscribe_swaps(dex, pool_address);
457                } else {
458                    anyhow::bail!(
459                        "Invalid venue {}, expected Blockchain DEX format",
460                        cmd.instrument_id.venue
461                    )
462                }
463
464                Ok(())
465            }
466            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
467                tracing::info!(
468                    "Processing subscribe pool liquidity updates command for address: {}",
469                    cmd.instrument_id
470                );
471
472                if let Some(ref mut _rpc) = core_client.rpc_client {
473                    tracing::warn!(
474                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
475                    );
476                }
477
478                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
479                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
480                        .map_err(|_| {
481                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
482                        })?;
483                    core_client
484                        .subscription_manager
485                        .subscribe_burns(dex, pool_address);
486                    core_client
487                        .subscription_manager
488                        .subscribe_mints(dex, pool_address);
489                } else {
490                    anyhow::bail!(
491                        "Invalid venue {}, expected Blockchain DEX format",
492                        cmd.instrument_id.venue
493                    )
494                }
495
496                Ok(())
497            }
498            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
499                tracing::info!(
500                    "Processing subscribe pool fee collects command for address: {}",
501                    cmd.instrument_id
502                );
503
504                if let Some(ref mut _rpc) = core_client.rpc_client {
505                    tracing::warn!(
506                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
507                    );
508                }
509
510                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
511                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
512                        .map_err(|_| {
513                            anyhow::anyhow!(
514                                "Invalid pool fee collect address: {}",
515                                cmd.instrument_id
516                            )
517                        })?;
518                    core_client
519                        .subscription_manager
520                        .subscribe_collects(dex, pool_address);
521                } else {
522                    anyhow::bail!(
523                        "Invalid venue {}, expected Blockchain DEX format",
524                        cmd.instrument_id.venue
525                    )
526                }
527
528                Ok(())
529            }
530            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
531                tracing::info!(
532                    "Processing subscribe pool flash command for address: {}",
533                    cmd.instrument_id
534                );
535
536                if let Some(ref mut _rpc) = core_client.rpc_client {
537                    tracing::warn!(
538                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
539                    );
540                }
541
542                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
543                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
544                        .map_err(|_| {
545                            anyhow::anyhow!(
546                                "Invalid pool flash subscribe address: {}",
547                                cmd.instrument_id
548                            )
549                        })?;
550                    core_client
551                        .subscription_manager
552                        .subscribe_flashes(dex, pool_address);
553                } else {
554                    anyhow::bail!(
555                        "Invalid venue {}, expected Blockchain DEX format",
556                        cmd.instrument_id.venue
557                    )
558                }
559
560                Ok(())
561            }
562        }
563    }
564
565    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
566    async fn handle_unsubscribe_command(
567        command: DefiUnsubscribeCommand,
568        core_client: &mut BlockchainDataClientCore,
569    ) -> anyhow::Result<()> {
570        match command {
571            DefiUnsubscribeCommand::Blocks(_cmd) => {
572                tracing::info!("Processing unsubscribe blocks command");
573
574                // TODO: Implement RPC unsubscription when available
575                if core_client.rpc_client.is_some() {
576                    tracing::warn!("RPC blocks unsubscription not yet implemented");
577                }
578
579                // Use HyperSync client for unsubscription
580                core_client.hypersync_client.unsubscribe_blocks().await;
581                tracing::info!("Unsubscribed from blocks via HyperSync");
582
583                Ok(())
584            }
585            DefiUnsubscribeCommand::Pool(cmd) => {
586                tracing::info!(
587                    "Processing unsubscribe pool command for {}",
588                    cmd.instrument_id
589                );
590
591                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
592                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
593                        .map_err(|_| {
594                            anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
595                        })?;
596
597                    // Unsubscribe from all pool event types
598                    core_client
599                        .subscription_manager
600                        .unsubscribe_swaps(dex, pool_address);
601                    core_client
602                        .subscription_manager
603                        .unsubscribe_burns(dex, pool_address);
604                    core_client
605                        .subscription_manager
606                        .unsubscribe_mints(dex, pool_address);
607                    core_client
608                        .subscription_manager
609                        .unsubscribe_collects(dex, pool_address);
610                    core_client
611                        .subscription_manager
612                        .unsubscribe_flashes(dex, pool_address);
613
614                    tracing::info!(
615                        "Unsubscribed from all pool events for {} at address {}",
616                        cmd.instrument_id,
617                        pool_address
618                    );
619                } else {
620                    anyhow::bail!(
621                        "Invalid venue {}, expected Blockchain DEX format",
622                        cmd.instrument_id.venue
623                    )
624                }
625
626                Ok(())
627            }
628            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
629                tracing::info!("Processing unsubscribe pool swaps command");
630
631                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
632                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
633                        .map_err(|_| {
634                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
635                        })?;
636                    core_client
637                        .subscription_manager
638                        .unsubscribe_swaps(dex, pool_address);
639                } else {
640                    anyhow::bail!(
641                        "Invalid venue {}, expected Blockchain DEX format",
642                        cmd.instrument_id.venue
643                    )
644                }
645
646                Ok(())
647            }
648            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
649                tracing::info!(
650                    "Processing unsubscribe pool liquidity updates command for {}",
651                    cmd.instrument_id
652                );
653
654                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
655                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
656                        .map_err(|_| {
657                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
658                        })?;
659                    core_client
660                        .subscription_manager
661                        .unsubscribe_burns(dex, pool_address);
662                    core_client
663                        .subscription_manager
664                        .unsubscribe_mints(dex, pool_address);
665                } else {
666                    anyhow::bail!(
667                        "Invalid venue {}, expected Blockchain DEX format",
668                        cmd.instrument_id.venue
669                    )
670                }
671
672                Ok(())
673            }
674            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
675                tracing::info!(
676                    "Processing unsubscribe pool fee collects command for {}",
677                    cmd.instrument_id
678                );
679
680                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
681                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
682                        .map_err(|_| {
683                            anyhow::anyhow!(
684                                "Invalid pool fee collect address: {}",
685                                cmd.instrument_id
686                            )
687                        })?;
688                    core_client
689                        .subscription_manager
690                        .unsubscribe_collects(dex, pool_address);
691                } else {
692                    anyhow::bail!(
693                        "Invalid venue {}, expected Blockchain DEX format",
694                        cmd.instrument_id.venue
695                    )
696                }
697
698                Ok(())
699            }
700            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
701                tracing::info!(
702                    "Processing unsubscribe pool flash command for {}",
703                    cmd.instrument_id
704                );
705
706                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
707                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
708                        .map_err(|_| {
709                            anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
710                        })?;
711                    core_client
712                        .subscription_manager
713                        .unsubscribe_flashes(dex, pool_address);
714                } else {
715                    anyhow::bail!(
716                        "Invalid venue {}, expected Blockchain DEX format",
717                        cmd.instrument_id.venue
718                    )
719                }
720
721                Ok(())
722            }
723        }
724    }
725
726    /// Processes DeFi request commands to fetch specific blockchain data.
727    async fn handle_request_command(
728        command: DefiRequestCommand,
729        core_client: &mut BlockchainDataClientCore,
730    ) -> anyhow::Result<()> {
731        match command {
732            DefiRequestCommand::PoolSnapshot(cmd) => {
733                tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
734
735                let pool_address =
736                    validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
737                        anyhow::anyhow!(
738                            "Invalid pool address '{}' failed with error: {:?}",
739                            cmd.instrument_id,
740                            e
741                        )
742                    })?;
743
744                match core_client.get_pool(&pool_address) {
745                    Ok(pool) => {
746                        let pool = pool.clone();
747                        tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
748
749                        // Send the pool definition
750                        let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
751                        core_client.send_data(pool_data);
752
753                        match core_client.bootstrap_latest_pool_profiler(&pool).await {
754                            Ok((profiler, already_valid)) => {
755                                let snapshot = profiler.extract_snapshot();
756
757                                tracing::info!(
758                                    "Saving pool snapshot with {} positions and {} ticks to database...",
759                                    snapshot.positions.len(),
760                                    snapshot.ticks.len()
761                                );
762                                core_client
763                                    .cache
764                                    .add_pool_snapshot(&pool.address, &snapshot)
765                                    .await?;
766
767                                // If snapshot is valid, send it back to the data engine.
768                                if core_client
769                                    .check_snapshot_validity(&profiler, already_valid)
770                                    .await?
771                                {
772                                    let snapshot_data =
773                                        DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
774                                    core_client.send_data(snapshot_data);
775                                }
776                            }
777                            Err(e) => tracing::error!(
778                                "Failed to bootstrap pool profiler for {} and extract snapshot with error {}",
779                                cmd.instrument_id,
780                                e.to_string()
781                            ),
782                        }
783                    }
784                    Err(e) => {
785                        tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
786                    }
787                }
788
789                Ok(())
790            }
791        }
792    }
793
794    /// Waits for the background processing task to complete.
795    ///
796    /// This method blocks until the spawned process task finishes execution,
797    /// which typically happens after a shutdown signal is sent.
798    pub async fn await_process_task_close(&mut self) {
799        if let Some(handle) = self.process_task.take()
800            && let Err(e) = handle.await
801        {
802            tracing::error!("Process task join error: {e}");
803        }
804    }
805}
806
807#[async_trait::async_trait]
808impl DataClient for BlockchainDataClient {
809    fn client_id(&self) -> ClientId {
810        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
811    }
812
813    fn venue(&self) -> Option<Venue> {
814        // Blockchain data clients don't map to a single venue since they can provide
815        // data for multiple DEXs across the blockchain
816        None
817    }
818
819    fn start(&mut self) -> anyhow::Result<()> {
820        tracing::info!(
821            chain_name = %self.chain.name,
822            dex_ids = ?self.config.dex_ids,
823            use_hypersync_for_live_data = self.config.use_hypersync_for_live_data,
824            http_proxy_url = ?self.config.http_proxy_url,
825            ws_proxy_url = ?self.config.ws_proxy_url,
826            "Starting blockchain data client"
827        );
828        Ok(())
829    }
830
831    fn stop(&mut self) -> anyhow::Result<()> {
832        tracing::info!(
833            "Stopping blockchain data client for '{chain_name}'",
834            chain_name = self.chain.name
835        );
836        self.cancellation_token.cancel();
837
838        // Create fresh token for next start cycle
839        self.cancellation_token = tokio_util::sync::CancellationToken::new();
840        Ok(())
841    }
842
843    fn reset(&mut self) -> anyhow::Result<()> {
844        tracing::info!(
845            "Resetting blockchain data client for '{chain_name}'",
846            chain_name = self.chain.name
847        );
848        self.cancellation_token = tokio_util::sync::CancellationToken::new();
849        Ok(())
850    }
851
852    fn dispose(&mut self) -> anyhow::Result<()> {
853        tracing::info!(
854            "Disposing blockchain data client for '{chain_name}'",
855            chain_name = self.chain.name
856        );
857        Ok(())
858    }
859
860    async fn connect(&mut self) -> anyhow::Result<()> {
861        tracing::info!(
862            "Connecting blockchain data client for '{}'",
863            self.chain.name
864        );
865
866        if self.process_task.is_none() {
867            self.spawn_process_task();
868        }
869
870        Ok(())
871    }
872
873    async fn disconnect(&mut self) -> anyhow::Result<()> {
874        tracing::info!(
875            "Disconnecting blockchain data client for '{}'",
876            self.chain.name
877        );
878
879        self.cancellation_token.cancel();
880        self.await_process_task_close().await;
881
882        // Create fresh token and channels for next connect cycle
883        self.cancellation_token = tokio_util::sync::CancellationToken::new();
884        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
885        self.hypersync_tx = Some(hypersync_tx);
886        self.hypersync_rx = Some(hypersync_rx);
887        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
888        self.command_tx = command_tx;
889        self.command_rx = Some(command_rx);
890
891        Ok(())
892    }
893
894    fn is_connected(&self) -> bool {
895        // TODO: Improve connection detection
896        // For now, we'll assume connected if we have either RPC or HyperSync configured
897        true
898    }
899
900    fn is_disconnected(&self) -> bool {
901        !self.is_connected()
902    }
903
904    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
905        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
906        self.command_tx.send(command)?;
907        Ok(())
908    }
909
910    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
911        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
912        self.command_tx.send(command)?;
913        Ok(())
914    }
915
916    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
917        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
918        self.command_tx.send(command)?;
919        Ok(())
920    }
921
922    fn subscribe_pool_liquidity_updates(
923        &mut self,
924        cmd: &SubscribePoolLiquidityUpdates,
925    ) -> anyhow::Result<()> {
926        let command =
927            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
928        self.command_tx.send(command)?;
929        Ok(())
930    }
931
932    fn subscribe_pool_fee_collects(
933        &mut self,
934        cmd: &SubscribePoolFeeCollects,
935    ) -> anyhow::Result<()> {
936        let command =
937            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
938        self.command_tx.send(command)?;
939        Ok(())
940    }
941
942    fn subscribe_pool_flash_events(
943        &mut self,
944        cmd: &SubscribePoolFlashEvents,
945    ) -> anyhow::Result<()> {
946        let command =
947            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
948        self.command_tx.send(command)?;
949        Ok(())
950    }
951
952    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
953        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
954        self.command_tx.send(command)?;
955        Ok(())
956    }
957
958    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
959        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
960        self.command_tx.send(command)?;
961        Ok(())
962    }
963
964    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
965        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
966        self.command_tx.send(command)?;
967        Ok(())
968    }
969
970    fn unsubscribe_pool_liquidity_updates(
971        &mut self,
972        cmd: &UnsubscribePoolLiquidityUpdates,
973    ) -> anyhow::Result<()> {
974        let command =
975            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
976        self.command_tx.send(command)?;
977        Ok(())
978    }
979
980    fn unsubscribe_pool_fee_collects(
981        &mut self,
982        cmd: &UnsubscribePoolFeeCollects,
983    ) -> anyhow::Result<()> {
984        let command =
985            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
986        self.command_tx.send(command)?;
987        Ok(())
988    }
989
990    fn unsubscribe_pool_flash_events(
991        &mut self,
992        cmd: &UnsubscribePoolFlashEvents,
993    ) -> anyhow::Result<()> {
994        let command =
995            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
996        self.command_tx.send(command)?;
997        Ok(())
998    }
999
1000    fn request_pool_snapshot(
1001        &self,
1002        cmd: &nautilus_common::messages::defi::RequestPoolSnapshot,
1003    ) -> anyhow::Result<()> {
1004        let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
1005        self.command_tx.send(command)?;
1006        Ok(())
1007    }
1008}