1use std::collections::HashMap;
19
20use nautilus_core::{UUID4, UnixNanos, python::to_pyvalue_err};
21use nautilus_model::{
22 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
23 identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
24 instruments::{Instrument, InstrumentAny},
25 python::instruments::pyobject_to_instrument_any,
26 reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
27 types::{Money, Price, Quantity},
28};
29use pyo3::{
30 IntoPyObjectExt,
31 prelude::*,
32 types::{PyDict, PyList, PyTuple},
33};
34use rust_decimal::{Decimal, prelude::ToPrimitive};
35
36use crate::execution::reconciliation::{
37 FillAdjustmentResult, FillSnapshot, VenuePositionSnapshot, adjust_fills_for_partial_window,
38 calculate_reconciliation_price,
39};
40
41const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4); #[pyfunction(name = "adjust_fills_for_partial_window")]
58#[pyo3(signature = (mass_status, instrument, tolerance=None))]
59pub fn py_adjust_fills_for_partial_window(
60 py: Python<'_>,
61 mass_status: &Bound<'_, PyAny>,
62 instrument: Py<PyAny>,
63 tolerance: Option<String>,
64) -> PyResult<Py<PyTuple>> {
65 let instrument_any = pyobject_to_instrument_any(py, instrument)?;
66 let instrument_id = instrument_any.id();
67 let mass_status_obj: ExecutionMassStatus = mass_status.extract()?;
68 let account_id = mass_status_obj.account_id;
69
70 let all_position_reports = mass_status_obj.position_reports();
71
72 let position_reports = match all_position_reports.get(&instrument_id) {
73 Some(reports) if !reports.is_empty() => reports,
74 _ => {
75 let all_orders = mass_status_obj.order_reports();
77 let all_fills = mass_status_obj.fill_reports();
78
79 let orders_dict = PyDict::new(py);
80 let fills_dict = PyDict::new(py);
81
82 for (venue_order_id, order) in all_orders.iter() {
84 if order.instrument_id == instrument_id {
85 orders_dict
86 .set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
87 }
88 }
89
90 for (venue_order_id, fills) in all_fills.iter() {
92 if let Some(first_fill) = fills.first()
93 && first_fill.instrument_id == instrument_id
94 {
95 let fills_list = PyList::empty(py);
96 for fill in fills {
97 let py_fill = Py::new(py, fill.clone())?;
98 fills_list.append(py_fill)?;
99 }
100 fills_dict.set_item(venue_order_id.to_string(), fills_list)?;
101 }
102 }
103
104 return Ok(PyTuple::new(
105 py,
106 [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
107 )?
108 .into());
109 }
110 };
111
112 let position_report = position_reports
113 .first()
114 .ok_or_else(|| to_pyvalue_err("Position reports unexpectedly empty"))?;
115
116 let venue_side = match position_report.position_side {
117 PositionSideSpecified::Long => OrderSide::Buy,
118 PositionSideSpecified::Short => OrderSide::Sell,
119 PositionSideSpecified::Flat => OrderSide::Buy, };
121
122 let venue_position = VenuePositionSnapshot {
123 side: venue_side,
124 qty: position_report.quantity.into(),
125 avg_px: position_report.avg_px_open.unwrap_or(Decimal::ZERO),
126 };
127
128 let mut fill_snapshots = Vec::new();
130 let mut fill_map: HashMap<VenueOrderId, Vec<FillReport>> = HashMap::new();
131 let mut order_map: HashMap<VenueOrderId, OrderStatusReport> = HashMap::new();
132
133 for (venue_order_id, order) in mass_status_obj.order_reports() {
135 if order.instrument_id == instrument_id {
136 order_map.insert(venue_order_id, order.clone());
137 }
138 }
139
140 for (venue_order_id, fill_reports) in mass_status_obj.fill_reports() {
141 for fill in fill_reports {
142 if fill.instrument_id == instrument_id {
143 let side = mass_status_obj
145 .order_reports()
146 .get(&venue_order_id)
147 .map_or(fill.order_side, |order| order.order_side);
148
149 fill_snapshots.push(FillSnapshot::new(
150 fill.ts_event.as_u64(),
151 side,
152 fill.last_qty.into(),
153 fill.last_px.into(),
154 venue_order_id,
155 ));
156
157 fill_map
159 .entry(venue_order_id)
160 .or_default()
161 .push(fill.clone());
162 }
163 }
164 }
165
166 fill_snapshots.sort_by_key(|f| f.ts_event);
168
169 if fill_snapshots.is_empty() {
170 return py_tuple_from_reports(py, &order_map, &fill_map);
172 }
173
174 for window in fill_snapshots.windows(2) {
176 if window[0].ts_event == window[1].ts_event {
177 log::debug!(
178 "Duplicate timestamp detected in fills: {} for orders {} and {}",
179 window[0].ts_event,
180 window[0].venue_order_id,
181 window[1].venue_order_id
182 );
183 }
184 }
185
186 let tol = if let Some(tol_str) = tolerance {
188 Decimal::from_str_exact(&tol_str).map_err(to_pyvalue_err)?
189 } else {
190 DEFAULT_TOLERANCE
191 };
192
193 let result =
194 adjust_fills_for_partial_window(&fill_snapshots, &venue_position, &instrument_any, tol);
195
196 let (adjusted_orders, adjusted_fills) = match result {
198 FillAdjustmentResult::NoAdjustment => {
199 (order_map, fill_map)
201 }
202 FillAdjustmentResult::AddSyntheticOpening {
203 synthetic_fill,
204 existing_fills: _,
205 } => {
206 let synthetic_venue_order_id = create_synthetic_venue_order_id(synthetic_fill.ts_event);
208
209 let synthetic_order = create_synthetic_order_report(
211 &synthetic_fill,
212 account_id,
213 instrument_id,
214 &instrument_any,
215 synthetic_venue_order_id,
216 )?;
217 let synthetic_fill_report = create_synthetic_fill_report(
218 &synthetic_fill,
219 account_id,
220 instrument_id,
221 &instrument_any,
222 synthetic_venue_order_id,
223 )?;
224
225 let mut adjusted_fills = fill_map;
226 adjusted_fills
227 .entry(synthetic_venue_order_id)
228 .or_default()
229 .insert(0, synthetic_fill_report);
230
231 let mut adjusted_orders = order_map;
232 adjusted_orders.insert(synthetic_venue_order_id, synthetic_order);
233
234 (adjusted_orders, adjusted_fills)
235 }
236 FillAdjustmentResult::ReplaceCurrentLifecycle {
237 synthetic_fill,
238 first_venue_order_id,
239 } => {
240 let synthetic_order = if let Some(original_order) = order_map.get(&first_venue_order_id)
245 {
246 let mut updated_order = original_order.clone();
248
249 let qty_f64 = synthetic_fill
251 .qty
252 .to_f64()
253 .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
254 let order_qty = Quantity::new(qty_f64, instrument_any.size_precision());
255
256 updated_order.quantity = order_qty;
257 updated_order.filled_qty = order_qty;
258 updated_order.order_status = OrderStatus::Filled;
259 updated_order.ts_last = UnixNanos::from(synthetic_fill.ts_event);
260
261 updated_order
262 } else {
263 create_synthetic_order_report(
265 &synthetic_fill,
266 account_id,
267 instrument_id,
268 &instrument_any,
269 first_venue_order_id,
270 )?
271 };
272
273 let synthetic_fill_report = create_synthetic_fill_report(
274 &synthetic_fill,
275 account_id,
276 instrument_id,
277 &instrument_any,
278 first_venue_order_id,
279 )?;
280
281 let mut adjusted_orders = HashMap::new();
283 adjusted_orders.insert(first_venue_order_id, synthetic_order);
284
285 let mut adjusted_fills = HashMap::new();
286 adjusted_fills.insert(first_venue_order_id, vec![synthetic_fill_report]);
287
288 (adjusted_orders, adjusted_fills)
289 }
290 FillAdjustmentResult::FilterToCurrentLifecycle {
291 last_zero_crossing_ts,
292 current_lifecycle_fills: _,
293 } => {
294 let mut result_fills = HashMap::new();
296 let mut result_orders = HashMap::new();
297
298 let orders_with_fills: std::collections::HashSet<VenueOrderId> =
300 fill_map.keys().copied().collect();
301
302 for (venue_order_id, fills) in fill_map {
304 let filtered: Vec<FillReport> = fills
305 .into_iter()
306 .filter(|f| f.ts_event.as_u64() > last_zero_crossing_ts)
307 .collect();
308 if !filtered.is_empty() {
309 result_fills.insert(venue_order_id, filtered);
310 if let Some(order) = order_map.get(&venue_order_id) {
312 result_orders.insert(venue_order_id, order.clone());
313 }
314 }
315 }
316
317 for (venue_order_id, order) in &order_map {
322 let is_closed = matches!(
323 order.order_status,
324 OrderStatus::Denied
325 | OrderStatus::Rejected
326 | OrderStatus::Canceled
327 | OrderStatus::Expired
328 | OrderStatus::Filled
329 );
330 if !orders_with_fills.contains(venue_order_id) && !is_closed {
331 result_orders.insert(*venue_order_id, order.clone());
332 }
333 }
334
335 (result_orders, result_fills)
336 }
337 };
338
339 py_tuple_from_reports(py, &adjusted_orders, &adjusted_fills)
340}
341
342fn create_synthetic_venue_order_id(ts_event: u64) -> VenueOrderId {
344 let uuid = UUID4::new();
345 let uuid_str = uuid.to_string();
347 let uuid_suffix = &uuid_str[..8];
348 let venue_order_id_value = format!("S-{:x}-{}", ts_event, uuid_suffix);
349 VenueOrderId::new(&venue_order_id_value)
350}
351
352fn create_synthetic_order_report(
354 fill: &FillSnapshot,
355 account_id: AccountId,
356 instrument_id: InstrumentId,
357 instrument: &InstrumentAny,
358 venue_order_id: VenueOrderId,
359) -> PyResult<OrderStatusReport> {
360 let qty_f64 = fill
361 .qty
362 .to_f64()
363 .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
364 let order_qty = Quantity::new(qty_f64, instrument.size_precision());
365
366 Ok(OrderStatusReport::new(
367 account_id,
368 instrument_id,
369 None, venue_order_id,
371 fill.side,
372 OrderType::Market,
373 TimeInForce::Gtc,
374 OrderStatus::Filled,
375 order_qty,
376 order_qty, UnixNanos::from(fill.ts_event),
378 UnixNanos::from(fill.ts_event),
379 UnixNanos::from(fill.ts_event),
380 None, ))
382}
383
384fn create_synthetic_fill_report(
386 fill: &FillSnapshot,
387 account_id: AccountId,
388 instrument_id: InstrumentId,
389 instrument: &InstrumentAny,
390 venue_order_id: VenueOrderId,
391) -> PyResult<FillReport> {
392 let uuid = UUID4::new();
393 let uuid_str = uuid.to_string();
395 let uuid_suffix = &uuid_str[..8];
396 let trade_id_value = format!("S-{:x}-{}", fill.ts_event, uuid_suffix);
397 let trade_id = TradeId::new(&trade_id_value);
398
399 let qty_f64 = fill
400 .qty
401 .to_f64()
402 .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
403 let px_f64 = fill
404 .px
405 .to_f64()
406 .ok_or_else(|| to_pyvalue_err("Failed to convert price to f64"))?;
407
408 Ok(FillReport::new(
409 account_id,
410 instrument_id,
411 venue_order_id,
412 trade_id,
413 fill.side,
414 Quantity::new(qty_f64, instrument.size_precision()),
415 Price::new(px_f64, instrument.price_precision()),
416 Money::new(0.0, instrument.quote_currency()),
417 LiquiditySide::NoLiquiditySide,
418 None, None, fill.ts_event.into(),
421 fill.ts_event.into(),
422 None, ))
424}
425
426fn py_tuple_from_reports(
428 py: Python<'_>,
429 order_map: &HashMap<VenueOrderId, OrderStatusReport>,
430 fill_map: &HashMap<VenueOrderId, Vec<FillReport>>,
431) -> PyResult<Py<PyTuple>> {
432 let orders_dict = PyDict::new(py);
434
435 for (venue_order_id, order) in order_map {
436 orders_dict.set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
437 }
438
439 let fills_dict = PyDict::new(py);
440
441 for (venue_order_id, fills) in fill_map {
442 let fills_list: Result<Vec<_>, _> =
443 fills.iter().map(|f| f.clone().into_py_any(py)).collect();
444 fills_dict.set_item(venue_order_id.to_string(), fills_list?)?;
445 }
446
447 Ok(PyTuple::new(
448 py,
449 [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
450 )?
451 .into())
452}
453
454#[pyfunction(name = "calculate_reconciliation_price")]
471#[pyo3(signature = (current_position_qty, current_position_avg_px, target_position_qty, target_position_avg_px))]
472pub fn py_calculate_reconciliation_price(
473 current_position_qty: Decimal,
474 current_position_avg_px: Option<Decimal>,
475 target_position_qty: Decimal,
476 target_position_avg_px: Option<Decimal>,
477) -> Option<Decimal> {
478 calculate_reconciliation_price(
479 current_position_qty,
480 current_position_avg_px,
481 target_position_qty,
482 target_position_avg_px,
483 )
484}