1use std::{
19 cell::RefCell,
20 collections::{HashMap, HashSet},
21 fmt::Debug,
22 rc::Rc,
23};
24
25use nautilus_analysis::analyzer::PortfolioAnalyzer;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 msgbus::{
30 self,
31 handler::{ShareableMessageHandler, TypedMessageHandler},
32 },
33};
34use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
35use nautilus_model::{
36 accounts::AccountAny,
37 data::{Bar, QuoteTick},
38 enums::{OrderSide, OrderType, PositionSide, PriceType},
39 events::{AccountState, OrderEventAny, position::PositionEvent},
40 identifiers::{AccountId, InstrumentId, Venue},
41 instruments::{Instrument, InstrumentAny},
42 orders::{Order, OrderAny},
43 position::Position,
44 types::{Currency, Money, Price},
45};
46use rust_decimal::{Decimal, prelude::FromPrimitive};
47
48use crate::{config::PortfolioConfig, manager::AccountsManager};
49
50struct PortfolioState {
51 accounts: AccountsManager,
52 analyzer: PortfolioAnalyzer,
53 unrealized_pnls: HashMap<InstrumentId, Money>,
54 realized_pnls: HashMap<InstrumentId, Money>,
55 net_positions: HashMap<InstrumentId, Decimal>,
56 pending_calcs: HashSet<InstrumentId>,
57 bar_close_prices: HashMap<InstrumentId, Price>,
58 initialized: bool,
59 last_account_state_log_ts: HashMap<AccountId, u64>,
60 min_account_state_logging_interval_ns: u64,
61}
62
63impl PortfolioState {
64 fn new(
65 clock: Rc<RefCell<dyn Clock>>,
66 cache: Rc<RefCell<Cache>>,
67 config: &PortfolioConfig,
68 ) -> Self {
69 let min_account_state_logging_interval_ns = config
70 .min_account_state_logging_interval_ms
71 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73 Self {
74 accounts: AccountsManager::new(clock, cache),
75 analyzer: PortfolioAnalyzer::default(),
76 unrealized_pnls: HashMap::new(),
77 realized_pnls: HashMap::new(),
78 net_positions: HashMap::new(),
79 pending_calcs: HashSet::new(),
80 bar_close_prices: HashMap::new(),
81 initialized: false,
82 last_account_state_log_ts: HashMap::new(),
83 min_account_state_logging_interval_ns,
84 }
85 }
86
87 fn reset(&mut self) {
88 log::debug!("RESETTING");
89 self.net_positions.clear();
90 self.unrealized_pnls.clear();
91 self.realized_pnls.clear();
92 self.pending_calcs.clear();
93 self.last_account_state_log_ts.clear();
94 self.analyzer.reset();
95 log::debug!("READY");
96 }
97}
98
99pub struct Portfolio {
100 pub(crate) clock: Rc<RefCell<dyn Clock>>,
101 pub(crate) cache: Rc<RefCell<Cache>>,
102 inner: Rc<RefCell<PortfolioState>>,
103 config: PortfolioConfig,
104}
105
106impl Debug for Portfolio {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct(stringify!(Portfolio)).finish()
109 }
110}
111
112impl Portfolio {
113 pub fn new(
114 cache: Rc<RefCell<Cache>>,
115 clock: Rc<RefCell<dyn Clock>>,
116 config: Option<PortfolioConfig>,
117 ) -> Self {
118 let config = config.unwrap_or_default();
119 let inner = Rc::new(RefCell::new(PortfolioState::new(
120 clock.clone(),
121 cache.clone(),
122 &config,
123 )));
124
125 Self::register_message_handlers(
126 cache.clone(),
127 clock.clone(),
128 inner.clone(),
129 config.bar_updates,
130 );
131
132 Self {
133 clock,
134 cache,
135 inner,
136 config,
137 }
138 }
139
140 fn register_message_handlers(
141 cache: Rc<RefCell<Cache>>,
142 clock: Rc<RefCell<dyn Clock>>,
143 inner: Rc<RefCell<PortfolioState>>,
144 bar_updates: bool,
145 ) {
146 let inner_weak = WeakCell::from(Rc::downgrade(&inner));
147
148 let update_account_handler = {
149 let cache = cache.clone();
150 let inner = inner_weak.clone();
151 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
152 move |event: &AccountState| {
153 if let Some(inner_rc) = inner.upgrade() {
154 update_account(cache.clone(), inner_rc.into(), event);
155 }
156 },
157 )))
158 };
159
160 let update_position_handler = {
161 let cache = cache.clone();
162 let clock = clock.clone();
163 let inner = inner_weak.clone();
164 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
165 move |event: &PositionEvent| {
166 if let Some(inner_rc) = inner.upgrade() {
167 update_position(cache.clone(), clock.clone(), inner_rc.into(), event);
168 }
169 },
170 )))
171 };
172
173 let update_quote_handler = {
174 let cache = cache.clone();
175 let clock = clock.clone();
176 let inner = inner_weak.clone();
177 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
178 move |quote: &QuoteTick| {
179 if let Some(inner_rc) = inner.upgrade() {
180 update_quote_tick(cache.clone(), clock.clone(), inner_rc.into(), quote);
181 }
182 },
183 )))
184 };
185
186 let update_bar_handler = {
187 let cache = cache.clone();
188 let clock = clock.clone();
189 let inner = inner_weak.clone();
190 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
191 if let Some(inner_rc) = inner.upgrade() {
192 update_bar(cache.clone(), clock.clone(), inner_rc.into(), bar);
193 }
194 })))
195 };
196
197 let update_order_handler = {
198 let cache = cache;
199 let clock = clock.clone();
200 let inner = inner_weak;
201 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
202 move |event: &OrderEventAny| {
203 if let Some(inner_rc) = inner.upgrade() {
204 update_order(cache.clone(), clock.clone(), inner_rc.into(), event);
205 }
206 },
207 )))
208 };
209
210 msgbus::register(
211 "Portfolio.update_account".into(),
212 update_account_handler.clone(),
213 );
214
215 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
216 if bar_updates {
217 msgbus::subscribe("data.quotes.*EXTERNAL".into(), update_bar_handler, Some(10));
218 }
219 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
220 msgbus::subscribe(
221 "events.position.*".into(),
222 update_position_handler,
223 Some(10),
224 );
225 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
226 }
227
228 pub fn reset(&mut self) {
229 log::debug!("RESETTING");
230 self.inner.borrow_mut().reset();
231 log::debug!("READY");
232 }
233
234 #[must_use]
238 pub fn is_initialized(&self) -> bool {
239 self.inner.borrow().initialized
240 }
241
242 #[must_use]
246 pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
247 self.cache.borrow().account_for_venue(venue).map_or_else(
248 || {
249 log::error!("Cannot get balances locked: no account generated for {venue}");
250 HashMap::new()
251 },
252 AccountAny::balances_locked,
253 )
254 }
255
256 #[must_use]
260 pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
261 self.cache.borrow().account_for_venue(venue).map_or_else(
262 || {
263 log::error!(
264 "Cannot get initial (order) margins: no account registered for {venue}"
265 );
266 HashMap::new()
267 },
268 |account| match account {
269 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
270 AccountAny::Cash(_) => {
271 log::warn!("Initial margins not applicable for cash account");
272 HashMap::new()
273 }
274 },
275 )
276 }
277
278 #[must_use]
282 pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
283 self.cache.borrow().account_for_venue(venue).map_or_else(
284 || {
285 log::error!(
286 "Cannot get maintenance (position) margins: no account registered for {venue}"
287 );
288 HashMap::new()
289 },
290 |account| match account {
291 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
292 AccountAny::Cash(_) => {
293 log::warn!("Maintenance margins not applicable for cash account");
294 HashMap::new()
295 }
296 },
297 )
298 }
299
300 #[must_use]
304 pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
305 let instrument_ids = {
306 let cache = self.cache.borrow();
307 let positions = cache.positions(Some(venue), None, None, None);
308
309 if positions.is_empty() {
310 return HashMap::new(); }
312
313 let instrument_ids: HashSet<InstrumentId> =
314 positions.iter().map(|p| p.instrument_id).collect();
315
316 instrument_ids
317 };
318
319 let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
320
321 for instrument_id in instrument_ids {
322 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
323 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
325 continue;
326 }
327
328 match self.calculate_unrealized_pnl(&instrument_id) {
330 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
331 None => continue,
332 }
333 }
334
335 unrealized_pnls
336 .into_iter()
337 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
338 .collect()
339 }
340
341 #[must_use]
345 pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
346 let instrument_ids = {
347 let cache = self.cache.borrow();
348 let positions = cache.positions(Some(venue), None, None, None);
349
350 if positions.is_empty() {
351 return HashMap::new(); }
353
354 let instrument_ids: HashSet<InstrumentId> =
355 positions.iter().map(|p| p.instrument_id).collect();
356
357 instrument_ids
358 };
359
360 let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
361
362 for instrument_id in instrument_ids {
363 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
364 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
366 continue;
367 }
368
369 match self.calculate_realized_pnl(&instrument_id) {
371 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
372 None => continue,
373 }
374 }
375
376 realized_pnls
377 .into_iter()
378 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
379 .collect()
380 }
381
382 #[must_use]
383 pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
384 let cache = self.cache.borrow();
385 let account = if let Some(account) = cache.account_for_venue(venue) {
386 account
387 } else {
388 log::error!("Cannot calculate net exposures: no account registered for {venue}");
389 return None; };
391
392 let positions_open = cache.positions_open(Some(venue), None, None, None);
393 if positions_open.is_empty() {
394 return Some(HashMap::new()); }
396
397 let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
398
399 for position in positions_open {
400 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
401 instrument
402 } else {
403 log::error!(
404 "Cannot calculate net exposures: no instrument for {}",
405 position.instrument_id
406 );
407 return None; };
409
410 if position.side == PositionSide::Flat {
411 log::error!(
412 "Cannot calculate net exposures: position is flat for {}",
413 position.instrument_id
414 );
415 continue; }
417
418 let price = self.get_price(position)?;
419 let xrate = if let Some(xrate) =
420 self.calculate_xrate_to_base(instrument, account, position.entry)
421 {
422 xrate
423 } else {
424 log::error!(
425 "Cannot calculate net exposures: insufficient data for {}/{:?}",
427 instrument.settlement_currency(),
428 account.base_currency()
429 );
430 return None; };
432
433 let settlement_currency = account
434 .base_currency()
435 .unwrap_or_else(|| instrument.settlement_currency());
436
437 let net_exposure = instrument
438 .calculate_notional_value(position.quantity, price, None)
439 .as_f64()
440 * xrate;
441
442 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
443 .round()
444 / 10f64.powi(settlement_currency.precision.into());
445
446 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
447 }
448
449 Some(
450 net_exposures
451 .into_iter()
452 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
453 .collect(),
454 )
455 }
456
457 #[must_use]
458 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
459 if let Some(pnl) = self
460 .inner
461 .borrow()
462 .unrealized_pnls
463 .get(instrument_id)
464 .copied()
465 {
466 return Some(pnl);
467 }
468
469 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
470 self.inner
471 .borrow_mut()
472 .unrealized_pnls
473 .insert(*instrument_id, pnl);
474 Some(pnl)
475 }
476
477 #[must_use]
478 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
479 if let Some(pnl) = self
480 .inner
481 .borrow()
482 .realized_pnls
483 .get(instrument_id)
484 .copied()
485 {
486 return Some(pnl);
487 }
488
489 let pnl = self.calculate_realized_pnl(instrument_id)?;
490 self.inner
491 .borrow_mut()
492 .realized_pnls
493 .insert(*instrument_id, pnl);
494 Some(pnl)
495 }
496
497 #[must_use]
498 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
499 let cache = self.cache.borrow();
500 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
501 account
502 } else {
503 log::error!(
504 "Cannot calculate net exposure: no account registered for {}",
505 instrument_id.venue
506 );
507 return None;
508 };
509
510 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
511 instrument
512 } else {
513 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
514 return None;
515 };
516
517 let positions_open = cache.positions_open(
518 None, Some(instrument_id),
520 None,
521 None,
522 );
523
524 if positions_open.is_empty() {
525 return Some(Money::new(0.0, instrument.settlement_currency()));
526 }
527
528 let mut net_exposure = 0.0;
529
530 for position in positions_open {
531 let price = self.get_price(position)?;
532 let xrate = if let Some(xrate) =
533 self.calculate_xrate_to_base(instrument, account, position.entry)
534 {
535 xrate
536 } else {
537 log::error!(
538 "Cannot calculate net exposures: insufficient data for {}/{:?}",
540 instrument.settlement_currency(),
541 account.base_currency()
542 );
543 return None; };
545
546 let notional_value =
547 instrument.calculate_notional_value(position.quantity, price, None);
548 net_exposure += notional_value.as_f64() * xrate;
549 }
550
551 let settlement_currency = account
552 .base_currency()
553 .unwrap_or_else(|| instrument.settlement_currency());
554
555 Some(Money::new(net_exposure, settlement_currency))
556 }
557
558 #[must_use]
559 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
560 self.inner
561 .borrow()
562 .net_positions
563 .get(instrument_id)
564 .copied()
565 .unwrap_or(Decimal::ZERO)
566 }
567
568 #[must_use]
569 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
570 self.inner
571 .borrow()
572 .net_positions
573 .get(instrument_id)
574 .copied()
575 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
576 }
577
578 #[must_use]
579 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
580 self.inner
581 .borrow()
582 .net_positions
583 .get(instrument_id)
584 .copied()
585 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
586 }
587
588 #[must_use]
589 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
590 self.inner
591 .borrow()
592 .net_positions
593 .get(instrument_id)
594 .copied()
595 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
596 }
597
598 #[must_use]
599 pub fn is_completely_flat(&self) -> bool {
600 for net_position in self.inner.borrow().net_positions.values() {
601 if *net_position != Decimal::ZERO {
602 return false;
603 }
604 }
605 true
606 }
607
608 pub fn initialize_orders(&mut self) {
616 let mut initialized = true;
617 let orders_and_instruments = {
618 let cache = self.cache.borrow();
619 let all_orders_open = cache.orders_open(None, None, None, None);
620
621 let mut instruments_with_orders = Vec::new();
622 let mut instruments = HashSet::new();
623
624 for order in &all_orders_open {
625 instruments.insert(order.instrument_id());
626 }
627
628 for instrument_id in instruments {
629 if let Some(instrument) = cache.instrument(&instrument_id) {
630 let orders = cache
631 .orders_open(None, Some(&instrument_id), None, None)
632 .into_iter()
633 .cloned()
634 .collect::<Vec<OrderAny>>();
635 instruments_with_orders.push((instrument.clone(), orders));
636 } else {
637 log::error!(
638 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
639 );
640 initialized = false;
641 break;
642 }
643 }
644 instruments_with_orders
645 };
646
647 for (instrument, orders_open) in &orders_and_instruments {
648 let mut cache = self.cache.borrow_mut();
649 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
650 account
651 } else {
652 log::error!(
653 "Cannot update initial (order) margin: no account registered for {}",
654 instrument.id().venue
655 );
656 initialized = false;
657 break;
658 };
659
660 let result = self.inner.borrow_mut().accounts.update_orders(
661 account,
662 instrument.clone(),
663 orders_open.iter().collect(),
664 self.clock.borrow().timestamp_ns(),
665 );
666
667 match result {
668 Some((updated_account, _)) => {
669 cache.add_account(updated_account).unwrap(); }
671 None => {
672 initialized = false;
673 }
674 }
675 }
676
677 let total_orders = orders_and_instruments
678 .into_iter()
679 .map(|(_, orders)| orders.len())
680 .sum::<usize>();
681
682 log::info!(
683 "Initialized {} open order{}",
684 total_orders,
685 if total_orders == 1 { "" } else { "s" }
686 );
687
688 self.inner.borrow_mut().initialized = initialized;
689 }
690
691 pub fn initialize_positions(&mut self) {
697 self.inner.borrow_mut().unrealized_pnls.clear();
698 self.inner.borrow_mut().realized_pnls.clear();
699 let all_positions_open: Vec<Position>;
700 let mut instruments = HashSet::new();
701 {
702 let cache = self.cache.borrow();
703 all_positions_open = cache
704 .positions_open(None, None, None, None)
705 .into_iter()
706 .cloned()
707 .collect();
708 for position in &all_positions_open {
709 instruments.insert(position.instrument_id);
710 }
711 }
712
713 let mut initialized = true;
714
715 for instrument_id in instruments {
716 let positions_open: Vec<Position> = {
717 let cache = self.cache.borrow();
718 cache
719 .positions_open(None, Some(&instrument_id), None, None)
720 .into_iter()
721 .cloned()
722 .collect()
723 };
724
725 self.update_net_position(&instrument_id, positions_open);
726
727 let calculated_unrealized_pnl = self
728 .calculate_unrealized_pnl(&instrument_id)
729 .expect("Failed to calculate unrealized PnL");
730 let calculated_realized_pnl = self
731 .calculate_realized_pnl(&instrument_id)
732 .expect("Failed to calculate realized PnL");
733
734 self.inner
735 .borrow_mut()
736 .unrealized_pnls
737 .insert(instrument_id, calculated_unrealized_pnl);
738 self.inner
739 .borrow_mut()
740 .realized_pnls
741 .insert(instrument_id, calculated_realized_pnl);
742
743 let cache = self.cache.borrow();
744 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
745 account
746 } else {
747 log::error!(
748 "Cannot update maintenance (position) margin: no account registered for {}",
749 instrument_id.venue
750 );
751 initialized = false;
752 break;
753 };
754
755 let account = match account {
756 AccountAny::Cash(_) => continue,
757 AccountAny::Margin(margin_account) => margin_account,
758 };
759
760 let mut cache = self.cache.borrow_mut();
761 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
762 instrument
763 } else {
764 log::error!(
765 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
766 );
767 initialized = false;
768 break;
769 };
770
771 let result = self.inner.borrow_mut().accounts.update_positions(
772 account,
773 instrument.clone(),
774 self.cache
775 .borrow()
776 .positions_open(None, Some(&instrument_id), None, None),
777 self.clock.borrow().timestamp_ns(),
778 );
779
780 match result {
781 Some((updated_account, _)) => {
782 cache
783 .add_account(AccountAny::Margin(updated_account)) .unwrap();
785 }
786 None => {
787 initialized = false;
788 }
789 }
790 }
791
792 let open_count = all_positions_open.len();
793 self.inner.borrow_mut().initialized = initialized;
794 log::info!(
795 "Initialized {} open position{}",
796 open_count,
797 if open_count == 1 { "" } else { "s" }
798 );
799 }
800
801 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
805 update_quote_tick(
806 self.cache.clone(),
807 self.clock.clone(),
808 self.inner.clone(),
809 quote,
810 );
811 }
812
813 pub fn update_bar(&mut self, bar: &Bar) {
817 update_bar(
818 self.cache.clone(),
819 self.clock.clone(),
820 self.inner.clone(),
821 bar,
822 );
823 }
824
825 pub fn update_account(&mut self, event: &AccountState) {
827 update_account(self.cache.clone(), self.inner.clone(), event);
828 }
829
830 pub fn update_order(&mut self, event: &OrderEventAny) {
834 update_order(
835 self.cache.clone(),
836 self.clock.clone(),
837 self.inner.clone(),
838 event,
839 );
840 }
841
842 pub fn update_position(&mut self, event: &PositionEvent) {
846 update_position(
847 self.cache.clone(),
848 self.clock.clone(),
849 self.inner.clone(),
850 event,
851 );
852 }
853
854 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
857 let mut net_position = Decimal::ZERO;
858
859 for open_position in positions_open {
860 log::debug!("open_position: {open_position}");
861 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
862 }
863
864 let existing_position = self.net_position(instrument_id);
865 if existing_position != net_position {
866 self.inner
867 .borrow_mut()
868 .net_positions
869 .insert(*instrument_id, net_position);
870 log::info!("{instrument_id} net_position={net_position}");
871 }
872 }
873
874 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
875 let cache = self.cache.borrow();
876 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
877 account
878 } else {
879 log::error!(
880 "Cannot calculate unrealized PnL: no account registered for {}",
881 instrument_id.venue
882 );
883 return None;
884 };
885
886 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
887 instrument
888 } else {
889 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
890 return None;
891 };
892
893 let currency = account
894 .base_currency()
895 .unwrap_or_else(|| instrument.settlement_currency());
896
897 let positions_open = cache.positions_open(
898 None, Some(instrument_id),
900 None,
901 None,
902 );
903
904 if positions_open.is_empty() {
905 return Some(Money::new(0.0, currency));
906 }
907
908 let mut total_pnl = 0.0;
909
910 for position in positions_open {
911 if position.instrument_id != *instrument_id {
912 continue; }
914
915 if position.side == PositionSide::Flat {
916 continue; }
918
919 let price = if let Some(price) = self.get_price(position) {
920 price
921 } else {
922 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
923 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
924 return None; };
926
927 let mut pnl = position.unrealized_pnl(price).as_f64();
928
929 if let Some(base_currency) = account.base_currency() {
930 let xrate = if let Some(xrate) =
931 self.calculate_xrate_to_base(instrument, account, position.entry)
932 {
933 xrate
934 } else {
935 log::error!(
936 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
938 instrument.settlement_currency(),
939 base_currency
940 );
941 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
942 return None; };
944
945 let scale = 10f64.powi(currency.precision.into());
946 pnl = ((pnl * xrate) * scale).round() / scale;
947 }
948
949 total_pnl += pnl;
950 }
951
952 Some(Money::new(total_pnl, currency))
953 }
954
955 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
956 let cache = self.cache.borrow();
957 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
958 account
959 } else {
960 log::error!(
961 "Cannot calculate realized PnL: no account registered for {}",
962 instrument_id.venue
963 );
964 return None;
965 };
966
967 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
968 instrument
969 } else {
970 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
971 return None;
972 };
973
974 let currency = account
975 .base_currency()
976 .unwrap_or_else(|| instrument.settlement_currency());
977
978 let positions = cache.positions(
979 None, Some(instrument_id),
981 None,
982 None,
983 );
984
985 if positions.is_empty() {
986 return Some(Money::new(0.0, currency));
987 }
988
989 let mut total_pnl = 0.0;
990
991 for position in positions {
992 if position.instrument_id != *instrument_id {
993 continue; }
995
996 if position.realized_pnl.is_none() {
997 continue; }
999
1000 let mut pnl = position.realized_pnl?.as_f64();
1001
1002 if let Some(base_currency) = account.base_currency() {
1003 let xrate = if let Some(xrate) =
1004 self.calculate_xrate_to_base(instrument, account, position.entry)
1005 {
1006 xrate
1007 } else {
1008 log::error!(
1009 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1010 instrument.settlement_currency(),
1011 base_currency
1012 );
1013 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1014 return Some(Money::new(0.0, currency));
1017 };
1018
1019 let scale = 10f64.powi(currency.precision.into());
1020 pnl = ((pnl * xrate) * scale).round() / scale;
1021 }
1022
1023 total_pnl += pnl;
1024 }
1025
1026 Some(Money::new(total_pnl, currency))
1027 }
1028
1029 fn get_price(&self, position: &Position) -> Option<Price> {
1030 let price_type = match position.side {
1031 PositionSide::Long => PriceType::Bid,
1032 PositionSide::Short => PriceType::Ask,
1033 _ => panic!("invalid `PositionSide`, was {}", position.side),
1034 };
1035
1036 let cache = self.cache.borrow();
1037
1038 let instrument_id = &position.instrument_id;
1039 cache
1040 .price(instrument_id, price_type)
1041 .or_else(|| cache.price(instrument_id, PriceType::Last))
1042 .or_else(|| {
1043 self.inner
1044 .borrow()
1045 .bar_close_prices
1046 .get(instrument_id)
1047 .copied()
1048 })
1049 }
1050
1051 fn calculate_xrate_to_base(
1052 &self,
1053 instrument: &InstrumentAny,
1054 account: &AccountAny,
1055 side: OrderSide,
1056 ) -> Option<f64> {
1057 if !self.config.convert_to_account_base_currency {
1058 return Some(1.0); }
1060
1061 match account.base_currency() {
1062 None => Some(1.0), Some(base_currency) => {
1064 let cache = self.cache.borrow();
1065
1066 if self.config.use_mark_xrates {
1067 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1068 }
1069
1070 let price_type = if side == OrderSide::Buy {
1071 PriceType::Bid
1072 } else {
1073 PriceType::Ask
1074 };
1075
1076 cache.get_xrate(
1077 instrument.id().venue,
1078 instrument.settlement_currency(),
1079 base_currency,
1080 price_type,
1081 )
1082 }
1083 }
1084 }
1085}
1086
1087fn update_quote_tick(
1089 cache: Rc<RefCell<Cache>>,
1090 clock: Rc<RefCell<dyn Clock>>,
1091 inner: Rc<RefCell<PortfolioState>>,
1092 quote: &QuoteTick,
1093) {
1094 update_instrument_id(cache, clock.clone(), inner, "e.instrument_id);
1095}
1096
1097fn update_bar(
1098 cache: Rc<RefCell<Cache>>,
1099 clock: Rc<RefCell<dyn Clock>>,
1100 inner: Rc<RefCell<PortfolioState>>,
1101 bar: &Bar,
1102) {
1103 let instrument_id = bar.bar_type.instrument_id();
1104 inner
1105 .borrow_mut()
1106 .bar_close_prices
1107 .insert(instrument_id, bar.close);
1108 update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1109}
1110
1111fn update_instrument_id(
1112 cache: Rc<RefCell<Cache>>,
1113 clock: Rc<RefCell<dyn Clock>>,
1114 inner: Rc<RefCell<PortfolioState>>,
1115 instrument_id: &InstrumentId,
1116) {
1117 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1118
1119 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1120 return;
1121 }
1122
1123 let result_init;
1124 let mut result_maint = None;
1125
1126 let account = {
1127 let cache_ref = cache.borrow();
1128 let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1129 account
1130 } else {
1131 log::error!(
1132 "Cannot update tick: no account registered for {}",
1133 instrument_id.venue
1134 );
1135 return;
1136 };
1137
1138 let mut cache_ref = cache.borrow_mut();
1139 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1140 instrument.clone()
1141 } else {
1142 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1143 return;
1144 };
1145
1146 let orders_open: Vec<OrderAny> = cache_ref
1148 .orders_open(None, Some(instrument_id), None, None)
1149 .iter()
1150 .map(|o| (*o).clone())
1151 .collect();
1152
1153 let positions_open: Vec<Position> = cache_ref
1154 .positions_open(None, Some(instrument_id), None, None)
1155 .iter()
1156 .map(|p| (*p).clone())
1157 .collect();
1158
1159 result_init = inner.borrow().accounts.update_orders(
1160 account,
1161 instrument.clone(),
1162 orders_open.iter().collect(),
1163 clock.borrow().timestamp_ns(),
1164 );
1165
1166 if let AccountAny::Margin(margin_account) = account {
1167 result_maint = inner.borrow().accounts.update_positions(
1168 margin_account,
1169 instrument,
1170 positions_open.iter().collect(),
1171 clock.borrow().timestamp_ns(),
1172 );
1173 }
1174
1175 if let Some((ref updated_account, _)) = result_init {
1176 cache_ref.add_account(updated_account.clone()).unwrap(); }
1178 account.clone()
1179 };
1180
1181 let mut portfolio_clone = Portfolio {
1182 clock: clock.clone(),
1183 cache,
1184 inner: inner.clone(),
1185 config: PortfolioConfig::default(), };
1187
1188 let result_unrealized_pnl: Option<Money> =
1189 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1190
1191 if result_init.is_some()
1192 && (matches!(account, AccountAny::Cash(_))
1193 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1194 {
1195 inner.borrow_mut().pending_calcs.remove(instrument_id);
1196 if inner.borrow().pending_calcs.is_empty() {
1197 inner.borrow_mut().initialized = true;
1198 }
1199 }
1200}
1201
1202fn update_order(
1203 cache: Rc<RefCell<Cache>>,
1204 clock: Rc<RefCell<dyn Clock>>,
1205 inner: Rc<RefCell<PortfolioState>>,
1206 event: &OrderEventAny,
1207) {
1208 let cache_ref = cache.borrow();
1209 let account_id = match event.account_id() {
1210 Some(account_id) => account_id,
1211 None => {
1212 return; }
1214 };
1215
1216 let account = if let Some(account) = cache_ref.account(&account_id) {
1217 account
1218 } else {
1219 log::error!("Cannot update order: no account registered for {account_id}");
1220 return;
1221 };
1222
1223 match account {
1224 AccountAny::Cash(cash_account) => {
1225 if !cash_account.base.calculate_account_state {
1226 return;
1227 }
1228 }
1229 AccountAny::Margin(margin_account) => {
1230 if !margin_account.base.calculate_account_state {
1231 return;
1232 }
1233 }
1234 }
1235
1236 match event {
1237 OrderEventAny::Accepted(_)
1238 | OrderEventAny::Canceled(_)
1239 | OrderEventAny::Rejected(_)
1240 | OrderEventAny::Updated(_)
1241 | OrderEventAny::Filled(_) => {}
1242 _ => {
1243 return;
1244 }
1245 }
1246
1247 let cache_ref = cache.borrow();
1248 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1249 order
1250 } else {
1251 log::error!(
1252 "Cannot update order: {} not found in the cache",
1253 event.client_order_id()
1254 );
1255 return; };
1257
1258 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1259 return; }
1261
1262 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1263 instrument_id
1264 } else {
1265 log::error!(
1266 "Cannot update order: no instrument found for {}",
1267 event.instrument_id()
1268 );
1269 return;
1270 };
1271
1272 if let OrderEventAny::Filled(order_filled) = event {
1273 let _ = inner.borrow().accounts.update_balances(
1274 account.clone(),
1275 instrument.clone(),
1276 *order_filled,
1277 );
1278
1279 let mut portfolio_clone = Portfolio {
1280 clock: clock.clone(),
1281 cache: cache.clone(),
1282 inner: inner.clone(),
1283 config: PortfolioConfig::default(), };
1285
1286 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1287 Some(unrealized_pnl) => {
1288 inner
1289 .borrow_mut()
1290 .unrealized_pnls
1291 .insert(event.instrument_id(), unrealized_pnl);
1292 }
1293 None => {
1294 log::error!(
1295 "Failed to calculate unrealized PnL for instrument {}",
1296 event.instrument_id()
1297 );
1298 }
1299 }
1300 }
1301
1302 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1303
1304 let account_state = inner.borrow_mut().accounts.update_orders(
1305 account,
1306 instrument.clone(),
1307 orders_open,
1308 clock.borrow().timestamp_ns(),
1309 );
1310
1311 let mut cache_ref = cache.borrow_mut();
1312 cache_ref.update_account(account.clone()).unwrap();
1313
1314 if let Some(account_state) = account_state {
1315 msgbus::publish(
1316 format!("events.account.{}", account.id()).into(),
1317 &account_state,
1318 );
1319 } else {
1320 log::debug!("Added pending calculation for {}", instrument.id());
1321 inner.borrow_mut().pending_calcs.insert(instrument.id());
1322 }
1323
1324 log::debug!("Updated {event}");
1325}
1326
1327fn update_position(
1328 cache: Rc<RefCell<Cache>>,
1329 clock: Rc<RefCell<dyn Clock>>,
1330 inner: Rc<RefCell<PortfolioState>>,
1331 event: &PositionEvent,
1332) {
1333 let instrument_id = event.instrument_id();
1334
1335 let positions_open: Vec<Position> = {
1336 let cache_ref = cache.borrow();
1337
1338 cache_ref
1339 .positions_open(None, Some(&instrument_id), None, None)
1340 .iter()
1341 .map(|o| (*o).clone())
1342 .collect()
1343 };
1344
1345 log::debug!("postion fresh from cache -> {positions_open:?}");
1346
1347 let mut portfolio_clone = Portfolio {
1348 clock: clock.clone(),
1349 cache: cache.clone(),
1350 inner: inner.clone(),
1351 config: PortfolioConfig::default(), };
1353
1354 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1355
1356 let calculated_unrealized_pnl = portfolio_clone
1357 .calculate_unrealized_pnl(&instrument_id)
1358 .expect("Failed to calculate unrealized PnL");
1359 let calculated_realized_pnl = portfolio_clone
1360 .calculate_realized_pnl(&instrument_id)
1361 .expect("Failed to calculate realized PnL");
1362
1363 inner
1364 .borrow_mut()
1365 .unrealized_pnls
1366 .insert(event.instrument_id(), calculated_unrealized_pnl);
1367 inner
1368 .borrow_mut()
1369 .realized_pnls
1370 .insert(event.instrument_id(), calculated_realized_pnl);
1371
1372 let cache_ref = cache.borrow();
1373 let account = cache_ref.account(&event.account_id());
1374
1375 if let Some(AccountAny::Margin(margin_account)) = account {
1376 if !margin_account.calculate_account_state {
1377 return; }
1379
1380 let cache_ref = cache.borrow();
1381 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1382 instrument
1383 } else {
1384 log::error!("Cannot update position: no instrument found for {instrument_id}");
1385 return;
1386 };
1387
1388 let result = inner.borrow_mut().accounts.update_positions(
1389 margin_account,
1390 instrument.clone(),
1391 positions_open.iter().collect(),
1392 clock.borrow().timestamp_ns(),
1393 );
1394 let mut cache_ref = cache.borrow_mut();
1395 if let Some((margin_account, _)) = result {
1396 cache_ref
1397 .add_account(AccountAny::Margin(margin_account)) .unwrap();
1399 }
1400 } else if account.is_none() {
1401 log::error!(
1402 "Cannot update position: no account registered for {}",
1403 event.account_id()
1404 );
1405 }
1406}
1407
1408fn update_account(
1409 cache: Rc<RefCell<Cache>>,
1410 inner: Rc<RefCell<PortfolioState>>,
1411 event: &AccountState,
1412) {
1413 let mut cache_ref = cache.borrow_mut();
1414
1415 if let Some(existing) = cache_ref.account(&event.account_id) {
1416 let mut account = existing.clone();
1417 account.apply(event.clone());
1418
1419 if let Err(e) = cache_ref.update_account(account.clone()) {
1420 log::error!("Failed to update account: {e}");
1421 return;
1422 }
1423 } else {
1424 let account = match AccountAny::from_events(vec![event.clone()]) {
1425 Ok(account) => account,
1426 Err(e) => {
1427 log::error!("Failed to create account: {e}");
1428 return;
1429 }
1430 };
1431
1432 if let Err(e) = cache_ref.add_account(account) {
1433 log::error!("Failed to add account: {e}");
1434 return;
1435 }
1436 }
1437
1438 let mut inner_ref = inner.borrow_mut();
1440 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1441 let current_ts = event.ts_init.as_u64();
1442 let last_ts = inner_ref
1443 .last_account_state_log_ts
1444 .get(&event.account_id)
1445 .copied()
1446 .unwrap_or(0);
1447
1448 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1449 {
1450 inner_ref
1451 .last_account_state_log_ts
1452 .insert(event.account_id, current_ts);
1453 true
1454 } else {
1455 false
1456 }
1457 } else {
1458 true };
1460
1461 if should_log {
1462 log::info!("Updated {event}");
1463 }
1464}