nautilus_live/python/
reconciliation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for reconciliation functions.
17
18use std::collections::HashMap;
19
20use nautilus_core::{UUID4, UnixNanos, python::to_pyvalue_err};
21use nautilus_model::{
22    enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
23    identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
24    instruments::{Instrument, InstrumentAny},
25    python::instruments::pyobject_to_instrument_any,
26    reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
27    types::{Money, Price, Quantity},
28};
29use pyo3::{
30    IntoPyObjectExt,
31    prelude::*,
32    types::{PyDict, PyList, PyTuple},
33};
34use rust_decimal::{Decimal, prelude::ToPrimitive};
35
36use crate::execution::reconciliation::{
37    FillAdjustmentResult, FillSnapshot, VenuePositionSnapshot, adjust_fills_for_partial_window,
38    calculate_reconciliation_price,
39};
40
/// Tolerance handed to `adjust_fills_for_partial_window` when the Python caller
/// does not supply a `tolerance` argument.
///
/// Constructed from mantissa `1` with a scale of `4`.
const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4); // 0.0001
42
43/// Python wrapper for adjust_fills_for_partial_window.
44///
45/// Takes ExecutionMassStatus and Instrument, performs all adjustment logic in Rust,
46/// and returns tuple of (order_reports, fill_reports) ready for reconciliation.
47///
48/// # Returns
49///
50/// Tuple of (Dict[VenueOrderId, OrderStatusReport], Dict[VenueOrderId, List[FillReport]])
51///
52/// # Errors
53///
54/// This function returns an error if:
55/// - The instrument conversion fails.
56/// - Any decimal or tolerance parsing fails.
57#[pyfunction(name = "adjust_fills_for_partial_window")]
58#[pyo3(signature = (mass_status, instrument, tolerance=None))]
59pub fn py_adjust_fills_for_partial_window(
60    py: Python<'_>,
61    mass_status: &Bound<'_, PyAny>,
62    instrument: Py<PyAny>,
63    tolerance: Option<String>,
64) -> PyResult<Py<PyTuple>> {
65    let instrument_any = pyobject_to_instrument_any(py, instrument)?;
66    let instrument_id = instrument_any.id();
67    let mass_status_obj: ExecutionMassStatus = mass_status.extract()?;
68    let account_id = mass_status_obj.account_id;
69
70    let all_position_reports = mass_status_obj.position_reports();
71
72    let position_reports = match all_position_reports.get(&instrument_id) {
73        Some(reports) if !reports.is_empty() => reports,
74        _ => {
75            // No position report - return all orders and fills for this instrument unchanged
76            let all_orders = mass_status_obj.order_reports();
77            let all_fills = mass_status_obj.fill_reports();
78
79            let orders_dict = PyDict::new(py);
80            let fills_dict = PyDict::new(py);
81
82            // Add all orders for this instrument
83            for (venue_order_id, order) in all_orders.iter() {
84                if order.instrument_id == instrument_id {
85                    orders_dict
86                        .set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
87                }
88            }
89
90            // Add all fills for this instrument
91            for (venue_order_id, fills) in all_fills.iter() {
92                if let Some(first_fill) = fills.first()
93                    && first_fill.instrument_id == instrument_id
94                {
95                    let fills_list = PyList::empty(py);
96                    for fill in fills {
97                        let py_fill = Py::new(py, fill.clone())?;
98                        fills_list.append(py_fill)?;
99                    }
100                    fills_dict.set_item(venue_order_id.to_string(), fills_list)?;
101                }
102            }
103
104            return Ok(PyTuple::new(
105                py,
106                [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
107            )?
108            .into());
109        }
110    };
111
112    let position_report = position_reports
113        .first()
114        .ok_or_else(|| to_pyvalue_err("Position reports unexpectedly empty"))?;
115
116    let venue_side = match position_report.position_side {
117        PositionSideSpecified::Long => OrderSide::Buy,
118        PositionSideSpecified::Short => OrderSide::Sell,
119        PositionSideSpecified::Flat => OrderSide::Buy, // Default to Buy for flat
120    };
121
122    let venue_position = VenuePositionSnapshot {
123        side: venue_side,
124        qty: position_report.quantity.into(),
125        avg_px: position_report.avg_px_open.unwrap_or(Decimal::ZERO),
126    };
127
128    // Extract fills for this instrument and convert to FillSnapshot
129    let mut fill_snapshots = Vec::new();
130    let mut fill_map: HashMap<VenueOrderId, Vec<FillReport>> = HashMap::new();
131    let mut order_map: HashMap<VenueOrderId, OrderStatusReport> = HashMap::new();
132
133    // Seed order_map with ALL orders for this instrument (including those without fills)
134    for (venue_order_id, order) in mass_status_obj.order_reports() {
135        if order.instrument_id == instrument_id {
136            order_map.insert(venue_order_id, order.clone());
137        }
138    }
139
140    for (venue_order_id, fill_reports) in mass_status_obj.fill_reports() {
141        for fill in fill_reports {
142            if fill.instrument_id == instrument_id {
143                // Prefer order report side, fallback to fill's order_side
144                let side = mass_status_obj
145                    .order_reports()
146                    .get(&venue_order_id)
147                    .map_or(fill.order_side, |order| order.order_side);
148
149                fill_snapshots.push(FillSnapshot::new(
150                    fill.ts_event.as_u64(),
151                    side,
152                    fill.last_qty.into(),
153                    fill.last_px.into(),
154                    venue_order_id,
155                ));
156
157                // Store original fills
158                fill_map
159                    .entry(venue_order_id)
160                    .or_default()
161                    .push(fill.clone());
162            }
163        }
164    }
165
166    // Sort fills by timestamp to ensure chronological order
167    fill_snapshots.sort_by_key(|f| f.ts_event);
168
169    if fill_snapshots.is_empty() {
170        // Return original orders and fills if no fills found
171        return py_tuple_from_reports(py, &order_map, &fill_map);
172    }
173
174    // Validate chronological order and check for duplicate timestamps
175    for window in fill_snapshots.windows(2) {
176        if window[0].ts_event == window[1].ts_event {
177            log::debug!(
178                "Duplicate timestamp detected in fills: {} for orders {} and {}",
179                window[0].ts_event,
180                window[0].venue_order_id,
181                window[1].venue_order_id
182            );
183        }
184    }
185
186    // Parse tolerance
187    let tol = if let Some(tol_str) = tolerance {
188        Decimal::from_str_exact(&tol_str).map_err(to_pyvalue_err)?
189    } else {
190        DEFAULT_TOLERANCE
191    };
192
193    let result =
194        adjust_fills_for_partial_window(&fill_snapshots, &venue_position, &instrument_any, tol);
195
196    // Handle the result and create adjusted order and fill reports
197    let (adjusted_orders, adjusted_fills) = match result {
198        FillAdjustmentResult::NoAdjustment => {
199            // Return original orders and fills
200            (order_map, fill_map)
201        }
202        FillAdjustmentResult::AddSyntheticOpening {
203            synthetic_fill,
204            existing_fills: _,
205        } => {
206            // Create synthetic venue_order_id
207            let synthetic_venue_order_id = create_synthetic_venue_order_id(synthetic_fill.ts_event);
208
209            // Create synthetic order and fill
210            let synthetic_order = create_synthetic_order_report(
211                &synthetic_fill,
212                account_id,
213                instrument_id,
214                &instrument_any,
215                synthetic_venue_order_id,
216            )?;
217            let synthetic_fill_report = create_synthetic_fill_report(
218                &synthetic_fill,
219                account_id,
220                instrument_id,
221                &instrument_any,
222                synthetic_venue_order_id,
223            )?;
224
225            let mut adjusted_fills = fill_map;
226            adjusted_fills
227                .entry(synthetic_venue_order_id)
228                .or_default()
229                .insert(0, synthetic_fill_report);
230
231            let mut adjusted_orders = order_map;
232            adjusted_orders.insert(synthetic_venue_order_id, synthetic_order);
233
234            (adjusted_orders, adjusted_fills)
235        }
236        FillAdjustmentResult::ReplaceCurrentLifecycle {
237            synthetic_fill,
238            first_venue_order_id,
239        } => {
240            // Reuse the real venue_order_id from the first fill to maintain identity
241            // This ensures downstream reconciliation can match the synthetic report to the live order
242
243            // Clone and update the original order report if it exists, otherwise create new
244            let synthetic_order = if let Some(original_order) = order_map.get(&first_venue_order_id)
245            {
246                // Clone the original order to preserve client_order_id and other metadata
247                let mut updated_order = original_order.clone();
248
249                // Update filled quantity and status to reflect the synthetic fill
250                let qty_f64 = synthetic_fill
251                    .qty
252                    .to_f64()
253                    .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
254                let order_qty = Quantity::new(qty_f64, instrument_any.size_precision());
255
256                updated_order.quantity = order_qty;
257                updated_order.filled_qty = order_qty;
258                updated_order.order_status = OrderStatus::Filled;
259                updated_order.ts_last = UnixNanos::from(synthetic_fill.ts_event);
260
261                updated_order
262            } else {
263                // Fallback: create new synthetic order if original not found
264                create_synthetic_order_report(
265                    &synthetic_fill,
266                    account_id,
267                    instrument_id,
268                    &instrument_any,
269                    first_venue_order_id,
270                )?
271            };
272
273            let synthetic_fill_report = create_synthetic_fill_report(
274                &synthetic_fill,
275                account_id,
276                instrument_id,
277                &instrument_any,
278                first_venue_order_id,
279            )?;
280
281            // Return ONLY the synthetic order and fill using the real venue_order_id
282            let mut adjusted_orders = HashMap::new();
283            adjusted_orders.insert(first_venue_order_id, synthetic_order);
284
285            let mut adjusted_fills = HashMap::new();
286            adjusted_fills.insert(first_venue_order_id, vec![synthetic_fill_report]);
287
288            (adjusted_orders, adjusted_fills)
289        }
290        FillAdjustmentResult::FilterToCurrentLifecycle {
291            last_zero_crossing_ts,
292            current_lifecycle_fills: _,
293        } => {
294            // Filter fills to only those AFTER last zero-crossing
295            let mut result_fills = HashMap::new();
296            let mut result_orders = HashMap::new();
297
298            // Track which orders had fills in the original fill_map
299            let orders_with_fills: std::collections::HashSet<VenueOrderId> =
300                fill_map.keys().copied().collect();
301
302            // First, process orders that have fills
303            for (venue_order_id, fills) in fill_map {
304                let filtered: Vec<FillReport> = fills
305                    .into_iter()
306                    .filter(|f| f.ts_event.as_u64() > last_zero_crossing_ts)
307                    .collect();
308                if !filtered.is_empty() {
309                    result_fills.insert(venue_order_id, filtered);
310                    // Keep order report if fills were kept
311                    if let Some(order) = order_map.get(&venue_order_id) {
312                        result_orders.insert(venue_order_id, order.clone());
313                    }
314                }
315            }
316
317            // Also keep orders that have NO fills at all in the original fill_map AND are still working
318            // These are live orders that were never filled, submitted either before or after the zero-crossing
319            // Do NOT re-add orders that had fills but were filtered out (those are from previous lifecycles)
320            // Do NOT re-add terminal orders (FILLED, CANCELED, etc.) that never had fills reported
321            for (venue_order_id, order) in &order_map {
322                let is_closed = matches!(
323                    order.order_status,
324                    OrderStatus::Denied
325                        | OrderStatus::Rejected
326                        | OrderStatus::Canceled
327                        | OrderStatus::Expired
328                        | OrderStatus::Filled
329                );
330                if !orders_with_fills.contains(venue_order_id) && !is_closed {
331                    result_orders.insert(*venue_order_id, order.clone());
332                }
333            }
334
335            (result_orders, result_fills)
336        }
337    };
338
339    py_tuple_from_reports(py, &adjusted_orders, &adjusted_fills)
340}
341
342/// Create a synthetic VenueOrderId using timestamp and UUID suffix.
343fn create_synthetic_venue_order_id(ts_event: u64) -> VenueOrderId {
344    let uuid = UUID4::new();
345    // Use hex timestamp and first 8 chars of UUID for uniqueness while keeping it short
346    let uuid_str = uuid.to_string();
347    let uuid_suffix = &uuid_str[..8];
348    let venue_order_id_value = format!("S-{:x}-{}", ts_event, uuid_suffix);
349    VenueOrderId::new(&venue_order_id_value)
350}
351
352/// Create a synthetic OrderStatusReport from a FillSnapshot.
353fn create_synthetic_order_report(
354    fill: &FillSnapshot,
355    account_id: AccountId,
356    instrument_id: InstrumentId,
357    instrument: &InstrumentAny,
358    venue_order_id: VenueOrderId,
359) -> PyResult<OrderStatusReport> {
360    let qty_f64 = fill
361        .qty
362        .to_f64()
363        .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
364    let order_qty = Quantity::new(qty_f64, instrument.size_precision());
365
366    Ok(OrderStatusReport::new(
367        account_id,
368        instrument_id,
369        None, // client_order_id
370        venue_order_id,
371        fill.side,
372        OrderType::Market,
373        TimeInForce::Gtc,
374        OrderStatus::Filled,
375        order_qty,
376        order_qty, // filled_qty = order_qty (fully filled)
377        UnixNanos::from(fill.ts_event),
378        UnixNanos::from(fill.ts_event),
379        UnixNanos::from(fill.ts_event),
380        None, // report_id
381    ))
382}
383
384/// Create a synthetic FillReport from a FillSnapshot.
385fn create_synthetic_fill_report(
386    fill: &FillSnapshot,
387    account_id: AccountId,
388    instrument_id: InstrumentId,
389    instrument: &InstrumentAny,
390    venue_order_id: VenueOrderId,
391) -> PyResult<FillReport> {
392    let uuid = UUID4::new();
393    // Use hex timestamp and first 8 chars of UUID for uniqueness while keeping it short
394    let uuid_str = uuid.to_string();
395    let uuid_suffix = &uuid_str[..8];
396    let trade_id_value = format!("S-{:x}-{}", fill.ts_event, uuid_suffix);
397    let trade_id = TradeId::new(&trade_id_value);
398
399    let qty_f64 = fill
400        .qty
401        .to_f64()
402        .ok_or_else(|| to_pyvalue_err("Failed to convert quantity to f64"))?;
403    let px_f64 = fill
404        .px
405        .to_f64()
406        .ok_or_else(|| to_pyvalue_err("Failed to convert price to f64"))?;
407
408    Ok(FillReport::new(
409        account_id,
410        instrument_id,
411        venue_order_id,
412        trade_id,
413        fill.side,
414        Quantity::new(qty_f64, instrument.size_precision()),
415        Price::new(px_f64, instrument.price_precision()),
416        Money::new(0.0, instrument.quote_currency()),
417        LiquiditySide::NoLiquiditySide,
418        None, // client_order_id
419        None, // venue_position_id
420        fill.ts_event.into(),
421        fill.ts_event.into(),
422        None, // report_id
423    ))
424}
425
426/// Convert HashMaps of orders and fills to Python tuple of dicts.
427fn py_tuple_from_reports(
428    py: Python<'_>,
429    order_map: &HashMap<VenueOrderId, OrderStatusReport>,
430    fill_map: &HashMap<VenueOrderId, Vec<FillReport>>,
431) -> PyResult<Py<PyTuple>> {
432    // Create order reports dict
433    let orders_dict = PyDict::new(py);
434
435    for (venue_order_id, order) in order_map {
436        orders_dict.set_item(venue_order_id.to_string(), order.clone().into_py_any(py)?)?;
437    }
438
439    let fills_dict = PyDict::new(py);
440
441    for (venue_order_id, fills) in fill_map {
442        let fills_list: Result<Vec<_>, _> =
443            fills.iter().map(|f| f.clone().into_py_any(py)).collect();
444        fills_dict.set_item(venue_order_id.to_string(), fills_list?)?;
445    }
446
447    Ok(PyTuple::new(
448        py,
449        [orders_dict.into_py_any(py)?, fills_dict.into_py_any(py)?],
450    )?
451    .into())
452}
453
/// Calculate the price needed for a reconciliation order to achieve target position.
///
/// Exposed to Python as `calculate_reconciliation_price`. This binding performs no
/// logic of its own: all four arguments are forwarded unchanged to the Rust
/// `calculate_reconciliation_price` function and its result is returned as-is.
///
/// This is a pure function that calculates what price a fill would need to have
/// to move from the current position state to the target position state with the
/// correct average price, accounting for the netting simulation logic.
///
/// # Parameters
///
/// - `current_position_qty`: Quantity of the current position.
/// - `current_position_avg_px`: Average price of the current position, if any.
/// - `target_position_qty`: Quantity of the target position.
/// - `target_position_avg_px`: Average price of the target position, if any.
///
/// NOTE(review): the sign convention for the quantities (e.g. negative for short) is
/// defined by the underlying Rust function and is not visible here; confirm there.
///
/// # Returns
///
/// Returns `Some(Decimal)` if a valid reconciliation price can be calculated, `None` otherwise.
///
/// # Notes
///
/// The function handles three scenarios:
/// 1. Flat to position: reconciliation_px = target_avg_px
/// 2. Position flip (sign change): reconciliation_px = target_avg_px (due to value reset in simulation)
/// 3. Accumulation/reduction: weighted average formula
#[pyfunction(name = "calculate_reconciliation_price")]
#[pyo3(signature = (current_position_qty, current_position_avg_px, target_position_qty, target_position_avg_px))]
pub fn py_calculate_reconciliation_price(
    current_position_qty: Decimal,
    current_position_avg_px: Option<Decimal>,
    target_position_qty: Decimal,
    target_position_avg_px: Option<Decimal>,
) -> Option<Decimal> {
    calculate_reconciliation_price(
        current_position_qty,
        current_position_avg_px,
        target_position_qty,
        target_position_avg_px,
    )
}