1use std::{any::Any, cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 actor::DataActor,
24 cache::Cache,
25 clock::{Clock, TestClock},
26 component::Component,
27 enums::LogColor,
28 log_info,
29 logging::{
30 logging_clock_set_realtime_mode, logging_clock_set_static_mode,
31 logging_clock_set_static_time,
32 },
33 runner::{
34 SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
35 drain_data_cmd_queue, drain_trading_cmd_queue, init_data_cmd_sender, init_exec_cmd_sender,
36 trading_cmd_queue_is_empty,
37 },
38};
39use nautilus_core::{UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, formatting::Separable};
40use nautilus_data::client::DataClientAdapter;
41use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel};
42use nautilus_model::{
43 accounts::{Account, AccountAny},
44 data::{Data, HasTsInit},
45 enums::{AccountType, BookType, OmsType},
46 identifiers::{AccountId, ClientId, InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orders::Order,
49 position::Position,
50 types::{Currency, Money},
51};
52use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
53use nautilus_trading::strategy::Strategy;
54use rust_decimal::Decimal;
55
56use crate::{
57 accumulator::TimeEventAccumulator, config::BacktestEngineConfig,
58 data_client::BacktestDataClient, data_iterator::BacktestDataIterator,
59 exchange::SimulatedExchange, execution_client::BacktestExecutionClient,
60 modules::SimulationModule,
61};
62
/// Summary of a backtest run, as assembled by `BacktestEngine::get_result`.
#[derive(Debug)]
pub struct BacktestResult {
    /// Trader ID from the engine configuration, rendered as a string.
    pub trader_id: String,
    /// Machine ID reported by the engine's kernel.
    pub machine_id: String,
    /// Instance ID of the engine that produced this result.
    pub instance_id: UUID4,
    /// Run configuration ID supplied to the run, if any.
    pub run_config_id: Option<String>,
    /// ID generated when the run was initialized; `None` if no run has started.
    pub run_id: Option<UUID4>,
    /// System (wall-clock) time captured when the run was initialized.
    pub run_started: Option<UnixNanos>,
    /// System (wall-clock) time captured when the run was ended.
    pub run_finished: Option<UnixNanos>,
    /// Start timestamp of the backtest (the time the clocks were set to at run start).
    pub backtest_start: Option<UnixNanos>,
    /// Kernel clock timestamp captured when the run was ended.
    pub backtest_end: Option<UnixNanos>,
    /// Seconds between `backtest_start` and `backtest_end`; `0.0` when either is unset.
    pub elapsed_time_secs: f64,
    /// Number of data elements processed by the run loop.
    pub iterations: usize,
    /// Total number of events reported for the run.
    pub total_events: usize,
    /// Total number of orders found in the cache.
    pub total_orders: usize,
    /// Total number of positions found in the cache.
    pub total_positions: usize,
    /// PnL statistics from the portfolio analyzer, keyed by currency code.
    pub stats_pnls: AHashMap<String, AHashMap<String, f64>>,
    /// Returns statistics from the portfolio analyzer.
    pub stats_returns: AHashMap<String, f64>,
    /// General statistics from the portfolio analyzer.
    pub stats_general: AHashMap<String, f64>,
}
84
/// Backtest engine that replays added data through simulated venues, driving
/// the kernel's data/execution engines and all component clocks.
pub struct BacktestEngine {
    /// Instance ID copied from the kernel at construction.
    instance_id: UUID4,
    /// Engine configuration (also used to build the kernel).
    config: BacktestEngineConfig,
    /// System kernel owning the cache, clock, engines and trader.
    kernel: NautilusKernel,
    /// Collects timer events from the clocks so they can be run in timestamp order.
    accumulator: TimeEventAccumulator,
    /// Run configuration ID of the current run, if any.
    run_config_id: Option<String>,
    /// ID of the current run; set when a run is initialized.
    run_id: Option<UUID4>,
    /// Simulated exchanges keyed by venue.
    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
    /// Execution clients created for the added venues.
    exec_clients: Vec<BacktestExecutionClient>,
    /// Instrument IDs seen in added data (recorded only when validation is requested).
    has_data: AHashSet<InstrumentId>,
    /// Instrument IDs seen in added order book data (recorded only when validation is requested).
    has_book_data: AHashSet<InstrumentId>,
    /// Iterator over all added data streams.
    data_iterator: BacktestDataIterator,
    /// Total number of data elements added.
    data_len: usize,
    /// Counter used to generate a unique name for each added data stream.
    data_stream_counter: usize,
    /// Earliest `ts_init` among the first elements of the added batches.
    ts_first: Option<UnixNanos>,
    /// Latest `ts_init` among the last elements of the added batches.
    ts_last_data: Option<UnixNanos>,
    /// Number of data elements processed by the run loop so far.
    iteration: usize,
    /// When `true`, the run loop exits at its next iteration.
    force_stop: bool,
    /// Last timestamp the run loop has advanced to.
    last_ns: UnixNanos,
    /// End timestamp of the current run.
    end_ns: UnixNanos,
    /// System (wall-clock) time at which the run was initialized.
    run_started: Option<UnixNanos>,
    /// System (wall-clock) time at which the run was ended.
    run_finished: Option<UnixNanos>,
    /// Start timestamp of the backtest.
    backtest_start: Option<UnixNanos>,
    /// Kernel clock timestamp captured when the run was ended.
    backtest_end: Option<UnixNanos>,
}
122
123impl Debug for BacktestEngine {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct(stringify!(BacktestEngine))
126 .field("instance_id", &self.instance_id)
127 .field("run_config_id", &self.run_config_id)
128 .field("run_id", &self.run_id)
129 .finish()
130 }
131}
132
133impl BacktestEngine {
134 pub fn new(config: BacktestEngineConfig) -> anyhow::Result<Self> {
140 let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
141 Ok(Self {
142 instance_id: kernel.instance_id,
143 config,
144 accumulator: TimeEventAccumulator::new(),
145 kernel,
146 run_config_id: None,
147 run_id: None,
148 venues: AHashMap::new(),
149 exec_clients: Vec::new(),
150 has_data: AHashSet::new(),
151 has_book_data: AHashSet::new(),
152 data_iterator: BacktestDataIterator::new(),
153 data_len: 0,
154 data_stream_counter: 0,
155 ts_first: None,
156 ts_last_data: None,
157 iteration: 0,
158 force_stop: false,
159 last_ns: UnixNanos::default(),
160 end_ns: UnixNanos::default(),
161 run_started: None,
162 run_finished: None,
163 backtest_start: None,
164 backtest_end: None,
165 })
166 }
167
168 #[allow(clippy::too_many_arguments)]
172 pub fn add_venue(
173 &mut self,
174 venue: Venue,
175 oms_type: OmsType,
176 account_type: AccountType,
177 book_type: BookType,
178 starting_balances: Vec<Money>,
179 base_currency: Option<Currency>,
180 default_leverage: Option<Decimal>,
181 leverages: AHashMap<InstrumentId, Decimal>,
182 modules: Vec<Box<dyn SimulationModule>>,
183 fill_model: FillModelAny,
184 fee_model: FeeModelAny,
185 latency_model: Option<Box<dyn LatencyModel>>,
186 routing: Option<bool>,
187 reject_stop_orders: Option<bool>,
188 support_gtd_orders: Option<bool>,
189 support_contingent_orders: Option<bool>,
190 use_position_ids: Option<bool>,
191 use_random_ids: Option<bool>,
192 use_reduce_only: Option<bool>,
193 use_message_queue: Option<bool>,
194 use_market_order_acks: Option<bool>,
195 bar_execution: Option<bool>,
196 bar_adaptive_high_low_ordering: Option<bool>,
197 trade_execution: Option<bool>,
198 liquidity_consumption: Option<bool>,
199 allow_cash_borrowing: Option<bool>,
200 frozen_account: Option<bool>,
201 price_protection_points: Option<u32>,
202 ) -> anyhow::Result<()> {
203 let default_leverage: Decimal = default_leverage.unwrap_or_else(|| {
204 if account_type == AccountType::Margin {
205 Decimal::from(10)
206 } else {
207 Decimal::from(0)
208 }
209 });
210
211 let exchange = SimulatedExchange::new(
212 venue,
213 oms_type,
214 account_type,
215 starting_balances,
216 base_currency,
217 default_leverage,
218 leverages,
219 modules,
220 self.kernel.cache.clone(),
221 self.kernel.clock.clone(),
222 fill_model,
223 fee_model,
224 book_type,
225 latency_model,
226 bar_execution,
227 bar_adaptive_high_low_ordering,
228 trade_execution,
229 liquidity_consumption,
230 reject_stop_orders,
231 support_gtd_orders,
232 support_contingent_orders,
233 use_position_ids,
234 use_random_ids,
235 use_reduce_only,
236 use_message_queue,
237 use_market_order_acks,
238 allow_cash_borrowing,
239 frozen_account,
240 price_protection_points,
241 )?;
242 let exchange = Rc::new(RefCell::new(exchange));
243 self.venues.insert(venue, exchange.clone());
244
245 let account_id = AccountId::from(format!("{venue}-001").as_str());
246
247 let exec_client = BacktestExecutionClient::new(
248 self.config.trader_id(),
249 account_id,
250 exchange.clone(),
251 self.kernel.cache.clone(),
252 self.kernel.clock.clone(),
253 routing,
254 frozen_account,
255 );
256
257 exchange
258 .borrow_mut()
259 .register_client(Rc::new(exec_client.clone()));
260
261 self.exec_clients.push(exec_client.clone());
262
263 self.kernel
264 .exec_engine
265 .borrow_mut()
266 .register_client(Box::new(exec_client))?;
267
268 log::info!("Adding exchange {venue} to engine");
269
270 Ok(())
271 }
272
273 pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
274 if let Some(exchange) = self.venues.get_mut(&venue) {
275 exchange.borrow_mut().set_fill_model(fill_model);
276 } else {
277 log::warn!(
278 "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
279 );
280 }
281 }
282
283 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
295 let instrument_id = instrument.id();
296 if let Some(exchange) = self.venues.get_mut(&instrument.id().venue) {
297 if matches!(instrument, InstrumentAny::CurrencyPair(_))
298 && exchange.borrow().account_type != AccountType::Margin
299 && exchange.borrow().base_currency.is_some()
300 {
301 anyhow::bail!(
302 "Cannot add a `CurrencyPair` instrument {instrument_id} for a venue with a single-currency CASH account"
303 )
304 }
305 exchange
306 .borrow_mut()
307 .add_instrument(instrument.clone())
308 .unwrap();
309 } else {
310 anyhow::bail!(
311 "Cannot add an `Instrument` object without first adding its associated venue {}",
312 instrument.id().venue
313 )
314 }
315
316 self.add_market_data_client_if_not_exists(instrument.id().venue);
317
318 self.kernel
319 .data_engine
320 .borrow_mut()
321 .process(&instrument as &dyn Any);
322 log::info!(
323 "Added instrument {} to exchange {}",
324 instrument_id,
325 instrument_id.venue
326 );
327 Ok(())
328 }
329
330 pub fn add_data(
331 &mut self,
332 data: Vec<Data>,
333 _client_id: Option<ClientId>,
334 validate: bool,
335 sort: bool,
336 ) {
337 if data.is_empty() {
338 log::warn!("add_data called with empty data slice – ignoring");
339 return;
340 }
341
342 let count = data.len();
343
344 let mut to_add = data;
345 if sort {
346 to_add.sort_by_key(HasTsInit::ts_init);
347 }
348
349 if validate {
350 for item in &to_add {
351 let instr_id = item.instrument_id();
352 self.has_data.insert(instr_id);
353
354 if item.is_order_book_data() {
355 self.has_book_data.insert(instr_id);
356 }
357
358 self.add_market_data_client_if_not_exists(instr_id.venue);
359 }
360 }
361
362 if let Some(first) = to_add.first() {
364 let ts = first.ts_init();
365 if self.ts_first.is_none_or(|t| ts < t) {
366 self.ts_first = Some(ts);
367 }
368 }
369 if let Some(last) = to_add.last() {
370 let ts = last.ts_init();
371 if self.ts_last_data.is_none_or(|t| ts > t) {
372 self.ts_last_data = Some(ts);
373 }
374 }
375
376 self.data_len += count;
377 let stream_name = format!("backtest_data_{}", self.data_stream_counter);
378 self.data_stream_counter += 1;
379 self.data_iterator.add_data(&stream_name, to_add, true);
380
381 log::info!(
382 "Added {count} data element{} to BacktestEngine ({} total)",
383 if count == 1 { "" } else { "s" },
384 self.data_len,
385 );
386 }
387
    /// Adds a strategy to the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns whatever error the trader reports when adding the strategy.
    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
    where
        T: Strategy + Component + Debug + 'static,
    {
        self.kernel.trader.add_strategy(strategy)
    }
399
    /// Adds an actor to the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns whatever error the trader reports when adding the actor.
    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
    where
        T: DataActor + Component + Debug + 'static,
    {
        self.kernel.trader.add_actor(actor)
    }
411
    /// Adds an execution algorithm to the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns whatever error the trader reports when adding the execution algorithm.
    pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
    where
        T: DataActor + Component + Debug + 'static,
    {
        self.kernel.trader.add_exec_algorithm(exec_algorithm)
    }
423
424 pub fn run(
440 &mut self,
441 start: Option<UnixNanos>,
442 end: Option<UnixNanos>,
443 run_config_id: Option<String>,
444 streaming: bool,
445 ) -> anyhow::Result<()> {
446 self.run_impl(start, end, run_config_id)?;
447
448 if !streaming {
449 self.end();
450 }
451
452 Ok(())
453 }
454
    /// Core run loop: replays data from the iterator in timestamp order,
    /// advancing all clocks, routing each element to its exchange and the data
    /// engine, and running timer events in between.
    ///
    /// One-time run initialization (run IDs, account initialization, kernel and
    /// trader start) only happens while `self.iteration == 0`.
    ///
    /// # Errors
    ///
    /// Returns an error if the resolved start time is after the resolved end time.
    fn run_impl(
        &mut self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        run_config_id: Option<String>,
    ) -> anyhow::Result<()> {
        // Default the start to the first known data timestamp (or zero when no data was added).
        let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
        // Default the end to the last known data timestamp; the fallback constant
        // is 2100-01-01T00:00:00Z expressed in nanoseconds.
        let end_ns = end.unwrap_or_else(|| {
            self.ts_last_data
                .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
        });
        anyhow::ensure!(start_ns <= end_ns, "start was > end");
        self.end_ns = end_ns;
        self.last_ns = start_ns;

        // Gather the kernel clock plus every component clock and move them to the start time.
        let clocks = self.collect_all_clocks();
        Self::set_all_clocks_time(&clocks, start_ns);

        // One-time initialization for the first run call.
        if self.iteration == 0 {
            self.run_config_id = run_config_id;
            self.run_id = Some(UUID4::new());
            // Wall-clock start of the run (not simulated time).
            self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
            self.backtest_start = Some(start_ns);

            for exchange in self.venues.values() {
                exchange.borrow_mut().initialize_account();
            }

            // Set the clocks to the start time again after account initialization.
            Self::set_all_clocks_time(&clocks, start_ns);

            self.force_stop = false;

            Self::init_command_senders();

            // Log timestamps follow simulated time for the duration of the run.
            logging_clock_set_static_mode();
            logging_clock_set_static_time(start_ns.as_u64());

            self.kernel.start();
            self.kernel.start_trader();

            self.log_pre_run();
        }

        self.log_run();

        // Skip data that precedes the start time.
        let mut data = self.data_iterator.next();
        while let Some(ref d) = data {
            if d.ts_init() >= start_ns {
                break;
            }
            data = self.data_iterator.next();
        }

        // Position `last_ns` just before the first element (saturating at zero) so that
        // the first element's timestamp compares as greater and triggers a time advance.
        if let Some(ref d) = data {
            let ts = d.ts_init();
            self.last_ns = if ts.as_u64() > 0 {
                UnixNanos::from(ts.as_u64() - 1)
            } else {
                UnixNanos::default()
            };
        } else {
            self.last_ns = start_ns;
        }

        loop {
            if self.force_stop {
                log::error!("Force stop triggered, ending backtest");
                break;
            }

            // No data available: process the next timer, then check the iterator again.
            // Stop only when there is still no data and no timer left to process.
            if data.is_none() {
                let done = self.process_next_timer(&clocks);
                data = self.data_iterator.next();
                if data.is_none() && done {
                    break;
                }
                continue;
            }

            let d = data.as_ref().unwrap();
            let ts_init = d.ts_init();

            // Data past the end of the run terminates the loop.
            if ts_init > end_ns {
                break;
            }

            // Advance time (running timer events due strictly before `ts_init`)
            // only when the timestamp actually moves forward.
            if ts_init > self.last_ns {
                self.last_ns = ts_init;
                self.advance_time_impl(ts_init, &clocks);
            }

            // The exchange sees the data before the data engine does.
            self.route_data_to_exchange(d);

            self.kernel.data_engine.borrow_mut().process_data(d.clone());

            // Handle commands produced while processing, then let the venues process them.
            self.drain_command_queues();
            self.process_and_settle_venues(ts_init);

            let prev_last_ns = self.last_ns;
            data = self.data_iterator.next();

            // Once all data for this timestamp has been handled (no more data, or the
            // next element is later), run the timer events due at this timestamp.
            if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
                self.flush_accumulator_events(&clocks, prev_last_ns);
            }

            self.iteration += 1;
        }

        // Final venue processing pass at the current kernel clock time.
        let ts_now = self.kernel.clock.borrow().timestamp_ns();
        for exchange in self.venues.values() {
            exchange.borrow_mut().process(ts_now);
        }

        // Run any remaining timer events up to the end of the run.
        self.flush_accumulator_events(&clocks, end_ns);

        Ok(())
    }
590
    /// Ends the current run: stops the trader and engines, gives the venues a
    /// final processing pass, records the finish timestamps, switches logging
    /// back to real time and logs the post-run report.
    pub fn end(&mut self) {
        self.kernel.stop_trader();

        self.kernel.data_engine.borrow_mut().stop();
        self.kernel.risk_engine.borrow_mut().stop();
        self.kernel.exec_engine.borrow_mut().stop();

        // Let each venue process once more at the current kernel clock time.
        let ts_now = self.kernel.clock.borrow().timestamp_ns();
        for exchange in self.venues.values() {
            exchange.borrow_mut().process(ts_now);
        }

        // `run_finished` is wall-clock time; `backtest_end` is the kernel clock time.
        self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
        self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());

        logging_clock_set_realtime_mode();

        self.log_post_run();
    }
615
    /// Resets the engine's run state so that another run can be started.
    ///
    /// Ends the current run first if the trader is still running, then stops
    /// and resets the engines, the trader and every exchange. Added data,
    /// venues and instruments are not removed here; only the data iterator's
    /// cursors are reset.
    pub fn reset(&mut self) {
        log::debug!("Resetting");

        if self.kernel.trader.is_running() {
            self.end();
        }

        // Each engine is stopped before it is reset.
        self.kernel.data_engine.borrow_mut().stop();
        self.kernel.data_engine.borrow_mut().reset();

        self.kernel.exec_engine.borrow_mut().stop();
        self.kernel.exec_engine.borrow_mut().reset();

        self.kernel.risk_engine.borrow_mut().stop();
        self.kernel.risk_engine.borrow_mut().reset();

        // A failed trader reset is logged rather than propagated.
        if let Err(e) = self.kernel.trader.reset() {
            log::error!("Error resetting trader: {e:?}");
        }

        for exchange in self.venues.values() {
            exchange.borrow_mut().reset();
        }

        // Clear per-run state; `iteration == 0` makes the next run re-initialize.
        self.run_config_id = None;
        self.run_id = None;
        self.run_started = None;
        self.run_finished = None;
        self.backtest_start = None;
        self.backtest_end = None;
        self.iteration = 0;
        self.force_stop = false;
        self.last_ns = UnixNanos::default();
        self.end_ns = UnixNanos::default();

        self.accumulator.clear();

        self.data_iterator.reset_all_cursors();

        log::info!("Reset");
    }
666
    /// Logs that a data sort was requested; no sorting is performed here.
    pub fn sort_data(&mut self) {
        log::info!("Data sort requested (iterator maintains sort order)");
    }
678
679 pub fn clear_data(&mut self) {
681 self.has_data.clear();
682 self.has_book_data.clear();
683 self.data_iterator = BacktestDataIterator::new();
684 self.data_len = 0;
685 self.data_stream_counter = 0;
686 self.ts_first = None;
687 self.ts_last_data = None;
688 }
689
    /// Clears all strategies from the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns whatever error the trader reports when clearing its strategies.
    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
        self.kernel.trader.clear_strategies()
    }
698
    /// Clears all execution algorithms from the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns whatever error the trader reports when clearing its execution algorithms.
    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
        self.kernel.trader.clear_exec_algorithms()
    }
707
    /// Disposes of the engine: clears all added data, then disposes the kernel.
    pub fn dispose(&mut self) {
        self.clear_data();
        self.kernel.dispose();
    }
713
714 #[must_use]
716 pub fn get_result(&self) -> BacktestResult {
717 let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
718 (Some(start), Some(end)) => {
719 (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
720 }
721 _ => 0.0,
722 };
723
724 let cache = self.kernel.cache.borrow();
725 let total_orders = cache.orders_total_count(None, None, None, None, None);
726 let positions = cache.positions(None, None, None, None, None);
727 let total_positions = positions.len();
728
729 let analyzer = self.build_analyzer(&cache, &positions);
730 let mut stats_pnls = AHashMap::new();
731 for currency in analyzer.currencies() {
732 if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
733 stats_pnls.insert(currency.code.to_string(), pnls);
734 }
735 }
736
737 let stats_returns = analyzer.get_performance_stats_returns();
738 let stats_general = analyzer.get_performance_stats_general();
739
740 BacktestResult {
741 trader_id: self.config.trader_id().to_string(),
742 machine_id: self.kernel.machine_id.clone(),
743 instance_id: self.instance_id,
744 run_config_id: self.run_config_id.clone(),
745 run_id: self.run_id,
746 run_started: self.run_started,
747 run_finished: self.run_finished,
748 backtest_start: self.backtest_start,
749 backtest_end: self.backtest_end,
750 elapsed_time_secs,
751 iterations: self.iteration,
752 total_events: self.iteration,
753 total_orders,
754 total_positions,
755 stats_pnls,
756 stats_returns,
757 stats_general,
758 }
759 }
760
761 fn build_analyzer(&self, cache: &Cache, positions: &[&Position]) -> PortfolioAnalyzer {
762 let mut analyzer = PortfolioAnalyzer::default();
763 let positions_owned: Vec<_> = positions.iter().map(|p| (*p).clone()).collect();
764
765 for venue in self.venues.keys() {
767 if let Some(account) = cache.account_for_venue(venue) {
768 let account_ref: &dyn Account = match account {
769 AccountAny::Cash(cash) => cash,
770 AccountAny::Margin(margin) => margin,
771 };
772 for (currency, money) in account_ref.starting_balances() {
773 analyzer
774 .account_balances_starting
775 .entry(currency)
776 .and_modify(|existing| *existing = *existing + money)
777 .or_insert(money);
778 }
779 for (currency, money) in account_ref.balances_total() {
780 analyzer
781 .account_balances
782 .entry(currency)
783 .and_modify(|existing| *existing = *existing + money)
784 .or_insert(money);
785 }
786 }
787 }
788
789 analyzer.add_positions(&positions_owned);
790 analyzer
791 }
792
793 fn route_data_to_exchange(&self, data: &Data) {
794 let venue = data.instrument_id().venue;
795 if let Some(exchange) = self.venues.get(&venue) {
796 let mut ex = exchange.borrow_mut();
797 match data {
798 Data::Delta(delta) => ex.process_order_book_delta(*delta),
799 Data::Deltas(deltas) => ex.process_order_book_deltas((**deltas).clone()),
800 Data::Quote(quote) => ex.process_quote_tick(quote),
801 Data::Trade(trade) => ex.process_trade_tick(trade),
802 Data::Bar(bar) => ex.process_bar(*bar),
803 Data::InstrumentClose(close) => ex.process_instrument_close(*close),
804 Data::Depth10(depth) => ex.process_order_book_depth10(depth),
805 Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) => {
806 }
808 }
809 } else {
810 log::warn!("No exchange found for venue {venue}, data not routed");
811 }
812 }
813
    /// Advances all clocks to `ts_now`, running the timer events that are due
    /// strictly before `ts_now` in timestamp order along the way.
    ///
    /// Events due exactly at `ts_now` are left in the accumulator; they are run
    /// later by `flush_accumulator_events`.
    fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
        // Collect timer events up to `ts_now` from every clock (without setting clock time).
        for clock in clocks {
            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
        }

        // Only events strictly before `ts_now` are processed here (saturating at zero).
        let ts_before = if ts_now.as_u64() > 0 {
            UnixNanos::from(ts_now.as_u64() - 1)
        } else {
            UnixNanos::default()
        };

        // Timestamp of the last event for which venues were settled.
        let mut ts_last: Option<UnixNanos> = None;

        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
            let ts_event = handler.event.ts_event;

            // Move all clocks (and log timestamps) to the event time before running it.
            Self::set_all_clocks_time(clocks, ts_event);
            logging_clock_set_static_time(ts_event.as_u64());

            handler.run();
            self.drain_command_queues();

            // Settle venues when the event timestamp differs from the previous event's.
            if ts_last != Some(ts_event) {
                ts_last = Some(ts_event);
                self.process_and_settle_venues(ts_event);
            }

            // Advance the clocks on the accumulator again after each handler has run.
            for clock in clocks {
                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
            }
        }

        Self::set_all_clocks_time(clocks, ts_now);
        logging_clock_set_static_time(ts_now.as_u64());
    }
852
    /// Runs all timer events due at or before `ts_now`, in timestamp order.
    ///
    /// Unlike `advance_time_impl`, this includes events due exactly at `ts_now`
    /// and does not set the clocks to `ts_now` afterwards; the clocks are left
    /// at the time of the last event that was run.
    fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
        // Collect timer events up to `ts_now` from every clock (without setting clock time).
        for clock in clocks {
            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
        }

        // Timestamp of the last event for which venues were settled.
        let mut ts_last: Option<UnixNanos> = None;

        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
            let ts_event = handler.event.ts_event;

            // Move all clocks (and log timestamps) to the event time before running it.
            Self::set_all_clocks_time(clocks, ts_event);
            logging_clock_set_static_time(ts_event.as_u64());

            handler.run();
            self.drain_command_queues();

            // Settle venues when the event timestamp differs from the previous event's.
            if ts_last != Some(ts_event) {
                ts_last = Some(ts_event);
                self.process_and_settle_venues(ts_event);
            }

            // Advance the clocks on the accumulator again after each handler has run.
            for clock in clocks {
                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
            }
        }
    }
880
881 fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
882 self.flush_accumulator_events(clocks, self.last_ns);
883
884 let mut min_next_time: Option<UnixNanos> = None;
886
887 for clock in clocks {
888 let clock_ref = clock.borrow();
889 for name in clock_ref.timer_names() {
890 if let Some(next_time) = clock_ref.next_time_ns(name)
891 && next_time > self.last_ns
892 {
893 min_next_time = Some(match min_next_time {
894 Some(current_min) => next_time.min(current_min),
895 None => next_time,
896 });
897 }
898 }
899 }
900
901 match min_next_time {
902 None => true,
903 Some(t) if t > self.end_ns => true,
904 Some(t) => {
905 self.last_ns = t;
906 self.flush_accumulator_events(clocks, t);
907 false
908 }
909 }
910 }
911
912 fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
913 let mut clocks = vec![self.kernel.clock.clone()];
914 clocks.extend(self.kernel.trader.get_component_clocks());
915 clocks
916 }
917
    /// Repeatedly processes all venues at `ts_now` and drains the command
    /// queues until no venue reports pending commands for `ts_now`.
    fn process_and_settle_venues(&self, ts_now: UnixNanos) {
        loop {
            for exchange in self.venues.values() {
                exchange.borrow_mut().process(ts_now);
            }
            // Handle any commands queued as a result of venue processing.
            self.drain_command_queues();

            // Go around again while any venue still has commands pending at `ts_now`.
            let has_pending = self
                .venues
                .values()
                .any(|exchange| exchange.borrow().has_pending_commands(ts_now));
            if !has_pending {
                break;
            }
        }
    }
934
935 fn drain_exec_client_events(&self) {
936 for client in &self.exec_clients {
937 client.drain_queued_events();
938 }
939 }
940
    /// Drains the trading and data command queues and the execution clients'
    /// queued events, repeating until both command queues are empty.
    fn drain_command_queues(&self) {
        loop {
            // Draining one queue may enqueue work on another, hence the loop.
            drain_trading_cmd_queue();
            drain_data_cmd_queue();
            self.drain_exec_client_events();

            if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
                break;
            }
        }
    }
955
    /// Installs the synchronous data and trading command senders.
    fn init_command_senders() {
        init_data_cmd_sender(Arc::new(SyncDataCommandSender));
        init_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
    }
960
961 fn advance_clock_on_accumulator(
962 accumulator: &mut TimeEventAccumulator,
963 clock: &Rc<RefCell<dyn Clock>>,
964 to_time_ns: UnixNanos,
965 set_time: bool,
966 ) {
967 let mut clock_ref = clock.borrow_mut();
968 let test_clock = clock_ref
969 .as_any_mut()
970 .downcast_mut::<TestClock>()
971 .expect("BacktestEngine requires TestClock");
972 accumulator.advance_clock(test_clock, to_time_ns, set_time);
973 }
974
975 fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
976 for clock in clocks {
977 let mut clock_ref = clock.borrow_mut();
978 let test_clock = clock_ref
979 .as_any_mut()
980 .downcast_mut::<TestClock>()
981 .expect("BacktestEngine requires TestClock");
982 test_clock.set_time(ts);
983 }
984 }
985
    /// Logs the pre-run banner followed by one line per simulated venue
    /// (its ID and account type).
    #[rustfmt::skip]
    fn log_pre_run(&self) {
        log_info!("=================================================================", color = LogColor::Cyan);
        log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
        log_info!("=================================================================", color = LogColor::Cyan);

        for exchange in self.venues.values() {
            let ex = exchange.borrow();
            log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
        }

        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    }
999
    /// Logs the run banner with the run config ID, run ID, backtest start time
    /// and the number of added data elements. Unset values are shown as "None".
    #[rustfmt::skip]
    fn log_run(&self) {
        let config_id = self.run_config_id.as_deref().unwrap_or("None");
        let id = format_optional_uuid(self.run_id.as_ref());
        let start = format_optional_nanos(self.backtest_start);

        log_info!("=================================================================", color = LogColor::Cyan);
        log_info!(" BACKTEST RUN", color = LogColor::Cyan);
        log_info!("=================================================================", color = LogColor::Cyan);
        log::info!("Run config ID: {config_id}");
        log::info!("Run ID: {id}");
        log::info!("Backtest start: {start}");
        log::info!("Data elements: {}", self.data_len);
        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    }
1015
    /// Logs the post-run report: run identifiers, wall-clock and backtest time
    /// ranges, and iteration/event/order/position counts.
    ///
    /// When `run_analysis` is enabled in the configuration, the portfolio
    /// performance statistics are logged as well.
    #[rustfmt::skip]
    fn log_post_run(&self) {
        let cache = self.kernel.cache.borrow();
        let orders = cache.orders(None, None, None, None, None);
        // Total events is the sum of the event counts of all orders in the cache.
        let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
        let total_orders = orders.len();
        let positions = cache.positions(None, None, None, None, None);
        let total_positions = positions.len();

        let config_id = self.run_config_id.as_deref().unwrap_or("None");
        let id = format_optional_uuid(self.run_id.as_ref());
        let started = format_optional_nanos(self.run_started);
        let finished = format_optional_nanos(self.run_finished);
        // Elapsed time is wall-clock; backtest range is the simulated time span.
        let elapsed = format_optional_duration(self.run_started, self.run_finished);
        let bt_start = format_optional_nanos(self.backtest_start);
        let bt_end = format_optional_nanos(self.backtest_end);
        let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
        let iterations = self.iteration.separate_with_underscores();
        let events = total_events.separate_with_underscores();
        let num_orders = total_orders.separate_with_underscores();
        let num_positions = total_positions.separate_with_underscores();

        log_info!("=================================================================", color = LogColor::Cyan);
        log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
        log_info!("=================================================================", color = LogColor::Cyan);
        log::info!("Run config ID: {config_id}");
        log::info!("Run ID: {id}");
        log::info!("Run started: {started}");
        log::info!("Run finished: {finished}");
        log::info!("Elapsed time: {elapsed}");
        log::info!("Backtest start: {bt_start}");
        log::info!("Backtest end: {bt_end}");
        log::info!("Backtest range: {bt_range}");
        log::info!("Iterations: {iterations}");
        log::info!("Total events: {events}");
        log::info!("Total orders: {num_orders}");
        log::info!("Total positions: {num_positions}");

        // Performance statistics are optional.
        if !self.config.run_analysis {
            return;
        }

        let analyzer = self.build_analyzer(&cache, &positions);
        log_portfolio_performance(&analyzer);
    }
1061
1062 pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1063 if self
1064 .kernel
1065 .data_engine
1066 .borrow()
1067 .registered_clients()
1068 .contains(&client_id)
1069 {
1070 return;
1071 }
1072
1073 let venue = Venue::from(client_id.as_str());
1074 let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1075 let data_client_adapter = DataClientAdapter::new(
1076 backtest_client.client_id,
1077 None,
1078 false,
1079 false,
1080 Box::new(backtest_client),
1081 );
1082
1083 self.kernel
1084 .data_engine
1085 .borrow_mut()
1086 .register_client(data_client_adapter, None);
1087 }
1088
1089 pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1090 let client_id = ClientId::from(venue.as_str());
1091 if !self
1092 .kernel
1093 .data_engine
1094 .borrow()
1095 .registered_clients()
1096 .contains(&client_id)
1097 {
1098 let backtest_client =
1099 BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1100 let data_client_adapter = DataClientAdapter::new(
1101 client_id,
1102 Some(venue),
1103 false,
1104 false,
1105 Box::new(backtest_client),
1106 );
1107 self.kernel
1108 .data_engine
1109 .borrow_mut()
1110 .register_client(data_client_adapter, Some(venue));
1111 }
1112 }
1113}
1114
1115fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1116 nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1117}
1118
1119fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1120 uuid.map_or("None".to_string(), |id| id.to_string())
1121}
1122
1123fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1124 match (start, end) {
1125 (Some(s), Some(e)) => {
1126 let delta = e.to_datetime_utc() - s.to_datetime_utc();
1127 let days = delta.num_days().abs();
1128 let hours = delta.num_hours().abs() % 24;
1129 let minutes = delta.num_minutes().abs() % 60;
1130 let seconds = delta.num_seconds().abs() % 60;
1131 let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1132 format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1133 }
1134 _ => "None".to_string(),
1135 }
1136}
1137
/// Logs the portfolio performance report: PnL statistics for each currency
/// known to the analyzer, followed by returns statistics and general statistics.
///
/// Currencies whose formatted PnL statistics cannot be produced only get their
/// section header and separators.
#[rustfmt::skip]
fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
    log_info!("=================================================================", color = LogColor::Cyan);
    log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
    log_info!("=================================================================", color = LogColor::Cyan);

    // One PnL section per currency.
    for currency in analyzer.currencies() {
        log::info!(" PnL Statistics ({})", currency.code);
        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);

        if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
            for line in &pnl_lines {
                log::info!("{line}");
            }
        }

        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    }

    log::info!(" Returns Statistics");
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    for line in &analyzer.get_stats_returns_formatted() {
        log::info!("{line}");
    }
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);

    log::info!(" General Statistics");
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    for line in &analyzer.get_stats_general_formatted() {
        log::info!("{line}");
    }
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
}