nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    messages::{
18        DataEvent,
19        defi::{
20            DefiDataCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks,
21            SubscribePool, SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks,
22            UnsubscribePool, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
23        },
24    },
25    runtime::get_runtime,
26};
27use nautilus_data::client::DataClient;
28use nautilus_model::{
29    defi::{DefiData, SharedChain, validation::validate_address},
30    identifiers::{ClientId, Venue},
31};
32
33use crate::{
34    config::BlockchainDataClientConfig,
35    data::core::BlockchainDataClientCore,
36    exchanges::get_dex_extended,
37    rpc::{BlockchainRpcClient, types::BlockchainMessage},
38};
39
/// A comprehensive client for interacting with blockchain data from multiple sources.
///
/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
///
/// This client supports two primary data sources:
/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
/// 2. HyperSync API for efficient historical data queries.
///
/// Subscription requests are not executed inline: they are queued on a command channel and
/// handled by a background task that is spawned when the client connects.
#[derive(Debug)]
pub struct BlockchainDataClient {
    /// The blockchain being targeted by this client instance.
    pub chain: SharedChain,
    /// Configuration parameters for the blockchain data client.
    pub config: BlockchainDataClientConfig,
    /// Optional slot for a core client instance.
    ///
    /// NOTE(review): `new` initializes this to `None` and the background task builds its own
    /// core client; nothing visible here assigns this field afterwards — confirm it is still
    /// needed.
    pub core_client: Option<BlockchainDataClientCore>,
    /// Receiving half of the HyperSync message channel.
    /// Taken (left as `None`) when the background processing task is spawned.
    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
    /// Sending half of the HyperSync message channel.
    /// Taken and handed to the core client when the background processing task is spawned.
    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Channel sender for commands to be processed asynchronously.
    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
    /// Channel receiver for commands to be processed asynchronously.
    /// Taken (left as `None`) when the background processing task is spawned.
    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
    /// Handle of the background task for processing messages, once spawned.
    process_task: Option<tokio::task::JoinHandle<()>>,
    /// Oneshot channel sender for graceful shutdown signal.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
71
72impl BlockchainDataClient {
73    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
74    #[must_use]
75    pub fn new(config: BlockchainDataClientConfig) -> Self {
76        let chain = config.chain.clone();
77        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
78        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
79        Self {
80            chain,
81            core_client: None,
82            config,
83            hypersync_rx: Some(hypersync_rx),
84            hypersync_tx: Some(hypersync_tx),
85            command_tx,
86            command_rx: Some(command_rx),
87            process_task: None,
88            shutdown_tx: None,
89        }
90    }
91
92    /// Spawns the main processing task that handles commands and blockchain data.
93    ///
94    /// This method creates a background task that:
95    /// 1. Processes subscription/unsubscription commands from the command channel
96    /// 2. Handles incoming blockchain data from HyperSync
97    /// 3. Processes RPC messages if RPC client is configured
98    /// 4. Routes processed data to subscribers
99    fn spawn_process_task(&mut self) {
100        let command_rx = if let Some(r) = self.command_rx.take() {
101            r
102        } else {
103            tracing::error!("Command receiver already taken, not spawning handler");
104            return;
105        };
106
107        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
108        self.shutdown_tx = Some(shutdown_tx);
109
110        let data_tx = nautilus_common::runner::get_data_event_sender();
111
112        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
113        let hypersync_tx = self.hypersync_tx.take();
114
115        let mut core_client =
116            BlockchainDataClientCore::new(self.config.clone(), hypersync_tx, Some(data_tx));
117
118        let handle = get_runtime().spawn(async move {
119            tracing::debug!("Started task 'process'");
120
121            if let Err(e) = core_client.connect().await {
122                tracing::error!("Failed to connect blockchain core client: {e}");
123                return;
124            }
125
126            let mut command_rx = command_rx;
127            let mut shutdown_rx = shutdown_rx;
128
129            loop {
130                tokio::select! {
131                    _ = &mut shutdown_rx => {
132                        tracing::debug!("Received shutdown signal in Blockchain data client process task");
133                        core_client.disconnect();
134                        break;
135                    }
136                    command = command_rx.recv() => {
137                        if let Some(cmd) = command {
138                            match cmd {
139                                DefiDataCommand::Subscribe(cmd) => {
140                                    let chain = cmd.blockchain();
141                                    if chain != core_client.chain.name {
142                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
143                                        continue;
144                                    }
145
146                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
147                                        tracing::error!("Error processing subscribe command: {e}");
148                                    }
149                                }
150                                DefiDataCommand::Unsubscribe(cmd) => {
151                                    let chain = cmd.blockchain();
152                                    if chain != core_client.chain.name {
153                                        tracing::error!("Incorrect blockchain for subscribe command: {chain}");
154                                        continue;
155                                    }
156
157                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
158                                        tracing::error!("Error processing subscribe command: {e}");
159                                    }
160                                }
161                            }
162                        } else {
163                            tracing::debug!("Command channel closed");
164                            break;
165                        }
166                    }
167                    data = hypersync_rx.recv() => {
168                        if let Some(msg) = data {
169                            let data_event = match msg {
170                                BlockchainMessage::Block(block) => {
171                                    // Fetch and process all subscribed events per DEX
172                                    for dex in core_client.cache.get_registered_dexes(){
173                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
174                                        if !addresses.is_empty() {
175                                            core_client.hypersync_client.process_block_dex_contract_events(
176                                                &dex,
177                                                block.number,
178                                                addresses,
179                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
180                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
181                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
182                                            ).await;
183                                        }
184                                    }
185
186                                    Some(DataEvent::DeFi(DefiData::Block(block)))
187                                }
188                                BlockchainMessage::SwapEvent(swap_event) => {
189                                    match core_client.get_pool(&swap_event.pool_address) {
190                                        Ok(pool) => {
191                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
192                                            match core_client.process_pool_swap_event(
193                                                &swap_event,
194                                                pool,
195                                                dex_extended,
196                                            ){
197                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
198                                                Err(e) => {
199                                                    tracing::error!("Error processing pool swap event: {e}");
200                                                    None
201                                                }
202                                            }
203                                        }
204                                        Err(e) => {
205                                            tracing::error!("Failed to get pool {} with error {:?}", swap_event.pool_address, e);
206                                            None
207                                        }
208                                    }
209                                }
210                                BlockchainMessage::BurnEvent(burn_event) => {
211                                    match core_client.get_pool(&burn_event.pool_address) {
212                                        Ok(pool) => {
213                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
214                                            match core_client.process_pool_burn_event(
215                                                &burn_event,
216                                                pool,
217                                                dex_extended,
218                                            ){
219                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
220                                                Err(e) => {
221                                                    tracing::error!("Error processing pool burn event: {e}");
222                                                    None
223                                                }
224                                            }
225                                        }
226                                        Err(e) => {
227                                            tracing::error!("Failed to get pool {} with error {:?}", burn_event.pool_address, e);
228                                            None
229                                        }
230                                    }
231                                }
232                                BlockchainMessage::MintEvent(mint_event) => {
233                                    match core_client.get_pool(&mint_event.pool_address) {
234                                        Ok(pool) => {
235                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
236                                            match core_client.process_pool_mint_event(
237                                                &mint_event,
238                                                pool,
239                                                dex_extended,
240                                            ){
241                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
242                                                Err(e) => {
243                                                    tracing::error!("Error processing pool mint event: {e}");
244                                                    None
245                                                }
246                                            }
247                                        }
248                                        Err(e) => {
249                                            tracing::error!("Failed to get pool {} with error {:?}", mint_event.pool_address, e);
250                                            None
251                                        }
252                                    }
253                                }
254                                BlockchainMessage::CollectEvent(collect_event) => {
255                                    match core_client.get_pool(&collect_event.pool_address) {
256                                        Ok(pool) => {
257                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
258                                            match core_client.process_pool_collect_event(
259                                                &collect_event,
260                                                pool,
261                                                dex_extended,
262                                            ){
263                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
264                                                Err(e) => {
265                                                    tracing::error!("Error processing pool collect event: {e}");
266                                                    None
267                                                }
268                                            }
269                                        }
270                                        Err(e) => {
271                                            tracing::error!("Failed to get pool {} with error {:?}", collect_event.pool_address, e);
272                                            None
273                                        }
274                                    }
275                                }
276                            };
277
278                            if let Some(event) = data_event {
279                                core_client.send_data(event);
280                            }
281                        } else {
282                            tracing::debug!("HyperSync data channel closed");
283                            break;
284                        }
285                    }
286                    msg = async {
287                        if let Some(ref mut rpc_client) = core_client.rpc_client {
288                            Some(rpc_client.next_rpc_message().await)
289                        } else {
290                            None
291                        }
292                    } => {
293                        if let Some(msg) = msg {
294                            match msg {
295                                Ok(BlockchainMessage::Block(block)) => {
296                                    let data = DataEvent::DeFi(DefiData::Block(block));
297                                    core_client.send_data(data);
298                                },
299                                Ok(BlockchainMessage::SwapEvent(_)) => {
300                                    tracing::warn!("RPC swap events are not yet supported");
301                                }
302                                Ok(BlockchainMessage::MintEvent(_)) => {
303                                    tracing::warn!("RPC mint events are not yet supported");
304                                }
305                                Ok(BlockchainMessage::BurnEvent(_)) => {
306                                    tracing::warn!("RPC burn events are not yet supported");
307                                }
308                                Ok(BlockchainMessage::CollectEvent(_)) => {
309                                    tracing::warn!("RPC collect events are not yet supported")
310                                }
311                                Err(e) => {
312                                    tracing::error!("Error processing RPC message: {e}");
313                                }
314                            }
315                        }
316                    }
317                }
318            }
319
320            tracing::debug!("Stopped task 'process'");
321        });
322
323        self.process_task = Some(handle);
324    }
325
326    /// Processes DeFi subscription commands to start receiving specific blockchain data.
327    async fn handle_subscribe_command(
328        command: DefiSubscribeCommand,
329        core_client: &mut BlockchainDataClientCore,
330    ) -> anyhow::Result<()> {
331        match command {
332            DefiSubscribeCommand::Blocks(_cmd) => {
333                tracing::info!("Processing subscribe blocks command");
334
335                // Try RPC client first if available, otherwise use HyperSync
336                if let Some(ref mut rpc) = core_client.rpc_client {
337                    if let Err(e) = rpc.subscribe_blocks().await {
338                        tracing::warn!(
339                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
340                        );
341                        core_client.hypersync_client.subscribe_blocks();
342                        tokio::task::yield_now().await;
343                    } else {
344                        tracing::info!("Successfully subscribed to blocks via RPC");
345                    }
346                } else {
347                    tracing::info!("Subscribing to blocks via HyperSync");
348                    core_client.hypersync_client.subscribe_blocks();
349                    tokio::task::yield_now().await;
350                }
351
352                Ok(())
353            }
354            DefiSubscribeCommand::Pool(_cmd) => {
355                tracing::info!("Processing subscribe pool command");
356                // Pool subscriptions are typically handled at the application level
357                // as they involve specific pool addresses and don't require blockchain streaming
358                tracing::warn!("Pool subscriptions are handled at application level");
359                Ok(())
360            }
361            DefiSubscribeCommand::PoolSwaps(cmd) => {
362                tracing::info!(
363                    "Processing subscribe pool swaps command for {}",
364                    cmd.instrument_id
365                );
366
367                if let Some(ref mut _rpc) = core_client.rpc_client {
368                    tracing::warn!(
369                        "RPC pool swaps subscription not yet implemented, using HyperSync"
370                    );
371                }
372
373                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
374                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
375                        .map_err(|e| {
376                            anyhow::anyhow!(
377                                "Invalid pool swap address '{}' failed with error: {:?}",
378                                cmd.instrument_id,
379                                e
380                            )
381                        })?;
382                    core_client
383                        .subscription_manager
384                        .subscribe_swaps(dex, pool_address);
385                } else {
386                    anyhow::bail!(
387                        "Invalid venue {}, expected Blockchain DEX format",
388                        cmd.instrument_id.venue
389                    )
390                }
391
392                Ok(())
393            }
394            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
395                tracing::info!(
396                    "Processing subscribe pool liquidity updates command for address: {}",
397                    cmd.instrument_id
398                );
399
400                if let Some(ref mut _rpc) = core_client.rpc_client {
401                    tracing::warn!(
402                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
403                    );
404                }
405
406                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
407                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
408                        .map_err(|_| {
409                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
410                        })?;
411                    core_client
412                        .subscription_manager
413                        .subscribe_burns(dex, pool_address);
414                    core_client
415                        .subscription_manager
416                        .subscribe_mints(dex, pool_address);
417                } else {
418                    anyhow::bail!(
419                        "Invalid venue {}, expected Blockchain DEX format",
420                        cmd.instrument_id.venue
421                    )
422                }
423
424                Ok(())
425            }
426        }
427    }
428
429    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
430    async fn handle_unsubscribe_command(
431        command: DefiUnsubscribeCommand,
432        core_client: &mut BlockchainDataClientCore,
433    ) -> anyhow::Result<()> {
434        match command {
435            DefiUnsubscribeCommand::Blocks(_cmd) => {
436                tracing::info!("Processing unsubscribe blocks command");
437
438                // TODO: Implement RPC unsubscription when available
439                if core_client.rpc_client.is_some() {
440                    tracing::warn!("RPC blocks unsubscription not yet implemented");
441                }
442
443                // Use HyperSync client for unsubscription
444                core_client.hypersync_client.unsubscribe_blocks();
445                tracing::info!("Unsubscribed from blocks via HyperSync");
446
447                Ok(())
448            }
449            DefiUnsubscribeCommand::Pool(_cmd) => {
450                tracing::info!("Processing unsubscribe pool command");
451                // Pool unsubscriptions are typically handled at the application level
452                tracing::warn!("Pool unsubscriptions are handled at application level");
453                Ok(())
454            }
455            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
456                tracing::info!("Processing unsubscribe pool swaps command");
457
458                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
459                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
460                        .map_err(|_| {
461                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
462                        })?;
463                    core_client
464                        .subscription_manager
465                        .unsubscribe_swaps(dex, pool_address);
466                } else {
467                    anyhow::bail!(
468                        "Invalid venue {}, expected Blockchain DEX format",
469                        cmd.instrument_id.venue
470                    )
471                }
472
473                Ok(())
474            }
475            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
476                tracing::info!(
477                    "Processing unsubscribe pool liquidity updates command for {}",
478                    cmd.instrument_id
479                );
480
481                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
482                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
483                        .map_err(|_| {
484                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
485                        })?;
486                    core_client
487                        .subscription_manager
488                        .unsubscribe_burns(dex, pool_address);
489                    core_client
490                        .subscription_manager
491                        .unsubscribe_mints(dex, pool_address);
492                } else {
493                    anyhow::bail!(
494                        "Invalid venue {}, expected Blockchain DEX format",
495                        cmd.instrument_id.venue
496                    )
497                }
498
499                Ok(())
500            }
501        }
502    }
503
504    /// Waits for the background processing task to complete.
505    ///
506    /// This method blocks until the spawned process task finishes execution,
507    /// which typically happens after a shutdown signal is sent.
508    pub async fn await_process_task_close(&mut self) {
509        if let Some(handle) = self.process_task.take()
510            && let Err(e) = handle.await
511        {
512            tracing::error!("Process task join error: {e}");
513        }
514    }
515}
516
517#[async_trait::async_trait]
518impl DataClient for BlockchainDataClient {
519    fn client_id(&self) -> ClientId {
520        ClientId::from(format!("BLOCKCHAIN-{}", self.chain.name).as_str())
521    }
522
    /// Returns the venue served by this client, which is always `None`.
    fn venue(&self) -> Option<Venue> {
        // Blockchain data clients don't map to a single venue since they can provide
        // data for multiple DEXs across the blockchain
        None
    }
528
529    fn start(&mut self) -> anyhow::Result<()> {
530        tracing::info!(
531            "Starting blockchain data client for '{chain_name}'",
532            chain_name = self.chain.name
533        );
534        Ok(())
535    }
536
537    fn stop(&mut self) -> anyhow::Result<()> {
538        tracing::info!(
539            "Stopping blockchain data client for '{chain_name}'",
540            chain_name = self.chain.name
541        );
542        Ok(())
543    }
544
545    fn reset(&mut self) -> anyhow::Result<()> {
546        tracing::info!(
547            "Resetting blockchain data client for '{chain_name}'",
548            chain_name = self.chain.name
549        );
550        Ok(())
551    }
552
553    fn dispose(&mut self) -> anyhow::Result<()> {
554        tracing::info!(
555            "Disposing blockchain data client for '{chain_name}'",
556            chain_name = self.chain.name
557        );
558        Ok(())
559    }
560
561    async fn connect(&mut self) -> anyhow::Result<()> {
562        tracing::info!(
563            "Connecting blockchain data client for '{}'",
564            self.chain.name
565        );
566
567        if self.process_task.is_none() {
568            self.spawn_process_task();
569        }
570
571        Ok(())
572    }
573
574    async fn disconnect(&mut self) -> anyhow::Result<()> {
575        tracing::info!(
576            "Disconnecting blockchain data client for '{}'",
577            self.chain.name
578        );
579
580        if let Some(shutdown_tx) = self.shutdown_tx.take() {
581            let _ = shutdown_tx.send(());
582        }
583        self.await_process_task_close().await;
584
585        Ok(())
586    }
587
    /// Reports whether the client is connected.
    ///
    /// Currently this returns `true` unconditionally; it does not inspect the processing
    /// task or any RPC/HyperSync state.
    fn is_connected(&self) -> bool {
        // TODO: Improve connection detection
        // For now the client always reports itself as connected
        true
    }
593
    /// Reports whether the client is disconnected, as the negation of `is_connected`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
597
598    fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
599        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd.clone()));
600        self.command_tx.send(command)?;
601        Ok(())
602    }
603
604    fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
605        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd.clone()));
606        self.command_tx.send(command)?;
607        Ok(())
608    }
609
610    fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
611        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd.clone()));
612        self.command_tx.send(command)?;
613        Ok(())
614    }
615
616    fn subscribe_pool_liquidity_updates(
617        &mut self,
618        cmd: &SubscribePoolLiquidityUpdates,
619    ) -> anyhow::Result<()> {
620        let command =
621            DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
622        self.command_tx.send(command)?;
623        Ok(())
624    }
625
626    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
627        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
628        self.command_tx.send(command)?;
629        Ok(())
630    }
631
632    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
633        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
634        self.command_tx.send(command)?;
635        Ok(())
636    }
637
638    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
639        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
640        self.command_tx.send(command)?;
641        Ok(())
642    }
643
644    fn unsubscribe_pool_liquidity_updates(
645        &mut self,
646        cmd: &UnsubscribePoolLiquidityUpdates,
647    ) -> anyhow::Result<()> {
648        let command =
649            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
650        self.command_tx.send(command)?;
651        Ok(())
652    }
653}