Skip to main content

nautilus_backtest/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `BacktestEngine` for backtesting on historical data.
17
18use std::{any::Any, cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    actor::DataActor,
24    cache::Cache,
25    clock::{Clock, TestClock},
26    component::Component,
27    enums::LogColor,
28    log_info,
29    logging::{
30        logging_clock_set_realtime_mode, logging_clock_set_static_mode,
31        logging_clock_set_static_time,
32    },
33    runner::{
34        SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
35        drain_data_cmd_queue, drain_trading_cmd_queue, init_data_cmd_sender, init_exec_cmd_sender,
36        trading_cmd_queue_is_empty,
37    },
38};
39use nautilus_core::{UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, formatting::Separable};
40use nautilus_data::client::DataClientAdapter;
41use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel};
42use nautilus_model::{
43    accounts::{Account, AccountAny},
44    data::{Data, HasTsInit},
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{AccountId, ClientId, InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orders::Order,
49    position::Position,
50    types::{Currency, Money},
51};
52use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
53use nautilus_trading::strategy::Strategy;
54use rust_decimal::Decimal;
55
56use crate::{
57    accumulator::TimeEventAccumulator, config::BacktestEngineConfig,
58    data_client::BacktestDataClient, data_iterator::BacktestDataIterator,
59    exchange::SimulatedExchange, execution_client::BacktestExecutionClient,
60    modules::SimulationModule,
61};
62
/// Results from a completed backtest run.
///
/// Built by `BacktestEngine::get_result` from the engine's run state, the kernel
/// cache, and a `PortfolioAnalyzer` fed with the cached positions and venue accounts.
#[derive(Debug)]
pub struct BacktestResult {
    /// Trader ID taken from the engine config, rendered as a string.
    pub trader_id: String,
    /// Machine ID reported by the kernel.
    pub machine_id: String,
    /// Instance ID of the engine (copied from the kernel at construction).
    pub instance_id: UUID4,
    /// Caller-supplied run configuration ID, if one was passed to `run`.
    pub run_config_id: Option<String>,
    /// ID generated when a run is first initialized; `None` before any run.
    pub run_id: Option<UUID4>,
    /// Wall-clock time (`SystemTime::now()`) captured when the run was initialized.
    pub run_started: Option<UnixNanos>,
    /// Wall-clock time (`SystemTime::now()`) captured when the run was ended.
    pub run_finished: Option<UnixNanos>,
    /// Simulated start timestamp of the run.
    pub backtest_start: Option<UnixNanos>,
    /// Kernel clock timestamp captured when the run was ended.
    pub backtest_end: Option<UnixNanos>,
    /// `backtest_end - backtest_start` in seconds (simulated time, not wall-clock);
    /// `0.0` when either bound is unset.
    pub elapsed_time_secs: f64,
    /// Number of data elements processed by the main run loop.
    pub iterations: usize,
    /// NOTE(review): currently populated with the same value as `iterations`.
    pub total_events: usize,
    /// Total order count reported by the cache.
    pub total_orders: usize,
    /// Number of positions returned by the cache.
    pub total_positions: usize,
    /// PnL statistics keyed by currency code, then by statistic name.
    pub stats_pnls: AHashMap<String, AHashMap<String, f64>>,
    /// Returns statistics keyed by statistic name.
    pub stats_returns: AHashMap<String, f64>,
    /// General statistics keyed by statistic name.
    pub stats_general: AHashMap<String, f64>,
}
84
/// Core backtesting engine for running event-driven strategy backtests on historical data.
///
/// The `BacktestEngine` provides a high-fidelity simulation environment that processes
/// historical market data chronologically through an event-driven architecture. It maintains
/// simulated exchanges with realistic order matching and execution, allowing strategies
/// to be tested exactly as they would run in live trading:
///
/// - Event-driven data replay with configurable latency models.
/// - Multi-venue and multi-asset support.
/// - Realistic order matching and execution simulation.
/// - Strategy and portfolio performance analysis.
/// - Seamless transition from backtesting to live trading.
pub struct BacktestEngine {
    // Copied from `kernel.instance_id` at construction.
    instance_id: UUID4,
    // Engine configuration; a clone of it is handed to the kernel in `new`.
    config: BacktestEngineConfig,
    // Owns the cache, clock, data/risk/exec engines and the trader.
    kernel: NautilusKernel,
    // Collects timer events from component clocks so they can be run in order.
    accumulator: TimeEventAccumulator,
    // Caller-supplied ID stored when a run is first initialized.
    run_config_id: Option<String>,
    // Generated when a run is first initialized; cleared by `reset`.
    run_id: Option<UUID4>,
    // One simulated exchange per venue registered through `add_venue`.
    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
    // One execution client per venue, pushed by `add_venue`.
    exec_clients: Vec<BacktestExecutionClient>,
    // Instrument IDs seen in data added with `validate == true`.
    has_data: AHashSet<InstrumentId>,
    // Subset of the above for which order book data was added.
    has_book_data: AHashSet<InstrumentId>,
    // Holds every added data stream and yields elements to the run loop.
    data_iterator: BacktestDataIterator,
    // Total number of data elements added since the last `clear_data`.
    data_len: usize,
    // Suffix used to give each added batch its own stream name.
    data_stream_counter: usize,
    // Earliest / latest `ts_init` recorded across added batches; used as the
    // default run start / end when the caller passes `None`.
    ts_first: Option<UnixNanos>,
    ts_last_data: Option<UnixNanos>,
    // Number of data elements processed by the run loop; `0` also marks a
    // run that still needs its first-time initialization.
    iteration: usize,
    // When set, the run loop logs an error and stops.
    force_stop: bool,
    // Last timestamp the run loop advanced time to.
    last_ns: UnixNanos,
    // End bound of the current run.
    end_ns: UnixNanos,
    // Wall-clock times captured at run initialization and in `end`.
    run_started: Option<UnixNanos>,
    run_finished: Option<UnixNanos>,
    // Simulated start time of the run and kernel clock time captured in `end`.
    backtest_start: Option<UnixNanos>,
    backtest_end: Option<UnixNanos>,
}
122
123impl Debug for BacktestEngine {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct(stringify!(BacktestEngine))
126            .field("instance_id", &self.instance_id)
127            .field("run_config_id", &self.run_config_id)
128            .field("run_id", &self.run_id)
129            .finish()
130    }
131}
132
133impl BacktestEngine {
134    /// Create a new [`BacktestEngine`] instance.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the core `NautilusKernel` fails to initialize.
139    pub fn new(config: BacktestEngineConfig) -> anyhow::Result<Self> {
140        let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
141        Ok(Self {
142            instance_id: kernel.instance_id,
143            config,
144            accumulator: TimeEventAccumulator::new(),
145            kernel,
146            run_config_id: None,
147            run_id: None,
148            venues: AHashMap::new(),
149            exec_clients: Vec::new(),
150            has_data: AHashSet::new(),
151            has_book_data: AHashSet::new(),
152            data_iterator: BacktestDataIterator::new(),
153            data_len: 0,
154            data_stream_counter: 0,
155            ts_first: None,
156            ts_last_data: None,
157            iteration: 0,
158            force_stop: false,
159            last_ns: UnixNanos::default(),
160            end_ns: UnixNanos::default(),
161            run_started: None,
162            run_finished: None,
163            backtest_start: None,
164            backtest_end: None,
165        })
166    }
167
168    /// # Errors
169    ///
170    /// Returns an error if initializing the simulated exchange for the venue fails.
171    #[allow(clippy::too_many_arguments)]
172    pub fn add_venue(
173        &mut self,
174        venue: Venue,
175        oms_type: OmsType,
176        account_type: AccountType,
177        book_type: BookType,
178        starting_balances: Vec<Money>,
179        base_currency: Option<Currency>,
180        default_leverage: Option<Decimal>,
181        leverages: AHashMap<InstrumentId, Decimal>,
182        modules: Vec<Box<dyn SimulationModule>>,
183        fill_model: FillModelAny,
184        fee_model: FeeModelAny,
185        latency_model: Option<Box<dyn LatencyModel>>,
186        routing: Option<bool>,
187        reject_stop_orders: Option<bool>,
188        support_gtd_orders: Option<bool>,
189        support_contingent_orders: Option<bool>,
190        use_position_ids: Option<bool>,
191        use_random_ids: Option<bool>,
192        use_reduce_only: Option<bool>,
193        use_message_queue: Option<bool>,
194        use_market_order_acks: Option<bool>,
195        bar_execution: Option<bool>,
196        bar_adaptive_high_low_ordering: Option<bool>,
197        trade_execution: Option<bool>,
198        liquidity_consumption: Option<bool>,
199        allow_cash_borrowing: Option<bool>,
200        frozen_account: Option<bool>,
201        price_protection_points: Option<u32>,
202    ) -> anyhow::Result<()> {
203        let default_leverage: Decimal = default_leverage.unwrap_or_else(|| {
204            if account_type == AccountType::Margin {
205                Decimal::from(10)
206            } else {
207                Decimal::from(0)
208            }
209        });
210
211        let exchange = SimulatedExchange::new(
212            venue,
213            oms_type,
214            account_type,
215            starting_balances,
216            base_currency,
217            default_leverage,
218            leverages,
219            modules,
220            self.kernel.cache.clone(),
221            self.kernel.clock.clone(),
222            fill_model,
223            fee_model,
224            book_type,
225            latency_model,
226            bar_execution,
227            bar_adaptive_high_low_ordering,
228            trade_execution,
229            liquidity_consumption,
230            reject_stop_orders,
231            support_gtd_orders,
232            support_contingent_orders,
233            use_position_ids,
234            use_random_ids,
235            use_reduce_only,
236            use_message_queue,
237            use_market_order_acks,
238            allow_cash_borrowing,
239            frozen_account,
240            price_protection_points,
241        )?;
242        let exchange = Rc::new(RefCell::new(exchange));
243        self.venues.insert(venue, exchange.clone());
244
245        let account_id = AccountId::from(format!("{venue}-001").as_str());
246
247        let exec_client = BacktestExecutionClient::new(
248            self.config.trader_id(),
249            account_id,
250            exchange.clone(),
251            self.kernel.cache.clone(),
252            self.kernel.clock.clone(),
253            routing,
254            frozen_account,
255        );
256
257        exchange
258            .borrow_mut()
259            .register_client(Rc::new(exec_client.clone()));
260
261        self.exec_clients.push(exec_client.clone());
262
263        self.kernel
264            .exec_engine
265            .borrow_mut()
266            .register_client(Box::new(exec_client))?;
267
268        log::info!("Adding exchange {venue} to engine");
269
270        Ok(())
271    }
272
273    pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
274        if let Some(exchange) = self.venues.get_mut(&venue) {
275            exchange.borrow_mut().set_fill_model(fill_model);
276        } else {
277            log::warn!(
278                "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
279            );
280        }
281    }
282
283    /// Adds an instrument to the backtest engine for the specified venue.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if:
288    /// - The instrument's associated venue has not been added via `add_venue`.
289    /// - Attempting to add a `CurrencyPair` instrument for a single-currency CASH account.
290    ///
291    /// # Panics
292    ///
293    /// Panics if adding the instrument to the simulated exchange fails.
294    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
295        let instrument_id = instrument.id();
296        if let Some(exchange) = self.venues.get_mut(&instrument.id().venue) {
297            if matches!(instrument, InstrumentAny::CurrencyPair(_))
298                && exchange.borrow().account_type != AccountType::Margin
299                && exchange.borrow().base_currency.is_some()
300            {
301                anyhow::bail!(
302                    "Cannot add a `CurrencyPair` instrument {instrument_id} for a venue with a single-currency CASH account"
303                )
304            }
305            exchange
306                .borrow_mut()
307                .add_instrument(instrument.clone())
308                .unwrap();
309        } else {
310            anyhow::bail!(
311                "Cannot add an `Instrument` object without first adding its associated venue {}",
312                instrument.id().venue
313            )
314        }
315
316        self.add_market_data_client_if_not_exists(instrument.id().venue);
317
318        self.kernel
319            .data_engine
320            .borrow_mut()
321            .process(&instrument as &dyn Any);
322        log::info!(
323            "Added instrument {} to exchange {}",
324            instrument_id,
325            instrument_id.venue
326        );
327        Ok(())
328    }
329
330    pub fn add_data(
331        &mut self,
332        data: Vec<Data>,
333        _client_id: Option<ClientId>,
334        validate: bool,
335        sort: bool,
336    ) {
337        if data.is_empty() {
338            log::warn!("add_data called with empty data slice – ignoring");
339            return;
340        }
341
342        let count = data.len();
343
344        let mut to_add = data;
345        if sort {
346            to_add.sort_by_key(HasTsInit::ts_init);
347        }
348
349        if validate {
350            for item in &to_add {
351                let instr_id = item.instrument_id();
352                self.has_data.insert(instr_id);
353
354                if item.is_order_book_data() {
355                    self.has_book_data.insert(instr_id);
356                }
357
358                self.add_market_data_client_if_not_exists(instr_id.venue);
359            }
360        }
361
362        // Track time bounds for start/end defaults
363        if let Some(first) = to_add.first() {
364            let ts = first.ts_init();
365            if self.ts_first.is_none_or(|t| ts < t) {
366                self.ts_first = Some(ts);
367            }
368        }
369        if let Some(last) = to_add.last() {
370            let ts = last.ts_init();
371            if self.ts_last_data.is_none_or(|t| ts > t) {
372                self.ts_last_data = Some(ts);
373            }
374        }
375
376        self.data_len += count;
377        let stream_name = format!("backtest_data_{}", self.data_stream_counter);
378        self.data_stream_counter += 1;
379        self.data_iterator.add_data(&stream_name, to_add, true);
380
381        log::info!(
382            "Added {count} data element{} to BacktestEngine ({} total)",
383            if count == 1 { "" } else { "s" },
384            self.data_len,
385        );
386    }
387
388    /// Adds a strategy to the backtest engine.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the strategy is already registered or the trader is running.
393    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
394    where
395        T: Strategy + Component + Debug + 'static,
396    {
397        self.kernel.trader.add_strategy(strategy)
398    }
399
400    /// Adds an actor to the backtest engine.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the actor is already registered or the trader is running.
405    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
406    where
407        T: DataActor + Component + Debug + 'static,
408    {
409        self.kernel.trader.add_actor(actor)
410    }
411
412    /// Adds an execution algorithm to the backtest engine.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if the algorithm is already registered or the trader is running.
417    pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
418    where
419        T: DataActor + Component + Debug + 'static,
420    {
421        self.kernel.trader.add_exec_algorithm(exec_algorithm)
422    }
423
424    /// Run a backtest.
425    ///
426    /// Processes all data chronologically. When `streaming` is false (default),
427    /// finalizes the run via [`end`](Self::end). When `streaming` is true, the
428    /// run pauses without finalizing, allowing additional data to be loaded:
429    ///
430    /// 1. Add initial data and strategies
431    /// 2. Call `run(streaming=true)`
432    /// 3. Call `clear_data()`
433    /// 4. Add next batch of data
434    /// 5. Repeat steps 2-4, then call `run(streaming=false)` or `end()` for the final batch
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the backtest encounters an unrecoverable state.
439    pub fn run(
440        &mut self,
441        start: Option<UnixNanos>,
442        end: Option<UnixNanos>,
443        run_config_id: Option<String>,
444        streaming: bool,
445    ) -> anyhow::Result<()> {
446        self.run_impl(start, end, run_config_id)?;
447
448        if !streaming {
449            self.end();
450        }
451
452        Ok(())
453    }
454
    /// Core run loop shared by streaming and non-streaming runs.
    ///
    /// Resolves the time window (`start`/`end`, falling back to the recorded data
    /// bounds), sets every component clock to the start, performs one-time run
    /// initialization while `iteration == 0`, then replays data from the iterator:
    /// each element advances time, is routed to its venue's exchange, is processed
    /// by the data engine, and is followed by draining queued commands and settling
    /// venues. Timer events are flushed whenever the timestamp changes. The loop
    /// ends when data and timers are exhausted, when an element's `ts_init` exceeds
    /// the end bound, or when `force_stop` is set. It does not finalize the run;
    /// that is done by `end`.
    ///
    /// # Errors
    ///
    /// Returns an error if the resolved start is after the resolved end.
    fn run_impl(
        &mut self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        run_config_id: Option<String>,
    ) -> anyhow::Result<()> {
        // Determine time boundaries
        let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
        // With no explicit end and no data, fall back to 4_102_444_800 seconds after
        // the Unix epoch (2100-01-01T00:00:00Z), i.e. an effectively open-ended run
        let end_ns = end.unwrap_or_else(|| {
            self.ts_last_data
                .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
        });
        anyhow::ensure!(start_ns <= end_ns, "start was > end");
        self.end_ns = end_ns;
        self.last_ns = start_ns;

        // Set all component clocks to start
        let clocks = self.collect_all_clocks();
        Self::set_all_clocks_time(&clocks, start_ns);

        // First-iteration initialization
        // (skipped on later streaming batches, once at least one element was processed)
        if self.iteration == 0 {
            self.run_config_id = run_config_id;
            self.run_id = Some(UUID4::new());
            self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
            self.backtest_start = Some(start_ns);

            // Initialize exchange accounts
            for exchange in self.venues.values() {
                exchange.borrow_mut().initialize_account();
            }

            // Re-set clocks after account init
            Self::set_all_clocks_time(&clocks, start_ns);

            // Reset force stop flag
            self.force_stop = false;

            // Initialize sync command senders (once per thread)
            Self::init_command_senders();

            // Set logging to static clock mode for deterministic timestamps
            logging_clock_set_static_mode();
            logging_clock_set_static_time(start_ns.as_u64());

            // Start kernel (engines + trader init + clients)
            self.kernel.start();
            self.kernel.start_trader();

            self.log_pre_run();
        }

        self.log_run();

        // Skip data before start_ns
        let mut data = self.data_iterator.next();
        while let Some(ref d) = data {
            if d.ts_init() >= start_ns {
                break;
            }
            data = self.data_iterator.next();
        }

        // Initialize last_ns before first data point
        // (one nanosecond earlier, so the first element satisfies `ts_init > last_ns`
        // below and triggers a time advance; clamped at zero)
        if let Some(ref d) = data {
            let ts = d.ts_init();
            self.last_ns = if ts.as_u64() > 0 {
                UnixNanos::from(ts.as_u64() - 1)
            } else {
                UnixNanos::default()
            };
        } else {
            self.last_ns = start_ns;
        }

        loop {
            if self.force_stop {
                log::error!("Force stop triggered, ending backtest");
                break;
            }

            // No buffered element: let timers run, then re-check the iterator
            // for new data before deciding the run is finished
            if data.is_none() {
                let done = self.process_next_timer(&clocks);
                data = self.data_iterator.next();
                if data.is_none() && done {
                    break;
                }
                continue;
            }

            let d = data.as_ref().unwrap();
            let ts_init = d.ts_init();

            // NOTE(review): the element that crosses the end bound has already been
            // taken from the iterator and is not processed
            if ts_init > end_ns {
                break;
            }

            // Advance time only when the timestamp moves forward; elements sharing a
            // timestamp are processed at the same clock time
            if ts_init > self.last_ns {
                self.last_ns = ts_init;
                self.advance_time_impl(ts_init, &clocks);
            }

            // Route data to exchange
            self.route_data_to_exchange(d);

            // Process through data engine (may trigger strategy callbacks
            // which queue trading commands via the sync senders)
            self.kernel.data_engine.borrow_mut().process_data(d.clone());

            // Drain deferred commands, then process exchange queues
            self.drain_command_queues();
            self.process_and_settle_venues(ts_init);

            let prev_last_ns = self.last_ns;
            data = self.data_iterator.next();

            // If timestamp changed, flush accumulated timer events
            if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
                self.flush_accumulator_events(&clocks, prev_last_ns);
            }

            // Counts processed data elements
            self.iteration += 1;
        }

        // Process remaining exchange messages
        let ts_now = self.kernel.clock.borrow().timestamp_ns();
        for exchange in self.venues.values() {
            exchange.borrow_mut().process(ts_now);
        }

        // Flush remaining timer events up to end time
        self.flush_accumulator_events(&clocks, end_ns);

        Ok(())
    }
590
591    /// Manually end the backtest.
592    pub fn end(&mut self) {
593        // Stop trader
594        self.kernel.stop_trader();
595
596        // Stop engines
597        self.kernel.data_engine.borrow_mut().stop();
598        self.kernel.risk_engine.borrow_mut().stop();
599        self.kernel.exec_engine.borrow_mut().stop();
600
601        // Process remaining exchange messages
602        let ts_now = self.kernel.clock.borrow().timestamp_ns();
603        for exchange in self.venues.values() {
604            exchange.borrow_mut().process(ts_now);
605        }
606
607        self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
608        self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
609
610        // Switch logging back to realtime mode
611        logging_clock_set_realtime_mode();
612
613        self.log_post_run();
614    }
615
616    /// Reset the backtest engine.
617    ///
618    /// All stateful fields are reset to their initial value. Data and instruments
619    /// persist across resets to enable repeated runs with different strategies.
620    pub fn reset(&mut self) {
621        log::debug!("Resetting");
622
623        if self.kernel.trader.is_running() {
624            self.end();
625        }
626
627        // Stop and reset engines
628        self.kernel.data_engine.borrow_mut().stop();
629        self.kernel.data_engine.borrow_mut().reset();
630
631        self.kernel.exec_engine.borrow_mut().stop();
632        self.kernel.exec_engine.borrow_mut().reset();
633
634        self.kernel.risk_engine.borrow_mut().stop();
635        self.kernel.risk_engine.borrow_mut().reset();
636
637        // Reset trader
638        if let Err(e) = self.kernel.trader.reset() {
639            log::error!("Error resetting trader: {e:?}");
640        }
641
642        // Reset all exchanges
643        for exchange in self.venues.values() {
644            exchange.borrow_mut().reset();
645        }
646
647        // Clear run state
648        self.run_config_id = None;
649        self.run_id = None;
650        self.run_started = None;
651        self.run_finished = None;
652        self.backtest_start = None;
653        self.backtest_end = None;
654        self.iteration = 0;
655        self.force_stop = false;
656        self.last_ns = UnixNanos::default();
657        self.end_ns = UnixNanos::default();
658
659        self.accumulator.clear();
660
661        // Reset all iterator cursors to beginning (data persists)
662        self.data_iterator.reset_all_cursors();
663
664        log::info!("Reset");
665    }
666
    /// Sort the engine's internal data stream by timestamp.
    ///
    /// Intended for data that was added with `sort=false` for batch performance
    /// and sorted once before running. In this implementation the method performs
    /// no sorting; it only logs that a sort was requested.
    pub fn sort_data(&mut self) {
        // Each `add_data` call registers its batch with the data iterator under its
        // own `backtest_data_{n}` stream name; nothing is re-sorted here.
        // NOTE(review): this relies on the iterator ordering elements itself when
        // streams are added — confirm against `BacktestDataIterator::add_data`.
        log::info!("Data sort requested (iterator maintains sort order)");
    }
678
679    /// Clear the engine's internal data stream. Does not clear instruments.
680    pub fn clear_data(&mut self) {
681        self.has_data.clear();
682        self.has_book_data.clear();
683        self.data_iterator = BacktestDataIterator::new();
684        self.data_len = 0;
685        self.data_stream_counter = 0;
686        self.ts_first = None;
687        self.ts_last_data = None;
688    }
689
690    /// Clear all trading strategies from the engine's internal trader.
691    ///
692    /// # Errors
693    ///
694    /// Returns an error if any strategy fails to dispose.
695    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
696        self.kernel.trader.clear_strategies()
697    }
698
699    /// Clear all execution algorithms from the engine's internal trader.
700    ///
701    /// # Errors
702    ///
703    /// Returns an error if any execution algorithm fails to dispose.
704    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
705        self.kernel.trader.clear_exec_algorithms()
706    }
707
708    /// Dispose of the backtest engine, releasing all resources.
709    pub fn dispose(&mut self) {
710        self.clear_data();
711        self.kernel.dispose();
712    }
713
714    /// Return the backtest result from the last run.
715    #[must_use]
716    pub fn get_result(&self) -> BacktestResult {
717        let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
718            (Some(start), Some(end)) => {
719                (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
720            }
721            _ => 0.0,
722        };
723
724        let cache = self.kernel.cache.borrow();
725        let total_orders = cache.orders_total_count(None, None, None, None, None);
726        let positions = cache.positions(None, None, None, None, None);
727        let total_positions = positions.len();
728
729        let analyzer = self.build_analyzer(&cache, &positions);
730        let mut stats_pnls = AHashMap::new();
731        for currency in analyzer.currencies() {
732            if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
733                stats_pnls.insert(currency.code.to_string(), pnls);
734            }
735        }
736
737        let stats_returns = analyzer.get_performance_stats_returns();
738        let stats_general = analyzer.get_performance_stats_general();
739
740        BacktestResult {
741            trader_id: self.config.trader_id().to_string(),
742            machine_id: self.kernel.machine_id.clone(),
743            instance_id: self.instance_id,
744            run_config_id: self.run_config_id.clone(),
745            run_id: self.run_id,
746            run_started: self.run_started,
747            run_finished: self.run_finished,
748            backtest_start: self.backtest_start,
749            backtest_end: self.backtest_end,
750            elapsed_time_secs,
751            iterations: self.iteration,
752            total_events: self.iteration,
753            total_orders,
754            total_positions,
755            stats_pnls,
756            stats_returns,
757            stats_general,
758        }
759    }
760
761    fn build_analyzer(&self, cache: &Cache, positions: &[&Position]) -> PortfolioAnalyzer {
762        let mut analyzer = PortfolioAnalyzer::default();
763        let positions_owned: Vec<_> = positions.iter().map(|p| (*p).clone()).collect();
764
765        // Aggregate starting and current balances across all venue accounts
766        for venue in self.venues.keys() {
767            if let Some(account) = cache.account_for_venue(venue) {
768                let account_ref: &dyn Account = match account {
769                    AccountAny::Cash(cash) => cash,
770                    AccountAny::Margin(margin) => margin,
771                };
772                for (currency, money) in account_ref.starting_balances() {
773                    analyzer
774                        .account_balances_starting
775                        .entry(currency)
776                        .and_modify(|existing| *existing = *existing + money)
777                        .or_insert(money);
778                }
779                for (currency, money) in account_ref.balances_total() {
780                    analyzer
781                        .account_balances
782                        .entry(currency)
783                        .and_modify(|existing| *existing = *existing + money)
784                        .or_insert(money);
785                }
786            }
787        }
788
789        analyzer.add_positions(&positions_owned);
790        analyzer
791    }
792
793    fn route_data_to_exchange(&self, data: &Data) {
794        let venue = data.instrument_id().venue;
795        if let Some(exchange) = self.venues.get(&venue) {
796            let mut ex = exchange.borrow_mut();
797            match data {
798                Data::Delta(delta) => ex.process_order_book_delta(*delta),
799                Data::Deltas(deltas) => ex.process_order_book_deltas((**deltas).clone()),
800                Data::Quote(quote) => ex.process_quote_tick(quote),
801                Data::Trade(trade) => ex.process_trade_tick(trade),
802                Data::Bar(bar) => ex.process_bar(*bar),
803                Data::InstrumentClose(close) => ex.process_instrument_close(*close),
804                Data::Depth10(depth) => ex.process_order_book_depth10(depth),
805                Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) => {
806                    // Not routed to exchange — processed by data engine only
807                }
808            }
809        } else {
810            log::warn!("No exchange found for venue {venue}, data not routed");
811        }
812    }
813
814    fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
815        // Advance all clocks to ts_now via accumulator
816        for clock in clocks {
817            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
818        }
819
820        // Process events with ts_event < ts_now
821        let ts_before = if ts_now.as_u64() > 0 {
822            UnixNanos::from(ts_now.as_u64() - 1)
823        } else {
824            UnixNanos::default()
825        };
826
827        let mut ts_last: Option<UnixNanos> = None;
828
829        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
830            let ts_event = handler.event.ts_event;
831
832            Self::set_all_clocks_time(clocks, ts_event);
833            logging_clock_set_static_time(ts_event.as_u64());
834
835            handler.run();
836            self.drain_command_queues();
837
838            if ts_last != Some(ts_event) {
839                ts_last = Some(ts_event);
840                self.process_and_settle_venues(ts_event);
841            }
842
843            // Re-advance clocks to capture chained timers
844            for clock in clocks {
845                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
846            }
847        }
848
849        Self::set_all_clocks_time(clocks, ts_now);
850        logging_clock_set_static_time(ts_now.as_u64());
851    }
852
853    fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
854        for clock in clocks {
855            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
856        }
857
858        let mut ts_last: Option<UnixNanos> = None;
859
860        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
861            let ts_event = handler.event.ts_event;
862
863            Self::set_all_clocks_time(clocks, ts_event);
864            logging_clock_set_static_time(ts_event.as_u64());
865
866            handler.run();
867            self.drain_command_queues();
868
869            if ts_last != Some(ts_event) {
870                ts_last = Some(ts_event);
871                self.process_and_settle_venues(ts_event);
872            }
873
874            // Re-advance clocks to capture chained timers
875            for clock in clocks {
876                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
877            }
878        }
879    }
880
881    fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
882        self.flush_accumulator_events(clocks, self.last_ns);
883
884        // Find minimum next timer time across all component clocks
885        let mut min_next_time: Option<UnixNanos> = None;
886
887        for clock in clocks {
888            let clock_ref = clock.borrow();
889            for name in clock_ref.timer_names() {
890                if let Some(next_time) = clock_ref.next_time_ns(name)
891                    && next_time > self.last_ns
892                {
893                    min_next_time = Some(match min_next_time {
894                        Some(current_min) => next_time.min(current_min),
895                        None => next_time,
896                    });
897                }
898            }
899        }
900
901        match min_next_time {
902            None => true,
903            Some(t) if t > self.end_ns => true,
904            Some(t) => {
905                self.last_ns = t;
906                self.flush_accumulator_events(clocks, t);
907                false
908            }
909        }
910    }
911
912    fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
913        let mut clocks = vec![self.kernel.clock.clone()];
914        clocks.extend(self.kernel.trader.get_component_clocks());
915        clocks
916    }
917
918    fn process_and_settle_venues(&self, ts_now: UnixNanos) {
919        loop {
920            for exchange in self.venues.values() {
921                exchange.borrow_mut().process(ts_now);
922            }
923            self.drain_command_queues();
924
925            let has_pending = self
926                .venues
927                .values()
928                .any(|exchange| exchange.borrow().has_pending_commands(ts_now));
929            if !has_pending {
930                break;
931            }
932        }
933    }
934
935    fn drain_exec_client_events(&self) {
936        for client in &self.exec_clients {
937            client.drain_queued_events();
938        }
939    }
940
941    fn drain_command_queues(&self) {
942        // Drain trading commands, exec client events, and data commands
943        // in a loop until all queues settle. Handles cascading re-entrancy
944        // (e.g. strategy submits order from on_order_filled).
945        loop {
946            drain_trading_cmd_queue();
947            drain_data_cmd_queue();
948            self.drain_exec_client_events();
949
950            if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
951                break;
952            }
953        }
954    }
955
956    fn init_command_senders() {
957        init_data_cmd_sender(Arc::new(SyncDataCommandSender));
958        init_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
959    }
960
961    fn advance_clock_on_accumulator(
962        accumulator: &mut TimeEventAccumulator,
963        clock: &Rc<RefCell<dyn Clock>>,
964        to_time_ns: UnixNanos,
965        set_time: bool,
966    ) {
967        let mut clock_ref = clock.borrow_mut();
968        let test_clock = clock_ref
969            .as_any_mut()
970            .downcast_mut::<TestClock>()
971            .expect("BacktestEngine requires TestClock");
972        accumulator.advance_clock(test_clock, to_time_ns, set_time);
973    }
974
975    fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
976        for clock in clocks {
977            let mut clock_ref = clock.borrow_mut();
978            let test_clock = clock_ref
979                .as_any_mut()
980                .downcast_mut::<TestClock>()
981                .expect("BacktestEngine requires TestClock");
982            test_clock.set_time(ts);
983        }
984    }
985
986    #[rustfmt::skip]
987    fn log_pre_run(&self) {
988        log_info!("=================================================================", color = LogColor::Cyan);
989        log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
990        log_info!("=================================================================", color = LogColor::Cyan);
991
992        for exchange in self.venues.values() {
993            let ex = exchange.borrow();
994            log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
995        }
996
997        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
998    }
999
1000    #[rustfmt::skip]
1001    fn log_run(&self) {
1002        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1003        let id = format_optional_uuid(self.run_id.as_ref());
1004        let start = format_optional_nanos(self.backtest_start);
1005
1006        log_info!("=================================================================", color = LogColor::Cyan);
1007        log_info!(" BACKTEST RUN", color = LogColor::Cyan);
1008        log_info!("=================================================================", color = LogColor::Cyan);
1009        log::info!("Run config ID:  {config_id}");
1010        log::info!("Run ID:         {id}");
1011        log::info!("Backtest start: {start}");
1012        log::info!("Data elements:  {}", self.data_len);
1013        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1014    }
1015
1016    #[rustfmt::skip]
1017    fn log_post_run(&self) {
1018        let cache = self.kernel.cache.borrow();
1019        let orders = cache.orders(None, None, None, None, None);
1020        let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
1021        let total_orders = orders.len();
1022        let positions = cache.positions(None, None, None, None, None);
1023        let total_positions = positions.len();
1024
1025        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1026        let id = format_optional_uuid(self.run_id.as_ref());
1027        let started = format_optional_nanos(self.run_started);
1028        let finished = format_optional_nanos(self.run_finished);
1029        let elapsed = format_optional_duration(self.run_started, self.run_finished);
1030        let bt_start = format_optional_nanos(self.backtest_start);
1031        let bt_end = format_optional_nanos(self.backtest_end);
1032        let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
1033        let iterations = self.iteration.separate_with_underscores();
1034        let events = total_events.separate_with_underscores();
1035        let num_orders = total_orders.separate_with_underscores();
1036        let num_positions = total_positions.separate_with_underscores();
1037
1038        log_info!("=================================================================", color = LogColor::Cyan);
1039        log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
1040        log_info!("=================================================================", color = LogColor::Cyan);
1041        log::info!("Run config ID:  {config_id}");
1042        log::info!("Run ID:         {id}");
1043        log::info!("Run started:    {started}");
1044        log::info!("Run finished:   {finished}");
1045        log::info!("Elapsed time:   {elapsed}");
1046        log::info!("Backtest start: {bt_start}");
1047        log::info!("Backtest end:   {bt_end}");
1048        log::info!("Backtest range: {bt_range}");
1049        log::info!("Iterations: {iterations}");
1050        log::info!("Total events: {events}");
1051        log::info!("Total orders: {num_orders}");
1052        log::info!("Total positions: {num_positions}");
1053
1054        if !self.config.run_analysis {
1055            return;
1056        }
1057
1058        let analyzer = self.build_analyzer(&cache, &positions);
1059        log_portfolio_performance(&analyzer);
1060    }
1061
1062    pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1063        if self
1064            .kernel
1065            .data_engine
1066            .borrow()
1067            .registered_clients()
1068            .contains(&client_id)
1069        {
1070            return;
1071        }
1072
1073        let venue = Venue::from(client_id.as_str());
1074        let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1075        let data_client_adapter = DataClientAdapter::new(
1076            backtest_client.client_id,
1077            None,
1078            false,
1079            false,
1080            Box::new(backtest_client),
1081        );
1082
1083        self.kernel
1084            .data_engine
1085            .borrow_mut()
1086            .register_client(data_client_adapter, None);
1087    }
1088
1089    pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1090        let client_id = ClientId::from(venue.as_str());
1091        if !self
1092            .kernel
1093            .data_engine
1094            .borrow()
1095            .registered_clients()
1096            .contains(&client_id)
1097        {
1098            let backtest_client =
1099                BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1100            let data_client_adapter = DataClientAdapter::new(
1101                client_id,
1102                Some(venue),
1103                false,
1104                false,
1105                Box::new(backtest_client),
1106            );
1107            self.kernel
1108                .data_engine
1109                .borrow_mut()
1110                .register_client(data_client_adapter, Some(venue));
1111        }
1112    }
1113}
1114
1115fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1116    nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1117}
1118
1119fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1120    uuid.map_or("None".to_string(), |id| id.to_string())
1121}
1122
1123fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1124    match (start, end) {
1125        (Some(s), Some(e)) => {
1126            let delta = e.to_datetime_utc() - s.to_datetime_utc();
1127            let days = delta.num_days().abs();
1128            let hours = delta.num_hours().abs() % 24;
1129            let minutes = delta.num_minutes().abs() % 60;
1130            let seconds = delta.num_seconds().abs() % 60;
1131            let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1132            format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1133        }
1134        _ => "None".to_string(),
1135    }
1136}
1137
1138#[rustfmt::skip]
1139fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
1140    log_info!("=================================================================", color = LogColor::Cyan);
1141    log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
1142    log_info!("=================================================================", color = LogColor::Cyan);
1143
1144    for currency in analyzer.currencies() {
1145        log::info!(" PnL Statistics ({})", currency.code);
1146        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1147
1148        if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
1149            for line in &pnl_lines {
1150                log::info!("{line}");
1151            }
1152        }
1153
1154        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1155    }
1156
1157    log::info!(" Returns Statistics");
1158    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1159    for line in &analyzer.get_stats_returns_formatted() {
1160        log::info!("{line}");
1161    }
1162    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1163
1164    log::info!(" General Statistics");
1165    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1166    for line in &analyzer.get_stats_general_formatted() {
1167        log::info!("{line}");
1168    }
1169    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1170}