Skip to main content

nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    enums::LogColor,
26    msgbus::{self, MessagingSwitchboard, TypedHandler},
27};
28use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
29use nautilus_model::{
30    accounts::AccountAny,
31    data::{Bar, MarkPriceUpdate, QuoteTick},
32    enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
33    events::{AccountState, OrderEventAny, position::PositionEvent},
34    identifiers::{AccountId, InstrumentId, PositionId, Venue},
35    instruments::{Instrument, InstrumentAny},
36    orders::{Order, OrderAny},
37    position::Position,
38    types::{Currency, Money, Price},
39};
40use rust_decimal::Decimal;
41
42use crate::{config::PortfolioConfig, manager::AccountsManager};
43
/// Mutable state shared between a [`Portfolio`] and its message bus handlers.
struct PortfolioState {
    /// Accounts manager, constructed from the portfolio's clock and cache.
    accounts: AccountsManager,
    /// Portfolio analyzer; reset together with the rest of the state.
    analyzer: PortfolioAnalyzer,
    /// Cached unrealized PnL per instrument.
    unrealized_pnls: AHashMap<InstrumentId, Money>,
    /// Cached realized PnL per instrument.
    realized_pnls: AHashMap<InstrumentId, Money>,
    // NOTE(review): the three `snapshot_*` maps are only created and cleared in the
    // visible code; presumably per-position snapshot PnL bookkeeping - confirm.
    snapshot_sum_per_position: AHashMap<PositionId, Money>,
    snapshot_last_per_position: AHashMap<PositionId, Money>,
    snapshot_processed_counts: AHashMap<PositionId, usize>,
    /// Net position per instrument; a missing entry is read as zero by the accessors.
    net_positions: AHashMap<InstrumentId, Decimal>,
    /// Instruments whose PnL calculation failed and is marked as pending.
    pending_calcs: AHashSet<InstrumentId>,
    // NOTE(review): presumably the latest bar close per instrument - confirm against
    // the bar update handler.
    bar_close_prices: AHashMap<InstrumentId, Price>,
    /// Initialization flag exposed through `Portfolio::is_initialized`.
    initialized: bool,
    // NOTE(review): presumably the timestamp (ns) of the last logged account state per
    // account, used for log throttling - confirm.
    last_account_state_log_ts: AHashMap<AccountId, u64>,
    /// Configured minimum account state logging interval converted to nanoseconds
    /// (`0` when the config option is unset).
    min_account_state_logging_interval_ns: u64,
}
59
60impl PortfolioState {
61    fn new(
62        clock: Rc<RefCell<dyn Clock>>,
63        cache: Rc<RefCell<Cache>>,
64        config: &PortfolioConfig,
65    ) -> Self {
66        let min_account_state_logging_interval_ns = config
67            .min_account_state_logging_interval_ms
68            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
69
70        Self {
71            accounts: AccountsManager::new(clock, cache),
72            analyzer: PortfolioAnalyzer::default(),
73            unrealized_pnls: AHashMap::new(),
74            realized_pnls: AHashMap::new(),
75            snapshot_sum_per_position: AHashMap::new(),
76            snapshot_last_per_position: AHashMap::new(),
77            snapshot_processed_counts: AHashMap::new(),
78            net_positions: AHashMap::new(),
79            pending_calcs: AHashSet::new(),
80            bar_close_prices: AHashMap::new(),
81            initialized: false,
82            last_account_state_log_ts: AHashMap::new(),
83            min_account_state_logging_interval_ns,
84        }
85    }
86
87    fn reset(&mut self) {
88        log::debug!("RESETTING");
89        self.net_positions.clear();
90        self.unrealized_pnls.clear();
91        self.realized_pnls.clear();
92        self.snapshot_sum_per_position.clear();
93        self.snapshot_last_per_position.clear();
94        self.snapshot_processed_counts.clear();
95        self.pending_calcs.clear();
96        self.last_account_state_log_ts.clear();
97        self.analyzer.reset();
98        log::debug!("READY");
99    }
100}
101
/// Portfolio facade over the shared cache, clock and internal state.
///
/// The internal state lives behind `Rc<RefCell<..>>` so that it can be shared with
/// the message bus handlers and with shallow clones of the portfolio.
pub struct Portfolio {
    /// Clock used for timestamps.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Cache queried for accounts, instruments, orders and positions.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// State shared with the registered message handlers.
    inner: Rc<RefCell<PortfolioState>>,
    /// Configuration resolved at construction (defaults when none was supplied).
    config: PortfolioConfig,
}
108
109impl Debug for Portfolio {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct(stringify!(Portfolio)).finish()
112    }
113}
114
115impl Portfolio {
116    pub fn new(
117        cache: Rc<RefCell<Cache>>,
118        clock: Rc<RefCell<dyn Clock>>,
119        config: Option<PortfolioConfig>,
120    ) -> Self {
121        let config = config.unwrap_or_default();
122        let inner = Rc::new(RefCell::new(PortfolioState::new(
123            clock.clone(),
124            cache.clone(),
125            &config,
126        )));
127
128        Self::register_message_handlers(
129            cache.clone(),
130            clock.clone(),
131            inner.clone(),
132            config.clone(),
133        );
134
135        Self {
136            clock,
137            cache,
138            inner,
139            config,
140        }
141    }
142
143    /// Creates a shallow clone of the Portfolio that shares the same internal state.
144    ///
145    /// This is useful when multiple components need to reference the same Portfolio
146    /// without creating duplicate msgbus handler registrations.
147    #[must_use]
148    pub fn clone_shallow(&self) -> Self {
149        Self {
150            clock: self.clock.clone(),
151            cache: self.cache.clone(),
152            inner: self.inner.clone(),
153            config: self.config.clone(),
154        }
155    }
156
    /// Builds the typed handlers and wires them into the message bus.
    ///
    /// Every handler captures only a weak reference to the shared state. When the
    /// state has been dropped, `upgrade` yields `None` and the handler does nothing.
    ///
    /// Bar and mark price subscriptions are only made when `config.bar_updates` and
    /// `config.use_mark_prices` are set, respectively.
    fn register_message_handlers(
        cache: Rc<RefCell<Cache>>,
        clock: Rc<RefCell<dyn Clock>>,
        inner: Rc<RefCell<PortfolioState>>,
        config: PortfolioConfig,
    ) {
        // Weak handle so the handlers do not keep the state alive
        let inner_weak = WeakCell::from(Rc::downgrade(&inner));

        // Typed handlers for subscriptions
        let update_account_handler = {
            let cache = cache.clone();
            let inner = inner_weak.clone();
            TypedHandler::from(move |event: &AccountState| {
                if let Some(inner_rc) = inner.upgrade() {
                    update_account(cache.clone(), inner_rc.into(), event);
                }
            })
        };

        let update_position_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            let config = config.clone();
            TypedHandler::from(move |event: &PositionEvent| {
                if let Some(inner_rc) = inner.upgrade() {
                    update_position(
                        cache.clone(),
                        clock.clone(),
                        inner_rc.into(),
                        config.clone(),
                        event,
                    );
                }
            })
        };

        let update_quote_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            let config = config.clone();
            TypedHandler::from(move |quote: &QuoteTick| {
                if let Some(inner_rc) = inner.upgrade() {
                    update_quote_tick(
                        cache.clone(),
                        clock.clone(),
                        inner_rc.into(),
                        config.clone(),
                        quote,
                    );
                }
            })
        };

        let update_bar_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            let config = config.clone();
            TypedHandler::from(move |bar: &Bar| {
                if let Some(inner_rc) = inner.upgrade() {
                    update_bar(
                        cache.clone(),
                        clock.clone(),
                        inner_rc.into(),
                        config.clone(),
                        bar,
                    );
                }
            })
        };

        // Only the instrument ID of a mark price update is forwarded
        let update_mark_price_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            let config = config.clone();
            TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
                if let Some(inner_rc) = inner.upgrade() {
                    update_instrument_id(
                        cache.clone(),
                        clock.clone(),
                        inner_rc.into(),
                        config.clone(),
                        &mark_price.instrument_id,
                    );
                }
            })
        };

        // Last handler built: takes ownership of `cache` and the weak state handle
        let update_order_handler = {
            let cache = cache;
            let clock = clock.clone();
            let inner = inner_weak;
            let config = config.clone();
            TypedHandler::from(move |event: &OrderEventAny| {
                if let Some(inner_rc) = inner.upgrade() {
                    update_order(
                        cache.clone(),
                        clock.clone(),
                        inner_rc.into(),
                        config.clone(),
                        event,
                    );
                }
            })
        };

        // The account handler serves both the direct endpoint and the topic subscription
        let endpoint = MessagingSwitchboard::portfolio_update_account();
        msgbus::register_account_state_endpoint(endpoint, update_account_handler.clone());

        // NOTE(review): the trailing `Some(10)` is presumably the handler priority - confirm
        msgbus::subscribe_quotes("data.quotes.*".into(), update_quote_handler, Some(10));
        if config.bar_updates {
            msgbus::subscribe_bars("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
        }
        if config.use_mark_prices {
            msgbus::subscribe_mark_prices(
                "data.mark_prices.*".into(),
                update_mark_price_handler,
                Some(10),
            );
        }
        msgbus::subscribe_order_events("events.order.*".into(), update_order_handler, Some(10));
        msgbus::subscribe_position_events(
            "events.position.*".into(),
            update_position_handler,
            Some(10),
        );
        msgbus::subscribe_account_state(
            "events.account.*".into(),
            update_account_handler,
            Some(10),
        );
    }
292
293    pub fn reset(&mut self) {
294        log::debug!("RESETTING");
295        self.inner.borrow_mut().reset();
296        log::debug!("READY");
297    }
298
    /// Returns a reference to the shared cache handle.
    ///
    /// The returned `Rc` is borrowed, not cloned; callers that need ownership can
    /// clone it themselves.
    #[must_use]
    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
        &self.cache
    }
304
    /// Returns `true` if the portfolio has been initialized.
    ///
    /// Reads the `initialized` flag of the shared state, which starts out `false`.
    #[must_use]
    pub fn is_initialized(&self) -> bool {
        self.inner.borrow().initialized
    }
310
311    /// Returns the locked balances for the given venue.
312    ///
313    /// Locked balances represent funds reserved for open orders.
314    #[must_use]
315    pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
316        self.cache.borrow().account_for_venue(venue).map_or_else(
317            || {
318                log::error!("Cannot get balances locked: no account generated for {venue}");
319                AHashMap::new()
320            },
321            AccountAny::balances_locked,
322        )
323    }
324
325    /// Returns the initial margin requirements for the given venue.
326    ///
327    /// Only applicable for margin accounts. Returns empty map for cash accounts.
328    #[must_use]
329    pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
330        self.cache.borrow().account_for_venue(venue).map_or_else(
331            || {
332                log::error!(
333                    "Cannot get initial (order) margins: no account registered for {venue}"
334                );
335                AHashMap::new()
336            },
337            |account| match account {
338                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
339                AccountAny::Cash(_) => {
340                    log::warn!("Initial margins not applicable for cash account");
341                    AHashMap::new()
342                }
343            },
344        )
345    }
346
347    /// Returns the maintenance margin requirements for the given venue.
348    ///
349    /// Only applicable for margin accounts. Returns empty map for cash accounts.
350    #[must_use]
351    pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
352        self.cache.borrow().account_for_venue(venue).map_or_else(
353            || {
354                log::error!(
355                    "Cannot get maintenance (position) margins: no account registered for {venue}"
356                );
357                AHashMap::new()
358            },
359            |account| match account {
360                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
361                AccountAny::Cash(_) => {
362                    log::warn!("Maintenance margins not applicable for cash account");
363                    AHashMap::new()
364                }
365            },
366        )
367    }
368
369    /// Returns the unrealized PnLs for all positions at the given venue.
370    ///
371    /// Calculates mark-to-market PnL based on current market prices.
372    #[must_use]
373    pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
374        let instrument_ids = {
375            let cache = self.cache.borrow();
376            let positions = cache.positions(Some(venue), None, None, None, None);
377
378            if positions.is_empty() {
379                return AHashMap::new(); // Nothing to calculate
380            }
381
382            let instrument_ids: AHashSet<InstrumentId> =
383                positions.iter().map(|p| p.instrument_id).collect();
384
385            instrument_ids
386        };
387
388        let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
389
390        for instrument_id in instrument_ids {
391            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
392                // PnL already calculated
393                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
394                continue;
395            }
396
397            // Calculate PnL
398            match self.calculate_unrealized_pnl(&instrument_id) {
399                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
400                None => continue,
401            }
402        }
403
404        unrealized_pnls
405            .into_iter()
406            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
407            .collect()
408    }
409
410    /// Returns the realized PnLs for all positions at the given venue.
411    ///
412    /// Calculates total realized profit and loss from closed positions.
413    #[must_use]
414    pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
415        let instrument_ids = {
416            let cache = self.cache.borrow();
417            let positions = cache.positions(Some(venue), None, None, None, None);
418
419            if positions.is_empty() {
420                return AHashMap::new(); // Nothing to calculate
421            }
422
423            let instrument_ids: AHashSet<InstrumentId> =
424                positions.iter().map(|p| p.instrument_id).collect();
425
426            instrument_ids
427        };
428
429        let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
430
431        for instrument_id in instrument_ids {
432            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
433                // PnL already calculated
434                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
435                continue;
436            }
437
438            // Calculate PnL
439            match self.calculate_realized_pnl(&instrument_id) {
440                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
441                None => continue,
442            }
443        }
444
445        realized_pnls
446            .into_iter()
447            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
448            .collect()
449    }
450
451    #[must_use]
452    pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
453        let cache = self.cache.borrow();
454        let account = if let Some(account) = cache.account_for_venue(venue) {
455            account
456        } else {
457            log::error!("Cannot calculate net exposures: no account registered for {venue}");
458            return None; // Cannot calculate
459        };
460
461        let positions_open = cache.positions_open(Some(venue), None, None, None, None);
462        if positions_open.is_empty() {
463            return Some(AHashMap::new()); // Nothing to calculate
464        }
465
466        let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
467
468        for position in positions_open {
469            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
470                instrument
471            } else {
472                log::error!(
473                    "Cannot calculate net exposures: no instrument for {}",
474                    position.instrument_id
475                );
476                return None; // Cannot calculate
477            };
478
479            if position.side == PositionSide::Flat {
480                log::error!(
481                    "Cannot calculate net exposures: position is flat for {}",
482                    position.instrument_id
483                );
484                continue; // Nothing to calculate
485            }
486
487            let price = self.get_price(position)?;
488            let xrate = if let Some(xrate) =
489                self.calculate_xrate_to_base(instrument, account, position.entry)
490            {
491                xrate
492            } else {
493                log::error!(
494                    // TODO: Improve logging
495                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
496                    instrument.settlement_currency(),
497                    account.base_currency()
498                );
499                return None; // Cannot calculate
500            };
501
502            let settlement_currency = account
503                .base_currency()
504                .unwrap_or_else(|| instrument.settlement_currency());
505
506            let net_exposure = instrument
507                .calculate_notional_value(position.quantity, price, None)
508                .as_f64()
509                * xrate;
510
511            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
512                .round()
513                / 10f64.powi(settlement_currency.precision.into());
514
515            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
516        }
517
518        Some(
519            net_exposures
520                .into_iter()
521                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
522                .collect(),
523        )
524    }
525
526    #[must_use]
527    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
528        if let Some(pnl) = self
529            .inner
530            .borrow()
531            .unrealized_pnls
532            .get(instrument_id)
533            .copied()
534        {
535            return Some(pnl);
536        }
537
538        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
539        self.inner
540            .borrow_mut()
541            .unrealized_pnls
542            .insert(*instrument_id, pnl);
543        Some(pnl)
544    }
545
546    #[must_use]
547    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
548        if let Some(pnl) = self
549            .inner
550            .borrow()
551            .realized_pnls
552            .get(instrument_id)
553            .copied()
554        {
555            return Some(pnl);
556        }
557
558        let pnl = self.calculate_realized_pnl(instrument_id)?;
559        self.inner
560            .borrow_mut()
561            .realized_pnls
562            .insert(*instrument_id, pnl);
563        Some(pnl)
564    }
565
566    /// Returns the total PnL for the given instrument ID.
567    ///
568    /// Total PnL = Realized PnL + Unrealized PnL
569    #[must_use]
570    pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
571        let realized = self.realized_pnl(instrument_id)?;
572        let unrealized = self.unrealized_pnl(instrument_id)?;
573
574        if realized.currency != unrealized.currency {
575            log::error!(
576                "Cannot calculate total PnL: currency mismatch {} vs {}",
577                realized.currency,
578                unrealized.currency
579            );
580            return None;
581        }
582
583        Some(Money::new(
584            realized.as_f64() + unrealized.as_f64(),
585            realized.currency,
586        ))
587    }
588
589    /// Returns the total PnLs for the given venue.
590    ///
591    /// Total PnL = Realized PnL + Unrealized PnL for each currency
592    #[must_use]
593    pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
594        let realized_pnls = self.realized_pnls(venue);
595        let unrealized_pnls = self.unrealized_pnls(venue);
596
597        let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
598
599        // Add realized PnLs
600        for (currency, realized) in realized_pnls {
601            total_pnls.insert(currency, realized);
602        }
603
604        // Add unrealized PnLs
605        for (currency, unrealized) in unrealized_pnls {
606            match total_pnls.get_mut(&currency) {
607                Some(total) => {
608                    *total = *total + unrealized;
609                }
610                None => {
611                    total_pnls.insert(currency, unrealized);
612                }
613            }
614        }
615
616        total_pnls
617    }
618
619    #[must_use]
620    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
621        let cache = self.cache.borrow();
622
623        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
624            instrument
625        } else {
626            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
627            return None;
628        };
629
630        let positions_open = cache.positions_open(
631            None, // Faster query filtering
632            Some(instrument_id),
633            None,
634            None,
635            None,
636        );
637
638        if positions_open.is_empty() {
639            return Some(Money::new(0.0, instrument.settlement_currency()));
640        }
641
642        let mut net_exposure = 0.0;
643        let mut first_base_currency: Option<Currency> = None;
644        let mut first_account: Option<&AccountAny> = None;
645
646        for position in &positions_open {
647            // Get account for THIS position
648            let account = if let Some(account) = cache.account(&position.account_id) {
649                account
650            } else {
651                log::error!(
652                    "Cannot calculate net exposure: no account for {}",
653                    position.account_id
654                );
655                return None;
656            };
657
658            // Validate consistent base currency across accounts
659            if let Some(base) = account.base_currency() {
660                match first_base_currency {
661                    None => {
662                        first_base_currency = Some(base);
663                        first_account = Some(account);
664                    }
665                    Some(first) if first != base => {
666                        log::error!(
667                            "Cannot calculate net exposure: accounts have different base \
668                            currencies ({first} vs {base}); multi-account aggregation requires \
669                            consistent base currencies"
670                        );
671                        return None;
672                    }
673                    _ => {}
674                }
675            }
676
677            let price = self.get_price(position)?;
678            let xrate = if let Some(xrate) =
679                self.calculate_xrate_to_base(instrument, account, position.entry)
680            {
681                xrate
682            } else {
683                log::error!(
684                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
685                    instrument.settlement_currency(),
686                    account.base_currency()
687                );
688                return None;
689            };
690
691            let notional_value =
692                instrument.calculate_notional_value(position.quantity, price, None);
693            net_exposure += notional_value.as_f64() * xrate;
694        }
695
696        let settlement_currency = first_account
697            .and_then(|a| a.base_currency())
698            .unwrap_or_else(|| instrument.settlement_currency());
699
700        Some(Money::new(net_exposure, settlement_currency))
701    }
702
703    #[must_use]
704    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
705        self.inner
706            .borrow()
707            .net_positions
708            .get(instrument_id)
709            .copied()
710            .unwrap_or(Decimal::ZERO)
711    }
712
713    #[must_use]
714    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
715        self.inner
716            .borrow()
717            .net_positions
718            .get(instrument_id)
719            .copied()
720            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
721    }
722
723    #[must_use]
724    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
725        self.inner
726            .borrow()
727            .net_positions
728            .get(instrument_id)
729            .copied()
730            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
731    }
732
733    #[must_use]
734    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
735        self.inner
736            .borrow()
737            .net_positions
738            .get(instrument_id)
739            .copied()
740            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
741    }
742
743    #[must_use]
744    pub fn is_completely_flat(&self) -> bool {
745        for net_position in self.inner.borrow().net_positions.values() {
746            if *net_position != Decimal::ZERO {
747                return false;
748            }
749        }
750        true
751    }
752
    /// Initializes account margin based on existing open orders.
    ///
    /// Open orders are grouped by instrument, then each group is passed to the
    /// accounts manager and the resulting account is written back to the cache.
    /// The `initialized` flag of the shared state is overwritten at the end: it is
    /// `false` if an instrument or account lookup failed, or if the accounts manager
    /// returned no updated account for some instrument.
    ///
    /// # Panics
    ///
    /// Panics if updating the cache with a mutated account fails.
    pub fn initialize_orders(&mut self) {
        let mut initialized = true;
        // Phase 1: snapshot (instrument, open orders) pairs under a shared cache borrow
        let orders_and_instruments = {
            let cache = self.cache.borrow();
            let all_orders_open = cache.orders_open(None, None, None, None, None);

            let mut instruments_with_orders = Vec::new();
            let mut instruments = AHashSet::new();

            for order in &all_orders_open {
                instruments.insert(order.instrument_id());
            }

            for instrument_id in instruments {
                if let Some(instrument) = cache.instrument(&instrument_id) {
                    let orders = cache
                        .orders_open(None, Some(&instrument_id), None, None, None)
                        .into_iter()
                        .cloned()
                        .collect::<Vec<OrderAny>>();
                    instruments_with_orders.push((instrument.clone(), orders));
                } else {
                    log::error!(
                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
                    );
                    initialized = false;
                    // Collection stops here; instruments not yet visited are left out
                    break;
                }
            }
            instruments_with_orders
        };

        // Phase 2: update the account of each instrument's venue
        for (instrument, orders_open) in &orders_and_instruments {
            // NOTE(review): the cache stays mutably borrowed across `update_orders`;
            // confirm the accounts manager does not borrow the cache itself there
            let mut cache = self.cache.borrow_mut();
            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
                account
            } else {
                log::error!(
                    "Cannot update initial (order) margin: no account registered for {}",
                    instrument.id().venue
                );
                initialized = false;
                break;
            };

            let result = self.inner.borrow_mut().accounts.update_orders(
                account,
                instrument.clone(),
                orders_open.iter().collect(),
                self.clock.borrow().timestamp_ns(),
            );

            match result {
                Some((updated_account, _)) => {
                    cache.update_account(updated_account).unwrap();
                }
                None => {
                    // Keep going with the remaining instruments
                    initialized = false;
                }
            }
        }

        // Counts only the orders collected in phase 1
        let total_orders = orders_and_instruments
            .into_iter()
            .map(|(_, orders)| orders.len())
            .sum::<usize>();

        log::info!(
            color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
            "Initialized {} open order{}",
            total_orders,
            if total_orders == 1 { "" } else { "s" }
        );

        self.inner.borrow_mut().initialized = initialized;
    }
834
835    /// Initializes account margin based on existing open positions.
836    ///
837    /// # Panics
838    ///
839    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
840    pub fn initialize_positions(&mut self) {
841        self.inner.borrow_mut().unrealized_pnls.clear();
842        self.inner.borrow_mut().realized_pnls.clear();
843        let all_positions_open: Vec<Position>;
844        let mut instruments = AHashSet::new();
845        {
846            let cache = self.cache.borrow();
847            all_positions_open = cache
848                .positions_open(None, None, None, None, None)
849                .into_iter()
850                .cloned()
851                .collect();
852            for position in &all_positions_open {
853                instruments.insert(position.instrument_id);
854            }
855        }
856
857        let mut initialized = true;
858
859        for instrument_id in instruments {
860            let positions_open: Vec<Position> = {
861                let cache = self.cache.borrow();
862                cache
863                    .positions_open(None, Some(&instrument_id), None, None, None)
864                    .into_iter()
865                    .cloned()
866                    .collect()
867            };
868
869            self.update_net_position(&instrument_id, &positions_open);
870
871            if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
872                self.inner
873                    .borrow_mut()
874                    .unrealized_pnls
875                    .insert(instrument_id, calculated_unrealized_pnl);
876            } else {
877                log::warn!(
878                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
879                );
880                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
881            }
882
883            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
884                self.inner
885                    .borrow_mut()
886                    .realized_pnls
887                    .insert(instrument_id, calculated_realized_pnl);
888            } else {
889                log::warn!(
890                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
891                );
892                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
893            }
894
895            let cache = self.cache.borrow();
896            let Some(account) = cache.account_for_venue(&instrument_id.venue).cloned() else {
897                log::error!(
898                    "Cannot update maintenance (position) margin: no account registered for {}",
899                    instrument_id.venue
900                );
901                initialized = false;
902                break;
903            };
904
905            let account = match account {
906                AccountAny::Cash(_) => continue,
907                AccountAny::Margin(margin_account) => margin_account,
908            };
909
910            let Some(instrument) = cache.instrument(&instrument_id).cloned() else {
911                log::error!(
912                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
913                );
914                initialized = false;
915                break;
916            };
917            let positions: Vec<Position> = cache
918                .positions_open(None, Some(&instrument_id), None, None, None)
919                .into_iter()
920                .cloned()
921                .collect();
922            drop(cache);
923
924            let result = self.inner.borrow_mut().accounts.update_positions(
925                &account,
926                instrument,
927                positions.iter().collect(),
928                self.clock.borrow().timestamp_ns(),
929            );
930
931            match result {
932                Some((updated_account, _)) => {
933                    self.cache
934                        .borrow_mut()
935                        .update_account(AccountAny::Margin(updated_account))
936                        .unwrap();
937                }
938                None => {
939                    initialized = false;
940                }
941            }
942        }
943
944        let open_count = all_positions_open.len();
945        self.inner.borrow_mut().initialized = initialized;
946        log::info!(
947            color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
948            "Initialized {} open position{}",
949            open_count,
950            if open_count == 1 { "" } else { "s" }
951        );
952    }
953
954    /// Updates portfolio calculations based on a new quote tick.
955    ///
956    /// Recalculates unrealized PnL for positions affected by the quote update.
957    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
958        update_quote_tick(
959            self.cache.clone(),
960            self.clock.clone(),
961            self.inner.clone(),
962            self.config.clone(),
963            quote,
964        );
965    }
966
967    /// Updates portfolio calculations based on a new bar.
968    ///
969    /// Updates cached bar close prices and recalculates unrealized PnL.
970    pub fn update_bar(&mut self, bar: &Bar) {
971        update_bar(
972            self.cache.clone(),
973            self.clock.clone(),
974            self.inner.clone(),
975            self.config.clone(),
976            bar,
977        );
978    }
979
980    /// Updates portfolio with a new account state event.
981    pub fn update_account(&mut self, event: &AccountState) {
982        update_account(self.cache.clone(), self.inner.clone(), event);
983    }
984
985    /// Updates portfolio calculations based on an order event.
986    ///
987    /// Handles balance updates for order fills and margin calculations for order changes.
988    pub fn update_order(&mut self, event: &OrderEventAny) {
989        update_order(
990            self.cache.clone(),
991            self.clock.clone(),
992            self.inner.clone(),
993            self.config.clone(),
994            event,
995        );
996    }
997
998    /// Updates portfolio calculations based on a position event.
999    ///
1000    /// Recalculates net positions, unrealized PnL, and margin requirements.
1001    pub fn update_position(&mut self, event: &PositionEvent) {
1002        update_position(
1003            self.cache.clone(),
1004            self.clock.clone(),
1005            self.inner.clone(),
1006            self.config.clone(),
1007            event,
1008        );
1009    }
1010
1011    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: &[Position]) {
1012        let mut net_position = Decimal::ZERO;
1013
1014        for open_position in positions_open {
1015            log::debug!("open_position: {open_position}");
1016            net_position += open_position.signed_decimal_qty();
1017        }
1018
1019        let existing_position = self.net_position(instrument_id);
1020        if existing_position != net_position {
1021            self.inner
1022                .borrow_mut()
1023                .net_positions
1024                .insert(*instrument_id, net_position);
1025            log::info!("{instrument_id} net_position={net_position}");
1026        }
1027    }
1028
1029    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1030        let cache = self.cache.borrow();
1031        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1032            account
1033        } else {
1034            log::error!(
1035                "Cannot calculate unrealized PnL: no account registered for {}",
1036                instrument_id.venue
1037            );
1038            return None;
1039        };
1040
1041        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1042            instrument
1043        } else {
1044            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1045            return None;
1046        };
1047
1048        let currency = account
1049            .base_currency()
1050            .unwrap_or_else(|| instrument.settlement_currency());
1051
1052        let positions_open = cache.positions_open(
1053            None, // Faster query filtering
1054            Some(instrument_id),
1055            None,
1056            None,
1057            None,
1058        );
1059
1060        if positions_open.is_empty() {
1061            return Some(Money::new(0.0, currency));
1062        }
1063
1064        let mut total_pnl = 0.0;
1065
1066        for position in positions_open {
1067            if position.instrument_id != *instrument_id {
1068                continue; // Nothing to calculate
1069            }
1070
1071            if position.side == PositionSide::Flat {
1072                continue; // Nothing to calculate
1073            }
1074
1075            let price = if let Some(price) = self.get_price(position) {
1076                price
1077            } else {
1078                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1079                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1080                return None; // Cannot calculate
1081            };
1082
1083            let mut pnl = position.unrealized_pnl(price).as_f64();
1084
1085            if let Some(base_currency) = account.base_currency() {
1086                let xrate = if let Some(xrate) =
1087                    self.calculate_xrate_to_base(instrument, account, position.entry)
1088                {
1089                    xrate
1090                } else {
1091                    log::error!(
1092                        // TODO: Improve logging
1093                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1094                        instrument.settlement_currency(),
1095                        base_currency
1096                    );
1097                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1098                    return None; // Cannot calculate
1099                };
1100
1101                let scale = 10f64.powi(currency.precision.into());
1102                pnl = ((pnl * xrate) * scale).round() / scale;
1103            }
1104
1105            total_pnl += pnl;
1106        }
1107
1108        Some(Money::new(total_pnl, currency))
1109    }
1110
1111    fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1112        // Performance: This method maintains an incremental cache of snapshot PnLs
1113        // It only deserializes new snapshots that haven't been processed yet
1114        // Tracks sum and last PnL per position for efficient NETTING OMS support
1115
1116        // Get all position IDs that have snapshots for this instrument
1117        let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1118
1119        if snapshot_position_ids.is_empty() {
1120            return; // Nothing to process
1121        }
1122
1123        let mut rebuild = false;
1124
1125        // Detect purge/reset (count regression) to trigger full rebuild
1126        for position_id in &snapshot_position_ids {
1127            let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1128            let curr_count = position_snapshots.map_or(0, |s| {
1129                // Count the number of snapshots (they're serialized as JSON objects)
1130                s.split(|&b| b == b'{').count() - 1
1131            });
1132            let prev_count = self
1133                .inner
1134                .borrow()
1135                .snapshot_processed_counts
1136                .get(position_id)
1137                .copied()
1138                .unwrap_or(0);
1139
1140            if prev_count > curr_count {
1141                rebuild = true;
1142                break;
1143            }
1144        }
1145
1146        if rebuild {
1147            // Full rebuild: process all snapshots from scratch
1148            for position_id in &snapshot_position_ids {
1149                if let Some(position_snapshots) =
1150                    self.cache.borrow().position_snapshot_bytes(position_id)
1151                {
1152                    let mut sum_pnl: Option<Money> = None;
1153                    let mut last_pnl: Option<Money> = None;
1154
1155                    // Snapshots are concatenated JSON objects
1156                    let mut start = 0;
1157                    let mut depth = 0;
1158                    let mut in_string = false;
1159                    let mut escape_next = false;
1160
1161                    for (i, &byte) in position_snapshots.iter().enumerate() {
1162                        if escape_next {
1163                            escape_next = false;
1164                            continue;
1165                        }
1166
1167                        if byte == b'\\' && in_string {
1168                            escape_next = true;
1169                            continue;
1170                        }
1171
1172                        if byte == b'"' && !escape_next {
1173                            in_string = !in_string;
1174                        }
1175
1176                        if !in_string {
1177                            if byte == b'{' {
1178                                if depth == 0 {
1179                                    start = i;
1180                                }
1181                                depth += 1;
1182                            } else if byte == b'}' {
1183                                depth -= 1;
1184                                if depth == 0
1185                                    && let Ok(snapshot) = serde_json::from_slice::<Position>(
1186                                        &position_snapshots[start..=i],
1187                                    )
1188                                    && let Some(realized_pnl) = snapshot.realized_pnl
1189                                {
1190                                    if let Some(ref mut sum) = sum_pnl {
1191                                        if sum.currency == realized_pnl.currency {
1192                                            *sum = Money::new(
1193                                                sum.as_f64() + realized_pnl.as_f64(),
1194                                                sum.currency,
1195                                            );
1196                                        }
1197                                    } else {
1198                                        sum_pnl = Some(realized_pnl);
1199                                    }
1200                                    last_pnl = Some(realized_pnl);
1201                                }
1202                            }
1203                        }
1204                    }
1205
1206                    let mut inner = self.inner.borrow_mut();
1207                    if let Some(sum) = sum_pnl {
1208                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1209                        if let Some(last) = last_pnl {
1210                            inner.snapshot_last_per_position.insert(*position_id, last);
1211                        }
1212                    } else {
1213                        inner.snapshot_sum_per_position.remove(position_id);
1214                        inner.snapshot_last_per_position.remove(position_id);
1215                    }
1216
1217                    let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1218                    inner
1219                        .snapshot_processed_counts
1220                        .insert(*position_id, snapshot_count);
1221                }
1222            }
1223        } else {
1224            // Incremental path: only process new snapshots
1225            for position_id in &snapshot_position_ids {
1226                if let Some(position_snapshots) =
1227                    self.cache.borrow().position_snapshot_bytes(position_id)
1228                {
1229                    let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1230                    let prev_count = self
1231                        .inner
1232                        .borrow()
1233                        .snapshot_processed_counts
1234                        .get(position_id)
1235                        .copied()
1236                        .unwrap_or(0);
1237
1238                    if prev_count >= curr_count {
1239                        continue;
1240                    }
1241
1242                    let mut sum_pnl = self
1243                        .inner
1244                        .borrow()
1245                        .snapshot_sum_per_position
1246                        .get(position_id)
1247                        .copied();
1248                    let mut last_pnl = self
1249                        .inner
1250                        .borrow()
1251                        .snapshot_last_per_position
1252                        .get(position_id)
1253                        .copied();
1254
1255                    // Process only new snapshots
1256                    let mut start = 0;
1257                    let mut depth = 0;
1258                    let mut in_string = false;
1259                    let mut escape_next = false;
1260                    let mut snapshot_index = 0;
1261
1262                    for (i, &byte) in position_snapshots.iter().enumerate() {
1263                        if escape_next {
1264                            escape_next = false;
1265                            continue;
1266                        }
1267
1268                        if byte == b'\\' && in_string {
1269                            escape_next = true;
1270                            continue;
1271                        }
1272
1273                        if byte == b'"' && !escape_next {
1274                            in_string = !in_string;
1275                        }
1276
1277                        if !in_string {
1278                            if byte == b'{' {
1279                                if depth == 0 {
1280                                    start = i;
1281                                }
1282                                depth += 1;
1283                            } else if byte == b'}' {
1284                                depth -= 1;
1285                                if depth == 0 {
1286                                    snapshot_index += 1;
1287                                    // Only process new snapshots
1288                                    if snapshot_index > prev_count
1289                                        && let Ok(snapshot) = serde_json::from_slice::<Position>(
1290                                            &position_snapshots[start..=i],
1291                                        )
1292                                        && let Some(realized_pnl) = snapshot.realized_pnl
1293                                    {
1294                                        if let Some(ref mut sum) = sum_pnl {
1295                                            if sum.currency == realized_pnl.currency {
1296                                                *sum = Money::new(
1297                                                    sum.as_f64() + realized_pnl.as_f64(),
1298                                                    sum.currency,
1299                                                );
1300                                            }
1301                                        } else {
1302                                            sum_pnl = Some(realized_pnl);
1303                                        }
1304                                        last_pnl = Some(realized_pnl);
1305                                    }
1306                                }
1307                            }
1308                        }
1309                    }
1310
1311                    let mut inner = self.inner.borrow_mut();
1312                    if let Some(sum) = sum_pnl {
1313                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1314                        if let Some(last) = last_pnl {
1315                            inner.snapshot_last_per_position.insert(*position_id, last);
1316                        }
1317                    }
1318                    inner
1319                        .snapshot_processed_counts
1320                        .insert(*position_id, curr_count);
1321                }
1322            }
1323        }
1324    }
1325
1326    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1327        // Ensure snapshot PnLs are cached for this instrument
1328        self.ensure_snapshot_pnls_cached_for(instrument_id);
1329
1330        let cache = self.cache.borrow();
1331        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1332            account
1333        } else {
1334            log::error!(
1335                "Cannot calculate realized PnL: no account registered for {}",
1336                instrument_id.venue
1337            );
1338            return None;
1339        };
1340
1341        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1342            instrument
1343        } else {
1344            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1345            return None;
1346        };
1347
1348        let currency = account
1349            .base_currency()
1350            .unwrap_or_else(|| instrument.settlement_currency());
1351
1352        let positions = cache.positions(
1353            None, // Faster query filtering
1354            Some(instrument_id),
1355            None,
1356            None,
1357            None,
1358        );
1359
1360        let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1361
1362        // Check if we need to use NETTING OMS logic
1363        let is_netting = positions
1364            .iter()
1365            .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1366
1367        let mut total_pnl = 0.0;
1368
1369        if is_netting && !snapshot_position_ids.is_empty() {
1370            // NETTING OMS: Apply 3-case rule for position cycles
1371
1372            for position_id in &snapshot_position_ids {
1373                let is_active = positions.iter().any(|p| p.id == *position_id);
1374
1375                if is_active {
1376                    // Case 1 & 2: Active position - use only the last snapshot PnL
1377                    let last_pnl = self
1378                        .inner
1379                        .borrow()
1380                        .snapshot_last_per_position
1381                        .get(position_id)
1382                        .copied();
1383                    if let Some(last_pnl) = last_pnl {
1384                        let mut pnl = last_pnl.as_f64();
1385
1386                        if let Some(base_currency) = account.base_currency()
1387                            && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1388                        {
1389                            let xrate = if let Some(xrate) =
1390                                self.calculate_xrate_to_base(instrument, account, position.entry)
1391                            {
1392                                xrate
1393                            } else {
1394                                log::error!(
1395                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1396                                    instrument.settlement_currency(),
1397                                    base_currency
1398                                );
1399                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1400                                return Some(Money::new(0.0, currency));
1401                            };
1402
1403                            let scale = 10f64.powi(currency.precision.into());
1404                            pnl = ((pnl * xrate) * scale).round() / scale;
1405                        }
1406
1407                        total_pnl += pnl;
1408                    }
1409                } else {
1410                    // Case 3: Closed position - use sum of all snapshot PnLs
1411                    let sum_pnl = self
1412                        .inner
1413                        .borrow()
1414                        .snapshot_sum_per_position
1415                        .get(position_id)
1416                        .copied();
1417                    if let Some(sum_pnl) = sum_pnl {
1418                        let mut pnl = sum_pnl.as_f64();
1419
1420                        if let Some(base_currency) = account.base_currency() {
1421                            // For closed positions, we don't have entry price, use current rates
1422                            let xrate = cache.get_xrate(
1423                                instrument_id.venue,
1424                                instrument.settlement_currency(),
1425                                base_currency,
1426                                PriceType::Mid,
1427                            );
1428
1429                            if let Some(xrate) = xrate {
1430                                let scale = 10f64.powi(currency.precision.into());
1431                                pnl = ((pnl * xrate) * scale).round() / scale;
1432                            } else {
1433                                log::error!(
1434                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1435                                    instrument.settlement_currency(),
1436                                    base_currency
1437                                );
1438                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1439                                return Some(Money::new(0.0, currency));
1440                            }
1441                        }
1442
1443                        total_pnl += pnl;
1444                    }
1445                }
1446            }
1447
1448            // Add realized PnL from current active positions
1449            for position in positions {
1450                if position.instrument_id != *instrument_id {
1451                    continue;
1452                }
1453
1454                if let Some(realized_pnl) = position.realized_pnl {
1455                    let mut pnl = realized_pnl.as_f64();
1456
1457                    if let Some(base_currency) = account.base_currency() {
1458                        let xrate = if let Some(xrate) =
1459                            self.calculate_xrate_to_base(instrument, account, position.entry)
1460                        {
1461                            xrate
1462                        } else {
1463                            log::error!(
1464                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1465                                instrument.settlement_currency(),
1466                                base_currency
1467                            );
1468                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1469                            return Some(Money::new(0.0, currency));
1470                        };
1471
1472                        let scale = 10f64.powi(currency.precision.into());
1473                        pnl = ((pnl * xrate) * scale).round() / scale;
1474                    }
1475
1476                    total_pnl += pnl;
1477                }
1478            }
1479        } else {
1480            // HEDGING OMS or no snapshots: Simple aggregation
1481            // Add snapshot PnLs (sum all)
1482            for position_id in &snapshot_position_ids {
1483                let sum_pnl = self
1484                    .inner
1485                    .borrow()
1486                    .snapshot_sum_per_position
1487                    .get(position_id)
1488                    .copied();
1489                if let Some(sum_pnl) = sum_pnl {
1490                    let mut pnl = sum_pnl.as_f64();
1491
1492                    if let Some(base_currency) = account.base_currency() {
1493                        let xrate = cache.get_xrate(
1494                            instrument_id.venue,
1495                            instrument.settlement_currency(),
1496                            base_currency,
1497                            PriceType::Mid,
1498                        );
1499
1500                        if let Some(xrate) = xrate {
1501                            let scale = 10f64.powi(currency.precision.into());
1502                            pnl = ((pnl * xrate) * scale).round() / scale;
1503                        } else {
1504                            log::error!(
1505                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1506                                instrument.settlement_currency(),
1507                                base_currency
1508                            );
1509                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1510                            return Some(Money::new(0.0, currency));
1511                        }
1512                    }
1513
1514                    total_pnl += pnl;
1515                }
1516            }
1517
1518            // Add realized PnL from current positions
1519            for position in positions {
1520                if position.instrument_id != *instrument_id {
1521                    continue;
1522                }
1523
1524                if let Some(realized_pnl) = position.realized_pnl {
1525                    let mut pnl = realized_pnl.as_f64();
1526
1527                    if let Some(base_currency) = account.base_currency() {
1528                        let xrate = if let Some(xrate) =
1529                            self.calculate_xrate_to_base(instrument, account, position.entry)
1530                        {
1531                            xrate
1532                        } else {
1533                            log::error!(
1534                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1535                                instrument.settlement_currency(),
1536                                base_currency
1537                            );
1538                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1539                            return Some(Money::new(0.0, currency));
1540                        };
1541
1542                        let scale = 10f64.powi(currency.precision.into());
1543                        pnl = ((pnl * xrate) * scale).round() / scale;
1544                    }
1545
1546                    total_pnl += pnl;
1547                }
1548            }
1549        }
1550
1551        Some(Money::new(total_pnl, currency))
1552    }
1553
1554    fn get_price(&self, position: &Position) -> Option<Price> {
1555        let cache = self.cache.borrow();
1556        let instrument_id = &position.instrument_id;
1557
1558        // Check for mark price first if configured
1559        if self.config.use_mark_prices
1560            && let Some(mark_price) = cache.mark_price(instrument_id)
1561        {
1562            return Some(mark_price.value);
1563        }
1564
1565        // Fall back to bid/ask based on position side
1566        let price_type = match position.side {
1567            PositionSide::Long => PriceType::Bid,
1568            PositionSide::Short => PriceType::Ask,
1569            _ => panic!("invalid `PositionSide`, was {}", position.side),
1570        };
1571
1572        cache
1573            .price(instrument_id, price_type)
1574            .or_else(|| cache.price(instrument_id, PriceType::Last))
1575            .or_else(|| {
1576                self.inner
1577                    .borrow()
1578                    .bar_close_prices
1579                    .get(instrument_id)
1580                    .copied()
1581            })
1582    }
1583
1584    fn calculate_xrate_to_base(
1585        &self,
1586        instrument: &InstrumentAny,
1587        account: &AccountAny,
1588        side: OrderSide,
1589    ) -> Option<f64> {
1590        if !self.config.convert_to_account_base_currency {
1591            return Some(1.0); // No conversion needed
1592        }
1593
1594        match account.base_currency() {
1595            None => Some(1.0), // No conversion needed
1596            Some(base_currency) => {
1597                let cache = self.cache.borrow();
1598
1599                if self.config.use_mark_xrates {
1600                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1601                }
1602
1603                let price_type = if side == OrderSide::Buy {
1604                    PriceType::Bid
1605                } else {
1606                    PriceType::Ask
1607                };
1608
1609                cache.get_xrate(
1610                    instrument.id().venue,
1611                    instrument.settlement_currency(),
1612                    base_currency,
1613                    price_type,
1614                )
1615            }
1616        }
1617    }
1618}
1619
1620// Helper functions
1621fn update_quote_tick(
1622    cache: Rc<RefCell<Cache>>,
1623    clock: Rc<RefCell<dyn Clock>>,
1624    inner: Rc<RefCell<PortfolioState>>,
1625    config: PortfolioConfig,
1626    quote: &QuoteTick,
1627) {
1628    update_instrument_id(cache, clock.clone(), inner, config, &quote.instrument_id);
1629}
1630
1631fn update_bar(
1632    cache: Rc<RefCell<Cache>>,
1633    clock: Rc<RefCell<dyn Clock>>,
1634    inner: Rc<RefCell<PortfolioState>>,
1635    config: PortfolioConfig,
1636    bar: &Bar,
1637) {
1638    let instrument_id = bar.bar_type.instrument_id();
1639    inner
1640        .borrow_mut()
1641        .bar_close_prices
1642        .insert(instrument_id, bar.close);
1643    update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1644}
1645
1646fn update_instrument_id(
1647    cache: Rc<RefCell<Cache>>,
1648    clock: Rc<RefCell<dyn Clock>>,
1649    inner: Rc<RefCell<PortfolioState>>,
1650    config: PortfolioConfig,
1651    instrument_id: &InstrumentId,
1652) {
1653    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1654
1655    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1656        return;
1657    }
1658
1659    let result_init;
1660    let mut result_maint = None;
1661
1662    let account = {
1663        let account = {
1664            let cache_ref = cache.borrow();
1665            if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1666                account.clone()
1667            } else {
1668                log::error!(
1669                    "Cannot update tick: no account registered for {}",
1670                    instrument_id.venue
1671                );
1672                return;
1673            }
1674        };
1675
1676        let mut cache_ref = cache.borrow_mut();
1677        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1678            instrument.clone()
1679        } else {
1680            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1681            return;
1682        };
1683
1684        // Clone the orders and positions to own the data
1685        let orders_open: Vec<OrderAny> = cache_ref
1686            .orders_open(None, Some(instrument_id), None, None, None)
1687            .iter()
1688            .map(|o| (*o).clone())
1689            .collect();
1690
1691        let positions_open: Vec<Position> = cache_ref
1692            .positions_open(None, Some(instrument_id), None, None, None)
1693            .iter()
1694            .map(|p| (*p).clone())
1695            .collect();
1696
1697        result_init = inner.borrow().accounts.update_orders(
1698            &account,
1699            instrument.clone(),
1700            orders_open.iter().collect(),
1701            clock.borrow().timestamp_ns(),
1702        );
1703
1704        if let AccountAny::Margin(ref margin_account) = account {
1705            result_maint = inner.borrow().accounts.update_positions(
1706                margin_account,
1707                instrument,
1708                positions_open.iter().collect(),
1709                clock.borrow().timestamp_ns(),
1710            );
1711        }
1712
1713        if let Some((ref updated_account, _)) = result_init {
1714            cache_ref.update_account(updated_account.clone()).unwrap();
1715        }
1716        account
1717    };
1718
1719    let mut portfolio_clone = Portfolio {
1720        clock: clock.clone(),
1721        cache,
1722        inner: inner.clone(),
1723        config,
1724    };
1725
1726    let result_unrealized_pnl: Option<Money> =
1727        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1728
1729    if result_init.is_some()
1730        && (matches!(account, AccountAny::Cash(_))
1731            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1732    {
1733        inner.borrow_mut().pending_calcs.remove(instrument_id);
1734        if inner.borrow().pending_calcs.is_empty() {
1735            inner.borrow_mut().initialized = true;
1736        }
1737    }
1738}
1739
1740fn update_order(
1741    cache: Rc<RefCell<Cache>>,
1742    clock: Rc<RefCell<dyn Clock>>,
1743    inner: Rc<RefCell<PortfolioState>>,
1744    _config: PortfolioConfig,
1745    event: &OrderEventAny,
1746) {
1747    let cache_ref = cache.borrow();
1748    let account_id = match event.account_id() {
1749        Some(account_id) => account_id,
1750        None => {
1751            return; // No Account Assigned
1752        }
1753    };
1754
1755    let account = if let Some(account) = cache_ref.account(&account_id) {
1756        account
1757    } else {
1758        log::error!("Cannot update order: no account registered for {account_id}");
1759        return;
1760    };
1761
1762    match account {
1763        AccountAny::Cash(cash_account) => {
1764            if !cash_account.base.calculate_account_state {
1765                return;
1766            }
1767        }
1768        AccountAny::Margin(margin_account) => {
1769            if !margin_account.base.calculate_account_state {
1770                return;
1771            }
1772        }
1773    }
1774
1775    match event {
1776        OrderEventAny::Accepted(_)
1777        | OrderEventAny::Canceled(_)
1778        | OrderEventAny::Rejected(_)
1779        | OrderEventAny::Updated(_)
1780        | OrderEventAny::Filled(_) => {}
1781        _ => {
1782            return;
1783        }
1784    }
1785
1786    let cache_ref = cache.borrow();
1787    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1788        order
1789    } else {
1790        log::error!(
1791            "Cannot update order: {} not found in the cache",
1792            event.client_order_id()
1793        );
1794        return; // No Order Found
1795    };
1796
1797    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1798        return; // No change to account state
1799    }
1800
1801    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1802        instrument_id
1803    } else {
1804        log::error!(
1805            "Cannot update order: no instrument found for {}",
1806            event.instrument_id()
1807        );
1808        return;
1809    };
1810
1811    if let OrderEventAny::Filled(order_filled) = event {
1812        let _ = inner.borrow().accounts.update_balances(
1813            account.clone(),
1814            instrument.clone(),
1815            *order_filled,
1816        );
1817
1818        let mut portfolio_clone = Portfolio {
1819            clock: clock.clone(),
1820            cache: cache.clone(),
1821            inner: inner.clone(),
1822            config: PortfolioConfig::default(), // TODO: TBD
1823        };
1824
1825        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1826            Some(unrealized_pnl) => {
1827                inner
1828                    .borrow_mut()
1829                    .unrealized_pnls
1830                    .insert(event.instrument_id(), unrealized_pnl);
1831            }
1832            None => {
1833                log::error!(
1834                    "Failed to calculate unrealized PnL for instrument {}",
1835                    event.instrument_id()
1836                );
1837            }
1838        }
1839    }
1840
1841    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None, None);
1842
1843    let account_state = inner.borrow_mut().accounts.update_orders(
1844        account,
1845        instrument.clone(),
1846        orders_open,
1847        clock.borrow().timestamp_ns(),
1848    );
1849
1850    let mut cache_ref = cache.borrow_mut();
1851
1852    if let Some((updated_account, account_state)) = account_state {
1853        cache_ref.update_account(updated_account).unwrap();
1854        msgbus::publish_account_state(
1855            format!("events.account.{}", account.id()).into(),
1856            &account_state,
1857        );
1858    } else {
1859        log::debug!("Added pending calculation for {}", instrument.id());
1860        inner.borrow_mut().pending_calcs.insert(instrument.id());
1861    }
1862
1863    log::debug!("Updated {event}");
1864}
1865
1866fn update_position(
1867    cache: Rc<RefCell<Cache>>,
1868    clock: Rc<RefCell<dyn Clock>>,
1869    inner: Rc<RefCell<PortfolioState>>,
1870    _config: PortfolioConfig,
1871    event: &PositionEvent,
1872) {
1873    let instrument_id = event.instrument_id();
1874
1875    let positions_open: Vec<Position> = {
1876        let cache_ref = cache.borrow();
1877
1878        cache_ref
1879            .positions_open(None, Some(&instrument_id), None, None, None)
1880            .iter()
1881            .map(|o| (*o).clone())
1882            .collect()
1883    };
1884
1885    log::debug!("position fresh from cache -> {positions_open:?}");
1886
1887    let mut portfolio_clone = Portfolio {
1888        clock: clock.clone(),
1889        cache: cache.clone(),
1890        inner: inner.clone(),
1891        config: PortfolioConfig::default(), // TODO: TBD
1892    };
1893
1894    portfolio_clone.update_net_position(&instrument_id, &positions_open);
1895
1896    if let Some(calculated_unrealized_pnl) =
1897        portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1898    {
1899        inner
1900            .borrow_mut()
1901            .unrealized_pnls
1902            .insert(event.instrument_id(), calculated_unrealized_pnl);
1903    } else {
1904        log::warn!(
1905            "Failed to calculate unrealized PnL for {}, marking as pending",
1906            event.instrument_id()
1907        );
1908        inner
1909            .borrow_mut()
1910            .pending_calcs
1911            .insert(event.instrument_id());
1912    }
1913
1914    if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1915        inner
1916            .borrow_mut()
1917            .realized_pnls
1918            .insert(event.instrument_id(), calculated_realized_pnl);
1919    } else {
1920        log::warn!(
1921            "Failed to calculate realized PnL for {}, marking as pending",
1922            event.instrument_id()
1923        );
1924        inner
1925            .borrow_mut()
1926            .pending_calcs
1927            .insert(event.instrument_id());
1928    }
1929
1930    let cache_ref = cache.borrow();
1931    let account = cache_ref.account(&event.account_id());
1932
1933    if let Some(AccountAny::Margin(margin_account)) = account {
1934        if !margin_account.calculate_account_state {
1935            return; // Nothing to calculate
1936        }
1937
1938        let cache_ref = cache.borrow();
1939        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1940            instrument
1941        } else {
1942            log::error!("Cannot update position: no instrument found for {instrument_id}");
1943            return;
1944        };
1945
1946        let result = inner.borrow_mut().accounts.update_positions(
1947            margin_account,
1948            instrument.clone(),
1949            positions_open.iter().collect(),
1950            clock.borrow().timestamp_ns(),
1951        );
1952        let mut cache_ref = cache.borrow_mut();
1953        if let Some((margin_account, _)) = result {
1954            cache_ref
1955                .update_account(AccountAny::Margin(margin_account))
1956                .unwrap();
1957        }
1958    } else if account.is_none() {
1959        log::error!(
1960            "Cannot update position: no account registered for {}",
1961            event.account_id()
1962        );
1963    }
1964}
1965
1966fn update_account(
1967    cache: Rc<RefCell<Cache>>,
1968    inner: Rc<RefCell<PortfolioState>>,
1969    event: &AccountState,
1970) {
1971    let mut cache_ref = cache.borrow_mut();
1972
1973    if let Some(existing) = cache_ref.account(&event.account_id) {
1974        let mut account = existing.clone();
1975        if let Err(e) = account.apply(event.clone()) {
1976            log::error!("Failed to apply account state: {e}");
1977            return;
1978        }
1979
1980        if let Err(e) = cache_ref.update_account(account.clone()) {
1981            log::error!("Failed to update account: {e}");
1982            return;
1983        }
1984    } else {
1985        let account = match AccountAny::from_events(vec![event.clone()]) {
1986            Ok(account) => account,
1987            Err(e) => {
1988                log::error!("Failed to create account: {e}");
1989                return;
1990            }
1991        };
1992
1993        if let Err(e) = cache_ref.add_account(account) {
1994            log::error!("Failed to add account: {e}");
1995            return;
1996        }
1997    }
1998
1999    // Throttled logging logic
2000    let mut inner_ref = inner.borrow_mut();
2001    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
2002        let current_ts = event.ts_init.as_u64();
2003        let last_ts = inner_ref
2004            .last_account_state_log_ts
2005            .get(&event.account_id)
2006            .copied()
2007            .unwrap_or(0);
2008
2009        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
2010        {
2011            inner_ref
2012                .last_account_state_log_ts
2013                .insert(event.account_id, current_ts);
2014            true
2015        } else {
2016            false
2017        }
2018    } else {
2019        true // Throttling disabled, always log
2020    };
2021
2022    if should_log {
2023        log::info!("Updated {event}");
2024    }
2025}