1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 enums::LogColor,
26 msgbus::{
27 self,
28 handler::{ShareableMessageHandler, TypedMessageHandler},
29 },
30};
31use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
32use nautilus_model::{
33 accounts::AccountAny,
34 data::{Bar, MarkPriceUpdate, QuoteTick},
35 enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
36 events::{AccountState, OrderEventAny, position::PositionEvent},
37 identifiers::{AccountId, InstrumentId, PositionId, Venue},
38 instruments::{Instrument, InstrumentAny},
39 orders::{Order, OrderAny},
40 position::Position,
41 types::{Currency, Money, Price},
42};
43use rust_decimal::{Decimal, prelude::FromPrimitive};
44
45use crate::{config::PortfolioConfig, manager::AccountsManager};
46
/// Mutable bookkeeping shared between a [`Portfolio`] and its message bus handlers.
struct PortfolioState {
    /// Account manager used to recompute accounts from open orders and positions.
    accounts: AccountsManager,
    /// Portfolio analyzer; reset together with the rest of this state.
    analyzer: PortfolioAnalyzer,
    /// Cached unrealized PnL per instrument.
    unrealized_pnls: AHashMap<InstrumentId, Money>,
    /// Cached realized PnL per instrument.
    realized_pnls: AHashMap<InstrumentId, Money>,
    /// Sum of the realized PnLs found in the processed snapshots of each position.
    snapshot_sum_per_position: AHashMap<PositionId, Money>,
    /// Realized PnL of the most recently processed snapshot of each position.
    snapshot_last_per_position: AHashMap<PositionId, Money>,
    /// Number of snapshots already processed for each position.
    snapshot_processed_counts: AHashMap<PositionId, usize>,
    /// Net signed quantity of the open positions per instrument.
    net_positions: AHashMap<InstrumentId, Decimal>,
    /// Instruments whose PnL calculation could not be completed yet.
    pending_calcs: AHashSet<InstrumentId>,
    /// NOTE(review): presumably the latest bar close per instrument, used as a price
    /// fallback; the code that reads and writes it is not visible in this section.
    bar_close_prices: AHashMap<InstrumentId, Price>,
    /// Result of the most recent `initialize_orders` / `initialize_positions` run.
    initialized: bool,
    /// NOTE(review): looks like the timestamp of the last logged state per account;
    /// the consumer is not visible in this section.
    last_account_state_log_ts: AHashMap<AccountId, u64>,
    /// `min_account_state_logging_interval_ms` from the config converted to
    /// nanoseconds (`0` when the option is not set).
    min_account_state_logging_interval_ns: u64,
}
62
63impl PortfolioState {
64 fn new(
65 clock: Rc<RefCell<dyn Clock>>,
66 cache: Rc<RefCell<Cache>>,
67 config: &PortfolioConfig,
68 ) -> Self {
69 let min_account_state_logging_interval_ns = config
70 .min_account_state_logging_interval_ms
71 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73 Self {
74 accounts: AccountsManager::new(clock, cache),
75 analyzer: PortfolioAnalyzer::default(),
76 unrealized_pnls: AHashMap::new(),
77 realized_pnls: AHashMap::new(),
78 snapshot_sum_per_position: AHashMap::new(),
79 snapshot_last_per_position: AHashMap::new(),
80 snapshot_processed_counts: AHashMap::new(),
81 net_positions: AHashMap::new(),
82 pending_calcs: AHashSet::new(),
83 bar_close_prices: AHashMap::new(),
84 initialized: false,
85 last_account_state_log_ts: AHashMap::new(),
86 min_account_state_logging_interval_ns,
87 }
88 }
89
90 fn reset(&mut self) {
91 log::debug!("RESETTING");
92 self.net_positions.clear();
93 self.unrealized_pnls.clear();
94 self.realized_pnls.clear();
95 self.snapshot_sum_per_position.clear();
96 self.snapshot_last_per_position.clear();
97 self.snapshot_processed_counts.clear();
98 self.pending_calcs.clear();
99 self.last_account_state_log_ts.clear();
100 self.analyzer.reset();
101 log::debug!("READY");
102 }
103}
104
/// Portfolio facade over the shared cache, the clock and the internally shared
/// [`PortfolioState`].
pub struct Portfolio {
    /// Clock used to timestamp account updates.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Cache holding the accounts, instruments, orders and positions queried here.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// State shared with the message bus handlers registered in `new`.
    inner: Rc<RefCell<PortfolioState>>,
    /// Configuration this portfolio was created with.
    config: PortfolioConfig,
}
111
112impl Debug for Portfolio {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct(stringify!(Portfolio)).finish()
115 }
116}
117
118impl Portfolio {
119 pub fn new(
120 cache: Rc<RefCell<Cache>>,
121 clock: Rc<RefCell<dyn Clock>>,
122 config: Option<PortfolioConfig>,
123 ) -> Self {
124 let config = config.unwrap_or_default();
125 let inner = Rc::new(RefCell::new(PortfolioState::new(
126 clock.clone(),
127 cache.clone(),
128 &config,
129 )));
130
131 Self::register_message_handlers(
132 cache.clone(),
133 clock.clone(),
134 inner.clone(),
135 config.clone(),
136 );
137
138 Self {
139 clock,
140 cache,
141 inner,
142 config,
143 }
144 }
145
146 #[must_use]
151 pub fn clone_shallow(&self) -> Self {
152 Self {
153 clock: self.clock.clone(),
154 cache: self.cache.clone(),
155 inner: self.inner.clone(),
156 config: self.config.clone(),
157 }
158 }
159
160 fn register_message_handlers(
161 cache: Rc<RefCell<Cache>>,
162 clock: Rc<RefCell<dyn Clock>>,
163 inner: Rc<RefCell<PortfolioState>>,
164 config: PortfolioConfig,
165 ) {
166 let inner_weak = WeakCell::from(Rc::downgrade(&inner));
167
168 let update_account_handler = {
169 let cache = cache.clone();
170 let inner = inner_weak.clone();
171 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
172 move |event: &AccountState| {
173 if let Some(inner_rc) = inner.upgrade() {
174 update_account(cache.clone(), inner_rc.into(), event);
175 }
176 },
177 )))
178 };
179
180 let update_position_handler = {
181 let cache = cache.clone();
182 let clock = clock.clone();
183 let inner = inner_weak.clone();
184 let config = config.clone();
185 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
186 move |event: &PositionEvent| {
187 if let Some(inner_rc) = inner.upgrade() {
188 update_position(
189 cache.clone(),
190 clock.clone(),
191 inner_rc.into(),
192 config.clone(),
193 event,
194 );
195 }
196 },
197 )))
198 };
199
200 let update_quote_handler = {
201 let cache = cache.clone();
202 let clock = clock.clone();
203 let inner = inner_weak.clone();
204 let config = config.clone();
205 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
206 move |quote: &QuoteTick| {
207 if let Some(inner_rc) = inner.upgrade() {
208 update_quote_tick(
209 cache.clone(),
210 clock.clone(),
211 inner_rc.into(),
212 config.clone(),
213 quote,
214 );
215 }
216 },
217 )))
218 };
219
220 let update_bar_handler = {
221 let cache = cache.clone();
222 let clock = clock.clone();
223 let inner = inner_weak.clone();
224 let config = config.clone();
225 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
226 if let Some(inner_rc) = inner.upgrade() {
227 update_bar(
228 cache.clone(),
229 clock.clone(),
230 inner_rc.into(),
231 config.clone(),
232 bar,
233 );
234 }
235 })))
236 };
237
238 let update_mark_price_handler = {
239 let cache = cache.clone();
240 let clock = clock.clone();
241 let inner = inner_weak.clone();
242 let config = config.clone();
243 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
244 move |mark_price: &MarkPriceUpdate| {
245 if let Some(inner_rc) = inner.upgrade() {
246 update_instrument_id(
247 cache.clone(),
248 clock.clone(),
249 inner_rc.into(),
250 config.clone(),
251 &mark_price.instrument_id,
252 );
253 }
254 },
255 )))
256 };
257
258 let update_order_handler = {
259 let cache = cache;
260 let clock = clock.clone();
261 let inner = inner_weak;
262 let config = config.clone();
263 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
264 move |event: &OrderEventAny| {
265 if let Some(inner_rc) = inner.upgrade() {
266 update_order(
267 cache.clone(),
268 clock.clone(),
269 inner_rc.into(),
270 config.clone(),
271 event,
272 );
273 }
274 },
275 )))
276 };
277
278 msgbus::register(
279 "Portfolio.update_account".into(),
280 update_account_handler.clone(),
281 );
282
283 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
284 if config.bar_updates {
285 msgbus::subscribe("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
286 }
287 if config.use_mark_prices {
288 msgbus::subscribe(
289 "data.mark_prices.*".into(),
290 update_mark_price_handler,
291 Some(10),
292 );
293 }
294 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
295 msgbus::subscribe(
296 "events.position.*".into(),
297 update_position_handler,
298 Some(10),
299 );
300 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
301 }
302
303 pub fn reset(&mut self) {
304 log::debug!("RESETTING");
305 self.inner.borrow_mut().reset();
306 log::debug!("READY");
307 }
308
309 #[must_use]
313 pub fn is_initialized(&self) -> bool {
314 self.inner.borrow().initialized
315 }
316
317 #[must_use]
321 pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
322 self.cache.borrow().account_for_venue(venue).map_or_else(
323 || {
324 log::error!("Cannot get balances locked: no account generated for {venue}");
325 AHashMap::new()
326 },
327 AccountAny::balances_locked,
328 )
329 }
330
331 #[must_use]
335 pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
336 self.cache.borrow().account_for_venue(venue).map_or_else(
337 || {
338 log::error!(
339 "Cannot get initial (order) margins: no account registered for {venue}"
340 );
341 AHashMap::new()
342 },
343 |account| match account {
344 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
345 AccountAny::Cash(_) => {
346 log::warn!("Initial margins not applicable for cash account");
347 AHashMap::new()
348 }
349 },
350 )
351 }
352
353 #[must_use]
357 pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
358 self.cache.borrow().account_for_venue(venue).map_or_else(
359 || {
360 log::error!(
361 "Cannot get maintenance (position) margins: no account registered for {venue}"
362 );
363 AHashMap::new()
364 },
365 |account| match account {
366 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
367 AccountAny::Cash(_) => {
368 log::warn!("Maintenance margins not applicable for cash account");
369 AHashMap::new()
370 }
371 },
372 )
373 }
374
375 #[must_use]
379 pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
380 let instrument_ids = {
381 let cache = self.cache.borrow();
382 let positions = cache.positions(Some(venue), None, None, None);
383
384 if positions.is_empty() {
385 return AHashMap::new(); }
387
388 let instrument_ids: AHashSet<InstrumentId> =
389 positions.iter().map(|p| p.instrument_id).collect();
390
391 instrument_ids
392 };
393
394 let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
395
396 for instrument_id in instrument_ids {
397 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
398 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
400 continue;
401 }
402
403 match self.calculate_unrealized_pnl(&instrument_id) {
405 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
406 None => continue,
407 }
408 }
409
410 unrealized_pnls
411 .into_iter()
412 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
413 .collect()
414 }
415
416 #[must_use]
420 pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
421 let instrument_ids = {
422 let cache = self.cache.borrow();
423 let positions = cache.positions(Some(venue), None, None, None);
424
425 if positions.is_empty() {
426 return AHashMap::new(); }
428
429 let instrument_ids: AHashSet<InstrumentId> =
430 positions.iter().map(|p| p.instrument_id).collect();
431
432 instrument_ids
433 };
434
435 let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
436
437 for instrument_id in instrument_ids {
438 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
439 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
441 continue;
442 }
443
444 match self.calculate_realized_pnl(&instrument_id) {
446 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
447 None => continue,
448 }
449 }
450
451 realized_pnls
452 .into_iter()
453 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
454 .collect()
455 }
456
457 #[must_use]
458 pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
459 let cache = self.cache.borrow();
460 let account = if let Some(account) = cache.account_for_venue(venue) {
461 account
462 } else {
463 log::error!("Cannot calculate net exposures: no account registered for {venue}");
464 return None; };
466
467 let positions_open = cache.positions_open(Some(venue), None, None, None);
468 if positions_open.is_empty() {
469 return Some(AHashMap::new()); }
471
472 let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
473
474 for position in positions_open {
475 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
476 instrument
477 } else {
478 log::error!(
479 "Cannot calculate net exposures: no instrument for {}",
480 position.instrument_id
481 );
482 return None; };
484
485 if position.side == PositionSide::Flat {
486 log::error!(
487 "Cannot calculate net exposures: position is flat for {}",
488 position.instrument_id
489 );
490 continue; }
492
493 let price = self.get_price(position)?;
494 let xrate = if let Some(xrate) =
495 self.calculate_xrate_to_base(instrument, account, position.entry)
496 {
497 xrate
498 } else {
499 log::error!(
500 "Cannot calculate net exposures: insufficient data for {}/{:?}",
502 instrument.settlement_currency(),
503 account.base_currency()
504 );
505 return None; };
507
508 let settlement_currency = account
509 .base_currency()
510 .unwrap_or_else(|| instrument.settlement_currency());
511
512 let net_exposure = instrument
513 .calculate_notional_value(position.quantity, price, None)
514 .as_f64()
515 * xrate;
516
517 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
518 .round()
519 / 10f64.powi(settlement_currency.precision.into());
520
521 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
522 }
523
524 Some(
525 net_exposures
526 .into_iter()
527 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
528 .collect(),
529 )
530 }
531
532 #[must_use]
533 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
534 if let Some(pnl) = self
535 .inner
536 .borrow()
537 .unrealized_pnls
538 .get(instrument_id)
539 .copied()
540 {
541 return Some(pnl);
542 }
543
544 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
545 self.inner
546 .borrow_mut()
547 .unrealized_pnls
548 .insert(*instrument_id, pnl);
549 Some(pnl)
550 }
551
552 #[must_use]
553 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
554 if let Some(pnl) = self
555 .inner
556 .borrow()
557 .realized_pnls
558 .get(instrument_id)
559 .copied()
560 {
561 return Some(pnl);
562 }
563
564 let pnl = self.calculate_realized_pnl(instrument_id)?;
565 self.inner
566 .borrow_mut()
567 .realized_pnls
568 .insert(*instrument_id, pnl);
569 Some(pnl)
570 }
571
572 #[must_use]
576 pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
577 let realized = self.realized_pnl(instrument_id)?;
578 let unrealized = self.unrealized_pnl(instrument_id)?;
579
580 if realized.currency != unrealized.currency {
581 log::error!(
582 "Cannot calculate total PnL: currency mismatch {} vs {}",
583 realized.currency,
584 unrealized.currency
585 );
586 return None;
587 }
588
589 Some(Money::new(
590 realized.as_f64() + unrealized.as_f64(),
591 realized.currency,
592 ))
593 }
594
595 #[must_use]
599 pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
600 let realized_pnls = self.realized_pnls(venue);
601 let unrealized_pnls = self.unrealized_pnls(venue);
602
603 let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
604
605 for (currency, realized) in realized_pnls {
607 total_pnls.insert(currency, realized);
608 }
609
610 for (currency, unrealized) in unrealized_pnls {
612 match total_pnls.get_mut(¤cy) {
613 Some(total) => {
614 *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
615 }
616 None => {
617 total_pnls.insert(currency, unrealized);
618 }
619 }
620 }
621
622 total_pnls
623 }
624
625 #[must_use]
626 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
627 let cache = self.cache.borrow();
628 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
629 account
630 } else {
631 log::error!(
632 "Cannot calculate net exposure: no account registered for {}",
633 instrument_id.venue
634 );
635 return None;
636 };
637
638 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
639 instrument
640 } else {
641 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
642 return None;
643 };
644
645 let positions_open = cache.positions_open(
646 None, Some(instrument_id),
648 None,
649 None,
650 );
651
652 if positions_open.is_empty() {
653 return Some(Money::new(0.0, instrument.settlement_currency()));
654 }
655
656 let mut net_exposure = 0.0;
657
658 for position in positions_open {
659 let price = self.get_price(position)?;
660 let xrate = if let Some(xrate) =
661 self.calculate_xrate_to_base(instrument, account, position.entry)
662 {
663 xrate
664 } else {
665 log::error!(
666 "Cannot calculate net exposures: insufficient data for {}/{:?}",
668 instrument.settlement_currency(),
669 account.base_currency()
670 );
671 return None; };
673
674 let notional_value =
675 instrument.calculate_notional_value(position.quantity, price, None);
676 net_exposure += notional_value.as_f64() * xrate;
677 }
678
679 let settlement_currency = account
680 .base_currency()
681 .unwrap_or_else(|| instrument.settlement_currency());
682
683 Some(Money::new(net_exposure, settlement_currency))
684 }
685
686 #[must_use]
687 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
688 self.inner
689 .borrow()
690 .net_positions
691 .get(instrument_id)
692 .copied()
693 .unwrap_or(Decimal::ZERO)
694 }
695
696 #[must_use]
697 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
698 self.inner
699 .borrow()
700 .net_positions
701 .get(instrument_id)
702 .copied()
703 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
704 }
705
706 #[must_use]
707 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
708 self.inner
709 .borrow()
710 .net_positions
711 .get(instrument_id)
712 .copied()
713 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
714 }
715
716 #[must_use]
717 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
718 self.inner
719 .borrow()
720 .net_positions
721 .get(instrument_id)
722 .copied()
723 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
724 }
725
726 #[must_use]
727 pub fn is_completely_flat(&self) -> bool {
728 for net_position in self.inner.borrow().net_positions.values() {
729 if *net_position != Decimal::ZERO {
730 return false;
731 }
732 }
733 true
734 }
735
736 pub fn initialize_orders(&mut self) {
744 let mut initialized = true;
745 let orders_and_instruments = {
746 let cache = self.cache.borrow();
747 let all_orders_open = cache.orders_open(None, None, None, None);
748
749 let mut instruments_with_orders = Vec::new();
750 let mut instruments = AHashSet::new();
751
752 for order in &all_orders_open {
753 instruments.insert(order.instrument_id());
754 }
755
756 for instrument_id in instruments {
757 if let Some(instrument) = cache.instrument(&instrument_id) {
758 let orders = cache
759 .orders_open(None, Some(&instrument_id), None, None)
760 .into_iter()
761 .cloned()
762 .collect::<Vec<OrderAny>>();
763 instruments_with_orders.push((instrument.clone(), orders));
764 } else {
765 log::error!(
766 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
767 );
768 initialized = false;
769 break;
770 }
771 }
772 instruments_with_orders
773 };
774
775 for (instrument, orders_open) in &orders_and_instruments {
776 let mut cache = self.cache.borrow_mut();
777 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
778 account
779 } else {
780 log::error!(
781 "Cannot update initial (order) margin: no account registered for {}",
782 instrument.id().venue
783 );
784 initialized = false;
785 break;
786 };
787
788 let result = self.inner.borrow_mut().accounts.update_orders(
789 account,
790 instrument.clone(),
791 orders_open.iter().collect(),
792 self.clock.borrow().timestamp_ns(),
793 );
794
795 match result {
796 Some((updated_account, _)) => {
797 cache.update_account(updated_account).unwrap();
798 }
799 None => {
800 initialized = false;
801 }
802 }
803 }
804
805 let total_orders = orders_and_instruments
806 .into_iter()
807 .map(|(_, orders)| orders.len())
808 .sum::<usize>();
809
810 log::info!(
811 color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
812 "Initialized {} open order{}",
813 total_orders,
814 if total_orders == 1 { "" } else { "s" }
815 );
816
817 self.inner.borrow_mut().initialized = initialized;
818 }
819
820 pub fn initialize_positions(&mut self) {
826 self.inner.borrow_mut().unrealized_pnls.clear();
827 self.inner.borrow_mut().realized_pnls.clear();
828 let all_positions_open: Vec<Position>;
829 let mut instruments = AHashSet::new();
830 {
831 let cache = self.cache.borrow();
832 all_positions_open = cache
833 .positions_open(None, None, None, None)
834 .into_iter()
835 .cloned()
836 .collect();
837 for position in &all_positions_open {
838 instruments.insert(position.instrument_id);
839 }
840 }
841
842 let mut initialized = true;
843
844 for instrument_id in instruments {
845 let positions_open: Vec<Position> = {
846 let cache = self.cache.borrow();
847 cache
848 .positions_open(None, Some(&instrument_id), None, None)
849 .into_iter()
850 .cloned()
851 .collect()
852 };
853
854 self.update_net_position(&instrument_id, positions_open);
855
856 if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
857 self.inner
858 .borrow_mut()
859 .unrealized_pnls
860 .insert(instrument_id, calculated_unrealized_pnl);
861 } else {
862 log::warn!(
863 "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
864 );
865 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
866 }
867
868 if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
869 self.inner
870 .borrow_mut()
871 .realized_pnls
872 .insert(instrument_id, calculated_realized_pnl);
873 } else {
874 log::warn!(
875 "Failed to calculate realized PnL for {instrument_id}, marking as pending"
876 );
877 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
878 }
879
880 let cache = self.cache.borrow();
881 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
882 account
883 } else {
884 log::error!(
885 "Cannot update maintenance (position) margin: no account registered for {}",
886 instrument_id.venue
887 );
888 initialized = false;
889 break;
890 };
891
892 let account = match account {
893 AccountAny::Cash(_) => continue,
894 AccountAny::Margin(margin_account) => margin_account,
895 };
896
897 let mut cache = self.cache.borrow_mut();
898 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
899 instrument
900 } else {
901 log::error!(
902 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
903 );
904 initialized = false;
905 break;
906 };
907
908 let result = self.inner.borrow_mut().accounts.update_positions(
909 account,
910 instrument.clone(),
911 self.cache
912 .borrow()
913 .positions_open(None, Some(&instrument_id), None, None),
914 self.clock.borrow().timestamp_ns(),
915 );
916
917 match result {
918 Some((updated_account, _)) => {
919 cache
920 .update_account(AccountAny::Margin(updated_account))
921 .unwrap();
922 }
923 None => {
924 initialized = false;
925 }
926 }
927 }
928
929 let open_count = all_positions_open.len();
930 self.inner.borrow_mut().initialized = initialized;
931 log::info!(
932 color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
933 "Initialized {} open position{}",
934 open_count,
935 if open_count == 1 { "" } else { "s" }
936 );
937 }
938
939 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
943 update_quote_tick(
944 self.cache.clone(),
945 self.clock.clone(),
946 self.inner.clone(),
947 self.config.clone(),
948 quote,
949 );
950 }
951
952 pub fn update_bar(&mut self, bar: &Bar) {
956 update_bar(
957 self.cache.clone(),
958 self.clock.clone(),
959 self.inner.clone(),
960 self.config.clone(),
961 bar,
962 );
963 }
964
965 pub fn update_account(&mut self, event: &AccountState) {
967 update_account(self.cache.clone(), self.inner.clone(), event);
968 }
969
970 pub fn update_order(&mut self, event: &OrderEventAny) {
974 update_order(
975 self.cache.clone(),
976 self.clock.clone(),
977 self.inner.clone(),
978 self.config.clone(),
979 event,
980 );
981 }
982
983 pub fn update_position(&mut self, event: &PositionEvent) {
987 update_position(
988 self.cache.clone(),
989 self.clock.clone(),
990 self.inner.clone(),
991 self.config.clone(),
992 event,
993 );
994 }
995
996 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
999 let mut net_position = Decimal::ZERO;
1000
1001 for open_position in positions_open {
1002 log::debug!("open_position: {open_position}");
1003 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
1004 }
1005
1006 let existing_position = self.net_position(instrument_id);
1007 if existing_position != net_position {
1008 self.inner
1009 .borrow_mut()
1010 .net_positions
1011 .insert(*instrument_id, net_position);
1012 log::info!("{instrument_id} net_position={net_position}");
1013 }
1014 }
1015
1016 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1017 let cache = self.cache.borrow();
1018 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1019 account
1020 } else {
1021 log::error!(
1022 "Cannot calculate unrealized PnL: no account registered for {}",
1023 instrument_id.venue
1024 );
1025 return None;
1026 };
1027
1028 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1029 instrument
1030 } else {
1031 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1032 return None;
1033 };
1034
1035 let currency = account
1036 .base_currency()
1037 .unwrap_or_else(|| instrument.settlement_currency());
1038
1039 let positions_open = cache.positions_open(
1040 None, Some(instrument_id),
1042 None,
1043 None,
1044 );
1045
1046 if positions_open.is_empty() {
1047 return Some(Money::new(0.0, currency));
1048 }
1049
1050 let mut total_pnl = 0.0;
1051
1052 for position in positions_open {
1053 if position.instrument_id != *instrument_id {
1054 continue; }
1056
1057 if position.side == PositionSide::Flat {
1058 continue; }
1060
1061 let price = if let Some(price) = self.get_price(position) {
1062 price
1063 } else {
1064 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1065 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1066 return None; };
1068
1069 let mut pnl = position.unrealized_pnl(price).as_f64();
1070
1071 if let Some(base_currency) = account.base_currency() {
1072 let xrate = if let Some(xrate) =
1073 self.calculate_xrate_to_base(instrument, account, position.entry)
1074 {
1075 xrate
1076 } else {
1077 log::error!(
1078 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1080 instrument.settlement_currency(),
1081 base_currency
1082 );
1083 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1084 return None; };
1086
1087 let scale = 10f64.powi(currency.precision.into());
1088 pnl = ((pnl * xrate) * scale).round() / scale;
1089 }
1090
1091 total_pnl += pnl;
1092 }
1093
1094 Some(Money::new(total_pnl, currency))
1095 }
1096
1097 fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1098 let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1104
1105 if snapshot_position_ids.is_empty() {
1106 return; }
1108
1109 let mut rebuild = false;
1110
1111 for position_id in &snapshot_position_ids {
1113 let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1114 let curr_count = position_snapshots.map_or(0, |s| {
1115 s.split(|&b| b == b'{').count() - 1
1117 });
1118 let prev_count = self
1119 .inner
1120 .borrow()
1121 .snapshot_processed_counts
1122 .get(position_id)
1123 .copied()
1124 .unwrap_or(0);
1125
1126 if prev_count > curr_count {
1127 rebuild = true;
1128 break;
1129 }
1130 }
1131
1132 if rebuild {
1133 for position_id in &snapshot_position_ids {
1135 if let Some(position_snapshots) =
1136 self.cache.borrow().position_snapshot_bytes(position_id)
1137 {
1138 let mut sum_pnl: Option<Money> = None;
1139 let mut last_pnl: Option<Money> = None;
1140
1141 let mut start = 0;
1143 let mut depth = 0;
1144 let mut in_string = false;
1145 let mut escape_next = false;
1146
1147 for (i, &byte) in position_snapshots.iter().enumerate() {
1148 if escape_next {
1149 escape_next = false;
1150 continue;
1151 }
1152
1153 if byte == b'\\' && in_string {
1154 escape_next = true;
1155 continue;
1156 }
1157
1158 if byte == b'"' && !escape_next {
1159 in_string = !in_string;
1160 }
1161
1162 if !in_string {
1163 if byte == b'{' {
1164 if depth == 0 {
1165 start = i;
1166 }
1167 depth += 1;
1168 } else if byte == b'}' {
1169 depth -= 1;
1170 if depth == 0
1171 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1172 &position_snapshots[start..=i],
1173 )
1174 && let Some(realized_pnl) = snapshot.realized_pnl
1175 {
1176 if let Some(ref mut sum) = sum_pnl {
1177 if sum.currency == realized_pnl.currency {
1178 *sum = Money::new(
1179 sum.as_f64() + realized_pnl.as_f64(),
1180 sum.currency,
1181 );
1182 }
1183 } else {
1184 sum_pnl = Some(realized_pnl);
1185 }
1186 last_pnl = Some(realized_pnl);
1187 }
1188 }
1189 }
1190 }
1191
1192 let mut inner = self.inner.borrow_mut();
1193 if let Some(sum) = sum_pnl {
1194 inner.snapshot_sum_per_position.insert(*position_id, sum);
1195 if let Some(last) = last_pnl {
1196 inner.snapshot_last_per_position.insert(*position_id, last);
1197 }
1198 } else {
1199 inner.snapshot_sum_per_position.remove(position_id);
1200 inner.snapshot_last_per_position.remove(position_id);
1201 }
1202
1203 let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1204 inner
1205 .snapshot_processed_counts
1206 .insert(*position_id, snapshot_count);
1207 }
1208 }
1209 } else {
1210 for position_id in &snapshot_position_ids {
1212 if let Some(position_snapshots) =
1213 self.cache.borrow().position_snapshot_bytes(position_id)
1214 {
1215 let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1216 let prev_count = self
1217 .inner
1218 .borrow()
1219 .snapshot_processed_counts
1220 .get(position_id)
1221 .copied()
1222 .unwrap_or(0);
1223
1224 if prev_count >= curr_count {
1225 continue;
1226 }
1227
1228 let mut sum_pnl = self
1229 .inner
1230 .borrow()
1231 .snapshot_sum_per_position
1232 .get(position_id)
1233 .copied();
1234 let mut last_pnl = self
1235 .inner
1236 .borrow()
1237 .snapshot_last_per_position
1238 .get(position_id)
1239 .copied();
1240
1241 let mut start = 0;
1243 let mut depth = 0;
1244 let mut in_string = false;
1245 let mut escape_next = false;
1246 let mut snapshot_index = 0;
1247
1248 for (i, &byte) in position_snapshots.iter().enumerate() {
1249 if escape_next {
1250 escape_next = false;
1251 continue;
1252 }
1253
1254 if byte == b'\\' && in_string {
1255 escape_next = true;
1256 continue;
1257 }
1258
1259 if byte == b'"' && !escape_next {
1260 in_string = !in_string;
1261 }
1262
1263 if !in_string {
1264 if byte == b'{' {
1265 if depth == 0 {
1266 start = i;
1267 }
1268 depth += 1;
1269 } else if byte == b'}' {
1270 depth -= 1;
1271 if depth == 0 {
1272 snapshot_index += 1;
1273 if snapshot_index > prev_count
1275 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1276 &position_snapshots[start..=i],
1277 )
1278 && let Some(realized_pnl) = snapshot.realized_pnl
1279 {
1280 if let Some(ref mut sum) = sum_pnl {
1281 if sum.currency == realized_pnl.currency {
1282 *sum = Money::new(
1283 sum.as_f64() + realized_pnl.as_f64(),
1284 sum.currency,
1285 );
1286 }
1287 } else {
1288 sum_pnl = Some(realized_pnl);
1289 }
1290 last_pnl = Some(realized_pnl);
1291 }
1292 }
1293 }
1294 }
1295 }
1296
1297 let mut inner = self.inner.borrow_mut();
1298 if let Some(sum) = sum_pnl {
1299 inner.snapshot_sum_per_position.insert(*position_id, sum);
1300 if let Some(last) = last_pnl {
1301 inner.snapshot_last_per_position.insert(*position_id, last);
1302 }
1303 }
1304 inner
1305 .snapshot_processed_counts
1306 .insert(*position_id, curr_count);
1307 }
1308 }
1309 }
1310 }
1311
1312 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1313 self.ensure_snapshot_pnls_cached_for(instrument_id);
1315
1316 let cache = self.cache.borrow();
1317 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1318 account
1319 } else {
1320 log::error!(
1321 "Cannot calculate realized PnL: no account registered for {}",
1322 instrument_id.venue
1323 );
1324 return None;
1325 };
1326
1327 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1328 instrument
1329 } else {
1330 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1331 return None;
1332 };
1333
1334 let currency = account
1335 .base_currency()
1336 .unwrap_or_else(|| instrument.settlement_currency());
1337
1338 let positions = cache.positions(
1339 None, Some(instrument_id),
1341 None,
1342 None,
1343 );
1344
1345 let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1346
1347 let is_netting = positions
1349 .iter()
1350 .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1351
1352 let mut total_pnl = 0.0;
1353
1354 if is_netting && !snapshot_position_ids.is_empty() {
1355 for position_id in &snapshot_position_ids {
1358 let is_active = positions.iter().any(|p| p.id == *position_id);
1359
1360 if is_active {
1361 let last_pnl = self
1363 .inner
1364 .borrow()
1365 .snapshot_last_per_position
1366 .get(position_id)
1367 .copied();
1368 if let Some(last_pnl) = last_pnl {
1369 let mut pnl = last_pnl.as_f64();
1370
1371 if let Some(base_currency) = account.base_currency()
1372 && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1373 {
1374 let xrate = if let Some(xrate) =
1375 self.calculate_xrate_to_base(instrument, account, position.entry)
1376 {
1377 xrate
1378 } else {
1379 log::error!(
1380 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1381 instrument.settlement_currency(),
1382 base_currency
1383 );
1384 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1385 return Some(Money::new(0.0, currency));
1386 };
1387
1388 let scale = 10f64.powi(currency.precision.into());
1389 pnl = ((pnl * xrate) * scale).round() / scale;
1390 }
1391
1392 total_pnl += pnl;
1393 }
1394 } else {
1395 let sum_pnl = self
1397 .inner
1398 .borrow()
1399 .snapshot_sum_per_position
1400 .get(position_id)
1401 .copied();
1402 if let Some(sum_pnl) = sum_pnl {
1403 let mut pnl = sum_pnl.as_f64();
1404
1405 if let Some(base_currency) = account.base_currency() {
1406 let xrate = cache.get_xrate(
1408 instrument_id.venue,
1409 instrument.settlement_currency(),
1410 base_currency,
1411 PriceType::Mid,
1412 );
1413
1414 if let Some(xrate) = xrate {
1415 let scale = 10f64.powi(currency.precision.into());
1416 pnl = ((pnl * xrate) * scale).round() / scale;
1417 } else {
1418 log::error!(
1419 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1420 instrument.settlement_currency(),
1421 base_currency
1422 );
1423 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1424 return Some(Money::new(0.0, currency));
1425 }
1426 }
1427
1428 total_pnl += pnl;
1429 }
1430 }
1431 }
1432
1433 for position in positions {
1435 if position.instrument_id != *instrument_id {
1436 continue;
1437 }
1438
1439 if let Some(realized_pnl) = position.realized_pnl {
1440 let mut pnl = realized_pnl.as_f64();
1441
1442 if let Some(base_currency) = account.base_currency() {
1443 let xrate = if let Some(xrate) =
1444 self.calculate_xrate_to_base(instrument, account, position.entry)
1445 {
1446 xrate
1447 } else {
1448 log::error!(
1449 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1450 instrument.settlement_currency(),
1451 base_currency
1452 );
1453 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1454 return Some(Money::new(0.0, currency));
1455 };
1456
1457 let scale = 10f64.powi(currency.precision.into());
1458 pnl = ((pnl * xrate) * scale).round() / scale;
1459 }
1460
1461 total_pnl += pnl;
1462 }
1463 }
1464 } else {
1465 for position_id in &snapshot_position_ids {
1468 let sum_pnl = self
1469 .inner
1470 .borrow()
1471 .snapshot_sum_per_position
1472 .get(position_id)
1473 .copied();
1474 if let Some(sum_pnl) = sum_pnl {
1475 let mut pnl = sum_pnl.as_f64();
1476
1477 if let Some(base_currency) = account.base_currency() {
1478 let xrate = cache.get_xrate(
1479 instrument_id.venue,
1480 instrument.settlement_currency(),
1481 base_currency,
1482 PriceType::Mid,
1483 );
1484
1485 if let Some(xrate) = xrate {
1486 let scale = 10f64.powi(currency.precision.into());
1487 pnl = ((pnl * xrate) * scale).round() / scale;
1488 } else {
1489 log::error!(
1490 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1491 instrument.settlement_currency(),
1492 base_currency
1493 );
1494 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1495 return Some(Money::new(0.0, currency));
1496 }
1497 }
1498
1499 total_pnl += pnl;
1500 }
1501 }
1502
1503 for position in positions {
1505 if position.instrument_id != *instrument_id {
1506 continue;
1507 }
1508
1509 if let Some(realized_pnl) = position.realized_pnl {
1510 let mut pnl = realized_pnl.as_f64();
1511
1512 if let Some(base_currency) = account.base_currency() {
1513 let xrate = if let Some(xrate) =
1514 self.calculate_xrate_to_base(instrument, account, position.entry)
1515 {
1516 xrate
1517 } else {
1518 log::error!(
1519 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1520 instrument.settlement_currency(),
1521 base_currency
1522 );
1523 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1524 return Some(Money::new(0.0, currency));
1525 };
1526
1527 let scale = 10f64.powi(currency.precision.into());
1528 pnl = ((pnl * xrate) * scale).round() / scale;
1529 }
1530
1531 total_pnl += pnl;
1532 }
1533 }
1534 }
1535
1536 Some(Money::new(total_pnl, currency))
1537 }
1538
1539 fn get_price(&self, position: &Position) -> Option<Price> {
1540 let cache = self.cache.borrow();
1541 let instrument_id = &position.instrument_id;
1542
1543 if self.config.use_mark_prices
1545 && let Some(mark_price) = cache.mark_price(instrument_id)
1546 {
1547 return Some(mark_price.value);
1548 }
1549
1550 let price_type = match position.side {
1552 PositionSide::Long => PriceType::Bid,
1553 PositionSide::Short => PriceType::Ask,
1554 _ => panic!("invalid `PositionSide`, was {}", position.side),
1555 };
1556
1557 cache
1558 .price(instrument_id, price_type)
1559 .or_else(|| cache.price(instrument_id, PriceType::Last))
1560 .or_else(|| {
1561 self.inner
1562 .borrow()
1563 .bar_close_prices
1564 .get(instrument_id)
1565 .copied()
1566 })
1567 }
1568
1569 fn calculate_xrate_to_base(
1570 &self,
1571 instrument: &InstrumentAny,
1572 account: &AccountAny,
1573 side: OrderSide,
1574 ) -> Option<f64> {
1575 if !self.config.convert_to_account_base_currency {
1576 return Some(1.0); }
1578
1579 match account.base_currency() {
1580 None => Some(1.0), Some(base_currency) => {
1582 let cache = self.cache.borrow();
1583
1584 if self.config.use_mark_xrates {
1585 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1586 }
1587
1588 let price_type = if side == OrderSide::Buy {
1589 PriceType::Bid
1590 } else {
1591 PriceType::Ask
1592 };
1593
1594 cache.get_xrate(
1595 instrument.id().venue,
1596 instrument.settlement_currency(),
1597 base_currency,
1598 price_type,
1599 )
1600 }
1601 }
1602 }
1603}
1604
1605fn update_quote_tick(
1607 cache: Rc<RefCell<Cache>>,
1608 clock: Rc<RefCell<dyn Clock>>,
1609 inner: Rc<RefCell<PortfolioState>>,
1610 config: PortfolioConfig,
1611 quote: &QuoteTick,
1612) {
1613 update_instrument_id(cache, clock.clone(), inner, config, "e.instrument_id);
1614}
1615
1616fn update_bar(
1617 cache: Rc<RefCell<Cache>>,
1618 clock: Rc<RefCell<dyn Clock>>,
1619 inner: Rc<RefCell<PortfolioState>>,
1620 config: PortfolioConfig,
1621 bar: &Bar,
1622) {
1623 let instrument_id = bar.bar_type.instrument_id();
1624 inner
1625 .borrow_mut()
1626 .bar_close_prices
1627 .insert(instrument_id, bar.close);
1628 update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1629}
1630
1631fn update_instrument_id(
1632 cache: Rc<RefCell<Cache>>,
1633 clock: Rc<RefCell<dyn Clock>>,
1634 inner: Rc<RefCell<PortfolioState>>,
1635 config: PortfolioConfig,
1636 instrument_id: &InstrumentId,
1637) {
1638 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1639
1640 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1641 return;
1642 }
1643
1644 let result_init;
1645 let mut result_maint = None;
1646
1647 let account = {
1648 let account = {
1649 let cache_ref = cache.borrow();
1650 if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1651 account.clone()
1652 } else {
1653 log::error!(
1654 "Cannot update tick: no account registered for {}",
1655 instrument_id.venue
1656 );
1657 return;
1658 }
1659 };
1660
1661 let mut cache_ref = cache.borrow_mut();
1662 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1663 instrument.clone()
1664 } else {
1665 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1666 return;
1667 };
1668
1669 let orders_open: Vec<OrderAny> = cache_ref
1671 .orders_open(None, Some(instrument_id), None, None)
1672 .iter()
1673 .map(|o| (*o).clone())
1674 .collect();
1675
1676 let positions_open: Vec<Position> = cache_ref
1677 .positions_open(None, Some(instrument_id), None, None)
1678 .iter()
1679 .map(|p| (*p).clone())
1680 .collect();
1681
1682 result_init = inner.borrow().accounts.update_orders(
1683 &account,
1684 instrument.clone(),
1685 orders_open.iter().collect(),
1686 clock.borrow().timestamp_ns(),
1687 );
1688
1689 if let AccountAny::Margin(ref margin_account) = account {
1690 result_maint = inner.borrow().accounts.update_positions(
1691 margin_account,
1692 instrument,
1693 positions_open.iter().collect(),
1694 clock.borrow().timestamp_ns(),
1695 );
1696 }
1697
1698 if let Some((ref updated_account, _)) = result_init {
1699 cache_ref.update_account(updated_account.clone()).unwrap();
1700 }
1701 account
1702 };
1703
1704 let mut portfolio_clone = Portfolio {
1705 clock: clock.clone(),
1706 cache,
1707 inner: inner.clone(),
1708 config,
1709 };
1710
1711 let result_unrealized_pnl: Option<Money> =
1712 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1713
1714 if result_init.is_some()
1715 && (matches!(account, AccountAny::Cash(_))
1716 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1717 {
1718 inner.borrow_mut().pending_calcs.remove(instrument_id);
1719 if inner.borrow().pending_calcs.is_empty() {
1720 inner.borrow_mut().initialized = true;
1721 }
1722 }
1723}
1724
1725fn update_order(
1726 cache: Rc<RefCell<Cache>>,
1727 clock: Rc<RefCell<dyn Clock>>,
1728 inner: Rc<RefCell<PortfolioState>>,
1729 _config: PortfolioConfig,
1730 event: &OrderEventAny,
1731) {
1732 let cache_ref = cache.borrow();
1733 let account_id = match event.account_id() {
1734 Some(account_id) => account_id,
1735 None => {
1736 return; }
1738 };
1739
1740 let account = if let Some(account) = cache_ref.account(&account_id) {
1741 account
1742 } else {
1743 log::error!("Cannot update order: no account registered for {account_id}");
1744 return;
1745 };
1746
1747 match account {
1748 AccountAny::Cash(cash_account) => {
1749 if !cash_account.base.calculate_account_state {
1750 return;
1751 }
1752 }
1753 AccountAny::Margin(margin_account) => {
1754 if !margin_account.base.calculate_account_state {
1755 return;
1756 }
1757 }
1758 }
1759
1760 match event {
1761 OrderEventAny::Accepted(_)
1762 | OrderEventAny::Canceled(_)
1763 | OrderEventAny::Rejected(_)
1764 | OrderEventAny::Updated(_)
1765 | OrderEventAny::Filled(_) => {}
1766 _ => {
1767 return;
1768 }
1769 }
1770
1771 let cache_ref = cache.borrow();
1772 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1773 order
1774 } else {
1775 log::error!(
1776 "Cannot update order: {} not found in the cache",
1777 event.client_order_id()
1778 );
1779 return; };
1781
1782 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1783 return; }
1785
1786 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1787 instrument_id
1788 } else {
1789 log::error!(
1790 "Cannot update order: no instrument found for {}",
1791 event.instrument_id()
1792 );
1793 return;
1794 };
1795
1796 if let OrderEventAny::Filled(order_filled) = event {
1797 let _ = inner.borrow().accounts.update_balances(
1798 account.clone(),
1799 instrument.clone(),
1800 *order_filled,
1801 );
1802
1803 let mut portfolio_clone = Portfolio {
1804 clock: clock.clone(),
1805 cache: cache.clone(),
1806 inner: inner.clone(),
1807 config: PortfolioConfig::default(), };
1809
1810 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1811 Some(unrealized_pnl) => {
1812 inner
1813 .borrow_mut()
1814 .unrealized_pnls
1815 .insert(event.instrument_id(), unrealized_pnl);
1816 }
1817 None => {
1818 log::error!(
1819 "Failed to calculate unrealized PnL for instrument {}",
1820 event.instrument_id()
1821 );
1822 }
1823 }
1824 }
1825
1826 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1827
1828 let account_state = inner.borrow_mut().accounts.update_orders(
1829 account,
1830 instrument.clone(),
1831 orders_open,
1832 clock.borrow().timestamp_ns(),
1833 );
1834
1835 let mut cache_ref = cache.borrow_mut();
1836 cache_ref.update_account(account.clone()).unwrap();
1837
1838 if let Some(account_state) = account_state {
1839 msgbus::publish(
1840 format!("events.account.{}", account.id()).into(),
1841 &account_state,
1842 );
1843 } else {
1844 log::debug!("Added pending calculation for {}", instrument.id());
1845 inner.borrow_mut().pending_calcs.insert(instrument.id());
1846 }
1847
1848 log::debug!("Updated {event}");
1849}
1850
1851fn update_position(
1852 cache: Rc<RefCell<Cache>>,
1853 clock: Rc<RefCell<dyn Clock>>,
1854 inner: Rc<RefCell<PortfolioState>>,
1855 _config: PortfolioConfig,
1856 event: &PositionEvent,
1857) {
1858 let instrument_id = event.instrument_id();
1859
1860 let positions_open: Vec<Position> = {
1861 let cache_ref = cache.borrow();
1862
1863 cache_ref
1864 .positions_open(None, Some(&instrument_id), None, None)
1865 .iter()
1866 .map(|o| (*o).clone())
1867 .collect()
1868 };
1869
1870 log::debug!("position fresh from cache -> {positions_open:?}");
1871
1872 let mut portfolio_clone = Portfolio {
1873 clock: clock.clone(),
1874 cache: cache.clone(),
1875 inner: inner.clone(),
1876 config: PortfolioConfig::default(), };
1878
1879 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1880
1881 if let Some(calculated_unrealized_pnl) =
1882 portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1883 {
1884 inner
1885 .borrow_mut()
1886 .unrealized_pnls
1887 .insert(event.instrument_id(), calculated_unrealized_pnl);
1888 } else {
1889 log::warn!(
1890 "Failed to calculate unrealized PnL for {}, marking as pending",
1891 event.instrument_id()
1892 );
1893 inner
1894 .borrow_mut()
1895 .pending_calcs
1896 .insert(event.instrument_id());
1897 }
1898
1899 if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1900 inner
1901 .borrow_mut()
1902 .realized_pnls
1903 .insert(event.instrument_id(), calculated_realized_pnl);
1904 } else {
1905 log::warn!(
1906 "Failed to calculate realized PnL for {}, marking as pending",
1907 event.instrument_id()
1908 );
1909 inner
1910 .borrow_mut()
1911 .pending_calcs
1912 .insert(event.instrument_id());
1913 }
1914
1915 let cache_ref = cache.borrow();
1916 let account = cache_ref.account(&event.account_id());
1917
1918 if let Some(AccountAny::Margin(margin_account)) = account {
1919 if !margin_account.calculate_account_state {
1920 return; }
1922
1923 let cache_ref = cache.borrow();
1924 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1925 instrument
1926 } else {
1927 log::error!("Cannot update position: no instrument found for {instrument_id}");
1928 return;
1929 };
1930
1931 let result = inner.borrow_mut().accounts.update_positions(
1932 margin_account,
1933 instrument.clone(),
1934 positions_open.iter().collect(),
1935 clock.borrow().timestamp_ns(),
1936 );
1937 let mut cache_ref = cache.borrow_mut();
1938 if let Some((margin_account, _)) = result {
1939 cache_ref
1940 .update_account(AccountAny::Margin(margin_account))
1941 .unwrap();
1942 }
1943 } else if account.is_none() {
1944 log::error!(
1945 "Cannot update position: no account registered for {}",
1946 event.account_id()
1947 );
1948 }
1949}
1950
1951fn update_account(
1952 cache: Rc<RefCell<Cache>>,
1953 inner: Rc<RefCell<PortfolioState>>,
1954 event: &AccountState,
1955) {
1956 let mut cache_ref = cache.borrow_mut();
1957
1958 if let Some(existing) = cache_ref.account(&event.account_id) {
1959 let mut account = existing.clone();
1960 account.apply(event.clone());
1961
1962 if let Err(e) = cache_ref.update_account(account.clone()) {
1963 log::error!("Failed to update account: {e}");
1964 return;
1965 }
1966 } else {
1967 let account = match AccountAny::from_events(vec![event.clone()]) {
1968 Ok(account) => account,
1969 Err(e) => {
1970 log::error!("Failed to create account: {e}");
1971 return;
1972 }
1973 };
1974
1975 if let Err(e) = cache_ref.add_account(account) {
1976 log::error!("Failed to add account: {e}");
1977 return;
1978 }
1979 }
1980
1981 let mut inner_ref = inner.borrow_mut();
1983 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1984 let current_ts = event.ts_init.as_u64();
1985 let last_ts = inner_ref
1986 .last_account_state_log_ts
1987 .get(&event.account_id)
1988 .copied()
1989 .unwrap_or(0);
1990
1991 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1992 {
1993 inner_ref
1994 .last_account_state_log_ts
1995 .insert(event.account_id, current_ts);
1996 true
1997 } else {
1998 false
1999 }
2000 } else {
2001 true };
2003
2004 if should_log {
2005 log::info!("Updated {event}");
2006 }
2007}