nautilus_common/defi/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific actor functionality.
17//!
18//! This module provides DeFi subscription and unsubscription helper methods
19//! for the `DataActorCore`. All code in this module requires the `defi` feature flag.
20
21use indexmap::IndexMap;
22use nautilus_core::UUID4;
23use nautilus_model::{
24    defi::Blockchain,
25    identifiers::{ClientId, InstrumentId},
26};
27
28use crate::{
29    actor::DataActorCore,
30    defi::{
31        DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
32        SubscribePoolFeeCollects, SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates,
33        SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool, UnsubscribePoolFeeCollects,
34        UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
35        switchboard::{
36            get_defi_blocks_topic, get_defi_collect_topic, get_defi_flash_topic,
37            get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
38        },
39    },
40    messages::data::DataCommand,
41    msgbus::{MStr, Topic, handler::ShareableMessageHandler},
42};
43
44impl DataActorCore {
45    /// Helper method for registering block subscriptions from the trait.
46    pub fn subscribe_blocks(
47        &mut self,
48        topic: MStr<Topic>,
49        handler: ShareableMessageHandler,
50        chain: Blockchain,
51        client_id: Option<ClientId>,
52        params: Option<IndexMap<String, String>>,
53    ) {
54        self.check_registered();
55
56        self.add_subscription(topic, handler);
57
58        let command = DefiSubscribeCommand::Blocks(SubscribeBlocks {
59            chain,
60            client_id,
61            command_id: UUID4::new(),
62            ts_init: self.timestamp_ns(),
63            params,
64        });
65
66        self.send_data_cmd(DataCommand::DefiSubscribe(command));
67    }
68
69    /// Helper method for registering pool subscriptions from the trait.
70    pub fn subscribe_pool(
71        &mut self,
72        topic: MStr<Topic>,
73        handler: ShareableMessageHandler,
74        instrument_id: InstrumentId,
75        client_id: Option<ClientId>,
76        params: Option<IndexMap<String, String>>,
77    ) {
78        self.check_registered();
79
80        self.add_subscription(topic, handler);
81
82        let command = DefiSubscribeCommand::Pool(SubscribePool {
83            instrument_id,
84            client_id,
85            command_id: UUID4::new(),
86            ts_init: self.timestamp_ns(),
87            params,
88        });
89
90        self.send_data_cmd(DataCommand::DefiSubscribe(command));
91    }
92
93    /// Helper method for registering pool swap subscriptions from the trait.
94    pub fn subscribe_pool_swaps(
95        &mut self,
96        topic: MStr<Topic>,
97        handler: ShareableMessageHandler,
98        instrument_id: InstrumentId,
99        client_id: Option<ClientId>,
100        params: Option<IndexMap<String, String>>,
101    ) {
102        self.check_registered();
103
104        self.add_subscription(topic, handler);
105
106        let command = DefiSubscribeCommand::PoolSwaps(SubscribePoolSwaps {
107            instrument_id,
108            client_id,
109            command_id: UUID4::new(),
110            ts_init: self.timestamp_ns(),
111            params,
112        });
113
114        self.send_data_cmd(DataCommand::DefiSubscribe(command));
115    }
116
117    /// Helper method for registering pool liquidity update subscriptions from the trait.
118    pub fn subscribe_pool_liquidity_updates(
119        &mut self,
120        topic: MStr<Topic>,
121        handler: ShareableMessageHandler,
122        instrument_id: InstrumentId,
123        client_id: Option<ClientId>,
124        params: Option<IndexMap<String, String>>,
125    ) {
126        self.check_registered();
127
128        self.add_subscription(topic, handler);
129
130        let command = DefiSubscribeCommand::PoolLiquidityUpdates(SubscribePoolLiquidityUpdates {
131            instrument_id,
132            client_id,
133            command_id: UUID4::new(),
134            ts_init: self.timestamp_ns(),
135            params,
136        });
137
138        self.send_data_cmd(DataCommand::DefiSubscribe(command));
139    }
140
141    /// Helper method for registering pool fee collect subscriptions from the trait.
142    pub fn subscribe_pool_fee_collects(
143        &mut self,
144        topic: MStr<Topic>,
145        handler: ShareableMessageHandler,
146        instrument_id: InstrumentId,
147        client_id: Option<ClientId>,
148        params: Option<IndexMap<String, String>>,
149    ) {
150        self.check_registered();
151
152        self.add_subscription(topic, handler);
153
154        let command = DefiSubscribeCommand::PoolFeeCollects(SubscribePoolFeeCollects {
155            instrument_id,
156            client_id,
157            command_id: UUID4::new(),
158            ts_init: self.timestamp_ns(),
159            params,
160        });
161
162        self.send_data_cmd(DataCommand::DefiSubscribe(command));
163    }
164
165    /// Helper method for registering pool flash event subscriptions from the trait.
166    pub fn subscribe_pool_flash_events(
167        &mut self,
168        topic: MStr<Topic>,
169        handler: ShareableMessageHandler,
170        instrument_id: InstrumentId,
171        client_id: Option<ClientId>,
172        params: Option<IndexMap<String, String>>,
173    ) {
174        self.check_registered();
175
176        self.add_subscription(topic, handler);
177
178        let command = DefiSubscribeCommand::PoolFlashEvents(SubscribePoolFlashEvents {
179            instrument_id,
180            client_id,
181            command_id: UUID4::new(),
182            ts_init: self.timestamp_ns(),
183            params,
184        });
185
186        self.send_data_cmd(DataCommand::DefiSubscribe(command));
187    }
188
189    /// Helper method for unsubscribing from blocks.
190    pub fn unsubscribe_blocks(
191        &mut self,
192        chain: Blockchain,
193        client_id: Option<ClientId>,
194        params: Option<IndexMap<String, String>>,
195    ) {
196        self.check_registered();
197
198        let topic = get_defi_blocks_topic(chain);
199        self.remove_subscription(topic);
200
201        let command = DefiUnsubscribeCommand::Blocks(UnsubscribeBlocks {
202            chain,
203            client_id,
204            command_id: UUID4::new(),
205            ts_init: self.timestamp_ns(),
206            params,
207        });
208
209        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
210    }
211
212    /// Helper method for unsubscribing from pool definition updates.
213    pub fn unsubscribe_pool(
214        &mut self,
215        instrument_id: InstrumentId,
216        client_id: Option<ClientId>,
217        params: Option<IndexMap<String, String>>,
218    ) {
219        self.check_registered();
220
221        let topic = get_defi_pool_topic(instrument_id);
222        self.remove_subscription(topic);
223
224        let command = DefiUnsubscribeCommand::Pool(UnsubscribePool {
225            instrument_id,
226            client_id,
227            command_id: UUID4::new(),
228            ts_init: self.timestamp_ns(),
229            params,
230        });
231
232        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
233    }
234
235    /// Helper method for unsubscribing from pool swaps.
236    pub fn unsubscribe_pool_swaps(
237        &mut self,
238        instrument_id: InstrumentId,
239        client_id: Option<ClientId>,
240        params: Option<IndexMap<String, String>>,
241    ) {
242        self.check_registered();
243
244        let topic = get_defi_pool_swaps_topic(instrument_id);
245        self.remove_subscription(topic);
246
247        let command = DefiUnsubscribeCommand::PoolSwaps(UnsubscribePoolSwaps {
248            instrument_id,
249            client_id,
250            command_id: UUID4::new(),
251            ts_init: self.timestamp_ns(),
252            params,
253        });
254
255        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
256    }
257
258    /// Helper method for unsubscribing from pool liquidity updates.
259    pub fn unsubscribe_pool_liquidity_updates(
260        &mut self,
261        instrument_id: InstrumentId,
262        client_id: Option<ClientId>,
263        params: Option<IndexMap<String, String>>,
264    ) {
265        self.check_registered();
266
267        let topic = get_defi_liquidity_topic(instrument_id);
268        self.remove_subscription(topic);
269
270        let command =
271            DefiUnsubscribeCommand::PoolLiquidityUpdates(UnsubscribePoolLiquidityUpdates {
272                instrument_id,
273                client_id,
274                command_id: UUID4::new(),
275                ts_init: self.timestamp_ns(),
276                params,
277            });
278
279        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
280    }
281
282    /// Helper method for unsubscribing from pool fee collects.
283    pub fn unsubscribe_pool_fee_collects(
284        &mut self,
285        instrument_id: InstrumentId,
286        client_id: Option<ClientId>,
287        params: Option<IndexMap<String, String>>,
288    ) {
289        self.check_registered();
290
291        let topic = get_defi_collect_topic(instrument_id);
292        self.remove_subscription(topic);
293
294        let command = DefiUnsubscribeCommand::PoolFeeCollects(UnsubscribePoolFeeCollects {
295            instrument_id,
296            client_id,
297            command_id: UUID4::new(),
298            ts_init: self.timestamp_ns(),
299            params,
300        });
301
302        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
303    }
304
305    /// Helper method for unsubscribing from pool flash events.
306    pub fn unsubscribe_pool_flash_events(
307        &mut self,
308        instrument_id: InstrumentId,
309        client_id: Option<ClientId>,
310        params: Option<IndexMap<String, String>>,
311    ) {
312        self.check_registered();
313
314        let topic = get_defi_flash_topic(instrument_id);
315        self.remove_subscription(topic);
316
317        let command = DefiUnsubscribeCommand::PoolFlashEvents(UnsubscribePoolFlashEvents {
318            instrument_id,
319            client_id,
320            command_id: UUID4::new(),
321            ts_init: self.timestamp_ns(),
322            params,
323        });
324
325        self.send_data_cmd(DataCommand::DefiUnsubscribe(command));
326    }
327}