nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    enums::LogColor,
26    msgbus::{
27        self,
28        handler::{ShareableMessageHandler, TypedMessageHandler},
29    },
30};
31use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
32use nautilus_model::{
33    accounts::AccountAny,
34    data::{Bar, MarkPriceUpdate, QuoteTick},
35    enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
36    events::{AccountState, OrderEventAny, position::PositionEvent},
37    identifiers::{AccountId, InstrumentId, PositionId, Venue},
38    instruments::{Instrument, InstrumentAny},
39    orders::{Order, OrderAny},
40    position::Position,
41    types::{Currency, Money, Price},
42};
43use rust_decimal::{Decimal, prelude::FromPrimitive};
44
45use crate::{config::PortfolioConfig, manager::AccountsManager};
46
47struct PortfolioState {
48    accounts: AccountsManager,
49    analyzer: PortfolioAnalyzer,
50    unrealized_pnls: AHashMap<InstrumentId, Money>,
51    realized_pnls: AHashMap<InstrumentId, Money>,
52    snapshot_sum_per_position: AHashMap<PositionId, Money>,
53    snapshot_last_per_position: AHashMap<PositionId, Money>,
54    snapshot_processed_counts: AHashMap<PositionId, usize>,
55    net_positions: AHashMap<InstrumentId, Decimal>,
56    pending_calcs: AHashSet<InstrumentId>,
57    bar_close_prices: AHashMap<InstrumentId, Price>,
58    initialized: bool,
59    last_account_state_log_ts: AHashMap<AccountId, u64>,
60    min_account_state_logging_interval_ns: u64,
61}
62
63impl PortfolioState {
64    fn new(
65        clock: Rc<RefCell<dyn Clock>>,
66        cache: Rc<RefCell<Cache>>,
67        config: &PortfolioConfig,
68    ) -> Self {
69        let min_account_state_logging_interval_ns = config
70            .min_account_state_logging_interval_ms
71            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73        Self {
74            accounts: AccountsManager::new(clock, cache),
75            analyzer: PortfolioAnalyzer::default(),
76            unrealized_pnls: AHashMap::new(),
77            realized_pnls: AHashMap::new(),
78            snapshot_sum_per_position: AHashMap::new(),
79            snapshot_last_per_position: AHashMap::new(),
80            snapshot_processed_counts: AHashMap::new(),
81            net_positions: AHashMap::new(),
82            pending_calcs: AHashSet::new(),
83            bar_close_prices: AHashMap::new(),
84            initialized: false,
85            last_account_state_log_ts: AHashMap::new(),
86            min_account_state_logging_interval_ns,
87        }
88    }
89
90    fn reset(&mut self) {
91        log::debug!("RESETTING");
92        self.net_positions.clear();
93        self.unrealized_pnls.clear();
94        self.realized_pnls.clear();
95        self.snapshot_sum_per_position.clear();
96        self.snapshot_last_per_position.clear();
97        self.snapshot_processed_counts.clear();
98        self.pending_calcs.clear();
99        self.last_account_state_log_ts.clear();
100        self.analyzer.reset();
101        log::debug!("READY");
102    }
103}
104
105pub struct Portfolio {
106    pub(crate) clock: Rc<RefCell<dyn Clock>>,
107    pub(crate) cache: Rc<RefCell<Cache>>,
108    inner: Rc<RefCell<PortfolioState>>,
109    config: PortfolioConfig,
110}
111
112impl Debug for Portfolio {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct(stringify!(Portfolio)).finish()
115    }
116}
117
118impl Portfolio {
119    pub fn new(
120        cache: Rc<RefCell<Cache>>,
121        clock: Rc<RefCell<dyn Clock>>,
122        config: Option<PortfolioConfig>,
123    ) -> Self {
124        let config = config.unwrap_or_default();
125        let inner = Rc::new(RefCell::new(PortfolioState::new(
126            clock.clone(),
127            cache.clone(),
128            &config,
129        )));
130
131        Self::register_message_handlers(
132            cache.clone(),
133            clock.clone(),
134            inner.clone(),
135            config.clone(),
136        );
137
138        Self {
139            clock,
140            cache,
141            inner,
142            config,
143        }
144    }
145
146    /// Creates a shallow clone of the Portfolio that shares the same internal state.
147    ///
148    /// This is useful when multiple components need to reference the same Portfolio
149    /// without creating duplicate msgbus handler registrations.
150    #[must_use]
151    pub fn clone_shallow(&self) -> Self {
152        Self {
153            clock: self.clock.clone(),
154            cache: self.cache.clone(),
155            inner: self.inner.clone(),
156            config: self.config.clone(),
157        }
158    }
159
160    fn register_message_handlers(
161        cache: Rc<RefCell<Cache>>,
162        clock: Rc<RefCell<dyn Clock>>,
163        inner: Rc<RefCell<PortfolioState>>,
164        config: PortfolioConfig,
165    ) {
166        let inner_weak = WeakCell::from(Rc::downgrade(&inner));
167
168        let update_account_handler = {
169            let cache = cache.clone();
170            let inner = inner_weak.clone();
171            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
172                move |event: &AccountState| {
173                    if let Some(inner_rc) = inner.upgrade() {
174                        update_account(cache.clone(), inner_rc.into(), event);
175                    }
176                },
177            )))
178        };
179
180        let update_position_handler = {
181            let cache = cache.clone();
182            let clock = clock.clone();
183            let inner = inner_weak.clone();
184            let config = config.clone();
185            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
186                move |event: &PositionEvent| {
187                    if let Some(inner_rc) = inner.upgrade() {
188                        update_position(
189                            cache.clone(),
190                            clock.clone(),
191                            inner_rc.into(),
192                            config.clone(),
193                            event,
194                        );
195                    }
196                },
197            )))
198        };
199
200        let update_quote_handler = {
201            let cache = cache.clone();
202            let clock = clock.clone();
203            let inner = inner_weak.clone();
204            let config = config.clone();
205            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
206                move |quote: &QuoteTick| {
207                    if let Some(inner_rc) = inner.upgrade() {
208                        update_quote_tick(
209                            cache.clone(),
210                            clock.clone(),
211                            inner_rc.into(),
212                            config.clone(),
213                            quote,
214                        );
215                    }
216                },
217            )))
218        };
219
220        let update_bar_handler = {
221            let cache = cache.clone();
222            let clock = clock.clone();
223            let inner = inner_weak.clone();
224            let config = config.clone();
225            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
226                if let Some(inner_rc) = inner.upgrade() {
227                    update_bar(
228                        cache.clone(),
229                        clock.clone(),
230                        inner_rc.into(),
231                        config.clone(),
232                        bar,
233                    );
234                }
235            })))
236        };
237
238        let update_mark_price_handler = {
239            let cache = cache.clone();
240            let clock = clock.clone();
241            let inner = inner_weak.clone();
242            let config = config.clone();
243            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
244                move |mark_price: &MarkPriceUpdate| {
245                    if let Some(inner_rc) = inner.upgrade() {
246                        update_instrument_id(
247                            cache.clone(),
248                            clock.clone(),
249                            inner_rc.into(),
250                            config.clone(),
251                            &mark_price.instrument_id,
252                        );
253                    }
254                },
255            )))
256        };
257
258        let update_order_handler = {
259            let cache = cache;
260            let clock = clock.clone();
261            let inner = inner_weak;
262            let config = config.clone();
263            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
264                move |event: &OrderEventAny| {
265                    if let Some(inner_rc) = inner.upgrade() {
266                        update_order(
267                            cache.clone(),
268                            clock.clone(),
269                            inner_rc.into(),
270                            config.clone(),
271                            event,
272                        );
273                    }
274                },
275            )))
276        };
277
278        msgbus::register(
279            "Portfolio.update_account".into(),
280            update_account_handler.clone(),
281        );
282
283        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
284        if config.bar_updates {
285            msgbus::subscribe("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
286        }
287        if config.use_mark_prices {
288            msgbus::subscribe(
289                "data.mark_prices.*".into(),
290                update_mark_price_handler,
291                Some(10),
292            );
293        }
294        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
295        msgbus::subscribe(
296            "events.position.*".into(),
297            update_position_handler,
298            Some(10),
299        );
300        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
301    }
302
303    pub fn reset(&mut self) {
304        log::debug!("RESETTING");
305        self.inner.borrow_mut().reset();
306        log::debug!("READY");
307    }
308
309    /// Returns a reference to the cache.
310    #[must_use]
311    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
312        &self.cache
313    }
314
315    /// Returns `true` if the portfolio has been initialized.
316    #[must_use]
317    pub fn is_initialized(&self) -> bool {
318        self.inner.borrow().initialized
319    }
320
321    /// Returns the locked balances for the given venue.
322    ///
323    /// Locked balances represent funds reserved for open orders.
324    #[must_use]
325    pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
326        self.cache.borrow().account_for_venue(venue).map_or_else(
327            || {
328                log::error!("Cannot get balances locked: no account generated for {venue}");
329                AHashMap::new()
330            },
331            AccountAny::balances_locked,
332        )
333    }
334
335    /// Returns the initial margin requirements for the given venue.
336    ///
337    /// Only applicable for margin accounts. Returns empty map for cash accounts.
338    #[must_use]
339    pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
340        self.cache.borrow().account_for_venue(venue).map_or_else(
341            || {
342                log::error!(
343                    "Cannot get initial (order) margins: no account registered for {venue}"
344                );
345                AHashMap::new()
346            },
347            |account| match account {
348                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
349                AccountAny::Cash(_) => {
350                    log::warn!("Initial margins not applicable for cash account");
351                    AHashMap::new()
352                }
353            },
354        )
355    }
356
357    /// Returns the maintenance margin requirements for the given venue.
358    ///
359    /// Only applicable for margin accounts. Returns empty map for cash accounts.
360    #[must_use]
361    pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
362        self.cache.borrow().account_for_venue(venue).map_or_else(
363            || {
364                log::error!(
365                    "Cannot get maintenance (position) margins: no account registered for {venue}"
366                );
367                AHashMap::new()
368            },
369            |account| match account {
370                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
371                AccountAny::Cash(_) => {
372                    log::warn!("Maintenance margins not applicable for cash account");
373                    AHashMap::new()
374                }
375            },
376        )
377    }
378
379    /// Returns the unrealized PnLs for all positions at the given venue.
380    ///
381    /// Calculates mark-to-market PnL based on current market prices.
382    #[must_use]
383    pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
384        let instrument_ids = {
385            let cache = self.cache.borrow();
386            let positions = cache.positions(Some(venue), None, None, None);
387
388            if positions.is_empty() {
389                return AHashMap::new(); // Nothing to calculate
390            }
391
392            let instrument_ids: AHashSet<InstrumentId> =
393                positions.iter().map(|p| p.instrument_id).collect();
394
395            instrument_ids
396        };
397
398        let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
399
400        for instrument_id in instrument_ids {
401            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
402                // PnL already calculated
403                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
404                continue;
405            }
406
407            // Calculate PnL
408            match self.calculate_unrealized_pnl(&instrument_id) {
409                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
410                None => continue,
411            }
412        }
413
414        unrealized_pnls
415            .into_iter()
416            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
417            .collect()
418    }
419
420    /// Returns the realized PnLs for all positions at the given venue.
421    ///
422    /// Calculates total realized profit and loss from closed positions.
423    #[must_use]
424    pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
425        let instrument_ids = {
426            let cache = self.cache.borrow();
427            let positions = cache.positions(Some(venue), None, None, None);
428
429            if positions.is_empty() {
430                return AHashMap::new(); // Nothing to calculate
431            }
432
433            let instrument_ids: AHashSet<InstrumentId> =
434                positions.iter().map(|p| p.instrument_id).collect();
435
436            instrument_ids
437        };
438
439        let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
440
441        for instrument_id in instrument_ids {
442            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
443                // PnL already calculated
444                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
445                continue;
446            }
447
448            // Calculate PnL
449            match self.calculate_realized_pnl(&instrument_id) {
450                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
451                None => continue,
452            }
453        }
454
455        realized_pnls
456            .into_iter()
457            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
458            .collect()
459    }
460
461    #[must_use]
462    pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
463        let cache = self.cache.borrow();
464        let account = if let Some(account) = cache.account_for_venue(venue) {
465            account
466        } else {
467            log::error!("Cannot calculate net exposures: no account registered for {venue}");
468            return None; // Cannot calculate
469        };
470
471        let positions_open = cache.positions_open(Some(venue), None, None, None);
472        if positions_open.is_empty() {
473            return Some(AHashMap::new()); // Nothing to calculate
474        }
475
476        let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
477
478        for position in positions_open {
479            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
480                instrument
481            } else {
482                log::error!(
483                    "Cannot calculate net exposures: no instrument for {}",
484                    position.instrument_id
485                );
486                return None; // Cannot calculate
487            };
488
489            if position.side == PositionSide::Flat {
490                log::error!(
491                    "Cannot calculate net exposures: position is flat for {}",
492                    position.instrument_id
493                );
494                continue; // Nothing to calculate
495            }
496
497            let price = self.get_price(position)?;
498            let xrate = if let Some(xrate) =
499                self.calculate_xrate_to_base(instrument, account, position.entry)
500            {
501                xrate
502            } else {
503                log::error!(
504                    // TODO: Improve logging
505                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
506                    instrument.settlement_currency(),
507                    account.base_currency()
508                );
509                return None; // Cannot calculate
510            };
511
512            let settlement_currency = account
513                .base_currency()
514                .unwrap_or_else(|| instrument.settlement_currency());
515
516            let net_exposure = instrument
517                .calculate_notional_value(position.quantity, price, None)
518                .as_f64()
519                * xrate;
520
521            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
522                .round()
523                / 10f64.powi(settlement_currency.precision.into());
524
525            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
526        }
527
528        Some(
529            net_exposures
530                .into_iter()
531                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
532                .collect(),
533        )
534    }
535
536    #[must_use]
537    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
538        if let Some(pnl) = self
539            .inner
540            .borrow()
541            .unrealized_pnls
542            .get(instrument_id)
543            .copied()
544        {
545            return Some(pnl);
546        }
547
548        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
549        self.inner
550            .borrow_mut()
551            .unrealized_pnls
552            .insert(*instrument_id, pnl);
553        Some(pnl)
554    }
555
556    #[must_use]
557    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
558        if let Some(pnl) = self
559            .inner
560            .borrow()
561            .realized_pnls
562            .get(instrument_id)
563            .copied()
564        {
565            return Some(pnl);
566        }
567
568        let pnl = self.calculate_realized_pnl(instrument_id)?;
569        self.inner
570            .borrow_mut()
571            .realized_pnls
572            .insert(*instrument_id, pnl);
573        Some(pnl)
574    }
575
576    /// Returns the total PnL for the given instrument ID.
577    ///
578    /// Total PnL = Realized PnL + Unrealized PnL
579    #[must_use]
580    pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
581        let realized = self.realized_pnl(instrument_id)?;
582        let unrealized = self.unrealized_pnl(instrument_id)?;
583
584        if realized.currency != unrealized.currency {
585            log::error!(
586                "Cannot calculate total PnL: currency mismatch {} vs {}",
587                realized.currency,
588                unrealized.currency
589            );
590            return None;
591        }
592
593        Some(Money::new(
594            realized.as_f64() + unrealized.as_f64(),
595            realized.currency,
596        ))
597    }
598
599    /// Returns the total PnLs for the given venue.
600    ///
601    /// Total PnL = Realized PnL + Unrealized PnL for each currency
602    #[must_use]
603    pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
604        let realized_pnls = self.realized_pnls(venue);
605        let unrealized_pnls = self.unrealized_pnls(venue);
606
607        let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
608
609        // Add realized PnLs
610        for (currency, realized) in realized_pnls {
611            total_pnls.insert(currency, realized);
612        }
613
614        // Add unrealized PnLs
615        for (currency, unrealized) in unrealized_pnls {
616            match total_pnls.get_mut(&currency) {
617                Some(total) => {
618                    *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
619                }
620                None => {
621                    total_pnls.insert(currency, unrealized);
622                }
623            }
624        }
625
626        total_pnls
627    }
628
629    #[must_use]
630    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
631        let cache = self.cache.borrow();
632        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
633            account
634        } else {
635            log::error!(
636                "Cannot calculate net exposure: no account registered for {}",
637                instrument_id.venue
638            );
639            return None;
640        };
641
642        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
643            instrument
644        } else {
645            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
646            return None;
647        };
648
649        let positions_open = cache.positions_open(
650            None, // Faster query filtering
651            Some(instrument_id),
652            None,
653            None,
654        );
655
656        if positions_open.is_empty() {
657            return Some(Money::new(0.0, instrument.settlement_currency()));
658        }
659
660        let mut net_exposure = 0.0;
661
662        for position in positions_open {
663            let price = self.get_price(position)?;
664            let xrate = if let Some(xrate) =
665                self.calculate_xrate_to_base(instrument, account, position.entry)
666            {
667                xrate
668            } else {
669                log::error!(
670                    // TODO: Improve logging
671                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
672                    instrument.settlement_currency(),
673                    account.base_currency()
674                );
675                return None; // Cannot calculate
676            };
677
678            let notional_value =
679                instrument.calculate_notional_value(position.quantity, price, None);
680            net_exposure += notional_value.as_f64() * xrate;
681        }
682
683        let settlement_currency = account
684            .base_currency()
685            .unwrap_or_else(|| instrument.settlement_currency());
686
687        Some(Money::new(net_exposure, settlement_currency))
688    }
689
690    #[must_use]
691    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
692        self.inner
693            .borrow()
694            .net_positions
695            .get(instrument_id)
696            .copied()
697            .unwrap_or(Decimal::ZERO)
698    }
699
700    #[must_use]
701    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
702        self.inner
703            .borrow()
704            .net_positions
705            .get(instrument_id)
706            .copied()
707            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
708    }
709
710    #[must_use]
711    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
712        self.inner
713            .borrow()
714            .net_positions
715            .get(instrument_id)
716            .copied()
717            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
718    }
719
720    #[must_use]
721    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
722        self.inner
723            .borrow()
724            .net_positions
725            .get(instrument_id)
726            .copied()
727            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
728    }
729
730    #[must_use]
731    pub fn is_completely_flat(&self) -> bool {
732        for net_position in self.inner.borrow().net_positions.values() {
733            if *net_position != Decimal::ZERO {
734                return false;
735            }
736        }
737        true
738    }
739
740    /// Initializes account margin based on existing open orders.
741    ///
742    /// # Panics
743    ///
744    /// Panics if updating the cache with a mutated account fails.
745    pub fn initialize_orders(&mut self) {
746        let mut initialized = true;
747        let orders_and_instruments = {
748            let cache = self.cache.borrow();
749            let all_orders_open = cache.orders_open(None, None, None, None);
750
751            let mut instruments_with_orders = Vec::new();
752            let mut instruments = AHashSet::new();
753
754            for order in &all_orders_open {
755                instruments.insert(order.instrument_id());
756            }
757
758            for instrument_id in instruments {
759                if let Some(instrument) = cache.instrument(&instrument_id) {
760                    let orders = cache
761                        .orders_open(None, Some(&instrument_id), None, None)
762                        .into_iter()
763                        .cloned()
764                        .collect::<Vec<OrderAny>>();
765                    instruments_with_orders.push((instrument.clone(), orders));
766                } else {
767                    log::error!(
768                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
769                    );
770                    initialized = false;
771                    break;
772                }
773            }
774            instruments_with_orders
775        };
776
777        for (instrument, orders_open) in &orders_and_instruments {
778            let mut cache = self.cache.borrow_mut();
779            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
780                account
781            } else {
782                log::error!(
783                    "Cannot update initial (order) margin: no account registered for {}",
784                    instrument.id().venue
785                );
786                initialized = false;
787                break;
788            };
789
790            let result = self.inner.borrow_mut().accounts.update_orders(
791                account,
792                instrument.clone(),
793                orders_open.iter().collect(),
794                self.clock.borrow().timestamp_ns(),
795            );
796
797            match result {
798                Some((updated_account, _)) => {
799                    cache.update_account(updated_account).unwrap();
800                }
801                None => {
802                    initialized = false;
803                }
804            }
805        }
806
807        let total_orders = orders_and_instruments
808            .into_iter()
809            .map(|(_, orders)| orders.len())
810            .sum::<usize>();
811
812        log::info!(
813            color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
814            "Initialized {} open order{}",
815            total_orders,
816            if total_orders == 1 { "" } else { "s" }
817        );
818
819        self.inner.borrow_mut().initialized = initialized;
820    }
821
822    /// Initializes account margin based on existing open positions.
823    ///
824    /// # Panics
825    ///
826    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
827    pub fn initialize_positions(&mut self) {
828        self.inner.borrow_mut().unrealized_pnls.clear();
829        self.inner.borrow_mut().realized_pnls.clear();
830        let all_positions_open: Vec<Position>;
831        let mut instruments = AHashSet::new();
832        {
833            let cache = self.cache.borrow();
834            all_positions_open = cache
835                .positions_open(None, None, None, None)
836                .into_iter()
837                .cloned()
838                .collect();
839            for position in &all_positions_open {
840                instruments.insert(position.instrument_id);
841            }
842        }
843
844        let mut initialized = true;
845
846        for instrument_id in instruments {
847            let positions_open: Vec<Position> = {
848                let cache = self.cache.borrow();
849                cache
850                    .positions_open(None, Some(&instrument_id), None, None)
851                    .into_iter()
852                    .cloned()
853                    .collect()
854            };
855
856            self.update_net_position(&instrument_id, positions_open);
857
858            if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
859                self.inner
860                    .borrow_mut()
861                    .unrealized_pnls
862                    .insert(instrument_id, calculated_unrealized_pnl);
863            } else {
864                log::warn!(
865                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
866                );
867                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
868            }
869
870            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
871                self.inner
872                    .borrow_mut()
873                    .realized_pnls
874                    .insert(instrument_id, calculated_realized_pnl);
875            } else {
876                log::warn!(
877                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
878                );
879                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
880            }
881
882            let cache = self.cache.borrow();
883            let Some(account) = cache.account_for_venue(&instrument_id.venue).cloned() else {
884                log::error!(
885                    "Cannot update maintenance (position) margin: no account registered for {}",
886                    instrument_id.venue
887                );
888                initialized = false;
889                break;
890            };
891
892            let account = match account {
893                AccountAny::Cash(_) => continue,
894                AccountAny::Margin(margin_account) => margin_account,
895            };
896
897            let Some(instrument) = cache.instrument(&instrument_id).cloned() else {
898                log::error!(
899                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
900                );
901                initialized = false;
902                break;
903            };
904            let positions: Vec<Position> = cache
905                .positions_open(None, Some(&instrument_id), None, None)
906                .into_iter()
907                .cloned()
908                .collect();
909            drop(cache);
910
911            let result = self.inner.borrow_mut().accounts.update_positions(
912                &account,
913                instrument,
914                positions.iter().collect(),
915                self.clock.borrow().timestamp_ns(),
916            );
917
918            match result {
919                Some((updated_account, _)) => {
920                    self.cache
921                        .borrow_mut()
922                        .update_account(AccountAny::Margin(updated_account))
923                        .unwrap();
924                }
925                None => {
926                    initialized = false;
927                }
928            }
929        }
930
931        let open_count = all_positions_open.len();
932        self.inner.borrow_mut().initialized = initialized;
933        log::info!(
934            color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
935            "Initialized {} open position{}",
936            open_count,
937            if open_count == 1 { "" } else { "s" }
938        );
939    }
940
941    /// Updates portfolio calculations based on a new quote tick.
942    ///
943    /// Recalculates unrealized PnL for positions affected by the quote update.
944    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
945        update_quote_tick(
946            self.cache.clone(),
947            self.clock.clone(),
948            self.inner.clone(),
949            self.config.clone(),
950            quote,
951        );
952    }
953
954    /// Updates portfolio calculations based on a new bar.
955    ///
956    /// Updates cached bar close prices and recalculates unrealized PnL.
957    pub fn update_bar(&mut self, bar: &Bar) {
958        update_bar(
959            self.cache.clone(),
960            self.clock.clone(),
961            self.inner.clone(),
962            self.config.clone(),
963            bar,
964        );
965    }
966
967    /// Updates portfolio with a new account state event.
968    pub fn update_account(&mut self, event: &AccountState) {
969        update_account(self.cache.clone(), self.inner.clone(), event);
970    }
971
972    /// Updates portfolio calculations based on an order event.
973    ///
974    /// Handles balance updates for order fills and margin calculations for order changes.
975    pub fn update_order(&mut self, event: &OrderEventAny) {
976        update_order(
977            self.cache.clone(),
978            self.clock.clone(),
979            self.inner.clone(),
980            self.config.clone(),
981            event,
982        );
983    }
984
985    /// Updates portfolio calculations based on a position event.
986    ///
987    /// Recalculates net positions, unrealized PnL, and margin requirements.
988    pub fn update_position(&mut self, event: &PositionEvent) {
989        update_position(
990            self.cache.clone(),
991            self.clock.clone(),
992            self.inner.clone(),
993            self.config.clone(),
994            event,
995        );
996    }
997
998    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
999        let mut net_position = Decimal::ZERO;
1000
1001        for open_position in positions_open {
1002            log::debug!("open_position: {open_position}");
1003            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
1004        }
1005
1006        let existing_position = self.net_position(instrument_id);
1007        if existing_position != net_position {
1008            self.inner
1009                .borrow_mut()
1010                .net_positions
1011                .insert(*instrument_id, net_position);
1012            log::info!("{instrument_id} net_position={net_position}");
1013        }
1014    }
1015
1016    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1017        let cache = self.cache.borrow();
1018        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1019            account
1020        } else {
1021            log::error!(
1022                "Cannot calculate unrealized PnL: no account registered for {}",
1023                instrument_id.venue
1024            );
1025            return None;
1026        };
1027
1028        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1029            instrument
1030        } else {
1031            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1032            return None;
1033        };
1034
1035        let currency = account
1036            .base_currency()
1037            .unwrap_or_else(|| instrument.settlement_currency());
1038
1039        let positions_open = cache.positions_open(
1040            None, // Faster query filtering
1041            Some(instrument_id),
1042            None,
1043            None,
1044        );
1045
1046        if positions_open.is_empty() {
1047            return Some(Money::new(0.0, currency));
1048        }
1049
1050        let mut total_pnl = 0.0;
1051
1052        for position in positions_open {
1053            if position.instrument_id != *instrument_id {
1054                continue; // Nothing to calculate
1055            }
1056
1057            if position.side == PositionSide::Flat {
1058                continue; // Nothing to calculate
1059            }
1060
1061            let price = if let Some(price) = self.get_price(position) {
1062                price
1063            } else {
1064                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1065                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1066                return None; // Cannot calculate
1067            };
1068
1069            let mut pnl = position.unrealized_pnl(price).as_f64();
1070
1071            if let Some(base_currency) = account.base_currency() {
1072                let xrate = if let Some(xrate) =
1073                    self.calculate_xrate_to_base(instrument, account, position.entry)
1074                {
1075                    xrate
1076                } else {
1077                    log::error!(
1078                        // TODO: Improve logging
1079                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1080                        instrument.settlement_currency(),
1081                        base_currency
1082                    );
1083                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1084                    return None; // Cannot calculate
1085                };
1086
1087                let scale = 10f64.powi(currency.precision.into());
1088                pnl = ((pnl * xrate) * scale).round() / scale;
1089            }
1090
1091            total_pnl += pnl;
1092        }
1093
1094        Some(Money::new(total_pnl, currency))
1095    }
1096
1097    fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1098        // Performance: This method maintains an incremental cache of snapshot PnLs
1099        // It only deserializes new snapshots that haven't been processed yet
1100        // Tracks sum and last PnL per position for efficient NETTING OMS support
1101
1102        // Get all position IDs that have snapshots for this instrument
1103        let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1104
1105        if snapshot_position_ids.is_empty() {
1106            return; // Nothing to process
1107        }
1108
1109        let mut rebuild = false;
1110
1111        // Detect purge/reset (count regression) to trigger full rebuild
1112        for position_id in &snapshot_position_ids {
1113            let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1114            let curr_count = position_snapshots.map_or(0, |s| {
1115                // Count the number of snapshots (they're serialized as JSON objects)
1116                s.split(|&b| b == b'{').count() - 1
1117            });
1118            let prev_count = self
1119                .inner
1120                .borrow()
1121                .snapshot_processed_counts
1122                .get(position_id)
1123                .copied()
1124                .unwrap_or(0);
1125
1126            if prev_count > curr_count {
1127                rebuild = true;
1128                break;
1129            }
1130        }
1131
1132        if rebuild {
1133            // Full rebuild: process all snapshots from scratch
1134            for position_id in &snapshot_position_ids {
1135                if let Some(position_snapshots) =
1136                    self.cache.borrow().position_snapshot_bytes(position_id)
1137                {
1138                    let mut sum_pnl: Option<Money> = None;
1139                    let mut last_pnl: Option<Money> = None;
1140
1141                    // Snapshots are concatenated JSON objects
1142                    let mut start = 0;
1143                    let mut depth = 0;
1144                    let mut in_string = false;
1145                    let mut escape_next = false;
1146
1147                    for (i, &byte) in position_snapshots.iter().enumerate() {
1148                        if escape_next {
1149                            escape_next = false;
1150                            continue;
1151                        }
1152
1153                        if byte == b'\\' && in_string {
1154                            escape_next = true;
1155                            continue;
1156                        }
1157
1158                        if byte == b'"' && !escape_next {
1159                            in_string = !in_string;
1160                        }
1161
1162                        if !in_string {
1163                            if byte == b'{' {
1164                                if depth == 0 {
1165                                    start = i;
1166                                }
1167                                depth += 1;
1168                            } else if byte == b'}' {
1169                                depth -= 1;
1170                                if depth == 0
1171                                    && let Ok(snapshot) = serde_json::from_slice::<Position>(
1172                                        &position_snapshots[start..=i],
1173                                    )
1174                                    && let Some(realized_pnl) = snapshot.realized_pnl
1175                                {
1176                                    if let Some(ref mut sum) = sum_pnl {
1177                                        if sum.currency == realized_pnl.currency {
1178                                            *sum = Money::new(
1179                                                sum.as_f64() + realized_pnl.as_f64(),
1180                                                sum.currency,
1181                                            );
1182                                        }
1183                                    } else {
1184                                        sum_pnl = Some(realized_pnl);
1185                                    }
1186                                    last_pnl = Some(realized_pnl);
1187                                }
1188                            }
1189                        }
1190                    }
1191
1192                    let mut inner = self.inner.borrow_mut();
1193                    if let Some(sum) = sum_pnl {
1194                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1195                        if let Some(last) = last_pnl {
1196                            inner.snapshot_last_per_position.insert(*position_id, last);
1197                        }
1198                    } else {
1199                        inner.snapshot_sum_per_position.remove(position_id);
1200                        inner.snapshot_last_per_position.remove(position_id);
1201                    }
1202
1203                    let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1204                    inner
1205                        .snapshot_processed_counts
1206                        .insert(*position_id, snapshot_count);
1207                }
1208            }
1209        } else {
1210            // Incremental path: only process new snapshots
1211            for position_id in &snapshot_position_ids {
1212                if let Some(position_snapshots) =
1213                    self.cache.borrow().position_snapshot_bytes(position_id)
1214                {
1215                    let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1216                    let prev_count = self
1217                        .inner
1218                        .borrow()
1219                        .snapshot_processed_counts
1220                        .get(position_id)
1221                        .copied()
1222                        .unwrap_or(0);
1223
1224                    if prev_count >= curr_count {
1225                        continue;
1226                    }
1227
1228                    let mut sum_pnl = self
1229                        .inner
1230                        .borrow()
1231                        .snapshot_sum_per_position
1232                        .get(position_id)
1233                        .copied();
1234                    let mut last_pnl = self
1235                        .inner
1236                        .borrow()
1237                        .snapshot_last_per_position
1238                        .get(position_id)
1239                        .copied();
1240
1241                    // Process only new snapshots
1242                    let mut start = 0;
1243                    let mut depth = 0;
1244                    let mut in_string = false;
1245                    let mut escape_next = false;
1246                    let mut snapshot_index = 0;
1247
1248                    for (i, &byte) in position_snapshots.iter().enumerate() {
1249                        if escape_next {
1250                            escape_next = false;
1251                            continue;
1252                        }
1253
1254                        if byte == b'\\' && in_string {
1255                            escape_next = true;
1256                            continue;
1257                        }
1258
1259                        if byte == b'"' && !escape_next {
1260                            in_string = !in_string;
1261                        }
1262
1263                        if !in_string {
1264                            if byte == b'{' {
1265                                if depth == 0 {
1266                                    start = i;
1267                                }
1268                                depth += 1;
1269                            } else if byte == b'}' {
1270                                depth -= 1;
1271                                if depth == 0 {
1272                                    snapshot_index += 1;
1273                                    // Only process new snapshots
1274                                    if snapshot_index > prev_count
1275                                        && let Ok(snapshot) = serde_json::from_slice::<Position>(
1276                                            &position_snapshots[start..=i],
1277                                        )
1278                                        && let Some(realized_pnl) = snapshot.realized_pnl
1279                                    {
1280                                        if let Some(ref mut sum) = sum_pnl {
1281                                            if sum.currency == realized_pnl.currency {
1282                                                *sum = Money::new(
1283                                                    sum.as_f64() + realized_pnl.as_f64(),
1284                                                    sum.currency,
1285                                                );
1286                                            }
1287                                        } else {
1288                                            sum_pnl = Some(realized_pnl);
1289                                        }
1290                                        last_pnl = Some(realized_pnl);
1291                                    }
1292                                }
1293                            }
1294                        }
1295                    }
1296
1297                    let mut inner = self.inner.borrow_mut();
1298                    if let Some(sum) = sum_pnl {
1299                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1300                        if let Some(last) = last_pnl {
1301                            inner.snapshot_last_per_position.insert(*position_id, last);
1302                        }
1303                    }
1304                    inner
1305                        .snapshot_processed_counts
1306                        .insert(*position_id, curr_count);
1307                }
1308            }
1309        }
1310    }
1311
1312    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1313        // Ensure snapshot PnLs are cached for this instrument
1314        self.ensure_snapshot_pnls_cached_for(instrument_id);
1315
1316        let cache = self.cache.borrow();
1317        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1318            account
1319        } else {
1320            log::error!(
1321                "Cannot calculate realized PnL: no account registered for {}",
1322                instrument_id.venue
1323            );
1324            return None;
1325        };
1326
1327        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1328            instrument
1329        } else {
1330            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1331            return None;
1332        };
1333
1334        let currency = account
1335            .base_currency()
1336            .unwrap_or_else(|| instrument.settlement_currency());
1337
1338        let positions = cache.positions(
1339            None, // Faster query filtering
1340            Some(instrument_id),
1341            None,
1342            None,
1343        );
1344
1345        let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1346
1347        // Check if we need to use NETTING OMS logic
1348        let is_netting = positions
1349            .iter()
1350            .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1351
1352        let mut total_pnl = 0.0;
1353
1354        if is_netting && !snapshot_position_ids.is_empty() {
1355            // NETTING OMS: Apply 3-case rule for position cycles
1356
1357            for position_id in &snapshot_position_ids {
1358                let is_active = positions.iter().any(|p| p.id == *position_id);
1359
1360                if is_active {
1361                    // Case 1 & 2: Active position - use only the last snapshot PnL
1362                    let last_pnl = self
1363                        .inner
1364                        .borrow()
1365                        .snapshot_last_per_position
1366                        .get(position_id)
1367                        .copied();
1368                    if let Some(last_pnl) = last_pnl {
1369                        let mut pnl = last_pnl.as_f64();
1370
1371                        if let Some(base_currency) = account.base_currency()
1372                            && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1373                        {
1374                            let xrate = if let Some(xrate) =
1375                                self.calculate_xrate_to_base(instrument, account, position.entry)
1376                            {
1377                                xrate
1378                            } else {
1379                                log::error!(
1380                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1381                                    instrument.settlement_currency(),
1382                                    base_currency
1383                                );
1384                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1385                                return Some(Money::new(0.0, currency));
1386                            };
1387
1388                            let scale = 10f64.powi(currency.precision.into());
1389                            pnl = ((pnl * xrate) * scale).round() / scale;
1390                        }
1391
1392                        total_pnl += pnl;
1393                    }
1394                } else {
1395                    // Case 3: Closed position - use sum of all snapshot PnLs
1396                    let sum_pnl = self
1397                        .inner
1398                        .borrow()
1399                        .snapshot_sum_per_position
1400                        .get(position_id)
1401                        .copied();
1402                    if let Some(sum_pnl) = sum_pnl {
1403                        let mut pnl = sum_pnl.as_f64();
1404
1405                        if let Some(base_currency) = account.base_currency() {
1406                            // For closed positions, we don't have entry price, use current rates
1407                            let xrate = cache.get_xrate(
1408                                instrument_id.venue,
1409                                instrument.settlement_currency(),
1410                                base_currency,
1411                                PriceType::Mid,
1412                            );
1413
1414                            if let Some(xrate) = xrate {
1415                                let scale = 10f64.powi(currency.precision.into());
1416                                pnl = ((pnl * xrate) * scale).round() / scale;
1417                            } else {
1418                                log::error!(
1419                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1420                                    instrument.settlement_currency(),
1421                                    base_currency
1422                                );
1423                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1424                                return Some(Money::new(0.0, currency));
1425                            }
1426                        }
1427
1428                        total_pnl += pnl;
1429                    }
1430                }
1431            }
1432
1433            // Add realized PnL from current active positions
1434            for position in positions {
1435                if position.instrument_id != *instrument_id {
1436                    continue;
1437                }
1438
1439                if let Some(realized_pnl) = position.realized_pnl {
1440                    let mut pnl = realized_pnl.as_f64();
1441
1442                    if let Some(base_currency) = account.base_currency() {
1443                        let xrate = if let Some(xrate) =
1444                            self.calculate_xrate_to_base(instrument, account, position.entry)
1445                        {
1446                            xrate
1447                        } else {
1448                            log::error!(
1449                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1450                                instrument.settlement_currency(),
1451                                base_currency
1452                            );
1453                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1454                            return Some(Money::new(0.0, currency));
1455                        };
1456
1457                        let scale = 10f64.powi(currency.precision.into());
1458                        pnl = ((pnl * xrate) * scale).round() / scale;
1459                    }
1460
1461                    total_pnl += pnl;
1462                }
1463            }
1464        } else {
1465            // HEDGING OMS or no snapshots: Simple aggregation
1466            // Add snapshot PnLs (sum all)
1467            for position_id in &snapshot_position_ids {
1468                let sum_pnl = self
1469                    .inner
1470                    .borrow()
1471                    .snapshot_sum_per_position
1472                    .get(position_id)
1473                    .copied();
1474                if let Some(sum_pnl) = sum_pnl {
1475                    let mut pnl = sum_pnl.as_f64();
1476
1477                    if let Some(base_currency) = account.base_currency() {
1478                        let xrate = cache.get_xrate(
1479                            instrument_id.venue,
1480                            instrument.settlement_currency(),
1481                            base_currency,
1482                            PriceType::Mid,
1483                        );
1484
1485                        if let Some(xrate) = xrate {
1486                            let scale = 10f64.powi(currency.precision.into());
1487                            pnl = ((pnl * xrate) * scale).round() / scale;
1488                        } else {
1489                            log::error!(
1490                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1491                                instrument.settlement_currency(),
1492                                base_currency
1493                            );
1494                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1495                            return Some(Money::new(0.0, currency));
1496                        }
1497                    }
1498
1499                    total_pnl += pnl;
1500                }
1501            }
1502
1503            // Add realized PnL from current positions
1504            for position in positions {
1505                if position.instrument_id != *instrument_id {
1506                    continue;
1507                }
1508
1509                if let Some(realized_pnl) = position.realized_pnl {
1510                    let mut pnl = realized_pnl.as_f64();
1511
1512                    if let Some(base_currency) = account.base_currency() {
1513                        let xrate = if let Some(xrate) =
1514                            self.calculate_xrate_to_base(instrument, account, position.entry)
1515                        {
1516                            xrate
1517                        } else {
1518                            log::error!(
1519                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1520                                instrument.settlement_currency(),
1521                                base_currency
1522                            );
1523                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1524                            return Some(Money::new(0.0, currency));
1525                        };
1526
1527                        let scale = 10f64.powi(currency.precision.into());
1528                        pnl = ((pnl * xrate) * scale).round() / scale;
1529                    }
1530
1531                    total_pnl += pnl;
1532                }
1533            }
1534        }
1535
1536        Some(Money::new(total_pnl, currency))
1537    }
1538
1539    fn get_price(&self, position: &Position) -> Option<Price> {
1540        let cache = self.cache.borrow();
1541        let instrument_id = &position.instrument_id;
1542
1543        // Check for mark price first if configured
1544        if self.config.use_mark_prices
1545            && let Some(mark_price) = cache.mark_price(instrument_id)
1546        {
1547            return Some(mark_price.value);
1548        }
1549
1550        // Fall back to bid/ask based on position side
1551        let price_type = match position.side {
1552            PositionSide::Long => PriceType::Bid,
1553            PositionSide::Short => PriceType::Ask,
1554            _ => panic!("invalid `PositionSide`, was {}", position.side),
1555        };
1556
1557        cache
1558            .price(instrument_id, price_type)
1559            .or_else(|| cache.price(instrument_id, PriceType::Last))
1560            .or_else(|| {
1561                self.inner
1562                    .borrow()
1563                    .bar_close_prices
1564                    .get(instrument_id)
1565                    .copied()
1566            })
1567    }
1568
1569    fn calculate_xrate_to_base(
1570        &self,
1571        instrument: &InstrumentAny,
1572        account: &AccountAny,
1573        side: OrderSide,
1574    ) -> Option<f64> {
1575        if !self.config.convert_to_account_base_currency {
1576            return Some(1.0); // No conversion needed
1577        }
1578
1579        match account.base_currency() {
1580            None => Some(1.0), // No conversion needed
1581            Some(base_currency) => {
1582                let cache = self.cache.borrow();
1583
1584                if self.config.use_mark_xrates {
1585                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1586                }
1587
1588                let price_type = if side == OrderSide::Buy {
1589                    PriceType::Bid
1590                } else {
1591                    PriceType::Ask
1592                };
1593
1594                cache.get_xrate(
1595                    instrument.id().venue,
1596                    instrument.settlement_currency(),
1597                    base_currency,
1598                    price_type,
1599                )
1600            }
1601        }
1602    }
1603}
1604
1605// Helper functions
1606fn update_quote_tick(
1607    cache: Rc<RefCell<Cache>>,
1608    clock: Rc<RefCell<dyn Clock>>,
1609    inner: Rc<RefCell<PortfolioState>>,
1610    config: PortfolioConfig,
1611    quote: &QuoteTick,
1612) {
1613    update_instrument_id(cache, clock.clone(), inner, config, &quote.instrument_id);
1614}
1615
1616fn update_bar(
1617    cache: Rc<RefCell<Cache>>,
1618    clock: Rc<RefCell<dyn Clock>>,
1619    inner: Rc<RefCell<PortfolioState>>,
1620    config: PortfolioConfig,
1621    bar: &Bar,
1622) {
1623    let instrument_id = bar.bar_type.instrument_id();
1624    inner
1625        .borrow_mut()
1626        .bar_close_prices
1627        .insert(instrument_id, bar.close);
1628    update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1629}
1630
1631fn update_instrument_id(
1632    cache: Rc<RefCell<Cache>>,
1633    clock: Rc<RefCell<dyn Clock>>,
1634    inner: Rc<RefCell<PortfolioState>>,
1635    config: PortfolioConfig,
1636    instrument_id: &InstrumentId,
1637) {
1638    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1639
1640    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1641        return;
1642    }
1643
1644    let result_init;
1645    let mut result_maint = None;
1646
1647    let account = {
1648        let account = {
1649            let cache_ref = cache.borrow();
1650            if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1651                account.clone()
1652            } else {
1653                log::error!(
1654                    "Cannot update tick: no account registered for {}",
1655                    instrument_id.venue
1656                );
1657                return;
1658            }
1659        };
1660
1661        let mut cache_ref = cache.borrow_mut();
1662        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1663            instrument.clone()
1664        } else {
1665            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1666            return;
1667        };
1668
1669        // Clone the orders and positions to own the data
1670        let orders_open: Vec<OrderAny> = cache_ref
1671            .orders_open(None, Some(instrument_id), None, None)
1672            .iter()
1673            .map(|o| (*o).clone())
1674            .collect();
1675
1676        let positions_open: Vec<Position> = cache_ref
1677            .positions_open(None, Some(instrument_id), None, None)
1678            .iter()
1679            .map(|p| (*p).clone())
1680            .collect();
1681
1682        result_init = inner.borrow().accounts.update_orders(
1683            &account,
1684            instrument.clone(),
1685            orders_open.iter().collect(),
1686            clock.borrow().timestamp_ns(),
1687        );
1688
1689        if let AccountAny::Margin(ref margin_account) = account {
1690            result_maint = inner.borrow().accounts.update_positions(
1691                margin_account,
1692                instrument,
1693                positions_open.iter().collect(),
1694                clock.borrow().timestamp_ns(),
1695            );
1696        }
1697
1698        if let Some((ref updated_account, _)) = result_init {
1699            cache_ref.update_account(updated_account.clone()).unwrap();
1700        }
1701        account
1702    };
1703
1704    let mut portfolio_clone = Portfolio {
1705        clock: clock.clone(),
1706        cache,
1707        inner: inner.clone(),
1708        config,
1709    };
1710
1711    let result_unrealized_pnl: Option<Money> =
1712        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1713
1714    if result_init.is_some()
1715        && (matches!(account, AccountAny::Cash(_))
1716            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1717    {
1718        inner.borrow_mut().pending_calcs.remove(instrument_id);
1719        if inner.borrow().pending_calcs.is_empty() {
1720            inner.borrow_mut().initialized = true;
1721        }
1722    }
1723}
1724
1725fn update_order(
1726    cache: Rc<RefCell<Cache>>,
1727    clock: Rc<RefCell<dyn Clock>>,
1728    inner: Rc<RefCell<PortfolioState>>,
1729    _config: PortfolioConfig,
1730    event: &OrderEventAny,
1731) {
1732    let cache_ref = cache.borrow();
1733    let account_id = match event.account_id() {
1734        Some(account_id) => account_id,
1735        None => {
1736            return; // No Account Assigned
1737        }
1738    };
1739
1740    let account = if let Some(account) = cache_ref.account(&account_id) {
1741        account
1742    } else {
1743        log::error!("Cannot update order: no account registered for {account_id}");
1744        return;
1745    };
1746
1747    match account {
1748        AccountAny::Cash(cash_account) => {
1749            if !cash_account.base.calculate_account_state {
1750                return;
1751            }
1752        }
1753        AccountAny::Margin(margin_account) => {
1754            if !margin_account.base.calculate_account_state {
1755                return;
1756            }
1757        }
1758    }
1759
1760    match event {
1761        OrderEventAny::Accepted(_)
1762        | OrderEventAny::Canceled(_)
1763        | OrderEventAny::Rejected(_)
1764        | OrderEventAny::Updated(_)
1765        | OrderEventAny::Filled(_) => {}
1766        _ => {
1767            return;
1768        }
1769    }
1770
1771    let cache_ref = cache.borrow();
1772    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1773        order
1774    } else {
1775        log::error!(
1776            "Cannot update order: {} not found in the cache",
1777            event.client_order_id()
1778        );
1779        return; // No Order Found
1780    };
1781
1782    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1783        return; // No change to account state
1784    }
1785
1786    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1787        instrument_id
1788    } else {
1789        log::error!(
1790            "Cannot update order: no instrument found for {}",
1791            event.instrument_id()
1792        );
1793        return;
1794    };
1795
1796    if let OrderEventAny::Filled(order_filled) = event {
1797        let _ = inner.borrow().accounts.update_balances(
1798            account.clone(),
1799            instrument.clone(),
1800            *order_filled,
1801        );
1802
1803        let mut portfolio_clone = Portfolio {
1804            clock: clock.clone(),
1805            cache: cache.clone(),
1806            inner: inner.clone(),
1807            config: PortfolioConfig::default(), // TODO: TBD
1808        };
1809
1810        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1811            Some(unrealized_pnl) => {
1812                inner
1813                    .borrow_mut()
1814                    .unrealized_pnls
1815                    .insert(event.instrument_id(), unrealized_pnl);
1816            }
1817            None => {
1818                log::error!(
1819                    "Failed to calculate unrealized PnL for instrument {}",
1820                    event.instrument_id()
1821                );
1822            }
1823        }
1824    }
1825
1826    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1827
1828    let account_state = inner.borrow_mut().accounts.update_orders(
1829        account,
1830        instrument.clone(),
1831        orders_open,
1832        clock.borrow().timestamp_ns(),
1833    );
1834
1835    let mut cache_ref = cache.borrow_mut();
1836    cache_ref.update_account(account.clone()).unwrap();
1837
1838    if let Some(account_state) = account_state {
1839        msgbus::publish(
1840            format!("events.account.{}", account.id()).into(),
1841            &account_state,
1842        );
1843    } else {
1844        log::debug!("Added pending calculation for {}", instrument.id());
1845        inner.borrow_mut().pending_calcs.insert(instrument.id());
1846    }
1847
1848    log::debug!("Updated {event}");
1849}
1850
1851fn update_position(
1852    cache: Rc<RefCell<Cache>>,
1853    clock: Rc<RefCell<dyn Clock>>,
1854    inner: Rc<RefCell<PortfolioState>>,
1855    _config: PortfolioConfig,
1856    event: &PositionEvent,
1857) {
1858    let instrument_id = event.instrument_id();
1859
1860    let positions_open: Vec<Position> = {
1861        let cache_ref = cache.borrow();
1862
1863        cache_ref
1864            .positions_open(None, Some(&instrument_id), None, None)
1865            .iter()
1866            .map(|o| (*o).clone())
1867            .collect()
1868    };
1869
1870    log::debug!("position fresh from cache -> {positions_open:?}");
1871
1872    let mut portfolio_clone = Portfolio {
1873        clock: clock.clone(),
1874        cache: cache.clone(),
1875        inner: inner.clone(),
1876        config: PortfolioConfig::default(), // TODO: TBD
1877    };
1878
1879    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1880
1881    if let Some(calculated_unrealized_pnl) =
1882        portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1883    {
1884        inner
1885            .borrow_mut()
1886            .unrealized_pnls
1887            .insert(event.instrument_id(), calculated_unrealized_pnl);
1888    } else {
1889        log::warn!(
1890            "Failed to calculate unrealized PnL for {}, marking as pending",
1891            event.instrument_id()
1892        );
1893        inner
1894            .borrow_mut()
1895            .pending_calcs
1896            .insert(event.instrument_id());
1897    }
1898
1899    if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1900        inner
1901            .borrow_mut()
1902            .realized_pnls
1903            .insert(event.instrument_id(), calculated_realized_pnl);
1904    } else {
1905        log::warn!(
1906            "Failed to calculate realized PnL for {}, marking as pending",
1907            event.instrument_id()
1908        );
1909        inner
1910            .borrow_mut()
1911            .pending_calcs
1912            .insert(event.instrument_id());
1913    }
1914
1915    let cache_ref = cache.borrow();
1916    let account = cache_ref.account(&event.account_id());
1917
1918    if let Some(AccountAny::Margin(margin_account)) = account {
1919        if !margin_account.calculate_account_state {
1920            return; // Nothing to calculate
1921        }
1922
1923        let cache_ref = cache.borrow();
1924        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1925            instrument
1926        } else {
1927            log::error!("Cannot update position: no instrument found for {instrument_id}");
1928            return;
1929        };
1930
1931        let result = inner.borrow_mut().accounts.update_positions(
1932            margin_account,
1933            instrument.clone(),
1934            positions_open.iter().collect(),
1935            clock.borrow().timestamp_ns(),
1936        );
1937        let mut cache_ref = cache.borrow_mut();
1938        if let Some((margin_account, _)) = result {
1939            cache_ref
1940                .update_account(AccountAny::Margin(margin_account))
1941                .unwrap();
1942        }
1943    } else if account.is_none() {
1944        log::error!(
1945            "Cannot update position: no account registered for {}",
1946            event.account_id()
1947        );
1948    }
1949}
1950
1951fn update_account(
1952    cache: Rc<RefCell<Cache>>,
1953    inner: Rc<RefCell<PortfolioState>>,
1954    event: &AccountState,
1955) {
1956    let mut cache_ref = cache.borrow_mut();
1957
1958    if let Some(existing) = cache_ref.account(&event.account_id) {
1959        let mut account = existing.clone();
1960        account.apply(event.clone());
1961
1962        if let Err(e) = cache_ref.update_account(account.clone()) {
1963            log::error!("Failed to update account: {e}");
1964            return;
1965        }
1966    } else {
1967        let account = match AccountAny::from_events(vec![event.clone()]) {
1968            Ok(account) => account,
1969            Err(e) => {
1970                log::error!("Failed to create account: {e}");
1971                return;
1972            }
1973        };
1974
1975        if let Err(e) = cache_ref.add_account(account) {
1976            log::error!("Failed to add account: {e}");
1977            return;
1978        }
1979    }
1980
1981    // Throttled logging logic
1982    let mut inner_ref = inner.borrow_mut();
1983    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1984        let current_ts = event.ts_init.as_u64();
1985        let last_ts = inner_ref
1986            .last_account_state_log_ts
1987            .get(&event.account_id)
1988            .copied()
1989            .unwrap_or(0);
1990
1991        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1992        {
1993            inner_ref
1994                .last_account_state_log_ts
1995                .insert(event.account_id, current_ts);
1996            true
1997        } else {
1998            false
1999        }
2000    } else {
2001        true // Throttling disabled, always log
2002    };
2003
2004    if should_log {
2005        log::info!("Updated {event}");
2006    }
2007}