nautilus_blockchain/execution/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use async_trait::async_trait;
20use nautilus_common::messages::execution::{
21    BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
22    GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount, QueryOrder,
23    SubmitOrder, SubmitOrderList,
24};
25use nautilus_core::UnixNanos;
26use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
27use nautilus_live::execution::client::LiveExecutionClient;
28use nautilus_model::{
29    accounts::AccountAny,
30    defi::{
31        SharedChain, Token,
32        validation::validate_address,
33        wallet::{TokenBalance, WalletBalance},
34    },
35    enums::OmsType,
36    identifiers::{AccountId, ClientId, Venue},
37    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
38    types::{AccountBalance, MarginBalance, Money},
39};
40
41use crate::{
42    cache::BlockchainCache, config::BlockchainExecutionClientConfig,
43    contracts::erc20::Erc20Contract, rpc::http::BlockchainHttpRpcClient,
44};
45
46/// Execution client for blockchain interactions including balance tracking and order execution.
47#[derive(Debug)]
48pub struct BlockchainExecutionClient {
49    /// Core execution client providing base functionality.
50    core: ExecutionClientCore,
51    /// Cache for storing token metadata and other blockchain data.
52    cache: BlockchainCache,
53    /// The blockchain network configuration.
54    chain: SharedChain,
55    /// The wallet address used for transactions and balance queries.
56    wallet_address: Address,
57    /// Tracks native currency and ERC-20 token balances.
58    wallet_balance: WalletBalance,
59    /// Contract interface for ERC-20 token interactions.
60    erc20_contract: Erc20Contract,
61    /// Whether the client is currently connected.
62    connected: bool,
63    /// HTTP RPC client for blockchain queries.
64    http_rpc_client: Arc<BlockchainHttpRpcClient>,
65}
66
67impl BlockchainExecutionClient {
68    /// Creates a new [`BlockchainExecutionClient`] instance for the specified configuration.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the wallet address or any token address in the config is invalid.
73    pub fn new(
74        core_client: ExecutionClientCore,
75        config: BlockchainExecutionClientConfig,
76    ) -> anyhow::Result<Self> {
77        let chain = Arc::new(config.chain);
78        let cache = BlockchainCache::new(chain.clone());
79        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
80            config.http_rpc_url.clone(),
81            config.rpc_requests_per_second,
82        ));
83        let wallet_address = validate_address(config.wallet_address.as_str())?;
84        let erc20_contract = Erc20Contract::new(http_rpc_client.clone(), true);
85
86        // Initialize token universe, so we can fetch them from the blockchain later.
87        let mut token_universe = HashSet::new();
88        if let Some(specified_tokens) = config.tokens {
89            for token in specified_tokens {
90                let token_address = validate_address(token.as_str())?;
91                token_universe.insert(token_address);
92            }
93        }
94        let wallet_balance = WalletBalance::new(token_universe);
95
96        Ok(Self {
97            core: core_client,
98            connected: false,
99            wallet_balance,
100            chain,
101            cache,
102            erc20_contract,
103            http_rpc_client,
104            wallet_address,
105        })
106    }
107
108    /// Fetches the native currency balance (e.g., ETH) for the wallet from the blockchain.
109    async fn fetch_native_currency_balance(&self) -> anyhow::Result<Money> {
110        let balance_u256 = self
111            .http_rpc_client
112            .get_balance(&self.wallet_address, None)
113            .await?;
114
115        let native_currency = self.chain.native_currency();
116
117        // Convert from wei (18 decimals on-chain) to Money
118        let balance = Money::from_wei(balance_u256, native_currency);
119
120        Ok(balance)
121    }
122
123    /// Fetches the balance of a specific ERC-20 token for the wallet.
124    async fn fetch_token_balance(
125        &mut self,
126        token_address: &Address,
127    ) -> anyhow::Result<TokenBalance> {
128        // Get the cached token or fetch it from the blockchain and cache it.
129        let token = if let Some(token) = self.cache.get_token(token_address) {
130            token.to_owned()
131        } else {
132            let token_info = self.erc20_contract.fetch_token_info(token_address).await?;
133            let token = Token::new(
134                self.chain.clone(),
135                *token_address,
136                token_info.name,
137                token_info.symbol,
138                token_info.decimals,
139            );
140            self.cache.add_token(token.clone()).await?;
141            token
142        };
143
144        let amount = self
145            .erc20_contract
146            .balance_of(token_address, &self.wallet_address)
147            .await?;
148        let token_balance = TokenBalance::new(amount, token);
149
150        // TODO: Use price oracle here and cache, to get the latest price then convert to USD
151        // then use token_balance.set_amount_usd(amount_usd) to set the amount_usd value.
152
153        Ok(token_balance)
154    }
155
156    /// Refreshes all wallet balances including native currency and tracked ERC-20 tokens.
157    async fn refresh_wallet_balances(&mut self) -> anyhow::Result<()> {
158        let native_currency_balance = self.fetch_native_currency_balance().await?;
159        tracing::info!(
160            "Initializing wallet balance with native currency balance: {} {}",
161            native_currency_balance.as_decimal(),
162            native_currency_balance.currency
163        );
164        self.wallet_balance
165            .set_native_currency_balance(native_currency_balance);
166
167        // Fetch token balances from the blockchain.
168        if !self.wallet_balance.is_token_universe_initialized() {
169            // TODO sync from transfer events for tokens that wallet interacted with.
170        } else {
171            let tokens: Vec<Address> = self
172                .wallet_balance
173                .token_universe
174                .clone()
175                .into_iter()
176                .collect();
177            for token in tokens {
178                if let Ok(token_balance) = self.fetch_token_balance(&token).await {
179                    tracing::info!("Adding token balance to the wallet: {}", token_balance);
180                    self.wallet_balance.add_token_balance(token_balance);
181                }
182            }
183        }
184
185        Ok(())
186    }
187}
188
189#[async_trait(?Send)]
190impl ExecutionClient for BlockchainExecutionClient {
191    fn is_connected(&self) -> bool {
192        self.connected
193    }
194
195    fn client_id(&self) -> ClientId {
196        self.core.client_id
197    }
198
199    fn account_id(&self) -> AccountId {
200        self.core.account_id
201    }
202
203    fn venue(&self) -> Venue {
204        self.core.venue
205    }
206
207    fn oms_type(&self) -> OmsType {
208        self.core.oms_type
209    }
210
211    fn get_account(&self) -> Option<AccountAny> {
212        todo!("implement get_account")
213    }
214
215    fn generate_account_state(
216        &self,
217        _balances: Vec<AccountBalance>,
218        _margins: Vec<MarginBalance>,
219        _reported: bool,
220        _ts_event: UnixNanos,
221    ) -> anyhow::Result<()> {
222        todo!("implement generate_account_state")
223    }
224
225    fn start(&mut self) -> anyhow::Result<()> {
226        todo!("implement start")
227    }
228
229    fn stop(&mut self) -> anyhow::Result<()> {
230        todo!("implement stop")
231    }
232
233    fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
234        todo!("implement submit_order")
235    }
236
237    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
238        todo!("implement submit_order_list")
239    }
240
241    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
242        todo!("implement modify_order")
243    }
244
245    fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
246        todo!("implement cancel_order")
247    }
248
249    fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
250        todo!("implement cancel_all_orders")
251    }
252
253    fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
254        todo!("implement batch_cancel_orders")
255    }
256
257    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
258        todo!("implement query_account")
259    }
260
261    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
262        todo!("implement query_order")
263    }
264
265    async fn connect(&mut self) -> anyhow::Result<()> {
266        if self.connected {
267            tracing::warn!("Blockchain execution client already connected");
268            return Ok(());
269        }
270
271        tracing::info!(
272            "Connecting to blockchain execution client on chain {}",
273            self.chain.name
274        );
275
276        self.refresh_wallet_balances().await?;
277
278        self.connected = true;
279        tracing::info!(
280            "Blockchain execution client connected on chain {}",
281            self.chain.name
282        );
283        Ok(())
284    }
285
286    async fn disconnect(&mut self) -> anyhow::Result<()> {
287        self.connected = false;
288        Ok(())
289    }
290}
291
292#[async_trait(?Send)]
293impl LiveExecutionClient for BlockchainExecutionClient {
294    async fn generate_order_status_report(
295        &self,
296        _cmd: &GenerateOrderStatusReport,
297    ) -> anyhow::Result<Option<OrderStatusReport>> {
298        todo!("implement generate_order_status_report")
299    }
300
301    async fn generate_order_status_reports(
302        &self,
303        _cmd: &GenerateOrderStatusReport,
304    ) -> anyhow::Result<Vec<OrderStatusReport>> {
305        todo!("implement generate_order_status_reports")
306    }
307
308    async fn generate_fill_reports(
309        &self,
310        _cmd: GenerateFillReports,
311    ) -> anyhow::Result<Vec<FillReport>> {
312        todo!("implement generate_fill_reports")
313    }
314
315    async fn generate_position_status_reports(
316        &self,
317        _cmd: &GeneratePositionReports,
318    ) -> anyhow::Result<Vec<PositionStatusReport>> {
319        todo!("implement generate_position_status_reports")
320    }
321
322    async fn generate_mass_status(
323        &self,
324        _lookback_mins: Option<u64>,
325    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
326        todo!("implement generate_mass_status")
327    }
328}