1#![allow(dead_code)] use std::{
21 cell::RefCell,
22 collections::{HashMap, HashSet},
23 rc::Rc,
24};
25
26use nautilus_analysis::analyzer::PortfolioAnalyzer;
27use nautilus_common::{
28 cache::Cache,
29 clock::Clock,
30 msgbus::{
31 self,
32 handler::{ShareableMessageHandler, TypedMessageHandler},
33 },
34};
35use nautilus_model::{
36 accounts::AccountAny,
37 data::{Bar, QuoteTick},
38 enums::{OrderSide, OrderType, PositionSide, PriceType},
39 events::{AccountState, OrderEventAny, position::PositionEvent},
40 identifiers::{InstrumentId, Venue},
41 instruments::{Instrument, InstrumentAny},
42 orders::{Order, OrderAny},
43 position::Position,
44 types::{Currency, Money, Price},
45};
46use rust_decimal::{Decimal, prelude::FromPrimitive};
47use ustr::Ustr;
48
49use crate::{config::PortfolioConfig, manager::AccountsManager};
50
/// Mutable bookkeeping shared between [`Portfolio`] and the message handlers it registers.
struct PortfolioState {
    /// Account manager used to recalculate balances and order/position margins.
    accounts: AccountsManager,
    /// Portfolio analyzer; within this file it is only reset.
    analyzer: PortfolioAnalyzer,
    /// Cached unrealized PnL per instrument (an entry is dropped when a new quote or bar arrives).
    unrealized_pnls: HashMap<InstrumentId, Money>,
    /// Cached realized PnL per instrument.
    realized_pnls: HashMap<InstrumentId, Money>,
    /// Net signed position quantity per instrument.
    net_positions: HashMap<InstrumentId, Decimal>,
    /// Instruments whose calculations could not be completed yet (e.g. missing price or rate).
    pending_calcs: HashSet<InstrumentId>,
    /// Latest bar close per instrument, used as a last-resort price.
    bar_close_prices: HashMap<InstrumentId, Price>,
    /// Whether the portfolio is currently considered initialized.
    initialized: bool,
}
61
62impl PortfolioState {
63 fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
64 Self {
65 accounts: AccountsManager::new(clock, cache),
66 analyzer: PortfolioAnalyzer::default(),
67 unrealized_pnls: HashMap::new(),
68 realized_pnls: HashMap::new(),
69 net_positions: HashMap::new(),
70 pending_calcs: HashSet::new(),
71 bar_close_prices: HashMap::new(),
72 initialized: false,
73 }
74 }
75
76 fn reset(&mut self) {
77 log::debug!("RESETTING");
78 self.net_positions.clear();
79 self.unrealized_pnls.clear();
80 self.realized_pnls.clear();
81 self.pending_calcs.clear();
82 self.analyzer.reset();
83 log::debug!("READY");
84 }
85}
86
/// Portfolio facade over the shared cache, clock and internal [`PortfolioState`].
pub struct Portfolio {
    /// Shared clock, used to timestamp account recalculations.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding accounts, instruments, orders, positions and prices.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// State shared with the message handlers registered in `new`.
    inner: Rc<RefCell<PortfolioState>>,
    /// Portfolio configuration (bar updates, currency conversion options).
    config: PortfolioConfig,
}
93
94impl Portfolio {
95 pub fn new(
96 cache: Rc<RefCell<Cache>>,
97 clock: Rc<RefCell<dyn Clock>>,
98 config: Option<PortfolioConfig>,
99 ) -> Self {
100 let inner = Rc::new(RefCell::new(PortfolioState::new(
101 clock.clone(),
102 cache.clone(),
103 )));
104 let config = config.unwrap_or_default();
105
106 Self::register_message_handlers(
107 cache.clone(),
108 clock.clone(),
109 inner.clone(),
110 config.bar_updates,
111 );
112
113 Self {
114 clock,
115 cache,
116 inner,
117 config,
118 }
119 }
120
121 fn register_message_handlers(
122 cache: Rc<RefCell<Cache>>,
123 clock: Rc<RefCell<dyn Clock>>,
124 inner: Rc<RefCell<PortfolioState>>,
125 bar_updates: bool,
126 ) {
127 let update_account_handler = {
128 let cache = cache.clone();
129 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
130 move |event: &AccountState| {
131 update_account(cache.clone(), event);
132 },
133 )))
134 };
135
136 let update_position_handler = {
137 let cache = cache.clone();
138 let clock = clock.clone();
139 let inner = inner.clone();
140 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
141 move |event: &PositionEvent| {
142 update_position(cache.clone(), clock.clone(), inner.clone(), event);
143 },
144 )))
145 };
146
147 let update_quote_handler = {
148 let cache = cache.clone();
149 let clock = clock.clone();
150 let inner = inner.clone();
151 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
152 move |quote: &QuoteTick| {
153 update_quote_tick(cache.clone(), clock.clone(), inner.clone(), quote);
154 },
155 )))
156 };
157
158 let update_bar_handler = {
159 let cache = cache.clone();
160 let clock = clock.clone();
161 let inner = inner.clone();
162 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
163 update_bar(cache.clone(), clock.clone(), inner.clone(), bar);
164 })))
165 };
166
167 let update_order_handler = {
168 let cache = cache;
169 let clock = clock.clone();
170 let inner = inner;
171 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
172 move |event: &OrderEventAny| {
173 update_order(cache.clone(), clock.clone(), inner.clone(), event);
174 },
175 )))
176 };
177
178 msgbus::register("Portfolio.update_account", update_account_handler.clone());
179
180 msgbus::subscribe("data.quotes.*", update_quote_handler, Some(10));
181 if bar_updates {
182 msgbus::subscribe("data.quotes.*EXTERNAL", update_bar_handler, Some(10));
183 }
184 msgbus::subscribe("events.order.*", update_order_handler, Some(10));
185 msgbus::subscribe("events.position.*", update_position_handler, Some(10));
186 msgbus::subscribe("events.account.*", update_account_handler, Some(10));
187 }
188
189 pub fn reset(&mut self) {
190 log::debug!("RESETTING");
191 self.inner.borrow_mut().reset();
192 log::debug!("READY");
193 }
194
195 #[must_use]
198 pub fn is_initialized(&self) -> bool {
199 self.inner.borrow().initialized
200 }
201
202 #[must_use]
203 pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
204 self.cache.borrow().account_for_venue(venue).map_or_else(
205 || {
206 log::error!("Cannot get balances locked: no account generated for {venue}");
207 HashMap::new()
208 },
209 AccountAny::balances_locked,
210 )
211 }
212
213 #[must_use]
214 pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
215 self.cache.borrow().account_for_venue(venue).map_or_else(
216 || {
217 log::error!(
218 "Cannot get initial (order) margins: no account registered for {venue}"
219 );
220 HashMap::new()
221 },
222 |account| match account {
223 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
224 AccountAny::Cash(_) => {
225 log::warn!("Initial margins not applicable for cash account");
226 HashMap::new()
227 }
228 },
229 )
230 }
231
232 #[must_use]
233 pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
234 self.cache.borrow().account_for_venue(venue).map_or_else(
235 || {
236 log::error!(
237 "Cannot get maintenance (position) margins: no account registered for {venue}"
238 );
239 HashMap::new()
240 },
241 |account| match account {
242 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
243 AccountAny::Cash(_) => {
244 log::warn!("Maintenance margins not applicable for cash account");
245 HashMap::new()
246 }
247 },
248 )
249 }
250
251 #[must_use]
252 pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
253 let instrument_ids = {
254 let cache = self.cache.borrow();
255 let positions = cache.positions(Some(venue), None, None, None);
256
257 if positions.is_empty() {
258 return HashMap::new(); }
260
261 let instrument_ids: HashSet<InstrumentId> =
262 positions.iter().map(|p| p.instrument_id).collect();
263
264 instrument_ids
265 };
266
267 let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
268
269 for instrument_id in instrument_ids {
270 if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
271 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
273 continue;
274 }
275
276 match self.calculate_unrealized_pnl(&instrument_id) {
278 Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
279 None => continue,
280 }
281 }
282
283 unrealized_pnls
284 .into_iter()
285 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
286 .collect()
287 }
288
289 #[must_use]
290 pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
291 let instrument_ids = {
292 let cache = self.cache.borrow();
293 let positions = cache.positions(Some(venue), None, None, None);
294
295 if positions.is_empty() {
296 return HashMap::new(); }
298
299 let instrument_ids: HashSet<InstrumentId> =
300 positions.iter().map(|p| p.instrument_id).collect();
301
302 instrument_ids
303 };
304
305 let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
306
307 for instrument_id in instrument_ids {
308 if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
309 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
311 continue;
312 }
313
314 match self.calculate_realized_pnl(&instrument_id) {
316 Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
317 None => continue,
318 }
319 }
320
321 realized_pnls
322 .into_iter()
323 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
324 .collect()
325 }
326
327 #[must_use]
328 pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
329 let cache = self.cache.borrow();
330 let account = if let Some(account) = cache.account_for_venue(venue) {
331 account
332 } else {
333 log::error!("Cannot calculate net exposures: no account registered for {venue}");
334 return None; };
336
337 let positions_open = cache.positions_open(Some(venue), None, None, None);
338 if positions_open.is_empty() {
339 return Some(HashMap::new()); }
341
342 let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
343
344 for position in positions_open {
345 let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
346 instrument
347 } else {
348 log::error!(
349 "Cannot calculate net exposures: no instrument for {}",
350 position.instrument_id
351 );
352 return None; };
354
355 if position.side == PositionSide::Flat {
356 log::error!(
357 "Cannot calculate net exposures: position is flat for {}",
358 position.instrument_id
359 );
360 continue; }
362
363 let price = self.get_price(position)?;
364 let xrate = if let Some(xrate) =
365 self.calculate_xrate_to_base(instrument, account, position.entry)
366 {
367 xrate
368 } else {
369 log::error!(
370 "Cannot calculate net exposures: insufficient data for {}/{:?}",
372 instrument.settlement_currency(),
373 account.base_currency()
374 );
375 return None; };
377
378 let settlement_currency = account
379 .base_currency()
380 .unwrap_or_else(|| instrument.settlement_currency());
381
382 let net_exposure = instrument
383 .calculate_notional_value(position.quantity, price, None)
384 .as_f64()
385 * xrate;
386
387 let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
388 .round()
389 / 10f64.powi(settlement_currency.precision.into());
390
391 *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
392 }
393
394 Some(
395 net_exposures
396 .into_iter()
397 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
398 .collect(),
399 )
400 }
401
402 #[must_use]
403 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
404 if let Some(pnl) = self
405 .inner
406 .borrow()
407 .unrealized_pnls
408 .get(instrument_id)
409 .copied()
410 {
411 return Some(pnl);
412 }
413
414 let pnl = self.calculate_unrealized_pnl(instrument_id)?;
415 self.inner
416 .borrow_mut()
417 .unrealized_pnls
418 .insert(*instrument_id, pnl);
419 Some(pnl)
420 }
421
422 #[must_use]
423 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
424 if let Some(pnl) = self
425 .inner
426 .borrow()
427 .realized_pnls
428 .get(instrument_id)
429 .copied()
430 {
431 return Some(pnl);
432 }
433
434 let pnl = self.calculate_realized_pnl(instrument_id)?;
435 self.inner
436 .borrow_mut()
437 .realized_pnls
438 .insert(*instrument_id, pnl);
439 Some(pnl)
440 }
441
442 #[must_use]
443 pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
444 let cache = self.cache.borrow();
445 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
446 account
447 } else {
448 log::error!(
449 "Cannot calculate net exposure: no account registered for {}",
450 instrument_id.venue
451 );
452 return None;
453 };
454
455 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
456 instrument
457 } else {
458 log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
459 return None;
460 };
461
462 let positions_open = cache.positions_open(
463 None, Some(instrument_id),
465 None,
466 None,
467 );
468
469 if positions_open.is_empty() {
470 return Some(Money::new(0.0, instrument.settlement_currency()));
471 }
472
473 let mut net_exposure = 0.0;
474
475 for position in positions_open {
476 let price = self.get_price(position)?;
477 let xrate = if let Some(xrate) =
478 self.calculate_xrate_to_base(instrument, account, position.entry)
479 {
480 xrate
481 } else {
482 log::error!(
483 "Cannot calculate net exposures: insufficient data for {}/{:?}",
485 instrument.settlement_currency(),
486 account.base_currency()
487 );
488 return None; };
490
491 let notional_value =
492 instrument.calculate_notional_value(position.quantity, price, None);
493 net_exposure += notional_value.as_f64() * xrate;
494 }
495
496 let settlement_currency = account
497 .base_currency()
498 .unwrap_or_else(|| instrument.settlement_currency());
499
500 Some(Money::new(net_exposure, settlement_currency))
501 }
502
503 #[must_use]
504 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
505 self.inner
506 .borrow()
507 .net_positions
508 .get(instrument_id)
509 .copied()
510 .unwrap_or(Decimal::ZERO)
511 }
512
513 #[must_use]
514 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
515 self.inner
516 .borrow()
517 .net_positions
518 .get(instrument_id)
519 .copied()
520 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
521 }
522
523 #[must_use]
524 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
525 self.inner
526 .borrow()
527 .net_positions
528 .get(instrument_id)
529 .copied()
530 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
531 }
532
533 #[must_use]
534 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
535 self.inner
536 .borrow()
537 .net_positions
538 .get(instrument_id)
539 .copied()
540 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
541 }
542
543 #[must_use]
544 pub fn is_completely_flat(&self) -> bool {
545 for net_position in self.inner.borrow().net_positions.values() {
546 if *net_position != Decimal::ZERO {
547 return false;
548 }
549 }
550 true
551 }
552
553 pub fn initialize_orders(&mut self) {
556 let mut initialized = true;
557 let orders_and_instruments = {
558 let cache = self.cache.borrow();
559 let all_orders_open = cache.orders_open(None, None, None, None);
560
561 let mut instruments_with_orders = Vec::new();
562 let mut instruments = HashSet::new();
563
564 for order in &all_orders_open {
565 instruments.insert(order.instrument_id());
566 }
567
568 for instrument_id in instruments {
569 if let Some(instrument) = cache.instrument(&instrument_id) {
570 let orders = cache
571 .orders_open(None, Some(&instrument_id), None, None)
572 .into_iter()
573 .cloned()
574 .collect::<Vec<OrderAny>>();
575 instruments_with_orders.push((instrument.clone(), orders));
576 } else {
577 log::error!(
578 "Cannot update initial (order) margin: no instrument found for {instrument_id}"
579 );
580 initialized = false;
581 break;
582 }
583 }
584 instruments_with_orders
585 };
586
587 for (instrument, orders_open) in &orders_and_instruments {
588 let mut cache = self.cache.borrow_mut();
589 let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
590 account
591 } else {
592 log::error!(
593 "Cannot update initial (order) margin: no account registered for {}",
594 instrument.id().venue
595 );
596 initialized = false;
597 break;
598 };
599
600 let result = self.inner.borrow_mut().accounts.update_orders(
601 account,
602 instrument.clone(),
603 orders_open.iter().collect(),
604 self.clock.borrow().timestamp_ns(),
605 );
606
607 match result {
608 Some((updated_account, _)) => {
609 cache.add_account(updated_account).unwrap(); }
611 None => {
612 initialized = false;
613 }
614 }
615 }
616
617 let total_orders = orders_and_instruments
618 .into_iter()
619 .map(|(_, orders)| orders.len())
620 .sum::<usize>();
621
622 log::info!(
623 "Initialized {} open order{}",
624 total_orders,
625 if total_orders == 1 { "" } else { "s" }
626 );
627
628 self.inner.borrow_mut().initialized = initialized;
629 }
630
631 pub fn initialize_positions(&mut self) {
632 self.inner.borrow_mut().unrealized_pnls.clear();
633 self.inner.borrow_mut().realized_pnls.clear();
634 let all_positions_open: Vec<Position>;
635 let mut instruments = HashSet::new();
636 {
637 let cache = self.cache.borrow();
638 all_positions_open = cache
639 .positions_open(None, None, None, None)
640 .into_iter()
641 .cloned()
642 .collect();
643 for position in &all_positions_open {
644 instruments.insert(position.instrument_id);
645 }
646 }
647
648 let mut initialized = true;
649
650 for instrument_id in instruments {
651 let positions_open: Vec<Position> = {
652 let cache = self.cache.borrow();
653 cache
654 .positions_open(None, Some(&instrument_id), None, None)
655 .into_iter()
656 .cloned()
657 .collect()
658 };
659
660 self.update_net_position(&instrument_id, positions_open);
661
662 let calculated_unrealized_pnl = self
663 .calculate_unrealized_pnl(&instrument_id)
664 .expect("Failed to calculate unrealized PnL");
665 let calculated_realized_pnl = self
666 .calculate_realized_pnl(&instrument_id)
667 .expect("Failed to calculate realized PnL");
668
669 self.inner
670 .borrow_mut()
671 .unrealized_pnls
672 .insert(instrument_id, calculated_unrealized_pnl);
673 self.inner
674 .borrow_mut()
675 .realized_pnls
676 .insert(instrument_id, calculated_realized_pnl);
677
678 let cache = self.cache.borrow();
679 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
680 account
681 } else {
682 log::error!(
683 "Cannot update maintenance (position) margin: no account registered for {}",
684 instrument_id.venue
685 );
686 initialized = false;
687 break;
688 };
689
690 let account = match account {
691 AccountAny::Cash(_) => continue,
692 AccountAny::Margin(margin_account) => margin_account,
693 };
694
695 let mut cache = self.cache.borrow_mut();
696 let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
697 instrument
698 } else {
699 log::error!(
700 "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
701 );
702 initialized = false;
703 break;
704 };
705
706 let result = self.inner.borrow_mut().accounts.update_positions(
707 account,
708 instrument.clone(),
709 self.cache
710 .borrow()
711 .positions_open(None, Some(&instrument_id), None, None),
712 self.clock.borrow().timestamp_ns(),
713 );
714
715 match result {
716 Some((updated_account, _)) => {
717 cache
718 .add_account(AccountAny::Margin(updated_account)) .unwrap();
720 }
721 None => {
722 initialized = false;
723 }
724 }
725 }
726
727 let open_count = all_positions_open.len();
728 self.inner.borrow_mut().initialized = initialized;
729 log::info!(
730 "Initialized {} open position{}",
731 open_count,
732 if open_count == 1 { "" } else { "s" }
733 );
734 }
735
736 pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
737 update_quote_tick(
738 self.cache.clone(),
739 self.clock.clone(),
740 self.inner.clone(),
741 quote,
742 );
743 }
744
745 pub fn update_bar(&mut self, bar: &Bar) {
746 update_bar(
747 self.cache.clone(),
748 self.clock.clone(),
749 self.inner.clone(),
750 bar,
751 );
752 }
753
754 pub fn update_account(&mut self, event: &AccountState) {
755 update_account(self.cache.clone(), event);
756 }
757
758 pub fn update_order(&mut self, event: &OrderEventAny) {
759 update_order(
760 self.cache.clone(),
761 self.clock.clone(),
762 self.inner.clone(),
763 event,
764 );
765 }
766
767 pub fn update_position(&mut self, event: &PositionEvent) {
768 update_position(
769 self.cache.clone(),
770 self.clock.clone(),
771 self.inner.clone(),
772 event,
773 );
774 }
775
776 fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
779 let mut net_position = Decimal::ZERO;
780
781 for open_position in positions_open {
782 log::debug!("open_position: {open_position}");
783 net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
784 }
785
786 let existing_position = self.net_position(instrument_id);
787 if existing_position != net_position {
788 self.inner
789 .borrow_mut()
790 .net_positions
791 .insert(*instrument_id, net_position);
792 log::info!("{instrument_id} net_position={net_position}");
793 }
794 }
795
796 fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
797 let cache = self.cache.borrow();
798 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
799 account
800 } else {
801 log::error!(
802 "Cannot calculate unrealized PnL: no account registered for {}",
803 instrument_id.venue
804 );
805 return None;
806 };
807
808 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
809 instrument
810 } else {
811 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
812 return None;
813 };
814
815 let currency = account
816 .base_currency()
817 .unwrap_or_else(|| instrument.settlement_currency());
818
819 let positions_open = cache.positions_open(
820 None, Some(instrument_id),
822 None,
823 None,
824 );
825
826 if positions_open.is_empty() {
827 return Some(Money::new(0.0, currency));
828 }
829
830 let mut total_pnl = 0.0;
831
832 for position in positions_open {
833 if position.instrument_id != *instrument_id {
834 continue; }
836
837 if position.side == PositionSide::Flat {
838 continue; }
840
841 let price = if let Some(price) = self.get_price(position) {
842 price
843 } else {
844 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
845 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
846 return None; };
848
849 let mut pnl = position.unrealized_pnl(price).as_f64();
850
851 if let Some(base_currency) = account.base_currency() {
852 let xrate = if let Some(xrate) =
853 self.calculate_xrate_to_base(instrument, account, position.entry)
854 {
855 xrate
856 } else {
857 log::error!(
858 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
860 instrument.settlement_currency(),
861 base_currency
862 );
863 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
864 return None; };
866
867 let scale = 10f64.powi(currency.precision.into());
868 pnl = ((pnl * xrate) * scale).round() / scale;
869 }
870
871 total_pnl += pnl;
872 }
873
874 Some(Money::new(total_pnl, currency))
875 }
876
877 fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
878 let cache = self.cache.borrow();
879 let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
880 account
881 } else {
882 log::error!(
883 "Cannot calculate realized PnL: no account registered for {}",
884 instrument_id.venue
885 );
886 return None;
887 };
888
889 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
890 instrument
891 } else {
892 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
893 return None;
894 };
895
896 let currency = account
897 .base_currency()
898 .unwrap_or_else(|| instrument.settlement_currency());
899
900 let positions = cache.positions(
901 None, Some(instrument_id),
903 None,
904 None,
905 );
906
907 if positions.is_empty() {
908 return Some(Money::new(0.0, currency));
909 }
910
911 let mut total_pnl = 0.0;
912
913 for position in positions {
914 if position.instrument_id != *instrument_id {
915 continue; }
917
918 if position.realized_pnl.is_none() {
919 continue; }
921
922 let mut pnl = position.realized_pnl?.as_f64();
923
924 if let Some(base_currency) = account.base_currency() {
925 let xrate = if let Some(xrate) =
926 self.calculate_xrate_to_base(instrument, account, position.entry)
927 {
928 xrate
929 } else {
930 log::error!(
931 "Cannot calculate realized PnL: insufficient data for {}/{}",
933 instrument.settlement_currency(),
934 base_currency
935 );
936 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
937 return None; };
939
940 let scale = 10f64.powi(currency.precision.into());
941 pnl = ((pnl * xrate) * scale).round() / scale;
942 }
943
944 total_pnl += pnl;
945 }
946
947 Some(Money::new(total_pnl, currency))
948 }
949
950 fn get_price(&self, position: &Position) -> Option<Price> {
951 let price_type = match position.side {
952 PositionSide::Long => PriceType::Bid,
953 PositionSide::Short => PriceType::Ask,
954 _ => panic!("invalid `PositionSide`, was {}", position.side),
955 };
956
957 let cache = self.cache.borrow();
958
959 let instrument_id = &position.instrument_id;
960 cache
961 .price(instrument_id, price_type)
962 .or_else(|| cache.price(instrument_id, PriceType::Last))
963 .or_else(|| {
964 self.inner
965 .borrow()
966 .bar_close_prices
967 .get(instrument_id)
968 .copied()
969 })
970 }
971
972 fn calculate_xrate_to_base(
973 &self,
974 instrument: &InstrumentAny,
975 account: &AccountAny,
976 side: OrderSide,
977 ) -> Option<f64> {
978 if !self.config.convert_to_account_base_currency {
979 return Some(1.0); }
981
982 match account.base_currency() {
983 None => Some(1.0), Some(base_currency) => {
985 let cache = self.cache.borrow();
986
987 if self.config.use_mark_xrates {
988 return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
989 }
990
991 let price_type = if side == OrderSide::Buy {
992 PriceType::Bid
993 } else {
994 PriceType::Ask
995 };
996
997 cache.get_xrate(
998 instrument.id().venue,
999 instrument.settlement_currency(),
1000 base_currency,
1001 price_type,
1002 )
1003 }
1004 }
1005 }
1006}
1007
1008fn update_quote_tick(
1010 cache: Rc<RefCell<Cache>>,
1011 clock: Rc<RefCell<dyn Clock>>,
1012 inner: Rc<RefCell<PortfolioState>>,
1013 quote: &QuoteTick,
1014) {
1015 update_instrument_id(cache, clock.clone(), inner, "e.instrument_id);
1016}
1017
1018fn update_bar(
1019 cache: Rc<RefCell<Cache>>,
1020 clock: Rc<RefCell<dyn Clock>>,
1021 inner: Rc<RefCell<PortfolioState>>,
1022 bar: &Bar,
1023) {
1024 let instrument_id = bar.bar_type.instrument_id();
1025 inner
1026 .borrow_mut()
1027 .bar_close_prices
1028 .insert(instrument_id, bar.close);
1029 update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1030}
1031
1032fn update_instrument_id(
1033 cache: Rc<RefCell<Cache>>,
1034 clock: Rc<RefCell<dyn Clock>>,
1035 inner: Rc<RefCell<PortfolioState>>,
1036 instrument_id: &InstrumentId,
1037) {
1038 inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1039
1040 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1041 return;
1042 }
1043
1044 let result_init;
1045 let mut result_maint = None;
1046
1047 let account = {
1048 let borrowed_cache = cache.borrow();
1049 let account = if let Some(account) = borrowed_cache.account_for_venue(&instrument_id.venue)
1050 {
1051 account
1052 } else {
1053 log::error!(
1054 "Cannot update tick: no account registered for {}",
1055 instrument_id.venue
1056 );
1057 return;
1058 };
1059
1060 let mut borrowed_cache = cache.borrow_mut();
1061 let instrument = if let Some(instrument) = borrowed_cache.instrument(instrument_id) {
1062 instrument.clone()
1063 } else {
1064 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1065 return;
1066 };
1067
1068 let orders_open: Vec<OrderAny> = borrowed_cache
1070 .orders_open(None, Some(instrument_id), None, None)
1071 .iter()
1072 .map(|o| (*o).clone())
1073 .collect();
1074
1075 let positions_open: Vec<Position> = borrowed_cache
1076 .positions_open(None, Some(instrument_id), None, None)
1077 .iter()
1078 .map(|p| (*p).clone())
1079 .collect();
1080
1081 result_init = inner.borrow().accounts.update_orders(
1082 account,
1083 instrument.clone(),
1084 orders_open.iter().collect(),
1085 clock.borrow().timestamp_ns(),
1086 );
1087
1088 if let AccountAny::Margin(margin_account) = account {
1089 result_maint = inner.borrow().accounts.update_positions(
1090 margin_account,
1091 instrument,
1092 positions_open.iter().collect(),
1093 clock.borrow().timestamp_ns(),
1094 );
1095 }
1096
1097 if let Some((ref updated_account, _)) = result_init {
1098 borrowed_cache.add_account(updated_account.clone()).unwrap(); }
1100 account.clone()
1101 };
1102
1103 let mut portfolio_clone = Portfolio {
1104 clock: clock.clone(),
1105 cache,
1106 inner: inner.clone(),
1107 config: PortfolioConfig::default(), };
1109
1110 let result_unrealized_pnl: Option<Money> =
1111 portfolio_clone.calculate_unrealized_pnl(instrument_id);
1112
1113 if result_init.is_some()
1114 && (matches!(account, AccountAny::Cash(_))
1115 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1116 {
1117 inner.borrow_mut().pending_calcs.remove(instrument_id);
1118 if inner.borrow().pending_calcs.is_empty() {
1119 inner.borrow_mut().initialized = true;
1120 }
1121 }
1122}
1123
1124fn update_order(
1125 cache: Rc<RefCell<Cache>>,
1126 clock: Rc<RefCell<dyn Clock>>,
1127 inner: Rc<RefCell<PortfolioState>>,
1128 event: &OrderEventAny,
1129) {
1130 let borrowed_cache = cache.borrow();
1131 let account_id = match event.account_id() {
1132 Some(account_id) => account_id,
1133 None => {
1134 return; }
1136 };
1137
1138 let account = if let Some(account) = borrowed_cache.account(&account_id) {
1139 account
1140 } else {
1141 log::error!("Cannot update order: no account registered for {account_id}");
1142 return;
1143 };
1144
1145 match account {
1146 AccountAny::Cash(cash_account) => {
1147 if !cash_account.base.calculate_account_state {
1148 return;
1149 }
1150 }
1151 AccountAny::Margin(margin_account) => {
1152 if !margin_account.base.calculate_account_state {
1153 return;
1154 }
1155 }
1156 }
1157
1158 match event {
1159 OrderEventAny::Accepted(_)
1160 | OrderEventAny::Canceled(_)
1161 | OrderEventAny::Rejected(_)
1162 | OrderEventAny::Updated(_)
1163 | OrderEventAny::Filled(_) => {}
1164 _ => {
1165 return;
1166 }
1167 }
1168
1169 let borrowed_cache = cache.borrow();
1170 let order = if let Some(order) = borrowed_cache.order(&event.client_order_id()) {
1171 order
1172 } else {
1173 log::error!(
1174 "Cannot update order: {} not found in the cache",
1175 event.client_order_id()
1176 );
1177 return; };
1179
1180 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1181 return; }
1183
1184 let instrument = if let Some(instrument_id) = borrowed_cache.instrument(&event.instrument_id())
1185 {
1186 instrument_id
1187 } else {
1188 log::error!(
1189 "Cannot update order: no instrument found for {}",
1190 event.instrument_id()
1191 );
1192 return;
1193 };
1194
1195 if let OrderEventAny::Filled(order_filled) = event {
1196 let _ = inner.borrow().accounts.update_balances(
1197 account.clone(),
1198 instrument.clone(),
1199 *order_filled,
1200 );
1201
1202 let mut portfolio_clone = Portfolio {
1203 clock: clock.clone(),
1204 cache: cache.clone(),
1205 inner: inner.clone(),
1206 config: PortfolioConfig::default(), };
1208
1209 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1210 Some(unrealized_pnl) => {
1211 inner
1212 .borrow_mut()
1213 .unrealized_pnls
1214 .insert(event.instrument_id(), unrealized_pnl);
1215 }
1216 None => {
1217 log::error!(
1218 "Failed to calculate unrealized PnL for instrument {}",
1219 event.instrument_id()
1220 );
1221 }
1222 }
1223 }
1224
1225 let orders_open = borrowed_cache.orders_open(None, Some(&event.instrument_id()), None, None);
1226
1227 let account_state = inner.borrow_mut().accounts.update_orders(
1228 account,
1229 instrument.clone(),
1230 orders_open,
1231 clock.borrow().timestamp_ns(),
1232 );
1233
1234 let mut borrowed_cache = cache.borrow_mut();
1235 borrowed_cache.update_account(account.clone()).unwrap();
1236
1237 if let Some(account_state) = account_state {
1238 msgbus::publish(
1239 &Ustr::from(&format!("events.account.{}", account.id())),
1240 &account_state,
1241 );
1242 } else {
1243 log::debug!("Added pending calculation for {}", instrument.id());
1244 inner.borrow_mut().pending_calcs.insert(instrument.id());
1245 }
1246
1247 log::debug!("Updated {event}");
1248}
1249
1250fn update_position(
1251 cache: Rc<RefCell<Cache>>,
1252 clock: Rc<RefCell<dyn Clock>>,
1253 inner: Rc<RefCell<PortfolioState>>,
1254 event: &PositionEvent,
1255) {
1256 let instrument_id = event.instrument_id();
1257
1258 let positions_open: Vec<Position> = {
1259 let borrowed_cache = cache.borrow();
1260
1261 borrowed_cache
1262 .positions_open(None, Some(&instrument_id), None, None)
1263 .iter()
1264 .map(|o| (*o).clone())
1265 .collect()
1266 };
1267
1268 log::debug!("postion fresh from cache -> {positions_open:?}");
1269
1270 let mut portfolio_clone = Portfolio {
1271 clock: clock.clone(),
1272 cache: cache.clone(),
1273 inner: inner.clone(),
1274 config: PortfolioConfig::default(), };
1276
1277 portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1278
1279 let calculated_unrealized_pnl = portfolio_clone
1280 .calculate_unrealized_pnl(&instrument_id)
1281 .expect("Failed to calculate unrealized PnL");
1282 let calculated_realized_pnl = portfolio_clone
1283 .calculate_realized_pnl(&instrument_id)
1284 .expect("Failed to calculate realized PnL");
1285
1286 inner
1287 .borrow_mut()
1288 .unrealized_pnls
1289 .insert(event.instrument_id(), calculated_unrealized_pnl);
1290 inner
1291 .borrow_mut()
1292 .realized_pnls
1293 .insert(event.instrument_id(), calculated_realized_pnl);
1294
1295 let borrowed_cache = cache.borrow();
1296 let account = borrowed_cache.account(&event.account_id());
1297
1298 if let Some(AccountAny::Margin(margin_account)) = account {
1299 if !margin_account.calculate_account_state {
1300 return; }
1302
1303 let borrowed_cache = cache.borrow();
1304 let instrument = if let Some(instrument) = borrowed_cache.instrument(&instrument_id) {
1305 instrument
1306 } else {
1307 log::error!("Cannot update position: no instrument found for {instrument_id}");
1308 return;
1309 };
1310
1311 let result = inner.borrow_mut().accounts.update_positions(
1312 margin_account,
1313 instrument.clone(),
1314 positions_open.iter().collect(),
1315 clock.borrow().timestamp_ns(),
1316 );
1317 let mut borrowed_cache = cache.borrow_mut();
1318 if let Some((margin_account, _)) = result {
1319 borrowed_cache
1320 .add_account(AccountAny::Margin(margin_account)) .unwrap();
1322 }
1323 } else if account.is_none() {
1324 log::error!(
1325 "Cannot update position: no account registered for {}",
1326 event.account_id()
1327 );
1328 }
1329}
1330
1331pub fn update_account(cache: Rc<RefCell<Cache>>, event: &AccountState) {
1332 let mut borrowed_cache = cache.borrow_mut();
1333
1334 if let Some(existing) = borrowed_cache.account(&event.account_id) {
1335 let mut account = existing.clone();
1336 account.apply(event.clone());
1337
1338 if let Err(e) = borrowed_cache.update_account(account.clone()) {
1339 log::error!("Failed to update account: {e}");
1340 return;
1341 }
1342 } else {
1343 let account = match AccountAny::from_events(vec![event.clone()]) {
1344 Ok(account) => account,
1345 Err(e) => {
1346 log::error!("Failed to create account: {e}");
1347 return;
1348 }
1349 };
1350
1351 if let Err(e) = borrowed_cache.add_account(account) {
1352 log::error!("Failed to add account: {e}");
1353 return;
1354 }
1355 }
1356
1357 log::info!("Updated {event}");
1358}