nautilus_blockchain/execution/
client.rs1use std::{collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use async_trait::async_trait;
20use nautilus_common::messages::execution::{
21 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
22 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
23 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
24};
25use nautilus_core::UnixNanos;
26use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
27use nautilus_model::{
28 accounts::AccountAny,
29 defi::{
30 SharedChain, Token,
31 validation::validate_address,
32 wallet::{TokenBalance, WalletBalance},
33 },
34 enums::OmsType,
35 identifiers::{AccountId, ClientId, Venue},
36 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
37 types::{AccountBalance, MarginBalance, Money},
38};
39
40use crate::{
41 cache::BlockchainCache, config::BlockchainExecutionClientConfig,
42 contracts::erc20::Erc20Contract, rpc::http::BlockchainHttpRpcClient,
43};
44
45#[derive(Debug)]
47pub struct BlockchainExecutionClient {
48 core: ExecutionClientCore,
50 cache: BlockchainCache,
52 chain: SharedChain,
54 wallet_address: Address,
56 wallet_balance: WalletBalance,
58 erc20_contract: Erc20Contract,
60 connected: bool,
62 http_rpc_client: Arc<BlockchainHttpRpcClient>,
64}
65
66impl BlockchainExecutionClient {
67 pub fn new(
73 core_client: ExecutionClientCore,
74 config: BlockchainExecutionClientConfig,
75 ) -> anyhow::Result<Self> {
76 let chain = Arc::new(config.chain);
77 let cache = BlockchainCache::new(chain.clone());
78 let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
79 config.http_rpc_url.clone(),
80 config.rpc_requests_per_second,
81 ));
82 let wallet_address = validate_address(config.wallet_address.as_str())?;
83 let erc20_contract = Erc20Contract::new(http_rpc_client.clone(), true);
84
85 let mut token_universe = HashSet::new();
87 if let Some(specified_tokens) = config.tokens {
88 for token in specified_tokens {
89 let token_address = validate_address(token.as_str())?;
90 token_universe.insert(token_address);
91 }
92 }
93 let wallet_balance = WalletBalance::new(token_universe);
94
95 Ok(Self {
96 core: core_client,
97 connected: false,
98 wallet_balance,
99 chain,
100 cache,
101 erc20_contract,
102 http_rpc_client,
103 wallet_address,
104 })
105 }
106
107 async fn fetch_native_currency_balance(&self) -> anyhow::Result<Money> {
109 let balance_u256 = self
110 .http_rpc_client
111 .get_balance(&self.wallet_address, None)
112 .await?;
113
114 let native_currency = self.chain.native_currency();
115
116 let balance = Money::from_wei(balance_u256, native_currency);
118
119 Ok(balance)
120 }
121
122 async fn fetch_token_balance(
124 &mut self,
125 token_address: &Address,
126 ) -> anyhow::Result<TokenBalance> {
127 let token = if let Some(token) = self.cache.get_token(token_address) {
129 token.to_owned()
130 } else {
131 let token_info = self.erc20_contract.fetch_token_info(token_address).await?;
132 let token = Token::new(
133 self.chain.clone(),
134 *token_address,
135 token_info.name,
136 token_info.symbol,
137 token_info.decimals,
138 );
139 self.cache.add_token(token.clone()).await?;
140 token
141 };
142
143 let amount = self
144 .erc20_contract
145 .balance_of(token_address, &self.wallet_address)
146 .await?;
147 let token_balance = TokenBalance::new(amount, token);
148
149 Ok(token_balance)
153 }
154
155 async fn refresh_wallet_balances(&mut self) -> anyhow::Result<()> {
157 let native_currency_balance = self.fetch_native_currency_balance().await?;
158 tracing::info!(
159 "Initializing wallet balance with native currency balance: {} {}",
160 native_currency_balance.as_decimal(),
161 native_currency_balance.currency
162 );
163 self.wallet_balance
164 .set_native_currency_balance(native_currency_balance);
165
166 if !self.wallet_balance.is_token_universe_initialized() {
168 } else {
170 let tokens: Vec<Address> = self
171 .wallet_balance
172 .token_universe
173 .clone()
174 .into_iter()
175 .collect();
176 for token in tokens {
177 if let Ok(token_balance) = self.fetch_token_balance(&token).await {
178 tracing::info!("Adding token balance to the wallet: {}", token_balance);
179 self.wallet_balance.add_token_balance(token_balance);
180 }
181 }
182 }
183
184 Ok(())
185 }
186}
187
188#[async_trait(?Send)]
189impl ExecutionClient for BlockchainExecutionClient {
190 fn is_connected(&self) -> bool {
191 self.connected
192 }
193
194 fn client_id(&self) -> ClientId {
195 self.core.client_id
196 }
197
198 fn account_id(&self) -> AccountId {
199 self.core.account_id
200 }
201
202 fn venue(&self) -> Venue {
203 self.core.venue
204 }
205
206 fn oms_type(&self) -> OmsType {
207 self.core.oms_type
208 }
209
210 fn get_account(&self) -> Option<AccountAny> {
211 todo!("implement get_account")
212 }
213
214 fn generate_account_state(
215 &self,
216 _balances: Vec<AccountBalance>,
217 _margins: Vec<MarginBalance>,
218 _reported: bool,
219 _ts_event: UnixNanos,
220 ) -> anyhow::Result<()> {
221 todo!("implement generate_account_state")
222 }
223
224 fn start(&mut self) -> anyhow::Result<()> {
225 todo!("implement start")
226 }
227
228 fn stop(&mut self) -> anyhow::Result<()> {
229 todo!("implement stop")
230 }
231
232 fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
233 todo!("implement submit_order")
234 }
235
236 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
237 todo!("implement submit_order_list")
238 }
239
240 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
241 todo!("implement modify_order")
242 }
243
244 fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
245 todo!("implement cancel_order")
246 }
247
248 fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
249 todo!("implement cancel_all_orders")
250 }
251
252 fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
253 todo!("implement batch_cancel_orders")
254 }
255
256 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
257 todo!("implement query_account")
258 }
259
260 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
261 todo!("implement query_order")
262 }
263
264 async fn connect(&mut self) -> anyhow::Result<()> {
265 if self.connected {
266 tracing::warn!("Blockchain execution client already connected");
267 return Ok(());
268 }
269
270 tracing::info!(
271 "Connecting to blockchain execution client on chain {}",
272 self.chain.name
273 );
274
275 self.refresh_wallet_balances().await?;
276
277 self.connected = true;
278 tracing::info!(
279 "Blockchain execution client connected on chain {}",
280 self.chain.name
281 );
282 Ok(())
283 }
284
285 async fn disconnect(&mut self) -> anyhow::Result<()> {
286 self.connected = false;
287 Ok(())
288 }
289
290 async fn generate_order_status_report(
291 &self,
292 _cmd: &GenerateOrderStatusReport,
293 ) -> anyhow::Result<Option<OrderStatusReport>> {
294 todo!("implement generate_order_status_report")
295 }
296
297 async fn generate_order_status_reports(
298 &self,
299 _cmd: &GenerateOrderStatusReports,
300 ) -> anyhow::Result<Vec<OrderStatusReport>> {
301 todo!("implement generate_order_status_reports")
302 }
303
304 async fn generate_fill_reports(
305 &self,
306 _cmd: GenerateFillReports,
307 ) -> anyhow::Result<Vec<FillReport>> {
308 todo!("implement generate_fill_reports")
309 }
310
311 async fn generate_position_status_reports(
312 &self,
313 _cmd: &GeneratePositionStatusReports,
314 ) -> anyhow::Result<Vec<PositionStatusReport>> {
315 todo!("implement generate_position_status_reports")
316 }
317
318 async fn generate_mass_status(
319 &self,
320 _lookback_mins: Option<u64>,
321 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
322 todo!("implement generate_mass_status")
323 }
324}