1use nautilus_common::{
33 factories::OrderEventFactory,
34 messages::{ExecutionEvent, ExecutionReport},
35};
36use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
37use nautilus_model::{
38 enums::{AccountType, LiquiditySide},
39 events::{
40 AccountState, OrderCancelRejected, OrderEventAny, OrderModifyRejected, OrderRejected,
41 },
42 identifiers::{
43 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
44 VenueOrderId,
45 },
46 orders::OrderAny,
47 reports::{FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
49};
50
51#[derive(Debug, Clone)]
59pub struct ExecutionEventEmitter {
60 clock: &'static AtomicTime,
61 factory: OrderEventFactory,
62 sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
63}
64
65impl ExecutionEventEmitter {
66 #[must_use]
70 pub fn new(
71 clock: &'static AtomicTime,
72 trader_id: TraderId,
73 account_id: AccountId,
74 account_type: AccountType,
75 base_currency: Option<Currency>,
76 ) -> Self {
77 Self {
78 clock,
79 factory: OrderEventFactory::new(trader_id, account_id, account_type, base_currency),
80 sender: None,
81 }
82 }
83
84 fn ts_init(&self) -> UnixNanos {
85 self.clock.get_time_ns()
86 }
87
88 pub fn set_sender(&mut self, sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
90 self.sender = Some(sender);
91 }
92
93 #[must_use]
95 pub fn is_initialized(&self) -> bool {
96 self.sender.is_some()
97 }
98
99 #[must_use]
101 pub fn trader_id(&self) -> TraderId {
102 self.factory.trader_id()
103 }
104
105 #[must_use]
107 pub fn account_id(&self) -> AccountId {
108 self.factory.account_id()
109 }
110
111 pub fn emit_account_state(
113 &self,
114 balances: Vec<AccountBalance>,
115 margins: Vec<MarginBalance>,
116 reported: bool,
117 ts_event: UnixNanos,
118 ) {
119 let state = self.factory.generate_account_state(
120 balances,
121 margins,
122 reported,
123 ts_event,
124 self.ts_init(),
125 );
126 self.send_account_state(state);
127 }
128
129 pub fn emit_order_denied(&self, order: &OrderAny, reason: &str) {
131 let event = self
132 .factory
133 .generate_order_denied(order, reason, self.ts_init());
134 self.send_order_event(event);
135 }
136
137 pub fn emit_order_submitted(&self, order: &OrderAny) {
139 let event = self.factory.generate_order_submitted(order, self.ts_init());
140 self.send_order_event(event);
141 }
142
143 pub fn emit_order_rejected(
145 &self,
146 order: &OrderAny,
147 reason: &str,
148 ts_event: UnixNanos,
149 due_post_only: bool,
150 ) {
151 let event = self.factory.generate_order_rejected(
152 order,
153 reason,
154 ts_event,
155 self.ts_init(),
156 due_post_only,
157 );
158 self.send_order_event(event);
159 }
160
161 pub fn emit_order_accepted(
163 &self,
164 order: &OrderAny,
165 venue_order_id: VenueOrderId,
166 ts_event: UnixNanos,
167 ) {
168 let event =
169 self.factory
170 .generate_order_accepted(order, venue_order_id, ts_event, self.ts_init());
171 self.send_order_event(event);
172 }
173
174 pub fn emit_order_modify_rejected(
176 &self,
177 order: &OrderAny,
178 venue_order_id: Option<VenueOrderId>,
179 reason: &str,
180 ts_event: UnixNanos,
181 ) {
182 let event = self.factory.generate_order_modify_rejected(
183 order,
184 venue_order_id,
185 reason,
186 ts_event,
187 self.ts_init(),
188 );
189 self.send_order_event(event);
190 }
191
192 pub fn emit_order_cancel_rejected(
194 &self,
195 order: &OrderAny,
196 venue_order_id: Option<VenueOrderId>,
197 reason: &str,
198 ts_event: UnixNanos,
199 ) {
200 let event = self.factory.generate_order_cancel_rejected(
201 order,
202 venue_order_id,
203 reason,
204 ts_event,
205 self.ts_init(),
206 );
207 self.send_order_event(event);
208 }
209
210 #[allow(clippy::too_many_arguments)]
212 pub fn emit_order_updated(
213 &self,
214 order: &OrderAny,
215 venue_order_id: VenueOrderId,
216 quantity: Quantity,
217 price: Option<Price>,
218 trigger_price: Option<Price>,
219 protection_price: Option<Price>,
220 ts_event: UnixNanos,
221 ) {
222 let event = self.factory.generate_order_updated(
223 order,
224 venue_order_id,
225 quantity,
226 price,
227 trigger_price,
228 protection_price,
229 ts_event,
230 self.ts_init(),
231 );
232 self.send_order_event(event);
233 }
234
235 pub fn emit_order_canceled(
237 &self,
238 order: &OrderAny,
239 venue_order_id: Option<VenueOrderId>,
240 ts_event: UnixNanos,
241 ) {
242 let event =
243 self.factory
244 .generate_order_canceled(order, venue_order_id, ts_event, self.ts_init());
245 self.send_order_event(event);
246 }
247
248 pub fn emit_order_triggered(
250 &self,
251 order: &OrderAny,
252 venue_order_id: Option<VenueOrderId>,
253 ts_event: UnixNanos,
254 ) {
255 let event =
256 self.factory
257 .generate_order_triggered(order, venue_order_id, ts_event, self.ts_init());
258 self.send_order_event(event);
259 }
260
261 pub fn emit_order_expired(
263 &self,
264 order: &OrderAny,
265 venue_order_id: Option<VenueOrderId>,
266 ts_event: UnixNanos,
267 ) {
268 let event =
269 self.factory
270 .generate_order_expired(order, venue_order_id, ts_event, self.ts_init());
271 self.send_order_event(event);
272 }
273
274 #[allow(clippy::too_many_arguments)]
276 pub fn emit_order_filled(
277 &self,
278 order: &OrderAny,
279 venue_order_id: VenueOrderId,
280 venue_position_id: Option<PositionId>,
281 trade_id: TradeId,
282 last_qty: Quantity,
283 last_px: Price,
284 quote_currency: Currency,
285 commission: Option<Money>,
286 liquidity_side: LiquiditySide,
287 ts_event: UnixNanos,
288 ) {
289 let event = self.factory.generate_order_filled(
290 order,
291 venue_order_id,
292 venue_position_id,
293 trade_id,
294 last_qty,
295 last_px,
296 quote_currency,
297 commission,
298 liquidity_side,
299 ts_event,
300 self.ts_init(),
301 );
302 self.send_order_event(event);
303 }
304
305 #[allow(clippy::too_many_arguments)]
307 pub fn emit_order_rejected_event(
308 &self,
309 strategy_id: StrategyId,
310 instrument_id: InstrumentId,
311 client_order_id: ClientOrderId,
312 reason: &str,
313 ts_event: UnixNanos,
314 due_post_only: bool,
315 ) {
316 let event = OrderRejected::new(
317 self.factory.trader_id(),
318 strategy_id,
319 instrument_id,
320 client_order_id,
321 self.factory.account_id(),
322 reason.into(),
323 UUID4::new(),
324 ts_event,
325 self.ts_init(),
326 false,
327 due_post_only,
328 );
329 self.send_order_event(OrderEventAny::Rejected(event));
330 }
331
332 #[allow(clippy::too_many_arguments)]
334 pub fn emit_order_modify_rejected_event(
335 &self,
336 strategy_id: StrategyId,
337 instrument_id: InstrumentId,
338 client_order_id: ClientOrderId,
339 venue_order_id: Option<VenueOrderId>,
340 reason: &str,
341 ts_event: UnixNanos,
342 ) {
343 let event = OrderModifyRejected::new(
344 self.factory.trader_id(),
345 strategy_id,
346 instrument_id,
347 client_order_id,
348 reason.into(),
349 UUID4::new(),
350 ts_event,
351 self.ts_init(),
352 false,
353 venue_order_id,
354 Some(self.factory.account_id()),
355 );
356 self.send_order_event(OrderEventAny::ModifyRejected(event));
357 }
358
359 #[allow(clippy::too_many_arguments)]
361 pub fn emit_order_cancel_rejected_event(
362 &self,
363 strategy_id: StrategyId,
364 instrument_id: InstrumentId,
365 client_order_id: ClientOrderId,
366 venue_order_id: Option<VenueOrderId>,
367 reason: &str,
368 ts_event: UnixNanos,
369 ) {
370 let event = OrderCancelRejected::new(
371 self.factory.trader_id(),
372 strategy_id,
373 instrument_id,
374 client_order_id,
375 reason.into(),
376 UUID4::new(),
377 ts_event,
378 self.ts_init(),
379 false,
380 venue_order_id,
381 Some(self.factory.account_id()),
382 );
383 self.send_order_event(OrderEventAny::CancelRejected(event));
384 }
385
386 pub fn send_order_event(&self, event: OrderEventAny) {
388 if let Some(sender) = &self.sender {
389 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
390 log::warn!("Failed to send order event: {e}");
391 }
392 } else {
393 log::warn!("Cannot send order event: sender not initialized");
394 }
395 }
396
397 pub fn send_account_state(&self, state: AccountState) {
399 if let Some(sender) = &self.sender {
400 if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
401 log::warn!("Failed to send account state: {e}");
402 }
403 } else {
404 log::warn!("Cannot send account state: sender not initialized");
405 }
406 }
407
408 pub fn send_execution_report(&self, report: ExecutionReport) {
410 if let Some(sender) = &self.sender {
411 if let Err(e) = sender.send(ExecutionEvent::Report(report)) {
412 log::warn!("Failed to send execution report: {e}");
413 }
414 } else {
415 log::warn!("Cannot send execution report: sender not initialized");
416 }
417 }
418
419 pub fn send_order_status_report(&self, report: OrderStatusReport) {
421 self.send_execution_report(ExecutionReport::Order(Box::new(report)));
422 }
423
424 pub fn send_fill_report(&self, report: FillReport) {
426 self.send_execution_report(ExecutionReport::Fill(Box::new(report)));
427 }
428
429 pub fn send_position_report(&self, report: PositionStatusReport) {
431 self.send_execution_report(ExecutionReport::Position(Box::new(report)));
432 }
433}