nautilus_data/defi/
client.rs1use std::fmt::{Debug, Display};
22
23use nautilus_common::messages::defi::{
24 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
25 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
26 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
27 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
28 UnsubscribePoolSwaps,
29};
30
31use crate::client::DataClientAdapter;
32
33impl DataClientAdapter {
34 #[inline]
35 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
36 if let Err(e) = match cmd {
37 DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
38 DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
39 DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
40 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
41 self.subscribe_pool_liquidity_updates(cmd)
42 }
43 DefiSubscribeCommand::PoolFeeCollects(cmd) => self.subscribe_pool_fee_collects(cmd),
44 DefiSubscribeCommand::PoolFlashEvents(cmd) => self.subscribe_pool_flash_events(cmd),
45 } {
46 log_command_error(&cmd, &e);
47 }
48 }
49
50 #[inline]
51 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
52 if let Err(e) = match cmd {
53 DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
54 DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
55 DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
56 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
57 self.unsubscribe_pool_liquidity_updates(cmd)
58 }
59 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => self.unsubscribe_pool_fee_collects(cmd),
60 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => self.unsubscribe_pool_flash_events(cmd),
61 } {
62 log_command_error(&cmd, &e);
63 }
64 }
65
66 #[inline]
72 pub fn execute_defi_request(&self, cmd: &DefiRequestCommand) -> anyhow::Result<()> {
73 match cmd {
74 DefiRequestCommand::PoolSnapshot(cmd) => self.request_pool_snapshot(cmd),
75 }
76 }
77
78 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
84 if !self.subscriptions_blocks.contains(&cmd.chain) {
85 self.subscriptions_blocks.insert(cmd.chain);
86 self.client.subscribe_blocks(cmd)?;
87 }
88 Ok(())
89 }
90
91 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
97 if self.subscriptions_blocks.contains(&cmd.chain) {
98 self.subscriptions_blocks.remove(&cmd.chain);
99 self.client.unsubscribe_blocks(cmd)?;
100 }
101 Ok(())
102 }
103
104 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
110 if !self.subscriptions_pools.contains(&cmd.instrument_id) {
111 self.subscriptions_pools.insert(cmd.instrument_id);
112 self.client.subscribe_pool(cmd)?;
113 }
114 Ok(())
115 }
116
117 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
123 if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
124 self.subscriptions_pool_swaps.insert(cmd.instrument_id);
125 self.client.subscribe_pool_swaps(cmd)?;
126 }
127 Ok(())
128 }
129
130 fn subscribe_pool_liquidity_updates(
136 &mut self,
137 cmd: &SubscribePoolLiquidityUpdates,
138 ) -> anyhow::Result<()> {
139 if !self
140 .subscriptions_pool_liquidity_updates
141 .contains(&cmd.instrument_id)
142 {
143 self.subscriptions_pool_liquidity_updates
144 .insert(cmd.instrument_id);
145 self.client.subscribe_pool_liquidity_updates(cmd)?;
146 }
147 Ok(())
148 }
149
150 fn subscribe_pool_fee_collects(
156 &mut self,
157 cmd: &SubscribePoolFeeCollects,
158 ) -> anyhow::Result<()> {
159 if !self
160 .subscriptions_pool_fee_collects
161 .contains(&cmd.instrument_id)
162 {
163 self.subscriptions_pool_fee_collects
164 .insert(cmd.instrument_id);
165 self.client.subscribe_pool_fee_collects(cmd)?;
166 }
167 Ok(())
168 }
169
170 fn subscribe_pool_flash_events(
176 &mut self,
177 cmd: &SubscribePoolFlashEvents,
178 ) -> anyhow::Result<()> {
179 if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
180 self.subscriptions_pool_flash.insert(cmd.instrument_id);
181 self.client.subscribe_pool_flash_events(cmd)?;
182 }
183 Ok(())
184 }
185
186 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
192 if self.subscriptions_pools.contains(&cmd.instrument_id) {
193 self.subscriptions_pools.remove(&cmd.instrument_id);
194 self.client.unsubscribe_pool(cmd)?;
195 }
196 Ok(())
197 }
198
199 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
205 if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
206 self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
207 self.client.unsubscribe_pool_swaps(cmd)?;
208 }
209 Ok(())
210 }
211
212 fn unsubscribe_pool_liquidity_updates(
218 &mut self,
219 cmd: &UnsubscribePoolLiquidityUpdates,
220 ) -> anyhow::Result<()> {
221 if self
222 .subscriptions_pool_liquidity_updates
223 .contains(&cmd.instrument_id)
224 {
225 self.subscriptions_pool_liquidity_updates
226 .remove(&cmd.instrument_id);
227 self.client.unsubscribe_pool_liquidity_updates(cmd)?;
228 }
229 Ok(())
230 }
231
232 fn unsubscribe_pool_fee_collects(
238 &mut self,
239 cmd: &UnsubscribePoolFeeCollects,
240 ) -> anyhow::Result<()> {
241 if self
242 .subscriptions_pool_fee_collects
243 .contains(&cmd.instrument_id)
244 {
245 self.subscriptions_pool_fee_collects
246 .remove(&cmd.instrument_id);
247 self.client.unsubscribe_pool_fee_collects(cmd)?;
248 }
249 Ok(())
250 }
251
252 fn unsubscribe_pool_flash_events(
258 &mut self,
259 cmd: &UnsubscribePoolFlashEvents,
260 ) -> anyhow::Result<()> {
261 if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
262 self.subscriptions_pool_flash.remove(&cmd.instrument_id);
263 self.client.unsubscribe_pool_flash_events(cmd)?;
264 }
265 Ok(())
266 }
267
268 pub fn request_pool_snapshot(&self, req: &RequestPoolSnapshot) -> anyhow::Result<()> {
274 self.client.request_pool_snapshot(req)
275 }
276}
277
278#[inline(always)]
279fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
280 log::error!("Error on {cmd:?}: {e}");
281}