nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    messages::{
18        DataEvent,
19        defi::{
20            DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
21            SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
22            SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
23            UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
24            UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
25        },
26    },
27    runtime::get_runtime,
28};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31    defi::{DefiData, SharedChain, validation::validate_address},
32    identifiers::{ClientId, Venue},
33};
34
35use crate::{
36    config::BlockchainDataClientConfig,
37    data::core::BlockchainDataClientCore,
38    exchanges::get_dex_extended,
39    rpc::{BlockchainRpcClient, types::BlockchainMessage},
40};
41
/// A comprehensive client for interacting with blockchain data from multiple sources.
///
/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
///
/// This client supports two primary data sources:
/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
/// 2. HyperSync API for efficient historical data queries.
///
/// Commands are queued on an unbounded channel and consumed by a background task, which also
/// consumes messages arriving on the HyperSync channel.
#[derive(Debug)]
pub struct BlockchainDataClient {
    /// The blockchain being targeted by this client instance.
    ///
    /// Cloned from `config.chain` when the client is constructed.
    pub chain: SharedChain,
    /// Configuration parameters for the blockchain data client.
    ///
    /// A clone of this configuration is used to build the core client for the background task.
    pub config: BlockchainDataClientConfig,
    /// The core client instance that handles blockchain operations.
    /// Wrapped in Option to allow moving it into the background processing task.
    ///
    /// NOTE(review): in the code visible here this field starts as `None` and the background
    /// task builds its own core client — confirm where (if anywhere) this field is populated.
    pub core_client: Option<BlockchainDataClientCore>,
    /// Receiving half of the HyperSync message channel.
    ///
    /// Taken (leaving `None`) when the background processing task is spawned, which then
    /// consumes `BlockchainMessage`s from it.
    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
    /// Sending half of the HyperSync message channel.
    ///
    /// Taken (leaving `None`) when the background processing task is spawned and handed to the
    /// core client built for that task.
    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Channel sender for commands to be processed asynchronously.
    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
    /// Channel receiver for commands to be processed asynchronously.
    ///
    /// Taken (leaving `None`) when the background processing task is spawned; a second spawn
    /// attempt finds it missing and is refused.
    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
    /// Background task for processing messages.
    ///
    /// `None` until the processing task has been spawned.
    process_task: Option<tokio::task::JoinHandle<()>>,
    /// Oneshot channel sender for graceful shutdown signal.
    ///
    /// The background task disconnects its core client and exits once the paired receiver
    /// resolves.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
73
74impl BlockchainDataClient {
75    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
76    #[must_use]
77    pub fn new(config: BlockchainDataClientConfig) -> Self {
78        let chain = config.chain.clone();
79        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
80        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
81        Self {
82            chain,
83            core_client: None,
84            config,
85            hypersync_rx: Some(hypersync_rx),
86            hypersync_tx: Some(hypersync_tx),
87            command_tx,
88            command_rx: Some(command_rx),
89            process_task: None,
90            shutdown_tx: None,
91        }
92    }
93
94    /// Spawns the main processing task that handles commands and blockchain data.
95    ///
96    /// This method creates a background task that:
97    /// 1. Processes subscription/unsubscription commands from the command channel
98    /// 2. Handles incoming blockchain data from HyperSync
99    /// 3. Processes RPC messages if RPC client is configured
100    /// 4. Routes processed data to subscribers
101    fn spawn_process_task(&mut self) {
102        let command_rx = if let Some(r) = self.command_rx.take() {
103            r
104        } else {
105            tracing::error!("Command receiver already taken, not spawning handler");
106            return;
107        };
108
109        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
110        self.shutdown_tx = Some(shutdown_tx);
111
112        let data_tx = nautilus_common::runner::get_data_event_sender();
113
114        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
115        let hypersync_tx = self.hypersync_tx.take();
116
117        let mut core_client =
118            BlockchainDataClientCore::new(self.config.clone(), hypersync_tx, Some(data_tx));
119
120        let handle = get_runtime().spawn(async move {
121            tracing::debug!("Started task 'process'");
122
123            if let Err(e) = core_client.connect().await {
124                tracing::error!("Failed to connect blockchain core client: {e}");
125                return;
126            }
127
128            let mut command_rx = command_rx;
129            let mut shutdown_rx = shutdown_rx;
130
131            loop {
132                tokio::select! {
133                    _ = &mut shutdown_rx => {
134                        tracing::debug!("Received shutdown signal in Blockchain data client process task");
135                        core_client.disconnect();
136                        break;
137                    }
138                    command = command_rx.recv() => {
139                        if let Some(cmd) = command {
140                            match cmd {
141                                DefiDataCommand::Subscribe(cmd) => {
142                                    let chain = cmd.blockchain();
143                                    if chain != core_client.chain.name {
144                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
145                                        continue;
146                                    }
147
148                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
149                                        tracing::error!("Error processing subscribe command: {e}");
150                                    }
151                                }
152                                DefiDataCommand::Unsubscribe(cmd) => {
153                                    let chain = cmd.blockchain();
154                                    if chain != core_client.chain.name {
155                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
156                                        continue;
157                                    }
158
159                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
160                                        tracing::error!("Error processing subscribe command: {e}");
161                                    }
162                                }
163                                DefiDataCommand::Request(cmd) => {
164                                    if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
165                                        tracing::error!("Error processing request command: {e}");
166                                    }
167                                }
168                            }
169                        } else {
170                            tracing::debug!("Command channel closed");
171                            break;
172                        }
173                    }
174                    data = hypersync_rx.recv() => {
175                        if let Some(msg) = data {
176                            let data_event = match msg {
177                                BlockchainMessage::Block(block) => {
178                                    // Fetch and process all subscribed events per DEX
179                                    for dex in core_client.cache.get_registered_dexes(){
180                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
181                                        if !addresses.is_empty() {
182                                            core_client.hypersync_client.process_block_dex_contract_events(
183                                                &dex,
184                                                block.number,
185                                                addresses,
186                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
187                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
188                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
189                                            ).await;
190                                        }
191                                    }
192
193                                    Some(DataEvent::DeFi(DefiData::Block(block)))
194                                }
195                                BlockchainMessage::SwapEvent(swap_event) => {
196                                    match core_client.get_pool(&swap_event.pool_address) {
197                                        Ok(pool) => {
198                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
199                                            match core_client.process_pool_swap_event(
200                                                &swap_event,
201                                                pool,
202                                                dex_extended,
203                                            ){
204                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
205                                                Err(e) => {
206                                                    tracing::error!("Error processing pool swap event: {e}");
207                                                    None
208                                                }
209                                            }
210                                        }
211                                        Err(e) => {
212                                            tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
213                                            None
214                                        }
215                                    }
216                                }
217                                BlockchainMessage::BurnEvent(burn_event) => {
218                                    match core_client.get_pool(&burn_event.pool_address) {
219                                        Ok(pool) => {
220                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
221                                            match core_client.process_pool_burn_event(
222                                                &burn_event,
223                                                pool,
224                                                dex_extended,
225                                            ){
226                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
227                                                Err(e) => {
228                                                    tracing::error!("Error processing pool burn event: {e}");
229                                                    None
230                                                }
231                                            }
232                                        }
233                                        Err(e) => {
234                                            tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
235                                            None
236                                        }
237                                    }
238                                }
239                                BlockchainMessage::MintEvent(mint_event) => {
240                                    match core_client.get_pool(&mint_event.pool_address) {
241                                        Ok(pool) => {
242                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
243                                            match core_client.process_pool_mint_event(
244                                                &mint_event,
245                                                pool,
246                                                dex_extended,
247                                            ){
248                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
249                                                Err(e) => {
250                                                    tracing::error!("Error processing pool mint event: {e}");
251                                                    None
252                                                }
253                                            }
254                                        }
255                                        Err(e) => {
256                                            tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
257                                            None
258                                        }
259                                    }
260                                }
261                                BlockchainMessage::CollectEvent(collect_event) => {
262                                    match core_client.get_pool(&collect_event.pool_address) {
263                                        Ok(pool) => {
264                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
265                                            match core_client.process_pool_collect_event(
266                                                &collect_event,
267                                                pool,
268                                                dex_extended,
269                                            ){
270                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
271                                                Err(e) => {
272                                                    tracing::error!("Error processing pool collect event: {e}");
273                                                    None
274                                                }
275                                            }
276                                        }
277                                        Err(e) => {
278                                            tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
279                                            None
280                                        }
281                                    }
282                                }
283                            BlockchainMessage::FlashEvent(flash_event) => {
284                                    match core_client.get_pool(&flash_event.pool_address) {
285                                        Ok(pool) => {
286                                            match core_client.process_pool_flash_event(&flash_event,pool){
287                                                Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
288                                                Err(e) => {
289                                                    tracing::error!("Error processing pool flash event: {e}");
290                                                    None
291                                                }
292                                            }
293                                        }
294                                        Err(e) => {
295                                            tracing::error!("Failed to get pool {} with error {:?}", flash_event.pool_address, e);
296                                            None
297                                        }
298                                    }
299                                }
300                            };
301
302                            if let Some(event) = data_event {
303                                core_client.send_data(event);
304                            }
305                        } else {
306                            tracing::debug!("HyperSync data channel closed");
307                            break;
308                        }
309                    }
310                    msg = async {
311                        if let Some(ref mut rpc_client) = core_client.rpc_client {
312                            Some(rpc_client.next_rpc_message().await)
313                        } else {
314                            None
315                        }
316                    } => {
317                        if let Some(msg) = msg {
318                            match msg {
319                                Ok(BlockchainMessage::Block(block)) => {
320                                    let data = DataEvent::DeFi(DefiData::Block(block));
321                                    core_client.send_data(data);
322                                },
323                                Ok(BlockchainMessage::SwapEvent(_)) => {
324                                    tracing::warn!("RPC swap events are not yet supported");
325                                }
326                                Ok(BlockchainMessage::MintEvent(_)) => {
327                                    tracing::warn!("RPC mint events are not yet supported");
328                                }
329                                Ok(BlockchainMessage::BurnEvent(_)) => {
330                                    tracing::warn!("RPC burn events are not yet supported");
331                                }
332                                Ok(BlockchainMessage::CollectEvent(_)) => {
333                                    tracing::warn!("RPC collect events are not yet supported")
334                                }
335                                Ok(BlockchainMessage::FlashEvent(_)) => {
336                                    tracing::warn!("RPC flash events are not yet supported")
337                                }
338                                Err(e) => {
339                                    tracing::error!("Error processing RPC message: {e}");
340                                }
341                            }
342                        }
343                    }
344                }
345            }
346
347            tracing::debug!("Stopped task 'process'");
348        });
349
350        self.process_task = Some(handle);
351    }
352
353    /// Processes DeFi subscription commands to start receiving specific blockchain data.
354    async fn handle_subscribe_command(
355        command: DefiSubscribeCommand,
356        core_client: &mut BlockchainDataClientCore,
357    ) -> anyhow::Result<()> {
358        match command {
359            DefiSubscribeCommand::Blocks(_cmd) => {
360                tracing::info!("Processing subscribe blocks command");
361
362                // Try RPC client first if available, otherwise use HyperSync
363                if let Some(ref mut rpc) = core_client.rpc_client {
364                    if let Err(e) = rpc.subscribe_blocks().await {
365                        tracing::warn!(
366                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
367                        );
368                        core_client.hypersync_client.subscribe_blocks();
369                        tokio::task::yield_now().await;
370                    } else {
371                        tracing::info!("Successfully subscribed to blocks via RPC");
372                    }
373                } else {
374                    tracing::info!("Subscribing to blocks via HyperSync");
375                    core_client.hypersync_client.subscribe_blocks();
376                    tokio::task::yield_now().await;
377                }
378
379                Ok(())
380            }
381            DefiSubscribeCommand::Pool(cmd) => {
382                tracing::info!(
383                    "Processing subscribe pool command for {}",
384                    cmd.instrument_id
385                );
386
387                if let Some(ref mut _rpc) = core_client.rpc_client {
388                    tracing::warn!("RPC pool subscription not yet implemented, using HyperSync");
389                }
390
391                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
392                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
393                        .map_err(|e| {
394                            anyhow::anyhow!(
395                                "Invalid pool address '{}' failed with error: {:?}",
396                                cmd.instrument_id,
397                                e
398                            )
399                        })?;
400
401                    // Subscribe to all pool event types
402                    core_client
403                        .subscription_manager
404                        .subscribe_swaps(dex, pool_address);
405                    core_client
406                        .subscription_manager
407                        .subscribe_burns(dex, pool_address);
408                    core_client
409                        .subscription_manager
410                        .subscribe_mints(dex, pool_address);
411                    core_client
412                        .subscription_manager
413                        .subscribe_collects(dex, pool_address);
414                    core_client
415                        .subscription_manager
416                        .subscribe_flashes(dex, pool_address);
417
418                    tracing::info!(
419                        "Subscribed to all pool events for {} at address {}",
420                        cmd.instrument_id,
421                        pool_address
422                    );
423                } else {
424                    anyhow::bail!(
425                        "Invalid venue {}, expected Blockchain DEX format",
426                        cmd.instrument_id.venue
427                    )
428                }
429
430                Ok(())
431            }
432            DefiSubscribeCommand::PoolSwaps(cmd) => {
433                tracing::info!(
434                    "Processing subscribe pool swaps command for {}",
435                    cmd.instrument_id
436                );
437
438                if let Some(ref mut _rpc) = core_client.rpc_client {
439                    tracing::warn!(
440                        "RPC pool swaps subscription not yet implemented, using HyperSync"
441                    );
442                }
443
444                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
445                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
446                        .map_err(|e| {
447                            anyhow::anyhow!(
448                                "Invalid pool swap address '{}' failed with error: {:?}",
449                                cmd.instrument_id,
450                                e
451                            )
452                        })?;
453                    core_client
454                        .subscription_manager
455                        .subscribe_swaps(dex, pool_address);
456                } else {
457                    anyhow::bail!(
458                        "Invalid venue {}, expected Blockchain DEX format",
459                        cmd.instrument_id.venue
460                    )
461                }
462
463                Ok(())
464            }
465            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
466                tracing::info!(
467                    "Processing subscribe pool liquidity updates command for address: {}",
468                    cmd.instrument_id
469                );
470
471                if let Some(ref mut _rpc) = core_client.rpc_client {
472                    tracing::warn!(
473                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
474                    );
475                }
476
477                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
478                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
479                        .map_err(|_| {
480                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
481                        })?;
482                    core_client
483                        .subscription_manager
484                        .subscribe_burns(dex, pool_address);
485                    core_client
486                        .subscription_manager
487                        .subscribe_mints(dex, pool_address);
488                } else {
489                    anyhow::bail!(
490                        "Invalid venue {}, expected Blockchain DEX format",
491                        cmd.instrument_id.venue
492                    )
493                }
494
495                Ok(())
496            }
497            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
498                tracing::info!(
499                    "Processing subscribe pool fee collects command for address: {}",
500                    cmd.instrument_id
501                );
502
503                if let Some(ref mut _rpc) = core_client.rpc_client {
504                    tracing::warn!(
505                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
506                    );
507                }
508
509                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
510                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
511                        .map_err(|_| {
512                            anyhow::anyhow!(
513                                "Invalid pool fee collect address: {}",
514                                cmd.instrument_id
515                            )
516                        })?;
517                    core_client
518                        .subscription_manager
519                        .subscribe_collects(dex, pool_address);
520                } else {
521                    anyhow::bail!(
522                        "Invalid venue {}, expected Blockchain DEX format",
523                        cmd.instrument_id.venue
524                    )
525                }
526
527                Ok(())
528            }
529            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
530                tracing::info!(
531                    "Processing subscribe pool flash command for address: {}",
532                    cmd.instrument_id
533                );
534
535                if let Some(ref mut _rpc) = core_client.rpc_client {
536                    tracing::warn!(
537                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
538                    );
539                }
540
541                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
542                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
543                        .map_err(|_| {
544                            anyhow::anyhow!(
545                                "Invalid pool flash subscribe address: {}",
546                                cmd.instrument_id
547                            )
548                        })?;
549                    core_client
550                        .subscription_manager
551                        .subscribe_flashes(dex, pool_address);
552                } else {
553                    anyhow::bail!(
554                        "Invalid venue {}, expected Blockchain DEX format",
555                        cmd.instrument_id.venue
556                    )
557                }
558
559                Ok(())
560            }
561        }
562    }
563
564    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
565    async fn handle_unsubscribe_command(
566        command: DefiUnsubscribeCommand,
567        core_client: &mut BlockchainDataClientCore,
568    ) -> anyhow::Result<()> {
569        match command {
570            DefiUnsubscribeCommand::Blocks(_cmd) => {
571                tracing::info!("Processing unsubscribe blocks command");
572
573                // TODO: Implement RPC unsubscription when available
574                if core_client.rpc_client.is_some() {
575                    tracing::warn!("RPC blocks unsubscription not yet implemented");
576                }
577
578                // Use HyperSync client for unsubscription
579                core_client.hypersync_client.unsubscribe_blocks();
580                tracing::info!("Unsubscribed from blocks via HyperSync");
581
582                Ok(())
583            }
584            DefiUnsubscribeCommand::Pool(cmd) => {
585                tracing::info!(
586                    "Processing unsubscribe pool command for {}",
587                    cmd.instrument_id
588                );
589
590                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
591                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
592                        .map_err(|_| {
593                            anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
594                        })?;
595
596                    // Unsubscribe from all pool event types
597                    core_client
598                        .subscription_manager
599                        .unsubscribe_swaps(dex, pool_address);
600                    core_client
601                        .subscription_manager
602                        .unsubscribe_burns(dex, pool_address);
603                    core_client
604                        .subscription_manager
605                        .unsubscribe_mints(dex, pool_address);
606                    core_client
607                        .subscription_manager
608                        .unsubscribe_collects(dex, pool_address);
609                    core_client
610                        .subscription_manager
611                        .unsubscribe_flashes(dex, pool_address);
612
613                    tracing::info!(
614                        "Unsubscribed from all pool events for {} at address {}",
615                        cmd.instrument_id,
616                        pool_address
617                    );
618                } else {
619                    anyhow::bail!(
620                        "Invalid venue {}, expected Blockchain DEX format",
621                        cmd.instrument_id.venue
622                    )
623                }
624
625                Ok(())
626            }
627            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
628                tracing::info!("Processing unsubscribe pool swaps command");
629
630                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
631                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
632                        .map_err(|_| {
633                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
634                        })?;
635                    core_client
636                        .subscription_manager
637                        .unsubscribe_swaps(dex, pool_address);
638                } else {
639                    anyhow::bail!(
640                        "Invalid venue {}, expected Blockchain DEX format",
641                        cmd.instrument_id.venue
642                    )
643                }
644
645                Ok(())
646            }
647            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
648                tracing::info!(
649                    "Processing unsubscribe pool liquidity updates command for {}",
650                    cmd.instrument_id
651                );
652
653                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
654                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
655                        .map_err(|_| {
656                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
657                        })?;
658                    core_client
659                        .subscription_manager
660                        .unsubscribe_burns(dex, pool_address);
661                    core_client
662                        .subscription_manager
663                        .unsubscribe_mints(dex, pool_address);
664                } else {
665                    anyhow::bail!(
666                        "Invalid venue {}, expected Blockchain DEX format",
667                        cmd.instrument_id.venue
668                    )
669                }
670
671                Ok(())
672            }
673            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
674                tracing::info!(
675                    "Processing unsubscribe pool fee collects command for {}",
676                    cmd.instrument_id
677                );
678
679                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
680                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
681                        .map_err(|_| {
682                            anyhow::anyhow!(
683                                "Invalid pool fee collect address: {}",
684                                cmd.instrument_id
685                            )
686                        })?;
687                    core_client
688                        .subscription_manager
689                        .unsubscribe_collects(dex, pool_address);
690                } else {
691                    anyhow::bail!(
692                        "Invalid venue {}, expected Blockchain DEX format",
693                        cmd.instrument_id.venue
694                    )
695                }
696
697                Ok(())
698            }
699            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
700                tracing::info!(
701                    "Processing unsubscribe pool flash command for {}",
702                    cmd.instrument_id
703                );
704
705                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
706                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
707                        .map_err(|_| {
708                            anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
709                        })?;
710                    core_client
711                        .subscription_manager
712                        .unsubscribe_flashes(dex, pool_address);
713                } else {
714                    anyhow::bail!(
715                        "Invalid venue {}, expected Blockchain DEX format",
716                        cmd.instrument_id.venue
717                    )
718                }
719
720                Ok(())
721            }
722        }
723    }
724
725    /// Processes DeFi request commands to fetch specific blockchain data.
726    async fn handle_request_command(
727        command: DefiRequestCommand,
728        core_client: &mut BlockchainDataClientCore,
729    ) -> anyhow::Result<()> {
730        match command {
731            DefiRequestCommand::PoolSnapshot(cmd) => {
732                tracing::info!("Processing pool snapshot request for {}", cmd.instrument_id);
733
734                let pool_address =
735                    validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
736                        anyhow::anyhow!(
737                            "Invalid pool address '{}' failed with error: {:?}",
738                            cmd.instrument_id,
739                            e
740                        )
741                    })?;
742
743                match core_client.get_pool(&pool_address) {
744                    Ok(pool) => {
745                        tracing::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
746
747                        // Send the pool definition
748                        let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
749                        core_client.send_data(pool_data);
750
751                        // Load and send the stored snapshot from database
752                        if let Some(database) = &core_client.cache.database {
753                            match database
754                                .load_latest_valid_pool_snapshot(
755                                    core_client.chain.chain_id,
756                                    &pool_address,
757                                )
758                                .await
759                            {
760                                Ok(Some(snapshot)) => {
761                                    tracing::info!(
762                                        "Loaded pool snapshot for {} at block {} with {} positions and {} ticks",
763                                        cmd.instrument_id,
764                                        snapshot.block_position.number,
765                                        snapshot.positions.len(),
766                                        snapshot.ticks.len()
767                                    );
768                                    let snapshot_data =
769                                        DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
770                                    core_client.send_data(snapshot_data);
771                                }
772                                Ok(None) => {
773                                    tracing::warn!(
774                                        "No valid pool snapshot found in database for {}",
775                                        cmd.instrument_id
776                                    );
777                                }
778                                Err(e) => {
779                                    tracing::error!(
780                                        "Failed to load pool snapshot for {}: {e}",
781                                        cmd.instrument_id
782                                    );
783                                }
784                            }
785                        }
786                    }
787                    Err(e) => {
788                        tracing::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
789                    }
790                }
791
792                Ok(())
793            }
794        }
795    }
796
797    /// Waits for the background processing task to complete.
798    ///
799    /// This method blocks until the spawned process task finishes execution,
800    /// which typically happens after a shutdown signal is sent.
801    pub async fn await_process_task_close(&mut self) {
802        if let Some(handle) = self.process_task.take()
803            && let Err(e) = handle.await
804        {
805            tracing::error!("Process task join error: {e}");
806        }
807    }
808}
809
810#[async_trait::async_trait]
811impl DataClient for BlockchainDataClient {
812    fn client_id(&self) -> ClientId {
813        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
814    }
815
816    fn venue(&self) -> Option<Venue> {
817        // Blockchain data clients don't map to a single venue since they can provide
818        // data for multiple DEXs across the blockchain
819        None
820    }
821
822    fn start(&mut self) -> anyhow::Result<()> {
823        tracing::info!(
824            "Starting blockchain data client for '{chain_name}'",
825            chain_name = self.chain.name
826        );
827        Ok(())
828    }
829
830    fn stop(&mut self) -> anyhow::Result<()> {
831        tracing::info!(
832            "Stopping blockchain data client for '{chain_name}'",
833            chain_name = self.chain.name
834        );
835        Ok(())
836    }
837
838    fn reset(&mut self) -> anyhow::Result<()> {
839        tracing::info!(
840            "Resetting blockchain data client for '{chain_name}'",
841            chain_name = self.chain.name
842        );
843        Ok(())
844    }
845
846    fn dispose(&mut self) -> anyhow::Result<()> {
847        tracing::info!(
848            "Disposing blockchain data client for '{chain_name}'",
849            chain_name = self.chain.name
850        );
851        Ok(())
852    }
853
854    async fn connect(&mut self) -> anyhow::Result<()> {
855        tracing::info!(
856            "Connecting blockchain data client for '{}'",
857            self.chain.name
858        );
859
860        if self.process_task.is_none() {
861            self.spawn_process_task();
862        }
863
864        Ok(())
865    }
866
867    async fn disconnect(&mut self) -> anyhow::Result<()> {
868        tracing::info!(
869            "Disconnecting blockchain data client for '{}'",
870            self.chain.name
871        );
872
873        if let Some(shutdown_tx) = self.shutdown_tx.take() {
874            let _ = shutdown_tx.send(());
875        }
876        self.await_process_task_close().await;
877
878        Ok(())
879    }
880
881    fn is_connected(&self) -> bool {
882        // TODO: Improve connection detection
883        // For now, we'll assume connected if we have either RPC or HyperSync configured
884        true
885    }
886
887    fn is_disconnected(&self) -> bool {
888        !self.is_connected()
889    }
890
891    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
892        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
893        self.command_tx.send(command)?;
894        Ok(())
895    }
896
897    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
898        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
899        self.command_tx.send(command)?;
900        Ok(())
901    }
902
903    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
904        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
905        self.command_tx.send(command)?;
906        Ok(())
907    }
908
909    fn subscribe_pool_liquidity_updates(
910        &mut self,
911        cmd: &SubscribePoolLiquidityUpdates,
912    ) -> anyhow::Result<()> {
913        let command =
914            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
915        self.command_tx.send(command)?;
916        Ok(())
917    }
918
919    fn subscribe_pool_fee_collects(
920        &mut self,
921        cmd: &SubscribePoolFeeCollects,
922    ) -> anyhow::Result<()> {
923        let command =
924            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd.clone()));
925        self.command_tx.send(command)?;
926        Ok(())
927    }
928
929    fn subscribe_pool_flash_events(
930        &mut self,
931        cmd: &SubscribePoolFlashEvents,
932    ) -> anyhow::Result<()> {
933        let command =
934            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd.clone()));
935        self.command_tx.send(command)?;
936        Ok(())
937    }
938
939    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
940        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
941        self.command_tx.send(command)?;
942        Ok(())
943    }
944
945    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
946        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
947        self.command_tx.send(command)?;
948        Ok(())
949    }
950
951    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
952        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
953        self.command_tx.send(command)?;
954        Ok(())
955    }
956
957    fn unsubscribe_pool_liquidity_updates(
958        &mut self,
959        cmd: &UnsubscribePoolLiquidityUpdates,
960    ) -> anyhow::Result<()> {
961        let command =
962            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
963        self.command_tx.send(command)?;
964        Ok(())
965    }
966
967    fn unsubscribe_pool_fee_collects(
968        &mut self,
969        cmd: &UnsubscribePoolFeeCollects,
970    ) -> anyhow::Result<()> {
971        let command =
972            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
973        self.command_tx.send(command)?;
974        Ok(())
975    }
976
977    fn unsubscribe_pool_flash_events(
978        &mut self,
979        cmd: &UnsubscribePoolFlashEvents,
980    ) -> anyhow::Result<()> {
981        let command =
982            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
983        self.command_tx.send(command)?;
984        Ok(())
985    }
986
987    fn request_pool_snapshot(
988        &self,
989        cmd: &nautilus_common::messages::defi::RequestPoolSnapshot,
990    ) -> anyhow::Result<()> {
991        let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd.clone()));
992        self.command_tx.send(command)?;
993        Ok(())
994    }
995}