1use std::{
19 cell::RefCell,
20 collections::{HashMap, HashSet},
21 fmt::Debug,
22 rc::Rc,
23};
24
25use nautilus_analysis::analyzer::PortfolioAnalyzer;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 msgbus::{
30 self,
31 handler::{ShareableMessageHandler, TypedMessageHandler},
32 },
33};
34use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
35use nautilus_model::{
36 accounts::AccountAny,
37 data::{Bar, MarkPriceUpdate, QuoteTick},
38 enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
39 events::{AccountState, OrderEventAny, position::PositionEvent},
40 identifiers::{AccountId, InstrumentId, PositionId, Venue},
41 instruments::{Instrument, InstrumentAny},
42 orders::{Order, OrderAny},
43 position::Position,
44 types::{Currency, Money, Price},
45};
46use rust_decimal::{Decimal, prelude::FromPrimitive};
47
48use crate::{config::PortfolioConfig, manager::AccountsManager};
49
50struct PortfolioState {
51 accounts: AccountsManager,
52 analyzer: PortfolioAnalyzer,
53 unrealized_pnls: HashMap<InstrumentId, Money>,
54 realized_pnls: HashMap<InstrumentId, Money>,
55 snapshot_sum_per_position: HashMap<PositionId, Money>,
56 snapshot_last_per_position: HashMap<PositionId, Money>,
57 snapshot_processed_counts: HashMap<PositionId, usize>,
58 net_positions: HashMap<InstrumentId, Decimal>,
59 pending_calcs: HashSet<InstrumentId>,
60 bar_close_prices: HashMap<InstrumentId, Price>,
61 initialized: bool,
62 last_account_state_log_ts: HashMap<AccountId, u64>,
63 min_account_state_logging_interval_ns: u64,
64}
65
66impl PortfolioState {
67 fn new(
68 clock: Rc<RefCell<dyn Clock>>,
69 cache: Rc<RefCell<Cache>>,
70 config: &PortfolioConfig,
71 ) -> Self {
72 let min_account_state_logging_interval_ns = config
73 .min_account_state_logging_interval_ms
74 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
75
76 Self {
77 accounts: AccountsManager::new(clock, cache),
78 analyzer: PortfolioAnalyzer::default(),
79 unrealized_pnls: HashMap::new(),
80 realized_pnls: HashMap::new(),
81 snapshot_sum_per_position: HashMap::new(),
82 snapshot_last_per_position: HashMap::new(),
83 snapshot_processed_counts: HashMap::new(),
84 net_positions: HashMap::new(),
85 pending_calcs: HashSet::new(),
86 bar_close_prices: HashMap::new(),
87 initialized: false,
88 last_account_state_log_ts: HashMap::new(),
89 min_account_state_logging_interval_ns,
90 }
91 }
92
93 fn reset(&mut self) {
94 log::debug!("RESETTING");
95 self.net_positions.clear();
96 self.unrealized_pnls.clear();
97 self.realized_pnls.clear();
98 self.snapshot_sum_per_position.clear();
99 self.snapshot_last_per_position.clear();
100 self.snapshot_processed_counts.clear();
101 self.pending_calcs.clear();
102 self.last_account_state_log_ts.clear();
103 self.analyzer.reset();
104 log::debug!("READY");
105 }
106}
107
108pub struct Portfolio {
109 pub(crate) clock: Rc<RefCell<dyn Clock>>,
110 pub(crate) cache: Rc<RefCell<Cache>>,
111 inner: Rc<RefCell<PortfolioState>>,
112 config: PortfolioConfig,
113}
114
115impl Debug for Portfolio {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct(stringify!(Portfolio)).finish()
118 }
119}
120
121impl Portfolio {
122 pub fn new(
123 cache: Rc<RefCell<Cache>>,
124 clock: Rc<RefCell<dyn Clock>>,
125 config: Option<PortfolioConfig>,
126 ) -> Self {
127 let config = config.unwrap_or_default();
128 let inner = Rc::new(RefCell::new(PortfolioState::new(
129 clock.clone(),
130 cache.clone(),
131 &config,
132 )));
133
134 Self::register_message_handlers(
135 cache.clone(),
136 clock.clone(),
137 inner.clone(),
138 config.clone(),
139 );
140
141 Self {
142 clock,
143 cache,
144 inner,
145 config,
146 }
147 }
148
149 fn register_message_handlers(
150 cache: Rc<RefCell<Cache>>,
151 clock: Rc<RefCell<dyn Clock>>,
152 inner: Rc<RefCell<PortfolioState>>,
153 config: PortfolioConfig,
154 ) {
155 let inner_weak = WeakCell::from(Rc::downgrade(&inner));
156
157 let update_account_handler = {
158 let cache = cache.clone();
159 let inner = inner_weak.clone();
160 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
161 move |event: &AccountState| {
162 if let Some(inner_rc) = inner.upgrade() {
163 update_account(cache.clone(), inner_rc.into(), event);
164 }
165 },
166 )))
167 };
168
169 let update_position_handler = {
170 let cache = cache.clone();
171 let clock = clock.clone();
172 let inner = inner_weak.clone();
173 let config = config.clone();
174 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
175 move |event: &PositionEvent| {
176 if let Some(inner_rc) = inner.upgrade() {
177 update_position(
178 cache.clone(),
179 clock.clone(),
180 inner_rc.into(),
181 config.clone(),
182 event,
183 );
184 }
185 },
186 )))
187 };
188
189 let update_quote_handler = {
190 let cache = cache.clone();
191 let clock = clock.clone();
192 let inner = inner_weak.clone();
193 let config = config.clone();
194 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
195 move |quote: &QuoteTick| {
196 if let Some(inner_rc) = inner.upgrade() {
197 update_quote_tick(
198 cache.clone(),
199 clock.clone(),
200 inner_rc.into(),
201 config.clone(),
202 quote,
203 );
204 }
205 },
206 )))
207 };
208
209 let update_bar_handler = {
210 let cache = cache.clone();
211 let clock = clock.clone();
212 let inner = inner_weak.clone();
213 let config = config.clone();
214 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
215 if let Some(inner_rc) = inner.upgrade() {
216 update_bar(
217 cache.clone(),
218 clock.clone(),
219 inner_rc.into(),
220 config.clone(),
221 bar,
222 );
223 }
224 })))
225 };
226
227 let update_mark_price_handler = {
228 let cache = cache.clone();
229 let clock = clock.clone();
230 let inner = inner_weak.clone();
231 let config = config.clone();
232 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
233 move |mark_price: &MarkPriceUpdate| {
234 if let Some(inner_rc) = inner.upgrade() {
235 update_instrument_id(
236 cache.clone(),
237 clock.clone(),
238 inner_rc.into(),
239 config.clone(),
240 &mark_price.instrument_id,
241 );
242 }
243 },
244 )))
245 };
246
247 let update_order_handler = {
248 let cache = cache;
249 let clock = clock.clone();
250 let inner = inner_weak;
251 let config = config.clone();
252 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
253 move |event: &OrderEventAny| {
254 if let Some(inner_rc) = inner.upgrade() {
255 update_order(
256 cache.clone(),
257 clock.clone(),
258 inner_rc.into(),
259 config.clone(),
260 event,
261 );
262 }
263 },
264 )))
265 };
266
267 msgbus::register(
268 "Portfolio.update_account".into(),
269 update_account_handler.clone(),
270 );
271
272 msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
273 if config.bar_updates {
274 msgbus::subscribe("data.bars.*".into(), update_bar_handler, Some(10));
275 }
276 if config.use_mark_prices {
277 msgbus::subscribe(
278 "data.mark_prices.*".into(),
279 update_mark_price_handler,
280 Some(10),
281 );
282 }
283 msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
284 msgbus::subscribe(
285 "events.position.*".into(),
286 update_position_handler,
287 Some(10),
288 );
289 msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
290 }
291
292 pub fn reset(&mut self) {
293 log::debug!("RESETTING");
294 self.inner.borrow_mut().reset();
295 log::debug!("READY");
296 }
297
298 #[must_use]
302 pub fn is_initialized(&self) -> bool {
303 self.inner.borrow().initialized
304 }
305
306 #[must_use]
310 pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
311 self.cache.borrow().account_for_venue(venue).map_or_else(
312 || {
313 log::error!("Cannot get balances locked: no account generated for {venue}");
314 HashMap::new()
315 },
316 AccountAny::balances_locked,
317 )
318 }
319
320 #[must_use]
324 pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
325 self.cache.borrow().account_for_venue(venue).map_or_else(
326 || {
327 log::error!(
328 "Cannot get initial (order) margins: no account registered for {venue}"
329 );
330 HashMap::new()
331 },
332 |account| match account {
333 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
334 AccountAny::Cash(_) => {
335 log::warn!("Initial margins not applicable for cash account");
336 HashMap::new()
337 }
338 },
339 )
340 }
341
342 #[must_use]
346 pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
347 self.cache.borrow().account_for_venue(venue).map_or_else(
348 || {
349 log::error!(
350 "Cannot get maintenance (position) margins: no account registered for {venue}"
351 );
352 HashMap::new()
353 },
354 |account| match account {
355 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
356 AccountAny::Cash(_) => {
357 log::warn!("Maintenance margins not applicable for cash account");
358 HashMap::new()
359 }
360 },
361 )
362 }
363
364 #[must_use]
368 pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
369 let instrument_ids = {
370 let cache = self.cache.borrow();
371 let positions = cache.positions(Some(venue), None, None, None);
372
373 if positions.is_empty() {
374 return HashMap::new(); }
376
377 let instrument_ids: HashSet<InstrumentId> =
378 positions.iter().map(|p| p.instrument_id).collect();
379
380 instrument_ids
381 };
382
383 let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
384
385 for instrument_id in instrument_ids {
386 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
387 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
389 continue;
390 }
391
392 match self.calculate_unrealized_pnl(&instrument_id) {
394 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
395 None => continue,
396 }
397 }
398
399 unrealized_pnls
400 .into_iter()
401 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
402 .collect()
403 }
404
405 #[must_use]
409 pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
410 let instrument_ids = {
411 let cache = self.cache.borrow();
412 let positions = cache.positions(Some(venue), None, None, None);
413
414 if positions.is_empty() {
415 return HashMap::new(); }
417
418 let instrument_ids: HashSet<InstrumentId> =
419 positions.iter().map(|p| p.instrument_id).collect();
420
421 instrument_ids
422 };
423
424 let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
425
426 for instrument_id in instrument_ids {
427 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
428 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
430 continue;
431 }
432
433 match self.calculate_realized_pnl(&instrument_id) {
435 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
436 None => continue,
437 }
438 }
439
440 realized_pnls
441 .into_iter()
442 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
443 .collect()
444 }
445
446 #[must_use]
447 pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
448 let cache = self.cache.borrow();
449 let account = if let Some(account) = cache.account_for_venue(venue) {
450 account
451 } else {
452 log::error!("Cannot calculate net exposures: no account registered for {venue}");
453 return None; };
455
456 let positions_open = cache.positions_open(Some(venue), None, None, None);
457 if positions_open.is_empty() {
458 return Some(HashMap::new()); }
460
461 let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
462
463 for position in positions_open {
464 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
465 instrument
466 } else {
467 log::error!(
468 "Cannot calculate net exposures: no instrument for {}",
469 position.instrument_id
470 );
471 return None; };
473
474 if position.side == PositionSide::Flat {
475 log::error!(
476 "Cannot calculate net exposures: position is flat for {}",
477 position.instrument_id
478 );
479 continue; }
481
482 let price = self.get_price(position)?;
483 let xrate = if let Some(xrate) =
484 self.calculate_xrate_to_base(instrument, account, position.entry)
485 {
486 xrate
487 } else {
488 log::error!(
489 "Cannot calculate net exposures: insufficient data for {}/{:?}",
491 instrument.settlement_currency(),
492 account.base_currency()
493 );
494 return None; };
496
497 let settlement_currency = account
498 .base_currency()
499 .unwrap_or_else(|| instrument.settlement_currency());
500
501 let net_exposure = instrument
502 .calculate_notional_value(position.quantity, price, None)
503 .as_f64()
504 * xrate;
505
506 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
507 .round()
508 / 10f64.powi(settlement_currency.precision.into());
509
510 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
511 }
512
513 Some(
514 net_exposures
515 .into_iter()
516 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
517 .collect(),
518 )
519 }
520
521 #[must_use]
522 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
523 if let Some(pnl) = self
524 .inner
525 .borrow()
526 .unrealized_pnls
527 .get(instrument_id)
528 .copied()
529 {
530 return Some(pnl);
531 }
532
533 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
534 self.inner
535 .borrow_mut()
536 .unrealized_pnls
537 .insert(*instrument_id, pnl);
538 Some(pnl)
539 }
540
541 #[must_use]
542 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
543 if let Some(pnl) = self
544 .inner
545 .borrow()
546 .realized_pnls
547 .get(instrument_id)
548 .copied()
549 {
550 return Some(pnl);
551 }
552
553 let pnl = self.calculate_realized_pnl(instrument_id)?;
554 self.inner
555 .borrow_mut()
556 .realized_pnls
557 .insert(*instrument_id, pnl);
558 Some(pnl)
559 }
560
561 #[must_use]
565 pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
566 let realized = self.realized_pnl(instrument_id)?;
567 let unrealized = self.unrealized_pnl(instrument_id)?;
568
569 if realized.currency != unrealized.currency {
570 log::error!(
571 "Cannot calculate total PnL: currency mismatch {} vs {}",
572 realized.currency,
573 unrealized.currency
574 );
575 return None;
576 }
577
578 Some(Money::new(
579 realized.as_f64() + unrealized.as_f64(),
580 realized.currency,
581 ))
582 }
583
584 #[must_use]
588 pub fn total_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
589 let realized_pnls = self.realized_pnls(venue);
590 let unrealized_pnls = self.unrealized_pnls(venue);
591
592 let mut total_pnls: HashMap<Currency, Money> = HashMap::new();
593
594 for (currency, realized) in realized_pnls {
596 total_pnls.insert(currency, realized);
597 }
598
599 for (currency, unrealized) in unrealized_pnls {
601 match total_pnls.get_mut(¤cy) {
602 Some(total) => {
603 *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
604 }
605 None => {
606 total_pnls.insert(currency, unrealized);
607 }
608 }
609 }
610
611 total_pnls
612 }
613
614 #[must_use]
615 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
616 let cache = self.cache.borrow();
617 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
618 account
619 } else {
620 log::error!(
621 "Cannot calculate net exposure: no account registered for {}",
622 instrument_id.venue
623 );
624 return None;
625 };
626
627 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
628 instrument
629 } else {
630 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
631 return None;
632 };
633
634 let positions_open = cache.positions_open(
635 None, Some(instrument_id),
637 None,
638 None,
639 );
640
641 if positions_open.is_empty() {
642 return Some(Money::new(0.0, instrument.settlement_currency()));
643 }
644
645 let mut net_exposure = 0.0;
646
647 for position in positions_open {
648 let price = self.get_price(position)?;
649 let xrate = if let Some(xrate) =
650 self.calculate_xrate_to_base(instrument, account, position.entry)
651 {
652 xrate
653 } else {
654 log::error!(
655 "Cannot calculate net exposures: insufficient data for {}/{:?}",
657 instrument.settlement_currency(),
658 account.base_currency()
659 );
660 return None; };
662
663 let notional_value =
664 instrument.calculate_notional_value(position.quantity, price, None);
665 net_exposure += notional_value.as_f64() * xrate;
666 }
667
668 let settlement_currency = account
669 .base_currency()
670 .unwrap_or_else(|| instrument.settlement_currency());
671
672 Some(Money::new(net_exposure, settlement_currency))
673 }
674
675 #[must_use]
676 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
677 self.inner
678 .borrow()
679 .net_positions
680 .get(instrument_id)
681 .copied()
682 .unwrap_or(Decimal::ZERO)
683 }
684
685 #[must_use]
686 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
687 self.inner
688 .borrow()
689 .net_positions
690 .get(instrument_id)
691 .copied()
692 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
693 }
694
695 #[must_use]
696 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
697 self.inner
698 .borrow()
699 .net_positions
700 .get(instrument_id)
701 .copied()
702 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
703 }
704
705 #[must_use]
706 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
707 self.inner
708 .borrow()
709 .net_positions
710 .get(instrument_id)
711 .copied()
712 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
713 }
714
715 #[must_use]
716 pub fn is_completely_flat(&self) -> bool {
717 for net_position in self.inner.borrow().net_positions.values() {
718 if *net_position != Decimal::ZERO {
719 return false;
720 }
721 }
722 true
723 }
724
725 pub fn initialize_orders(&mut self) {
733 let mut initialized = true;
734 let orders_and_instruments = {
735 let cache = self.cache.borrow();
736 let all_orders_open = cache.orders_open(None, None, None, None);
737
738 let mut instruments_with_orders = Vec::new();
739 let mut instruments = HashSet::new();
740
741 for order in &all_orders_open {
742 instruments.insert(order.instrument_id());
743 }
744
745 for instrument_id in instruments {
746 if let Some(instrument) = cache.instrument(&instrument_id) {
747 let orders = cache
748 .orders_open(None, Some(&instrument_id), None, None)
749 .into_iter()
750 .cloned()
751 .collect::<Vec<OrderAny>>();
752 instruments_with_orders.push((instrument.clone(), orders));
753 } else {
754 log::error!(
755 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
756 );
757 initialized = false;
758 break;
759 }
760 }
761 instruments_with_orders
762 };
763
764 for (instrument, orders_open) in &orders_and_instruments {
765 let mut cache = self.cache.borrow_mut();
766 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
767 account
768 } else {
769 log::error!(
770 "Cannot update initial (order) margin: no account registered for {}",
771 instrument.id().venue
772 );
773 initialized = false;
774 break;
775 };
776
777 let result = self.inner.borrow_mut().accounts.update_orders(
778 account,
779 instrument.clone(),
780 orders_open.iter().collect(),
781 self.clock.borrow().timestamp_ns(),
782 );
783
784 match result {
785 Some((updated_account, _)) => {
786 cache.update_account(updated_account).unwrap();
787 }
788 None => {
789 initialized = false;
790 }
791 }
792 }
793
794 let total_orders = orders_and_instruments
795 .into_iter()
796 .map(|(_, orders)| orders.len())
797 .sum::<usize>();
798
799 log::info!(
800 "Initialized {} open order{}",
801 total_orders,
802 if total_orders == 1 { "" } else { "s" }
803 );
804
805 self.inner.borrow_mut().initialized = initialized;
806 }
807
808 pub fn initialize_positions(&mut self) {
814 self.inner.borrow_mut().unrealized_pnls.clear();
815 self.inner.borrow_mut().realized_pnls.clear();
816 let all_positions_open: Vec<Position>;
817 let mut instruments = HashSet::new();
818 {
819 let cache = self.cache.borrow();
820 all_positions_open = cache
821 .positions_open(None, None, None, None)
822 .into_iter()
823 .cloned()
824 .collect();
825 for position in &all_positions_open {
826 instruments.insert(position.instrument_id);
827 }
828 }
829
830 let mut initialized = true;
831
832 for instrument_id in instruments {
833 let positions_open: Vec<Position> = {
834 let cache = self.cache.borrow();
835 cache
836 .positions_open(None, Some(&instrument_id), None, None)
837 .into_iter()
838 .cloned()
839 .collect()
840 };
841
842 self.update_net_position(&instrument_id, positions_open);
843
844 if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
845 self.inner
846 .borrow_mut()
847 .unrealized_pnls
848 .insert(instrument_id, calculated_unrealized_pnl);
849 } else {
850 log::warn!(
851 "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
852 );
853 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
854 }
855
856 if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
857 self.inner
858 .borrow_mut()
859 .realized_pnls
860 .insert(instrument_id, calculated_realized_pnl);
861 } else {
862 log::warn!(
863 "Failed to calculate realized PnL for {instrument_id}, marking as pending"
864 );
865 self.inner.borrow_mut().pending_calcs.insert(instrument_id);
866 }
867
868 let cache = self.cache.borrow();
869 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
870 account
871 } else {
872 log::error!(
873 "Cannot update maintenance (position) margin: no account registered for {}",
874 instrument_id.venue
875 );
876 initialized = false;
877 break;
878 };
879
880 let account = match account {
881 AccountAny::Cash(_) => continue,
882 AccountAny::Margin(margin_account) => margin_account,
883 };
884
885 let mut cache = self.cache.borrow_mut();
886 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
887 instrument
888 } else {
889 log::error!(
890 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
891 );
892 initialized = false;
893 break;
894 };
895
896 let result = self.inner.borrow_mut().accounts.update_positions(
897 account,
898 instrument.clone(),
899 self.cache
900 .borrow()
901 .positions_open(None, Some(&instrument_id), None, None),
902 self.clock.borrow().timestamp_ns(),
903 );
904
905 match result {
906 Some((updated_account, _)) => {
907 cache
908 .update_account(AccountAny::Margin(updated_account))
909 .unwrap();
910 }
911 None => {
912 initialized = false;
913 }
914 }
915 }
916
917 let open_count = all_positions_open.len();
918 self.inner.borrow_mut().initialized = initialized;
919 log::info!(
920 "Initialized {} open position{}",
921 open_count,
922 if open_count == 1 { "" } else { "s" }
923 );
924 }
925
926 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
930 update_quote_tick(
931 self.cache.clone(),
932 self.clock.clone(),
933 self.inner.clone(),
934 self.config.clone(),
935 quote,
936 );
937 }
938
939 pub fn update_bar(&mut self, bar: &Bar) {
943 update_bar(
944 self.cache.clone(),
945 self.clock.clone(),
946 self.inner.clone(),
947 self.config.clone(),
948 bar,
949 );
950 }
951
952 pub fn update_account(&mut self, event: &AccountState) {
954 update_account(self.cache.clone(), self.inner.clone(), event);
955 }
956
957 pub fn update_order(&mut self, event: &OrderEventAny) {
961 update_order(
962 self.cache.clone(),
963 self.clock.clone(),
964 self.inner.clone(),
965 self.config.clone(),
966 event,
967 );
968 }
969
970 pub fn update_position(&mut self, event: &PositionEvent) {
974 update_position(
975 self.cache.clone(),
976 self.clock.clone(),
977 self.inner.clone(),
978 self.config.clone(),
979 event,
980 );
981 }
982
983 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
986 let mut net_position = Decimal::ZERO;
987
988 for open_position in positions_open {
989 log::debug!("open_position: {open_position}");
990 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
991 }
992
993 let existing_position = self.net_position(instrument_id);
994 if existing_position != net_position {
995 self.inner
996 .borrow_mut()
997 .net_positions
998 .insert(*instrument_id, net_position);
999 log::info!("{instrument_id} net_position={net_position}");
1000 }
1001 }
1002
1003 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1004 let cache = self.cache.borrow();
1005 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1006 account
1007 } else {
1008 log::error!(
1009 "Cannot calculate unrealized PnL: no account registered for {}",
1010 instrument_id.venue
1011 );
1012 return None;
1013 };
1014
1015 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1016 instrument
1017 } else {
1018 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1019 return None;
1020 };
1021
1022 let currency = account
1023 .base_currency()
1024 .unwrap_or_else(|| instrument.settlement_currency());
1025
1026 let positions_open = cache.positions_open(
1027 None, Some(instrument_id),
1029 None,
1030 None,
1031 );
1032
1033 if positions_open.is_empty() {
1034 return Some(Money::new(0.0, currency));
1035 }
1036
1037 let mut total_pnl = 0.0;
1038
1039 for position in positions_open {
1040 if position.instrument_id != *instrument_id {
1041 continue; }
1043
1044 if position.side == PositionSide::Flat {
1045 continue; }
1047
1048 let price = if let Some(price) = self.get_price(position) {
1049 price
1050 } else {
1051 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1052 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1053 return None; };
1055
1056 let mut pnl = position.unrealized_pnl(price).as_f64();
1057
1058 if let Some(base_currency) = account.base_currency() {
1059 let xrate = if let Some(xrate) =
1060 self.calculate_xrate_to_base(instrument, account, position.entry)
1061 {
1062 xrate
1063 } else {
1064 log::error!(
1065 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1067 instrument.settlement_currency(),
1068 base_currency
1069 );
1070 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1071 return None; };
1073
1074 let scale = 10f64.powi(currency.precision.into());
1075 pnl = ((pnl * xrate) * scale).round() / scale;
1076 }
1077
1078 total_pnl += pnl;
1079 }
1080
1081 Some(Money::new(total_pnl, currency))
1082 }
1083
1084 fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1085 let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1091
1092 if snapshot_position_ids.is_empty() {
1093 return; }
1095
1096 let mut rebuild = false;
1097
1098 for position_id in &snapshot_position_ids {
1100 let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1101 let curr_count = position_snapshots.map_or(0, |s| {
1102 s.split(|&b| b == b'{').count() - 1
1104 });
1105 let prev_count = self
1106 .inner
1107 .borrow()
1108 .snapshot_processed_counts
1109 .get(position_id)
1110 .copied()
1111 .unwrap_or(0);
1112
1113 if prev_count > curr_count {
1114 rebuild = true;
1115 break;
1116 }
1117 }
1118
1119 if rebuild {
1120 for position_id in &snapshot_position_ids {
1122 if let Some(position_snapshots) =
1123 self.cache.borrow().position_snapshot_bytes(position_id)
1124 {
1125 let mut sum_pnl: Option<Money> = None;
1126 let mut last_pnl: Option<Money> = None;
1127
1128 let mut start = 0;
1130 let mut depth = 0;
1131 let mut in_string = false;
1132 let mut escape_next = false;
1133
1134 for (i, &byte) in position_snapshots.iter().enumerate() {
1135 if escape_next {
1136 escape_next = false;
1137 continue;
1138 }
1139
1140 if byte == b'\\' && in_string {
1141 escape_next = true;
1142 continue;
1143 }
1144
1145 if byte == b'"' && !escape_next {
1146 in_string = !in_string;
1147 }
1148
1149 if !in_string {
1150 if byte == b'{' {
1151 if depth == 0 {
1152 start = i;
1153 }
1154 depth += 1;
1155 } else if byte == b'}' {
1156 depth -= 1;
1157 if depth == 0
1158 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1159 &position_snapshots[start..=i],
1160 )
1161 && let Some(realized_pnl) = snapshot.realized_pnl
1162 {
1163 if let Some(ref mut sum) = sum_pnl {
1164 if sum.currency == realized_pnl.currency {
1165 *sum = Money::new(
1166 sum.as_f64() + realized_pnl.as_f64(),
1167 sum.currency,
1168 );
1169 }
1170 } else {
1171 sum_pnl = Some(realized_pnl);
1172 }
1173 last_pnl = Some(realized_pnl);
1174 }
1175 }
1176 }
1177 }
1178
1179 let mut inner = self.inner.borrow_mut();
1180 if let Some(sum) = sum_pnl {
1181 inner.snapshot_sum_per_position.insert(*position_id, sum);
1182 if let Some(last) = last_pnl {
1183 inner.snapshot_last_per_position.insert(*position_id, last);
1184 }
1185 } else {
1186 inner.snapshot_sum_per_position.remove(position_id);
1187 inner.snapshot_last_per_position.remove(position_id);
1188 }
1189
1190 let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1191 inner
1192 .snapshot_processed_counts
1193 .insert(*position_id, snapshot_count);
1194 }
1195 }
1196 } else {
1197 for position_id in &snapshot_position_ids {
1199 if let Some(position_snapshots) =
1200 self.cache.borrow().position_snapshot_bytes(position_id)
1201 {
1202 let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1203 let prev_count = self
1204 .inner
1205 .borrow()
1206 .snapshot_processed_counts
1207 .get(position_id)
1208 .copied()
1209 .unwrap_or(0);
1210
1211 if prev_count >= curr_count {
1212 continue;
1213 }
1214
1215 let mut sum_pnl = self
1216 .inner
1217 .borrow()
1218 .snapshot_sum_per_position
1219 .get(position_id)
1220 .copied();
1221 let mut last_pnl = self
1222 .inner
1223 .borrow()
1224 .snapshot_last_per_position
1225 .get(position_id)
1226 .copied();
1227
1228 let mut start = 0;
1230 let mut depth = 0;
1231 let mut in_string = false;
1232 let mut escape_next = false;
1233 let mut snapshot_index = 0;
1234
1235 for (i, &byte) in position_snapshots.iter().enumerate() {
1236 if escape_next {
1237 escape_next = false;
1238 continue;
1239 }
1240
1241 if byte == b'\\' && in_string {
1242 escape_next = true;
1243 continue;
1244 }
1245
1246 if byte == b'"' && !escape_next {
1247 in_string = !in_string;
1248 }
1249
1250 if !in_string {
1251 if byte == b'{' {
1252 if depth == 0 {
1253 start = i;
1254 }
1255 depth += 1;
1256 } else if byte == b'}' {
1257 depth -= 1;
1258 if depth == 0 {
1259 snapshot_index += 1;
1260 if snapshot_index > prev_count
1262 && let Ok(snapshot) = serde_json::from_slice::<Position>(
1263 &position_snapshots[start..=i],
1264 )
1265 && let Some(realized_pnl) = snapshot.realized_pnl
1266 {
1267 if let Some(ref mut sum) = sum_pnl {
1268 if sum.currency == realized_pnl.currency {
1269 *sum = Money::new(
1270 sum.as_f64() + realized_pnl.as_f64(),
1271 sum.currency,
1272 );
1273 }
1274 } else {
1275 sum_pnl = Some(realized_pnl);
1276 }
1277 last_pnl = Some(realized_pnl);
1278 }
1279 }
1280 }
1281 }
1282 }
1283
1284 let mut inner = self.inner.borrow_mut();
1285 if let Some(sum) = sum_pnl {
1286 inner.snapshot_sum_per_position.insert(*position_id, sum);
1287 if let Some(last) = last_pnl {
1288 inner.snapshot_last_per_position.insert(*position_id, last);
1289 }
1290 }
1291 inner
1292 .snapshot_processed_counts
1293 .insert(*position_id, curr_count);
1294 }
1295 }
1296 }
1297 }
1298
1299 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1300 self.ensure_snapshot_pnls_cached_for(instrument_id);
1302
1303 let cache = self.cache.borrow();
1304 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1305 account
1306 } else {
1307 log::error!(
1308 "Cannot calculate realized PnL: no account registered for {}",
1309 instrument_id.venue
1310 );
1311 return None;
1312 };
1313
1314 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1315 instrument
1316 } else {
1317 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1318 return None;
1319 };
1320
1321 let currency = account
1322 .base_currency()
1323 .unwrap_or_else(|| instrument.settlement_currency());
1324
1325 let positions = cache.positions(
1326 None, Some(instrument_id),
1328 None,
1329 None,
1330 );
1331
1332 let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1333
1334 let is_netting = positions
1336 .iter()
1337 .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1338
1339 let mut total_pnl = 0.0;
1340
1341 if is_netting && !snapshot_position_ids.is_empty() {
1342 for position_id in &snapshot_position_ids {
1345 let is_active = positions.iter().any(|p| p.id == *position_id);
1346
1347 if is_active {
1348 let last_pnl = self
1350 .inner
1351 .borrow()
1352 .snapshot_last_per_position
1353 .get(position_id)
1354 .copied();
1355 if let Some(last_pnl) = last_pnl {
1356 let mut pnl = last_pnl.as_f64();
1357
1358 if let Some(base_currency) = account.base_currency()
1359 && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1360 {
1361 let xrate = if let Some(xrate) =
1362 self.calculate_xrate_to_base(instrument, account, position.entry)
1363 {
1364 xrate
1365 } else {
1366 log::error!(
1367 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1368 instrument.settlement_currency(),
1369 base_currency
1370 );
1371 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1372 return Some(Money::new(0.0, currency));
1373 };
1374
1375 let scale = 10f64.powi(currency.precision.into());
1376 pnl = ((pnl * xrate) * scale).round() / scale;
1377 }
1378
1379 total_pnl += pnl;
1380 }
1381 } else {
1382 let sum_pnl = self
1384 .inner
1385 .borrow()
1386 .snapshot_sum_per_position
1387 .get(position_id)
1388 .copied();
1389 if let Some(sum_pnl) = sum_pnl {
1390 let mut pnl = sum_pnl.as_f64();
1391
1392 if let Some(base_currency) = account.base_currency() {
1393 let xrate = cache.get_xrate(
1395 instrument_id.venue,
1396 instrument.settlement_currency(),
1397 base_currency,
1398 PriceType::Mid,
1399 );
1400
1401 if let Some(xrate) = xrate {
1402 let scale = 10f64.powi(currency.precision.into());
1403 pnl = ((pnl * xrate) * scale).round() / scale;
1404 } else {
1405 log::error!(
1406 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1407 instrument.settlement_currency(),
1408 base_currency
1409 );
1410 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1411 return Some(Money::new(0.0, currency));
1412 }
1413 }
1414
1415 total_pnl += pnl;
1416 }
1417 }
1418 }
1419
1420 for position in positions {
1422 if position.instrument_id != *instrument_id {
1423 continue;
1424 }
1425
1426 if let Some(realized_pnl) = position.realized_pnl {
1427 let mut pnl = realized_pnl.as_f64();
1428
1429 if let Some(base_currency) = account.base_currency() {
1430 let xrate = if let Some(xrate) =
1431 self.calculate_xrate_to_base(instrument, account, position.entry)
1432 {
1433 xrate
1434 } else {
1435 log::error!(
1436 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1437 instrument.settlement_currency(),
1438 base_currency
1439 );
1440 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1441 return Some(Money::new(0.0, currency));
1442 };
1443
1444 let scale = 10f64.powi(currency.precision.into());
1445 pnl = ((pnl * xrate) * scale).round() / scale;
1446 }
1447
1448 total_pnl += pnl;
1449 }
1450 }
1451 } else {
1452 for position_id in &snapshot_position_ids {
1455 let sum_pnl = self
1456 .inner
1457 .borrow()
1458 .snapshot_sum_per_position
1459 .get(position_id)
1460 .copied();
1461 if let Some(sum_pnl) = sum_pnl {
1462 let mut pnl = sum_pnl.as_f64();
1463
1464 if let Some(base_currency) = account.base_currency() {
1465 let xrate = cache.get_xrate(
1466 instrument_id.venue,
1467 instrument.settlement_currency(),
1468 base_currency,
1469 PriceType::Mid,
1470 );
1471
1472 if let Some(xrate) = xrate {
1473 let scale = 10f64.powi(currency.precision.into());
1474 pnl = ((pnl * xrate) * scale).round() / scale;
1475 } else {
1476 log::error!(
1477 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1478 instrument.settlement_currency(),
1479 base_currency
1480 );
1481 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1482 return Some(Money::new(0.0, currency));
1483 }
1484 }
1485
1486 total_pnl += pnl;
1487 }
1488 }
1489
1490 for position in positions {
1492 if position.instrument_id != *instrument_id {
1493 continue;
1494 }
1495
1496 if let Some(realized_pnl) = position.realized_pnl {
1497 let mut pnl = realized_pnl.as_f64();
1498
1499 if let Some(base_currency) = account.base_currency() {
1500 let xrate = if let Some(xrate) =
1501 self.calculate_xrate_to_base(instrument, account, position.entry)
1502 {
1503 xrate
1504 } else {
1505 log::error!(
1506 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1507 instrument.settlement_currency(),
1508 base_currency
1509 );
1510 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1511 return Some(Money::new(0.0, currency));
1512 };
1513
1514 let scale = 10f64.powi(currency.precision.into());
1515 pnl = ((pnl * xrate) * scale).round() / scale;
1516 }
1517
1518 total_pnl += pnl;
1519 }
1520 }
1521 }
1522
1523 Some(Money::new(total_pnl, currency))
1524 }
1525
1526 fn get_price(&self, position: &Position) -> Option<Price> {
1527 let cache = self.cache.borrow();
1528 let instrument_id = &position.instrument_id;
1529
1530 if self.config.use_mark_prices
1532 && let Some(mark_price) = cache.mark_price(instrument_id)
1533 {
1534 return Some(mark_price.value);
1535 }
1536
1537 let price_type = match position.side {
1539 PositionSide::Long => PriceType::Bid,
1540 PositionSide::Short => PriceType::Ask,
1541 _ => panic!("invalid `PositionSide`, was {}", position.side),
1542 };
1543
1544 cache
1545 .price(instrument_id, price_type)
1546 .or_else(|| cache.price(instrument_id, PriceType::Last))
1547 .or_else(|| {
1548 self.inner
1549 .borrow()
1550 .bar_close_prices
1551 .get(instrument_id)
1552 .copied()
1553 })
1554 }
1555
1556 fn calculate_xrate_to_base(
1557 &self,
1558 instrument: &InstrumentAny,
1559 account: &AccountAny,
1560 side: OrderSide,
1561 ) -> Option<f64> {
1562 if !self.config.convert_to_account_base_currency {
1563 return Some(1.0); }
1565
1566 match account.base_currency() {
1567 None => Some(1.0), Some(base_currency) => {
1569 let cache = self.cache.borrow();
1570
1571 if self.config.use_mark_xrates {
1572 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1573 }
1574
1575 let price_type = if side == OrderSide::Buy {
1576 PriceType::Bid
1577 } else {
1578 PriceType::Ask
1579 };
1580
1581 cache.get_xrate(
1582 instrument.id().venue,
1583 instrument.settlement_currency(),
1584 base_currency,
1585 price_type,
1586 )
1587 }
1588 }
1589 }
1590}
1591
1592fn update_quote_tick(
1594 cache: Rc<RefCell<Cache>>,
1595 clock: Rc<RefCell<dyn Clock>>,
1596 inner: Rc<RefCell<PortfolioState>>,
1597 config: PortfolioConfig,
1598 quote: &QuoteTick,
1599) {
1600 update_instrument_id(cache, clock.clone(), inner, config, "e.instrument_id);
1601}
1602
1603fn update_bar(
1604 cache: Rc<RefCell<Cache>>,
1605 clock: Rc<RefCell<dyn Clock>>,
1606 inner: Rc<RefCell<PortfolioState>>,
1607 config: PortfolioConfig,
1608 bar: &Bar,
1609) {
1610 let instrument_id = bar.bar_type.instrument_id();
1611 inner
1612 .borrow_mut()
1613 .bar_close_prices
1614 .insert(instrument_id, bar.close);
1615 update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1616}
1617
1618fn update_instrument_id(
1619 cache: Rc<RefCell<Cache>>,
1620 clock: Rc<RefCell<dyn Clock>>,
1621 inner: Rc<RefCell<PortfolioState>>,
1622 config: PortfolioConfig,
1623 instrument_id: &InstrumentId,
1624) {
1625 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1626
1627 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1628 return;
1629 }
1630
1631 let result_init;
1632 let mut result_maint = None;
1633
1634 let account = {
1635 let cache_ref = cache.borrow();
1636 let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1637 account
1638 } else {
1639 log::error!(
1640 "Cannot update tick: no account registered for {}",
1641 instrument_id.venue
1642 );
1643 return;
1644 };
1645
1646 let mut cache_ref = cache.borrow_mut();
1647 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1648 instrument.clone()
1649 } else {
1650 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1651 return;
1652 };
1653
1654 let orders_open: Vec<OrderAny> = cache_ref
1656 .orders_open(None, Some(instrument_id), None, None)
1657 .iter()
1658 .map(|o| (*o).clone())
1659 .collect();
1660
1661 let positions_open: Vec<Position> = cache_ref
1662 .positions_open(None, Some(instrument_id), None, None)
1663 .iter()
1664 .map(|p| (*p).clone())
1665 .collect();
1666
1667 result_init = inner.borrow().accounts.update_orders(
1668 account,
1669 instrument.clone(),
1670 orders_open.iter().collect(),
1671 clock.borrow().timestamp_ns(),
1672 );
1673
1674 if let AccountAny::Margin(margin_account) = account {
1675 result_maint = inner.borrow().accounts.update_positions(
1676 margin_account,
1677 instrument,
1678 positions_open.iter().collect(),
1679 clock.borrow().timestamp_ns(),
1680 );
1681 }
1682
1683 if let Some((ref updated_account, _)) = result_init {
1684 cache_ref.update_account(updated_account.clone()).unwrap();
1685 }
1686 account.clone()
1687 };
1688
1689 let mut portfolio_clone = Portfolio {
1690 clock: clock.clone(),
1691 cache,
1692 inner: inner.clone(),
1693 config,
1694 };
1695
1696 let result_unrealized_pnl: Option<Money> =
1697 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1698
1699 if result_init.is_some()
1700 && (matches!(account, AccountAny::Cash(_))
1701 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1702 {
1703 inner.borrow_mut().pending_calcs.remove(instrument_id);
1704 if inner.borrow().pending_calcs.is_empty() {
1705 inner.borrow_mut().initialized = true;
1706 }
1707 }
1708}
1709
1710fn update_order(
1711 cache: Rc<RefCell<Cache>>,
1712 clock: Rc<RefCell<dyn Clock>>,
1713 inner: Rc<RefCell<PortfolioState>>,
1714 _config: PortfolioConfig,
1715 event: &OrderEventAny,
1716) {
1717 let cache_ref = cache.borrow();
1718 let account_id = match event.account_id() {
1719 Some(account_id) => account_id,
1720 None => {
1721 return; }
1723 };
1724
1725 let account = if let Some(account) = cache_ref.account(&account_id) {
1726 account
1727 } else {
1728 log::error!("Cannot update order: no account registered for {account_id}");
1729 return;
1730 };
1731
1732 match account {
1733 AccountAny::Cash(cash_account) => {
1734 if !cash_account.base.calculate_account_state {
1735 return;
1736 }
1737 }
1738 AccountAny::Margin(margin_account) => {
1739 if !margin_account.base.calculate_account_state {
1740 return;
1741 }
1742 }
1743 }
1744
1745 match event {
1746 OrderEventAny::Accepted(_)
1747 | OrderEventAny::Canceled(_)
1748 | OrderEventAny::Rejected(_)
1749 | OrderEventAny::Updated(_)
1750 | OrderEventAny::Filled(_) => {}
1751 _ => {
1752 return;
1753 }
1754 }
1755
1756 let cache_ref = cache.borrow();
1757 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1758 order
1759 } else {
1760 log::error!(
1761 "Cannot update order: {} not found in the cache",
1762 event.client_order_id()
1763 );
1764 return; };
1766
1767 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1768 return; }
1770
1771 let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1772 instrument_id
1773 } else {
1774 log::error!(
1775 "Cannot update order: no instrument found for {}",
1776 event.instrument_id()
1777 );
1778 return;
1779 };
1780
1781 if let OrderEventAny::Filled(order_filled) = event {
1782 let _ = inner.borrow().accounts.update_balances(
1783 account.clone(),
1784 instrument.clone(),
1785 *order_filled,
1786 );
1787
1788 let mut portfolio_clone = Portfolio {
1789 clock: clock.clone(),
1790 cache: cache.clone(),
1791 inner: inner.clone(),
1792 config: PortfolioConfig::default(), };
1794
1795 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1796 Some(unrealized_pnl) => {
1797 inner
1798 .borrow_mut()
1799 .unrealized_pnls
1800 .insert(event.instrument_id(), unrealized_pnl);
1801 }
1802 None => {
1803 log::error!(
1804 "Failed to calculate unrealized PnL for instrument {}",
1805 event.instrument_id()
1806 );
1807 }
1808 }
1809 }
1810
1811 let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1812
1813 let account_state = inner.borrow_mut().accounts.update_orders(
1814 account,
1815 instrument.clone(),
1816 orders_open,
1817 clock.borrow().timestamp_ns(),
1818 );
1819
1820 let mut cache_ref = cache.borrow_mut();
1821 cache_ref.update_account(account.clone()).unwrap();
1822
1823 if let Some(account_state) = account_state {
1824 msgbus::publish(
1825 format!("events.account.{}", account.id()).into(),
1826 &account_state,
1827 );
1828 } else {
1829 log::debug!("Added pending calculation for {}", instrument.id());
1830 inner.borrow_mut().pending_calcs.insert(instrument.id());
1831 }
1832
1833 log::debug!("Updated {event}");
1834}
1835
1836fn update_position(
1837 cache: Rc<RefCell<Cache>>,
1838 clock: Rc<RefCell<dyn Clock>>,
1839 inner: Rc<RefCell<PortfolioState>>,
1840 _config: PortfolioConfig,
1841 event: &PositionEvent,
1842) {
1843 let instrument_id = event.instrument_id();
1844
1845 let positions_open: Vec<Position> = {
1846 let cache_ref = cache.borrow();
1847
1848 cache_ref
1849 .positions_open(None, Some(&instrument_id), None, None)
1850 .iter()
1851 .map(|o| (*o).clone())
1852 .collect()
1853 };
1854
1855 log::debug!("postion fresh from cache -> {positions_open:?}");
1856
1857 let mut portfolio_clone = Portfolio {
1858 clock: clock.clone(),
1859 cache: cache.clone(),
1860 inner: inner.clone(),
1861 config: PortfolioConfig::default(), };
1863
1864 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1865
1866 if let Some(calculated_unrealized_pnl) =
1867 portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1868 {
1869 inner
1870 .borrow_mut()
1871 .unrealized_pnls
1872 .insert(event.instrument_id(), calculated_unrealized_pnl);
1873 } else {
1874 log::warn!(
1875 "Failed to calculate unrealized PnL for {}, marking as pending",
1876 event.instrument_id()
1877 );
1878 inner
1879 .borrow_mut()
1880 .pending_calcs
1881 .insert(event.instrument_id());
1882 }
1883
1884 if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1885 inner
1886 .borrow_mut()
1887 .realized_pnls
1888 .insert(event.instrument_id(), calculated_realized_pnl);
1889 } else {
1890 log::warn!(
1891 "Failed to calculate realized PnL for {}, marking as pending",
1892 event.instrument_id()
1893 );
1894 inner
1895 .borrow_mut()
1896 .pending_calcs
1897 .insert(event.instrument_id());
1898 }
1899
1900 let cache_ref = cache.borrow();
1901 let account = cache_ref.account(&event.account_id());
1902
1903 if let Some(AccountAny::Margin(margin_account)) = account {
1904 if !margin_account.calculate_account_state {
1905 return; }
1907
1908 let cache_ref = cache.borrow();
1909 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1910 instrument
1911 } else {
1912 log::error!("Cannot update position: no instrument found for {instrument_id}");
1913 return;
1914 };
1915
1916 let result = inner.borrow_mut().accounts.update_positions(
1917 margin_account,
1918 instrument.clone(),
1919 positions_open.iter().collect(),
1920 clock.borrow().timestamp_ns(),
1921 );
1922 let mut cache_ref = cache.borrow_mut();
1923 if let Some((margin_account, _)) = result {
1924 cache_ref
1925 .update_account(AccountAny::Margin(margin_account))
1926 .unwrap();
1927 }
1928 } else if account.is_none() {
1929 log::error!(
1930 "Cannot update position: no account registered for {}",
1931 event.account_id()
1932 );
1933 }
1934}
1935
1936fn update_account(
1937 cache: Rc<RefCell<Cache>>,
1938 inner: Rc<RefCell<PortfolioState>>,
1939 event: &AccountState,
1940) {
1941 let mut cache_ref = cache.borrow_mut();
1942
1943 if let Some(existing) = cache_ref.account(&event.account_id) {
1944 let mut account = existing.clone();
1945 account.apply(event.clone());
1946
1947 if let Err(e) = cache_ref.update_account(account.clone()) {
1948 log::error!("Failed to update account: {e}");
1949 return;
1950 }
1951 } else {
1952 let account = match AccountAny::from_events(vec![event.clone()]) {
1953 Ok(account) => account,
1954 Err(e) => {
1955 log::error!("Failed to create account: {e}");
1956 return;
1957 }
1958 };
1959
1960 if let Err(e) = cache_ref.add_account(account) {
1961 log::error!("Failed to add account: {e}");
1962 return;
1963 }
1964 }
1965
1966 let mut inner_ref = inner.borrow_mut();
1968 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1969 let current_ts = event.ts_init.as_u64();
1970 let last_ts = inner_ref
1971 .last_account_state_log_ts
1972 .get(&event.account_id)
1973 .copied()
1974 .unwrap_or(0);
1975
1976 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1977 {
1978 inner_ref
1979 .last_account_state_log_ts
1980 .insert(event.account_id, current_ts);
1981 true
1982 } else {
1983 false
1984 }
1985 } else {
1986 true };
1988
1989 if should_log {
1990 log::info!("Updated {event}");
1991 }
1992}