1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 enums::LogColor,
26 msgbus::{self, MessagingSwitchboard, TypedHandler},
27};
28use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
29use nautilus_model::{
30 accounts::AccountAny,
31 data::{Bar, MarkPriceUpdate, QuoteTick},
32 enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
33 events::{AccountState, OrderEventAny, position::PositionEvent},
34 identifiers::{AccountId, InstrumentId, PositionId, Venue},
35 instruments::{Instrument, InstrumentAny},
36 orders::{Order, OrderAny},
37 position::Position,
38 types::{Currency, Money, Price},
39};
40use rust_decimal::Decimal;
41
42use crate::{config::PortfolioConfig, manager::AccountsManager};
43
44struct PortfolioState {
45 accounts: AccountsManager,
46 analyzer: PortfolioAnalyzer,
47 unrealized_pnls: AHashMap<InstrumentId, Money>,
48 realized_pnls: AHashMap<InstrumentId, Money>,
49 snapshot_sum_per_position: AHashMap<PositionId, Money>,
50 snapshot_last_per_position: AHashMap<PositionId, Money>,
51 snapshot_processed_counts: AHashMap<PositionId, usize>,
52 net_positions: AHashMap<InstrumentId, Decimal>,
53 pending_calcs: AHashSet<InstrumentId>,
54 bar_close_prices: AHashMap<InstrumentId, Price>,
55 initialized: bool,
56 last_account_state_log_ts: AHashMap<AccountId, u64>,
57 min_account_state_logging_interval_ns: u64,
58}
59
60impl PortfolioState {
61 fn new(
62 clock: Rc<RefCell<dyn Clock>>,
63 cache: Rc<RefCell<Cache>>,
64 config: &PortfolioConfig,
65 ) -> Self {
66 let min_account_state_logging_interval_ns = config
67 .min_account_state_logging_interval_ms
68 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
69
70 Self {
71 accounts: AccountsManager::new(clock, cache),
72 analyzer: PortfolioAnalyzer::default(),
73 unrealized_pnls: AHashMap::new(),
74 realized_pnls: AHashMap::new(),
75 snapshot_sum_per_position: AHashMap::new(),
76 snapshot_last_per_position: AHashMap::new(),
77 snapshot_processed_counts: AHashMap::new(),
78 net_positions: AHashMap::new(),
79 pending_calcs: AHashSet::new(),
80 bar_close_prices: AHashMap::new(),
81 initialized: false,
82 last_account_state_log_ts: AHashMap::new(),
83 min_account_state_logging_interval_ns,
84 }
85 }
86
87 fn reset(&mut self) {
88 log::debug!("RESETTING");
89 self.net_positions.clear();
90 self.unrealized_pnls.clear();
91 self.realized_pnls.clear();
92 self.snapshot_sum_per_position.clear();
93 self.snapshot_last_per_position.clear();
94 self.snapshot_processed_counts.clear();
95 self.pending_calcs.clear();
96 self.last_account_state_log_ts.clear();
97 self.analyzer.reset();
98 log::debug!("READY");
99 }
100}
101
102pub struct Portfolio {
103 pub(crate) clock: Rc<RefCell<dyn Clock>>,
104 pub(crate) cache: Rc<RefCell<Cache>>,
105 inner: Rc<RefCell<PortfolioState>>,
106 config: PortfolioConfig,
107}
108
109impl Debug for Portfolio {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct(stringify!(Portfolio)).finish()
112 }
113}
114
115impl Portfolio {
116 pub fn new(
117 cache: Rc<RefCell<Cache>>,
118 clock: Rc<RefCell<dyn Clock>>,
119 config: Option<PortfolioConfig>,
120 ) -> Self {
121 let config = config.unwrap_or_default();
122 let inner = Rc::new(RefCell::new(PortfolioState::new(
123 clock.clone(),
124 cache.clone(),
125 &config,
126 )));
127
128 Self::register_message_handlers(
129 cache.clone(),
130 clock.clone(),
131 inner.clone(),
132 config.clone(),
133 );
134
135 Self {
136 clock,
137 cache,
138 inner,
139 config,
140 }
141 }
142
143 #[must_use]
148 pub fn clone_shallow(&self) -> Self {
149 Self {
150 clock: self.clock.clone(),
151 cache: self.cache.clone(),
152 inner: self.inner.clone(),
153 config: self.config.clone(),
154 }
155 }
156
157 fn register_message_handlers(
158 cache: Rc<RefCell<Cache>>,
159 clock: Rc<RefCell<dyn Clock>>,
160 inner: Rc<RefCell<PortfolioState>>,
161 config: PortfolioConfig,
162 ) {
163 let inner_weak = WeakCell::from(Rc::downgrade(&inner));
164
165 let update_account_handler = {
167 let cache = cache.clone();
168 let inner = inner_weak.clone();
169 TypedHandler::from(move |event: &AccountState| {
170 if let Some(inner_rc) = inner.upgrade() {
171 update_account(cache.clone(), inner_rc.into(), event);
172 }
173 })
174 };
175
176 let update_position_handler = {
177 let cache = cache.clone();
178 let clock = clock.clone();
179 let inner = inner_weak.clone();
180 let config = config.clone();
181 TypedHandler::from(move |event: &PositionEvent| {
182 if let Some(inner_rc) = inner.upgrade() {
183 update_position(
184 cache.clone(),
185 clock.clone(),
186 inner_rc.into(),
187 config.clone(),
188 event,
189 );
190 }
191 })
192 };
193
194 let update_quote_handler = {
195 let cache = cache.clone();
196 let clock = clock.clone();
197 let inner = inner_weak.clone();
198 let config = config.clone();
199 TypedHandler::from(move |quote: &QuoteTick| {
200 if let Some(inner_rc) = inner.upgrade() {
201 update_quote_tick(
202 cache.clone(),
203 clock.clone(),
204 inner_rc.into(),
205 config.clone(),
206 quote,
207 );
208 }
209 })
210 };
211
212 let update_bar_handler = {
213 let cache = cache.clone();
214 let clock = clock.clone();
215 let inner = inner_weak.clone();
216 let config = config.clone();
217 TypedHandler::from(move |bar: &Bar| {
218 if let Some(inner_rc) = inner.upgrade() {
219 update_bar(
220 cache.clone(),
221 clock.clone(),
222 inner_rc.into(),
223 config.clone(),
224 bar,
225 );
226 }
227 })
228 };
229
230 let update_mark_price_handler = {
231 let cache = cache.clone();
232 let clock = clock.clone();
233 let inner = inner_weak.clone();
234 let config = config.clone();
235 TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
236 if let Some(inner_rc) = inner.upgrade() {
237 update_instrument_id(
238 cache.clone(),
239 clock.clone(),
240 inner_rc.into(),
241 config.clone(),
242 &mark_price.instrument_id,
243 );
244 }
245 })
246 };
247
248 let update_order_handler = {
249 let cache = cache;
250 let clock = clock.clone();
251 let inner = inner_weak;
252 let config = config.clone();
253 TypedHandler::from(move |event: &OrderEventAny| {
254 if let Some(inner_rc) = inner.upgrade() {
255 update_order(
256 cache.clone(),
257 clock.clone(),
258 inner_rc.into(),
259 config.clone(),
260 event,
261 );
262 }
263 })
264 };
265
266 let endpoint = MessagingSwitchboard::portfolio_update_account();
267 msgbus::register_account_state_endpoint(endpoint, update_account_handler.clone());
268
269 msgbus::subscribe_quotes("data.quotes.*".into(), update_quote_handler, Some(10));
270 if config.bar_updates {
271 msgbus::subscribe_bars("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
272 }
273 if config.use_mark_prices {
274 msgbus::subscribe_mark_prices(
275 "data.mark_prices.*".into(),
276 update_mark_price_handler,
277 Some(10),
278 );
279 }
280 msgbus::subscribe_order_events("events.order.*".into(), update_order_handler, Some(10));
281 msgbus::subscribe_position_events(
282 "events.position.*".into(),
283 update_position_handler,
284 Some(10),
285 );
286 msgbus::subscribe_account_state(
287 "events.account.*".into(),
288 update_account_handler,
289 Some(10),
290 );
291 }
292
293 pub fn reset(&mut self) {
294 log::debug!("RESETTING");
295 self.inner.borrow_mut().reset();
296 log::debug!("READY");
297 }
298
299 #[must_use]
301 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
302 &self.cache
303 }
304
305 #[must_use]
307 pub fn is_initialized(&self) -> bool {
308 self.inner.borrow().initialized
309 }
310
311 #[must_use]
315 pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
316 self.cache.borrow().account_for_venue(venue).map_or_else(
317 || {
318 log::error!("Cannot get balances locked: no account generated for {venue}");
319 AHashMap::new()
320 },
321 AccountAny::balances_locked,
322 )
323 }
324
325 #[must_use]
329 pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
330 self.cache.borrow().account_for_venue(venue).map_or_else(
331 || {
332 log::error!(
333 "Cannot get initial (order) margins: no account registered for {venue}"
334 );
335 AHashMap::new()
336 },
337 |account| match account {
338 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
339 AccountAny::Cash(_) => {
340 log::warn!("Initial margins not applicable for cash account");
341 AHashMap::new()
342 }
343 },
344 )
345 }
346
347 #[must_use]
351 pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
352 self.cache.borrow().account_for_venue(venue).map_or_else(
353 || {
354 log::error!(
355 "Cannot get maintenance (position) margins: no account registered for {venue}"
356 );
357 AHashMap::new()
358 },
359 |account| match account {
360 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
361 AccountAny::Cash(_) => {
362 log::warn!("Maintenance margins not applicable for cash account");
363 AHashMap::new()
364 }
365 },
366 )
367 }
368
369 #[must_use]
373 pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
374 let instrument_ids = {
375 let cache = self.cache.borrow();
376 let positions = cache.positions(Some(venue), None, None, None, None);
377
378 if positions.is_empty() {
379 return AHashMap::new(); }
381
382 let instrument_ids: AHashSet<InstrumentId> =
383 positions.iter().map(|p| p.instrument_id).collect();
384
385 instrument_ids
386 };
387
388 let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
389
390 for instrument_id in instrument_ids {
391 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
392 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
394 continue;
395 }
396
397 match self.calculate_unrealized_pnl(&instrument_id) {
399 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
400 None => continue,
401 }
402 }
403
404 unrealized_pnls
405 .into_iter()
406 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
407 .collect()
408 }
409
410 #[must_use]
414 pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
415 let instrument_ids = {
416 let cache = self.cache.borrow();
417 let positions = cache.positions(Some(venue), None, None, None, None);
418
419 if positions.is_empty() {
420 return AHashMap::new(); }
422
423 let instrument_ids: AHashSet<InstrumentId> =
424 positions.iter().map(|p| p.instrument_id).collect();
425
426 instrument_ids
427 };
428
429 let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
430
431 for instrument_id in instrument_ids {
432 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
433 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
435 continue;
436 }
437
438 match self.calculate_realized_pnl(&instrument_id) {
440 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
441 None => continue,
442 }
443 }
444
445 realized_pnls
446 .into_iter()
447 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
448 .collect()
449 }
450
451 #[must_use]
452 pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
453 let cache = self.cache.borrow();
454 let account = if let Some(account) = cache.account_for_venue(venue) {
455 account
456 } else {
457 log::error!("Cannot calculate net exposures: no account registered for {venue}");
458 return None; };
460
461 let positions_open = cache.positions_open(Some(venue), None, None, None, None);
462 if positions_open.is_empty() {
463 return Some(AHashMap::new()); }
465
466 let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
467
468 for position in positions_open {
469 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
470 instrument
471 } else {
472 log::error!(
473 "Cannot calculate net exposures: no instrument for {}",
474 position.instrument_id
475 );
476 return None; };
478
479 if position.side == PositionSide::Flat {
480 log::error!(
481 "Cannot calculate net exposures: position is flat for {}",
482 position.instrument_id
483 );
484 continue; }
486
487 let price = self.get_price(position)?;
488 let xrate = if let Some(xrate) =
489 self.calculate_xrate_to_base(instrument, account, position.entry)
490 {
491 xrate
492 } else {
493 log::error!(
494 "Cannot calculate net exposures: insufficient data for {}/{:?}",
496 instrument.settlement_currency(),
497 account.base_currency()
498 );
499 return None; };
501
502 let settlement_currency = account
503 .base_currency()
504 .unwrap_or_else(|| instrument.settlement_currency());
505
506 let net_exposure = instrument
507 .calculate_notional_value(position.quantity, price, None)
508 .as_f64()
509 * xrate;
510
511 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
512 .round()
513 / 10f64.powi(settlement_currency.precision.into());
514
515 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
516 }
517
518 Some(
519 net_exposures
520 .into_iter()
521 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
522 .collect(),
523 )
524 }
525
526 #[must_use]
527 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
528 if let Some(pnl) = self
529 .inner
530 .borrow()
531 .unrealized_pnls
532 .get(instrument_id)
533 .copied()
534 {
535 return Some(pnl);
536 }
537
538 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
539 self.inner
540 .borrow_mut()
541 .unrealized_pnls
542 .insert(*instrument_id, pnl);
543 Some(pnl)
544 }
545
546 #[must_use]
547 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
548 if let Some(pnl) = self
549 .inner
550 .borrow()
551 .realized_pnls
552 .get(instrument_id)
553 .copied()
554 {
555 return Some(pnl);
556 }
557
558 let pnl = self.calculate_realized_pnl(instrument_id)?;
559 self.inner
560 .borrow_mut()
561 .realized_pnls
562 .insert(*instrument_id, pnl);
563 Some(pnl)
564 }
565
566 #[must_use]
570 pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
571 let realized = self.realized_pnl(instrument_id)?;
572 let unrealized = self.unrealized_pnl(instrument_id)?;
573
574 if realized.currency != unrealized.currency {
575 log::error!(
576 "Cannot calculate total PnL: currency mismatch {} vs {}",
577 realized.currency,
578 unrealized.currency
579 );
580 return None;
581 }
582
583 Some(Money::new(
584 realized.as_f64() + unrealized.as_f64(),
585 realized.currency,
586 ))
587 }
588
589 #[must_use]
593 pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
594 let realized_pnls = self.realized_pnls(venue);
595 let unrealized_pnls = self.unrealized_pnls(venue);
596
597 let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
598
599 for (currency, realized) in realized_pnls {
601 total_pnls.insert(currency, realized);
602 }
603
604 for (currency, unrealized) in unrealized_pnls {
606 match total_pnls.get_mut(¤cy) {
607 Some(total) => {
608 *total = *total + unrealized;
609 }
610 None => {
611 total_pnls.insert(currency, unrealized);
612 }
613 }
614 }
615
616 total_pnls
617 }
618
619 #[must_use]
620 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
621 let cache = self.cache.borrow();
622
623 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
624 instrument
625 } else {
626 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
627 return None;
628 };
629
630 let positions_open = cache.positions_open(
631 None, Some(instrument_id),
633 None,
634 None,
635 None,
636 );
637
638 if positions_open.is_empty() {
639 return Some(Money::new(0.0, instrument.settlement_currency()));
640 }
641
642 let mut net_exposure = 0.0;
643 let mut first_base_currency: Option<Currency> = None;
644 let mut first_account: Option<&AccountAny> = None;
645
646 for position in &positions_open {
647 let account = if let Some(account) = cache.account(&position.account_id) {
649 account
650 } else {
651 log::error!(
652 "Cannot calculate net exposure: no account for {}",
653 position.account_id
654 );
655 return None;
656 };
657
658 if let Some(base) = account.base_currency() {
660 match first_base_currency {
661 None => {
662 first_base_currency = Some(base);
663 first_account = Some(account);
664 }
665 Some(first) if first != base => {
666 log::error!(
667 "Cannot calculate net exposure: accounts have different base \
668 currencies ({first} vs {base}); multi-account aggregation requires \
669 consistent base currencies"
670 );
671 return None;
672 }
673 _ => {}
674 }
675 }
676
677 let price = self.get_price(position)?;
678 let xrate = if let Some(xrate) =
679 self.calculate_xrate_to_base(instrument, account, position.entry)
680 {
681 xrate
682 } else {
683 log::error!(
684 "Cannot calculate net exposures: insufficient data for {}/{:?}",
685 instrument.settlement_currency(),
686 account.base_currency()
687 );
688 return None;
689 };
690
691 let notional_value =
692 instrument.calculate_notional_value(position.quantity, price, None);
693 net_exposure += notional_value.as_f64() * xrate;
694 }
695
696 let settlement_currency = first_account
697 .and_then(|a| a.base_currency())
698 .unwrap_or_else(|| instrument.settlement_currency());
699
700 Some(Money::new(net_exposure, settlement_currency))
701 }
702
703 #[must_use]
704 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
705 self.inner
706 .borrow()
707 .net_positions
708 .get(instrument_id)
709 .copied()
710 .unwrap_or(Decimal::ZERO)
711 }
712
713 #[must_use]
714 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
715 self.inner
716 .borrow()
717 .net_positions
718 .get(instrument_id)
719 .copied()
720 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
721 }
722
723 #[must_use]
724 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
725 self.inner
726 .borrow()
727 .net_positions
728 .get(instrument_id)
729 .copied()
730 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
731 }
732
733 #[must_use]
734 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
735 self.inner
736 .borrow()
737 .net_positions
738 .get(instrument_id)
739 .copied()
740 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
741 }
742
743 #[must_use]
744 pub fn is_completely_flat(&self) -> bool {
745 for net_position in self.inner.borrow().net_positions.values() {
746 if *net_position != Decimal::ZERO {
747 return false;
748 }
749 }
750 true
751 }
752
753 pub fn initialize_orders(&mut self) {
759 let mut initialized = true;
760 let orders_and_instruments = {
761 let cache = self.cache.borrow();
762 let all_orders_open = cache.orders_open(None, None, None, None, None);
763
764 let mut instruments_with_orders = Vec::new();
765 let mut instruments = AHashSet::new();
766
767 for order in &all_orders_open {
768 instruments.insert(order.instrument_id());
769 }
770
771 for instrument_id in instruments {
772 if let Some(instrument) = cache.instrument(&instrument_id) {
773 let orders = cache
774 .orders_open(None, Some(&instrument_id), None, None, None)
775 .into_iter()
776 .cloned()
777 .collect::<Vec<OrderAny>>();
778 instruments_with_orders.push((instrument.clone(), orders));
779 } else {
780 log::error!(
781 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
782 );
783 initialized = false;
784 break;
785 }
786 }
787 instruments_with_orders
788 };
789
790 for (instrument, orders_open) in &orders_and_instruments {
791 let mut cache = self.cache.borrow_mut();
792 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
793 account
794 } else {
795 log::error!(
796 "Cannot update initial (order) margin: no account registered for {}",
797 instrument.id().venue
798 );
799 initialized = false;
800 break;
801 };
802
803 let result = self.inner.borrow_mut().accounts.update_orders(
804 account,
805 instrument.clone(),
806 orders_open.iter().collect(),
807 self.clock.borrow().timestamp_ns(),
808 );
809
810 match result {
811 Some((updated_account, _)) => {
812 cache.update_account(updated_account).unwrap();
813 }
814 None => {
815 initialized = false;
816 }
817 }
818 }
819
820 let total_orders = orders_and_instruments
821 .into_iter()
822 .map(|(_, orders)| orders.len())
823 .sum::<usize>();
824
825 log::info!(
826 color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
827 "Initialized {} open order{}",
828 total_orders,
829 if total_orders == 1 { "" } else { "s" }
830 );
831
832 self.inner.borrow_mut().initialized = initialized;
833 }
834
835 pub fn initialize_positions(&mut self) {
841 self.inner.borrow_mut().unrealized_pnls.clear();
842 self.inner.borrow_mut().realized_pnls.clear();
843 let all_positions_open: Vec<Position>;
844 let mut instruments = AHashSet::new();
845 {
846 let cache = self.cache.borrow();
847 all_positions_open = cache
848 .positions_open(None, None, None, None, None)
849 .into_iter()
850 .cloned()
851 .collect();
852 for position in &all_positions_open {
853 instruments.insert(position.instrument_id);
854 }
855 }
856
857 let mut initialized = true;
858
859 for instrument_id in instruments {
860 let positions_open: Vec<Position> = {
861 let cache = self.cache.borrow();
862 cache
863 .positions_open(None, Some(&instrument_id), None, None, None)
864 .into_iter()
865 .cloned()
866 .collect()
867 };
868
869 self.update_net_position(&instrument_id, &positions_open);
870
871 if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
872 self.inner
873 .borrow_mut()
874 .unrealized_pnls
875 .insert(instrument_id, calculated_unrealized_pnl);
876 } else {
877 log::warn!(
878 "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
879 );
880 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
881 }
882
883 if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
884 self.inner
885 .borrow_mut()
886 .realized_pnls
887 .insert(instrument_id, calculated_realized_pnl);
888 } else {
889 log::warn!(
890 "Failed to calculate realized PnL for {instrument_id}, marking as pending"
891 );
892 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
893 }
894
895 let cache = self.cache.borrow();
896 let Some(account) = cache.account_for_venue(&instrument_id.venue).cloned() else {
897 log::error!(
898 "Cannot update maintenance (position) margin: no account registered for {}",
899 instrument_id.venue
900 );
901 initialized = false;
902 break;
903 };
904
905 let account = match account {
906 AccountAny::Cash(_) => continue,
907 AccountAny::Margin(margin_account) => margin_account,
908 };
909
910 let Some(instrument) = cache.instrument(&instrument_id).cloned() else {
911 log::error!(
912 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
913 );
914 initialized = false;
915 break;
916 };
917 let positions: Vec<Position> = cache
918 .positions_open(None, Some(&instrument_id), None, None, None)
919 .into_iter()
920 .cloned()
921 .collect();
922 drop(cache);
923
924 let result = self.inner.borrow_mut().accounts.update_positions(
925 &account,
926 instrument,
927 positions.iter().collect(),
928 self.clock.borrow().timestamp_ns(),
929 );
930
931 match result {
932 Some((updated_account, _)) => {
933 self.cache
934 .borrow_mut()
935 .update_account(AccountAny::Margin(updated_account))
936 .unwrap();
937 }
938 None => {
939 initialized = false;
940 }
941 }
942 }
943
944 let open_count = all_positions_open.len();
945 self.inner.borrow_mut().initialized = initialized;
946 log::info!(
947 color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
948 "Initialized {} open position{}",
949 open_count,
950 if open_count == 1 { "" } else { "s" }
951 );
952 }
953
954 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
958 update_quote_tick(
959 self.cache.clone(),
960 self.clock.clone(),
961 self.inner.clone(),
962 self.config.clone(),
963 quote,
964 );
965 }
966
967 pub fn update_bar(&mut self, bar: &Bar) {
971 update_bar(
972 self.cache.clone(),
973 self.clock.clone(),
974 self.inner.clone(),
975 self.config.clone(),
976 bar,
977 );
978 }
979
980 pub fn update_account(&mut self, event: &AccountState) {
982 update_account(self.cache.clone(), self.inner.clone(), event);
983 }
984
985 pub fn update_order(&mut self, event: &OrderEventAny) {
989 update_order(
990 self.cache.clone(),
991 self.clock.clone(),
992 self.inner.clone(),
993 self.config.clone(),
994 event,
995 );
996 }
997
998 pub fn update_position(&mut self, event: &PositionEvent) {
1002 update_position(
1003 self.cache.clone(),
1004 self.clock.clone(),
1005 self.inner.clone(),
1006 self.config.clone(),
1007 event,
1008 );
1009 }
1010
1011 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: &[Position]) {
1012 let mut net_position = Decimal::ZERO;
1013
1014 for open_position in positions_open {
1015 log::debug!("open_position: {open_position}");
1016 net_position += open_position.signed_decimal_qty();
1017 }
1018
1019 let existing_position = self.net_position(instrument_id);
1020 if existing_position != net_position {
1021 self.inner
1022 .borrow_mut()
1023 .net_positions
1024 .insert(*instrument_id, net_position);
1025 log::info!("{instrument_id} net_position={net_position}");
1026 }
1027 }
1028
1029 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1030 let cache = self.cache.borrow();
1031 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1032 account
1033 } else {
1034 log::error!(
1035 "Cannot calculate unrealized PnL: no account registered for {}",
1036 instrument_id.venue
1037 );
1038 return None;
1039 };
1040
1041 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1042 instrument
1043 } else {
1044 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1045 return None;
1046 };
1047
1048 let currency = account
1049 .base_currency()
1050 .unwrap_or_else(|| instrument.settlement_currency());
1051
1052 let positions_open = cache.positions_open(
1053 None, Some(instrument_id),
1055 None,
1056 None,
1057 None,
1058 );
1059
1060 if positions_open.is_empty() {
1061 return Some(Money::new(0.0, currency));
1062 }
1063
1064 let mut total_pnl = 0.0;
1065
1066 for position in positions_open {
1067 if position.instrument_id != *instrument_id {
1068 continue; }
1070
1071 if position.side == PositionSide::Flat {
1072 continue; }
1074
1075 let price = if let Some(price) = self.get_price(position) {
1076 price
1077 } else {
1078 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1079 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1080 return None; };
1082
1083 let mut pnl = position.unrealized_pnl(price).as_f64();
1084
1085 if let Some(base_currency) = account.base_currency() {
1086 let xrate = if let Some(xrate) =
1087 self.calculate_xrate_to_base(instrument, account, position.entry)
1088 {
1089 xrate
1090 } else {
1091 log::error!(
1092 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1094 instrument.settlement_currency(),
1095 base_currency
1096 );
1097 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1098 return None; };
1100
1101 let scale = 10f64.powi(currency.precision.into());
1102 pnl = ((pnl * xrate) * scale).round() / scale;
1103 }
1104
1105 total_pnl += pnl;
1106 }
1107
1108 Some(Money::new(total_pnl, currency))
1109 }
1110
1111 fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1112 let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1118
1119 if snapshot_position_ids.is_empty() {
1120 return; }
1122
1123 let mut rebuild = false;
1124
1125 for position_id in &snapshot_position_ids {
1127 let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1128 let curr_count = position_snapshots.map_or(0, |s| {
1129 s.split(|&b| b == b'{').count() - 1
1131 });
1132 let prev_count = self
1133 .inner
1134 .borrow()
1135 .snapshot_processed_counts
1136 .get(position_id)
1137 .copied()
1138 .unwrap_or(0);
1139
1140 if prev_count > curr_count {
1141 rebuild = true;
1142 break;
1143 }
1144 }
1145
1146 if rebuild {
1147 for position_id in &snapshot_position_ids {
1149 if let Some(position_snapshots) =
1150 self.cache.borrow().position_snapshot_bytes(position_id)
1151 {
1152 let mut sum_pnl: Option<Money> = None;
1153 let mut last_pnl: Option<Money> = None;
1154
1155 let mut start = 0;
1157 let mut depth = 0;
1158 let mut in_string = false;
1159 let mut escape_next = false;
1160
1161 for (i, &byte) in position_snapshots.iter().enumerate() {
1162 if escape_next {
1163 escape_next = false;
1164 continue;
1165 }
1166
1167 if byte == b'\\' && in_string {
1168 escape_next = true;
1169 continue;
1170 }
1171
1172 if byte == b'"' && !escape_next {
1173 in_string = !in_string;
1174 }
1175
1176 if !in_string {
1177 if byte == b'{' {
1178 if depth == 0 {
1179 start = i;
1180 }
1181 depth += 1;
1182 } else if byte == b'}' {
1183 depth -= 1;
1184 if depth == 0
1185 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1186 &position_snapshots[start..=i],
1187 )
1188 && let Some(realized_pnl) = snapshot.realized_pnl
1189 {
1190 if let Some(ref mut sum) = sum_pnl {
1191 if sum.currency == realized_pnl.currency {
1192 *sum = Money::new(
1193 sum.as_f64() + realized_pnl.as_f64(),
1194 sum.currency,
1195 );
1196 }
1197 } else {
1198 sum_pnl = Some(realized_pnl);
1199 }
1200 last_pnl = Some(realized_pnl);
1201 }
1202 }
1203 }
1204 }
1205
1206 let mut inner = self.inner.borrow_mut();
1207 if let Some(sum) = sum_pnl {
1208 inner.snapshot_sum_per_position.insert(*position_id, sum);
1209 if let Some(last) = last_pnl {
1210 inner.snapshot_last_per_position.insert(*position_id, last);
1211 }
1212 } else {
1213 inner.snapshot_sum_per_position.remove(position_id);
1214 inner.snapshot_last_per_position.remove(position_id);
1215 }
1216
1217 let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1218 inner
1219 .snapshot_processed_counts
1220 .insert(*position_id, snapshot_count);
1221 }
1222 }
1223 } else {
1224 for position_id in &snapshot_position_ids {
1226 if let Some(position_snapshots) =
1227 self.cache.borrow().position_snapshot_bytes(position_id)
1228 {
1229 let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1230 let prev_count = self
1231 .inner
1232 .borrow()
1233 .snapshot_processed_counts
1234 .get(position_id)
1235 .copied()
1236 .unwrap_or(0);
1237
1238 if prev_count >= curr_count {
1239 continue;
1240 }
1241
1242 let mut sum_pnl = self
1243 .inner
1244 .borrow()
1245 .snapshot_sum_per_position
1246 .get(position_id)
1247 .copied();
1248 let mut last_pnl = self
1249 .inner
1250 .borrow()
1251 .snapshot_last_per_position
1252 .get(position_id)
1253 .copied();
1254
1255 let mut start = 0;
1257 let mut depth = 0;
1258 let mut in_string = false;
1259 let mut escape_next = false;
1260 let mut snapshot_index = 0;
1261
1262 for (i, &byte) in position_snapshots.iter().enumerate() {
1263 if escape_next {
1264 escape_next = false;
1265 continue;
1266 }
1267
1268 if byte == b'\\' && in_string {
1269 escape_next = true;
1270 continue;
1271 }
1272
1273 if byte == b'"' && !escape_next {
1274 in_string = !in_string;
1275 }
1276
1277 if !in_string {
1278 if byte == b'{' {
1279 if depth == 0 {
1280 start = i;
1281 }
1282 depth += 1;
1283 } else if byte == b'}' {
1284 depth -= 1;
1285 if depth == 0 {
1286 snapshot_index += 1;
1287 if snapshot_index > prev_count
1289 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1290 &position_snapshots[start..=i],
1291 )
1292 && let Some(realized_pnl) = snapshot.realized_pnl
1293 {
1294 if let Some(ref mut sum) = sum_pnl {
1295 if sum.currency == realized_pnl.currency {
1296 *sum = Money::new(
1297 sum.as_f64() + realized_pnl.as_f64(),
1298 sum.currency,
1299 );
1300 }
1301 } else {
1302 sum_pnl = Some(realized_pnl);
1303 }
1304 last_pnl = Some(realized_pnl);
1305 }
1306 }
1307 }
1308 }
1309 }
1310
1311 let mut inner = self.inner.borrow_mut();
1312 if let Some(sum) = sum_pnl {
1313 inner.snapshot_sum_per_position.insert(*position_id, sum);
1314 if let Some(last) = last_pnl {
1315 inner.snapshot_last_per_position.insert(*position_id, last);
1316 }
1317 }
1318 inner
1319 .snapshot_processed_counts
1320 .insert(*position_id, curr_count);
1321 }
1322 }
1323 }
1324 }
1325
1326 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1327 self.ensure_snapshot_pnls_cached_for(instrument_id);
1329
1330 let cache = self.cache.borrow();
1331 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1332 account
1333 } else {
1334 log::error!(
1335 "Cannot calculate realized PnL: no account registered for {}",
1336 instrument_id.venue
1337 );
1338 return None;
1339 };
1340
1341 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1342 instrument
1343 } else {
1344 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1345 return None;
1346 };
1347
1348 let currency = account
1349 .base_currency()
1350 .unwrap_or_else(|| instrument.settlement_currency());
1351
1352 let positions = cache.positions(
1353 None, Some(instrument_id),
1355 None,
1356 None,
1357 None,
1358 );
1359
1360 let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1361
1362 let is_netting = positions
1364 .iter()
1365 .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1366
1367 let mut total_pnl = 0.0;
1368
1369 if is_netting && !snapshot_position_ids.is_empty() {
1370 for position_id in &snapshot_position_ids {
1373 let is_active = positions.iter().any(|p| p.id == *position_id);
1374
1375 if is_active {
1376 let last_pnl = self
1378 .inner
1379 .borrow()
1380 .snapshot_last_per_position
1381 .get(position_id)
1382 .copied();
1383 if let Some(last_pnl) = last_pnl {
1384 let mut pnl = last_pnl.as_f64();
1385
1386 if let Some(base_currency) = account.base_currency()
1387 && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1388 {
1389 let xrate = if let Some(xrate) =
1390 self.calculate_xrate_to_base(instrument, account, position.entry)
1391 {
1392 xrate
1393 } else {
1394 log::error!(
1395 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1396 instrument.settlement_currency(),
1397 base_currency
1398 );
1399 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1400 return Some(Money::new(0.0, currency));
1401 };
1402
1403 let scale = 10f64.powi(currency.precision.into());
1404 pnl = ((pnl * xrate) * scale).round() / scale;
1405 }
1406
1407 total_pnl += pnl;
1408 }
1409 } else {
1410 let sum_pnl = self
1412 .inner
1413 .borrow()
1414 .snapshot_sum_per_position
1415 .get(position_id)
1416 .copied();
1417 if let Some(sum_pnl) = sum_pnl {
1418 let mut pnl = sum_pnl.as_f64();
1419
1420 if let Some(base_currency) = account.base_currency() {
1421 let xrate = cache.get_xrate(
1423 instrument_id.venue,
1424 instrument.settlement_currency(),
1425 base_currency,
1426 PriceType::Mid,
1427 );
1428
1429 if let Some(xrate) = xrate {
1430 let scale = 10f64.powi(currency.precision.into());
1431 pnl = ((pnl * xrate) * scale).round() / scale;
1432 } else {
1433 log::error!(
1434 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1435 instrument.settlement_currency(),
1436 base_currency
1437 );
1438 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1439 return Some(Money::new(0.0, currency));
1440 }
1441 }
1442
1443 total_pnl += pnl;
1444 }
1445 }
1446 }
1447
1448 for position in positions {
1450 if position.instrument_id != *instrument_id {
1451 continue;
1452 }
1453
1454 if let Some(realized_pnl) = position.realized_pnl {
1455 let mut pnl = realized_pnl.as_f64();
1456
1457 if let Some(base_currency) = account.base_currency() {
1458 let xrate = if let Some(xrate) =
1459 self.calculate_xrate_to_base(instrument, account, position.entry)
1460 {
1461 xrate
1462 } else {
1463 log::error!(
1464 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1465 instrument.settlement_currency(),
1466 base_currency
1467 );
1468 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1469 return Some(Money::new(0.0, currency));
1470 };
1471
1472 let scale = 10f64.powi(currency.precision.into());
1473 pnl = ((pnl * xrate) * scale).round() / scale;
1474 }
1475
1476 total_pnl += pnl;
1477 }
1478 }
1479 } else {
1480 for position_id in &snapshot_position_ids {
1483 let sum_pnl = self
1484 .inner
1485 .borrow()
1486 .snapshot_sum_per_position
1487 .get(position_id)
1488 .copied();
1489 if let Some(sum_pnl) = sum_pnl {
1490 let mut pnl = sum_pnl.as_f64();
1491
1492 if let Some(base_currency) = account.base_currency() {
1493 let xrate = cache.get_xrate(
1494 instrument_id.venue,
1495 instrument.settlement_currency(),
1496 base_currency,
1497 PriceType::Mid,
1498 );
1499
1500 if let Some(xrate) = xrate {
1501 let scale = 10f64.powi(currency.precision.into());
1502 pnl = ((pnl * xrate) * scale).round() / scale;
1503 } else {
1504 log::error!(
1505 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1506 instrument.settlement_currency(),
1507 base_currency
1508 );
1509 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1510 return Some(Money::new(0.0, currency));
1511 }
1512 }
1513
1514 total_pnl += pnl;
1515 }
1516 }
1517
1518 for position in positions {
1520 if position.instrument_id != *instrument_id {
1521 continue;
1522 }
1523
1524 if let Some(realized_pnl) = position.realized_pnl {
1525 let mut pnl = realized_pnl.as_f64();
1526
1527 if let Some(base_currency) = account.base_currency() {
1528 let xrate = if let Some(xrate) =
1529 self.calculate_xrate_to_base(instrument, account, position.entry)
1530 {
1531 xrate
1532 } else {
1533 log::error!(
1534 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1535 instrument.settlement_currency(),
1536 base_currency
1537 );
1538 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1539 return Some(Money::new(0.0, currency));
1540 };
1541
1542 let scale = 10f64.powi(currency.precision.into());
1543 pnl = ((pnl * xrate) * scale).round() / scale;
1544 }
1545
1546 total_pnl += pnl;
1547 }
1548 }
1549 }
1550
1551 Some(Money::new(total_pnl, currency))
1552 }
1553
1554 fn get_price(&self, position: &Position) -> Option<Price> {
1555 let cache = self.cache.borrow();
1556 let instrument_id = &position.instrument_id;
1557
1558 if self.config.use_mark_prices
1560 && let Some(mark_price) = cache.mark_price(instrument_id)
1561 {
1562 return Some(mark_price.value);
1563 }
1564
1565 let price_type = match position.side {
1567 PositionSide::Long => PriceType::Bid,
1568 PositionSide::Short => PriceType::Ask,
1569 _ => panic!("invalid `PositionSide`, was {}", position.side),
1570 };
1571
1572 cache
1573 .price(instrument_id, price_type)
1574 .or_else(|| cache.price(instrument_id, PriceType::Last))
1575 .or_else(|| {
1576 self.inner
1577 .borrow()
1578 .bar_close_prices
1579 .get(instrument_id)
1580 .copied()
1581 })
1582 }
1583
1584 fn calculate_xrate_to_base(
1585 &self,
1586 instrument: &InstrumentAny,
1587 account: &AccountAny,
1588 side: OrderSide,
1589 ) -> Option<f64> {
1590 if !self.config.convert_to_account_base_currency {
1591 return Some(1.0); }
1593
1594 match account.base_currency() {
1595 None => Some(1.0), Some(base_currency) => {
1597 let cache = self.cache.borrow();
1598
1599 if self.config.use_mark_xrates {
1600 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1601 }
1602
1603 let price_type = if side == OrderSide::Buy {
1604 PriceType::Bid
1605 } else {
1606 PriceType::Ask
1607 };
1608
1609 cache.get_xrate(
1610 instrument.id().venue,
1611 instrument.settlement_currency(),
1612 base_currency,
1613 price_type,
1614 )
1615 }
1616 }
1617 }
1618}
1619
1620fn update_quote_tick(
1622 cache: Rc<RefCell<Cache>>,
1623 clock: Rc<RefCell<dyn Clock>>,
1624 inner: Rc<RefCell<PortfolioState>>,
1625 config: PortfolioConfig,
1626 quote: &QuoteTick,
1627) {
1628 update_instrument_id(cache, clock.clone(), inner, config, "e.instrument_id);
1629}
1630
1631fn update_bar(
1632 cache: Rc<RefCell<Cache>>,
1633 clock: Rc<RefCell<dyn Clock>>,
1634 inner: Rc<RefCell<PortfolioState>>,
1635 config: PortfolioConfig,
1636 bar: &Bar,
1637) {
1638 let instrument_id = bar.bar_type.instrument_id();
1639 inner
1640 .borrow_mut()
1641 .bar_close_prices
1642 .insert(instrument_id, bar.close);
1643 update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1644}
1645
1646fn update_instrument_id(
1647 cache: Rc<RefCell<Cache>>,
1648 clock: Rc<RefCell<dyn Clock>>,
1649 inner: Rc<RefCell<PortfolioState>>,
1650 config: PortfolioConfig,
1651 instrument_id: &InstrumentId,
1652) {
1653 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1654
1655 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1656 return;
1657 }
1658
1659 let result_init;
1660 let mut result_maint = None;
1661
1662 let account = {
1663 let account = {
1664 let cache_ref = cache.borrow();
1665 if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1666 account.clone()
1667 } else {
1668 log::error!(
1669 "Cannot update tick: no account registered for {}",
1670 instrument_id.venue
1671 );
1672 return;
1673 }
1674 };
1675
1676 let mut cache_ref = cache.borrow_mut();
1677 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1678 instrument.clone()
1679 } else {
1680 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1681 return;
1682 };
1683
1684 let orders_open: Vec<OrderAny> = cache_ref
1686 .orders_open(None, Some(instrument_id), None, None, None)
1687 .iter()
1688 .map(|o| (*o).clone())
1689 .collect();
1690
1691 let positions_open: Vec<Position> = cache_ref
1692 .positions_open(None, Some(instrument_id), None, None, None)
1693 .iter()
1694 .map(|p| (*p).clone())
1695 .collect();
1696
1697 result_init = inner.borrow().accounts.update_orders(
1698 &account,
1699 instrument.clone(),
1700 orders_open.iter().collect(),
1701 clock.borrow().timestamp_ns(),
1702 );
1703
1704 if let AccountAny::Margin(ref margin_account) = account {
1705 result_maint = inner.borrow().accounts.update_positions(
1706 margin_account,
1707 instrument,
1708 positions_open.iter().collect(),
1709 clock.borrow().timestamp_ns(),
1710 );
1711 }
1712
1713 if let Some((ref updated_account, _)) = result_init {
1714 cache_ref.update_account(updated_account.clone()).unwrap();
1715 }
1716 account
1717 };
1718
1719 let mut portfolio_clone = Portfolio {
1720 clock: clock.clone(),
1721 cache,
1722 inner: inner.clone(),
1723 config,
1724 };
1725
1726 let result_unrealized_pnl: Option<Money> =
1727 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1728
1729 if result_init.is_some()
1730 && (matches!(account, AccountAny::Cash(_))
1731 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1732 {
1733 inner.borrow_mut().pending_calcs.remove(instrument_id);
1734 if inner.borrow().pending_calcs.is_empty() {
1735 inner.borrow_mut().initialized = true;
1736 }
1737 }
1738}
1739
1740fn update_order(
1741 cache: Rc<RefCell<Cache>>,
1742 clock: Rc<RefCell<dyn Clock>>,
1743 inner: Rc<RefCell<PortfolioState>>,
1744 _config: PortfolioConfig,
1745 event: &OrderEventAny,
1746) {
1747 let cache_ref = cache.borrow();
1748 let account_id = match event.account_id() {
1749 Some(account_id) => account_id,
1750 None => {
1751 return; }
1753 };
1754
1755 let account = if let Some(account) = cache_ref.account(&account_id) {
1756 account
1757 } else {
1758 log::error!("Cannot update order: no account registered for {account_id}");
1759 return;
1760 };
1761
1762 match account {
1763 AccountAny::Cash(cash_account) => {
1764 if !cash_account.base.calculate_account_state {
1765 return;
1766 }
1767 }
1768 AccountAny::Margin(margin_account) => {
1769 if !margin_account.base.calculate_account_state {
1770 return;
1771 }
1772 }
1773 }
1774
1775 match event {
1776 OrderEventAny::Accepted(_)
1777 | OrderEventAny::Canceled(_)
1778 | OrderEventAny::Rejected(_)
1779 | OrderEventAny::Updated(_)
1780 | OrderEventAny::Filled(_) => {}
1781 _ => {
1782 return;
1783 }
1784 }
1785
1786 let cache_ref = cache.borrow();
1787 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1788 order
1789 } else {
1790 log::error!(
1791 "Cannot update order: {} not found in the cache",
1792 event.client_order_id()
1793 );
1794 return; };
1796
1797 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1798 return; }
1800
1801 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1802 instrument_id
1803 } else {
1804 log::error!(
1805 "Cannot update order: no instrument found for {}",
1806 event.instrument_id()
1807 );
1808 return;
1809 };
1810
1811 if let OrderEventAny::Filled(order_filled) = event {
1812 let _ = inner.borrow().accounts.update_balances(
1813 account.clone(),
1814 instrument.clone(),
1815 *order_filled,
1816 );
1817
1818 let mut portfolio_clone = Portfolio {
1819 clock: clock.clone(),
1820 cache: cache.clone(),
1821 inner: inner.clone(),
1822 config: PortfolioConfig::default(), };
1824
1825 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1826 Some(unrealized_pnl) => {
1827 inner
1828 .borrow_mut()
1829 .unrealized_pnls
1830 .insert(event.instrument_id(), unrealized_pnl);
1831 }
1832 None => {
1833 log::error!(
1834 "Failed to calculate unrealized PnL for instrument {}",
1835 event.instrument_id()
1836 );
1837 }
1838 }
1839 }
1840
1841 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None, None);
1842
1843 let account_state = inner.borrow_mut().accounts.update_orders(
1844 account,
1845 instrument.clone(),
1846 orders_open,
1847 clock.borrow().timestamp_ns(),
1848 );
1849
1850 let mut cache_ref = cache.borrow_mut();
1851
1852 if let Some((updated_account, account_state)) = account_state {
1853 cache_ref.update_account(updated_account).unwrap();
1854 msgbus::publish_account_state(
1855 format!("events.account.{}", account.id()).into(),
1856 &account_state,
1857 );
1858 } else {
1859 log::debug!("Added pending calculation for {}", instrument.id());
1860 inner.borrow_mut().pending_calcs.insert(instrument.id());
1861 }
1862
1863 log::debug!("Updated {event}");
1864}
1865
1866fn update_position(
1867 cache: Rc<RefCell<Cache>>,
1868 clock: Rc<RefCell<dyn Clock>>,
1869 inner: Rc<RefCell<PortfolioState>>,
1870 _config: PortfolioConfig,
1871 event: &PositionEvent,
1872) {
1873 let instrument_id = event.instrument_id();
1874
1875 let positions_open: Vec<Position> = {
1876 let cache_ref = cache.borrow();
1877
1878 cache_ref
1879 .positions_open(None, Some(&instrument_id), None, None, None)
1880 .iter()
1881 .map(|o| (*o).clone())
1882 .collect()
1883 };
1884
1885 log::debug!("position fresh from cache -> {positions_open:?}");
1886
1887 let mut portfolio_clone = Portfolio {
1888 clock: clock.clone(),
1889 cache: cache.clone(),
1890 inner: inner.clone(),
1891 config: PortfolioConfig::default(), };
1893
1894 portfolio_clone.update_net_position(&instrument_id, &positions_open);
1895
1896 if let Some(calculated_unrealized_pnl) =
1897 portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1898 {
1899 inner
1900 .borrow_mut()
1901 .unrealized_pnls
1902 .insert(event.instrument_id(), calculated_unrealized_pnl);
1903 } else {
1904 log::warn!(
1905 "Failed to calculate unrealized PnL for {}, marking as pending",
1906 event.instrument_id()
1907 );
1908 inner
1909 .borrow_mut()
1910 .pending_calcs
1911 .insert(event.instrument_id());
1912 }
1913
1914 if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1915 inner
1916 .borrow_mut()
1917 .realized_pnls
1918 .insert(event.instrument_id(), calculated_realized_pnl);
1919 } else {
1920 log::warn!(
1921 "Failed to calculate realized PnL for {}, marking as pending",
1922 event.instrument_id()
1923 );
1924 inner
1925 .borrow_mut()
1926 .pending_calcs
1927 .insert(event.instrument_id());
1928 }
1929
1930 let cache_ref = cache.borrow();
1931 let account = cache_ref.account(&event.account_id());
1932
1933 if let Some(AccountAny::Margin(margin_account)) = account {
1934 if !margin_account.calculate_account_state {
1935 return; }
1937
1938 let cache_ref = cache.borrow();
1939 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1940 instrument
1941 } else {
1942 log::error!("Cannot update position: no instrument found for {instrument_id}");
1943 return;
1944 };
1945
1946 let result = inner.borrow_mut().accounts.update_positions(
1947 margin_account,
1948 instrument.clone(),
1949 positions_open.iter().collect(),
1950 clock.borrow().timestamp_ns(),
1951 );
1952 let mut cache_ref = cache.borrow_mut();
1953 if let Some((margin_account, _)) = result {
1954 cache_ref
1955 .update_account(AccountAny::Margin(margin_account))
1956 .unwrap();
1957 }
1958 } else if account.is_none() {
1959 log::error!(
1960 "Cannot update position: no account registered for {}",
1961 event.account_id()
1962 );
1963 }
1964}
1965
1966fn update_account(
1967 cache: Rc<RefCell<Cache>>,
1968 inner: Rc<RefCell<PortfolioState>>,
1969 event: &AccountState,
1970) {
1971 let mut cache_ref = cache.borrow_mut();
1972
1973 if let Some(existing) = cache_ref.account(&event.account_id) {
1974 let mut account = existing.clone();
1975 if let Err(e) = account.apply(event.clone()) {
1976 log::error!("Failed to apply account state: {e}");
1977 return;
1978 }
1979
1980 if let Err(e) = cache_ref.update_account(account.clone()) {
1981 log::error!("Failed to update account: {e}");
1982 return;
1983 }
1984 } else {
1985 let account = match AccountAny::from_events(vec![event.clone()]) {
1986 Ok(account) => account,
1987 Err(e) => {
1988 log::error!("Failed to create account: {e}");
1989 return;
1990 }
1991 };
1992
1993 if let Err(e) = cache_ref.add_account(account) {
1994 log::error!("Failed to add account: {e}");
1995 return;
1996 }
1997 }
1998
1999 let mut inner_ref = inner.borrow_mut();
2001 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
2002 let current_ts = event.ts_init.as_u64();
2003 let last_ts = inner_ref
2004 .last_account_state_log_ts
2005 .get(&event.account_id)
2006 .copied()
2007 .unwrap_or(0);
2008
2009 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
2010 {
2011 inner_ref
2012 .last_account_state_log_ts
2013 .insert(event.account_id, current_ts);
2014 true
2015 } else {
2016 false
2017 }
2018 } else {
2019 true };
2021
2022 if should_log {
2023 log::info!("Updated {event}");
2024 }
2025}