nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    cache::Cache,
24    clock::Clock,
25    msgbus::{
26        self,
27        handler::{ShareableMessageHandler, TypedMessageHandler},
28    },
29};
30use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
31use nautilus_model::{
32    accounts::AccountAny,
33    data::{Bar, MarkPriceUpdate, QuoteTick},
34    enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
35    events::{AccountState, OrderEventAny, position::PositionEvent},
36    identifiers::{AccountId, InstrumentId, PositionId, Venue},
37    instruments::{Instrument, InstrumentAny},
38    orders::{Order, OrderAny},
39    position::Position,
40    types::{Currency, Money, Price},
41};
42use rust_decimal::{Decimal, prelude::FromPrimitive};
43
44use crate::{config::PortfolioConfig, manager::AccountsManager};
45
46struct PortfolioState {
47    accounts: AccountsManager,
48    analyzer: PortfolioAnalyzer,
49    unrealized_pnls: AHashMap<InstrumentId, Money>,
50    realized_pnls: AHashMap<InstrumentId, Money>,
51    snapshot_sum_per_position: AHashMap<PositionId, Money>,
52    snapshot_last_per_position: AHashMap<PositionId, Money>,
53    snapshot_processed_counts: AHashMap<PositionId, usize>,
54    net_positions: AHashMap<InstrumentId, Decimal>,
55    pending_calcs: AHashSet<InstrumentId>,
56    bar_close_prices: AHashMap<InstrumentId, Price>,
57    initialized: bool,
58    last_account_state_log_ts: AHashMap<AccountId, u64>,
59    min_account_state_logging_interval_ns: u64,
60}
61
62impl PortfolioState {
63    fn new(
64        clock: Rc<RefCell<dyn Clock>>,
65        cache: Rc<RefCell<Cache>>,
66        config: &PortfolioConfig,
67    ) -> Self {
68        let min_account_state_logging_interval_ns = config
69            .min_account_state_logging_interval_ms
70            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
71
72        Self {
73            accounts: AccountsManager::new(clock, cache),
74            analyzer: PortfolioAnalyzer::default(),
75            unrealized_pnls: AHashMap::new(),
76            realized_pnls: AHashMap::new(),
77            snapshot_sum_per_position: AHashMap::new(),
78            snapshot_last_per_position: AHashMap::new(),
79            snapshot_processed_counts: AHashMap::new(),
80            net_positions: AHashMap::new(),
81            pending_calcs: AHashSet::new(),
82            bar_close_prices: AHashMap::new(),
83            initialized: false,
84            last_account_state_log_ts: AHashMap::new(),
85            min_account_state_logging_interval_ns,
86        }
87    }
88
89    fn reset(&mut self) {
90        log::debug!("RESETTING");
91        self.net_positions.clear();
92        self.unrealized_pnls.clear();
93        self.realized_pnls.clear();
94        self.snapshot_sum_per_position.clear();
95        self.snapshot_last_per_position.clear();
96        self.snapshot_processed_counts.clear();
97        self.pending_calcs.clear();
98        self.last_account_state_log_ts.clear();
99        self.analyzer.reset();
100        log::debug!("READY");
101    }
102}
103
104pub struct Portfolio {
105    pub(crate) clock: Rc<RefCell<dyn Clock>>,
106    pub(crate) cache: Rc<RefCell<Cache>>,
107    inner: Rc<RefCell<PortfolioState>>,
108    config: PortfolioConfig,
109}
110
111impl Debug for Portfolio {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct(stringify!(Portfolio)).finish()
114    }
115}
116
117impl Portfolio {
118    pub fn new(
119        cache: Rc<RefCell<Cache>>,
120        clock: Rc<RefCell<dyn Clock>>,
121        config: Option<PortfolioConfig>,
122    ) -> Self {
123        let config = config.unwrap_or_default();
124        let inner = Rc::new(RefCell::new(PortfolioState::new(
125            clock.clone(),
126            cache.clone(),
127            &config,
128        )));
129
130        Self::register_message_handlers(
131            cache.clone(),
132            clock.clone(),
133            inner.clone(),
134            config.clone(),
135        );
136
137        Self {
138            clock,
139            cache,
140            inner,
141            config,
142        }
143    }
144
145    fn register_message_handlers(
146        cache: Rc<RefCell<Cache>>,
147        clock: Rc<RefCell<dyn Clock>>,
148        inner: Rc<RefCell<PortfolioState>>,
149        config: PortfolioConfig,
150    ) {
151        let inner_weak = WeakCell::from(Rc::downgrade(&inner));
152
153        let update_account_handler = {
154            let cache = cache.clone();
155            let inner = inner_weak.clone();
156            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
157                move |event: &AccountState| {
158                    if let Some(inner_rc) = inner.upgrade() {
159                        update_account(cache.clone(), inner_rc.into(), event);
160                    }
161                },
162            )))
163        };
164
165        let update_position_handler = {
166            let cache = cache.clone();
167            let clock = clock.clone();
168            let inner = inner_weak.clone();
169            let config = config.clone();
170            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
171                move |event: &PositionEvent| {
172                    if let Some(inner_rc) = inner.upgrade() {
173                        update_position(
174                            cache.clone(),
175                            clock.clone(),
176                            inner_rc.into(),
177                            config.clone(),
178                            event,
179                        );
180                    }
181                },
182            )))
183        };
184
185        let update_quote_handler = {
186            let cache = cache.clone();
187            let clock = clock.clone();
188            let inner = inner_weak.clone();
189            let config = config.clone();
190            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
191                move |quote: &QuoteTick| {
192                    if let Some(inner_rc) = inner.upgrade() {
193                        update_quote_tick(
194                            cache.clone(),
195                            clock.clone(),
196                            inner_rc.into(),
197                            config.clone(),
198                            quote,
199                        );
200                    }
201                },
202            )))
203        };
204
205        let update_bar_handler = {
206            let cache = cache.clone();
207            let clock = clock.clone();
208            let inner = inner_weak.clone();
209            let config = config.clone();
210            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
211                if let Some(inner_rc) = inner.upgrade() {
212                    update_bar(
213                        cache.clone(),
214                        clock.clone(),
215                        inner_rc.into(),
216                        config.clone(),
217                        bar,
218                    );
219                }
220            })))
221        };
222
223        let update_mark_price_handler = {
224            let cache = cache.clone();
225            let clock = clock.clone();
226            let inner = inner_weak.clone();
227            let config = config.clone();
228            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
229                move |mark_price: &MarkPriceUpdate| {
230                    if let Some(inner_rc) = inner.upgrade() {
231                        update_instrument_id(
232                            cache.clone(),
233                            clock.clone(),
234                            inner_rc.into(),
235                            config.clone(),
236                            &mark_price.instrument_id,
237                        );
238                    }
239                },
240            )))
241        };
242
243        let update_order_handler = {
244            let cache = cache;
245            let clock = clock.clone();
246            let inner = inner_weak;
247            let config = config.clone();
248            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
249                move |event: &OrderEventAny| {
250                    if let Some(inner_rc) = inner.upgrade() {
251                        update_order(
252                            cache.clone(),
253                            clock.clone(),
254                            inner_rc.into(),
255                            config.clone(),
256                            event,
257                        );
258                    }
259                },
260            )))
261        };
262
263        msgbus::register(
264            "Portfolio.update_account".into(),
265            update_account_handler.clone(),
266        );
267
268        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
269        if config.bar_updates {
270            msgbus::subscribe("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
271        }
272        if config.use_mark_prices {
273            msgbus::subscribe(
274                "data.mark_prices.*".into(),
275                update_mark_price_handler,
276                Some(10),
277            );
278        }
279        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
280        msgbus::subscribe(
281            "events.position.*".into(),
282            update_position_handler,
283            Some(10),
284        );
285        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
286    }
287
288    pub fn reset(&mut self) {
289        log::debug!("RESETTING");
290        self.inner.borrow_mut().reset();
291        log::debug!("READY");
292    }
293
294    // -- QUERIES ---------------------------------------------------------------------------------
295
296    /// Returns `true` if the portfolio has been initialized.
297    #[must_use]
298    pub fn is_initialized(&self) -> bool {
299        self.inner.borrow().initialized
300    }
301
302    /// Returns the locked balances for the given venue.
303    ///
304    /// Locked balances represent funds reserved for open orders.
305    #[must_use]
306    pub fn balances_locked(&self, venue: &Venue) -> AHashMap<Currency, Money> {
307        self.cache.borrow().account_for_venue(venue).map_or_else(
308            || {
309                log::error!("Cannot get balances locked: no account generated for {venue}");
310                AHashMap::new()
311            },
312            AccountAny::balances_locked,
313        )
314    }
315
316    /// Returns the initial margin requirements for the given venue.
317    ///
318    /// Only applicable for margin accounts. Returns empty map for cash accounts.
319    #[must_use]
320    pub fn margins_init(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
321        self.cache.borrow().account_for_venue(venue).map_or_else(
322            || {
323                log::error!(
324                    "Cannot get initial (order) margins: no account registered for {venue}"
325                );
326                AHashMap::new()
327            },
328            |account| match account {
329                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
330                AccountAny::Cash(_) => {
331                    log::warn!("Initial margins not applicable for cash account");
332                    AHashMap::new()
333                }
334            },
335        )
336    }
337
338    /// Returns the maintenance margin requirements for the given venue.
339    ///
340    /// Only applicable for margin accounts. Returns empty map for cash accounts.
341    #[must_use]
342    pub fn margins_maint(&self, venue: &Venue) -> AHashMap<InstrumentId, Money> {
343        self.cache.borrow().account_for_venue(venue).map_or_else(
344            || {
345                log::error!(
346                    "Cannot get maintenance (position) margins: no account registered for {venue}"
347                );
348                AHashMap::new()
349            },
350            |account| match account {
351                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
352                AccountAny::Cash(_) => {
353                    log::warn!("Maintenance margins not applicable for cash account");
354                    AHashMap::new()
355                }
356            },
357        )
358    }
359
360    /// Returns the unrealized PnLs for all positions at the given venue.
361    ///
362    /// Calculates mark-to-market PnL based on current market prices.
363    #[must_use]
364    pub fn unrealized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
365        let instrument_ids = {
366            let cache = self.cache.borrow();
367            let positions = cache.positions(Some(venue), None, None, None);
368
369            if positions.is_empty() {
370                return AHashMap::new(); // Nothing to calculate
371            }
372
373            let instrument_ids: AHashSet<InstrumentId> =
374                positions.iter().map(|p| p.instrument_id).collect();
375
376            instrument_ids
377        };
378
379        let mut unrealized_pnls: AHashMap<Currency, f64> = AHashMap::new();
380
381        for instrument_id in instrument_ids {
382            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
383                // PnL already calculated
384                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
385                continue;
386            }
387
388            // Calculate PnL
389            match self.calculate_unrealized_pnl(&instrument_id) {
390                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
391                None => continue,
392            }
393        }
394
395        unrealized_pnls
396            .into_iter()
397            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
398            .collect()
399    }
400
401    /// Returns the realized PnLs for all positions at the given venue.
402    ///
403    /// Calculates total realized profit and loss from closed positions.
404    #[must_use]
405    pub fn realized_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
406        let instrument_ids = {
407            let cache = self.cache.borrow();
408            let positions = cache.positions(Some(venue), None, None, None);
409
410            if positions.is_empty() {
411                return AHashMap::new(); // Nothing to calculate
412            }
413
414            let instrument_ids: AHashSet<InstrumentId> =
415                positions.iter().map(|p| p.instrument_id).collect();
416
417            instrument_ids
418        };
419
420        let mut realized_pnls: AHashMap<Currency, f64> = AHashMap::new();
421
422        for instrument_id in instrument_ids {
423            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
424                // PnL already calculated
425                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
426                continue;
427            }
428
429            // Calculate PnL
430            match self.calculate_realized_pnl(&instrument_id) {
431                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
432                None => continue,
433            }
434        }
435
436        realized_pnls
437            .into_iter()
438            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
439            .collect()
440    }
441
442    #[must_use]
443    pub fn net_exposures(&self, venue: &Venue) -> Option<AHashMap<Currency, Money>> {
444        let cache = self.cache.borrow();
445        let account = if let Some(account) = cache.account_for_venue(venue) {
446            account
447        } else {
448            log::error!("Cannot calculate net exposures: no account registered for {venue}");
449            return None; // Cannot calculate
450        };
451
452        let positions_open = cache.positions_open(Some(venue), None, None, None);
453        if positions_open.is_empty() {
454            return Some(AHashMap::new()); // Nothing to calculate
455        }
456
457        let mut net_exposures: AHashMap<Currency, f64> = AHashMap::new();
458
459        for position in positions_open {
460            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
461                instrument
462            } else {
463                log::error!(
464                    "Cannot calculate net exposures: no instrument for {}",
465                    position.instrument_id
466                );
467                return None; // Cannot calculate
468            };
469
470            if position.side == PositionSide::Flat {
471                log::error!(
472                    "Cannot calculate net exposures: position is flat for {}",
473                    position.instrument_id
474                );
475                continue; // Nothing to calculate
476            }
477
478            let price = self.get_price(position)?;
479            let xrate = if let Some(xrate) =
480                self.calculate_xrate_to_base(instrument, account, position.entry)
481            {
482                xrate
483            } else {
484                log::error!(
485                    // TODO: Improve logging
486                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
487                    instrument.settlement_currency(),
488                    account.base_currency()
489                );
490                return None; // Cannot calculate
491            };
492
493            let settlement_currency = account
494                .base_currency()
495                .unwrap_or_else(|| instrument.settlement_currency());
496
497            let net_exposure = instrument
498                .calculate_notional_value(position.quantity, price, None)
499                .as_f64()
500                * xrate;
501
502            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
503                .round()
504                / 10f64.powi(settlement_currency.precision.into());
505
506            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
507        }
508
509        Some(
510            net_exposures
511                .into_iter()
512                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
513                .collect(),
514        )
515    }
516
517    #[must_use]
518    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
519        if let Some(pnl) = self
520            .inner
521            .borrow()
522            .unrealized_pnls
523            .get(instrument_id)
524            .copied()
525        {
526            return Some(pnl);
527        }
528
529        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
530        self.inner
531            .borrow_mut()
532            .unrealized_pnls
533            .insert(*instrument_id, pnl);
534        Some(pnl)
535    }
536
537    #[must_use]
538    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
539        if let Some(pnl) = self
540            .inner
541            .borrow()
542            .realized_pnls
543            .get(instrument_id)
544            .copied()
545        {
546            return Some(pnl);
547        }
548
549        let pnl = self.calculate_realized_pnl(instrument_id)?;
550        self.inner
551            .borrow_mut()
552            .realized_pnls
553            .insert(*instrument_id, pnl);
554        Some(pnl)
555    }
556
557    /// Returns the total PnL for the given instrument ID.
558    ///
559    /// Total PnL = Realized PnL + Unrealized PnL
560    #[must_use]
561    pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
562        let realized = self.realized_pnl(instrument_id)?;
563        let unrealized = self.unrealized_pnl(instrument_id)?;
564
565        if realized.currency != unrealized.currency {
566            log::error!(
567                "Cannot calculate total PnL: currency mismatch {} vs {}",
568                realized.currency,
569                unrealized.currency
570            );
571            return None;
572        }
573
574        Some(Money::new(
575            realized.as_f64() + unrealized.as_f64(),
576            realized.currency,
577        ))
578    }
579
580    /// Returns the total PnLs for the given venue.
581    ///
582    /// Total PnL = Realized PnL + Unrealized PnL for each currency
583    #[must_use]
584    pub fn total_pnls(&mut self, venue: &Venue) -> AHashMap<Currency, Money> {
585        let realized_pnls = self.realized_pnls(venue);
586        let unrealized_pnls = self.unrealized_pnls(venue);
587
588        let mut total_pnls: AHashMap<Currency, Money> = AHashMap::new();
589
590        // Add realized PnLs
591        for (currency, realized) in realized_pnls {
592            total_pnls.insert(currency, realized);
593        }
594
595        // Add unrealized PnLs
596        for (currency, unrealized) in unrealized_pnls {
597            match total_pnls.get_mut(&currency) {
598                Some(total) => {
599                    *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
600                }
601                None => {
602                    total_pnls.insert(currency, unrealized);
603                }
604            }
605        }
606
607        total_pnls
608    }
609
610    #[must_use]
611    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
612        let cache = self.cache.borrow();
613        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
614            account
615        } else {
616            log::error!(
617                "Cannot calculate net exposure: no account registered for {}",
618                instrument_id.venue
619            );
620            return None;
621        };
622
623        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
624            instrument
625        } else {
626            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
627            return None;
628        };
629
630        let positions_open = cache.positions_open(
631            None, // Faster query filtering
632            Some(instrument_id),
633            None,
634            None,
635        );
636
637        if positions_open.is_empty() {
638            return Some(Money::new(0.0, instrument.settlement_currency()));
639        }
640
641        let mut net_exposure = 0.0;
642
643        for position in positions_open {
644            let price = self.get_price(position)?;
645            let xrate = if let Some(xrate) =
646                self.calculate_xrate_to_base(instrument, account, position.entry)
647            {
648                xrate
649            } else {
650                log::error!(
651                    // TODO: Improve logging
652                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
653                    instrument.settlement_currency(),
654                    account.base_currency()
655                );
656                return None; // Cannot calculate
657            };
658
659            let notional_value =
660                instrument.calculate_notional_value(position.quantity, price, None);
661            net_exposure += notional_value.as_f64() * xrate;
662        }
663
664        let settlement_currency = account
665            .base_currency()
666            .unwrap_or_else(|| instrument.settlement_currency());
667
668        Some(Money::new(net_exposure, settlement_currency))
669    }
670
671    #[must_use]
672    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
673        self.inner
674            .borrow()
675            .net_positions
676            .get(instrument_id)
677            .copied()
678            .unwrap_or(Decimal::ZERO)
679    }
680
681    #[must_use]
682    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
683        self.inner
684            .borrow()
685            .net_positions
686            .get(instrument_id)
687            .copied()
688            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
689    }
690
691    #[must_use]
692    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
693        self.inner
694            .borrow()
695            .net_positions
696            .get(instrument_id)
697            .copied()
698            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
699    }
700
701    #[must_use]
702    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
703        self.inner
704            .borrow()
705            .net_positions
706            .get(instrument_id)
707            .copied()
708            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
709    }
710
711    #[must_use]
712    pub fn is_completely_flat(&self) -> bool {
713        for net_position in self.inner.borrow().net_positions.values() {
714            if *net_position != Decimal::ZERO {
715                return false;
716            }
717        }
718        true
719    }
720
721    // -- COMMANDS --------------------------------------------------------------------------------
722
723    /// Initializes account margin based on existing open orders.
724    ///
725    /// # Panics
726    ///
727    /// Panics if updating the cache with a mutated account fails.
728    pub fn initialize_orders(&mut self) {
729        let mut initialized = true;
730        let orders_and_instruments = {
731            let cache = self.cache.borrow();
732            let all_orders_open = cache.orders_open(None, None, None, None);
733
734            let mut instruments_with_orders = Vec::new();
735            let mut instruments = AHashSet::new();
736
737            for order in &all_orders_open {
738                instruments.insert(order.instrument_id());
739            }
740
741            for instrument_id in instruments {
742                if let Some(instrument) = cache.instrument(&instrument_id) {
743                    let orders = cache
744                        .orders_open(None, Some(&instrument_id), None, None)
745                        .into_iter()
746                        .cloned()
747                        .collect::<Vec<OrderAny>>();
748                    instruments_with_orders.push((instrument.clone(), orders));
749                } else {
750                    log::error!(
751                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
752                    );
753                    initialized = false;
754                    break;
755                }
756            }
757            instruments_with_orders
758        };
759
760        for (instrument, orders_open) in &orders_and_instruments {
761            let mut cache = self.cache.borrow_mut();
762            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
763                account
764            } else {
765                log::error!(
766                    "Cannot update initial (order) margin: no account registered for {}",
767                    instrument.id().venue
768                );
769                initialized = false;
770                break;
771            };
772
773            let result = self.inner.borrow_mut().accounts.update_orders(
774                account,
775                instrument.clone(),
776                orders_open.iter().collect(),
777                self.clock.borrow().timestamp_ns(),
778            );
779
780            match result {
781                Some((updated_account, _)) => {
782                    cache.update_account(updated_account).unwrap();
783                }
784                None => {
785                    initialized = false;
786                }
787            }
788        }
789
790        let total_orders = orders_and_instruments
791            .into_iter()
792            .map(|(_, orders)| orders.len())
793            .sum::<usize>();
794
795        log::info!(
796            "Initialized {} open order{}",
797            total_orders,
798            if total_orders == 1 { "" } else { "s" }
799        );
800
801        self.inner.borrow_mut().initialized = initialized;
802    }
803
804    /// Initializes account margin based on existing open positions.
805    ///
806    /// # Panics
807    ///
808    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
809    pub fn initialize_positions(&mut self) {
810        self.inner.borrow_mut().unrealized_pnls.clear();
811        self.inner.borrow_mut().realized_pnls.clear();
812        let all_positions_open: Vec<Position>;
813        let mut instruments = AHashSet::new();
814        {
815            let cache = self.cache.borrow();
816            all_positions_open = cache
817                .positions_open(None, None, None, None)
818                .into_iter()
819                .cloned()
820                .collect();
821            for position in &all_positions_open {
822                instruments.insert(position.instrument_id);
823            }
824        }
825
826        let mut initialized = true;
827
828        for instrument_id in instruments {
829            let positions_open: Vec<Position> = {
830                let cache = self.cache.borrow();
831                cache
832                    .positions_open(None, Some(&instrument_id), None, None)
833                    .into_iter()
834                    .cloned()
835                    .collect()
836            };
837
838            self.update_net_position(&instrument_id, positions_open);
839
840            if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
841                self.inner
842                    .borrow_mut()
843                    .unrealized_pnls
844                    .insert(instrument_id, calculated_unrealized_pnl);
845            } else {
846                log::warn!(
847                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
848                );
849                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
850            }
851
852            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
853                self.inner
854                    .borrow_mut()
855                    .realized_pnls
856                    .insert(instrument_id, calculated_realized_pnl);
857            } else {
858                log::warn!(
859                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
860                );
861                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
862            }
863
864            let cache = self.cache.borrow();
865            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
866                account
867            } else {
868                log::error!(
869                    "Cannot update maintenance (position) margin: no account registered for {}",
870                    instrument_id.venue
871                );
872                initialized = false;
873                break;
874            };
875
876            let account = match account {
877                AccountAny::Cash(_) => continue,
878                AccountAny::Margin(margin_account) => margin_account,
879            };
880
881            let mut cache = self.cache.borrow_mut();
882            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
883                instrument
884            } else {
885                log::error!(
886                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
887                );
888                initialized = false;
889                break;
890            };
891
892            let result = self.inner.borrow_mut().accounts.update_positions(
893                account,
894                instrument.clone(),
895                self.cache
896                    .borrow()
897                    .positions_open(None, Some(&instrument_id), None, None),
898                self.clock.borrow().timestamp_ns(),
899            );
900
901            match result {
902                Some((updated_account, _)) => {
903                    cache
904                        .update_account(AccountAny::Margin(updated_account))
905                        .unwrap();
906                }
907                None => {
908                    initialized = false;
909                }
910            }
911        }
912
913        let open_count = all_positions_open.len();
914        self.inner.borrow_mut().initialized = initialized;
915        log::info!(
916            "Initialized {} open position{}",
917            open_count,
918            if open_count == 1 { "" } else { "s" }
919        );
920    }
921
922    /// Updates portfolio calculations based on a new quote tick.
923    ///
924    /// Recalculates unrealized PnL for positions affected by the quote update.
925    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
926        update_quote_tick(
927            self.cache.clone(),
928            self.clock.clone(),
929            self.inner.clone(),
930            self.config.clone(),
931            quote,
932        );
933    }
934
935    /// Updates portfolio calculations based on a new bar.
936    ///
937    /// Updates cached bar close prices and recalculates unrealized PnL.
938    pub fn update_bar(&mut self, bar: &Bar) {
939        update_bar(
940            self.cache.clone(),
941            self.clock.clone(),
942            self.inner.clone(),
943            self.config.clone(),
944            bar,
945        );
946    }
947
948    /// Updates portfolio with a new account state event.
949    pub fn update_account(&mut self, event: &AccountState) {
950        update_account(self.cache.clone(), self.inner.clone(), event);
951    }
952
953    /// Updates portfolio calculations based on an order event.
954    ///
955    /// Handles balance updates for order fills and margin calculations for order changes.
956    pub fn update_order(&mut self, event: &OrderEventAny) {
957        update_order(
958            self.cache.clone(),
959            self.clock.clone(),
960            self.inner.clone(),
961            self.config.clone(),
962            event,
963        );
964    }
965
966    /// Updates portfolio calculations based on a position event.
967    ///
968    /// Recalculates net positions, unrealized PnL, and margin requirements.
969    pub fn update_position(&mut self, event: &PositionEvent) {
970        update_position(
971            self.cache.clone(),
972            self.clock.clone(),
973            self.inner.clone(),
974            self.config.clone(),
975            event,
976        );
977    }
978
979    // -- INTERNAL --------------------------------------------------------------------------------
980
981    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
982        let mut net_position = Decimal::ZERO;
983
984        for open_position in positions_open {
985            log::debug!("open_position: {open_position}");
986            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
987        }
988
989        let existing_position = self.net_position(instrument_id);
990        if existing_position != net_position {
991            self.inner
992                .borrow_mut()
993                .net_positions
994                .insert(*instrument_id, net_position);
995            log::info!("{instrument_id} net_position={net_position}");
996        }
997    }
998
999    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1000        let cache = self.cache.borrow();
1001        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1002            account
1003        } else {
1004            log::error!(
1005                "Cannot calculate unrealized PnL: no account registered for {}",
1006                instrument_id.venue
1007            );
1008            return None;
1009        };
1010
1011        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1012            instrument
1013        } else {
1014            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1015            return None;
1016        };
1017
1018        let currency = account
1019            .base_currency()
1020            .unwrap_or_else(|| instrument.settlement_currency());
1021
1022        let positions_open = cache.positions_open(
1023            None, // Faster query filtering
1024            Some(instrument_id),
1025            None,
1026            None,
1027        );
1028
1029        if positions_open.is_empty() {
1030            return Some(Money::new(0.0, currency));
1031        }
1032
1033        let mut total_pnl = 0.0;
1034
1035        for position in positions_open {
1036            if position.instrument_id != *instrument_id {
1037                continue; // Nothing to calculate
1038            }
1039
1040            if position.side == PositionSide::Flat {
1041                continue; // Nothing to calculate
1042            }
1043
1044            let price = if let Some(price) = self.get_price(position) {
1045                price
1046            } else {
1047                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1048                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1049                return None; // Cannot calculate
1050            };
1051
1052            let mut pnl = position.unrealized_pnl(price).as_f64();
1053
1054            if let Some(base_currency) = account.base_currency() {
1055                let xrate = if let Some(xrate) =
1056                    self.calculate_xrate_to_base(instrument, account, position.entry)
1057                {
1058                    xrate
1059                } else {
1060                    log::error!(
1061                        // TODO: Improve logging
1062                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1063                        instrument.settlement_currency(),
1064                        base_currency
1065                    );
1066                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1067                    return None; // Cannot calculate
1068                };
1069
1070                let scale = 10f64.powi(currency.precision.into());
1071                pnl = ((pnl * xrate) * scale).round() / scale;
1072            }
1073
1074            total_pnl += pnl;
1075        }
1076
1077        Some(Money::new(total_pnl, currency))
1078    }
1079
1080    fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1081        // Performance: This method maintains an incremental cache of snapshot PnLs
1082        // It only deserializes new snapshots that haven't been processed yet
1083        // Tracks sum and last PnL per position for efficient NETTING OMS support
1084
1085        // Get all position IDs that have snapshots for this instrument
1086        let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1087
1088        if snapshot_position_ids.is_empty() {
1089            return; // Nothing to process
1090        }
1091
1092        let mut rebuild = false;
1093
1094        // Detect purge/reset (count regression) to trigger full rebuild
1095        for position_id in &snapshot_position_ids {
1096            let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1097            let curr_count = position_snapshots.map_or(0, |s| {
1098                // Count the number of snapshots (they're serialized as JSON objects)
1099                s.split(|&b| b == b'{').count() - 1
1100            });
1101            let prev_count = self
1102                .inner
1103                .borrow()
1104                .snapshot_processed_counts
1105                .get(position_id)
1106                .copied()
1107                .unwrap_or(0);
1108
1109            if prev_count > curr_count {
1110                rebuild = true;
1111                break;
1112            }
1113        }
1114
1115        if rebuild {
1116            // Full rebuild: process all snapshots from scratch
1117            for position_id in &snapshot_position_ids {
1118                if let Some(position_snapshots) =
1119                    self.cache.borrow().position_snapshot_bytes(position_id)
1120                {
1121                    let mut sum_pnl: Option<Money> = None;
1122                    let mut last_pnl: Option<Money> = None;
1123
1124                    // Snapshots are concatenated JSON objects
1125                    let mut start = 0;
1126                    let mut depth = 0;
1127                    let mut in_string = false;
1128                    let mut escape_next = false;
1129
1130                    for (i, &byte) in position_snapshots.iter().enumerate() {
1131                        if escape_next {
1132                            escape_next = false;
1133                            continue;
1134                        }
1135
1136                        if byte == b'\\' && in_string {
1137                            escape_next = true;
1138                            continue;
1139                        }
1140
1141                        if byte == b'"' && !escape_next {
1142                            in_string = !in_string;
1143                        }
1144
1145                        if !in_string {
1146                            if byte == b'{' {
1147                                if depth == 0 {
1148                                    start = i;
1149                                }
1150                                depth += 1;
1151                            } else if byte == b'}' {
1152                                depth -= 1;
1153                                if depth == 0
1154                                    && let Ok(snapshot) = serde_json::from_slice::<Position>(
1155                                        &position_snapshots[start..=i],
1156                                    )
1157                                    && let Some(realized_pnl) = snapshot.realized_pnl
1158                                {
1159                                    if let Some(ref mut sum) = sum_pnl {
1160                                        if sum.currency == realized_pnl.currency {
1161                                            *sum = Money::new(
1162                                                sum.as_f64() + realized_pnl.as_f64(),
1163                                                sum.currency,
1164                                            );
1165                                        }
1166                                    } else {
1167                                        sum_pnl = Some(realized_pnl);
1168                                    }
1169                                    last_pnl = Some(realized_pnl);
1170                                }
1171                            }
1172                        }
1173                    }
1174
1175                    let mut inner = self.inner.borrow_mut();
1176                    if let Some(sum) = sum_pnl {
1177                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1178                        if let Some(last) = last_pnl {
1179                            inner.snapshot_last_per_position.insert(*position_id, last);
1180                        }
1181                    } else {
1182                        inner.snapshot_sum_per_position.remove(position_id);
1183                        inner.snapshot_last_per_position.remove(position_id);
1184                    }
1185
1186                    let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1187                    inner
1188                        .snapshot_processed_counts
1189                        .insert(*position_id, snapshot_count);
1190                }
1191            }
1192        } else {
1193            // Incremental path: only process new snapshots
1194            for position_id in &snapshot_position_ids {
1195                if let Some(position_snapshots) =
1196                    self.cache.borrow().position_snapshot_bytes(position_id)
1197                {
1198                    let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1199                    let prev_count = self
1200                        .inner
1201                        .borrow()
1202                        .snapshot_processed_counts
1203                        .get(position_id)
1204                        .copied()
1205                        .unwrap_or(0);
1206
1207                    if prev_count >= curr_count {
1208                        continue;
1209                    }
1210
1211                    let mut sum_pnl = self
1212                        .inner
1213                        .borrow()
1214                        .snapshot_sum_per_position
1215                        .get(position_id)
1216                        .copied();
1217                    let mut last_pnl = self
1218                        .inner
1219                        .borrow()
1220                        .snapshot_last_per_position
1221                        .get(position_id)
1222                        .copied();
1223
1224                    // Process only new snapshots
1225                    let mut start = 0;
1226                    let mut depth = 0;
1227                    let mut in_string = false;
1228                    let mut escape_next = false;
1229                    let mut snapshot_index = 0;
1230
1231                    for (i, &byte) in position_snapshots.iter().enumerate() {
1232                        if escape_next {
1233                            escape_next = false;
1234                            continue;
1235                        }
1236
1237                        if byte == b'\\' && in_string {
1238                            escape_next = true;
1239                            continue;
1240                        }
1241
1242                        if byte == b'"' && !escape_next {
1243                            in_string = !in_string;
1244                        }
1245
1246                        if !in_string {
1247                            if byte == b'{' {
1248                                if depth == 0 {
1249                                    start = i;
1250                                }
1251                                depth += 1;
1252                            } else if byte == b'}' {
1253                                depth -= 1;
1254                                if depth == 0 {
1255                                    snapshot_index += 1;
1256                                    // Only process new snapshots
1257                                    if snapshot_index > prev_count
1258                                        && let Ok(snapshot) = serde_json::from_slice::<Position>(
1259                                            &position_snapshots[start..=i],
1260                                        )
1261                                        && let Some(realized_pnl) = snapshot.realized_pnl
1262                                    {
1263                                        if let Some(ref mut sum) = sum_pnl {
1264                                            if sum.currency == realized_pnl.currency {
1265                                                *sum = Money::new(
1266                                                    sum.as_f64() + realized_pnl.as_f64(),
1267                                                    sum.currency,
1268                                                );
1269                                            }
1270                                        } else {
1271                                            sum_pnl = Some(realized_pnl);
1272                                        }
1273                                        last_pnl = Some(realized_pnl);
1274                                    }
1275                                }
1276                            }
1277                        }
1278                    }
1279
1280                    let mut inner = self.inner.borrow_mut();
1281                    if let Some(sum) = sum_pnl {
1282                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1283                        if let Some(last) = last_pnl {
1284                            inner.snapshot_last_per_position.insert(*position_id, last);
1285                        }
1286                    }
1287                    inner
1288                        .snapshot_processed_counts
1289                        .insert(*position_id, curr_count);
1290                }
1291            }
1292        }
1293    }
1294
1295    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1296        // Ensure snapshot PnLs are cached for this instrument
1297        self.ensure_snapshot_pnls_cached_for(instrument_id);
1298
1299        let cache = self.cache.borrow();
1300        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1301            account
1302        } else {
1303            log::error!(
1304                "Cannot calculate realized PnL: no account registered for {}",
1305                instrument_id.venue
1306            );
1307            return None;
1308        };
1309
1310        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1311            instrument
1312        } else {
1313            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1314            return None;
1315        };
1316
1317        let currency = account
1318            .base_currency()
1319            .unwrap_or_else(|| instrument.settlement_currency());
1320
1321        let positions = cache.positions(
1322            None, // Faster query filtering
1323            Some(instrument_id),
1324            None,
1325            None,
1326        );
1327
1328        let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1329
1330        // Check if we need to use NETTING OMS logic
1331        let is_netting = positions
1332            .iter()
1333            .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1334
1335        let mut total_pnl = 0.0;
1336
1337        if is_netting && !snapshot_position_ids.is_empty() {
1338            // NETTING OMS: Apply 3-case rule for position cycles
1339
1340            for position_id in &snapshot_position_ids {
1341                let is_active = positions.iter().any(|p| p.id == *position_id);
1342
1343                if is_active {
1344                    // Case 1 & 2: Active position - use only the last snapshot PnL
1345                    let last_pnl = self
1346                        .inner
1347                        .borrow()
1348                        .snapshot_last_per_position
1349                        .get(position_id)
1350                        .copied();
1351                    if let Some(last_pnl) = last_pnl {
1352                        let mut pnl = last_pnl.as_f64();
1353
1354                        if let Some(base_currency) = account.base_currency()
1355                            && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1356                        {
1357                            let xrate = if let Some(xrate) =
1358                                self.calculate_xrate_to_base(instrument, account, position.entry)
1359                            {
1360                                xrate
1361                            } else {
1362                                log::error!(
1363                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1364                                    instrument.settlement_currency(),
1365                                    base_currency
1366                                );
1367                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1368                                return Some(Money::new(0.0, currency));
1369                            };
1370
1371                            let scale = 10f64.powi(currency.precision.into());
1372                            pnl = ((pnl * xrate) * scale).round() / scale;
1373                        }
1374
1375                        total_pnl += pnl;
1376                    }
1377                } else {
1378                    // Case 3: Closed position - use sum of all snapshot PnLs
1379                    let sum_pnl = self
1380                        .inner
1381                        .borrow()
1382                        .snapshot_sum_per_position
1383                        .get(position_id)
1384                        .copied();
1385                    if let Some(sum_pnl) = sum_pnl {
1386                        let mut pnl = sum_pnl.as_f64();
1387
1388                        if let Some(base_currency) = account.base_currency() {
1389                            // For closed positions, we don't have entry price, use current rates
1390                            let xrate = cache.get_xrate(
1391                                instrument_id.venue,
1392                                instrument.settlement_currency(),
1393                                base_currency,
1394                                PriceType::Mid,
1395                            );
1396
1397                            if let Some(xrate) = xrate {
1398                                let scale = 10f64.powi(currency.precision.into());
1399                                pnl = ((pnl * xrate) * scale).round() / scale;
1400                            } else {
1401                                log::error!(
1402                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1403                                    instrument.settlement_currency(),
1404                                    base_currency
1405                                );
1406                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1407                                return Some(Money::new(0.0, currency));
1408                            }
1409                        }
1410
1411                        total_pnl += pnl;
1412                    }
1413                }
1414            }
1415
1416            // Add realized PnL from current active positions
1417            for position in positions {
1418                if position.instrument_id != *instrument_id {
1419                    continue;
1420                }
1421
1422                if let Some(realized_pnl) = position.realized_pnl {
1423                    let mut pnl = realized_pnl.as_f64();
1424
1425                    if let Some(base_currency) = account.base_currency() {
1426                        let xrate = if let Some(xrate) =
1427                            self.calculate_xrate_to_base(instrument, account, position.entry)
1428                        {
1429                            xrate
1430                        } else {
1431                            log::error!(
1432                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1433                                instrument.settlement_currency(),
1434                                base_currency
1435                            );
1436                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1437                            return Some(Money::new(0.0, currency));
1438                        };
1439
1440                        let scale = 10f64.powi(currency.precision.into());
1441                        pnl = ((pnl * xrate) * scale).round() / scale;
1442                    }
1443
1444                    total_pnl += pnl;
1445                }
1446            }
1447        } else {
1448            // HEDGING OMS or no snapshots: Simple aggregation
1449            // Add snapshot PnLs (sum all)
1450            for position_id in &snapshot_position_ids {
1451                let sum_pnl = self
1452                    .inner
1453                    .borrow()
1454                    .snapshot_sum_per_position
1455                    .get(position_id)
1456                    .copied();
1457                if let Some(sum_pnl) = sum_pnl {
1458                    let mut pnl = sum_pnl.as_f64();
1459
1460                    if let Some(base_currency) = account.base_currency() {
1461                        let xrate = cache.get_xrate(
1462                            instrument_id.venue,
1463                            instrument.settlement_currency(),
1464                            base_currency,
1465                            PriceType::Mid,
1466                        );
1467
1468                        if let Some(xrate) = xrate {
1469                            let scale = 10f64.powi(currency.precision.into());
1470                            pnl = ((pnl * xrate) * scale).round() / scale;
1471                        } else {
1472                            log::error!(
1473                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1474                                instrument.settlement_currency(),
1475                                base_currency
1476                            );
1477                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1478                            return Some(Money::new(0.0, currency));
1479                        }
1480                    }
1481
1482                    total_pnl += pnl;
1483                }
1484            }
1485
1486            // Add realized PnL from current positions
1487            for position in positions {
1488                if position.instrument_id != *instrument_id {
1489                    continue;
1490                }
1491
1492                if let Some(realized_pnl) = position.realized_pnl {
1493                    let mut pnl = realized_pnl.as_f64();
1494
1495                    if let Some(base_currency) = account.base_currency() {
1496                        let xrate = if let Some(xrate) =
1497                            self.calculate_xrate_to_base(instrument, account, position.entry)
1498                        {
1499                            xrate
1500                        } else {
1501                            log::error!(
1502                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1503                                instrument.settlement_currency(),
1504                                base_currency
1505                            );
1506                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1507                            return Some(Money::new(0.0, currency));
1508                        };
1509
1510                        let scale = 10f64.powi(currency.precision.into());
1511                        pnl = ((pnl * xrate) * scale).round() / scale;
1512                    }
1513
1514                    total_pnl += pnl;
1515                }
1516            }
1517        }
1518
1519        Some(Money::new(total_pnl, currency))
1520    }
1521
1522    fn get_price(&self, position: &Position) -> Option<Price> {
1523        let cache = self.cache.borrow();
1524        let instrument_id = &position.instrument_id;
1525
1526        // Check for mark price first if configured
1527        if self.config.use_mark_prices
1528            && let Some(mark_price) = cache.mark_price(instrument_id)
1529        {
1530            return Some(mark_price.value);
1531        }
1532
1533        // Fall back to bid/ask based on position side
1534        let price_type = match position.side {
1535            PositionSide::Long => PriceType::Bid,
1536            PositionSide::Short => PriceType::Ask,
1537            _ => panic!("invalid `PositionSide`, was {}", position.side),
1538        };
1539
1540        cache
1541            .price(instrument_id, price_type)
1542            .or_else(|| cache.price(instrument_id, PriceType::Last))
1543            .or_else(|| {
1544                self.inner
1545                    .borrow()
1546                    .bar_close_prices
1547                    .get(instrument_id)
1548                    .copied()
1549            })
1550    }
1551
1552    fn calculate_xrate_to_base(
1553        &self,
1554        instrument: &InstrumentAny,
1555        account: &AccountAny,
1556        side: OrderSide,
1557    ) -> Option<f64> {
1558        if !self.config.convert_to_account_base_currency {
1559            return Some(1.0); // No conversion needed
1560        }
1561
1562        match account.base_currency() {
1563            None => Some(1.0), // No conversion needed
1564            Some(base_currency) => {
1565                let cache = self.cache.borrow();
1566
1567                if self.config.use_mark_xrates {
1568                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1569                }
1570
1571                let price_type = if side == OrderSide::Buy {
1572                    PriceType::Bid
1573                } else {
1574                    PriceType::Ask
1575                };
1576
1577                cache.get_xrate(
1578                    instrument.id().venue,
1579                    instrument.settlement_currency(),
1580                    base_currency,
1581                    price_type,
1582                )
1583            }
1584        }
1585    }
1586}
1587
1588// Helper functions
1589fn update_quote_tick(
1590    cache: Rc<RefCell<Cache>>,
1591    clock: Rc<RefCell<dyn Clock>>,
1592    inner: Rc<RefCell<PortfolioState>>,
1593    config: PortfolioConfig,
1594    quote: &QuoteTick,
1595) {
1596    update_instrument_id(cache, clock.clone(), inner, config, &quote.instrument_id);
1597}
1598
1599fn update_bar(
1600    cache: Rc<RefCell<Cache>>,
1601    clock: Rc<RefCell<dyn Clock>>,
1602    inner: Rc<RefCell<PortfolioState>>,
1603    config: PortfolioConfig,
1604    bar: &Bar,
1605) {
1606    let instrument_id = bar.bar_type.instrument_id();
1607    inner
1608        .borrow_mut()
1609        .bar_close_prices
1610        .insert(instrument_id, bar.close);
1611    update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1612}
1613
1614fn update_instrument_id(
1615    cache: Rc<RefCell<Cache>>,
1616    clock: Rc<RefCell<dyn Clock>>,
1617    inner: Rc<RefCell<PortfolioState>>,
1618    config: PortfolioConfig,
1619    instrument_id: &InstrumentId,
1620) {
1621    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1622
1623    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1624        return;
1625    }
1626
1627    let result_init;
1628    let mut result_maint = None;
1629
1630    let account = {
1631        let cache_ref = cache.borrow();
1632        let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1633            account
1634        } else {
1635            log::error!(
1636                "Cannot update tick: no account registered for {}",
1637                instrument_id.venue
1638            );
1639            return;
1640        };
1641
1642        let mut cache_ref = cache.borrow_mut();
1643        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1644            instrument.clone()
1645        } else {
1646            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1647            return;
1648        };
1649
1650        // Clone the orders and positions to own the data
1651        let orders_open: Vec<OrderAny> = cache_ref
1652            .orders_open(None, Some(instrument_id), None, None)
1653            .iter()
1654            .map(|o| (*o).clone())
1655            .collect();
1656
1657        let positions_open: Vec<Position> = cache_ref
1658            .positions_open(None, Some(instrument_id), None, None)
1659            .iter()
1660            .map(|p| (*p).clone())
1661            .collect();
1662
1663        result_init = inner.borrow().accounts.update_orders(
1664            account,
1665            instrument.clone(),
1666            orders_open.iter().collect(),
1667            clock.borrow().timestamp_ns(),
1668        );
1669
1670        if let AccountAny::Margin(margin_account) = account {
1671            result_maint = inner.borrow().accounts.update_positions(
1672                margin_account,
1673                instrument,
1674                positions_open.iter().collect(),
1675                clock.borrow().timestamp_ns(),
1676            );
1677        }
1678
1679        if let Some((ref updated_account, _)) = result_init {
1680            cache_ref.update_account(updated_account.clone()).unwrap();
1681        }
1682        account.clone()
1683    };
1684
1685    let mut portfolio_clone = Portfolio {
1686        clock: clock.clone(),
1687        cache,
1688        inner: inner.clone(),
1689        config,
1690    };
1691
1692    let result_unrealized_pnl: Option<Money> =
1693        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1694
1695    if result_init.is_some()
1696        && (matches!(account, AccountAny::Cash(_))
1697            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1698    {
1699        inner.borrow_mut().pending_calcs.remove(instrument_id);
1700        if inner.borrow().pending_calcs.is_empty() {
1701            inner.borrow_mut().initialized = true;
1702        }
1703    }
1704}
1705
1706fn update_order(
1707    cache: Rc<RefCell<Cache>>,
1708    clock: Rc<RefCell<dyn Clock>>,
1709    inner: Rc<RefCell<PortfolioState>>,
1710    _config: PortfolioConfig,
1711    event: &OrderEventAny,
1712) {
1713    let cache_ref = cache.borrow();
1714    let account_id = match event.account_id() {
1715        Some(account_id) => account_id,
1716        None => {
1717            return; // No Account Assigned
1718        }
1719    };
1720
1721    let account = if let Some(account) = cache_ref.account(&account_id) {
1722        account
1723    } else {
1724        log::error!("Cannot update order: no account registered for {account_id}");
1725        return;
1726    };
1727
1728    match account {
1729        AccountAny::Cash(cash_account) => {
1730            if !cash_account.base.calculate_account_state {
1731                return;
1732            }
1733        }
1734        AccountAny::Margin(margin_account) => {
1735            if !margin_account.base.calculate_account_state {
1736                return;
1737            }
1738        }
1739    }
1740
1741    match event {
1742        OrderEventAny::Accepted(_)
1743        | OrderEventAny::Canceled(_)
1744        | OrderEventAny::Rejected(_)
1745        | OrderEventAny::Updated(_)
1746        | OrderEventAny::Filled(_) => {}
1747        _ => {
1748            return;
1749        }
1750    }
1751
1752    let cache_ref = cache.borrow();
1753    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1754        order
1755    } else {
1756        log::error!(
1757            "Cannot update order: {} not found in the cache",
1758            event.client_order_id()
1759        );
1760        return; // No Order Found
1761    };
1762
1763    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1764        return; // No change to account state
1765    }
1766
1767    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1768        instrument_id
1769    } else {
1770        log::error!(
1771            "Cannot update order: no instrument found for {}",
1772            event.instrument_id()
1773        );
1774        return;
1775    };
1776
1777    if let OrderEventAny::Filled(order_filled) = event {
1778        let _ = inner.borrow().accounts.update_balances(
1779            account.clone(),
1780            instrument.clone(),
1781            *order_filled,
1782        );
1783
1784        let mut portfolio_clone = Portfolio {
1785            clock: clock.clone(),
1786            cache: cache.clone(),
1787            inner: inner.clone(),
1788            config: PortfolioConfig::default(), // TODO: TBD
1789        };
1790
1791        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1792            Some(unrealized_pnl) => {
1793                inner
1794                    .borrow_mut()
1795                    .unrealized_pnls
1796                    .insert(event.instrument_id(), unrealized_pnl);
1797            }
1798            None => {
1799                log::error!(
1800                    "Failed to calculate unrealized PnL for instrument {}",
1801                    event.instrument_id()
1802                );
1803            }
1804        }
1805    }
1806
1807    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1808
1809    let account_state = inner.borrow_mut().accounts.update_orders(
1810        account,
1811        instrument.clone(),
1812        orders_open,
1813        clock.borrow().timestamp_ns(),
1814    );
1815
1816    let mut cache_ref = cache.borrow_mut();
1817    cache_ref.update_account(account.clone()).unwrap();
1818
1819    if let Some(account_state) = account_state {
1820        msgbus::publish(
1821            format!("events.account.{}", account.id()).into(),
1822            &account_state,
1823        );
1824    } else {
1825        log::debug!("Added pending calculation for {}", instrument.id());
1826        inner.borrow_mut().pending_calcs.insert(instrument.id());
1827    }
1828
1829    log::debug!("Updated {event}");
1830}
1831
1832fn update_position(
1833    cache: Rc<RefCell<Cache>>,
1834    clock: Rc<RefCell<dyn Clock>>,
1835    inner: Rc<RefCell<PortfolioState>>,
1836    _config: PortfolioConfig,
1837    event: &PositionEvent,
1838) {
1839    let instrument_id = event.instrument_id();
1840
1841    let positions_open: Vec<Position> = {
1842        let cache_ref = cache.borrow();
1843
1844        cache_ref
1845            .positions_open(None, Some(&instrument_id), None, None)
1846            .iter()
1847            .map(|o| (*o).clone())
1848            .collect()
1849    };
1850
1851    log::debug!("position fresh from cache -> {positions_open:?}");
1852
1853    let mut portfolio_clone = Portfolio {
1854        clock: clock.clone(),
1855        cache: cache.clone(),
1856        inner: inner.clone(),
1857        config: PortfolioConfig::default(), // TODO: TBD
1858    };
1859
1860    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1861
1862    if let Some(calculated_unrealized_pnl) =
1863        portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1864    {
1865        inner
1866            .borrow_mut()
1867            .unrealized_pnls
1868            .insert(event.instrument_id(), calculated_unrealized_pnl);
1869    } else {
1870        log::warn!(
1871            "Failed to calculate unrealized PnL for {}, marking as pending",
1872            event.instrument_id()
1873        );
1874        inner
1875            .borrow_mut()
1876            .pending_calcs
1877            .insert(event.instrument_id());
1878    }
1879
1880    if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1881        inner
1882            .borrow_mut()
1883            .realized_pnls
1884            .insert(event.instrument_id(), calculated_realized_pnl);
1885    } else {
1886        log::warn!(
1887            "Failed to calculate realized PnL for {}, marking as pending",
1888            event.instrument_id()
1889        );
1890        inner
1891            .borrow_mut()
1892            .pending_calcs
1893            .insert(event.instrument_id());
1894    }
1895
1896    let cache_ref = cache.borrow();
1897    let account = cache_ref.account(&event.account_id());
1898
1899    if let Some(AccountAny::Margin(margin_account)) = account {
1900        if !margin_account.calculate_account_state {
1901            return; // Nothing to calculate
1902        }
1903
1904        let cache_ref = cache.borrow();
1905        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1906            instrument
1907        } else {
1908            log::error!("Cannot update position: no instrument found for {instrument_id}");
1909            return;
1910        };
1911
1912        let result = inner.borrow_mut().accounts.update_positions(
1913            margin_account,
1914            instrument.clone(),
1915            positions_open.iter().collect(),
1916            clock.borrow().timestamp_ns(),
1917        );
1918        let mut cache_ref = cache.borrow_mut();
1919        if let Some((margin_account, _)) = result {
1920            cache_ref
1921                .update_account(AccountAny::Margin(margin_account))
1922                .unwrap();
1923        }
1924    } else if account.is_none() {
1925        log::error!(
1926            "Cannot update position: no account registered for {}",
1927            event.account_id()
1928        );
1929    }
1930}
1931
1932fn update_account(
1933    cache: Rc<RefCell<Cache>>,
1934    inner: Rc<RefCell<PortfolioState>>,
1935    event: &AccountState,
1936) {
1937    let mut cache_ref = cache.borrow_mut();
1938
1939    if let Some(existing) = cache_ref.account(&event.account_id) {
1940        let mut account = existing.clone();
1941        account.apply(event.clone());
1942
1943        if let Err(e) = cache_ref.update_account(account.clone()) {
1944            log::error!("Failed to update account: {e}");
1945            return;
1946        }
1947    } else {
1948        let account = match AccountAny::from_events(vec![event.clone()]) {
1949            Ok(account) => account,
1950            Err(e) => {
1951                log::error!("Failed to create account: {e}");
1952                return;
1953            }
1954        };
1955
1956        if let Err(e) = cache_ref.add_account(account) {
1957            log::error!("Failed to add account: {e}");
1958            return;
1959        }
1960    }
1961
1962    // Throttled logging logic
1963    let mut inner_ref = inner.borrow_mut();
1964    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1965        let current_ts = event.ts_init.as_u64();
1966        let last_ts = inner_ref
1967            .last_account_state_log_ts
1968            .get(&event.account_id)
1969            .copied()
1970            .unwrap_or(0);
1971
1972        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1973        {
1974            inner_ref
1975                .last_account_state_log_ts
1976                .insert(event.account_id, current_ts);
1977            true
1978        } else {
1979            false
1980        }
1981    } else {
1982        true // Throttling disabled, always log
1983    };
1984
1985    if should_log {
1986        log::info!("Updated {event}");
1987    }
1988}