nautilus_blockchain/execution/
client.rs1use std::{collections::HashSet, sync::Arc};
17
18use alloy::primitives::Address;
19use async_trait::async_trait;
20use nautilus_common::{
21 clients::ExecutionClient,
22 messages::execution::{
23 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
24 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
25 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
26 },
27};
28use nautilus_core::UnixNanos;
29use nautilus_live::ExecutionClientCore;
30use nautilus_model::{
31 accounts::AccountAny,
32 defi::{
33 SharedChain, Token,
34 validation::validate_address,
35 wallet::{TokenBalance, WalletBalance},
36 },
37 enums::OmsType,
38 identifiers::{AccountId, ClientId, Venue},
39 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
40 types::{AccountBalance, MarginBalance, Money},
41};
42
43use crate::{
44 cache::BlockchainCache, config::BlockchainExecutionClientConfig,
45 contracts::erc20::Erc20Contract, rpc::http::BlockchainHttpRpcClient,
46};
47
48#[derive(Debug)]
50pub struct BlockchainExecutionClient {
51 core: ExecutionClientCore,
53 cache: BlockchainCache,
55 chain: SharedChain,
57 wallet_address: Address,
59 wallet_balance: WalletBalance,
61 erc20_contract: Erc20Contract,
63 connected: bool,
65 http_rpc_client: Arc<BlockchainHttpRpcClient>,
67}
68
69impl BlockchainExecutionClient {
70 pub fn new(
76 core_client: ExecutionClientCore,
77 config: BlockchainExecutionClientConfig,
78 ) -> anyhow::Result<Self> {
79 let chain = Arc::new(config.chain);
80 let cache = BlockchainCache::new(chain.clone());
81 let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
82 config.http_rpc_url.clone(),
83 config.rpc_requests_per_second,
84 ));
85 let wallet_address = validate_address(config.wallet_address.as_str())?;
86 let erc20_contract = Erc20Contract::new(http_rpc_client.clone(), true);
87
88 let mut token_universe = HashSet::new();
90 if let Some(specified_tokens) = config.tokens {
91 for token in specified_tokens {
92 let token_address = validate_address(token.as_str())?;
93 token_universe.insert(token_address);
94 }
95 }
96 let wallet_balance = WalletBalance::new(token_universe);
97
98 Ok(Self {
99 core: core_client,
100 connected: false,
101 wallet_balance,
102 chain,
103 cache,
104 erc20_contract,
105 http_rpc_client,
106 wallet_address,
107 })
108 }
109
110 async fn fetch_native_currency_balance(&self) -> anyhow::Result<Money> {
112 let balance_u256 = self
113 .http_rpc_client
114 .get_balance(&self.wallet_address, None)
115 .await?;
116
117 let native_currency = self.chain.native_currency();
118
119 let balance = Money::from_wei(balance_u256, native_currency);
121
122 Ok(balance)
123 }
124
125 async fn fetch_token_balance(
127 &mut self,
128 token_address: &Address,
129 ) -> anyhow::Result<TokenBalance> {
130 let token = if let Some(token) = self.cache.get_token(token_address) {
132 token.to_owned()
133 } else {
134 let token_info = self.erc20_contract.fetch_token_info(token_address).await?;
135 let token = Token::new(
136 self.chain.clone(),
137 *token_address,
138 token_info.name,
139 token_info.symbol,
140 token_info.decimals,
141 );
142 self.cache.add_token(token.clone()).await?;
143 token
144 };
145
146 let amount = self
147 .erc20_contract
148 .balance_of(token_address, &self.wallet_address)
149 .await?;
150 let token_balance = TokenBalance::new(amount, token);
151
152 Ok(token_balance)
156 }
157
158 async fn refresh_wallet_balances(&mut self) -> anyhow::Result<()> {
160 let native_currency_balance = self.fetch_native_currency_balance().await?;
161 log::info!(
162 "Initializing wallet balance with native currency balance: {} {}",
163 native_currency_balance.as_decimal(),
164 native_currency_balance.currency
165 );
166 self.wallet_balance
167 .set_native_currency_balance(native_currency_balance);
168
169 if self.wallet_balance.is_token_universe_initialized() {
171 let tokens: Vec<Address> = self
172 .wallet_balance
173 .token_universe
174 .clone()
175 .into_iter()
176 .collect();
177 for token in tokens {
178 if let Ok(token_balance) = self.fetch_token_balance(&token).await {
179 log::info!("Adding token balance to the wallet: {token_balance}");
180 self.wallet_balance.add_token_balance(token_balance);
181 }
182 }
183 } else {
184 }
186
187 Ok(())
188 }
189}
190
191#[async_trait(?Send)]
192impl ExecutionClient for BlockchainExecutionClient {
193 fn is_connected(&self) -> bool {
194 self.connected
195 }
196
197 fn client_id(&self) -> ClientId {
198 self.core.client_id
199 }
200
201 fn account_id(&self) -> AccountId {
202 self.core.account_id
203 }
204
205 fn venue(&self) -> Venue {
206 self.core.venue
207 }
208
209 fn oms_type(&self) -> OmsType {
210 self.core.oms_type
211 }
212
213 fn get_account(&self) -> Option<AccountAny> {
214 todo!("implement get_account")
215 }
216
217 fn generate_account_state(
218 &self,
219 _balances: Vec<AccountBalance>,
220 _margins: Vec<MarginBalance>,
221 _reported: bool,
222 _ts_event: UnixNanos,
223 ) -> anyhow::Result<()> {
224 todo!("implement generate_account_state")
225 }
226
227 fn start(&mut self) -> anyhow::Result<()> {
228 todo!("implement start")
229 }
230
231 fn stop(&mut self) -> anyhow::Result<()> {
232 todo!("implement stop")
233 }
234
235 fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
236 todo!("implement submit_order")
237 }
238
239 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
240 todo!("implement submit_order_list")
241 }
242
243 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
244 todo!("implement modify_order")
245 }
246
247 fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
248 todo!("implement cancel_order")
249 }
250
251 fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
252 todo!("implement cancel_all_orders")
253 }
254
255 fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
256 todo!("implement batch_cancel_orders")
257 }
258
259 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
260 todo!("implement query_account")
261 }
262
263 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
264 todo!("implement query_order")
265 }
266
267 async fn connect(&mut self) -> anyhow::Result<()> {
268 if self.connected {
269 log::warn!("Blockchain execution client already connected");
270 return Ok(());
271 }
272
273 log::info!(
274 "Connecting to blockchain execution client on chain {}",
275 self.chain.name
276 );
277
278 self.refresh_wallet_balances().await?;
279
280 self.connected = true;
281 log::info!(
282 "Blockchain execution client connected on chain {}",
283 self.chain.name
284 );
285 Ok(())
286 }
287
288 async fn disconnect(&mut self) -> anyhow::Result<()> {
289 self.connected = false;
290 Ok(())
291 }
292
293 async fn generate_order_status_report(
294 &self,
295 _cmd: &GenerateOrderStatusReport,
296 ) -> anyhow::Result<Option<OrderStatusReport>> {
297 todo!("implement generate_order_status_report")
298 }
299
300 async fn generate_order_status_reports(
301 &self,
302 _cmd: &GenerateOrderStatusReports,
303 ) -> anyhow::Result<Vec<OrderStatusReport>> {
304 todo!("implement generate_order_status_reports")
305 }
306
307 async fn generate_fill_reports(
308 &self,
309 _cmd: GenerateFillReports,
310 ) -> anyhow::Result<Vec<FillReport>> {
311 todo!("implement generate_fill_reports")
312 }
313
314 async fn generate_position_status_reports(
315 &self,
316 _cmd: &GeneratePositionStatusReports,
317 ) -> anyhow::Result<Vec<PositionStatusReport>> {
318 todo!("implement generate_position_status_reports")
319 }
320
321 async fn generate_mass_status(
322 &self,
323 _lookback_mins: Option<u64>,
324 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
325 todo!("implement generate_mass_status")
326 }
327}