1use ahash::{AHashMap, AHashSet};
19use nautilus_core::{UUID4, UnixNanos, python::to_pyvalue_err};
20use nautilus_model::{
21 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
22 identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
23 instruments::{Instrument, InstrumentAny},
24 python::instruments::pyobject_to_instrument_any,
25 reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
26 types::{Money, Price, Quantity},
27};
28use pyo3::{
29 IntoPyObjectExt,
30 prelude::*,
31 types::{PyDict, PyList, PyTuple},
32};
33use rust_decimal::{Decimal, prelude::ToPrimitive};
34
35use crate::execution::reconciliation::{
36 FillAdjustmentResult, FillSnapshot, VenuePositionSnapshot, adjust_fills_for_partial_window,
37 calculate_reconciliation_price,
38};
39
40const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4); #[pyfunction(name = "adjust_fills_for_partial_window")]
57#[pyo3(signature = (mass_status, instrument, tolerance=None))]
58pub fn py_adjust_fills_for_partial_window(
59 py: Python<'_>,
60 mass_status: &Bound<'_, PyAny>,
61 instrument: Py<PyAny>,
62 tolerance: Option<String>,
63) -> PyResult<Py<PyTuple>> {
64 let instrument_any = pyobject_to_instrument_any(py, instrument)?;
65 let instrument_id = instrument_any.id();
66 let mass_status_obj: ExecutionMassStatus = mass_status.extract()?;
67 let account_id = mass_status_obj.account_id;
68
69 let all_position_reports = mass_status_obj.position_reports();
70
71 let position_reports = match all_position_reports.get(&instrument_id) {
72 Some(reports) if !reports.is_empty() => reports,
73 _ => {
74 let all_orders = mass_status_obj.order_reports();
76 let all_fills = mass_status_obj.fill_reports();
77
78 let orders_dict = PyDict::new(py);
79 let fills_dict = PyDict::new(py);
80
81 for (venue_order_id, order) in &all_orders {
83 if order.instrument_id == instrument_id {
84 orders_dict
85 .set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
86 }
87 }
88
89 for (venue_order_id, fills) in &all_fills {
91 if let Some(first_fill) = fills.first()
92 && first_fill.instrument_id == instrument_id
93 {
94 let fills_list = PyList::empty(py);
95 for fill in fills {
96 let py_fill = Py::new(py, fill.clone())?;
97 fills_list.append(py_fill)?;
98 }
99 fills_dict.set_item(venue_order_id.to_string(), fills_list)?;
100 }
101 }
102
103 return Ok(PyTuple::new(
104 py,
105 [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
106 )?
107 .into());
108 }
109 };
110
111 let position_report = position_reports
112 .first()
113 .ok_or_else(|| to_pyvalue_err("Position reports unexpectedly empty"))?;
114
115 let venue_side = match position_report.position_side {
116 PositionSideSpecified::Long => OrderSide::Buy,
117 PositionSideSpecified::Short => OrderSide::Sell,
118 PositionSideSpecified::Flat => OrderSide::Buy, };
120
121 let venue_position = VenuePositionSnapshot {
122 side: venue_side,
123 qty: position_report.quantity.into(),
124 avg_px: position_report.avg_px_open.unwrap_or(Decimal::ZERO),
125 };
126
127 let mut fill_snapshots = Vec::new();
129 let mut fill_map: AHashMap<VenueOrderId, Vec<FillReport>> = AHashMap::new();
130 let mut order_map: AHashMap<VenueOrderId, OrderStatusReport> = AHashMap::new();
131
132 for (venue_order_id, order) in mass_status_obj.order_reports() {
134 if order.instrument_id == instrument_id {
135 order_map.insert(venue_order_id, order.clone());
136 }
137 }
138
139 for (venue_order_id, fill_reports) in mass_status_obj.fill_reports() {
140 for fill in fill_reports {
141 if fill.instrument_id == instrument_id {
142 let side = mass_status_obj
144 .order_reports()
145 .get(&venue_order_id)
146 .map_or(fill.order_side, |order| order.order_side);
147
148 fill_snapshots.push(FillSnapshot::new(
149 fill.ts_event.as_u64(),
150 side,
151 fill.last_qty.into(),
152 fill.last_px.into(),
153 venue_order_id,
154 ));
155
156 fill_map
158 .entry(venue_order_id)
159 .or_default()
160 .push(fill.clone());
161 }
162 }
163 }
164
165 fill_snapshots.sort_by_key(|f| f.ts_event);
167
168 if fill_snapshots.is_empty() {
169 return py_tuple_from_reports(py, &order_map, &fill_map);
171 }
172
173 for window in fill_snapshots.windows(2) {
175 if window[0].ts_event == window[1].ts_event {
176 log::debug!(
177 "Duplicate timestamp detected in fills: {} for orders {} and {}",
178 window[0].ts_event,
179 window[0].venue_order_id,
180 window[1].venue_order_id
181 );
182 }
183 }
184
185 let tol = if let Some(tol_str) = tolerance {
187 Decimal::from_str_exact(&tol_str).map_err(to_pyvalue_err)?
188 } else {
189 DEFAULT_TOLERANCE
190 };
191
192 let result =
193 adjust_fills_for_partial_window(&fill_snapshots, &venue_position, &instrument_any, tol);
194
195 let (adjusted_orders, adjusted_fills) = match result {
197 FillAdjustmentResult::NoAdjustment => {
198 (order_map, fill_map)
200 }
201 FillAdjustmentResult::AddSyntheticOpening {
202 synthetic_fill,
203 existing_fills: _,
204 } => {
205 let synthetic_venue_order_id = create_synthetic_venue_order_id(synthetic_fill.ts_event);
207
208 let synthetic_order = create_synthetic_order_report(
210 &synthetic_fill,
211 account_id,
212 instrument_id,
213 &instrument_any,
214 synthetic_venue_order_id,
215 )?;
216 let synthetic_fill_report = create_synthetic_fill_report(
217 &synthetic_fill,
218 account_id,
219 instrument_id,
220 &instrument_any,
221 synthetic_venue_order_id,
222 )?;
223
224 let mut adjusted_fills = fill_map;
225 adjusted_fills
226 .entry(synthetic_venue_order_id)
227 .or_default()
228 .insert(0, synthetic_fill_report);
229
230 let mut adjusted_orders = order_map;
231 adjusted_orders.insert(synthetic_venue_order_id, synthetic_order);
232
233 (adjusted_orders, adjusted_fills)
234 }
235 FillAdjustmentResult::ReplaceCurrentLifecycle {
236 synthetic_fill,
237 first_venue_order_id,
238 } => {
239 let synthetic_order = if let Some(original_order) = order_map.get(&first_venue_order_id)
244 {
245 let mut updated_order = original_order.clone();
247
248 let qty_f64 = synthetic_fill
250 .qty
251 .to_f64()
252 .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
253 let order_qty = Quantity::new(qty_f64, instrument_any.size_precision());
254
255 updated_order.quantity = order_qty;
256 updated_order.filled_qty = order_qty;
257 updated_order.order_status = OrderStatus::Filled;
258 updated_order.ts_last = UnixNanos::from(synthetic_fill.ts_event);
259
260 updated_order
261 } else {
262 create_synthetic_order_report(
264 &synthetic_fill,
265 account_id,
266 instrument_id,
267 &instrument_any,
268 first_venue_order_id,
269 )?
270 };
271
272 let synthetic_fill_report = create_synthetic_fill_report(
273 &synthetic_fill,
274 account_id,
275 instrument_id,
276 &instrument_any,
277 first_venue_order_id,
278 )?;
279
280 let mut adjusted_orders = AHashMap::new();
282 adjusted_orders.insert(first_venue_order_id, synthetic_order);
283
284 let mut adjusted_fills = AHashMap::new();
285 adjusted_fills.insert(first_venue_order_id, vec![synthetic_fill_report]);
286
287 (adjusted_orders, adjusted_fills)
288 }
289 FillAdjustmentResult::FilterToCurrentLifecycle {
290 last_zero_crossing_ts,
291 current_lifecycle_fills: _,
292 } => {
293 let mut result_fills = AHashMap::new();
295 let mut result_orders = AHashMap::new();
296
297 let orders_with_fills: AHashSet<VenueOrderId> = fill_map.keys().copied().collect();
299
300 for (venue_order_id, fills) in fill_map {
302 let filtered: Vec<FillReport> = fills
303 .into_iter()
304 .filter(|f| f.ts_event.as_u64() > last_zero_crossing_ts)
305 .collect();
306 if !filtered.is_empty() {
307 result_fills.insert(venue_order_id, filtered);
308 if let Some(order) = order_map.get(&venue_order_id) {
310 result_orders.insert(venue_order_id, order.clone());
311 }
312 }
313 }
314
315 for (venue_order_id, order) in &order_map {
320 let is_closed = matches!(
321 order.order_status,
322 OrderStatus::Denied
323 | OrderStatus::Rejected
324 | OrderStatus::Canceled
325 | OrderStatus::Expired
326 | OrderStatus::Filled
327 );
328 if !orders_with_fills.contains(venue_order_id) && !is_closed {
329 result_orders.insert(*venue_order_id, order.clone());
330 }
331 }
332
333 (result_orders, result_fills)
334 }
335 };
336
337 py_tuple_from_reports(py, &adjusted_orders, &adjusted_fills)
338}
339
340fn create_synthetic_venue_order_id(ts_event: u64) -> VenueOrderId {
342 let uuid = UUID4::new();
343 let uuid_str = uuid.to_string();
345 let uuid_suffix = &uuid_str[..8];
346 let venue_order_id_value = format!("S-{ts_event:x}-{uuid_suffix}");
347 VenueOrderId::new(&venue_order_id_value)
348}
349
350fn create_synthetic_order_report(
352 fill: &FillSnapshot,
353 account_id: AccountId,
354 instrument_id: InstrumentId,
355 instrument: &InstrumentAny,
356 venue_order_id: VenueOrderId,
357) -> PyResult<OrderStatusReport> {
358 let qty_f64 = fill
359 .qty
360 .to_f64()
361 .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
362 let order_qty = Quantity::new(qty_f64, instrument.size_precision());
363
364 Ok(OrderStatusReport::new(
365 account_id,
366 instrument_id,
367 None, venue_order_id,
369 fill.side,
370 OrderType::Market,
371 TimeInForce::Gtc,
372 OrderStatus::Filled,
373 order_qty,
374 order_qty, UnixNanos::from(fill.ts_event),
376 UnixNanos::from(fill.ts_event),
377 UnixNanos::from(fill.ts_event),
378 None, ))
380}
381
382fn create_synthetic_fill_report(
384 fill: &FillSnapshot,
385 account_id: AccountId,
386 instrument_id: InstrumentId,
387 instrument: &InstrumentAny,
388 venue_order_id: VenueOrderId,
389) -> PyResult<FillReport> {
390 let uuid = UUID4::new();
391 let uuid_str = uuid.to_string();
393 let uuid_suffix = &uuid_str[..8];
394 let trade_id_value = format!("S-{:x}-{}", fill.ts_event, uuid_suffix);
395 let trade_id = TradeId::new(&trade_id_value);
396
397 let qty_f64 = fill
398 .qty
399 .to_f64()
400 .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
401 let px_f64 = fill
402 .px
403 .to_f64()
404 .ok_or_else(|| to_pyvalue_err("Failed to convert price to f64"))?;
405
406 Ok(FillReport::new(
407 account_id,
408 instrument_id,
409 venue_order_id,
410 trade_id,
411 fill.side,
412 Quantity::new(qty_f64, instrument.size_precision()),
413 Price::new(px_f64, instrument.price_precision()),
414 Money::new(0.0, instrument.quote_currency()),
415 LiquiditySide::NoLiquiditySide,
416 None, None, fill.ts_event.into(),
419 fill.ts_event.into(),
420 None, ))
422}
423
424fn py_tuple_from_reports(
426 py: Python<'_>,
427 order_map: &AHashMap<VenueOrderId, OrderStatusReport>,
428 fill_map: &AHashMap<VenueOrderId, Vec<FillReport>>,
429) -> PyResult<Py<PyTuple>> {
430 let orders_dict = PyDict::new(py);
432
433 for (venue_order_id, order) in order_map {
434 orders_dict.set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
435 }
436
437 let fills_dict = PyDict::new(py);
438
439 for (venue_order_id, fills) in fill_map {
440 let fills_list: Result<Vec<_>, _> =
441 fills.iter().map(|f| f.clone().into_py_any(py)).collect();
442 fills_dict.set_item(venue_order_id.to_string(), fills_list?)?;
443 }
444
445 Ok(PyTuple::new(
446 py,
447 [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
448 )?
449 .into())
450}
451
452#[pyfunction(name = "calculate_reconciliation_price")]
469#[pyo3(signature = (current_position_qty, current_position_avg_px, target_position_qty, target_position_avg_px))]
470pub fn py_calculate_reconciliation_price(
471 current_position_qty: Decimal,
472 current_position_avg_px: Option<Decimal>,
473 target_position_qty: Decimal,
474 target_position_avg_px: Option<Decimal>,
475) -> Option<Decimal> {
476 calculate_reconciliation_price(
477 current_position_qty,
478 current_position_avg_px,
479 target_position_qty,
480 target_position_avg_px,
481 )
482}