nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    enums::LogColor,
26    msgbus::{
27        self,
28        handler::{ShareableMessageHandler, TypedMessageHandler},
29    },
30};
31use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
32use nautilus_model::{
33    accounts::AccountAny,
34    data::{Bar, MarkPriceUpdate, QuoteTick},
35    enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
36    events::{AccountState, OrderEventAny, position::PositionEvent},
37    identifiers::{AccountId, InstrumentId, PositionId, Venue},
38    instruments::{Instrument, InstrumentAny},
39    orders::{Order, OrderAny},
40    position::Position,
41    types::{Currency, Money, Price},
42};
43use rust_decimal::{Decimal, prelude::FromPrimitive};
44
45use crate::{config::PortfolioConfig, manager::AccountsManager};
46
47struct PortfolioState {
48    accounts: AccountsManager,
49    analyzer: PortfolioAnalyzer,
50    unrealized_pnls: AHashMap<InstrumentId, Money>,
51    realized_pnls: AHashMap<InstrumentId, Money>,
52    snapshot_sum_per_position: AHashMap<PositionId, Money>,
53    snapshot_last_per_position: AHashMap<PositionId, Money>,
54    snapshot_processed_counts: AHashMap<PositionId, usize>,
55    net_positions: AHashMap<InstrumentId, Decimal>,
56    pending_calcs: AHashSet<InstrumentId>,
57    bar_close_prices: AHashMap<InstrumentId, Price>,
58    initialized: bool,
59    last_account_state_log_ts: AHashMap<AccountId, u64>,
60    min_account_state_logging_interval_ns: u64,
61}
62
63impl PortfolioState {
64    fn new(
65        clock: Rc<RefCell<dyn Clock>>,
66        cache: Rc<RefCell<Cache>>,
67        config: &PortfolioConfig,
68    ) -> Self {
69        let min_account_state_logging_interval_ns = config
70            .min_account_state_logging_interval_ms
71            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73        Self {
74            accounts: AccountsManager::new(clock, cache),
75            analyzer: PortfolioAnalyzer::default(),
76            unrealized_pnls: AHashMap::new(),
77            realized_pnls: AHashMap::new(),
78            snapshot_sum_per_position: AHashMap::new(),
79            snapshot_last_per_position: AHashMap::new(),
80            snapshot_processed_counts: AHashMap::new(),
81            net_positions: AHashMap::new(),
82            pending_calcs: AHashSet::new(),
83            bar_close_prices: AHashMap::new(),
84            initialized: false,
85            last_account_state_log_ts: AHashMap::new(),
86            min_account_state_logging_interval_ns,
87        }
88    }
89
90    fn reset(&mut self) {
91        log::debug!("RESETTING");
92        self.net_positions.clear();
93        self.unrealized_pnls.clear();
94        self.realized_pnls.clear();
95        self.snapshot_sum_per_position.clear();
96        self.snapshot_last_per_position.clear();
97        self.snapshot_processed_counts.clear();
98        self.pending_calcs.clear();
99        self.last_account_state_log_ts.clear();
100        self.analyzer.reset();
101        log::debug!("READY");
102    }
103}
104
105pub struct Portfolio {
106    pub(crate) clock: Rc<RefCell<dyn Clock>>,
107    pub(crate) cache: Rc<RefCell<Cache>>,
108    inner: Rc<RefCell<PortfolioState>>,
109    config: PortfolioConfig,
110}
111
112impl Debug for Portfolio {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct(stringify!(Portfolio)).finish()
115    }
116}
117
118impl Portfolio {
119    pub fn new(
120        cache: Rc<RefCell<Cache>>,
121        clock: Rc<RefCell<dyn Clock>>,
122        config: Option<PortfolioConfig>,
123    ) -> Self {
124        let config = config.unwrap_or_default();
125        let inner = Rc::new(RefCell::new(PortfolioState::new(
126            clock.clone(),
127            cache.clone(),
128            &config,
129        )));
130
131        Self::register_message_handlers(
132            cache.clone(),
133            clock.clone(),
134            inner.clone(),
135            config.clone(),
136        );
137
138        Self {
139            clock,
140            cache,
141            inner,
142            config,
143        }
144    }
145
146    /// Creates a shallow clone of the Portfolio that shares the same internal state.
147    ///
148    /// This is useful when multiple components need to reference the same Portfolio
149    /// without creating duplicate msgbus handler registrations.
150    #[must_use]
151    pub fn clone_shallow(&self) -> Self {
152        Self {
153            clock: self.clock.clone(),
154            cache: self.cache.clone(),
155            inner: self.inner.clone(),
156            config: self.config.clone(),
157        }
158    }
159
160    fn register_message_handlers(
161        cache: Rc<RefCell<Cache>>,
162        clock: Rc<RefCell<dyn Clock>>,
163        inner: Rc<RefCell<PortfolioState>>,
164        config: PortfolioConfig,
165    ) {
166        let inner_weak = WeakCell::from(Rc::downgrade(&inner));
167
168        let update_account_handler = {
169            let cache = cache.clone();
170            let inner = inner_weak.clone();
171            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
172                move |event: &AccountState| {
173                    if let Some(inner_rc) = inner.upgrade() {
174                        update_account(cache.clone(), inner_rc.into(), event);
175                    }
176                },
177            )))
178        };
179
180        let update_position_handler = {
181            let cache = cache.clone();
182            let clock = clock.clone();
183            let inner = inner_weak.clone();
184            let config = config.clone();
185            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
186                move |event: &PositionEvent| {
187                    if let Some(inner_rc) = inner.upgrade() {
188                        update_position(
189                            cache.clone(),
190                            clock.clone(),
191                            inner_rc.into(),
192                            config.clone(),
193                            event,
194                        );
195                    }
196                },
197            )))
198        };
199
200        let update_quote_handler = {
201            let cache = cache.clone();
202            let clock = clock.clone();
203            let inner = inner_weak.clone();
204            let config = config.clone();
205            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
206                move |quote: &QuoteTick| {
207                    if let Some(inner_rc) = inner.upgrade() {
208                        update_quote_tick(
209                            cache.clone(),
210                            clock.clone(),
211                            inner_rc.into(),
212                            config.clone(),
213                            quote,
214                        );
215                    }
216                },
217            )))
218        };
219
220        let update_bar_handler = {
221            let cache = cache.clone();
222            let clock = clock.clone();
223            let inner = inner_weak.clone();
224            let config = config.clone();
225            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
226                if let Some(inner_rc) = inner.upgrade() {
227                    update_bar(
228                        cache.clone(),
229                        clock.clone(),
230                        inner_rc.into(),
231                        config.clone(),
232                        bar,
233                    );
234                }
235            })))
236        };
237
238        let update_mark_price_handler = {
239            let cache = cache.clone();
240            let clock = clock.clone();
241            let inner = inner_weak.clone();
242            let config = config.clone();
243            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
244                move |mark_price: &MarkPriceUpdate| {
245                    if let Some(inner_rc) = inner.upgrade() {
246                        update_instrument_id(
247                            cache.clone(),
248                            clock.clone(),
249                            inner_rc.into(),
250                            config.clone(),
251                            &mark_price.instrument_id,
252                        );
253                    }
254                },
255            )))
256        };
257
258        let update_order_handler = {
259            let cache = cache;
260            let clock = clock.clone();
261            let inner = inner_weak;
262            let config = config.clone();
263            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
264                move |event: &OrderEventAny| {
265                    if let Some(inner_rc) = inner.upgrade() {
266                        update_order(
267                            cache.clone(),
268                            clock.clone(),
269                            inner_rc.into(),
270                            config.clone(),
271                            event,
272                        );
273                    }
274                },
275            )))
276        };
277
278        msgbus::register(
279            "Portfolio.update_account".into(),
280            update_account_handler.clone(),
281        );
282
283        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
284        if config.bar_updates {
285            msgbus::subscribe("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
286        }
287        if config.use_mark_prices {
288            msgbus::subscribe(
289                "data.mark_prices.*".into(),
290                update_mark_price_handler,
291                Some(10),
292            );
293        }
294        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
295        msgbus::subscribe(
296            "events.position.*".into(),
297            update_position_handler,
298            Some(10),
299        );
300        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
301    }
302
303    pub fn reset(&mut self) {
304        log::debug!("RESETTING");
305        self.inner.borrow_mut().reset();
306        log::debug!("READY");
307    }
308
309    // -- QUERIES ---------------------------------------------------------------------------------
310
311    /// Returns `true` if the portfolio has been initialized.
312    #[must_use]
313    pub fn is_initialized(&self) -> bool {
314        self.inner.borrow().initialized
315    }
316
317    /// Returns the locked balances for the given venue.
318    ///
319    /// Locked balances represent funds reserved for open orders.
320    #[must_use]
321    pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
322        self.cache.borrow().account_for_venue(venue).map_or_else(
323            || {
324                log::error!("Cannot get balances locked: no account generated for {venue}");
325                AHashMap::new()
326            },
327            AccountAny::balances_locked,
328        )
329    }
330
331    /// Returns the initial margin requirements for the given venue.
332    ///
333    /// Only applicable for margin accounts. Returns empty map for cash accounts.
334    #[must_use]
335    pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
336        self.cache.borrow().account_for_venue(venue).map_or_else(
337            || {
338                log::error!(
339                    "Cannot get initial (order) margins: no account registered for {venue}"
340                );
341                AHashMap::new()
342            },
343            |account| match account {
344                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
345                AccountAny::Cash(_) => {
346                    log::warn!("Initial margins not applicable for cash account");
347                    AHashMap::new()
348                }
349            },
350        )
351    }
352
353    /// Returns the maintenance margin requirements for the given venue.
354    ///
355    /// Only applicable for margin accounts. Returns empty map for cash accounts.
356    #[must_use]
357    pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
358        self.cache.borrow().account_for_venue(venue).map_or_else(
359            || {
360                log::error!(
361                    "Cannot get maintenance (position) margins: no account registered for {venue}"
362                );
363                AHashMap::new()
364            },
365            |account| match account {
366                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
367                AccountAny::Cash(_) => {
368                    log::warn!("Maintenance margins not applicable for cash account");
369                    AHashMap::new()
370                }
371            },
372        )
373    }
374
375    /// Returns the unrealized PnLs for all positions at the given venue.
376    ///
377    /// Calculates mark-to-market PnL based on current market prices.
378    #[must_use]
379    pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
380        let instrument_ids = {
381            let cache = self.cache.borrow();
382            let positions = cache.positions(Some(venue), None, None, None);
383
384            if positions.is_empty() {
385                return AHashMap::new(); // Nothing to calculate
386            }
387
388            let instrument_ids: AHashSet<InstrumentId> =
389                positions.iter().map(|p| p.instrument_id).collect();
390
391            instrument_ids
392        };
393
394        let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
395
396        for instrument_id in instrument_ids {
397            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
398                // PnL already calculated
399                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
400                continue;
401            }
402
403            // Calculate PnL
404            match self.calculate_unrealized_pnl(&instrument_id) {
405                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
406                None => continue,
407            }
408        }
409
410        unrealized_pnls
411            .into_iter()
412            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
413            .collect()
414    }
415
416    /// Returns the realized PnLs for all positions at the given venue.
417    ///
418    /// Calculates total realized profit and loss from closed positions.
419    #[must_use]
420    pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
421        let instrument_ids = {
422            let cache = self.cache.borrow();
423            let positions = cache.positions(Some(venue), None, None, None);
424
425            if positions.is_empty() {
426                return AHashMap::new(); // Nothing to calculate
427            }
428
429            let instrument_ids: AHashSet<InstrumentId> =
430                positions.iter().map(|p| p.instrument_id).collect();
431
432            instrument_ids
433        };
434
435        let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
436
437        for instrument_id in instrument_ids {
438            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
439                // PnL already calculated
440                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
441                continue;
442            }
443
444            // Calculate PnL
445            match self.calculate_realized_pnl(&instrument_id) {
446                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
447                None => continue,
448            }
449        }
450
451        realized_pnls
452            .into_iter()
453            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
454            .collect()
455    }
456
457    #[must_use]
458    pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
459        let cache = self.cache.borrow();
460        let account = if let Some(account) = cache.account_for_venue(venue) {
461            account
462        } else {
463            log::error!("Cannot calculate net exposures: no account registered for {venue}");
464            return None; // Cannot calculate
465        };
466
467        let positions_open = cache.positions_open(Some(venue), None, None, None);
468        if positions_open.is_empty() {
469            return Some(AHashMap::new()); // Nothing to calculate
470        }
471
472        let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
473
474        for position in positions_open {
475            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
476                instrument
477            } else {
478                log::error!(
479                    "Cannot calculate net exposures: no instrument for {}",
480                    position.instrument_id
481                );
482                return None; // Cannot calculate
483            };
484
485            if position.side == PositionSide::Flat {
486                log::error!(
487                    "Cannot calculate net exposures: position is flat for {}",
488                    position.instrument_id
489                );
490                continue; // Nothing to calculate
491            }
492
493            let price = self.get_price(position)?;
494            let xrate = if let Some(xrate) =
495                self.calculate_xrate_to_base(instrument, account, position.entry)
496            {
497                xrate
498            } else {
499                log::error!(
500                    // TODO: Improve logging
501                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
502                    instrument.settlement_currency(),
503                    account.base_currency()
504                );
505                return None; // Cannot calculate
506            };
507
508            let settlement_currency = account
509                .base_currency()
510                .unwrap_or_else(|| instrument.settlement_currency());
511
512            let net_exposure = instrument
513                .calculate_notional_value(position.quantity, price, None)
514                .as_f64()
515                * xrate;
516
517            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
518                .round()
519                / 10f64.powi(settlement_currency.precision.into());
520
521            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
522        }
523
524        Some(
525            net_exposures
526                .into_iter()
527                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
528                .collect(),
529        )
530    }
531
532    #[must_use]
533    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
534        if let Some(pnl) = self
535            .inner
536            .borrow()
537            .unrealized_pnls
538            .get(instrument_id)
539            .copied()
540        {
541            return Some(pnl);
542        }
543
544        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
545        self.inner
546            .borrow_mut()
547            .unrealized_pnls
548            .insert(*instrument_id, pnl);
549        Some(pnl)
550    }
551
552    #[must_use]
553    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
554        if let Some(pnl) = self
555            .inner
556            .borrow()
557            .realized_pnls
558            .get(instrument_id)
559            .copied()
560        {
561            return Some(pnl);
562        }
563
564        let pnl = self.calculate_realized_pnl(instrument_id)?;
565        self.inner
566            .borrow_mut()
567            .realized_pnls
568            .insert(*instrument_id, pnl);
569        Some(pnl)
570    }
571
572    /// Returns the total PnL for the given instrument ID.
573    ///
574    /// Total PnL = Realized PnL + Unrealized PnL
575    #[must_use]
576    pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
577        let realized = self.realized_pnl(instrument_id)?;
578        let unrealized = self.unrealized_pnl(instrument_id)?;
579
580        if realized.currency != unrealized.currency {
581            log::error!(
582                "Cannot calculate total PnL: currency mismatch {} vs {}",
583                realized.currency,
584                unrealized.currency
585            );
586            return None;
587        }
588
589        Some(Money::new(
590            realized.as_f64() + unrealized.as_f64(),
591            realized.currency,
592        ))
593    }
594
595    /// Returns the total PnLs for the given venue.
596    ///
597    /// Total PnL = Realized PnL + Unrealized PnL for each currency
598    #[must_use]
599    pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
600        let realized_pnls = self.realized_pnls(venue);
601        let unrealized_pnls = self.unrealized_pnls(venue);
602
603        let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
604
605        // Add realized PnLs
606        for (currency, realized) in realized_pnls {
607            total_pnls.insert(currency, realized);
608        }
609
610        // Add unrealized PnLs
611        for (currency, unrealized) in unrealized_pnls {
612            match total_pnls.get_mut(&currency) {
613                Some(total) => {
614                    *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
615                }
616                None => {
617                    total_pnls.insert(currency, unrealized);
618                }
619            }
620        }
621
622        total_pnls
623    }
624
625    #[must_use]
626    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
627        let cache = self.cache.borrow();
628        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
629            account
630        } else {
631            log::error!(
632                "Cannot calculate net exposure: no account registered for {}",
633                instrument_id.venue
634            );
635            return None;
636        };
637
638        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
639            instrument
640        } else {
641            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
642            return None;
643        };
644
645        let positions_open = cache.positions_open(
646            None, // Faster query filtering
647            Some(instrument_id),
648            None,
649            None,
650        );
651
652        if positions_open.is_empty() {
653            return Some(Money::new(0.0, instrument.settlement_currency()));
654        }
655
656        let mut net_exposure = 0.0;
657
658        for position in positions_open {
659            let price = self.get_price(position)?;
660            let xrate = if let Some(xrate) =
661                self.calculate_xrate_to_base(instrument, account, position.entry)
662            {
663                xrate
664            } else {
665                log::error!(
666                    // TODO: Improve logging
667                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
668                    instrument.settlement_currency(),
669                    account.base_currency()
670                );
671                return None; // Cannot calculate
672            };
673
674            let notional_value =
675                instrument.calculate_notional_value(position.quantity, price, None);
676            net_exposure += notional_value.as_f64() * xrate;
677        }
678
679        let settlement_currency = account
680            .base_currency()
681            .unwrap_or_else(|| instrument.settlement_currency());
682
683        Some(Money::new(net_exposure, settlement_currency))
684    }
685
686    #[must_use]
687    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
688        self.inner
689            .borrow()
690            .net_positions
691            .get(instrument_id)
692            .copied()
693            .unwrap_or(Decimal::ZERO)
694    }
695
696    #[must_use]
697    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
698        self.inner
699            .borrow()
700            .net_positions
701            .get(instrument_id)
702            .copied()
703            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
704    }
705
706    #[must_use]
707    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
708        self.inner
709            .borrow()
710            .net_positions
711            .get(instrument_id)
712            .copied()
713            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
714    }
715
716    #[must_use]
717    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
718        self.inner
719            .borrow()
720            .net_positions
721            .get(instrument_id)
722            .copied()
723            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
724    }
725
726    #[must_use]
727    pub fn is_completely_flat(&self) -> bool {
728        for net_position in self.inner.borrow().net_positions.values() {
729            if *net_position != Decimal::ZERO {
730                return false;
731            }
732        }
733        true
734    }
735
736    // -- COMMANDS --------------------------------------------------------------------------------
737
738    /// Initializes account margin based on existing open orders.
739    ///
740    /// # Panics
741    ///
742    /// Panics if updating the cache with a mutated account fails.
743    pub fn initialize_orders(&mut self) {
744        let mut initialized = true;
745        let orders_and_instruments = {
746            let cache = self.cache.borrow();
747            let all_orders_open = cache.orders_open(None, None, None, None);
748
749            let mut instruments_with_orders = Vec::new();
750            let mut instruments = AHashSet::new();
751
752            for order in &all_orders_open {
753                instruments.insert(order.instrument_id());
754            }
755
756            for instrument_id in instruments {
757                if let Some(instrument) = cache.instrument(&instrument_id) {
758                    let orders = cache
759                        .orders_open(None, Some(&instrument_id), None, None)
760                        .into_iter()
761                        .cloned()
762                        .collect::<Vec<OrderAny>>();
763                    instruments_with_orders.push((instrument.clone(), orders));
764                } else {
765                    log::error!(
766                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
767                    );
768                    initialized = false;
769                    break;
770                }
771            }
772            instruments_with_orders
773        };
774
775        for (instrument, orders_open) in &orders_and_instruments {
776            let mut cache = self.cache.borrow_mut();
777            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
778                account
779            } else {
780                log::error!(
781                    "Cannot update initial (order) margin: no account registered for {}",
782                    instrument.id().venue
783                );
784                initialized = false;
785                break;
786            };
787
788            let result = self.inner.borrow_mut().accounts.update_orders(
789                account,
790                instrument.clone(),
791                orders_open.iter().collect(),
792                self.clock.borrow().timestamp_ns(),
793            );
794
795            match result {
796                Some((updated_account, _)) => {
797                    cache.update_account(updated_account).unwrap();
798                }
799                None => {
800                    initialized = false;
801                }
802            }
803        }
804
805        let total_orders = orders_and_instruments
806            .into_iter()
807            .map(|(_, orders)| orders.len())
808            .sum::<usize>();
809
810        log::info!(
811            color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
812            "Initialized {} open order{}",
813            total_orders,
814            if total_orders == 1 { "" } else { "s" }
815        );
816
817        self.inner.borrow_mut().initialized = initialized;
818    }
819
820    /// Initializes account margin based on existing open positions.
821    ///
822    /// # Panics
823    ///
824    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
825    pub fn initialize_positions(&mut self) {
826        self.inner.borrow_mut().unrealized_pnls.clear();
827        self.inner.borrow_mut().realized_pnls.clear();
828        let all_positions_open: Vec<Position>;
829        let mut instruments = AHashSet::new();
830        {
831            let cache = self.cache.borrow();
832            all_positions_open = cache
833                .positions_open(None, None, None, None)
834                .into_iter()
835                .cloned()
836                .collect();
837            for position in &all_positions_open {
838                instruments.insert(position.instrument_id);
839            }
840        }
841
842        let mut initialized = true;
843
844        for instrument_id in instruments {
845            let positions_open: Vec<Position> = {
846                let cache = self.cache.borrow();
847                cache
848                    .positions_open(None, Some(&instrument_id), None, None)
849                    .into_iter()
850                    .cloned()
851                    .collect()
852            };
853
854            self.update_net_position(&instrument_id, positions_open);
855
856            if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
857                self.inner
858                    .borrow_mut()
859                    .unrealized_pnls
860                    .insert(instrument_id, calculated_unrealized_pnl);
861            } else {
862                log::warn!(
863                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
864                );
865                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
866            }
867
868            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
869                self.inner
870                    .borrow_mut()
871                    .realized_pnls
872                    .insert(instrument_id, calculated_realized_pnl);
873            } else {
874                log::warn!(
875                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
876                );
877                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
878            }
879
880            let cache = self.cache.borrow();
881            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
882                account
883            } else {
884                log::error!(
885                    "Cannot update maintenance (position) margin: no account registered for {}",
886                    instrument_id.venue
887                );
888                initialized = false;
889                break;
890            };
891
892            let account = match account {
893                AccountAny::Cash(_) => continue,
894                AccountAny::Margin(margin_account) => margin_account,
895            };
896
897            let mut cache = self.cache.borrow_mut();
898            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
899                instrument
900            } else {
901                log::error!(
902                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
903                );
904                initialized = false;
905                break;
906            };
907
908            let result = self.inner.borrow_mut().accounts.update_positions(
909                account,
910                instrument.clone(),
911                self.cache
912                    .borrow()
913                    .positions_open(None, Some(&instrument_id), None, None),
914                self.clock.borrow().timestamp_ns(),
915            );
916
917            match result {
918                Some((updated_account, _)) => {
919                    cache
920                        .update_account(AccountAny::Margin(updated_account))
921                        .unwrap();
922                }
923                None => {
924                    initialized = false;
925                }
926            }
927        }
928
929        let open_count = all_positions_open.len();
930        self.inner.borrow_mut().initialized = initialized;
931        log::info!(
932            color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
933            "Initialized {} open position{}",
934            open_count,
935            if open_count == 1 { "" } else { "s" }
936        );
937    }
938
939    /// Updates portfolio calculations based on a new quote tick.
940    ///
941    /// Recalculates unrealized PnL for positions affected by the quote update.
942    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
943        update_quote_tick(
944            self.cache.clone(),
945            self.clock.clone(),
946            self.inner.clone(),
947            self.config.clone(),
948            quote,
949        );
950    }
951
952    /// Updates portfolio calculations based on a new bar.
953    ///
954    /// Updates cached bar close prices and recalculates unrealized PnL.
955    pub fn update_bar(&mut self, bar: &Bar) {
956        update_bar(
957            self.cache.clone(),
958            self.clock.clone(),
959            self.inner.clone(),
960            self.config.clone(),
961            bar,
962        );
963    }
964
965    /// Updates portfolio with a new account state event.
966    pub fn update_account(&mut self, event: &AccountState) {
967        update_account(self.cache.clone(), self.inner.clone(), event);
968    }
969
970    /// Updates portfolio calculations based on an order event.
971    ///
972    /// Handles balance updates for order fills and margin calculations for order changes.
973    pub fn update_order(&mut self, event: &OrderEventAny) {
974        update_order(
975            self.cache.clone(),
976            self.clock.clone(),
977            self.inner.clone(),
978            self.config.clone(),
979            event,
980        );
981    }
982
983    /// Updates portfolio calculations based on a position event.
984    ///
985    /// Recalculates net positions, unrealized PnL, and margin requirements.
986    pub fn update_position(&mut self, event: &PositionEvent) {
987        update_position(
988            self.cache.clone(),
989            self.clock.clone(),
990            self.inner.clone(),
991            self.config.clone(),
992            event,
993        );
994    }
995
996    // -- INTERNAL --------------------------------------------------------------------------------
997
998    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
999        let mut net_position = Decimal::ZERO;
1000
1001        for open_position in positions_open {
1002            log::debug!("open_position: {open_position}");
1003            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
1004        }
1005
1006        let existing_position = self.net_position(instrument_id);
1007        if existing_position != net_position {
1008            self.inner
1009                .borrow_mut()
1010                .net_positions
1011                .insert(*instrument_id, net_position);
1012            log::info!("{instrument_id} net_position={net_position}");
1013        }
1014    }
1015
1016    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1017        let cache = self.cache.borrow();
1018        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1019            account
1020        } else {
1021            log::error!(
1022                "Cannot calculate unrealized PnL: no account registered for {}",
1023                instrument_id.venue
1024            );
1025            return None;
1026        };
1027
1028        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1029            instrument
1030        } else {
1031            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1032            return None;
1033        };
1034
1035        let currency = account
1036            .base_currency()
1037            .unwrap_or_else(|| instrument.settlement_currency());
1038
1039        let positions_open = cache.positions_open(
1040            None, // Faster query filtering
1041            Some(instrument_id),
1042            None,
1043            None,
1044        );
1045
1046        if positions_open.is_empty() {
1047            return Some(Money::new(0.0, currency));
1048        }
1049
1050        let mut total_pnl = 0.0;
1051
1052        for position in positions_open {
1053            if position.instrument_id != *instrument_id {
1054                continue; // Nothing to calculate
1055            }
1056
1057            if position.side == PositionSide::Flat {
1058                continue; // Nothing to calculate
1059            }
1060
1061            let price = if let Some(price) = self.get_price(position) {
1062                price
1063            } else {
1064                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1065                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1066                return None; // Cannot calculate
1067            };
1068
1069            let mut pnl = position.unrealized_pnl(price).as_f64();
1070
1071            if let Some(base_currency) = account.base_currency() {
1072                let xrate = if let Some(xrate) =
1073                    self.calculate_xrate_to_base(instrument, account, position.entry)
1074                {
1075                    xrate
1076                } else {
1077                    log::error!(
1078                        // TODO: Improve logging
1079                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1080                        instrument.settlement_currency(),
1081                        base_currency
1082                    );
1083                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1084                    return None; // Cannot calculate
1085                };
1086
1087                let scale = 10f64.powi(currency.precision.into());
1088                pnl = ((pnl * xrate) * scale).round() / scale;
1089            }
1090
1091            total_pnl += pnl;
1092        }
1093
1094        Some(Money::new(total_pnl, currency))
1095    }
1096
1097    fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1098        // Performance: This method maintains an incremental cache of snapshot PnLs
1099        // It only deserializes new snapshots that haven't been processed yet
1100        // Tracks sum and last PnL per position for efficient NETTING OMS support
1101
1102        // Get all position IDs that have snapshots for this instrument
1103        let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1104
1105        if snapshot_position_ids.is_empty() {
1106            return; // Nothing to process
1107        }
1108
1109        let mut rebuild = false;
1110
1111        // Detect purge/reset (count regression) to trigger full rebuild
1112        for position_id in &snapshot_position_ids {
1113            let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1114            let curr_count = position_snapshots.map_or(0, |s| {
1115                // Count the number of snapshots (they're serialized as JSON objects)
1116                s.split(|&b| b == b'{').count() - 1
1117            });
1118            let prev_count = self
1119                .inner
1120                .borrow()
1121                .snapshot_processed_counts
1122                .get(position_id)
1123                .copied()
1124                .unwrap_or(0);
1125
1126            if prev_count > curr_count {
1127                rebuild = true;
1128                break;
1129            }
1130        }
1131
1132        if rebuild {
1133            // Full rebuild: process all snapshots from scratch
1134            for position_id in &snapshot_position_ids {
1135                if let Some(position_snapshots) =
1136                    self.cache.borrow().position_snapshot_bytes(position_id)
1137                {
1138                    let mut sum_pnl: Option<Money> = None;
1139                    let mut last_pnl: Option<Money> = None;
1140
1141                    // Snapshots are concatenated JSON objects
1142                    let mut start = 0;
1143                    let mut depth = 0;
1144                    let mut in_string = false;
1145                    let mut escape_next = false;
1146
1147                    for (i, &byte) in position_snapshots.iter().enumerate() {
1148                        if escape_next {
1149                            escape_next = false;
1150                            continue;
1151                        }
1152
1153                        if byte == b'\\' && in_string {
1154                            escape_next = true;
1155                            continue;
1156                        }
1157
1158                        if byte == b'"' && !escape_next {
1159                            in_string = !in_string;
1160                        }
1161
1162                        if !in_string {
1163                            if byte == b'{' {
1164                                if depth == 0 {
1165                                    start = i;
1166                                }
1167                                depth += 1;
1168                            } else if byte == b'}' {
1169                                depth -= 1;
1170                                if depth == 0
1171                                    && let Ok(snapshot) = serde_json::from_slice::<Position>(
1172                                        &position_snapshots[start..=i],
1173                                    )
1174                                    && let Some(realized_pnl) = snapshot.realized_pnl
1175                                {
1176                                    if let Some(ref mut sum) = sum_pnl {
1177                                        if sum.currency == realized_pnl.currency {
1178                                            *sum = Money::new(
1179                                                sum.as_f64() + realized_pnl.as_f64(),
1180                                                sum.currency,
1181                                            );
1182                                        }
1183                                    } else {
1184                                        sum_pnl = Some(realized_pnl);
1185                                    }
1186                                    last_pnl = Some(realized_pnl);
1187                                }
1188                            }
1189                        }
1190                    }
1191
1192                    let mut inner = self.inner.borrow_mut();
1193                    if let Some(sum) = sum_pnl {
1194                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1195                        if let Some(last) = last_pnl {
1196                            inner.snapshot_last_per_position.insert(*position_id, last);
1197                        }
1198                    } else {
1199                        inner.snapshot_sum_per_position.remove(position_id);
1200                        inner.snapshot_last_per_position.remove(position_id);
1201                    }
1202
1203                    let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1204                    inner
1205                        .snapshot_processed_counts
1206                        .insert(*position_id, snapshot_count);
1207                }
1208            }
1209        } else {
1210            // Incremental path: only process new snapshots
1211            for position_id in &snapshot_position_ids {
1212                if let Some(position_snapshots) =
1213                    self.cache.borrow().position_snapshot_bytes(position_id)
1214                {
1215                    let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1216                    let prev_count = self
1217                        .inner
1218                        .borrow()
1219                        .snapshot_processed_counts
1220                        .get(position_id)
1221                        .copied()
1222                        .unwrap_or(0);
1223
1224                    if prev_count >= curr_count {
1225                        continue;
1226                    }
1227
1228                    let mut sum_pnl = self
1229                        .inner
1230                        .borrow()
1231                        .snapshot_sum_per_position
1232                        .get(position_id)
1233                        .copied();
1234                    let mut last_pnl = self
1235                        .inner
1236                        .borrow()
1237                        .snapshot_last_per_position
1238                        .get(position_id)
1239                        .copied();
1240
1241                    // Process only new snapshots
1242                    let mut start = 0;
1243                    let mut depth = 0;
1244                    let mut in_string = false;
1245                    let mut escape_next = false;
1246                    let mut snapshot_index = 0;
1247
1248                    for (i, &byte) in position_snapshots.iter().enumerate() {
1249                        if escape_next {
1250                            escape_next = false;
1251                            continue;
1252                        }
1253
1254                        if byte == b'\\' && in_string {
1255                            escape_next = true;
1256                            continue;
1257                        }
1258
1259                        if byte == b'"' && !escape_next {
1260                            in_string = !in_string;
1261                        }
1262
1263                        if !in_string {
1264                            if byte == b'{' {
1265                                if depth == 0 {
1266                                    start = i;
1267                                }
1268                                depth += 1;
1269                            } else if byte == b'}' {
1270                                depth -= 1;
1271                                if depth == 0 {
1272                                    snapshot_index += 1;
1273                                    // Only process new snapshots
1274                                    if snapshot_index > prev_count
1275                                        && let Ok(snapshot) = serde_json::from_slice::<Position>(
1276                                            &position_snapshots[start..=i],
1277                                        )
1278                                        && let Some(realized_pnl) = snapshot.realized_pnl
1279                                    {
1280                                        if let Some(ref mut sum) = sum_pnl {
1281                                            if sum.currency == realized_pnl.currency {
1282                                                *sum = Money::new(
1283                                                    sum.as_f64() + realized_pnl.as_f64(),
1284                                                    sum.currency,
1285                                                );
1286                                            }
1287                                        } else {
1288                                            sum_pnl = Some(realized_pnl);
1289                                        }
1290                                        last_pnl = Some(realized_pnl);
1291                                    }
1292                                }
1293                            }
1294                        }
1295                    }
1296
1297                    let mut inner = self.inner.borrow_mut();
1298                    if let Some(sum) = sum_pnl {
1299                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1300                        if let Some(last) = last_pnl {
1301                            inner.snapshot_last_per_position.insert(*position_id, last);
1302                        }
1303                    }
1304                    inner
1305                        .snapshot_processed_counts
1306                        .insert(*position_id, curr_count);
1307                }
1308            }
1309        }
1310    }
1311
1312    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1313        // Ensure snapshot PnLs are cached for this instrument
1314        self.ensure_snapshot_pnls_cached_for(instrument_id);
1315
1316        let cache = self.cache.borrow();
1317        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1318            account
1319        } else {
1320            log::error!(
1321                "Cannot calculate realized PnL: no account registered for {}",
1322                instrument_id.venue
1323            );
1324            return None;
1325        };
1326
1327        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1328            instrument
1329        } else {
1330            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1331            return None;
1332        };
1333
1334        let currency = account
1335            .base_currency()
1336            .unwrap_or_else(|| instrument.settlement_currency());
1337
1338        let positions = cache.positions(
1339            None, // Faster query filtering
1340            Some(instrument_id),
1341            None,
1342            None,
1343        );
1344
1345        let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1346
1347        // Check if we need to use NETTING OMS logic
1348        let is_netting = positions
1349            .iter()
1350            .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1351
1352        let mut total_pnl = 0.0;
1353
1354        if is_netting && !snapshot_position_ids.is_empty() {
1355            // NETTING OMS: Apply 3-case rule for position cycles
1356
1357            for position_id in &snapshot_position_ids {
1358                let is_active = positions.iter().any(|p| p.id == *position_id);
1359
1360                if is_active {
1361                    // Case 1 & 2: Active position - use only the last snapshot PnL
1362                    let last_pnl = self
1363                        .inner
1364                        .borrow()
1365                        .snapshot_last_per_position
1366                        .get(position_id)
1367                        .copied();
1368                    if let Some(last_pnl) = last_pnl {
1369                        let mut pnl = last_pnl.as_f64();
1370
1371                        if let Some(base_currency) = account.base_currency()
1372                            && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1373                        {
1374                            let xrate = if let Some(xrate) =
1375                                self.calculate_xrate_to_base(instrument, account, position.entry)
1376                            {
1377                                xrate
1378                            } else {
1379                                log::error!(
1380                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1381                                    instrument.settlement_currency(),
1382                                    base_currency
1383                                );
1384                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1385                                return Some(Money::new(0.0, currency));
1386                            };
1387
1388                            let scale = 10f64.powi(currency.precision.into());
1389                            pnl = ((pnl * xrate) * scale).round() / scale;
1390                        }
1391
1392                        total_pnl += pnl;
1393                    }
1394                } else {
1395                    // Case 3: Closed position - use sum of all snapshot PnLs
1396                    let sum_pnl = self
1397                        .inner
1398                        .borrow()
1399                        .snapshot_sum_per_position
1400                        .get(position_id)
1401                        .copied();
1402                    if let Some(sum_pnl) = sum_pnl {
1403                        let mut pnl = sum_pnl.as_f64();
1404
1405                        if let Some(base_currency) = account.base_currency() {
1406                            // For closed positions, we don't have entry price, use current rates
1407                            let xrate = cache.get_xrate(
1408                                instrument_id.venue,
1409                                instrument.settlement_currency(),
1410                                base_currency,
1411                                PriceType::Mid,
1412                            );
1413
1414                            if let Some(xrate) = xrate {
1415                                let scale = 10f64.powi(currency.precision.into());
1416                                pnl = ((pnl * xrate) * scale).round() / scale;
1417                            } else {
1418                                log::error!(
1419                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1420                                    instrument.settlement_currency(),
1421                                    base_currency
1422                                );
1423                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1424                                return Some(Money::new(0.0, currency));
1425                            }
1426                        }
1427
1428                        total_pnl += pnl;
1429                    }
1430                }
1431            }
1432
1433            // Add realized PnL from current active positions
1434            for position in positions {
1435                if position.instrument_id != *instrument_id {
1436                    continue;
1437                }
1438
1439                if let Some(realized_pnl) = position.realized_pnl {
1440                    let mut pnl = realized_pnl.as_f64();
1441
1442                    if let Some(base_currency) = account.base_currency() {
1443                        let xrate = if let Some(xrate) =
1444                            self.calculate_xrate_to_base(instrument, account, position.entry)
1445                        {
1446                            xrate
1447                        } else {
1448                            log::error!(
1449                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1450                                instrument.settlement_currency(),
1451                                base_currency
1452                            );
1453                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1454                            return Some(Money::new(0.0, currency));
1455                        };
1456
1457                        let scale = 10f64.powi(currency.precision.into());
1458                        pnl = ((pnl * xrate) * scale).round() / scale;
1459                    }
1460
1461                    total_pnl += pnl;
1462                }
1463            }
1464        } else {
1465            // HEDGING OMS or no snapshots: Simple aggregation
1466            // Add snapshot PnLs (sum all)
1467            for position_id in &snapshot_position_ids {
1468                let sum_pnl = self
1469                    .inner
1470                    .borrow()
1471                    .snapshot_sum_per_position
1472                    .get(position_id)
1473                    .copied();
1474                if let Some(sum_pnl) = sum_pnl {
1475                    let mut pnl = sum_pnl.as_f64();
1476
1477                    if let Some(base_currency) = account.base_currency() {
1478                        let xrate = cache.get_xrate(
1479                            instrument_id.venue,
1480                            instrument.settlement_currency(),
1481                            base_currency,
1482                            PriceType::Mid,
1483                        );
1484
1485                        if let Some(xrate) = xrate {
1486                            let scale = 10f64.powi(currency.precision.into());
1487                            pnl = ((pnl * xrate) * scale).round() / scale;
1488                        } else {
1489                            log::error!(
1490                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1491                                instrument.settlement_currency(),
1492                                base_currency
1493                            );
1494                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1495                            return Some(Money::new(0.0, currency));
1496                        }
1497                    }
1498
1499                    total_pnl += pnl;
1500                }
1501            }
1502
1503            // Add realized PnL from current positions
1504            for position in positions {
1505                if position.instrument_id != *instrument_id {
1506                    continue;
1507                }
1508
1509                if let Some(realized_pnl) = position.realized_pnl {
1510                    let mut pnl = realized_pnl.as_f64();
1511
1512                    if let Some(base_currency) = account.base_currency() {
1513                        let xrate = if let Some(xrate) =
1514                            self.calculate_xrate_to_base(instrument, account, position.entry)
1515                        {
1516                            xrate
1517                        } else {
1518                            log::error!(
1519                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1520                                instrument.settlement_currency(),
1521                                base_currency
1522                            );
1523                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1524                            return Some(Money::new(0.0, currency));
1525                        };
1526
1527                        let scale = 10f64.powi(currency.precision.into());
1528                        pnl = ((pnl * xrate) * scale).round() / scale;
1529                    }
1530
1531                    total_pnl += pnl;
1532                }
1533            }
1534        }
1535
1536        Some(Money::new(total_pnl, currency))
1537    }
1538
1539    fn get_price(&self, position: &Position) -> Option<Price> {
1540        let cache = self.cache.borrow();
1541        let instrument_id = &position.instrument_id;
1542
1543        // Check for mark price first if configured
1544        if self.config.use_mark_prices
1545            && let Some(mark_price) = cache.mark_price(instrument_id)
1546        {
1547            return Some(mark_price.value);
1548        }
1549
1550        // Fall back to bid/ask based on position side
1551        let price_type = match position.side {
1552            PositionSide::Long => PriceType::Bid,
1553            PositionSide::Short => PriceType::Ask,
1554            _ => panic!("invalid `PositionSide`, was {}", position.side),
1555        };
1556
1557        cache
1558            .price(instrument_id, price_type)
1559            .or_else(|| cache.price(instrument_id, PriceType::Last))
1560            .or_else(|| {
1561                self.inner
1562                    .borrow()
1563                    .bar_close_prices
1564                    .get(instrument_id)
1565                    .copied()
1566            })
1567    }
1568
1569    fn calculate_xrate_to_base(
1570        &self,
1571        instrument: &InstrumentAny,
1572        account: &AccountAny,
1573        side: OrderSide,
1574    ) -> Option<f64> {
1575        if !self.config.convert_to_account_base_currency {
1576            return Some(1.0); // No conversion needed
1577        }
1578
1579        match account.base_currency() {
1580            None => Some(1.0), // No conversion needed
1581            Some(base_currency) => {
1582                let cache = self.cache.borrow();
1583
1584                if self.config.use_mark_xrates {
1585                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1586                }
1587
1588                let price_type = if side == OrderSide::Buy {
1589                    PriceType::Bid
1590                } else {
1591                    PriceType::Ask
1592                };
1593
1594                cache.get_xrate(
1595                    instrument.id().venue,
1596                    instrument.settlement_currency(),
1597                    base_currency,
1598                    price_type,
1599                )
1600            }
1601        }
1602    }
1603}
1604
1605// Helper functions
1606fn update_quote_tick(
1607    cache: Rc<RefCell<Cache>>,
1608    clock: Rc<RefCell<dyn Clock>>,
1609    inner: Rc<RefCell<PortfolioState>>,
1610    config: PortfolioConfig,
1611    quote: &QuoteTick,
1612) {
1613    update_instrument_id(cache, clock.clone(), inner, config, &quote.instrument_id);
1614}
1615
1616fn update_bar(
1617    cache: Rc<RefCell<Cache>>,
1618    clock: Rc<RefCell<dyn Clock>>,
1619    inner: Rc<RefCell<PortfolioState>>,
1620    config: PortfolioConfig,
1621    bar: &Bar,
1622) {
1623    let instrument_id = bar.bar_type.instrument_id();
1624    inner
1625        .borrow_mut()
1626        .bar_close_prices
1627        .insert(instrument_id, bar.close);
1628    update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1629}
1630
1631fn update_instrument_id(
1632    cache: Rc<RefCell<Cache>>,
1633    clock: Rc<RefCell<dyn Clock>>,
1634    inner: Rc<RefCell<PortfolioState>>,
1635    config: PortfolioConfig,
1636    instrument_id: &InstrumentId,
1637) {
1638    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1639
1640    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1641        return;
1642    }
1643
1644    let result_init;
1645    let mut result_maint = None;
1646
1647    let account = {
1648        let account = {
1649            let cache_ref = cache.borrow();
1650            if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1651                account.clone()
1652            } else {
1653                log::error!(
1654                    "Cannot update tick: no account registered for {}",
1655                    instrument_id.venue
1656                );
1657                return;
1658            }
1659        };
1660
1661        let mut cache_ref = cache.borrow_mut();
1662        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1663            instrument.clone()
1664        } else {
1665            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1666            return;
1667        };
1668
1669        // Clone the orders and positions to own the data
1670        let orders_open: Vec<OrderAny> = cache_ref
1671            .orders_open(None, Some(instrument_id), None, None)
1672            .iter()
1673            .map(|o| (*o).clone())
1674            .collect();
1675
1676        let positions_open: Vec<Position> = cache_ref
1677            .positions_open(None, Some(instrument_id), None, None)
1678            .iter()
1679            .map(|p| (*p).clone())
1680            .collect();
1681
1682        result_init = inner.borrow().accounts.update_orders(
1683            &account,
1684            instrument.clone(),
1685            orders_open.iter().collect(),
1686            clock.borrow().timestamp_ns(),
1687        );
1688
1689        if let AccountAny::Margin(ref margin_account) = account {
1690            result_maint = inner.borrow().accounts.update_positions(
1691                margin_account,
1692                instrument,
1693                positions_open.iter().collect(),
1694                clock.borrow().timestamp_ns(),
1695            );
1696        }
1697
1698        if let Some((ref updated_account, _)) = result_init {
1699            cache_ref.update_account(updated_account.clone()).unwrap();
1700        }
1701        account
1702    };
1703
1704    let mut portfolio_clone = Portfolio {
1705        clock: clock.clone(),
1706        cache,
1707        inner: inner.clone(),
1708        config,
1709    };
1710
1711    let result_unrealized_pnl: Option<Money> =
1712        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1713
1714    if result_init.is_some()
1715        && (matches!(account, AccountAny::Cash(_))
1716            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1717    {
1718        inner.borrow_mut().pending_calcs.remove(instrument_id);
1719        if inner.borrow().pending_calcs.is_empty() {
1720            inner.borrow_mut().initialized = true;
1721        }
1722    }
1723}
1724
1725fn update_order(
1726    cache: Rc<RefCell<Cache>>,
1727    clock: Rc<RefCell<dyn Clock>>,
1728    inner: Rc<RefCell<PortfolioState>>,
1729    _config: PortfolioConfig,
1730    event: &OrderEventAny,
1731) {
1732    let cache_ref = cache.borrow();
1733    let account_id = match event.account_id() {
1734        Some(account_id) => account_id,
1735        None => {
1736            return; // No Account Assigned
1737        }
1738    };
1739
1740    let account = if let Some(account) = cache_ref.account(&account_id) {
1741        account
1742    } else {
1743        log::error!("Cannot update order: no account registered for {account_id}");
1744        return;
1745    };
1746
1747    match account {
1748        AccountAny::Cash(cash_account) => {
1749            if !cash_account.base.calculate_account_state {
1750                return;
1751            }
1752        }
1753        AccountAny::Margin(margin_account) => {
1754            if !margin_account.base.calculate_account_state {
1755                return;
1756            }
1757        }
1758    }
1759
1760    match event {
1761        OrderEventAny::Accepted(_)
1762        | OrderEventAny::Canceled(_)
1763        | OrderEventAny::Rejected(_)
1764        | OrderEventAny::Updated(_)
1765        | OrderEventAny::Filled(_) => {}
1766        _ => {
1767            return;
1768        }
1769    }
1770
1771    let cache_ref = cache.borrow();
1772    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1773        order
1774    } else {
1775        log::error!(
1776            "Cannot update order: {} not found in the cache",
1777            event.client_order_id()
1778        );
1779        return; // No Order Found
1780    };
1781
1782    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1783        return; // No change to account state
1784    }
1785
1786    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1787        instrument_id
1788    } else {
1789        log::error!(
1790            "Cannot update order: no instrument found for {}",
1791            event.instrument_id()
1792        );
1793        return;
1794    };
1795
1796    if let OrderEventAny::Filled(order_filled) = event {
1797        let _ = inner.borrow().accounts.update_balances(
1798            account.clone(),
1799            instrument.clone(),
1800            *order_filled,
1801        );
1802
1803        let mut portfolio_clone = Portfolio {
1804            clock: clock.clone(),
1805            cache: cache.clone(),
1806            inner: inner.clone(),
1807            config: PortfolioConfig::default(), // TODO: TBD
1808        };
1809
1810        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1811            Some(unrealized_pnl) => {
1812                inner
1813                    .borrow_mut()
1814                    .unrealized_pnls
1815                    .insert(event.instrument_id(), unrealized_pnl);
1816            }
1817            None => {
1818                log::error!(
1819                    "Failed to calculate unrealized PnL for instrument {}",
1820                    event.instrument_id()
1821                );
1822            }
1823        }
1824    }
1825
1826    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1827
1828    let account_state = inner.borrow_mut().accounts.update_orders(
1829        account,
1830        instrument.clone(),
1831        orders_open,
1832        clock.borrow().timestamp_ns(),
1833    );
1834
1835    let mut cache_ref = cache.borrow_mut();
1836    cache_ref.update_account(account.clone()).unwrap();
1837
1838    if let Some(account_state) = account_state {
1839        msgbus::publish(
1840            format!("events.account.{}", account.id()).into(),
1841            &account_state,
1842        );
1843    } else {
1844        log::debug!("Added pending calculation for {}", instrument.id());
1845        inner.borrow_mut().pending_calcs.insert(instrument.id());
1846    }
1847
1848    log::debug!("Updated {event}");
1849}
1850
1851fn update_position(
1852    cache: Rc<RefCell<Cache>>,
1853    clock: Rc<RefCell<dyn Clock>>,
1854    inner: Rc<RefCell<PortfolioState>>,
1855    _config: PortfolioConfig,
1856    event: &PositionEvent,
1857) {
1858    let instrument_id = event.instrument_id();
1859
1860    let positions_open: Vec<Position> = {
1861        let cache_ref = cache.borrow();
1862
1863        cache_ref
1864            .positions_open(None, Some(&instrument_id), None, None)
1865            .iter()
1866            .map(|o| (*o).clone())
1867            .collect()
1868    };
1869
1870    log::debug!("position fresh from cache -> {positions_open:?}");
1871
1872    let mut portfolio_clone = Portfolio {
1873        clock: clock.clone(),
1874        cache: cache.clone(),
1875        inner: inner.clone(),
1876        config: PortfolioConfig::default(), // TODO: TBD
1877    };
1878
1879    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1880
1881    if let Some(calculated_unrealized_pnl) =
1882        portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1883    {
1884        inner
1885            .borrow_mut()
1886            .unrealized_pnls
1887            .insert(event.instrument_id(), calculated_unrealized_pnl);
1888    } else {
1889        log::warn!(
1890            "Failed to calculate unrealized PnL for {}, marking as pending",
1891            event.instrument_id()
1892        );
1893        inner
1894            .borrow_mut()
1895            .pending_calcs
1896            .insert(event.instrument_id());
1897    }
1898
1899    if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1900        inner
1901            .borrow_mut()
1902            .realized_pnls
1903            .insert(event.instrument_id(), calculated_realized_pnl);
1904    } else {
1905        log::warn!(
1906            "Failed to calculate realized PnL for {}, marking as pending",
1907            event.instrument_id()
1908        );
1909        inner
1910            .borrow_mut()
1911            .pending_calcs
1912            .insert(event.instrument_id());
1913    }
1914
1915    let cache_ref = cache.borrow();
1916    let account = cache_ref.account(&event.account_id());
1917
1918    if let Some(AccountAny::Margin(margin_account)) = account {
1919        if !margin_account.calculate_account_state {
1920            return; // Nothing to calculate
1921        }
1922
1923        let cache_ref = cache.borrow();
1924        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1925            instrument
1926        } else {
1927            log::error!("Cannot update position: no instrument found for {instrument_id}");
1928            return;
1929        };
1930
1931        let result = inner.borrow_mut().accounts.update_positions(
1932            margin_account,
1933            instrument.clone(),
1934            positions_open.iter().collect(),
1935            clock.borrow().timestamp_ns(),
1936        );
1937        let mut cache_ref = cache.borrow_mut();
1938        if let Some((margin_account, _)) = result {
1939            cache_ref
1940                .update_account(AccountAny::Margin(margin_account))
1941                .unwrap();
1942        }
1943    } else if account.is_none() {
1944        log::error!(
1945            "Cannot update position: no account registered for {}",
1946            event.account_id()
1947        );
1948    }
1949}
1950
1951fn update_account(
1952    cache: Rc<RefCell<Cache>>,
1953    inner: Rc<RefCell<PortfolioState>>,
1954    event: &AccountState,
1955) {
1956    let mut cache_ref = cache.borrow_mut();
1957
1958    if let Some(existing) = cache_ref.account(&event.account_id) {
1959        let mut account = existing.clone();
1960        account.apply(event.clone());
1961
1962        if let Err(e) = cache_ref.update_account(account.clone()) {
1963            log::error!("Failed to update account: {e}");
1964            return;
1965        }
1966    } else {
1967        let account = match AccountAny::from_events(vec![event.clone()]) {
1968            Ok(account) => account,
1969            Err(e) => {
1970                log::error!("Failed to create account: {e}");
1971                return;
1972            }
1973        };
1974
1975        if let Err(e) = cache_ref.add_account(account) {
1976            log::error!("Failed to add account: {e}");
1977            return;
1978        }
1979    }
1980
1981    // Throttled logging logic
1982    let mut inner_ref = inner.borrow_mut();
1983    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1984        let current_ts = event.ts_init.as_u64();
1985        let last_ts = inner_ref
1986            .last_account_state_log_ts
1987            .get(&event.account_id)
1988            .copied()
1989            .unwrap_or(0);
1990
1991        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1992        {
1993            inner_ref
1994                .last_account_state_log_ts
1995                .insert(event.account_id, current_ts);
1996            true
1997        } else {
1998            false
1999        }
2000    } else {
2001        true // Throttling disabled, always log
2002    };
2003
2004    if should_log {
2005        log::info!("Updated {event}");
2006    }
2007}