nautilus_blockchain/execution/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use async_trait::async_trait;
20use nautilus_common::{
21    clients::ExecutionClient,
22    messages::execution::{
23        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
24        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
25        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
26    },
27};
28use nautilus_core::UnixNanos;
29use nautilus_live::ExecutionClientCore;
30use nautilus_model::{
31    accounts::AccountAny,
32    defi::{
33        SharedChain, Token,
34        validation::validate_address,
35        wallet::{TokenBalance, WalletBalance},
36    },
37    enums::OmsType,
38    identifiers::{AccountId, ClientId, Venue},
39    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
40    types::{AccountBalance, MarginBalance, Money},
41};
42
43use crate::{
44    cache::BlockchainCache, config::BlockchainExecutionClientConfig,
45    contracts::erc20::Erc20Contract, rpc::http::BlockchainHttpRpcClient,
46};
47
48/// Execution client for blockchain interactions including balance tracking and order execution.
49#[derive(Debug)]
50pub struct BlockchainExecutionClient {
51    /// Core execution client providing base functionality.
52    core: ExecutionClientCore,
53    /// Cache for storing token metadata and other blockchain data.
54    cache: BlockchainCache,
55    /// The blockchain network configuration.
56    chain: SharedChain,
57    /// The wallet address used for transactions and balance queries.
58    wallet_address: Address,
59    /// Tracks native currency and ERC-20 token balances.
60    wallet_balance: WalletBalance,
61    /// Contract interface for ERC-20 token interactions.
62    erc20_contract: Erc20Contract,
63    /// Whether the client is currently connected.
64    connected: bool,
65    /// HTTP RPC client for blockchain queries.
66    http_rpc_client: Arc<BlockchainHttpRpcClient>,
67}
68
69impl BlockchainExecutionClient {
70    /// Creates a new [`BlockchainExecutionClient`] instance for the specified configuration.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the wallet address or any token address in the config is invalid.
75    pub fn new(
76        core_client: ExecutionClientCore,
77        config: BlockchainExecutionClientConfig,
78    ) -> anyhow::Result<Self> {
79        let chain = Arc::new(config.chain);
80        let cache = BlockchainCache::new(chain.clone());
81        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
82            config.http_rpc_url.clone(),
83            config.rpc_requests_per_second,
84        ));
85        let wallet_address = validate_address(config.wallet_address.as_str())?;
86        let erc20_contract = Erc20Contract::new(http_rpc_client.clone(), true);
87
88        // Initialize token universe, so we can fetch them from the blockchain later.
89        let mut token_universe = HashSet::new();
90        if let Some(specified_tokens) = config.tokens {
91            for token in specified_tokens {
92                let token_address = validate_address(token.as_str())?;
93                token_universe.insert(token_address);
94            }
95        }
96        let wallet_balance = WalletBalance::new(token_universe);
97
98        Ok(Self {
99            core: core_client,
100            connected: false,
101            wallet_balance,
102            chain,
103            cache,
104            erc20_contract,
105            http_rpc_client,
106            wallet_address,
107        })
108    }
109
110    /// Fetches the native currency balance (e.g., ETH) for the wallet from the blockchain.
111    async fn fetch_native_currency_balance(&self) -> anyhow::Result<Money> {
112        let balance_u256 = self
113            .http_rpc_client
114            .get_balance(&self.wallet_address, None)
115            .await?;
116
117        let native_currency = self.chain.native_currency();
118
119        // Convert from wei (18 decimals on-chain) to Money
120        let balance = Money::from_wei(balance_u256, native_currency);
121
122        Ok(balance)
123    }
124
125    /// Fetches the balance of a specific ERC-20 token for the wallet.
126    async fn fetch_token_balance(
127        &mut self,
128        token_address: &Address,
129    ) -> anyhow::Result<TokenBalance> {
130        // Get the cached token or fetch it from the blockchain and cache it.
131        let token = if let Some(token) = self.cache.get_token(token_address) {
132            token.to_owned()
133        } else {
134            let token_info = self.erc20_contract.fetch_token_info(token_address).await?;
135            let token = Token::new(
136                self.chain.clone(),
137                *token_address,
138                token_info.name,
139                token_info.symbol,
140                token_info.decimals,
141            );
142            self.cache.add_token(token.clone()).await?;
143            token
144        };
145
146        let amount = self
147            .erc20_contract
148            .balance_of(token_address, &self.wallet_address)
149            .await?;
150        let token_balance = TokenBalance::new(amount, token);
151
152        // TODO: Use price oracle here and cache, to get the latest price then convert to USD
153        // then use token_balance.set_amount_usd(amount_usd) to set the amount_usd value.
154
155        Ok(token_balance)
156    }
157
158    /// Refreshes all wallet balances including native currency and tracked ERC-20 tokens.
159    async fn refresh_wallet_balances(&mut self) -> anyhow::Result<()> {
160        let native_currency_balance = self.fetch_native_currency_balance().await?;
161        log::info!(
162            "Initializing wallet balance with native currency balance: {} {}",
163            native_currency_balance.as_decimal(),
164            native_currency_balance.currency
165        );
166        self.wallet_balance
167            .set_native_currency_balance(native_currency_balance);
168
169        // Fetch token balances from the blockchain.
170        if self.wallet_balance.is_token_universe_initialized() {
171            let tokens: Vec<Address> = self
172                .wallet_balance
173                .token_universe
174                .clone()
175                .into_iter()
176                .collect();
177            for token in tokens {
178                if let Ok(token_balance) = self.fetch_token_balance(&token).await {
179                    log::info!("Adding token balance to the wallet: {token_balance}");
180                    self.wallet_balance.add_token_balance(token_balance);
181                }
182            }
183        } else {
184            // TODO sync from transfer events for tokens that wallet interacted with.
185        }
186
187        Ok(())
188    }
189}
190
191#[async_trait(?Send)]
192impl ExecutionClient for BlockchainExecutionClient {
193    fn is_connected(&self) -> bool {
194        self.connected
195    }
196
197    fn client_id(&self) -> ClientId {
198        self.core.client_id
199    }
200
201    fn account_id(&self) -> AccountId {
202        self.core.account_id
203    }
204
205    fn venue(&self) -> Venue {
206        self.core.venue
207    }
208
209    fn oms_type(&self) -> OmsType {
210        self.core.oms_type
211    }
212
213    fn get_account(&self) -> Option<AccountAny> {
214        todo!("implement get_account")
215    }
216
217    fn generate_account_state(
218        &self,
219        _balances: Vec<AccountBalance>,
220        _margins: Vec<MarginBalance>,
221        _reported: bool,
222        _ts_event: UnixNanos,
223    ) -> anyhow::Result<()> {
224        todo!("implement generate_account_state")
225    }
226
227    fn start(&mut self) -> anyhow::Result<()> {
228        todo!("implement start")
229    }
230
231    fn stop(&mut self) -> anyhow::Result<()> {
232        todo!("implement stop")
233    }
234
235    fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
236        todo!("implement submit_order")
237    }
238
239    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
240        todo!("implement submit_order_list")
241    }
242
243    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
244        todo!("implement modify_order")
245    }
246
247    fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
248        todo!("implement cancel_order")
249    }
250
251    fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
252        todo!("implement cancel_all_orders")
253    }
254
255    fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
256        todo!("implement batch_cancel_orders")
257    }
258
259    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
260        todo!("implement query_account")
261    }
262
263    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
264        todo!("implement query_order")
265    }
266
267    async fn connect(&mut self) -> anyhow::Result<()> {
268        if self.connected {
269            log::warn!("Blockchain execution client already connected");
270            return Ok(());
271        }
272
273        log::info!(
274            "Connecting to blockchain execution client on chain {}",
275            self.chain.name
276        );
277
278        self.refresh_wallet_balances().await?;
279
280        self.connected = true;
281        log::info!(
282            "Blockchain execution client connected on chain {}",
283            self.chain.name
284        );
285        Ok(())
286    }
287
288    async fn disconnect(&mut self) -> anyhow::Result<()> {
289        self.connected = false;
290        Ok(())
291    }
292
293    async fn generate_order_status_report(
294        &self,
295        _cmd: &GenerateOrderStatusReport,
296    ) -> anyhow::Result<Option<OrderStatusReport>> {
297        todo!("implement generate_order_status_report")
298    }
299
300    async fn generate_order_status_reports(
301        &self,
302        _cmd: &GenerateOrderStatusReports,
303    ) -> anyhow::Result<Vec<OrderStatusReport>> {
304        todo!("implement generate_order_status_reports")
305    }
306
307    async fn generate_fill_reports(
308        &self,
309        _cmd: GenerateFillReports,
310    ) -> anyhow::Result<Vec<FillReport>> {
311        todo!("implement generate_fill_reports")
312    }
313
314    async fn generate_position_status_reports(
315        &self,
316        _cmd: &GeneratePositionStatusReports,
317    ) -> anyhow::Result<Vec<PositionStatusReport>> {
318        todo!("implement generate_position_status_reports")
319    }
320
321    async fn generate_mass_status(
322        &self,
323        _lookback_mins: Option<u64>,
324    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
325        todo!("implement generate_mass_status")
326    }
327}