// nautilus_blockchain/execution/client.rs
use std::{collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use async_trait::async_trait;
20use nautilus_common::messages::execution::{
21 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
22 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount, QueryOrder,
23 SubmitOrder, SubmitOrderList,
24};
25use nautilus_core::UnixNanos;
26use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
27use nautilus_live::execution::client::LiveExecutionClient;
28use nautilus_model::{
29 accounts::AccountAny,
30 defi::{
31 SharedChain, Token,
32 validation::validate_address,
33 wallet::{TokenBalance, WalletBalance},
34 },
35 enums::OmsType,
36 identifiers::{AccountId, ClientId, Venue},
37 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
38 types::{AccountBalance, MarginBalance, Money},
39};
40
41use crate::{
42 cache::BlockchainCache, config::BlockchainExecutionClientConfig,
43 contracts::erc20::Erc20Contract, rpc::http::BlockchainHttpRpcClient,
44};
45
46#[derive(Debug)]
48pub struct BlockchainExecutionClient {
49 core: ExecutionClientCore,
51 cache: BlockchainCache,
53 chain: SharedChain,
55 wallet_address: Address,
57 wallet_balance: WalletBalance,
59 erc20_contract: Erc20Contract,
61 connected: bool,
63 http_rpc_client: Arc<BlockchainHttpRpcClient>,
65}
66
67impl BlockchainExecutionClient {
68 pub fn new(
74 core_client: ExecutionClientCore,
75 config: BlockchainExecutionClientConfig,
76 ) -> anyhow::Result<Self> {
77 let chain = Arc::new(config.chain);
78 let cache = BlockchainCache::new(chain.clone());
79 let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
80 config.http_rpc_url.clone(),
81 config.rpc_requests_per_second,
82 ));
83 let wallet_address = validate_address(config.wallet_address.as_str())?;
84 let erc20_contract = Erc20Contract::new(http_rpc_client.clone(), true);
85
86 let mut token_universe = HashSet::new();
88 if let Some(specified_tokens) = config.tokens {
89 for token in specified_tokens {
90 let token_address = validate_address(token.as_str())?;
91 token_universe.insert(token_address);
92 }
93 }
94 let wallet_balance = WalletBalance::new(token_universe);
95
96 Ok(Self {
97 core: core_client,
98 connected: false,
99 wallet_balance,
100 chain,
101 cache,
102 erc20_contract,
103 http_rpc_client,
104 wallet_address,
105 })
106 }
107
108 async fn fetch_native_currency_balance(&self) -> anyhow::Result<Money> {
110 let balance_u256 = self
111 .http_rpc_client
112 .get_balance(&self.wallet_address, None)
113 .await?;
114
115 let native_currency = self.chain.native_currency();
116
117 let balance = Money::from_wei(balance_u256, native_currency);
119
120 Ok(balance)
121 }
122
123 async fn fetch_token_balance(
125 &mut self,
126 token_address: &Address,
127 ) -> anyhow::Result<TokenBalance> {
128 let token = if let Some(token) = self.cache.get_token(token_address) {
130 token.to_owned()
131 } else {
132 let token_info = self.erc20_contract.fetch_token_info(token_address).await?;
133 let token = Token::new(
134 self.chain.clone(),
135 *token_address,
136 token_info.name,
137 token_info.symbol,
138 token_info.decimals,
139 );
140 self.cache.add_token(token.clone()).await?;
141 token
142 };
143
144 let amount = self
145 .erc20_contract
146 .balance_of(token_address, &self.wallet_address)
147 .await?;
148 let token_balance = TokenBalance::new(amount, token);
149
150 Ok(token_balance)
154 }
155
156 async fn refresh_wallet_balances(&mut self) -> anyhow::Result<()> {
158 let native_currency_balance = self.fetch_native_currency_balance().await?;
159 tracing::info!(
160 "Initializing wallet balance with native currency balance: {} {}",
161 native_currency_balance.as_decimal(),
162 native_currency_balance.currency
163 );
164 self.wallet_balance
165 .set_native_currency_balance(native_currency_balance);
166
167 if !self.wallet_balance.is_token_universe_initialized() {
169 } else {
171 let tokens: Vec<Address> = self
172 .wallet_balance
173 .token_universe
174 .clone()
175 .into_iter()
176 .collect();
177 for token in tokens {
178 if let Ok(token_balance) = self.fetch_token_balance(&token).await {
179 tracing::info!("Adding token balance to the wallet: {}", token_balance);
180 self.wallet_balance.add_token_balance(token_balance);
181 }
182 }
183 }
184
185 Ok(())
186 }
187}
188
189#[async_trait(?Send)]
190impl ExecutionClient for BlockchainExecutionClient {
191 fn is_connected(&self) -> bool {
192 self.connected
193 }
194
195 fn client_id(&self) -> ClientId {
196 self.core.client_id
197 }
198
199 fn account_id(&self) -> AccountId {
200 self.core.account_id
201 }
202
203 fn venue(&self) -> Venue {
204 self.core.venue
205 }
206
207 fn oms_type(&self) -> OmsType {
208 self.core.oms_type
209 }
210
211 fn get_account(&self) -> Option<AccountAny> {
212 todo!("implement get_account")
213 }
214
215 fn generate_account_state(
216 &self,
217 _balances: Vec<AccountBalance>,
218 _margins: Vec<MarginBalance>,
219 _reported: bool,
220 _ts_event: UnixNanos,
221 ) -> anyhow::Result<()> {
222 todo!("implement generate_account_state")
223 }
224
225 fn start(&mut self) -> anyhow::Result<()> {
226 todo!("implement start")
227 }
228
229 fn stop(&mut self) -> anyhow::Result<()> {
230 todo!("implement stop")
231 }
232
233 fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
234 todo!("implement submit_order")
235 }
236
237 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
238 todo!("implement submit_order_list")
239 }
240
241 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
242 todo!("implement modify_order")
243 }
244
245 fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
246 todo!("implement cancel_order")
247 }
248
249 fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
250 todo!("implement cancel_all_orders")
251 }
252
253 fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
254 todo!("implement batch_cancel_orders")
255 }
256
257 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
258 todo!("implement query_account")
259 }
260
261 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
262 todo!("implement query_order")
263 }
264
265 async fn connect(&mut self) -> anyhow::Result<()> {
266 if self.connected {
267 tracing::warn!("Blockchain execution client already connected");
268 return Ok(());
269 }
270
271 tracing::info!(
272 "Connecting to blockchain execution client on chain {}",
273 self.chain.name
274 );
275
276 self.refresh_wallet_balances().await?;
277
278 self.connected = true;
279 tracing::info!(
280 "Blockchain execution client connected on chain {}",
281 self.chain.name
282 );
283 Ok(())
284 }
285
286 async fn disconnect(&mut self) -> anyhow::Result<()> {
287 self.connected = false;
288 Ok(())
289 }
290}
291
292#[async_trait(?Send)]
293impl LiveExecutionClient for BlockchainExecutionClient {
294 async fn generate_order_status_report(
295 &self,
296 _cmd: &GenerateOrderStatusReport,
297 ) -> anyhow::Result<Option<OrderStatusReport>> {
298 todo!("implement generate_order_status_report")
299 }
300
301 async fn generate_order_status_reports(
302 &self,
303 _cmd: &GenerateOrderStatusReport,
304 ) -> anyhow::Result<Vec<OrderStatusReport>> {
305 todo!("implement generate_order_status_reports")
306 }
307
308 async fn generate_fill_reports(
309 &self,
310 _cmd: GenerateFillReports,
311 ) -> anyhow::Result<Vec<FillReport>> {
312 todo!("implement generate_fill_reports")
313 }
314
315 async fn generate_position_status_reports(
316 &self,
317 _cmd: &GeneratePositionReports,
318 ) -> anyhow::Result<Vec<PositionStatusReport>> {
319 todo!("implement generate_position_status_reports")
320 }
321
322 async fn generate_mass_status(
323 &self,
324 _lookback_mins: Option<u64>,
325 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
326 todo!("implement generate_mass_status")
327 }
328}