1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 enums::LogColor,
26 msgbus::{
27 self,
28 handler::{ShareableMessageHandler, TypedMessageHandler},
29 },
30};
31use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
32use nautilus_model::{
33 accounts::AccountAny,
34 data::{Bar, MarkPriceUpdate, QuoteTick},
35 enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
36 events::{AccountState, OrderEventAny, position::PositionEvent},
37 identifiers::{AccountId, InstrumentId, PositionId, Venue},
38 instruments::{Instrument, InstrumentAny},
39 orders::{Order, OrderAny},
40 position::Position,
41 types::{Currency, Money, Price},
42};
43use rust_decimal::{Decimal, prelude::FromPrimitive};
44
45use crate::{config::PortfolioConfig, manager::AccountsManager};
46
47struct PortfolioState {
48 accounts: AccountsManager,
49 analyzer: PortfolioAnalyzer,
50 unrealized_pnls: AHashMap<InstrumentId, Money>,
51 realized_pnls: AHashMap<InstrumentId, Money>,
52 snapshot_sum_per_position: AHashMap<PositionId, Money>,
53 snapshot_last_per_position: AHashMap<PositionId, Money>,
54 snapshot_processed_counts: AHashMap<PositionId, usize>,
55 net_positions: AHashMap<InstrumentId, Decimal>,
56 pending_calcs: AHashSet<InstrumentId>,
57 bar_close_prices: AHashMap<InstrumentId, Price>,
58 initialized: bool,
59 last_account_state_log_ts: AHashMap<AccountId, u64>,
60 min_account_state_logging_interval_ns: u64,
61}
62
63impl PortfolioState {
64 fn new(
65 clock: Rc<RefCell<dyn Clock>>,
66 cache: Rc<RefCell<Cache>>,
67 config: &PortfolioConfig,
68 ) -> Self {
69 let min_account_state_logging_interval_ns = config
70 .min_account_state_logging_interval_ms
71 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73 Self {
74 accounts: AccountsManager::new(clock, cache),
75 analyzer: PortfolioAnalyzer::default(),
76 unrealized_pnls: AHashMap::new(),
77 realized_pnls: AHashMap::new(),
78 snapshot_sum_per_position: AHashMap::new(),
79 snapshot_last_per_position: AHashMap::new(),
80 snapshot_processed_counts: AHashMap::new(),
81 net_positions: AHashMap::new(),
82 pending_calcs: AHashSet::new(),
83 bar_close_prices: AHashMap::new(),
84 initialized: false,
85 last_account_state_log_ts: AHashMap::new(),
86 min_account_state_logging_interval_ns,
87 }
88 }
89
90 fn reset(&mut self) {
91 log::debug!("RESETTING");
92 self.net_positions.clear();
93 self.unrealized_pnls.clear();
94 self.realized_pnls.clear();
95 self.snapshot_sum_per_position.clear();
96 self.snapshot_last_per_position.clear();
97 self.snapshot_processed_counts.clear();
98 self.pending_calcs.clear();
99 self.last_account_state_log_ts.clear();
100 self.analyzer.reset();
101 log::debug!("READY");
102 }
103}
104
105pub struct Portfolio {
106 pub(crate) clock: Rc<RefCell<dyn Clock>>,
107 pub(crate) cache: Rc<RefCell<Cache>>,
108 inner: Rc<RefCell<PortfolioState>>,
109 config: PortfolioConfig,
110}
111
112impl Debug for Portfolio {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct(stringify!(Portfolio)).finish()
115 }
116}
117
118impl Portfolio {
119 pub fn new(
120 cache: Rc<RefCell<Cache>>,
121 clock: Rc<RefCell<dyn Clock>>,
122 config: Option<PortfolioConfig>,
123 ) -> Self {
124 let config = config.unwrap_or_default();
125 let inner = Rc::new(RefCell::new(PortfolioState::new(
126 clock.clone(),
127 cache.clone(),
128 &config,
129 )));
130
131 Self::register_message_handlers(
132 cache.clone(),
133 clock.clone(),
134 inner.clone(),
135 config.clone(),
136 );
137
138 Self {
139 clock,
140 cache,
141 inner,
142 config,
143 }
144 }
145
146 #[must_use]
151 pub fn clone_shallow(&self) -> Self {
152 Self {
153 clock: self.clock.clone(),
154 cache: self.cache.clone(),
155 inner: self.inner.clone(),
156 config: self.config.clone(),
157 }
158 }
159
160 fn register_message_handlers(
161 cache: Rc<RefCell<Cache>>,
162 clock: Rc<RefCell<dyn Clock>>,
163 inner: Rc<RefCell<PortfolioState>>,
164 config: PortfolioConfig,
165 ) {
166 let inner_weak = WeakCell::from(Rc::downgrade(&inner));
167
168 let update_account_handler = {
169 let cache = cache.clone();
170 let inner = inner_weak.clone();
171 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
172 move |event: &AccountState| {
173 if let Some(inner_rc) = inner.upgrade() {
174 update_account(cache.clone(), inner_rc.into(), event);
175 }
176 },
177 )))
178 };
179
180 let update_position_handler = {
181 let cache = cache.clone();
182 let clock = clock.clone();
183 let inner = inner_weak.clone();
184 let config = config.clone();
185 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
186 move |event: &PositionEvent| {
187 if let Some(inner_rc) = inner.upgrade() {
188 update_position(
189 cache.clone(),
190 clock.clone(),
191 inner_rc.into(),
192 config.clone(),
193 event,
194 );
195 }
196 },
197 )))
198 };
199
200 let update_quote_handler = {
201 let cache = cache.clone();
202 let clock = clock.clone();
203 let inner = inner_weak.clone();
204 let config = config.clone();
205 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
206 move |quote: &QuoteTick| {
207 if let Some(inner_rc) = inner.upgrade() {
208 update_quote_tick(
209 cache.clone(),
210 clock.clone(),
211 inner_rc.into(),
212 config.clone(),
213 quote,
214 );
215 }
216 },
217 )))
218 };
219
220 let update_bar_handler = {
221 let cache = cache.clone();
222 let clock = clock.clone();
223 let inner = inner_weak.clone();
224 let config = config.clone();
225 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
226 if let Some(inner_rc) = inner.upgrade() {
227 update_bar(
228 cache.clone(),
229 clock.clone(),
230 inner_rc.into(),
231 config.clone(),
232 bar,
233 );
234 }
235 })))
236 };
237
238 let update_mark_price_handler = {
239 let cache = cache.clone();
240 let clock = clock.clone();
241 let inner = inner_weak.clone();
242 let config = config.clone();
243 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
244 move |mark_price: &MarkPriceUpdate| {
245 if let Some(inner_rc) = inner.upgrade() {
246 update_instrument_id(
247 cache.clone(),
248 clock.clone(),
249 inner_rc.into(),
250 config.clone(),
251 &mark_price.instrument_id,
252 );
253 }
254 },
255 )))
256 };
257
258 let update_order_handler = {
259 let cache = cache;
260 let clock = clock.clone();
261 let inner = inner_weak;
262 let config = config.clone();
263 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
264 move |event: &OrderEventAny| {
265 if let Some(inner_rc) = inner.upgrade() {
266 update_order(
267 cache.clone(),
268 clock.clone(),
269 inner_rc.into(),
270 config.clone(),
271 event,
272 );
273 }
274 },
275 )))
276 };
277
278 msgbus::register(
279 "Portfolio.update_account".into(),
280 update_account_handler.clone(),
281 );
282
283 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
284 if config.bar_updates {
285 msgbus::subscribe("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
286 }
287 if config.use_mark_prices {
288 msgbus::subscribe(
289 "data.mark_prices.*".into(),
290 update_mark_price_handler,
291 Some(10),
292 );
293 }
294 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
295 msgbus::subscribe(
296 "events.position.*".into(),
297 update_position_handler,
298 Some(10),
299 );
300 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
301 }
302
303 pub fn reset(&mut self) {
304 log::debug!("RESETTING");
305 self.inner.borrow_mut().reset();
306 log::debug!("READY");
307 }
308
309 #[must_use]
311 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
312 &self.cache
313 }
314
315 #[must_use]
317 pub fn is_initialized(&self) -> bool {
318 self.inner.borrow().initialized
319 }
320
321 #[must_use]
325 pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
326 self.cache.borrow().account_for_venue(venue).map_or_else(
327 || {
328 log::error!("Cannot get balances locked: no account generated for {venue}");
329 AHashMap::new()
330 },
331 AccountAny::balances_locked,
332 )
333 }
334
335 #[must_use]
339 pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
340 self.cache.borrow().account_for_venue(venue).map_or_else(
341 || {
342 log::error!(
343 "Cannot get initial (order) margins: no account registered for {venue}"
344 );
345 AHashMap::new()
346 },
347 |account| match account {
348 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
349 AccountAny::Cash(_) => {
350 log::warn!("Initial margins not applicable for cash account");
351 AHashMap::new()
352 }
353 },
354 )
355 }
356
357 #[must_use]
361 pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
362 self.cache.borrow().account_for_venue(venue).map_or_else(
363 || {
364 log::error!(
365 "Cannot get maintenance (position) margins: no account registered for {venue}"
366 );
367 AHashMap::new()
368 },
369 |account| match account {
370 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
371 AccountAny::Cash(_) => {
372 log::warn!("Maintenance margins not applicable for cash account");
373 AHashMap::new()
374 }
375 },
376 )
377 }
378
379 #[must_use]
383 pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
384 let instrument_ids = {
385 let cache = self.cache.borrow();
386 let positions = cache.positions(Some(venue), None, None, None);
387
388 if positions.is_empty() {
389 return AHashMap::new(); }
391
392 let instrument_ids: AHashSet<InstrumentId> =
393 positions.iter().map(|p| p.instrument_id).collect();
394
395 instrument_ids
396 };
397
398 let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
399
400 for instrument_id in instrument_ids {
401 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
402 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
404 continue;
405 }
406
407 match self.calculate_unrealized_pnl(&instrument_id) {
409 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
410 None => continue,
411 }
412 }
413
414 unrealized_pnls
415 .into_iter()
416 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
417 .collect()
418 }
419
420 #[must_use]
424 pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
425 let instrument_ids = {
426 let cache = self.cache.borrow();
427 let positions = cache.positions(Some(venue), None, None, None);
428
429 if positions.is_empty() {
430 return AHashMap::new(); }
432
433 let instrument_ids: AHashSet<InstrumentId> =
434 positions.iter().map(|p| p.instrument_id).collect();
435
436 instrument_ids
437 };
438
439 let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
440
441 for instrument_id in instrument_ids {
442 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
443 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
445 continue;
446 }
447
448 match self.calculate_realized_pnl(&instrument_id) {
450 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
451 None => continue,
452 }
453 }
454
455 realized_pnls
456 .into_iter()
457 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
458 .collect()
459 }
460
461 #[must_use]
462 pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
463 let cache = self.cache.borrow();
464 let account = if let Some(account) = cache.account_for_venue(venue) {
465 account
466 } else {
467 log::error!("Cannot calculate net exposures: no account registered for {venue}");
468 return None; };
470
471 let positions_open = cache.positions_open(Some(venue), None, None, None);
472 if positions_open.is_empty() {
473 return Some(AHashMap::new()); }
475
476 let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
477
478 for position in positions_open {
479 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
480 instrument
481 } else {
482 log::error!(
483 "Cannot calculate net exposures: no instrument for {}",
484 position.instrument_id
485 );
486 return None; };
488
489 if position.side == PositionSide::Flat {
490 log::error!(
491 "Cannot calculate net exposures: position is flat for {}",
492 position.instrument_id
493 );
494 continue; }
496
497 let price = self.get_price(position)?;
498 let xrate = if let Some(xrate) =
499 self.calculate_xrate_to_base(instrument, account, position.entry)
500 {
501 xrate
502 } else {
503 log::error!(
504 "Cannot calculate net exposures: insufficient data for {}/{:?}",
506 instrument.settlement_currency(),
507 account.base_currency()
508 );
509 return None; };
511
512 let settlement_currency = account
513 .base_currency()
514 .unwrap_or_else(|| instrument.settlement_currency());
515
516 let net_exposure = instrument
517 .calculate_notional_value(position.quantity, price, None)
518 .as_f64()
519 * xrate;
520
521 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
522 .round()
523 / 10f64.powi(settlement_currency.precision.into());
524
525 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
526 }
527
528 Some(
529 net_exposures
530 .into_iter()
531 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
532 .collect(),
533 )
534 }
535
536 #[must_use]
537 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
538 if let Some(pnl) = self
539 .inner
540 .borrow()
541 .unrealized_pnls
542 .get(instrument_id)
543 .copied()
544 {
545 return Some(pnl);
546 }
547
548 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
549 self.inner
550 .borrow_mut()
551 .unrealized_pnls
552 .insert(*instrument_id, pnl);
553 Some(pnl)
554 }
555
556 #[must_use]
557 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
558 if let Some(pnl) = self
559 .inner
560 .borrow()
561 .realized_pnls
562 .get(instrument_id)
563 .copied()
564 {
565 return Some(pnl);
566 }
567
568 let pnl = self.calculate_realized_pnl(instrument_id)?;
569 self.inner
570 .borrow_mut()
571 .realized_pnls
572 .insert(*instrument_id, pnl);
573 Some(pnl)
574 }
575
576 #[must_use]
580 pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
581 let realized = self.realized_pnl(instrument_id)?;
582 let unrealized = self.unrealized_pnl(instrument_id)?;
583
584 if realized.currency != unrealized.currency {
585 log::error!(
586 "Cannot calculate total PnL: currency mismatch {} vs {}",
587 realized.currency,
588 unrealized.currency
589 );
590 return None;
591 }
592
593 Some(Money::new(
594 realized.as_f64() + unrealized.as_f64(),
595 realized.currency,
596 ))
597 }
598
599 #[must_use]
603 pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
604 let realized_pnls = self.realized_pnls(venue);
605 let unrealized_pnls = self.unrealized_pnls(venue);
606
607 let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
608
609 for (currency, realized) in realized_pnls {
611 total_pnls.insert(currency, realized);
612 }
613
614 for (currency, unrealized) in unrealized_pnls {
616 match total_pnls.get_mut(¤cy) {
617 Some(total) => {
618 *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
619 }
620 None => {
621 total_pnls.insert(currency, unrealized);
622 }
623 }
624 }
625
626 total_pnls
627 }
628
629 #[must_use]
630 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
631 let cache = self.cache.borrow();
632 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
633 account
634 } else {
635 log::error!(
636 "Cannot calculate net exposure: no account registered for {}",
637 instrument_id.venue
638 );
639 return None;
640 };
641
642 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
643 instrument
644 } else {
645 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
646 return None;
647 };
648
649 let positions_open = cache.positions_open(
650 None, Some(instrument_id),
652 None,
653 None,
654 );
655
656 if positions_open.is_empty() {
657 return Some(Money::new(0.0, instrument.settlement_currency()));
658 }
659
660 let mut net_exposure = 0.0;
661
662 for position in positions_open {
663 let price = self.get_price(position)?;
664 let xrate = if let Some(xrate) =
665 self.calculate_xrate_to_base(instrument, account, position.entry)
666 {
667 xrate
668 } else {
669 log::error!(
670 "Cannot calculate net exposures: insufficient data for {}/{:?}",
672 instrument.settlement_currency(),
673 account.base_currency()
674 );
675 return None; };
677
678 let notional_value =
679 instrument.calculate_notional_value(position.quantity, price, None);
680 net_exposure += notional_value.as_f64() * xrate;
681 }
682
683 let settlement_currency = account
684 .base_currency()
685 .unwrap_or_else(|| instrument.settlement_currency());
686
687 Some(Money::new(net_exposure, settlement_currency))
688 }
689
690 #[must_use]
691 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
692 self.inner
693 .borrow()
694 .net_positions
695 .get(instrument_id)
696 .copied()
697 .unwrap_or(Decimal::ZERO)
698 }
699
700 #[must_use]
701 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
702 self.inner
703 .borrow()
704 .net_positions
705 .get(instrument_id)
706 .copied()
707 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
708 }
709
710 #[must_use]
711 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
712 self.inner
713 .borrow()
714 .net_positions
715 .get(instrument_id)
716 .copied()
717 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
718 }
719
720 #[must_use]
721 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
722 self.inner
723 .borrow()
724 .net_positions
725 .get(instrument_id)
726 .copied()
727 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
728 }
729
730 #[must_use]
731 pub fn is_completely_flat(&self) -> bool {
732 for net_position in self.inner.borrow().net_positions.values() {
733 if *net_position != Decimal::ZERO {
734 return false;
735 }
736 }
737 true
738 }
739
740 pub fn initialize_orders(&mut self) {
746 let mut initialized = true;
747 let orders_and_instruments = {
748 let cache = self.cache.borrow();
749 let all_orders_open = cache.orders_open(None, None, None, None);
750
751 let mut instruments_with_orders = Vec::new();
752 let mut instruments = AHashSet::new();
753
754 for order in &all_orders_open {
755 instruments.insert(order.instrument_id());
756 }
757
758 for instrument_id in instruments {
759 if let Some(instrument) = cache.instrument(&instrument_id) {
760 let orders = cache
761 .orders_open(None, Some(&instrument_id), None, None)
762 .into_iter()
763 .cloned()
764 .collect::<Vec<OrderAny>>();
765 instruments_with_orders.push((instrument.clone(), orders));
766 } else {
767 log::error!(
768 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
769 );
770 initialized = false;
771 break;
772 }
773 }
774 instruments_with_orders
775 };
776
777 for (instrument, orders_open) in &orders_and_instruments {
778 let mut cache = self.cache.borrow_mut();
779 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
780 account
781 } else {
782 log::error!(
783 "Cannot update initial (order) margin: no account registered for {}",
784 instrument.id().venue
785 );
786 initialized = false;
787 break;
788 };
789
790 let result = self.inner.borrow_mut().accounts.update_orders(
791 account,
792 instrument.clone(),
793 orders_open.iter().collect(),
794 self.clock.borrow().timestamp_ns(),
795 );
796
797 match result {
798 Some((updated_account, _)) => {
799 cache.update_account(updated_account).unwrap();
800 }
801 None => {
802 initialized = false;
803 }
804 }
805 }
806
807 let total_orders = orders_and_instruments
808 .into_iter()
809 .map(|(_, orders)| orders.len())
810 .sum::<usize>();
811
812 log::info!(
813 color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
814 "Initialized {} open order{}",
815 total_orders,
816 if total_orders == 1 { "" } else { "s" }
817 );
818
819 self.inner.borrow_mut().initialized = initialized;
820 }
821
822 pub fn initialize_positions(&mut self) {
828 self.inner.borrow_mut().unrealized_pnls.clear();
829 self.inner.borrow_mut().realized_pnls.clear();
830 let all_positions_open: Vec<Position>;
831 let mut instruments = AHashSet::new();
832 {
833 let cache = self.cache.borrow();
834 all_positions_open = cache
835 .positions_open(None, None, None, None)
836 .into_iter()
837 .cloned()
838 .collect();
839 for position in &all_positions_open {
840 instruments.insert(position.instrument_id);
841 }
842 }
843
844 let mut initialized = true;
845
846 for instrument_id in instruments {
847 let positions_open: Vec<Position> = {
848 let cache = self.cache.borrow();
849 cache
850 .positions_open(None, Some(&instrument_id), None, None)
851 .into_iter()
852 .cloned()
853 .collect()
854 };
855
856 self.update_net_position(&instrument_id, positions_open);
857
858 if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
859 self.inner
860 .borrow_mut()
861 .unrealized_pnls
862 .insert(instrument_id, calculated_unrealized_pnl);
863 } else {
864 log::warn!(
865 "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
866 );
867 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
868 }
869
870 if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
871 self.inner
872 .borrow_mut()
873 .realized_pnls
874 .insert(instrument_id, calculated_realized_pnl);
875 } else {
876 log::warn!(
877 "Failed to calculate realized PnL for {instrument_id}, marking as pending"
878 );
879 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
880 }
881
882 let cache = self.cache.borrow();
883 let Some(account) = cache.account_for_venue(&instrument_id.venue).cloned() else {
884 log::error!(
885 "Cannot update maintenance (position) margin: no account registered for {}",
886 instrument_id.venue
887 );
888 initialized = false;
889 break;
890 };
891
892 let account = match account {
893 AccountAny::Cash(_) => continue,
894 AccountAny::Margin(margin_account) => margin_account,
895 };
896
897 let Some(instrument) = cache.instrument(&instrument_id).cloned() else {
898 log::error!(
899 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
900 );
901 initialized = false;
902 break;
903 };
904 let positions: Vec<Position> = cache
905 .positions_open(None, Some(&instrument_id), None, None)
906 .into_iter()
907 .cloned()
908 .collect();
909 drop(cache);
910
911 let result = self.inner.borrow_mut().accounts.update_positions(
912 &account,
913 instrument,
914 positions.iter().collect(),
915 self.clock.borrow().timestamp_ns(),
916 );
917
918 match result {
919 Some((updated_account, _)) => {
920 self.cache
921 .borrow_mut()
922 .update_account(AccountAny::Margin(updated_account))
923 .unwrap();
924 }
925 None => {
926 initialized = false;
927 }
928 }
929 }
930
931 let open_count = all_positions_open.len();
932 self.inner.borrow_mut().initialized = initialized;
933 log::info!(
934 color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
935 "Initialized {} open position{}",
936 open_count,
937 if open_count == 1 { "" } else { "s" }
938 );
939 }
940
941 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
945 update_quote_tick(
946 self.cache.clone(),
947 self.clock.clone(),
948 self.inner.clone(),
949 self.config.clone(),
950 quote,
951 );
952 }
953
954 pub fn update_bar(&mut self, bar: &Bar) {
958 update_bar(
959 self.cache.clone(),
960 self.clock.clone(),
961 self.inner.clone(),
962 self.config.clone(),
963 bar,
964 );
965 }
966
967 pub fn update_account(&mut self, event: &AccountState) {
969 update_account(self.cache.clone(), self.inner.clone(), event);
970 }
971
972 pub fn update_order(&mut self, event: &OrderEventAny) {
976 update_order(
977 self.cache.clone(),
978 self.clock.clone(),
979 self.inner.clone(),
980 self.config.clone(),
981 event,
982 );
983 }
984
985 pub fn update_position(&mut self, event: &PositionEvent) {
989 update_position(
990 self.cache.clone(),
991 self.clock.clone(),
992 self.inner.clone(),
993 self.config.clone(),
994 event,
995 );
996 }
997
998 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
999 let mut net_position = Decimal::ZERO;
1000
1001 for open_position in positions_open {
1002 log::debug!("open_position: {open_position}");
1003 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
1004 }
1005
1006 let existing_position = self.net_position(instrument_id);
1007 if existing_position != net_position {
1008 self.inner
1009 .borrow_mut()
1010 .net_positions
1011 .insert(*instrument_id, net_position);
1012 log::info!("{instrument_id} net_position={net_position}");
1013 }
1014 }
1015
1016 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1017 let cache = self.cache.borrow();
1018 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1019 account
1020 } else {
1021 log::error!(
1022 "Cannot calculate unrealized PnL: no account registered for {}",
1023 instrument_id.venue
1024 );
1025 return None;
1026 };
1027
1028 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1029 instrument
1030 } else {
1031 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1032 return None;
1033 };
1034
1035 let currency = account
1036 .base_currency()
1037 .unwrap_or_else(|| instrument.settlement_currency());
1038
1039 let positions_open = cache.positions_open(
1040 None, Some(instrument_id),
1042 None,
1043 None,
1044 );
1045
1046 if positions_open.is_empty() {
1047 return Some(Money::new(0.0, currency));
1048 }
1049
1050 let mut total_pnl = 0.0;
1051
1052 for position in positions_open {
1053 if position.instrument_id != *instrument_id {
1054 continue; }
1056
1057 if position.side == PositionSide::Flat {
1058 continue; }
1060
1061 let price = if let Some(price) = self.get_price(position) {
1062 price
1063 } else {
1064 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1065 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1066 return None; };
1068
1069 let mut pnl = position.unrealized_pnl(price).as_f64();
1070
1071 if let Some(base_currency) = account.base_currency() {
1072 let xrate = if let Some(xrate) =
1073 self.calculate_xrate_to_base(instrument, account, position.entry)
1074 {
1075 xrate
1076 } else {
1077 log::error!(
1078 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1080 instrument.settlement_currency(),
1081 base_currency
1082 );
1083 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1084 return None; };
1086
1087 let scale = 10f64.powi(currency.precision.into());
1088 pnl = ((pnl * xrate) * scale).round() / scale;
1089 }
1090
1091 total_pnl += pnl;
1092 }
1093
1094 Some(Money::new(total_pnl, currency))
1095 }
1096
1097 fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1098 let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1104
1105 if snapshot_position_ids.is_empty() {
1106 return; }
1108
1109 let mut rebuild = false;
1110
1111 for position_id in &snapshot_position_ids {
1113 let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1114 let curr_count = position_snapshots.map_or(0, |s| {
1115 s.split(|&b| b == b'{').count() - 1
1117 });
1118 let prev_count = self
1119 .inner
1120 .borrow()
1121 .snapshot_processed_counts
1122 .get(position_id)
1123 .copied()
1124 .unwrap_or(0);
1125
1126 if prev_count > curr_count {
1127 rebuild = true;
1128 break;
1129 }
1130 }
1131
1132 if rebuild {
1133 for position_id in &snapshot_position_ids {
1135 if let Some(position_snapshots) =
1136 self.cache.borrow().position_snapshot_bytes(position_id)
1137 {
1138 let mut sum_pnl: Option<Money> = None;
1139 let mut last_pnl: Option<Money> = None;
1140
1141 let mut start = 0;
1143 let mut depth = 0;
1144 let mut in_string = false;
1145 let mut escape_next = false;
1146
1147 for (i, &byte) in position_snapshots.iter().enumerate() {
1148 if escape_next {
1149 escape_next = false;
1150 continue;
1151 }
1152
1153 if byte == b'\\' && in_string {
1154 escape_next = true;
1155 continue;
1156 }
1157
1158 if byte == b'"' && !escape_next {
1159 in_string = !in_string;
1160 }
1161
1162 if !in_string {
1163 if byte == b'{' {
1164 if depth == 0 {
1165 start = i;
1166 }
1167 depth += 1;
1168 } else if byte == b'}' {
1169 depth -= 1;
1170 if depth == 0
1171 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1172 &position_snapshots[start..=i],
1173 )
1174 && let Some(realized_pnl) = snapshot.realized_pnl
1175 {
1176 if let Some(ref mut sum) = sum_pnl {
1177 if sum.currency == realized_pnl.currency {
1178 *sum = Money::new(
1179 sum.as_f64() + realized_pnl.as_f64(),
1180 sum.currency,
1181 );
1182 }
1183 } else {
1184 sum_pnl = Some(realized_pnl);
1185 }
1186 last_pnl = Some(realized_pnl);
1187 }
1188 }
1189 }
1190 }
1191
1192 let mut inner = self.inner.borrow_mut();
1193 if let Some(sum) = sum_pnl {
1194 inner.snapshot_sum_per_position.insert(*position_id, sum);
1195 if let Some(last) = last_pnl {
1196 inner.snapshot_last_per_position.insert(*position_id, last);
1197 }
1198 } else {
1199 inner.snapshot_sum_per_position.remove(position_id);
1200 inner.snapshot_last_per_position.remove(position_id);
1201 }
1202
1203 let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1204 inner
1205 .snapshot_processed_counts
1206 .insert(*position_id, snapshot_count);
1207 }
1208 }
1209 } else {
1210 for position_id in &snapshot_position_ids {
1212 if let Some(position_snapshots) =
1213 self.cache.borrow().position_snapshot_bytes(position_id)
1214 {
1215 let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1216 let prev_count = self
1217 .inner
1218 .borrow()
1219 .snapshot_processed_counts
1220 .get(position_id)
1221 .copied()
1222 .unwrap_or(0);
1223
1224 if prev_count >= curr_count {
1225 continue;
1226 }
1227
1228 let mut sum_pnl = self
1229 .inner
1230 .borrow()
1231 .snapshot_sum_per_position
1232 .get(position_id)
1233 .copied();
1234 let mut last_pnl = self
1235 .inner
1236 .borrow()
1237 .snapshot_last_per_position
1238 .get(position_id)
1239 .copied();
1240
1241 let mut start = 0;
1243 let mut depth = 0;
1244 let mut in_string = false;
1245 let mut escape_next = false;
1246 let mut snapshot_index = 0;
1247
1248 for (i, &byte) in position_snapshots.iter().enumerate() {
1249 if escape_next {
1250 escape_next = false;
1251 continue;
1252 }
1253
1254 if byte == b'\\' && in_string {
1255 escape_next = true;
1256 continue;
1257 }
1258
1259 if byte == b'"' && !escape_next {
1260 in_string = !in_string;
1261 }
1262
1263 if !in_string {
1264 if byte == b'{' {
1265 if depth == 0 {
1266 start = i;
1267 }
1268 depth += 1;
1269 } else if byte == b'}' {
1270 depth -= 1;
1271 if depth == 0 {
1272 snapshot_index += 1;
1273 if snapshot_index > prev_count
1275 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1276 &position_snapshots[start..=i],
1277 )
1278 && let Some(realized_pnl) = snapshot.realized_pnl
1279 {
1280 if let Some(ref mut sum) = sum_pnl {
1281 if sum.currency == realized_pnl.currency {
1282 *sum = Money::new(
1283 sum.as_f64() + realized_pnl.as_f64(),
1284 sum.currency,
1285 );
1286 }
1287 } else {
1288 sum_pnl = Some(realized_pnl);
1289 }
1290 last_pnl = Some(realized_pnl);
1291 }
1292 }
1293 }
1294 }
1295 }
1296
1297 let mut inner = self.inner.borrow_mut();
1298 if let Some(sum) = sum_pnl {
1299 inner.snapshot_sum_per_position.insert(*position_id, sum);
1300 if let Some(last) = last_pnl {
1301 inner.snapshot_last_per_position.insert(*position_id, last);
1302 }
1303 }
1304 inner
1305 .snapshot_processed_counts
1306 .insert(*position_id, curr_count);
1307 }
1308 }
1309 }
1310 }
1311
1312 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1313 self.ensure_snapshot_pnls_cached_for(instrument_id);
1315
1316 let cache = self.cache.borrow();
1317 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1318 account
1319 } else {
1320 log::error!(
1321 "Cannot calculate realized PnL: no account registered for {}",
1322 instrument_id.venue
1323 );
1324 return None;
1325 };
1326
1327 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1328 instrument
1329 } else {
1330 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1331 return None;
1332 };
1333
1334 let currency = account
1335 .base_currency()
1336 .unwrap_or_else(|| instrument.settlement_currency());
1337
1338 let positions = cache.positions(
1339 None, Some(instrument_id),
1341 None,
1342 None,
1343 );
1344
1345 let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1346
1347 let is_netting = positions
1349 .iter()
1350 .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1351
1352 let mut total_pnl = 0.0;
1353
1354 if is_netting && !snapshot_position_ids.is_empty() {
1355 for position_id in &snapshot_position_ids {
1358 let is_active = positions.iter().any(|p| p.id == *position_id);
1359
1360 if is_active {
1361 let last_pnl = self
1363 .inner
1364 .borrow()
1365 .snapshot_last_per_position
1366 .get(position_id)
1367 .copied();
1368 if let Some(last_pnl) = last_pnl {
1369 let mut pnl = last_pnl.as_f64();
1370
1371 if let Some(base_currency) = account.base_currency()
1372 && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1373 {
1374 let xrate = if let Some(xrate) =
1375 self.calculate_xrate_to_base(instrument, account, position.entry)
1376 {
1377 xrate
1378 } else {
1379 log::error!(
1380 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1381 instrument.settlement_currency(),
1382 base_currency
1383 );
1384 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1385 return Some(Money::new(0.0, currency));
1386 };
1387
1388 let scale = 10f64.powi(currency.precision.into());
1389 pnl = ((pnl * xrate) * scale).round() / scale;
1390 }
1391
1392 total_pnl += pnl;
1393 }
1394 } else {
1395 let sum_pnl = self
1397 .inner
1398 .borrow()
1399 .snapshot_sum_per_position
1400 .get(position_id)
1401 .copied();
1402 if let Some(sum_pnl) = sum_pnl {
1403 let mut pnl = sum_pnl.as_f64();
1404
1405 if let Some(base_currency) = account.base_currency() {
1406 let xrate = cache.get_xrate(
1408 instrument_id.venue,
1409 instrument.settlement_currency(),
1410 base_currency,
1411 PriceType::Mid,
1412 );
1413
1414 if let Some(xrate) = xrate {
1415 let scale = 10f64.powi(currency.precision.into());
1416 pnl = ((pnl * xrate) * scale).round() / scale;
1417 } else {
1418 log::error!(
1419 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1420 instrument.settlement_currency(),
1421 base_currency
1422 );
1423 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1424 return Some(Money::new(0.0, currency));
1425 }
1426 }
1427
1428 total_pnl += pnl;
1429 }
1430 }
1431 }
1432
1433 for position in positions {
1435 if position.instrument_id != *instrument_id {
1436 continue;
1437 }
1438
1439 if let Some(realized_pnl) = position.realized_pnl {
1440 let mut pnl = realized_pnl.as_f64();
1441
1442 if let Some(base_currency) = account.base_currency() {
1443 let xrate = if let Some(xrate) =
1444 self.calculate_xrate_to_base(instrument, account, position.entry)
1445 {
1446 xrate
1447 } else {
1448 log::error!(
1449 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1450 instrument.settlement_currency(),
1451 base_currency
1452 );
1453 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1454 return Some(Money::new(0.0, currency));
1455 };
1456
1457 let scale = 10f64.powi(currency.precision.into());
1458 pnl = ((pnl * xrate) * scale).round() / scale;
1459 }
1460
1461 total_pnl += pnl;
1462 }
1463 }
1464 } else {
1465 for position_id in &snapshot_position_ids {
1468 let sum_pnl = self
1469 .inner
1470 .borrow()
1471 .snapshot_sum_per_position
1472 .get(position_id)
1473 .copied();
1474 if let Some(sum_pnl) = sum_pnl {
1475 let mut pnl = sum_pnl.as_f64();
1476
1477 if let Some(base_currency) = account.base_currency() {
1478 let xrate = cache.get_xrate(
1479 instrument_id.venue,
1480 instrument.settlement_currency(),
1481 base_currency,
1482 PriceType::Mid,
1483 );
1484
1485 if let Some(xrate) = xrate {
1486 let scale = 10f64.powi(currency.precision.into());
1487 pnl = ((pnl * xrate) * scale).round() / scale;
1488 } else {
1489 log::error!(
1490 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1491 instrument.settlement_currency(),
1492 base_currency
1493 );
1494 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1495 return Some(Money::new(0.0, currency));
1496 }
1497 }
1498
1499 total_pnl += pnl;
1500 }
1501 }
1502
1503 for position in positions {
1505 if position.instrument_id != *instrument_id {
1506 continue;
1507 }
1508
1509 if let Some(realized_pnl) = position.realized_pnl {
1510 let mut pnl = realized_pnl.as_f64();
1511
1512 if let Some(base_currency) = account.base_currency() {
1513 let xrate = if let Some(xrate) =
1514 self.calculate_xrate_to_base(instrument, account, position.entry)
1515 {
1516 xrate
1517 } else {
1518 log::error!(
1519 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1520 instrument.settlement_currency(),
1521 base_currency
1522 );
1523 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1524 return Some(Money::new(0.0, currency));
1525 };
1526
1527 let scale = 10f64.powi(currency.precision.into());
1528 pnl = ((pnl * xrate) * scale).round() / scale;
1529 }
1530
1531 total_pnl += pnl;
1532 }
1533 }
1534 }
1535
1536 Some(Money::new(total_pnl, currency))
1537 }
1538
1539 fn get_price(&self, position: &Position) -> Option<Price> {
1540 let cache = self.cache.borrow();
1541 let instrument_id = &position.instrument_id;
1542
1543 if self.config.use_mark_prices
1545 && let Some(mark_price) = cache.mark_price(instrument_id)
1546 {
1547 return Some(mark_price.value);
1548 }
1549
1550 let price_type = match position.side {
1552 PositionSide::Long => PriceType::Bid,
1553 PositionSide::Short => PriceType::Ask,
1554 _ => panic!("invalid `PositionSide`, was {}", position.side),
1555 };
1556
1557 cache
1558 .price(instrument_id, price_type)
1559 .or_else(|| cache.price(instrument_id, PriceType::Last))
1560 .or_else(|| {
1561 self.inner
1562 .borrow()
1563 .bar_close_prices
1564 .get(instrument_id)
1565 .copied()
1566 })
1567 }
1568
1569 fn calculate_xrate_to_base(
1570 &self,
1571 instrument: &InstrumentAny,
1572 account: &AccountAny,
1573 side: OrderSide,
1574 ) -> Option<f64> {
1575 if !self.config.convert_to_account_base_currency {
1576 return Some(1.0); }
1578
1579 match account.base_currency() {
1580 None => Some(1.0), Some(base_currency) => {
1582 let cache = self.cache.borrow();
1583
1584 if self.config.use_mark_xrates {
1585 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1586 }
1587
1588 let price_type = if side == OrderSide::Buy {
1589 PriceType::Bid
1590 } else {
1591 PriceType::Ask
1592 };
1593
1594 cache.get_xrate(
1595 instrument.id().venue,
1596 instrument.settlement_currency(),
1597 base_currency,
1598 price_type,
1599 )
1600 }
1601 }
1602 }
1603}
1604
1605fn update_quote_tick(
1607 cache: Rc<RefCell<Cache>>,
1608 clock: Rc<RefCell<dyn Clock>>,
1609 inner: Rc<RefCell<PortfolioState>>,
1610 config: PortfolioConfig,
1611 quote: &QuoteTick,
1612) {
1613 update_instrument_id(cache, clock.clone(), inner, config, "e.instrument_id);
1614}
1615
1616fn update_bar(
1617 cache: Rc<RefCell<Cache>>,
1618 clock: Rc<RefCell<dyn Clock>>,
1619 inner: Rc<RefCell<PortfolioState>>,
1620 config: PortfolioConfig,
1621 bar: &Bar,
1622) {
1623 let instrument_id = bar.bar_type.instrument_id();
1624 inner
1625 .borrow_mut()
1626 .bar_close_prices
1627 .insert(instrument_id, bar.close);
1628 update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1629}
1630
1631fn update_instrument_id(
1632 cache: Rc<RefCell<Cache>>,
1633 clock: Rc<RefCell<dyn Clock>>,
1634 inner: Rc<RefCell<PortfolioState>>,
1635 config: PortfolioConfig,
1636 instrument_id: &InstrumentId,
1637) {
1638 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1639
1640 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1641 return;
1642 }
1643
1644 let result_init;
1645 let mut result_maint = None;
1646
1647 let account = {
1648 let account = {
1649 let cache_ref = cache.borrow();
1650 if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1651 account.clone()
1652 } else {
1653 log::error!(
1654 "Cannot update tick: no account registered for {}",
1655 instrument_id.venue
1656 );
1657 return;
1658 }
1659 };
1660
1661 let mut cache_ref = cache.borrow_mut();
1662 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1663 instrument.clone()
1664 } else {
1665 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1666 return;
1667 };
1668
1669 let orders_open: Vec<OrderAny> = cache_ref
1671 .orders_open(None, Some(instrument_id), None, None)
1672 .iter()
1673 .map(|o| (*o).clone())
1674 .collect();
1675
1676 let positions_open: Vec<Position> = cache_ref
1677 .positions_open(None, Some(instrument_id), None, None)
1678 .iter()
1679 .map(|p| (*p).clone())
1680 .collect();
1681
1682 result_init = inner.borrow().accounts.update_orders(
1683 &account,
1684 instrument.clone(),
1685 orders_open.iter().collect(),
1686 clock.borrow().timestamp_ns(),
1687 );
1688
1689 if let AccountAny::Margin(ref margin_account) = account {
1690 result_maint = inner.borrow().accounts.update_positions(
1691 margin_account,
1692 instrument,
1693 positions_open.iter().collect(),
1694 clock.borrow().timestamp_ns(),
1695 );
1696 }
1697
1698 if let Some((ref updated_account, _)) = result_init {
1699 cache_ref.update_account(updated_account.clone()).unwrap();
1700 }
1701 account
1702 };
1703
1704 let mut portfolio_clone = Portfolio {
1705 clock: clock.clone(),
1706 cache,
1707 inner: inner.clone(),
1708 config,
1709 };
1710
1711 let result_unrealized_pnl: Option<Money> =
1712 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1713
1714 if result_init.is_some()
1715 && (matches!(account, AccountAny::Cash(_))
1716 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1717 {
1718 inner.borrow_mut().pending_calcs.remove(instrument_id);
1719 if inner.borrow().pending_calcs.is_empty() {
1720 inner.borrow_mut().initialized = true;
1721 }
1722 }
1723}
1724
1725fn update_order(
1726 cache: Rc<RefCell<Cache>>,
1727 clock: Rc<RefCell<dyn Clock>>,
1728 inner: Rc<RefCell<PortfolioState>>,
1729 _config: PortfolioConfig,
1730 event: &OrderEventAny,
1731) {
1732 let cache_ref = cache.borrow();
1733 let account_id = match event.account_id() {
1734 Some(account_id) => account_id,
1735 None => {
1736 return; }
1738 };
1739
1740 let account = if let Some(account) = cache_ref.account(&account_id) {
1741 account
1742 } else {
1743 log::error!("Cannot update order: no account registered for {account_id}");
1744 return;
1745 };
1746
1747 match account {
1748 AccountAny::Cash(cash_account) => {
1749 if !cash_account.base.calculate_account_state {
1750 return;
1751 }
1752 }
1753 AccountAny::Margin(margin_account) => {
1754 if !margin_account.base.calculate_account_state {
1755 return;
1756 }
1757 }
1758 }
1759
1760 match event {
1761 OrderEventAny::Accepted(_)
1762 | OrderEventAny::Canceled(_)
1763 | OrderEventAny::Rejected(_)
1764 | OrderEventAny::Updated(_)
1765 | OrderEventAny::Filled(_) => {}
1766 _ => {
1767 return;
1768 }
1769 }
1770
1771 let cache_ref = cache.borrow();
1772 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1773 order
1774 } else {
1775 log::error!(
1776 "Cannot update order: {} not found in the cache",
1777 event.client_order_id()
1778 );
1779 return; };
1781
1782 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1783 return; }
1785
1786 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1787 instrument_id
1788 } else {
1789 log::error!(
1790 "Cannot update order: no instrument found for {}",
1791 event.instrument_id()
1792 );
1793 return;
1794 };
1795
1796 if let OrderEventAny::Filled(order_filled) = event {
1797 let _ = inner.borrow().accounts.update_balances(
1798 account.clone(),
1799 instrument.clone(),
1800 *order_filled,
1801 );
1802
1803 let mut portfolio_clone = Portfolio {
1804 clock: clock.clone(),
1805 cache: cache.clone(),
1806 inner: inner.clone(),
1807 config: PortfolioConfig::default(), };
1809
1810 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1811 Some(unrealized_pnl) => {
1812 inner
1813 .borrow_mut()
1814 .unrealized_pnls
1815 .insert(event.instrument_id(), unrealized_pnl);
1816 }
1817 None => {
1818 log::error!(
1819 "Failed to calculate unrealized PnL for instrument {}",
1820 event.instrument_id()
1821 );
1822 }
1823 }
1824 }
1825
1826 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1827
1828 let account_state = inner.borrow_mut().accounts.update_orders(
1829 account,
1830 instrument.clone(),
1831 orders_open,
1832 clock.borrow().timestamp_ns(),
1833 );
1834
1835 let mut cache_ref = cache.borrow_mut();
1836 cache_ref.update_account(account.clone()).unwrap();
1837
1838 if let Some(account_state) = account_state {
1839 msgbus::publish(
1840 format!("events.account.{}", account.id()).into(),
1841 &account_state,
1842 );
1843 } else {
1844 log::debug!("Added pending calculation for {}", instrument.id());
1845 inner.borrow_mut().pending_calcs.insert(instrument.id());
1846 }
1847
1848 log::debug!("Updated {event}");
1849}
1850
1851fn update_position(
1852 cache: Rc<RefCell<Cache>>,
1853 clock: Rc<RefCell<dyn Clock>>,
1854 inner: Rc<RefCell<PortfolioState>>,
1855 _config: PortfolioConfig,
1856 event: &PositionEvent,
1857) {
1858 let instrument_id = event.instrument_id();
1859
1860 let positions_open: Vec<Position> = {
1861 let cache_ref = cache.borrow();
1862
1863 cache_ref
1864 .positions_open(None, Some(&instrument_id), None, None)
1865 .iter()
1866 .map(|o| (*o).clone())
1867 .collect()
1868 };
1869
1870 log::debug!("position fresh from cache -> {positions_open:?}");
1871
1872 let mut portfolio_clone = Portfolio {
1873 clock: clock.clone(),
1874 cache: cache.clone(),
1875 inner: inner.clone(),
1876 config: PortfolioConfig::default(), };
1878
1879 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1880
1881 if let Some(calculated_unrealized_pnl) =
1882 portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1883 {
1884 inner
1885 .borrow_mut()
1886 .unrealized_pnls
1887 .insert(event.instrument_id(), calculated_unrealized_pnl);
1888 } else {
1889 log::warn!(
1890 "Failed to calculate unrealized PnL for {}, marking as pending",
1891 event.instrument_id()
1892 );
1893 inner
1894 .borrow_mut()
1895 .pending_calcs
1896 .insert(event.instrument_id());
1897 }
1898
1899 if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1900 inner
1901 .borrow_mut()
1902 .realized_pnls
1903 .insert(event.instrument_id(), calculated_realized_pnl);
1904 } else {
1905 log::warn!(
1906 "Failed to calculate realized PnL for {}, marking as pending",
1907 event.instrument_id()
1908 );
1909 inner
1910 .borrow_mut()
1911 .pending_calcs
1912 .insert(event.instrument_id());
1913 }
1914
1915 let cache_ref = cache.borrow();
1916 let account = cache_ref.account(&event.account_id());
1917
1918 if let Some(AccountAny::Margin(margin_account)) = account {
1919 if !margin_account.calculate_account_state {
1920 return; }
1922
1923 let cache_ref = cache.borrow();
1924 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1925 instrument
1926 } else {
1927 log::error!("Cannot update position: no instrument found for {instrument_id}");
1928 return;
1929 };
1930
1931 let result = inner.borrow_mut().accounts.update_positions(
1932 margin_account,
1933 instrument.clone(),
1934 positions_open.iter().collect(),
1935 clock.borrow().timestamp_ns(),
1936 );
1937 let mut cache_ref = cache.borrow_mut();
1938 if let Some((margin_account, _)) = result {
1939 cache_ref
1940 .update_account(AccountAny::Margin(margin_account))
1941 .unwrap();
1942 }
1943 } else if account.is_none() {
1944 log::error!(
1945 "Cannot update position: no account registered for {}",
1946 event.account_id()
1947 );
1948 }
1949}
1950
1951fn update_account(
1952 cache: Rc<RefCell<Cache>>,
1953 inner: Rc<RefCell<PortfolioState>>,
1954 event: &AccountState,
1955) {
1956 let mut cache_ref = cache.borrow_mut();
1957
1958 if let Some(existing) = cache_ref.account(&event.account_id) {
1959 let mut account = existing.clone();
1960 account.apply(event.clone());
1961
1962 if let Err(e) = cache_ref.update_account(account.clone()) {
1963 log::error!("Failed to update account: {e}");
1964 return;
1965 }
1966 } else {
1967 let account = match AccountAny::from_events(vec![event.clone()]) {
1968 Ok(account) => account,
1969 Err(e) => {
1970 log::error!("Failed to create account: {e}");
1971 return;
1972 }
1973 };
1974
1975 if let Err(e) = cache_ref.add_account(account) {
1976 log::error!("Failed to add account: {e}");
1977 return;
1978 }
1979 }
1980
1981 let mut inner_ref = inner.borrow_mut();
1983 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1984 let current_ts = event.ts_init.as_u64();
1985 let last_ts = inner_ref
1986 .last_account_state_log_ts
1987 .get(&event.account_id)
1988 .copied()
1989 .unwrap_or(0);
1990
1991 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1992 {
1993 inner_ref
1994 .last_account_state_log_ts
1995 .insert(event.account_id, current_ts);
1996 true
1997 } else {
1998 false
1999 }
2000 } else {
2001 true };
2003
2004 if should_log {
2005 log::info!("Updated {event}");
2006 }
2007}