1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 cache::Cache,
24 clock::Clock,
25 msgbus::{
26 self,
27 handler::{ShareableMessageHandler, TypedMessageHandler},
28 },
29};
30use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
31use nautilus_model::{
32 accounts::AccountAny,
33 data::{Bar, MarkPriceUpdate, QuoteTick},
34 enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
35 events::{AccountState, OrderEventAny, position::PositionEvent},
36 identifiers::{AccountId, InstrumentId, PositionId, Venue},
37 instruments::{Instrument, InstrumentAny},
38 orders::{Order, OrderAny},
39 position::Position,
40 types::{Currency, Money, Price},
41};
42use rust_decimal::{Decimal, prelude::FromPrimitive};
43
44use crate::{config::PortfolioConfig, manager::AccountsManager};
45
46struct PortfolioState {
47 accounts: AccountsManager,
48 analyzer: PortfolioAnalyzer,
49 unrealized_pnls: AHashMap<InstrumentId, Money>,
50 realized_pnls: AHashMap<InstrumentId, Money>,
51 snapshot_sum_per_position: AHashMap<PositionId, Money>,
52 snapshot_last_per_position: AHashMap<PositionId, Money>,
53 snapshot_processed_counts: AHashMap<PositionId, usize>,
54 net_positions: AHashMap<InstrumentId, Decimal>,
55 pending_calcs: AHashSet<InstrumentId>,
56 bar_close_prices: AHashMap<InstrumentId, Price>,
57 initialized: bool,
58 last_account_state_log_ts: AHashMap<AccountId, u64>,
59 min_account_state_logging_interval_ns: u64,
60}
61
62impl PortfolioState {
63 fn new(
64 clock: Rc<RefCell<dyn Clock>>,
65 cache: Rc<RefCell<Cache>>,
66 config: &PortfolioConfig,
67 ) -> Self {
68 let min_account_state_logging_interval_ns = config
69 .min_account_state_logging_interval_ms
70 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
71
72 Self {
73 accounts: AccountsManager::new(clock, cache),
74 analyzer: PortfolioAnalyzer::default(),
75 unrealized_pnls: AHashMap::new(),
76 realized_pnls: AHashMap::new(),
77 snapshot_sum_per_position: AHashMap::new(),
78 snapshot_last_per_position: AHashMap::new(),
79 snapshot_processed_counts: AHashMap::new(),
80 net_positions: AHashMap::new(),
81 pending_calcs: AHashSet::new(),
82 bar_close_prices: AHashMap::new(),
83 initialized: false,
84 last_account_state_log_ts: AHashMap::new(),
85 min_account_state_logging_interval_ns,
86 }
87 }
88
89 fn reset(&mut self) {
90 log::debug!("RESETTING");
91 self.net_positions.clear();
92 self.unrealized_pnls.clear();
93 self.realized_pnls.clear();
94 self.snapshot_sum_per_position.clear();
95 self.snapshot_last_per_position.clear();
96 self.snapshot_processed_counts.clear();
97 self.pending_calcs.clear();
98 self.last_account_state_log_ts.clear();
99 self.analyzer.reset();
100 log::debug!("READY");
101 }
102}
103
104pub struct Portfolio {
105 pub(crate) clock: Rc<RefCell<dyn Clock>>,
106 pub(crate) cache: Rc<RefCell<Cache>>,
107 inner: Rc<RefCell<PortfolioState>>,
108 config: PortfolioConfig,
109}
110
111impl Debug for Portfolio {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct(stringify!(Portfolio)).finish()
114 }
115}
116
117impl Portfolio {
118 pub fn new(
119 cache: Rc<RefCell<Cache>>,
120 clock: Rc<RefCell<dyn Clock>>,
121 config: Option<PortfolioConfig>,
122 ) -> Self {
123 let config = config.unwrap_or_default();
124 let inner = Rc::new(RefCell::new(PortfolioState::new(
125 clock.clone(),
126 cache.clone(),
127 &config,
128 )));
129
130 Self::register_message_handlers(
131 cache.clone(),
132 clock.clone(),
133 inner.clone(),
134 config.clone(),
135 );
136
137 Self {
138 clock,
139 cache,
140 inner,
141 config,
142 }
143 }
144
145 fn register_message_handlers(
146 cache: Rc<RefCell<Cache>>,
147 clock: Rc<RefCell<dyn Clock>>,
148 inner: Rc<RefCell<PortfolioState>>,
149 config: PortfolioConfig,
150 ) {
151 let inner_weak = WeakCell::from(Rc::downgrade(&inner));
152
153 let update_account_handler = {
154 let cache = cache.clone();
155 let inner = inner_weak.clone();
156 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
157 move |event: &AccountState| {
158 if let Some(inner_rc) = inner.upgrade() {
159 update_account(cache.clone(), inner_rc.into(), event);
160 }
161 },
162 )))
163 };
164
165 let update_position_handler = {
166 let cache = cache.clone();
167 let clock = clock.clone();
168 let inner = inner_weak.clone();
169 let config = config.clone();
170 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
171 move |event: &PositionEvent| {
172 if let Some(inner_rc) = inner.upgrade() {
173 update_position(
174 cache.clone(),
175 clock.clone(),
176 inner_rc.into(),
177 config.clone(),
178 event,
179 );
180 }
181 },
182 )))
183 };
184
185 let update_quote_handler = {
186 let cache = cache.clone();
187 let clock = clock.clone();
188 let inner = inner_weak.clone();
189 let config = config.clone();
190 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
191 move |quote: &QuoteTick| {
192 if let Some(inner_rc) = inner.upgrade() {
193 update_quote_tick(
194 cache.clone(),
195 clock.clone(),
196 inner_rc.into(),
197 config.clone(),
198 quote,
199 );
200 }
201 },
202 )))
203 };
204
205 let update_bar_handler = {
206 let cache = cache.clone();
207 let clock = clock.clone();
208 let inner = inner_weak.clone();
209 let config = config.clone();
210 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
211 if let Some(inner_rc) = inner.upgrade() {
212 update_bar(
213 cache.clone(),
214 clock.clone(),
215 inner_rc.into(),
216 config.clone(),
217 bar,
218 );
219 }
220 })))
221 };
222
223 let update_mark_price_handler = {
224 let cache = cache.clone();
225 let clock = clock.clone();
226 let inner = inner_weak.clone();
227 let config = config.clone();
228 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
229 move |mark_price: &MarkPriceUpdate| {
230 if let Some(inner_rc) = inner.upgrade() {
231 update_instrument_id(
232 cache.clone(),
233 clock.clone(),
234 inner_rc.into(),
235 config.clone(),
236 &mark_price.instrument_id,
237 );
238 }
239 },
240 )))
241 };
242
243 let update_order_handler = {
244 let cache = cache;
245 let clock = clock.clone();
246 let inner = inner_weak;
247 let config = config.clone();
248 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
249 move |event: &OrderEventAny| {
250 if let Some(inner_rc) = inner.upgrade() {
251 update_order(
252 cache.clone(),
253 clock.clone(),
254 inner_rc.into(),
255 config.clone(),
256 event,
257 );
258 }
259 },
260 )))
261 };
262
263 msgbus::register(
264 "Portfolio.update_account".into(),
265 update_account_handler.clone(),
266 );
267
268 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
269 if config.bar_updates {
270 msgbus::subscribe("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
271 }
272 if config.use_mark_prices {
273 msgbus::subscribe(
274 "data.mark_prices.*".into(),
275 update_mark_price_handler,
276 Some(10),
277 );
278 }
279 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
280 msgbus::subscribe(
281 "events.position.*".into(),
282 update_position_handler,
283 Some(10),
284 );
285 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
286 }
287
288 pub fn reset(&mut self) {
289 log::debug!("RESETTING");
290 self.inner.borrow_mut().reset();
291 log::debug!("READY");
292 }
293
294 #[must_use]
298 pub fn is_initialized(&self) -> bool {
299 self.inner.borrow().initialized
300 }
301
302 #[must_use]
306 pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
307 self.cache.borrow().account_for_venue(venue).map_or_else(
308 || {
309 log::error!("Cannot get balances locked: no account generated for {venue}");
310 AHashMap::new()
311 },
312 AccountAny::balances_locked,
313 )
314 }
315
316 #[must_use]
320 pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
321 self.cache.borrow().account_for_venue(venue).map_or_else(
322 || {
323 log::error!(
324 "Cannot get initial (order) margins: no account registered for {venue}"
325 );
326 AHashMap::new()
327 },
328 |account| match account {
329 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
330 AccountAny::Cash(_) => {
331 log::warn!("Initial margins not applicable for cash account");
332 AHashMap::new()
333 }
334 },
335 )
336 }
337
338 #[must_use]
342 pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
343 self.cache.borrow().account_for_venue(venue).map_or_else(
344 || {
345 log::error!(
346 "Cannot get maintenance (position) margins: no account registered for {venue}"
347 );
348 AHashMap::new()
349 },
350 |account| match account {
351 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
352 AccountAny::Cash(_) => {
353 log::warn!("Maintenance margins not applicable for cash account");
354 AHashMap::new()
355 }
356 },
357 )
358 }
359
360 #[must_use]
364 pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
365 let instrument_ids = {
366 let cache = self.cache.borrow();
367 let positions = cache.positions(Some(venue), None, None, None);
368
369 if positions.is_empty() {
370 return AHashMap::new(); }
372
373 let instrument_ids: AHashSet<InstrumentId> =
374 positions.iter().map(|p| p.instrument_id).collect();
375
376 instrument_ids
377 };
378
379 let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
380
381 for instrument_id in instrument_ids {
382 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
383 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
385 continue;
386 }
387
388 match self.calculate_unrealized_pnl(&instrument_id) {
390 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
391 None => continue,
392 }
393 }
394
395 unrealized_pnls
396 .into_iter()
397 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
398 .collect()
399 }
400
401 #[must_use]
405 pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
406 let instrument_ids = {
407 let cache = self.cache.borrow();
408 let positions = cache.positions(Some(venue), None, None, None);
409
410 if positions.is_empty() {
411 return AHashMap::new(); }
413
414 let instrument_ids: AHashSet<InstrumentId> =
415 positions.iter().map(|p| p.instrument_id).collect();
416
417 instrument_ids
418 };
419
420 let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
421
422 for instrument_id in instrument_ids {
423 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
424 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
426 continue;
427 }
428
429 match self.calculate_realized_pnl(&instrument_id) {
431 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
432 None => continue,
433 }
434 }
435
436 realized_pnls
437 .into_iter()
438 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
439 .collect()
440 }
441
442 #[must_use]
443 pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
444 let cache = self.cache.borrow();
445 let account = if let Some(account) = cache.account_for_venue(venue) {
446 account
447 } else {
448 log::error!("Cannot calculate net exposures: no account registered for {venue}");
449 return None; };
451
452 let positions_open = cache.positions_open(Some(venue), None, None, None);
453 if positions_open.is_empty() {
454 return Some(AHashMap::new()); }
456
457 let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
458
459 for position in positions_open {
460 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
461 instrument
462 } else {
463 log::error!(
464 "Cannot calculate net exposures: no instrument for {}",
465 position.instrument_id
466 );
467 return None; };
469
470 if position.side == PositionSide::Flat {
471 log::error!(
472 "Cannot calculate net exposures: position is flat for {}",
473 position.instrument_id
474 );
475 continue; }
477
478 let price = self.get_price(position)?;
479 let xrate = if let Some(xrate) =
480 self.calculate_xrate_to_base(instrument, account, position.entry)
481 {
482 xrate
483 } else {
484 log::error!(
485 "Cannot calculate net exposures: insufficient data for {}/{:?}",
487 instrument.settlement_currency(),
488 account.base_currency()
489 );
490 return None; };
492
493 let settlement_currency = account
494 .base_currency()
495 .unwrap_or_else(|| instrument.settlement_currency());
496
497 let net_exposure = instrument
498 .calculate_notional_value(position.quantity, price, None)
499 .as_f64()
500 * xrate;
501
502 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
503 .round()
504 / 10f64.powi(settlement_currency.precision.into());
505
506 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
507 }
508
509 Some(
510 net_exposures
511 .into_iter()
512 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
513 .collect(),
514 )
515 }
516
517 #[must_use]
518 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
519 if let Some(pnl) = self
520 .inner
521 .borrow()
522 .unrealized_pnls
523 .get(instrument_id)
524 .copied()
525 {
526 return Some(pnl);
527 }
528
529 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
530 self.inner
531 .borrow_mut()
532 .unrealized_pnls
533 .insert(*instrument_id, pnl);
534 Some(pnl)
535 }
536
537 #[must_use]
538 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
539 if let Some(pnl) = self
540 .inner
541 .borrow()
542 .realized_pnls
543 .get(instrument_id)
544 .copied()
545 {
546 return Some(pnl);
547 }
548
549 let pnl = self.calculate_realized_pnl(instrument_id)?;
550 self.inner
551 .borrow_mut()
552 .realized_pnls
553 .insert(*instrument_id, pnl);
554 Some(pnl)
555 }
556
557 #[must_use]
561 pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
562 let realized = self.realized_pnl(instrument_id)?;
563 let unrealized = self.unrealized_pnl(instrument_id)?;
564
565 if realized.currency != unrealized.currency {
566 log::error!(
567 "Cannot calculate total PnL: currency mismatch {} vs {}",
568 realized.currency,
569 unrealized.currency
570 );
571 return None;
572 }
573
574 Some(Money::new(
575 realized.as_f64() + unrealized.as_f64(),
576 realized.currency,
577 ))
578 }
579
580 #[must_use]
584 pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
585 let realized_pnls = self.realized_pnls(venue);
586 let unrealized_pnls = self.unrealized_pnls(venue);
587
588 let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
589
590 for (currency, realized) in realized_pnls {
592 total_pnls.insert(currency, realized);
593 }
594
595 for (currency, unrealized) in unrealized_pnls {
597 match total_pnls.get_mut(¤cy) {
598 Some(total) => {
599 *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
600 }
601 None => {
602 total_pnls.insert(currency, unrealized);
603 }
604 }
605 }
606
607 total_pnls
608 }
609
610 #[must_use]
611 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
612 let cache = self.cache.borrow();
613 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
614 account
615 } else {
616 log::error!(
617 "Cannot calculate net exposure: no account registered for {}",
618 instrument_id.venue
619 );
620 return None;
621 };
622
623 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
624 instrument
625 } else {
626 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
627 return None;
628 };
629
630 let positions_open = cache.positions_open(
631 None, Some(instrument_id),
633 None,
634 None,
635 );
636
637 if positions_open.is_empty() {
638 return Some(Money::new(0.0, instrument.settlement_currency()));
639 }
640
641 let mut net_exposure = 0.0;
642
643 for position in positions_open {
644 let price = self.get_price(position)?;
645 let xrate = if let Some(xrate) =
646 self.calculate_xrate_to_base(instrument, account, position.entry)
647 {
648 xrate
649 } else {
650 log::error!(
651 "Cannot calculate net exposures: insufficient data for {}/{:?}",
653 instrument.settlement_currency(),
654 account.base_currency()
655 );
656 return None; };
658
659 let notional_value =
660 instrument.calculate_notional_value(position.quantity, price, None);
661 net_exposure += notional_value.as_f64() * xrate;
662 }
663
664 let settlement_currency = account
665 .base_currency()
666 .unwrap_or_else(|| instrument.settlement_currency());
667
668 Some(Money::new(net_exposure, settlement_currency))
669 }
670
671 #[must_use]
672 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
673 self.inner
674 .borrow()
675 .net_positions
676 .get(instrument_id)
677 .copied()
678 .unwrap_or(Decimal::ZERO)
679 }
680
681 #[must_use]
682 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
683 self.inner
684 .borrow()
685 .net_positions
686 .get(instrument_id)
687 .copied()
688 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
689 }
690
691 #[must_use]
692 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
693 self.inner
694 .borrow()
695 .net_positions
696 .get(instrument_id)
697 .copied()
698 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
699 }
700
701 #[must_use]
702 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
703 self.inner
704 .borrow()
705 .net_positions
706 .get(instrument_id)
707 .copied()
708 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
709 }
710
711 #[must_use]
712 pub fn is_completely_flat(&self) -> bool {
713 for net_position in self.inner.borrow().net_positions.values() {
714 if *net_position != Decimal::ZERO {
715 return false;
716 }
717 }
718 true
719 }
720
721 pub fn initialize_orders(&mut self) {
729 let mut initialized = true;
730 let orders_and_instruments = {
731 let cache = self.cache.borrow();
732 let all_orders_open = cache.orders_open(None, None, None, None);
733
734 let mut instruments_with_orders = Vec::new();
735 let mut instruments = AHashSet::new();
736
737 for order in &all_orders_open {
738 instruments.insert(order.instrument_id());
739 }
740
741 for instrument_id in instruments {
742 if let Some(instrument) = cache.instrument(&instrument_id) {
743 let orders = cache
744 .orders_open(None, Some(&instrument_id), None, None)
745 .into_iter()
746 .cloned()
747 .collect::<Vec<OrderAny>>();
748 instruments_with_orders.push((instrument.clone(), orders));
749 } else {
750 log::error!(
751 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
752 );
753 initialized = false;
754 break;
755 }
756 }
757 instruments_with_orders
758 };
759
760 for (instrument, orders_open) in &orders_and_instruments {
761 let mut cache = self.cache.borrow_mut();
762 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
763 account
764 } else {
765 log::error!(
766 "Cannot update initial (order) margin: no account registered for {}",
767 instrument.id().venue
768 );
769 initialized = false;
770 break;
771 };
772
773 let result = self.inner.borrow_mut().accounts.update_orders(
774 account,
775 instrument.clone(),
776 orders_open.iter().collect(),
777 self.clock.borrow().timestamp_ns(),
778 );
779
780 match result {
781 Some((updated_account, _)) => {
782 cache.update_account(updated_account).unwrap();
783 }
784 None => {
785 initialized = false;
786 }
787 }
788 }
789
790 let total_orders = orders_and_instruments
791 .into_iter()
792 .map(|(_, orders)| orders.len())
793 .sum::<usize>();
794
795 log::info!(
796 "Initialized {} open order{}",
797 total_orders,
798 if total_orders == 1 { "" } else { "s" }
799 );
800
801 self.inner.borrow_mut().initialized = initialized;
802 }
803
804 pub fn initialize_positions(&mut self) {
810 self.inner.borrow_mut().unrealized_pnls.clear();
811 self.inner.borrow_mut().realized_pnls.clear();
812 let all_positions_open: Vec<Position>;
813 let mut instruments = AHashSet::new();
814 {
815 let cache = self.cache.borrow();
816 all_positions_open = cache
817 .positions_open(None, None, None, None)
818 .into_iter()
819 .cloned()
820 .collect();
821 for position in &all_positions_open {
822 instruments.insert(position.instrument_id);
823 }
824 }
825
826 let mut initialized = true;
827
828 for instrument_id in instruments {
829 let positions_open: Vec<Position> = {
830 let cache = self.cache.borrow();
831 cache
832 .positions_open(None, Some(&instrument_id), None, None)
833 .into_iter()
834 .cloned()
835 .collect()
836 };
837
838 self.update_net_position(&instrument_id, positions_open);
839
840 if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
841 self.inner
842 .borrow_mut()
843 .unrealized_pnls
844 .insert(instrument_id, calculated_unrealized_pnl);
845 } else {
846 log::warn!(
847 "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
848 );
849 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
850 }
851
852 if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
853 self.inner
854 .borrow_mut()
855 .realized_pnls
856 .insert(instrument_id, calculated_realized_pnl);
857 } else {
858 log::warn!(
859 "Failed to calculate realized PnL for {instrument_id}, marking as pending"
860 );
861 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
862 }
863
864 let cache = self.cache.borrow();
865 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
866 account
867 } else {
868 log::error!(
869 "Cannot update maintenance (position) margin: no account registered for {}",
870 instrument_id.venue
871 );
872 initialized = false;
873 break;
874 };
875
876 let account = match account {
877 AccountAny::Cash(_) => continue,
878 AccountAny::Margin(margin_account) => margin_account,
879 };
880
881 let mut cache = self.cache.borrow_mut();
882 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
883 instrument
884 } else {
885 log::error!(
886 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
887 );
888 initialized = false;
889 break;
890 };
891
892 let result = self.inner.borrow_mut().accounts.update_positions(
893 account,
894 instrument.clone(),
895 self.cache
896 .borrow()
897 .positions_open(None, Some(&instrument_id), None, None),
898 self.clock.borrow().timestamp_ns(),
899 );
900
901 match result {
902 Some((updated_account, _)) => {
903 cache
904 .update_account(AccountAny::Margin(updated_account))
905 .unwrap();
906 }
907 None => {
908 initialized = false;
909 }
910 }
911 }
912
913 let open_count = all_positions_open.len();
914 self.inner.borrow_mut().initialized = initialized;
915 log::info!(
916 "Initialized {} open position{}",
917 open_count,
918 if open_count == 1 { "" } else { "s" }
919 );
920 }
921
922 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
926 update_quote_tick(
927 self.cache.clone(),
928 self.clock.clone(),
929 self.inner.clone(),
930 self.config.clone(),
931 quote,
932 );
933 }
934
935 pub fn update_bar(&mut self, bar: &Bar) {
939 update_bar(
940 self.cache.clone(),
941 self.clock.clone(),
942 self.inner.clone(),
943 self.config.clone(),
944 bar,
945 );
946 }
947
948 pub fn update_account(&mut self, event: &AccountState) {
950 update_account(self.cache.clone(), self.inner.clone(), event);
951 }
952
953 pub fn update_order(&mut self, event: &OrderEventAny) {
957 update_order(
958 self.cache.clone(),
959 self.clock.clone(),
960 self.inner.clone(),
961 self.config.clone(),
962 event,
963 );
964 }
965
966 pub fn update_position(&mut self, event: &PositionEvent) {
970 update_position(
971 self.cache.clone(),
972 self.clock.clone(),
973 self.inner.clone(),
974 self.config.clone(),
975 event,
976 );
977 }
978
979 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
982 let mut net_position = Decimal::ZERO;
983
984 for open_position in positions_open {
985 log::debug!("open_position: {open_position}");
986 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
987 }
988
989 let existing_position = self.net_position(instrument_id);
990 if existing_position != net_position {
991 self.inner
992 .borrow_mut()
993 .net_positions
994 .insert(*instrument_id, net_position);
995 log::info!("{instrument_id} net_position={net_position}");
996 }
997 }
998
999 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1000 let cache = self.cache.borrow();
1001 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1002 account
1003 } else {
1004 log::error!(
1005 "Cannot calculate unrealized PnL: no account registered for {}",
1006 instrument_id.venue
1007 );
1008 return None;
1009 };
1010
1011 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1012 instrument
1013 } else {
1014 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1015 return None;
1016 };
1017
1018 let currency = account
1019 .base_currency()
1020 .unwrap_or_else(|| instrument.settlement_currency());
1021
1022 let positions_open = cache.positions_open(
1023 None, Some(instrument_id),
1025 None,
1026 None,
1027 );
1028
1029 if positions_open.is_empty() {
1030 return Some(Money::new(0.0, currency));
1031 }
1032
1033 let mut total_pnl = 0.0;
1034
1035 for position in positions_open {
1036 if position.instrument_id != *instrument_id {
1037 continue; }
1039
1040 if position.side == PositionSide::Flat {
1041 continue; }
1043
1044 let price = if let Some(price) = self.get_price(position) {
1045 price
1046 } else {
1047 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1048 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1049 return None; };
1051
1052 let mut pnl = position.unrealized_pnl(price).as_f64();
1053
1054 if let Some(base_currency) = account.base_currency() {
1055 let xrate = if let Some(xrate) =
1056 self.calculate_xrate_to_base(instrument, account, position.entry)
1057 {
1058 xrate
1059 } else {
1060 log::error!(
1061 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1063 instrument.settlement_currency(),
1064 base_currency
1065 );
1066 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1067 return None; };
1069
1070 let scale = 10f64.powi(currency.precision.into());
1071 pnl = ((pnl * xrate) * scale).round() / scale;
1072 }
1073
1074 total_pnl += pnl;
1075 }
1076
1077 Some(Money::new(total_pnl, currency))
1078 }
1079
1080 fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1081 let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1087
1088 if snapshot_position_ids.is_empty() {
1089 return; }
1091
1092 let mut rebuild = false;
1093
1094 for position_id in &snapshot_position_ids {
1096 let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1097 let curr_count = position_snapshots.map_or(0, |s| {
1098 s.split(|&b| b == b'{').count() - 1
1100 });
1101 let prev_count = self
1102 .inner
1103 .borrow()
1104 .snapshot_processed_counts
1105 .get(position_id)
1106 .copied()
1107 .unwrap_or(0);
1108
1109 if prev_count > curr_count {
1110 rebuild = true;
1111 break;
1112 }
1113 }
1114
1115 if rebuild {
1116 for position_id in &snapshot_position_ids {
1118 if let Some(position_snapshots) =
1119 self.cache.borrow().position_snapshot_bytes(position_id)
1120 {
1121 let mut sum_pnl: Option<Money> = None;
1122 let mut last_pnl: Option<Money> = None;
1123
1124 let mut start = 0;
1126 let mut depth = 0;
1127 let mut in_string = false;
1128 let mut escape_next = false;
1129
1130 for (i, &byte) in position_snapshots.iter().enumerate() {
1131 if escape_next {
1132 escape_next = false;
1133 continue;
1134 }
1135
1136 if byte == b'\\' && in_string {
1137 escape_next = true;
1138 continue;
1139 }
1140
1141 if byte == b'"' && !escape_next {
1142 in_string = !in_string;
1143 }
1144
1145 if !in_string {
1146 if byte == b'{' {
1147 if depth == 0 {
1148 start = i;
1149 }
1150 depth += 1;
1151 } else if byte == b'}' {
1152 depth -= 1;
1153 if depth == 0
1154 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1155 &position_snapshots[start..=i],
1156 )
1157 && let Some(realized_pnl) = snapshot.realized_pnl
1158 {
1159 if let Some(ref mut sum) = sum_pnl {
1160 if sum.currency == realized_pnl.currency {
1161 *sum = Money::new(
1162 sum.as_f64() + realized_pnl.as_f64(),
1163 sum.currency,
1164 );
1165 }
1166 } else {
1167 sum_pnl = Some(realized_pnl);
1168 }
1169 last_pnl = Some(realized_pnl);
1170 }
1171 }
1172 }
1173 }
1174
1175 let mut inner = self.inner.borrow_mut();
1176 if let Some(sum) = sum_pnl {
1177 inner.snapshot_sum_per_position.insert(*position_id, sum);
1178 if let Some(last) = last_pnl {
1179 inner.snapshot_last_per_position.insert(*position_id, last);
1180 }
1181 } else {
1182 inner.snapshot_sum_per_position.remove(position_id);
1183 inner.snapshot_last_per_position.remove(position_id);
1184 }
1185
1186 let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1187 inner
1188 .snapshot_processed_counts
1189 .insert(*position_id, snapshot_count);
1190 }
1191 }
1192 } else {
1193 for position_id in &snapshot_position_ids {
1195 if let Some(position_snapshots) =
1196 self.cache.borrow().position_snapshot_bytes(position_id)
1197 {
1198 let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1199 let prev_count = self
1200 .inner
1201 .borrow()
1202 .snapshot_processed_counts
1203 .get(position_id)
1204 .copied()
1205 .unwrap_or(0);
1206
1207 if prev_count >= curr_count {
1208 continue;
1209 }
1210
1211 let mut sum_pnl = self
1212 .inner
1213 .borrow()
1214 .snapshot_sum_per_position
1215 .get(position_id)
1216 .copied();
1217 let mut last_pnl = self
1218 .inner
1219 .borrow()
1220 .snapshot_last_per_position
1221 .get(position_id)
1222 .copied();
1223
1224 let mut start = 0;
1226 let mut depth = 0;
1227 let mut in_string = false;
1228 let mut escape_next = false;
1229 let mut snapshot_index = 0;
1230
1231 for (i, &byte) in position_snapshots.iter().enumerate() {
1232 if escape_next {
1233 escape_next = false;
1234 continue;
1235 }
1236
1237 if byte == b'\\' && in_string {
1238 escape_next = true;
1239 continue;
1240 }
1241
1242 if byte == b'"' && !escape_next {
1243 in_string = !in_string;
1244 }
1245
1246 if !in_string {
1247 if byte == b'{' {
1248 if depth == 0 {
1249 start = i;
1250 }
1251 depth += 1;
1252 } else if byte == b'}' {
1253 depth -= 1;
1254 if depth == 0 {
1255 snapshot_index += 1;
1256 if snapshot_index > prev_count
1258 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1259 &position_snapshots[start..=i],
1260 )
1261 && let Some(realized_pnl) = snapshot.realized_pnl
1262 {
1263 if let Some(ref mut sum) = sum_pnl {
1264 if sum.currency == realized_pnl.currency {
1265 *sum = Money::new(
1266 sum.as_f64() + realized_pnl.as_f64(),
1267 sum.currency,
1268 );
1269 }
1270 } else {
1271 sum_pnl = Some(realized_pnl);
1272 }
1273 last_pnl = Some(realized_pnl);
1274 }
1275 }
1276 }
1277 }
1278 }
1279
1280 let mut inner = self.inner.borrow_mut();
1281 if let Some(sum) = sum_pnl {
1282 inner.snapshot_sum_per_position.insert(*position_id, sum);
1283 if let Some(last) = last_pnl {
1284 inner.snapshot_last_per_position.insert(*position_id, last);
1285 }
1286 }
1287 inner
1288 .snapshot_processed_counts
1289 .insert(*position_id, curr_count);
1290 }
1291 }
1292 }
1293 }
1294
1295 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1296 self.ensure_snapshot_pnls_cached_for(instrument_id);
1298
1299 let cache = self.cache.borrow();
1300 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1301 account
1302 } else {
1303 log::error!(
1304 "Cannot calculate realized PnL: no account registered for {}",
1305 instrument_id.venue
1306 );
1307 return None;
1308 };
1309
1310 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1311 instrument
1312 } else {
1313 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1314 return None;
1315 };
1316
1317 let currency = account
1318 .base_currency()
1319 .unwrap_or_else(|| instrument.settlement_currency());
1320
1321 let positions = cache.positions(
1322 None, Some(instrument_id),
1324 None,
1325 None,
1326 );
1327
1328 let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1329
1330 let is_netting = positions
1332 .iter()
1333 .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1334
1335 let mut total_pnl = 0.0;
1336
1337 if is_netting && !snapshot_position_ids.is_empty() {
1338 for position_id in &snapshot_position_ids {
1341 let is_active = positions.iter().any(|p| p.id == *position_id);
1342
1343 if is_active {
1344 let last_pnl = self
1346 .inner
1347 .borrow()
1348 .snapshot_last_per_position
1349 .get(position_id)
1350 .copied();
1351 if let Some(last_pnl) = last_pnl {
1352 let mut pnl = last_pnl.as_f64();
1353
1354 if let Some(base_currency) = account.base_currency()
1355 && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1356 {
1357 let xrate = if let Some(xrate) =
1358 self.calculate_xrate_to_base(instrument, account, position.entry)
1359 {
1360 xrate
1361 } else {
1362 log::error!(
1363 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1364 instrument.settlement_currency(),
1365 base_currency
1366 );
1367 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1368 return Some(Money::new(0.0, currency));
1369 };
1370
1371 let scale = 10f64.powi(currency.precision.into());
1372 pnl = ((pnl * xrate) * scale).round() / scale;
1373 }
1374
1375 total_pnl += pnl;
1376 }
1377 } else {
1378 let sum_pnl = self
1380 .inner
1381 .borrow()
1382 .snapshot_sum_per_position
1383 .get(position_id)
1384 .copied();
1385 if let Some(sum_pnl) = sum_pnl {
1386 let mut pnl = sum_pnl.as_f64();
1387
1388 if let Some(base_currency) = account.base_currency() {
1389 let xrate = cache.get_xrate(
1391 instrument_id.venue,
1392 instrument.settlement_currency(),
1393 base_currency,
1394 PriceType::Mid,
1395 );
1396
1397 if let Some(xrate) = xrate {
1398 let scale = 10f64.powi(currency.precision.into());
1399 pnl = ((pnl * xrate) * scale).round() / scale;
1400 } else {
1401 log::error!(
1402 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1403 instrument.settlement_currency(),
1404 base_currency
1405 );
1406 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1407 return Some(Money::new(0.0, currency));
1408 }
1409 }
1410
1411 total_pnl += pnl;
1412 }
1413 }
1414 }
1415
1416 for position in positions {
1418 if position.instrument_id != *instrument_id {
1419 continue;
1420 }
1421
1422 if let Some(realized_pnl) = position.realized_pnl {
1423 let mut pnl = realized_pnl.as_f64();
1424
1425 if let Some(base_currency) = account.base_currency() {
1426 let xrate = if let Some(xrate) =
1427 self.calculate_xrate_to_base(instrument, account, position.entry)
1428 {
1429 xrate
1430 } else {
1431 log::error!(
1432 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1433 instrument.settlement_currency(),
1434 base_currency
1435 );
1436 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1437 return Some(Money::new(0.0, currency));
1438 };
1439
1440 let scale = 10f64.powi(currency.precision.into());
1441 pnl = ((pnl * xrate) * scale).round() / scale;
1442 }
1443
1444 total_pnl += pnl;
1445 }
1446 }
1447 } else {
1448 for position_id in &snapshot_position_ids {
1451 let sum_pnl = self
1452 .inner
1453 .borrow()
1454 .snapshot_sum_per_position
1455 .get(position_id)
1456 .copied();
1457 if let Some(sum_pnl) = sum_pnl {
1458 let mut pnl = sum_pnl.as_f64();
1459
1460 if let Some(base_currency) = account.base_currency() {
1461 let xrate = cache.get_xrate(
1462 instrument_id.venue,
1463 instrument.settlement_currency(),
1464 base_currency,
1465 PriceType::Mid,
1466 );
1467
1468 if let Some(xrate) = xrate {
1469 let scale = 10f64.powi(currency.precision.into());
1470 pnl = ((pnl * xrate) * scale).round() / scale;
1471 } else {
1472 log::error!(
1473 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1474 instrument.settlement_currency(),
1475 base_currency
1476 );
1477 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1478 return Some(Money::new(0.0, currency));
1479 }
1480 }
1481
1482 total_pnl += pnl;
1483 }
1484 }
1485
1486 for position in positions {
1488 if position.instrument_id != *instrument_id {
1489 continue;
1490 }
1491
1492 if let Some(realized_pnl) = position.realized_pnl {
1493 let mut pnl = realized_pnl.as_f64();
1494
1495 if let Some(base_currency) = account.base_currency() {
1496 let xrate = if let Some(xrate) =
1497 self.calculate_xrate_to_base(instrument, account, position.entry)
1498 {
1499 xrate
1500 } else {
1501 log::error!(
1502 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1503 instrument.settlement_currency(),
1504 base_currency
1505 );
1506 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1507 return Some(Money::new(0.0, currency));
1508 };
1509
1510 let scale = 10f64.powi(currency.precision.into());
1511 pnl = ((pnl * xrate) * scale).round() / scale;
1512 }
1513
1514 total_pnl += pnl;
1515 }
1516 }
1517 }
1518
1519 Some(Money::new(total_pnl, currency))
1520 }
1521
1522 fn get_price(&self, position: &Position) -> Option<Price> {
1523 let cache = self.cache.borrow();
1524 let instrument_id = &position.instrument_id;
1525
1526 if self.config.use_mark_prices
1528 && let Some(mark_price) = cache.mark_price(instrument_id)
1529 {
1530 return Some(mark_price.value);
1531 }
1532
1533 let price_type = match position.side {
1535 PositionSide::Long => PriceType::Bid,
1536 PositionSide::Short => PriceType::Ask,
1537 _ => panic!("invalid `PositionSide`, was {}", position.side),
1538 };
1539
1540 cache
1541 .price(instrument_id, price_type)
1542 .or_else(|| cache.price(instrument_id, PriceType::Last))
1543 .or_else(|| {
1544 self.inner
1545 .borrow()
1546 .bar_close_prices
1547 .get(instrument_id)
1548 .copied()
1549 })
1550 }
1551
1552 fn calculate_xrate_to_base(
1553 &self,
1554 instrument: &InstrumentAny,
1555 account: &AccountAny,
1556 side: OrderSide,
1557 ) -> Option<f64> {
1558 if !self.config.convert_to_account_base_currency {
1559 return Some(1.0); }
1561
1562 match account.base_currency() {
1563 None => Some(1.0), Some(base_currency) => {
1565 let cache = self.cache.borrow();
1566
1567 if self.config.use_mark_xrates {
1568 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1569 }
1570
1571 let price_type = if side == OrderSide::Buy {
1572 PriceType::Bid
1573 } else {
1574 PriceType::Ask
1575 };
1576
1577 cache.get_xrate(
1578 instrument.id().venue,
1579 instrument.settlement_currency(),
1580 base_currency,
1581 price_type,
1582 )
1583 }
1584 }
1585 }
1586}
1587
1588fn update_quote_tick(
1590 cache: Rc<RefCell<Cache>>,
1591 clock: Rc<RefCell<dyn Clock>>,
1592 inner: Rc<RefCell<PortfolioState>>,
1593 config: PortfolioConfig,
1594 quote: &QuoteTick,
1595) {
1596 update_instrument_id(cache, clock.clone(), inner, config, "e.instrument_id);
1597}
1598
1599fn update_bar(
1600 cache: Rc<RefCell<Cache>>,
1601 clock: Rc<RefCell<dyn Clock>>,
1602 inner: Rc<RefCell<PortfolioState>>,
1603 config: PortfolioConfig,
1604 bar: &Bar,
1605) {
1606 let instrument_id = bar.bar_type.instrument_id();
1607 inner
1608 .borrow_mut()
1609 .bar_close_prices
1610 .insert(instrument_id, bar.close);
1611 update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1612}
1613
1614fn update_instrument_id(
1615 cache: Rc<RefCell<Cache>>,
1616 clock: Rc<RefCell<dyn Clock>>,
1617 inner: Rc<RefCell<PortfolioState>>,
1618 config: PortfolioConfig,
1619 instrument_id: &InstrumentId,
1620) {
1621 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1622
1623 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1624 return;
1625 }
1626
1627 let result_init;
1628 let mut result_maint = None;
1629
1630 let account = {
1631 let cache_ref = cache.borrow();
1632 let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1633 account
1634 } else {
1635 log::error!(
1636 "Cannot update tick: no account registered for {}",
1637 instrument_id.venue
1638 );
1639 return;
1640 };
1641
1642 let mut cache_ref = cache.borrow_mut();
1643 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1644 instrument.clone()
1645 } else {
1646 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1647 return;
1648 };
1649
1650 let orders_open: Vec<OrderAny> = cache_ref
1652 .orders_open(None, Some(instrument_id), None, None)
1653 .iter()
1654 .map(|o| (*o).clone())
1655 .collect();
1656
1657 let positions_open: Vec<Position> = cache_ref
1658 .positions_open(None, Some(instrument_id), None, None)
1659 .iter()
1660 .map(|p| (*p).clone())
1661 .collect();
1662
1663 result_init = inner.borrow().accounts.update_orders(
1664 account,
1665 instrument.clone(),
1666 orders_open.iter().collect(),
1667 clock.borrow().timestamp_ns(),
1668 );
1669
1670 if let AccountAny::Margin(margin_account) = account {
1671 result_maint = inner.borrow().accounts.update_positions(
1672 margin_account,
1673 instrument,
1674 positions_open.iter().collect(),
1675 clock.borrow().timestamp_ns(),
1676 );
1677 }
1678
1679 if let Some((ref updated_account, _)) = result_init {
1680 cache_ref.update_account(updated_account.clone()).unwrap();
1681 }
1682 account.clone()
1683 };
1684
1685 let mut portfolio_clone = Portfolio {
1686 clock: clock.clone(),
1687 cache,
1688 inner: inner.clone(),
1689 config,
1690 };
1691
1692 let result_unrealized_pnl: Option<Money> =
1693 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1694
1695 if result_init.is_some()
1696 && (matches!(account, AccountAny::Cash(_))
1697 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1698 {
1699 inner.borrow_mut().pending_calcs.remove(instrument_id);
1700 if inner.borrow().pending_calcs.is_empty() {
1701 inner.borrow_mut().initialized = true;
1702 }
1703 }
1704}
1705
1706fn update_order(
1707 cache: Rc<RefCell<Cache>>,
1708 clock: Rc<RefCell<dyn Clock>>,
1709 inner: Rc<RefCell<PortfolioState>>,
1710 _config: PortfolioConfig,
1711 event: &OrderEventAny,
1712) {
1713 let cache_ref = cache.borrow();
1714 let account_id = match event.account_id() {
1715 Some(account_id) => account_id,
1716 None => {
1717 return; }
1719 };
1720
1721 let account = if let Some(account) = cache_ref.account(&account_id) {
1722 account
1723 } else {
1724 log::error!("Cannot update order: no account registered for {account_id}");
1725 return;
1726 };
1727
1728 match account {
1729 AccountAny::Cash(cash_account) => {
1730 if !cash_account.base.calculate_account_state {
1731 return;
1732 }
1733 }
1734 AccountAny::Margin(margin_account) => {
1735 if !margin_account.base.calculate_account_state {
1736 return;
1737 }
1738 }
1739 }
1740
1741 match event {
1742 OrderEventAny::Accepted(_)
1743 | OrderEventAny::Canceled(_)
1744 | OrderEventAny::Rejected(_)
1745 | OrderEventAny::Updated(_)
1746 | OrderEventAny::Filled(_) => {}
1747 _ => {
1748 return;
1749 }
1750 }
1751
1752 let cache_ref = cache.borrow();
1753 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1754 order
1755 } else {
1756 log::error!(
1757 "Cannot update order: {} not found in the cache",
1758 event.client_order_id()
1759 );
1760 return; };
1762
1763 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1764 return; }
1766
1767 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1768 instrument_id
1769 } else {
1770 log::error!(
1771 "Cannot update order: no instrument found for {}",
1772 event.instrument_id()
1773 );
1774 return;
1775 };
1776
1777 if let OrderEventAny::Filled(order_filled) = event {
1778 let _ = inner.borrow().accounts.update_balances(
1779 account.clone(),
1780 instrument.clone(),
1781 *order_filled,
1782 );
1783
1784 let mut portfolio_clone = Portfolio {
1785 clock: clock.clone(),
1786 cache: cache.clone(),
1787 inner: inner.clone(),
1788 config: PortfolioConfig::default(), };
1790
1791 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1792 Some(unrealized_pnl) => {
1793 inner
1794 .borrow_mut()
1795 .unrealized_pnls
1796 .insert(event.instrument_id(), unrealized_pnl);
1797 }
1798 None => {
1799 log::error!(
1800 "Failed to calculate unrealized PnL for instrument {}",
1801 event.instrument_id()
1802 );
1803 }
1804 }
1805 }
1806
1807 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1808
1809 let account_state = inner.borrow_mut().accounts.update_orders(
1810 account,
1811 instrument.clone(),
1812 orders_open,
1813 clock.borrow().timestamp_ns(),
1814 );
1815
1816 let mut cache_ref = cache.borrow_mut();
1817 cache_ref.update_account(account.clone()).unwrap();
1818
1819 if let Some(account_state) = account_state {
1820 msgbus::publish(
1821 format!("events.account.{}", account.id()).into(),
1822 &account_state,
1823 );
1824 } else {
1825 log::debug!("Added pending calculation for {}", instrument.id());
1826 inner.borrow_mut().pending_calcs.insert(instrument.id());
1827 }
1828
1829 log::debug!("Updated {event}");
1830}
1831
1832fn update_position(
1833 cache: Rc<RefCell<Cache>>,
1834 clock: Rc<RefCell<dyn Clock>>,
1835 inner: Rc<RefCell<PortfolioState>>,
1836 _config: PortfolioConfig,
1837 event: &PositionEvent,
1838) {
1839 let instrument_id = event.instrument_id();
1840
1841 let positions_open: Vec<Position> = {
1842 let cache_ref = cache.borrow();
1843
1844 cache_ref
1845 .positions_open(None, Some(&instrument_id), None, None)
1846 .iter()
1847 .map(|o| (*o).clone())
1848 .collect()
1849 };
1850
1851 log::debug!("position fresh from cache -> {positions_open:?}");
1852
1853 let mut portfolio_clone = Portfolio {
1854 clock: clock.clone(),
1855 cache: cache.clone(),
1856 inner: inner.clone(),
1857 config: PortfolioConfig::default(), };
1859
1860 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1861
1862 if let Some(calculated_unrealized_pnl) =
1863 portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1864 {
1865 inner
1866 .borrow_mut()
1867 .unrealized_pnls
1868 .insert(event.instrument_id(), calculated_unrealized_pnl);
1869 } else {
1870 log::warn!(
1871 "Failed to calculate unrealized PnL for {}, marking as pending",
1872 event.instrument_id()
1873 );
1874 inner
1875 .borrow_mut()
1876 .pending_calcs
1877 .insert(event.instrument_id());
1878 }
1879
1880 if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1881 inner
1882 .borrow_mut()
1883 .realized_pnls
1884 .insert(event.instrument_id(), calculated_realized_pnl);
1885 } else {
1886 log::warn!(
1887 "Failed to calculate realized PnL for {}, marking as pending",
1888 event.instrument_id()
1889 );
1890 inner
1891 .borrow_mut()
1892 .pending_calcs
1893 .insert(event.instrument_id());
1894 }
1895
1896 let cache_ref = cache.borrow();
1897 let account = cache_ref.account(&event.account_id());
1898
1899 if let Some(AccountAny::Margin(margin_account)) = account {
1900 if !margin_account.calculate_account_state {
1901 return; }
1903
1904 let cache_ref = cache.borrow();
1905 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1906 instrument
1907 } else {
1908 log::error!("Cannot update position: no instrument found for {instrument_id}");
1909 return;
1910 };
1911
1912 let result = inner.borrow_mut().accounts.update_positions(
1913 margin_account,
1914 instrument.clone(),
1915 positions_open.iter().collect(),
1916 clock.borrow().timestamp_ns(),
1917 );
1918 let mut cache_ref = cache.borrow_mut();
1919 if let Some((margin_account, _)) = result {
1920 cache_ref
1921 .update_account(AccountAny::Margin(margin_account))
1922 .unwrap();
1923 }
1924 } else if account.is_none() {
1925 log::error!(
1926 "Cannot update position: no account registered for {}",
1927 event.account_id()
1928 );
1929 }
1930}
1931
1932fn update_account(
1933 cache: Rc<RefCell<Cache>>,
1934 inner: Rc<RefCell<PortfolioState>>,
1935 event: &AccountState,
1936) {
1937 let mut cache_ref = cache.borrow_mut();
1938
1939 if let Some(existing) = cache_ref.account(&event.account_id) {
1940 let mut account = existing.clone();
1941 account.apply(event.clone());
1942
1943 if let Err(e) = cache_ref.update_account(account.clone()) {
1944 log::error!("Failed to update account: {e}");
1945 return;
1946 }
1947 } else {
1948 let account = match AccountAny::from_events(vec![event.clone()]) {
1949 Ok(account) => account,
1950 Err(e) => {
1951 log::error!("Failed to create account: {e}");
1952 return;
1953 }
1954 };
1955
1956 if let Err(e) = cache_ref.add_account(account) {
1957 log::error!("Failed to add account: {e}");
1958 return;
1959 }
1960 }
1961
1962 let mut inner_ref = inner.borrow_mut();
1964 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1965 let current_ts = event.ts_init.as_u64();
1966 let last_ts = inner_ref
1967 .last_account_state_log_ts
1968 .get(&event.account_id)
1969 .copied()
1970 .unwrap_or(0);
1971
1972 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1973 {
1974 inner_ref
1975 .last_account_state_log_ts
1976 .insert(event.account_id, current_ts);
1977 true
1978 } else {
1979 false
1980 }
1981 } else {
1982 true };
1984
1985 if should_log {
1986 log::info!("Updated {event}");
1987 }
1988}