1use indexmap::IndexMap;
22use nautilus_core::UUID4;
23use nautilus_model::{
24 defi::Blockchain,
25 identifiers::{ClientId, InstrumentId},
26};
27
28use crate::{
29 actor::DataActorCore,
30 defi::{
31 DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
32 SubscribePoolFeeCollects, SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates,
33 SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool, UnsubscribePoolFeeCollects,
34 UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
35 switchboard::{
36 get_defi_blocks_topic, get_defi_collect_topic, get_defi_flash_topic,
37 get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
38 },
39 },
40 messages::data::DataCommand,
41 msgbus::{MStr, Topic, handler::ShareableMessageHandler},
42};
43
44impl DataActorCore {
45 pub fn subscribe_blocks(
47 &mut self,
48 topic: MStr<Topic>,
49 handler: ShareableMessageHandler,
50 chain: Blockchain,
51 client_id: Option<ClientId>,
52 params: Option<IndexMap<String, String>>,
53 ) {
54 self.check_registered();
55
56 self.add_subscription(topic, handler);
57
58 let command = DefiSubscribeCommand::Blocks(SubscribeBlocks {
59 chain,
60 client_id,
61 command_id: UUID4::new(),
62 ts_init: self.timestamp_ns(),
63 params,
64 });
65
66 self.send_data_cmd(DataCommand::DefiSubscribe(command));
67 }
68
69 pub fn subscribe_pool(
71 &mut self,
72 topic: MStr<Topic>,
73 handler: ShareableMessageHandler,
74 instrument_id: InstrumentId,
75 client_id: Option<ClientId>,
76 params: Option<IndexMap<String, String>>,
77 ) {
78 self.check_registered();
79
80 self.add_subscription(topic, handler);
81
82 let command = DefiSubscribeCommand::Pool(SubscribePool {
83 instrument_id,
84 client_id,
85 command_id: UUID4::new(),
86 ts_init: self.timestamp_ns(),
87 params,
88 });
89
90 self.send_data_cmd(DataCommand::DefiSubscribe(command));
91 }
92
93 pub fn subscribe_pool_swaps(
95 &mut self,
96 topic: MStr<Topic>,
97 handler: ShareableMessageHandler,
98 instrument_id: InstrumentId,
99 client_id: Option<ClientId>,
100 params: Option<IndexMap<String, String>>,
101 ) {
102 self.check_registered();
103
104 self.add_subscription(topic, handler);
105
106 let command = DefiSubscribeCommand::PoolSwaps(SubscribePoolSwaps {
107 instrument_id,
108 client_id,
109 command_id: UUID4::new(),
110 ts_init: self.timestamp_ns(),
111 params,
112 });
113
114 self.send_data_cmd(DataCommand::DefiSubscribe(command));
115 }
116
117 pub fn subscribe_pool_liquidity_updates(
119 &mut self,
120 topic: MStr<Topic>,
121 handler: ShareableMessageHandler,
122 instrument_id: InstrumentId,
123 client_id: Option<ClientId>,
124 params: Option<IndexMap<String, String>>,
125 ) {
126 self.check_registered();
127
128 self.add_subscription(topic, handler);
129
130 let command = DefiSubscribeCommand::PoolLiquidityUpdates(SubscribePoolLiquidityUpdates {
131 instrument_id,
132 client_id,
133 command_id: UUID4::new(),
134 ts_init: self.timestamp_ns(),
135 params,
136 });
137
138 self.send_data_cmd(DataCommand::DefiSubscribe(command));
139 }
140
141 pub fn subscribe_pool_fee_collects(
143 &mut self,
144 topic: MStr<Topic>,
145 handler: ShareableMessageHandler,
146 instrument_id: InstrumentId,
147 client_id: Option<ClientId>,
148 params: Option<IndexMap<String, String>>,
149 ) {
150 self.check_registered();
151
152 self.add_subscription(topic, handler);
153
154 let command = DefiSubscribeCommand::PoolFeeCollects(SubscribePoolFeeCollects {
155 instrument_id,
156 client_id,
157 command_id: UUID4::new(),
158 ts_init: self.timestamp_ns(),
159 params,
160 });
161
162 self.send_data_cmd(DataCommand::DefiSubscribe(command));
163 }
164
165 pub fn subscribe_pool_flash_events(
167 &mut self,
168 topic: MStr<Topic>,
169 handler: ShareableMessageHandler,
170 instrument_id: InstrumentId,
171 client_id: Option<ClientId>,
172 params: Option<IndexMap<String, String>>,
173 ) {
174 self.check_registered();
175
176 self.add_subscription(topic, handler);
177
178 let command = DefiSubscribeCommand::PoolFlashEvents(SubscribePoolFlashEvents {
179 instrument_id,
180 client_id,
181 command_id: UUID4::new(),
182 ts_init: self.timestamp_ns(),
183 params,
184 });
185
186 self.send_data_cmd(DataCommand::DefiSubscribe(command));
187 }
188
189 pub fn unsubscribe_blocks(
191 &mut self,
192 chain: Blockchain,
193 client_id: Option<ClientId>,
194 params: Option<IndexMap<String, String>>,
195 ) {
196 self.check_registered();
197
198 let topic = get_defi_blocks_topic(chain);
199 self.remove_subscription(topic);
200
201 let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
202 chain,
203 client_id,
204 command_id: UUID4::new(),
205 ts_init: self.timestamp_ns(),
206 params,
207 });
208
209 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
210 }
211
212 pub fn unsubscribe_pool(
214 &mut self,
215 instrument_id: InstrumentId,
216 client_id: Option<ClientId>,
217 params: Option<IndexMap<String, String>>,
218 ) {
219 self.check_registered();
220
221 let topic = get_defi_pool_topic(instrument_id);
222 self.remove_subscription(topic);
223
224 let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
225 instrument_id,
226 client_id,
227 command_id: UUID4::new(),
228 ts_init: self.timestamp_ns(),
229 params,
230 });
231
232 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
233 }
234
235 pub fn unsubscribe_pool_swaps(
237 &mut self,
238 instrument_id: InstrumentId,
239 client_id: Option<ClientId>,
240 params: Option<IndexMap<String, String>>,
241 ) {
242 self.check_registered();
243
244 let topic = get_defi_pool_swaps_topic(instrument_id);
245 self.remove_subscription(topic);
246
247 let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
248 instrument_id,
249 client_id,
250 command_id: UUID4::new(),
251 ts_init: self.timestamp_ns(),
252 params,
253 });
254
255 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
256 }
257
258 pub fn unsubscribe_pool_liquidity_updates(
260 &mut self,
261 instrument_id: InstrumentId,
262 client_id: Option<ClientId>,
263 params: Option<IndexMap<String, String>>,
264 ) {
265 self.check_registered();
266
267 let topic = get_defi_liquidity_topic(instrument_id);
268 self.remove_subscription(topic);
269
270 let command =
271 DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
272 instrument_id,
273 client_id,
274 command_id: UUID4::new(),
275 ts_init: self.timestamp_ns(),
276 params,
277 });
278
279 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
280 }
281
282 pub fn unsubscribe_pool_fee_collects(
284 &mut self,
285 instrument_id: InstrumentId,
286 client_id: Option<ClientId>,
287 params: Option<IndexMap<String, String>>,
288 ) {
289 self.check_registered();
290
291 let topic = get_defi_collect_topic(instrument_id);
292 self.remove_subscription(topic);
293
294 let command = DefiUnsubscribeCommand::PoolFeeCollects(UnsubscribePoolFeeCollects {
295 instrument_id,
296 client_id,
297 command_id: UUID4::new(),
298 ts_init: self.timestamp_ns(),
299 params,
300 });
301
302 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
303 }
304
305 pub fn unsubscribe_pool_flash_events(
307 &mut self,
308 instrument_id: InstrumentId,
309 client_id: Option<ClientId>,
310 params: Option<IndexMap<String, String>>,
311 ) {
312 self.check_registered();
313
314 let topic = get_defi_flash_topic(instrument_id);
315 self.remove_subscription(topic);
316
317 let command = DefiUnsubscribeCommand::PoolFlashEvents(UnsubscribePoolFlashEvents {
318 instrument_id,
319 client_id,
320 command_id: UUID4::new(),
321 ts_init: self.timestamp_ns(),
322 params,
323 });
324
325 self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
326 }
327}