Skip to main content

nautilus_live/
emitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution event emitter for async event dispatch.
17//!
18//! This module provides [`ExecutionEventEmitter`], which combines event generation (via
19//! [`OrderEventFactory`]) with async dispatch. Adapters use the `emit_*` convenience
20//! methods to generate and send events in a single call.
21//!
22//! # Architecture
23//!
24//! ```text
25//! Adapter
26//! ├── core: ExecutionClientCore    (identity + connection state)
27//! └── emitter: ExecutionEventEmitter   (event generation + async dispatch)
28//!     ├── factory: OrderEventFactory
29//!     └── sender: Option<Sender>   (set in start())
30//! ```
31
32use nautilus_common::{
33    factories::OrderEventFactory,
34    messages::{ExecutionEvent, ExecutionReport},
35};
36use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
37use nautilus_model::{
38    enums::{AccountType, LiquiditySide},
39    events::{
40        AccountState, OrderCancelRejected, OrderEventAny, OrderModifyRejected, OrderRejected,
41    },
42    identifiers::{
43        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
44        VenueOrderId,
45    },
46    orders::OrderAny,
47    reports::{FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
49};
50
51/// Event emitter for live trading - combines event generation with async dispatch.
52///
53/// This struct wraps an [`OrderEventFactory`] for event construction and an unbounded
54/// channel sender for async dispatch. It provides `emit_*` convenience methods that
55/// generate and send events in a single call.
56///
57/// The sender is set during the adapter's `start()` phase via [`set_sender`](Self::set_sender).
58#[derive(Debug, Clone)]
59pub struct ExecutionEventEmitter {
60    clock: &'static AtomicTime,
61    factory: OrderEventFactory,
62    sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
63}
64
65impl ExecutionEventEmitter {
66    /// Creates a new [`ExecutionEventEmitter`] with no sender.
67    ///
68    /// Call [`set_sender`](Self::set_sender) in the adapter's `start()` method.
69    #[must_use]
70    pub fn new(
71        clock: &'static AtomicTime,
72        trader_id: TraderId,
73        account_id: AccountId,
74        account_type: AccountType,
75        base_currency: Option<Currency>,
76    ) -> Self {
77        Self {
78            clock,
79            factory: OrderEventFactory::new(trader_id, account_id, account_type, base_currency),
80            sender: None,
81        }
82    }
83
84    fn ts_init(&self) -> UnixNanos {
85        self.clock.get_time_ns()
86    }
87
88    /// Sets the sender. Call in adapter's `start()`.
89    pub fn set_sender(&mut self, sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
90        self.sender = Some(sender);
91    }
92
93    /// Returns true if the sender is initialized.
94    #[must_use]
95    pub fn is_initialized(&self) -> bool {
96        self.sender.is_some()
97    }
98
99    /// Returns the trader ID.
100    #[must_use]
101    pub fn trader_id(&self) -> TraderId {
102        self.factory.trader_id()
103    }
104
105    /// Returns the account ID.
106    #[must_use]
107    pub fn account_id(&self) -> AccountId {
108        self.factory.account_id()
109    }
110
111    /// Generates and emits an account state event.
112    pub fn emit_account_state(
113        &self,
114        balances: Vec<AccountBalance>,
115        margins: Vec<MarginBalance>,
116        reported: bool,
117        ts_event: UnixNanos,
118    ) {
119        let state = self.factory.generate_account_state(
120            balances,
121            margins,
122            reported,
123            ts_event,
124            self.ts_init(),
125        );
126        self.send_account_state(state);
127    }
128
129    /// Generates and emits an order denied event.
130    pub fn emit_order_denied(&self, order: &OrderAny, reason: &str) {
131        let event = self
132            .factory
133            .generate_order_denied(order, reason, self.ts_init());
134        self.send_order_event(event);
135    }
136
137    /// Generates and emits an order submitted event.
138    pub fn emit_order_submitted(&self, order: &OrderAny) {
139        let event = self.factory.generate_order_submitted(order, self.ts_init());
140        self.send_order_event(event);
141    }
142
143    /// Generates and emits an order rejected event.
144    pub fn emit_order_rejected(
145        &self,
146        order: &OrderAny,
147        reason: &str,
148        ts_event: UnixNanos,
149        due_post_only: bool,
150    ) {
151        let event = self.factory.generate_order_rejected(
152            order,
153            reason,
154            ts_event,
155            self.ts_init(),
156            due_post_only,
157        );
158        self.send_order_event(event);
159    }
160
161    /// Generates and emits an order accepted event.
162    pub fn emit_order_accepted(
163        &self,
164        order: &OrderAny,
165        venue_order_id: VenueOrderId,
166        ts_event: UnixNanos,
167    ) {
168        let event =
169            self.factory
170                .generate_order_accepted(order, venue_order_id, ts_event, self.ts_init());
171        self.send_order_event(event);
172    }
173
174    /// Generates and emits an order modify rejected event.
175    pub fn emit_order_modify_rejected(
176        &self,
177        order: &OrderAny,
178        venue_order_id: Option<VenueOrderId>,
179        reason: &str,
180        ts_event: UnixNanos,
181    ) {
182        let event = self.factory.generate_order_modify_rejected(
183            order,
184            venue_order_id,
185            reason,
186            ts_event,
187            self.ts_init(),
188        );
189        self.send_order_event(event);
190    }
191
192    /// Generates and emits an order cancel rejected event.
193    pub fn emit_order_cancel_rejected(
194        &self,
195        order: &OrderAny,
196        venue_order_id: Option<VenueOrderId>,
197        reason: &str,
198        ts_event: UnixNanos,
199    ) {
200        let event = self.factory.generate_order_cancel_rejected(
201            order,
202            venue_order_id,
203            reason,
204            ts_event,
205            self.ts_init(),
206        );
207        self.send_order_event(event);
208    }
209
210    /// Generates and emits an order updated event.
211    #[allow(clippy::too_many_arguments)]
212    pub fn emit_order_updated(
213        &self,
214        order: &OrderAny,
215        venue_order_id: VenueOrderId,
216        quantity: Quantity,
217        price: Option<Price>,
218        trigger_price: Option<Price>,
219        protection_price: Option<Price>,
220        ts_event: UnixNanos,
221    ) {
222        let event = self.factory.generate_order_updated(
223            order,
224            venue_order_id,
225            quantity,
226            price,
227            trigger_price,
228            protection_price,
229            ts_event,
230            self.ts_init(),
231        );
232        self.send_order_event(event);
233    }
234
235    /// Generates and emits an order canceled event.
236    pub fn emit_order_canceled(
237        &self,
238        order: &OrderAny,
239        venue_order_id: Option<VenueOrderId>,
240        ts_event: UnixNanos,
241    ) {
242        let event =
243            self.factory
244                .generate_order_canceled(order, venue_order_id, ts_event, self.ts_init());
245        self.send_order_event(event);
246    }
247
248    /// Generates and emits an order triggered event.
249    pub fn emit_order_triggered(
250        &self,
251        order: &OrderAny,
252        venue_order_id: Option<VenueOrderId>,
253        ts_event: UnixNanos,
254    ) {
255        let event =
256            self.factory
257                .generate_order_triggered(order, venue_order_id, ts_event, self.ts_init());
258        self.send_order_event(event);
259    }
260
261    /// Generates and emits an order expired event.
262    pub fn emit_order_expired(
263        &self,
264        order: &OrderAny,
265        venue_order_id: Option<VenueOrderId>,
266        ts_event: UnixNanos,
267    ) {
268        let event =
269            self.factory
270                .generate_order_expired(order, venue_order_id, ts_event, self.ts_init());
271        self.send_order_event(event);
272    }
273
274    /// Generates and emits an order filled event.
275    #[allow(clippy::too_many_arguments)]
276    pub fn emit_order_filled(
277        &self,
278        order: &OrderAny,
279        venue_order_id: VenueOrderId,
280        venue_position_id: Option<PositionId>,
281        trade_id: TradeId,
282        last_qty: Quantity,
283        last_px: Price,
284        quote_currency: Currency,
285        commission: Option<Money>,
286        liquidity_side: LiquiditySide,
287        ts_event: UnixNanos,
288    ) {
289        let event = self.factory.generate_order_filled(
290            order,
291            venue_order_id,
292            venue_position_id,
293            trade_id,
294            last_qty,
295            last_px,
296            quote_currency,
297            commission,
298            liquidity_side,
299            ts_event,
300            self.ts_init(),
301        );
302        self.send_order_event(event);
303    }
304
305    /// Constructs and emits an order rejected event from raw fields.
306    #[allow(clippy::too_many_arguments)]
307    pub fn emit_order_rejected_event(
308        &self,
309        strategy_id: StrategyId,
310        instrument_id: InstrumentId,
311        client_order_id: ClientOrderId,
312        reason: &str,
313        ts_event: UnixNanos,
314        due_post_only: bool,
315    ) {
316        let event = OrderRejected::new(
317            self.factory.trader_id(),
318            strategy_id,
319            instrument_id,
320            client_order_id,
321            self.factory.account_id(),
322            reason.into(),
323            UUID4::new(),
324            ts_event,
325            self.ts_init(),
326            false,
327            due_post_only,
328        );
329        self.send_order_event(OrderEventAny::Rejected(event));
330    }
331
332    /// Constructs and emits an order modify rejected event from raw fields.
333    #[allow(clippy::too_many_arguments)]
334    pub fn emit_order_modify_rejected_event(
335        &self,
336        strategy_id: StrategyId,
337        instrument_id: InstrumentId,
338        client_order_id: ClientOrderId,
339        venue_order_id: Option<VenueOrderId>,
340        reason: &str,
341        ts_event: UnixNanos,
342    ) {
343        let event = OrderModifyRejected::new(
344            self.factory.trader_id(),
345            strategy_id,
346            instrument_id,
347            client_order_id,
348            reason.into(),
349            UUID4::new(),
350            ts_event,
351            self.ts_init(),
352            false,
353            venue_order_id,
354            Some(self.factory.account_id()),
355        );
356        self.send_order_event(OrderEventAny::ModifyRejected(event));
357    }
358
359    /// Constructs and emits an order cancel rejected event from raw fields.
360    #[allow(clippy::too_many_arguments)]
361    pub fn emit_order_cancel_rejected_event(
362        &self,
363        strategy_id: StrategyId,
364        instrument_id: InstrumentId,
365        client_order_id: ClientOrderId,
366        venue_order_id: Option<VenueOrderId>,
367        reason: &str,
368        ts_event: UnixNanos,
369    ) {
370        let event = OrderCancelRejected::new(
371            self.factory.trader_id(),
372            strategy_id,
373            instrument_id,
374            client_order_id,
375            reason.into(),
376            UUID4::new(),
377            ts_event,
378            self.ts_init(),
379            false,
380            venue_order_id,
381            Some(self.factory.account_id()),
382        );
383        self.send_order_event(OrderEventAny::CancelRejected(event));
384    }
385
386    /// Emits an order event.
387    pub fn send_order_event(&self, event: OrderEventAny) {
388        if let Some(sender) = &self.sender {
389            if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
390                log::warn!("Failed to send order event: {e}");
391            }
392        } else {
393            log::warn!("Cannot send order event: sender not initialized");
394        }
395    }
396
397    /// Emits an account state event.
398    pub fn send_account_state(&self, state: AccountState) {
399        if let Some(sender) = &self.sender {
400            if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
401                log::warn!("Failed to send account state: {e}");
402            }
403        } else {
404            log::warn!("Cannot send account state: sender not initialized");
405        }
406    }
407
408    /// Emits an execution report.
409    pub fn send_execution_report(&self, report: ExecutionReport) {
410        if let Some(sender) = &self.sender {
411            if let Err(e) = sender.send(ExecutionEvent::Report(report)) {
412                log::warn!("Failed to send execution report: {e}");
413            }
414        } else {
415            log::warn!("Cannot send execution report: sender not initialized");
416        }
417    }
418
419    /// Emits an order status report.
420    pub fn send_order_status_report(&self, report: OrderStatusReport) {
421        self.send_execution_report(ExecutionReport::Order(Box::new(report)));
422    }
423
424    /// Emits a fill report.
425    pub fn send_fill_report(&self, report: FillReport) {
426        self.send_execution_report(ExecutionReport::Fill(Box::new(report)));
427    }
428
429    /// Emits a position status report.
430    pub fn send_position_report(&self, report: PositionStatusReport) {
431        self.send_execution_report(ExecutionReport::Position(Box::new(report)));
432    }
433}