nautilus_data/defi/
client.rs1use nautilus_common::{
22 clients::log_command_error,
23 messages::defi::{
24 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
25 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
26 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
27 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
28 UnsubscribePoolSwaps,
29 },
30};
31
32use crate::client::DataClientAdapter;
33
34impl DataClientAdapter {
35 #[inline]
36 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
37 if let Err(e) = match cmd {
38 DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
39 DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
40 DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
41 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
42 self.subscribe_pool_liquidity_updates(cmd)
43 }
44 DefiSubscribeCommand::PoolFeeCollects(cmd) => self.subscribe_pool_fee_collects(cmd),
45 DefiSubscribeCommand::PoolFlashEvents(cmd) => self.subscribe_pool_flash_events(cmd),
46 } {
47 log_command_error(&cmd, &e);
48 }
49 }
50
51 #[inline]
52 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
53 if let Err(e) = match cmd {
54 DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
55 DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
56 DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
57 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
58 self.unsubscribe_pool_liquidity_updates(cmd)
59 }
60 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => self.unsubscribe_pool_fee_collects(cmd),
61 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => self.unsubscribe_pool_flash_events(cmd),
62 } {
63 log_command_error(&cmd, &e);
64 }
65 }
66
67 #[inline]
73 pub fn execute_defi_request(&self, cmd: &DefiRequestCommand) -> anyhow::Result<()> {
74 match cmd {
75 DefiRequestCommand::PoolSnapshot(cmd) => self.request_pool_snapshot(cmd),
76 }
77 }
78
79 fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
85 if !self.subscriptions_blocks.contains(&cmd.chain) {
86 self.subscriptions_blocks.insert(cmd.chain);
87 self.client.subscribe_blocks(cmd)?;
88 }
89 Ok(())
90 }
91
92 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
98 if self.subscriptions_blocks.contains(&cmd.chain) {
99 self.subscriptions_blocks.remove(&cmd.chain);
100 self.client.unsubscribe_blocks(cmd)?;
101 }
102 Ok(())
103 }
104
105 fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
111 if !self.subscriptions_pools.contains(&cmd.instrument_id) {
112 self.subscriptions_pools.insert(cmd.instrument_id);
113 self.client.subscribe_pool(cmd)?;
114 }
115 Ok(())
116 }
117
118 fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
124 if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
125 self.subscriptions_pool_swaps.insert(cmd.instrument_id);
126 self.client.subscribe_pool_swaps(cmd)?;
127 }
128 Ok(())
129 }
130
131 fn subscribe_pool_liquidity_updates(
137 &mut self,
138 cmd: &SubscribePoolLiquidityUpdates,
139 ) -> anyhow::Result<()> {
140 if !self
141 .subscriptions_pool_liquidity_updates
142 .contains(&cmd.instrument_id)
143 {
144 self.subscriptions_pool_liquidity_updates
145 .insert(cmd.instrument_id);
146 self.client.subscribe_pool_liquidity_updates(cmd)?;
147 }
148 Ok(())
149 }
150
151 fn subscribe_pool_fee_collects(
157 &mut self,
158 cmd: &SubscribePoolFeeCollects,
159 ) -> anyhow::Result<()> {
160 if !self
161 .subscriptions_pool_fee_collects
162 .contains(&cmd.instrument_id)
163 {
164 self.subscriptions_pool_fee_collects
165 .insert(cmd.instrument_id);
166 self.client.subscribe_pool_fee_collects(cmd)?;
167 }
168 Ok(())
169 }
170
171 fn subscribe_pool_flash_events(
177 &mut self,
178 cmd: &SubscribePoolFlashEvents,
179 ) -> anyhow::Result<()> {
180 if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
181 self.subscriptions_pool_flash.insert(cmd.instrument_id);
182 self.client.subscribe_pool_flash_events(cmd)?;
183 }
184 Ok(())
185 }
186
187 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
193 if self.subscriptions_pools.contains(&cmd.instrument_id) {
194 self.subscriptions_pools.remove(&cmd.instrument_id);
195 self.client.unsubscribe_pool(cmd)?;
196 }
197 Ok(())
198 }
199
200 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
206 if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
207 self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
208 self.client.unsubscribe_pool_swaps(cmd)?;
209 }
210 Ok(())
211 }
212
213 fn unsubscribe_pool_liquidity_updates(
219 &mut self,
220 cmd: &UnsubscribePoolLiquidityUpdates,
221 ) -> anyhow::Result<()> {
222 if self
223 .subscriptions_pool_liquidity_updates
224 .contains(&cmd.instrument_id)
225 {
226 self.subscriptions_pool_liquidity_updates
227 .remove(&cmd.instrument_id);
228 self.client.unsubscribe_pool_liquidity_updates(cmd)?;
229 }
230 Ok(())
231 }
232
233 fn unsubscribe_pool_fee_collects(
239 &mut self,
240 cmd: &UnsubscribePoolFeeCollects,
241 ) -> anyhow::Result<()> {
242 if self
243 .subscriptions_pool_fee_collects
244 .contains(&cmd.instrument_id)
245 {
246 self.subscriptions_pool_fee_collects
247 .remove(&cmd.instrument_id);
248 self.client.unsubscribe_pool_fee_collects(cmd)?;
249 }
250 Ok(())
251 }
252
253 fn unsubscribe_pool_flash_events(
259 &mut self,
260 cmd: &UnsubscribePoolFlashEvents,
261 ) -> anyhow::Result<()> {
262 if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
263 self.subscriptions_pool_flash.remove(&cmd.instrument_id);
264 self.client.unsubscribe_pool_flash_events(cmd)?;
265 }
266 Ok(())
267 }
268
269 pub fn request_pool_snapshot(&self, req: &RequestPoolSnapshot) -> anyhow::Result<()> {
275 self.client.request_pool_snapshot(req)
276 }
277}