nautilus_live/python/
reconciliation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for reconciliation functions.
17
18use ahash::{AHashMap, AHashSet};
19use nautilus_core::{UUID4, UnixNanos, python::to_pyvalue_err};
20use nautilus_model::{
21    enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
22    identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
23    instruments::{Instrument, InstrumentAny},
24    python::instruments::pyobject_to_instrument_any,
25    reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
26    types::{Money, Price, Quantity},
27};
28use pyo3::{
29    IntoPyObjectExt,
30    prelude::*,
31    types::{PyDict, PyList, PyTuple},
32};
33use rust_decimal::{Decimal, prelude::ToPrimitive};
34
35use crate::execution::reconciliation::{
36    FillAdjustmentResult, FillSnapshot, VenuePositionSnapshot, adjust_fills_for_partial_window,
37    calculate_reconciliation_price,
38};
39
40const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4); // 0.0001
41
42/// Python wrapper for adjust_fills_for_partial_window.
43///
44/// Takes ExecutionMassStatus and Instrument, performs all adjustment logic in Rust,
45/// and returns tuple of (order_reports, fill_reports) ready for reconciliation.
46///
47/// # Returns
48///
49/// Tuple of (Dict[VenueOrderId, OrderStatusReport], Dict[VenueOrderId, List[FillReport]])
50///
51/// # Errors
52///
53/// This function returns an error if:
54/// - The instrument conversion fails.
55/// - Any decimal or tolerance parsing fails.
56#[pyfunction(name = "adjust_fills_for_partial_window")]
57#[pyo3(signature = (mass_status, instrument, tolerance=None))]
58pub fn py_adjust_fills_for_partial_window(
59    py: Python<'_>,
60    mass_status: &Bound<'_, PyAny>,
61    instrument: Py<PyAny>,
62    tolerance: Option<String>,
63) -> PyResult<Py<PyTuple>> {
64    let instrument_any = pyobject_to_instrument_any(py, instrument)?;
65    let instrument_id = instrument_any.id();
66    let mass_status_obj: ExecutionMassStatus = mass_status.extract()?;
67    let account_id = mass_status_obj.account_id;
68
69    let all_position_reports = mass_status_obj.position_reports();
70
71    let position_reports = match all_position_reports.get(&instrument_id) {
72        Some(reports) if !reports.is_empty() => reports,
73        _ => {
74            // No position report - return all orders and fills for this instrument unchanged
75            let all_orders = mass_status_obj.order_reports();
76            let all_fills = mass_status_obj.fill_reports();
77
78            let orders_dict = PyDict::new(py);
79            let fills_dict = PyDict::new(py);
80
81            // Add all orders for this instrument
82            for (venue_order_id, order) in &all_orders {
83                if order.instrument_id == instrument_id {
84                    orders_dict
85                        .set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
86                }
87            }
88
89            // Add all fills for this instrument
90            for (venue_order_id, fills) in &all_fills {
91                if let Some(first_fill) = fills.first()
92                    && first_fill.instrument_id == instrument_id
93                {
94                    let fills_list = PyList::empty(py);
95                    for fill in fills {
96                        let py_fill = Py::new(py, fill.clone())?;
97                        fills_list.append(py_fill)?;
98                    }
99                    fills_dict.set_item(venue_order_id.to_string(), fills_list)?;
100                }
101            }
102
103            return Ok(PyTuple::new(
104                py,
105                [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
106            )?
107            .into());
108        }
109    };
110
111    let position_report = position_reports
112        .first()
113        .ok_or_else(|| to_pyvalue_err("Position reports unexpectedly empty"))?;
114
115    let venue_side = match position_report.position_side {
116        PositionSideSpecified::Long => OrderSide::Buy,
117        PositionSideSpecified::Short => OrderSide::Sell,
118        PositionSideSpecified::Flat => OrderSide::Buy, // Default to Buy for flat
119    };
120
121    let venue_position = VenuePositionSnapshot {
122        side: venue_side,
123        qty: position_report.quantity.into(),
124        avg_px: position_report.avg_px_open.unwrap_or(Decimal::ZERO),
125    };
126
127    // Extract fills for this instrument and convert to FillSnapshot
128    let mut fill_snapshots = Vec::new();
129    let mut fill_map: AHashMap<VenueOrderId, Vec<FillReport>> = AHashMap::new();
130    let mut order_map: AHashMap<VenueOrderId, OrderStatusReport> = AHashMap::new();
131
132    // Seed order_map with ALL orders for this instrument (including those without fills)
133    for (venue_order_id, order) in mass_status_obj.order_reports() {
134        if order.instrument_id == instrument_id {
135            order_map.insert(venue_order_id, order.clone());
136        }
137    }
138
139    for (venue_order_id, fill_reports) in mass_status_obj.fill_reports() {
140        for fill in fill_reports {
141            if fill.instrument_id == instrument_id {
142                // Prefer order report side, fallback to fill's order_side
143                let side = mass_status_obj
144                    .order_reports()
145                    .get(&venue_order_id)
146                    .map_or(fill.order_side, |order| order.order_side);
147
148                fill_snapshots.push(FillSnapshot::new(
149                    fill.ts_event.as_u64(),
150                    side,
151                    fill.last_qty.into(),
152                    fill.last_px.into(),
153                    venue_order_id,
154                ));
155
156                // Store original fills
157                fill_map
158                    .entry(venue_order_id)
159                    .or_default()
160                    .push(fill.clone());
161            }
162        }
163    }
164
165    // Sort fills by timestamp to ensure chronological order
166    fill_snapshots.sort_by_key(|f| f.ts_event);
167
168    if fill_snapshots.is_empty() {
169        // Return original orders and fills if no fills found
170        return py_tuple_from_reports(py, &order_map, &fill_map);
171    }
172
173    // Validate chronological order and check for duplicate timestamps
174    for window in fill_snapshots.windows(2) {
175        if window[0].ts_event == window[1].ts_event {
176            log::debug!(
177                "Duplicate timestamp detected in fills: {} for orders {} and {}",
178                window[0].ts_event,
179                window[0].venue_order_id,
180                window[1].venue_order_id
181            );
182        }
183    }
184
185    // Parse tolerance
186    let tol = if let Some(tol_str) = tolerance {
187        Decimal::from_str_exact(&tol_str).map_err(to_pyvalue_err)?
188    } else {
189        DEFAULT_TOLERANCE
190    };
191
192    let result =
193        adjust_fills_for_partial_window(&fill_snapshots, &venue_position, &instrument_any, tol);
194
195    // Handle the result and create adjusted order and fill reports
196    let (adjusted_orders, adjusted_fills) = match result {
197        FillAdjustmentResult::NoAdjustment => {
198            // Return original orders and fills
199            (order_map, fill_map)
200        }
201        FillAdjustmentResult::AddSyntheticOpening {
202            synthetic_fill,
203            existing_fills: _,
204        } => {
205            // Create synthetic venue_order_id
206            let synthetic_venue_order_id = create_synthetic_venue_order_id(synthetic_fill.ts_event);
207
208            // Create synthetic order and fill
209            let synthetic_order = create_synthetic_order_report(
210                &synthetic_fill,
211                account_id,
212                instrument_id,
213                &instrument_any,
214                synthetic_venue_order_id,
215            )?;
216            let synthetic_fill_report = create_synthetic_fill_report(
217                &synthetic_fill,
218                account_id,
219                instrument_id,
220                &instrument_any,
221                synthetic_venue_order_id,
222            )?;
223
224            let mut adjusted_fills = fill_map;
225            adjusted_fills
226                .entry(synthetic_venue_order_id)
227                .or_default()
228                .insert(0, synthetic_fill_report);
229
230            let mut adjusted_orders = order_map;
231            adjusted_orders.insert(synthetic_venue_order_id, synthetic_order);
232
233            (adjusted_orders, adjusted_fills)
234        }
235        FillAdjustmentResult::ReplaceCurrentLifecycle {
236            synthetic_fill,
237            first_venue_order_id,
238        } => {
239            // Reuse the real venue_order_id from the first fill to maintain identity
240            // This ensures downstream reconciliation can match the synthetic report to the live order
241
242            // Clone and update the original order report if it exists, otherwise create new
243            let synthetic_order = if let Some(original_order) = order_map.get(&first_venue_order_id)
244            {
245                // Clone the original order to preserve client_order_id and other metadata
246                let mut updated_order = original_order.clone();
247
248                // Update filled quantity and status to reflect the synthetic fill
249                let qty_f64 = synthetic_fill
250                    .qty
251                    .to_f64()
252                    .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
253                let order_qty = Quantity::new(qty_f64, instrument_any.size_precision());
254
255                updated_order.quantity = order_qty;
256                updated_order.filled_qty = order_qty;
257                updated_order.order_status = OrderStatus::Filled;
258                updated_order.ts_last = UnixNanos::from(synthetic_fill.ts_event);
259
260                updated_order
261            } else {
262                // Fallback: create new synthetic order if original not found
263                create_synthetic_order_report(
264                    &synthetic_fill,
265                    account_id,
266                    instrument_id,
267                    &instrument_any,
268                    first_venue_order_id,
269                )?
270            };
271
272            let synthetic_fill_report = create_synthetic_fill_report(
273                &synthetic_fill,
274                account_id,
275                instrument_id,
276                &instrument_any,
277                first_venue_order_id,
278            )?;
279
280            // Return ONLY the synthetic order and fill using the real venue_order_id
281            let mut adjusted_orders = AHashMap::new();
282            adjusted_orders.insert(first_venue_order_id, synthetic_order);
283
284            let mut adjusted_fills = AHashMap::new();
285            adjusted_fills.insert(first_venue_order_id, vec![synthetic_fill_report]);
286
287            (adjusted_orders, adjusted_fills)
288        }
289        FillAdjustmentResult::FilterToCurrentLifecycle {
290            last_zero_crossing_ts,
291            current_lifecycle_fills: _,
292        } => {
293            // Filter fills to only those AFTER last zero-crossing
294            let mut result_fills = AHashMap::new();
295            let mut result_orders = AHashMap::new();
296
297            // Track which orders had fills in the original fill_map
298            let orders_with_fills: AHashSet<VenueOrderId> = fill_map.keys().copied().collect();
299
300            // First, process orders that have fills
301            for (venue_order_id, fills) in fill_map {
302                let filtered: Vec<FillReport> = fills
303                    .into_iter()
304                    .filter(|f| f.ts_event.as_u64() > last_zero_crossing_ts)
305                    .collect();
306                if !filtered.is_empty() {
307                    result_fills.insert(venue_order_id, filtered);
308                    // Keep order report if fills were kept
309                    if let Some(order) = order_map.get(&venue_order_id) {
310                        result_orders.insert(venue_order_id, order.clone());
311                    }
312                }
313            }
314
315            // Also keep orders that have NO fills at all in the original fill_map AND are still working
316            // These are live orders that were never filled, submitted either before or after the zero-crossing
317            // Do NOT re-add orders that had fills but were filtered out (those are from previous lifecycles)
318            // Do NOT re-add terminal orders (FILLED, CANCELED, etc.) that never had fills reported
319            for (venue_order_id, order) in &order_map {
320                let is_closed = matches!(
321                    order.order_status,
322                    OrderStatus::Denied
323                        | OrderStatus::Rejected
324                        | OrderStatus::Canceled
325                        | OrderStatus::Expired
326                        | OrderStatus::Filled
327                );
328                if !orders_with_fills.contains(venue_order_id) && !is_closed {
329                    result_orders.insert(*venue_order_id, order.clone());
330                }
331            }
332
333            (result_orders, result_fills)
334        }
335    };
336
337    py_tuple_from_reports(py, &adjusted_orders, &adjusted_fills)
338}
339
340/// Create a synthetic VenueOrderId using timestamp and UUID suffix.
341fn create_synthetic_venue_order_id(ts_event: u64) -> VenueOrderId {
342    let uuid = UUID4::new();
343    // Use hex timestamp and first 8 chars of UUID for uniqueness while keeping it short
344    let uuid_str = uuid.to_string();
345    let uuid_suffix = &uuid_str[..8];
346    let venue_order_id_value = format!("S-{ts_event:x}-{uuid_suffix}");
347    VenueOrderId::new(&venue_order_id_value)
348}
349
350/// Create a synthetic OrderStatusReport from a FillSnapshot.
351fn create_synthetic_order_report(
352    fill: &FillSnapshot,
353    account_id: AccountId,
354    instrument_id: InstrumentId,
355    instrument: &InstrumentAny,
356    venue_order_id: VenueOrderId,
357) -> PyResult<OrderStatusReport> {
358    let qty_f64 = fill
359        .qty
360        .to_f64()
361        .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
362    let order_qty = Quantity::new(qty_f64, instrument.size_precision());
363
364    Ok(OrderStatusReport::new(
365        account_id,
366        instrument_id,
367        None, // client_order_id
368        venue_order_id,
369        fill.side,
370        OrderType::Market,
371        TimeInForce::Gtc,
372        OrderStatus::Filled,
373        order_qty,
374        order_qty, // filled_qty = order_qty (fully filled)
375        UnixNanos::from(fill.ts_event),
376        UnixNanos::from(fill.ts_event),
377        UnixNanos::from(fill.ts_event),
378        None, // report_id
379    ))
380}
381
382/// Create a synthetic FillReport from a FillSnapshot.
383fn create_synthetic_fill_report(
384    fill: &FillSnapshot,
385    account_id: AccountId,
386    instrument_id: InstrumentId,
387    instrument: &InstrumentAny,
388    venue_order_id: VenueOrderId,
389) -> PyResult<FillReport> {
390    let uuid = UUID4::new();
391    // Use hex timestamp and first 8 chars of UUID for uniqueness while keeping it short
392    let uuid_str = uuid.to_string();
393    let uuid_suffix = &uuid_str[..8];
394    let trade_id_value = format!("S-{:x}-{}", fill.ts_event, uuid_suffix);
395    let trade_id = TradeId::new(&trade_id_value);
396
397    let qty_f64 = fill
398        .qty
399        .to_f64()
400        .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
401    let px_f64 = fill
402        .px
403        .to_f64()
404        .ok_or_else(|| to_pyvalue_err("Failed to convert price to f64"))?;
405
406    Ok(FillReport::new(
407        account_id,
408        instrument_id,
409        venue_order_id,
410        trade_id,
411        fill.side,
412        Quantity::new(qty_f64, instrument.size_precision()),
413        Price::new(px_f64, instrument.price_precision()),
414        Money::new(0.0, instrument.quote_currency()),
415        LiquiditySide::NoLiquiditySide,
416        None, // client_order_id
417        None, // venue_position_id
418        fill.ts_event.into(),
419        fill.ts_event.into(),
420        None, // report_id
421    ))
422}
423
424/// Convert AHashMaps of orders and fills to Python tuple of dicts.
425fn py_tuple_from_reports(
426    py: Python<'_>,
427    order_map: &AHashMap<VenueOrderId, OrderStatusReport>,
428    fill_map: &AHashMap<VenueOrderId, Vec<FillReport>>,
429) -> PyResult<Py<PyTuple>> {
430    // Create order reports dict
431    let orders_dict = PyDict::new(py);
432
433    for (venue_order_id, order) in order_map {
434        orders_dict.set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
435    }
436
437    let fills_dict = PyDict::new(py);
438
439    for (venue_order_id, fills) in fill_map {
440        let fills_list: Result<Vec<_>, _> =
441            fills.iter().map(|f| f.clone().into_py_any(py)).collect();
442        fills_dict.set_item(venue_order_id.to_string(), fills_list?)?;
443    }
444
445    Ok(PyTuple::new(
446        py,
447        [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
448    )?
449    .into())
450}
451
452/// Calculate the price needed for a reconciliation order to achieve target position.
453///
454/// This is a pure function that calculates what price a fill would need to have
455/// to move from the current position state to the target position state with the
456/// correct average price, accounting for the netting simulation logic.
457///
458/// # Returns
459///
460/// Returns `Some(Decimal)` if a valid reconciliation price can be calculated, `None` otherwise.
461///
462/// # Notes
463///
464/// The function handles three scenarios:
465/// 1. Flat to position: reconciliation_px = target_avg_px
466/// 2. Position flip (sign change): reconciliation_px = target_avg_px (due to value reset in simulation)
467/// 3. Accumulation/reduction: weighted average formula
468#[pyfunction(name = "calculate_reconciliation_price")]
469#[pyo3(signature = (current_position_qty, current_position_avg_px, target_position_qty, target_position_avg_px))]
470pub fn py_calculate_reconciliation_price(
471    current_position_qty: Decimal,
472    current_position_avg_px: Option<Decimal>,
473    target_position_qty: Decimal,
474    target_position_avg_px: Option<Decimal>,
475) -> Option<Decimal> {
476    calculate_reconciliation_price(
477        current_position_qty,
478        current_position_avg_px,
479        target_position_qty,
480        target_position_avg_px,
481    )
482}