nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{
19    cell::RefCell,
20    collections::{HashMap, HashSet},
21    fmt::Debug,
22    rc::Rc,
23};
24
25use nautilus_analysis::analyzer::PortfolioAnalyzer;
26use nautilus_common::{
27    cache::Cache,
28    clock::Clock,
29    msgbus::{
30        self,
31        handler::{ShareableMessageHandler, TypedMessageHandler},
32    },
33};
34use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
35use nautilus_model::{
36    accounts::AccountAny,
37    data::{Bar, MarkPriceUpdate, QuoteTick},
38    enums::{OmsType, OrderSide, OrderType, PositionSide, PriceType},
39    events::{AccountState, OrderEventAny, position::PositionEvent},
40    identifiers::{AccountId, InstrumentId, PositionId, Venue},
41    instruments::{Instrument, InstrumentAny},
42    orders::{Order, OrderAny},
43    position::Position,
44    types::{Currency, Money, Price},
45};
46use rust_decimal::{Decimal, prelude::FromPrimitive};
47
48use crate::{config::PortfolioConfig, manager::AccountsManager};
49
50struct PortfolioState {
51    accounts: AccountsManager,
52    analyzer: PortfolioAnalyzer,
53    unrealized_pnls: HashMap<InstrumentId, Money>,
54    realized_pnls: HashMap<InstrumentId, Money>,
55    snapshot_sum_per_position: HashMap<PositionId, Money>,
56    snapshot_last_per_position: HashMap<PositionId, Money>,
57    snapshot_processed_counts: HashMap<PositionId, usize>,
58    net_positions: HashMap<InstrumentId, Decimal>,
59    pending_calcs: HashSet<InstrumentId>,
60    bar_close_prices: HashMap<InstrumentId, Price>,
61    initialized: bool,
62    last_account_state_log_ts: HashMap<AccountId, u64>,
63    min_account_state_logging_interval_ns: u64,
64}
65
66impl PortfolioState {
67    fn new(
68        clock: Rc<RefCell<dyn Clock>>,
69        cache: Rc<RefCell<Cache>>,
70        config: &PortfolioConfig,
71    ) -> Self {
72        let min_account_state_logging_interval_ns = config
73            .min_account_state_logging_interval_ms
74            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
75
76        Self {
77            accounts: AccountsManager::new(clock, cache),
78            analyzer: PortfolioAnalyzer::default(),
79            unrealized_pnls: HashMap::new(),
80            realized_pnls: HashMap::new(),
81            snapshot_sum_per_position: HashMap::new(),
82            snapshot_last_per_position: HashMap::new(),
83            snapshot_processed_counts: HashMap::new(),
84            net_positions: HashMap::new(),
85            pending_calcs: HashSet::new(),
86            bar_close_prices: HashMap::new(),
87            initialized: false,
88            last_account_state_log_ts: HashMap::new(),
89            min_account_state_logging_interval_ns,
90        }
91    }
92
93    fn reset(&mut self) {
94        log::debug!("RESETTING");
95        self.net_positions.clear();
96        self.unrealized_pnls.clear();
97        self.realized_pnls.clear();
98        self.snapshot_sum_per_position.clear();
99        self.snapshot_last_per_position.clear();
100        self.snapshot_processed_counts.clear();
101        self.pending_calcs.clear();
102        self.last_account_state_log_ts.clear();
103        self.analyzer.reset();
104        log::debug!("READY");
105    }
106}
107
108pub struct Portfolio {
109    pub(crate) clock: Rc<RefCell<dyn Clock>>,
110    pub(crate) cache: Rc<RefCell<Cache>>,
111    inner: Rc<RefCell<PortfolioState>>,
112    config: PortfolioConfig,
113}
114
115impl Debug for Portfolio {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_struct(stringify!(Portfolio)).finish()
118    }
119}
120
121impl Portfolio {
122    pub fn new(
123        cache: Rc<RefCell<Cache>>,
124        clock: Rc<RefCell<dyn Clock>>,
125        config: Option<PortfolioConfig>,
126    ) -> Self {
127        let config = config.unwrap_or_default();
128        let inner = Rc::new(RefCell::new(PortfolioState::new(
129            clock.clone(),
130            cache.clone(),
131            &config,
132        )));
133
134        Self::register_message_handlers(
135            cache.clone(),
136            clock.clone(),
137            inner.clone(),
138            config.clone(),
139        );
140
141        Self {
142            clock,
143            cache,
144            inner,
145            config,
146        }
147    }
148
149    fn register_message_handlers(
150        cache: Rc<RefCell<Cache>>,
151        clock: Rc<RefCell<dyn Clock>>,
152        inner: Rc<RefCell<PortfolioState>>,
153        config: PortfolioConfig,
154    ) {
155        let inner_weak = WeakCell::from(Rc::downgrade(&inner));
156
157        let update_account_handler = {
158            let cache = cache.clone();
159            let inner = inner_weak.clone();
160            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
161                move |event: &AccountState| {
162                    if let Some(inner_rc) = inner.upgrade() {
163                        update_account(cache.clone(), inner_rc.into(), event);
164                    }
165                },
166            )))
167        };
168
169        let update_position_handler = {
170            let cache = cache.clone();
171            let clock = clock.clone();
172            let inner = inner_weak.clone();
173            let config = config.clone();
174            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
175                move |event: &PositionEvent| {
176                    if let Some(inner_rc) = inner.upgrade() {
177                        update_position(
178                            cache.clone(),
179                            clock.clone(),
180                            inner_rc.into(),
181                            config.clone(),
182                            event,
183                        );
184                    }
185                },
186            )))
187        };
188
189        let update_quote_handler = {
190            let cache = cache.clone();
191            let clock = clock.clone();
192            let inner = inner_weak.clone();
193            let config = config.clone();
194            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
195                move |quote: &QuoteTick| {
196                    if let Some(inner_rc) = inner.upgrade() {
197                        update_quote_tick(
198                            cache.clone(),
199                            clock.clone(),
200                            inner_rc.into(),
201                            config.clone(),
202                            quote,
203                        );
204                    }
205                },
206            )))
207        };
208
209        let update_bar_handler = {
210            let cache = cache.clone();
211            let clock = clock.clone();
212            let inner = inner_weak.clone();
213            let config = config.clone();
214            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
215                if let Some(inner_rc) = inner.upgrade() {
216                    update_bar(
217                        cache.clone(),
218                        clock.clone(),
219                        inner_rc.into(),
220                        config.clone(),
221                        bar,
222                    );
223                }
224            })))
225        };
226
227        let update_mark_price_handler = {
228            let cache = cache.clone();
229            let clock = clock.clone();
230            let inner = inner_weak.clone();
231            let config = config.clone();
232            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
233                move |mark_price: &MarkPriceUpdate| {
234                    if let Some(inner_rc) = inner.upgrade() {
235                        update_instrument_id(
236                            cache.clone(),
237                            clock.clone(),
238                            inner_rc.into(),
239                            config.clone(),
240                            &mark_price.instrument_id,
241                        );
242                    }
243                },
244            )))
245        };
246
247        let update_order_handler = {
248            let cache = cache;
249            let clock = clock.clone();
250            let inner = inner_weak;
251            let config = config.clone();
252            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
253                move |event: &OrderEventAny| {
254                    if let Some(inner_rc) = inner.upgrade() {
255                        update_order(
256                            cache.clone(),
257                            clock.clone(),
258                            inner_rc.into(),
259                            config.clone(),
260                            event,
261                        );
262                    }
263                },
264            )))
265        };
266
267        msgbus::register(
268            "Portfolio.update_account".into(),
269            update_account_handler.clone(),
270        );
271
272        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
273        if config.bar_updates {
274            msgbus::subscribe("data.bars.*".into(), update_bar_handler, Some(10));
275        }
276        if config.use_mark_prices {
277            msgbus::subscribe(
278                "data.mark_prices.*".into(),
279                update_mark_price_handler,
280                Some(10),
281            );
282        }
283        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
284        msgbus::subscribe(
285            "events.position.*".into(),
286            update_position_handler,
287            Some(10),
288        );
289        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
290    }
291
292    pub fn reset(&mut self) {
293        log::debug!("RESETTING");
294        self.inner.borrow_mut().reset();
295        log::debug!("READY");
296    }
297
298    // -- QUERIES ---------------------------------------------------------------------------------
299
300    /// Returns `true` if the portfolio has been initialized.
301    #[must_use]
302    pub fn is_initialized(&self) -> bool {
303        self.inner.borrow().initialized
304    }
305
306    /// Returns the locked balances for the given venue.
307    ///
308    /// Locked balances represent funds reserved for open orders.
309    #[must_use]
310    pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
311        self.cache.borrow().account_for_venue(venue).map_or_else(
312            || {
313                log::error!("Cannot get balances locked: no account generated for {venue}");
314                HashMap::new()
315            },
316            AccountAny::balances_locked,
317        )
318    }
319
320    /// Returns the initial margin requirements for the given venue.
321    ///
322    /// Only applicable for margin accounts. Returns empty map for cash accounts.
323    #[must_use]
324    pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
325        self.cache.borrow().account_for_venue(venue).map_or_else(
326            || {
327                log::error!(
328                    "Cannot get initial (order) margins: no account registered for {venue}"
329                );
330                HashMap::new()
331            },
332            |account| match account {
333                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
334                AccountAny::Cash(_) => {
335                    log::warn!("Initial margins not applicable for cash account");
336                    HashMap::new()
337                }
338            },
339        )
340    }
341
342    /// Returns the maintenance margin requirements for the given venue.
343    ///
344    /// Only applicable for margin accounts. Returns empty map for cash accounts.
345    #[must_use]
346    pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
347        self.cache.borrow().account_for_venue(venue).map_or_else(
348            || {
349                log::error!(
350                    "Cannot get maintenance (position) margins: no account registered for {venue}"
351                );
352                HashMap::new()
353            },
354            |account| match account {
355                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
356                AccountAny::Cash(_) => {
357                    log::warn!("Maintenance margins not applicable for cash account");
358                    HashMap::new()
359                }
360            },
361        )
362    }
363
364    /// Returns the unrealized PnLs for all positions at the given venue.
365    ///
366    /// Calculates mark-to-market PnL based on current market prices.
367    #[must_use]
368    pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
369        let instrument_ids = {
370            let cache = self.cache.borrow();
371            let positions = cache.positions(Some(venue), None, None, None);
372
373            if positions.is_empty() {
374                return HashMap::new(); // Nothing to calculate
375            }
376
377            let instrument_ids: HashSet<InstrumentId> =
378                positions.iter().map(|p| p.instrument_id).collect();
379
380            instrument_ids
381        };
382
383        let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
384
385        for instrument_id in instrument_ids {
386            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
387                // PnL already calculated
388                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
389                continue;
390            }
391
392            // Calculate PnL
393            match self.calculate_unrealized_pnl(&instrument_id) {
394                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
395                None => continue,
396            }
397        }
398
399        unrealized_pnls
400            .into_iter()
401            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
402            .collect()
403    }
404
405    /// Returns the realized PnLs for all positions at the given venue.
406    ///
407    /// Calculates total realized profit and loss from closed positions.
408    #[must_use]
409    pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
410        let instrument_ids = {
411            let cache = self.cache.borrow();
412            let positions = cache.positions(Some(venue), None, None, None);
413
414            if positions.is_empty() {
415                return HashMap::new(); // Nothing to calculate
416            }
417
418            let instrument_ids: HashSet<InstrumentId> =
419                positions.iter().map(|p| p.instrument_id).collect();
420
421            instrument_ids
422        };
423
424        let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
425
426        for instrument_id in instrument_ids {
427            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
428                // PnL already calculated
429                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
430                continue;
431            }
432
433            // Calculate PnL
434            match self.calculate_realized_pnl(&instrument_id) {
435                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
436                None => continue,
437            }
438        }
439
440        realized_pnls
441            .into_iter()
442            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
443            .collect()
444    }
445
446    #[must_use]
447    pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
448        let cache = self.cache.borrow();
449        let account = if let Some(account) = cache.account_for_venue(venue) {
450            account
451        } else {
452            log::error!("Cannot calculate net exposures: no account registered for {venue}");
453            return None; // Cannot calculate
454        };
455
456        let positions_open = cache.positions_open(Some(venue), None, None, None);
457        if positions_open.is_empty() {
458            return Some(HashMap::new()); // Nothing to calculate
459        }
460
461        let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
462
463        for position in positions_open {
464            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
465                instrument
466            } else {
467                log::error!(
468                    "Cannot calculate net exposures: no instrument for {}",
469                    position.instrument_id
470                );
471                return None; // Cannot calculate
472            };
473
474            if position.side == PositionSide::Flat {
475                log::error!(
476                    "Cannot calculate net exposures: position is flat for {}",
477                    position.instrument_id
478                );
479                continue; // Nothing to calculate
480            }
481
482            let price = self.get_price(position)?;
483            let xrate = if let Some(xrate) =
484                self.calculate_xrate_to_base(instrument, account, position.entry)
485            {
486                xrate
487            } else {
488                log::error!(
489                    // TODO: Improve logging
490                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
491                    instrument.settlement_currency(),
492                    account.base_currency()
493                );
494                return None; // Cannot calculate
495            };
496
497            let settlement_currency = account
498                .base_currency()
499                .unwrap_or_else(|| instrument.settlement_currency());
500
501            let net_exposure = instrument
502                .calculate_notional_value(position.quantity, price, None)
503                .as_f64()
504                * xrate;
505
506            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
507                .round()
508                / 10f64.powi(settlement_currency.precision.into());
509
510            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
511        }
512
513        Some(
514            net_exposures
515                .into_iter()
516                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
517                .collect(),
518        )
519    }
520
521    #[must_use]
522    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
523        if let Some(pnl) = self
524            .inner
525            .borrow()
526            .unrealized_pnls
527            .get(instrument_id)
528            .copied()
529        {
530            return Some(pnl);
531        }
532
533        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
534        self.inner
535            .borrow_mut()
536            .unrealized_pnls
537            .insert(*instrument_id, pnl);
538        Some(pnl)
539    }
540
541    #[must_use]
542    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
543        if let Some(pnl) = self
544            .inner
545            .borrow()
546            .realized_pnls
547            .get(instrument_id)
548            .copied()
549        {
550            return Some(pnl);
551        }
552
553        let pnl = self.calculate_realized_pnl(instrument_id)?;
554        self.inner
555            .borrow_mut()
556            .realized_pnls
557            .insert(*instrument_id, pnl);
558        Some(pnl)
559    }
560
561    /// Returns the total PnL for the given instrument ID.
562    ///
563    /// Total PnL = Realized PnL + Unrealized PnL
564    #[must_use]
565    pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
566        let realized = self.realized_pnl(instrument_id)?;
567        let unrealized = self.unrealized_pnl(instrument_id)?;
568
569        if realized.currency != unrealized.currency {
570            log::error!(
571                "Cannot calculate total PnL: currency mismatch {} vs {}",
572                realized.currency,
573                unrealized.currency
574            );
575            return None;
576        }
577
578        Some(Money::new(
579            realized.as_f64() + unrealized.as_f64(),
580            realized.currency,
581        ))
582    }
583
584    /// Returns the total PnLs for the given venue.
585    ///
586    /// Total PnL = Realized PnL + Unrealized PnL for each currency
587    #[must_use]
588    pub fn total_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
589        let realized_pnls = self.realized_pnls(venue);
590        let unrealized_pnls = self.unrealized_pnls(venue);
591
592        let mut total_pnls: HashMap<Currency, Money> = HashMap::new();
593
594        // Add realized PnLs
595        for (currency, realized) in realized_pnls {
596            total_pnls.insert(currency, realized);
597        }
598
599        // Add unrealized PnLs
600        for (currency, unrealized) in unrealized_pnls {
601            match total_pnls.get_mut(&currency) {
602                Some(total) => {
603                    *total = Money::new(total.as_f64() + unrealized.as_f64(), currency);
604                }
605                None => {
606                    total_pnls.insert(currency, unrealized);
607                }
608            }
609        }
610
611        total_pnls
612    }
613
614    #[must_use]
615    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
616        let cache = self.cache.borrow();
617        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
618            account
619        } else {
620            log::error!(
621                "Cannot calculate net exposure: no account registered for {}",
622                instrument_id.venue
623            );
624            return None;
625        };
626
627        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
628            instrument
629        } else {
630            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
631            return None;
632        };
633
634        let positions_open = cache.positions_open(
635            None, // Faster query filtering
636            Some(instrument_id),
637            None,
638            None,
639        );
640
641        if positions_open.is_empty() {
642            return Some(Money::new(0.0, instrument.settlement_currency()));
643        }
644
645        let mut net_exposure = 0.0;
646
647        for position in positions_open {
648            let price = self.get_price(position)?;
649            let xrate = if let Some(xrate) =
650                self.calculate_xrate_to_base(instrument, account, position.entry)
651            {
652                xrate
653            } else {
654                log::error!(
655                    // TODO: Improve logging
656                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
657                    instrument.settlement_currency(),
658                    account.base_currency()
659                );
660                return None; // Cannot calculate
661            };
662
663            let notional_value =
664                instrument.calculate_notional_value(position.quantity, price, None);
665            net_exposure += notional_value.as_f64() * xrate;
666        }
667
668        let settlement_currency = account
669            .base_currency()
670            .unwrap_or_else(|| instrument.settlement_currency());
671
672        Some(Money::new(net_exposure, settlement_currency))
673    }
674
675    #[must_use]
676    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
677        self.inner
678            .borrow()
679            .net_positions
680            .get(instrument_id)
681            .copied()
682            .unwrap_or(Decimal::ZERO)
683    }
684
685    #[must_use]
686    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
687        self.inner
688            .borrow()
689            .net_positions
690            .get(instrument_id)
691            .copied()
692            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
693    }
694
695    #[must_use]
696    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
697        self.inner
698            .borrow()
699            .net_positions
700            .get(instrument_id)
701            .copied()
702            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
703    }
704
705    #[must_use]
706    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
707        self.inner
708            .borrow()
709            .net_positions
710            .get(instrument_id)
711            .copied()
712            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
713    }
714
715    #[must_use]
716    pub fn is_completely_flat(&self) -> bool {
717        for net_position in self.inner.borrow().net_positions.values() {
718            if *net_position != Decimal::ZERO {
719                return false;
720            }
721        }
722        true
723    }
724
725    // -- COMMANDS --------------------------------------------------------------------------------
726
727    /// Initializes account margin based on existing open orders.
728    ///
729    /// # Panics
730    ///
731    /// Panics if updating the cache with a mutated account fails.
732    pub fn initialize_orders(&mut self) {
733        let mut initialized = true;
734        let orders_and_instruments = {
735            let cache = self.cache.borrow();
736            let all_orders_open = cache.orders_open(None, None, None, None);
737
738            let mut instruments_with_orders = Vec::new();
739            let mut instruments = HashSet::new();
740
741            for order in &all_orders_open {
742                instruments.insert(order.instrument_id());
743            }
744
745            for instrument_id in instruments {
746                if let Some(instrument) = cache.instrument(&instrument_id) {
747                    let orders = cache
748                        .orders_open(None, Some(&instrument_id), None, None)
749                        .into_iter()
750                        .cloned()
751                        .collect::<Vec<OrderAny>>();
752                    instruments_with_orders.push((instrument.clone(), orders));
753                } else {
754                    log::error!(
755                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
756                    );
757                    initialized = false;
758                    break;
759                }
760            }
761            instruments_with_orders
762        };
763
764        for (instrument, orders_open) in &orders_and_instruments {
765            let mut cache = self.cache.borrow_mut();
766            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
767                account
768            } else {
769                log::error!(
770                    "Cannot update initial (order) margin: no account registered for {}",
771                    instrument.id().venue
772                );
773                initialized = false;
774                break;
775            };
776
777            let result = self.inner.borrow_mut().accounts.update_orders(
778                account,
779                instrument.clone(),
780                orders_open.iter().collect(),
781                self.clock.borrow().timestamp_ns(),
782            );
783
784            match result {
785                Some((updated_account, _)) => {
786                    cache.update_account(updated_account).unwrap();
787                }
788                None => {
789                    initialized = false;
790                }
791            }
792        }
793
794        let total_orders = orders_and_instruments
795            .into_iter()
796            .map(|(_, orders)| orders.len())
797            .sum::<usize>();
798
799        log::info!(
800            "Initialized {} open order{}",
801            total_orders,
802            if total_orders == 1 { "" } else { "s" }
803        );
804
805        self.inner.borrow_mut().initialized = initialized;
806    }
807
808    /// Initializes account margin based on existing open positions.
809    ///
810    /// # Panics
811    ///
812    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
813    pub fn initialize_positions(&mut self) {
814        self.inner.borrow_mut().unrealized_pnls.clear();
815        self.inner.borrow_mut().realized_pnls.clear();
816        let all_positions_open: Vec<Position>;
817        let mut instruments = HashSet::new();
818        {
819            let cache = self.cache.borrow();
820            all_positions_open = cache
821                .positions_open(None, None, None, None)
822                .into_iter()
823                .cloned()
824                .collect();
825            for position in &all_positions_open {
826                instruments.insert(position.instrument_id);
827            }
828        }
829
830        let mut initialized = true;
831
832        for instrument_id in instruments {
833            let positions_open: Vec<Position> = {
834                let cache = self.cache.borrow();
835                cache
836                    .positions_open(None, Some(&instrument_id), None, None)
837                    .into_iter()
838                    .cloned()
839                    .collect()
840            };
841
842            self.update_net_position(&instrument_id, positions_open);
843
844            if let Some(calculated_unrealized_pnl) = self.calculate_unrealized_pnl(&instrument_id) {
845                self.inner
846                    .borrow_mut()
847                    .unrealized_pnls
848                    .insert(instrument_id, calculated_unrealized_pnl);
849            } else {
850                log::warn!(
851                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
852                );
853                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
854            }
855
856            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id) {
857                self.inner
858                    .borrow_mut()
859                    .realized_pnls
860                    .insert(instrument_id, calculated_realized_pnl);
861            } else {
862                log::warn!(
863                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
864                );
865                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
866            }
867
868            let cache = self.cache.borrow();
869            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
870                account
871            } else {
872                log::error!(
873                    "Cannot update maintenance (position) margin: no account registered for {}",
874                    instrument_id.venue
875                );
876                initialized = false;
877                break;
878            };
879
880            let account = match account {
881                AccountAny::Cash(_) => continue,
882                AccountAny::Margin(margin_account) => margin_account,
883            };
884
885            let mut cache = self.cache.borrow_mut();
886            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
887                instrument
888            } else {
889                log::error!(
890                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
891                );
892                initialized = false;
893                break;
894            };
895
896            let result = self.inner.borrow_mut().accounts.update_positions(
897                account,
898                instrument.clone(),
899                self.cache
900                    .borrow()
901                    .positions_open(None, Some(&instrument_id), None, None),
902                self.clock.borrow().timestamp_ns(),
903            );
904
905            match result {
906                Some((updated_account, _)) => {
907                    cache
908                        .update_account(AccountAny::Margin(updated_account))
909                        .unwrap();
910                }
911                None => {
912                    initialized = false;
913                }
914            }
915        }
916
917        let open_count = all_positions_open.len();
918        self.inner.borrow_mut().initialized = initialized;
919        log::info!(
920            "Initialized {} open position{}",
921            open_count,
922            if open_count == 1 { "" } else { "s" }
923        );
924    }
925
926    /// Updates portfolio calculations based on a new quote tick.
927    ///
928    /// Recalculates unrealized PnL for positions affected by the quote update.
929    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
930        update_quote_tick(
931            self.cache.clone(),
932            self.clock.clone(),
933            self.inner.clone(),
934            self.config.clone(),
935            quote,
936        );
937    }
938
939    /// Updates portfolio calculations based on a new bar.
940    ///
941    /// Updates cached bar close prices and recalculates unrealized PnL.
942    pub fn update_bar(&mut self, bar: &Bar) {
943        update_bar(
944            self.cache.clone(),
945            self.clock.clone(),
946            self.inner.clone(),
947            self.config.clone(),
948            bar,
949        );
950    }
951
952    /// Updates portfolio with a new account state event.
953    pub fn update_account(&mut self, event: &AccountState) {
954        update_account(self.cache.clone(), self.inner.clone(), event);
955    }
956
957    /// Updates portfolio calculations based on an order event.
958    ///
959    /// Handles balance updates for order fills and margin calculations for order changes.
960    pub fn update_order(&mut self, event: &OrderEventAny) {
961        update_order(
962            self.cache.clone(),
963            self.clock.clone(),
964            self.inner.clone(),
965            self.config.clone(),
966            event,
967        );
968    }
969
970    /// Updates portfolio calculations based on a position event.
971    ///
972    /// Recalculates net positions, unrealized PnL, and margin requirements.
973    pub fn update_position(&mut self, event: &PositionEvent) {
974        update_position(
975            self.cache.clone(),
976            self.clock.clone(),
977            self.inner.clone(),
978            self.config.clone(),
979            event,
980        );
981    }
982
983    // -- INTERNAL --------------------------------------------------------------------------------
984
985    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
986        let mut net_position = Decimal::ZERO;
987
988        for open_position in positions_open {
989            log::debug!("open_position: {open_position}");
990            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
991        }
992
993        let existing_position = self.net_position(instrument_id);
994        if existing_position != net_position {
995            self.inner
996                .borrow_mut()
997                .net_positions
998                .insert(*instrument_id, net_position);
999            log::info!("{instrument_id} net_position={net_position}");
1000        }
1001    }
1002
1003    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1004        let cache = self.cache.borrow();
1005        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1006            account
1007        } else {
1008            log::error!(
1009                "Cannot calculate unrealized PnL: no account registered for {}",
1010                instrument_id.venue
1011            );
1012            return None;
1013        };
1014
1015        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1016            instrument
1017        } else {
1018            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1019            return None;
1020        };
1021
1022        let currency = account
1023            .base_currency()
1024            .unwrap_or_else(|| instrument.settlement_currency());
1025
1026        let positions_open = cache.positions_open(
1027            None, // Faster query filtering
1028            Some(instrument_id),
1029            None,
1030            None,
1031        );
1032
1033        if positions_open.is_empty() {
1034            return Some(Money::new(0.0, currency));
1035        }
1036
1037        let mut total_pnl = 0.0;
1038
1039        for position in positions_open {
1040            if position.instrument_id != *instrument_id {
1041                continue; // Nothing to calculate
1042            }
1043
1044            if position.side == PositionSide::Flat {
1045                continue; // Nothing to calculate
1046            }
1047
1048            let price = if let Some(price) = self.get_price(position) {
1049                price
1050            } else {
1051                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1052                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1053                return None; // Cannot calculate
1054            };
1055
1056            let mut pnl = position.unrealized_pnl(price).as_f64();
1057
1058            if let Some(base_currency) = account.base_currency() {
1059                let xrate = if let Some(xrate) =
1060                    self.calculate_xrate_to_base(instrument, account, position.entry)
1061                {
1062                    xrate
1063                } else {
1064                    log::error!(
1065                        // TODO: Improve logging
1066                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1067                        instrument.settlement_currency(),
1068                        base_currency
1069                    );
1070                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1071                    return None; // Cannot calculate
1072                };
1073
1074                let scale = 10f64.powi(currency.precision.into());
1075                pnl = ((pnl * xrate) * scale).round() / scale;
1076            }
1077
1078            total_pnl += pnl;
1079        }
1080
1081        Some(Money::new(total_pnl, currency))
1082    }
1083
1084    fn ensure_snapshot_pnls_cached_for(&mut self, instrument_id: &InstrumentId) {
1085        // Performance: This method maintains an incremental cache of snapshot PnLs
1086        // It only deserializes new snapshots that haven't been processed yet
1087        // Tracks sum and last PnL per position for efficient NETTING OMS support
1088
1089        // Get all position IDs that have snapshots for this instrument
1090        let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1091
1092        if snapshot_position_ids.is_empty() {
1093            return; // Nothing to process
1094        }
1095
1096        let mut rebuild = false;
1097
1098        // Detect purge/reset (count regression) to trigger full rebuild
1099        for position_id in &snapshot_position_ids {
1100            let position_snapshots = self.cache.borrow().position_snapshot_bytes(position_id);
1101            let curr_count = position_snapshots.map_or(0, |s| {
1102                // Count the number of snapshots (they're serialized as JSON objects)
1103                s.split(|&b| b == b'{').count() - 1
1104            });
1105            let prev_count = self
1106                .inner
1107                .borrow()
1108                .snapshot_processed_counts
1109                .get(position_id)
1110                .copied()
1111                .unwrap_or(0);
1112
1113            if prev_count > curr_count {
1114                rebuild = true;
1115                break;
1116            }
1117        }
1118
1119        if rebuild {
1120            // Full rebuild: process all snapshots from scratch
1121            for position_id in &snapshot_position_ids {
1122                if let Some(position_snapshots) =
1123                    self.cache.borrow().position_snapshot_bytes(position_id)
1124                {
1125                    let mut sum_pnl: Option<Money> = None;
1126                    let mut last_pnl: Option<Money> = None;
1127
1128                    // Snapshots are concatenated JSON objects
1129                    let mut start = 0;
1130                    let mut depth = 0;
1131                    let mut in_string = false;
1132                    let mut escape_next = false;
1133
1134                    for (i, &byte) in position_snapshots.iter().enumerate() {
1135                        if escape_next {
1136                            escape_next = false;
1137                            continue;
1138                        }
1139
1140                        if byte == b'\\' && in_string {
1141                            escape_next = true;
1142                            continue;
1143                        }
1144
1145                        if byte == b'"' && !escape_next {
1146                            in_string = !in_string;
1147                        }
1148
1149                        if !in_string {
1150                            if byte == b'{' {
1151                                if depth == 0 {
1152                                    start = i;
1153                                }
1154                                depth += 1;
1155                            } else if byte == b'}' {
1156                                depth -= 1;
1157                                if depth == 0
1158                                    && let Ok(snapshot) = serde_json::from_slice::<Position>(
1159                                        &position_snapshots[start..=i],
1160                                    )
1161                                    && let Some(realized_pnl) = snapshot.realized_pnl
1162                                {
1163                                    if let Some(ref mut sum) = sum_pnl {
1164                                        if sum.currency == realized_pnl.currency {
1165                                            *sum = Money::new(
1166                                                sum.as_f64() + realized_pnl.as_f64(),
1167                                                sum.currency,
1168                                            );
1169                                        }
1170                                    } else {
1171                                        sum_pnl = Some(realized_pnl);
1172                                    }
1173                                    last_pnl = Some(realized_pnl);
1174                                }
1175                            }
1176                        }
1177                    }
1178
1179                    let mut inner = self.inner.borrow_mut();
1180                    if let Some(sum) = sum_pnl {
1181                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1182                        if let Some(last) = last_pnl {
1183                            inner.snapshot_last_per_position.insert(*position_id, last);
1184                        }
1185                    } else {
1186                        inner.snapshot_sum_per_position.remove(position_id);
1187                        inner.snapshot_last_per_position.remove(position_id);
1188                    }
1189
1190                    let snapshot_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1191                    inner
1192                        .snapshot_processed_counts
1193                        .insert(*position_id, snapshot_count);
1194                }
1195            }
1196        } else {
1197            // Incremental path: only process new snapshots
1198            for position_id in &snapshot_position_ids {
1199                if let Some(position_snapshots) =
1200                    self.cache.borrow().position_snapshot_bytes(position_id)
1201                {
1202                    let curr_count = position_snapshots.split(|&b| b == b'{').count() - 1;
1203                    let prev_count = self
1204                        .inner
1205                        .borrow()
1206                        .snapshot_processed_counts
1207                        .get(position_id)
1208                        .copied()
1209                        .unwrap_or(0);
1210
1211                    if prev_count >= curr_count {
1212                        continue;
1213                    }
1214
1215                    let mut sum_pnl = self
1216                        .inner
1217                        .borrow()
1218                        .snapshot_sum_per_position
1219                        .get(position_id)
1220                        .copied();
1221                    let mut last_pnl = self
1222                        .inner
1223                        .borrow()
1224                        .snapshot_last_per_position
1225                        .get(position_id)
1226                        .copied();
1227
1228                    // Process only new snapshots
1229                    let mut start = 0;
1230                    let mut depth = 0;
1231                    let mut in_string = false;
1232                    let mut escape_next = false;
1233                    let mut snapshot_index = 0;
1234
1235                    for (i, &byte) in position_snapshots.iter().enumerate() {
1236                        if escape_next {
1237                            escape_next = false;
1238                            continue;
1239                        }
1240
1241                        if byte == b'\\' && in_string {
1242                            escape_next = true;
1243                            continue;
1244                        }
1245
1246                        if byte == b'"' && !escape_next {
1247                            in_string = !in_string;
1248                        }
1249
1250                        if !in_string {
1251                            if byte == b'{' {
1252                                if depth == 0 {
1253                                    start = i;
1254                                }
1255                                depth += 1;
1256                            } else if byte == b'}' {
1257                                depth -= 1;
1258                                if depth == 0 {
1259                                    snapshot_index += 1;
1260                                    // Only process new snapshots
1261                                    if snapshot_index > prev_count
1262                                        && let Ok(snapshot) = serde_json::from_slice::<Position>(
1263                                            &position_snapshots[start..=i],
1264                                        )
1265                                        && let Some(realized_pnl) = snapshot.realized_pnl
1266                                    {
1267                                        if let Some(ref mut sum) = sum_pnl {
1268                                            if sum.currency == realized_pnl.currency {
1269                                                *sum = Money::new(
1270                                                    sum.as_f64() + realized_pnl.as_f64(),
1271                                                    sum.currency,
1272                                                );
1273                                            }
1274                                        } else {
1275                                            sum_pnl = Some(realized_pnl);
1276                                        }
1277                                        last_pnl = Some(realized_pnl);
1278                                    }
1279                                }
1280                            }
1281                        }
1282                    }
1283
1284                    let mut inner = self.inner.borrow_mut();
1285                    if let Some(sum) = sum_pnl {
1286                        inner.snapshot_sum_per_position.insert(*position_id, sum);
1287                        if let Some(last) = last_pnl {
1288                            inner.snapshot_last_per_position.insert(*position_id, last);
1289                        }
1290                    }
1291                    inner
1292                        .snapshot_processed_counts
1293                        .insert(*position_id, curr_count);
1294                }
1295            }
1296        }
1297    }
1298
1299    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
1300        // Ensure snapshot PnLs are cached for this instrument
1301        self.ensure_snapshot_pnls_cached_for(instrument_id);
1302
1303        let cache = self.cache.borrow();
1304        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
1305            account
1306        } else {
1307            log::error!(
1308                "Cannot calculate realized PnL: no account registered for {}",
1309                instrument_id.venue
1310            );
1311            return None;
1312        };
1313
1314        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1315            instrument
1316        } else {
1317            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1318            return None;
1319        };
1320
1321        let currency = account
1322            .base_currency()
1323            .unwrap_or_else(|| instrument.settlement_currency());
1324
1325        let positions = cache.positions(
1326            None, // Faster query filtering
1327            Some(instrument_id),
1328            None,
1329            None,
1330        );
1331
1332        let snapshot_position_ids = cache.position_snapshot_ids(instrument_id);
1333
1334        // Check if we need to use NETTING OMS logic
1335        let is_netting = positions
1336            .iter()
1337            .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1338
1339        let mut total_pnl = 0.0;
1340
1341        if is_netting && !snapshot_position_ids.is_empty() {
1342            // NETTING OMS: Apply 3-case rule for position cycles
1343
1344            for position_id in &snapshot_position_ids {
1345                let is_active = positions.iter().any(|p| p.id == *position_id);
1346
1347                if is_active {
1348                    // Case 1 & 2: Active position - use only the last snapshot PnL
1349                    let last_pnl = self
1350                        .inner
1351                        .borrow()
1352                        .snapshot_last_per_position
1353                        .get(position_id)
1354                        .copied();
1355                    if let Some(last_pnl) = last_pnl {
1356                        let mut pnl = last_pnl.as_f64();
1357
1358                        if let Some(base_currency) = account.base_currency()
1359                            && let Some(position) = positions.iter().find(|p| p.id == *position_id)
1360                        {
1361                            let xrate = if let Some(xrate) =
1362                                self.calculate_xrate_to_base(instrument, account, position.entry)
1363                            {
1364                                xrate
1365                            } else {
1366                                log::error!(
1367                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1368                                    instrument.settlement_currency(),
1369                                    base_currency
1370                                );
1371                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1372                                return Some(Money::new(0.0, currency));
1373                            };
1374
1375                            let scale = 10f64.powi(currency.precision.into());
1376                            pnl = ((pnl * xrate) * scale).round() / scale;
1377                        }
1378
1379                        total_pnl += pnl;
1380                    }
1381                } else {
1382                    // Case 3: Closed position - use sum of all snapshot PnLs
1383                    let sum_pnl = self
1384                        .inner
1385                        .borrow()
1386                        .snapshot_sum_per_position
1387                        .get(position_id)
1388                        .copied();
1389                    if let Some(sum_pnl) = sum_pnl {
1390                        let mut pnl = sum_pnl.as_f64();
1391
1392                        if let Some(base_currency) = account.base_currency() {
1393                            // For closed positions, we don't have entry price, use current rates
1394                            let xrate = cache.get_xrate(
1395                                instrument_id.venue,
1396                                instrument.settlement_currency(),
1397                                base_currency,
1398                                PriceType::Mid,
1399                            );
1400
1401                            if let Some(xrate) = xrate {
1402                                let scale = 10f64.powi(currency.precision.into());
1403                                pnl = ((pnl * xrate) * scale).round() / scale;
1404                            } else {
1405                                log::error!(
1406                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1407                                    instrument.settlement_currency(),
1408                                    base_currency
1409                                );
1410                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1411                                return Some(Money::new(0.0, currency));
1412                            }
1413                        }
1414
1415                        total_pnl += pnl;
1416                    }
1417                }
1418            }
1419
1420            // Add realized PnL from current active positions
1421            for position in positions {
1422                if position.instrument_id != *instrument_id {
1423                    continue;
1424                }
1425
1426                if let Some(realized_pnl) = position.realized_pnl {
1427                    let mut pnl = realized_pnl.as_f64();
1428
1429                    if let Some(base_currency) = account.base_currency() {
1430                        let xrate = if let Some(xrate) =
1431                            self.calculate_xrate_to_base(instrument, account, position.entry)
1432                        {
1433                            xrate
1434                        } else {
1435                            log::error!(
1436                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1437                                instrument.settlement_currency(),
1438                                base_currency
1439                            );
1440                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1441                            return Some(Money::new(0.0, currency));
1442                        };
1443
1444                        let scale = 10f64.powi(currency.precision.into());
1445                        pnl = ((pnl * xrate) * scale).round() / scale;
1446                    }
1447
1448                    total_pnl += pnl;
1449                }
1450            }
1451        } else {
1452            // HEDGING OMS or no snapshots: Simple aggregation
1453            // Add snapshot PnLs (sum all)
1454            for position_id in &snapshot_position_ids {
1455                let sum_pnl = self
1456                    .inner
1457                    .borrow()
1458                    .snapshot_sum_per_position
1459                    .get(position_id)
1460                    .copied();
1461                if let Some(sum_pnl) = sum_pnl {
1462                    let mut pnl = sum_pnl.as_f64();
1463
1464                    if let Some(base_currency) = account.base_currency() {
1465                        let xrate = cache.get_xrate(
1466                            instrument_id.venue,
1467                            instrument.settlement_currency(),
1468                            base_currency,
1469                            PriceType::Mid,
1470                        );
1471
1472                        if let Some(xrate) = xrate {
1473                            let scale = 10f64.powi(currency.precision.into());
1474                            pnl = ((pnl * xrate) * scale).round() / scale;
1475                        } else {
1476                            log::error!(
1477                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1478                                instrument.settlement_currency(),
1479                                base_currency
1480                            );
1481                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1482                            return Some(Money::new(0.0, currency));
1483                        }
1484                    }
1485
1486                    total_pnl += pnl;
1487                }
1488            }
1489
1490            // Add realized PnL from current positions
1491            for position in positions {
1492                if position.instrument_id != *instrument_id {
1493                    continue;
1494                }
1495
1496                if let Some(realized_pnl) = position.realized_pnl {
1497                    let mut pnl = realized_pnl.as_f64();
1498
1499                    if let Some(base_currency) = account.base_currency() {
1500                        let xrate = if let Some(xrate) =
1501                            self.calculate_xrate_to_base(instrument, account, position.entry)
1502                        {
1503                            xrate
1504                        } else {
1505                            log::error!(
1506                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1507                                instrument.settlement_currency(),
1508                                base_currency
1509                            );
1510                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1511                            return Some(Money::new(0.0, currency));
1512                        };
1513
1514                        let scale = 10f64.powi(currency.precision.into());
1515                        pnl = ((pnl * xrate) * scale).round() / scale;
1516                    }
1517
1518                    total_pnl += pnl;
1519                }
1520            }
1521        }
1522
1523        Some(Money::new(total_pnl, currency))
1524    }
1525
1526    fn get_price(&self, position: &Position) -> Option<Price> {
1527        let cache = self.cache.borrow();
1528        let instrument_id = &position.instrument_id;
1529
1530        // Check for mark price first if configured
1531        if self.config.use_mark_prices
1532            && let Some(mark_price) = cache.mark_price(instrument_id)
1533        {
1534            return Some(mark_price.value);
1535        }
1536
1537        // Fall back to bid/ask based on position side
1538        let price_type = match position.side {
1539            PositionSide::Long => PriceType::Bid,
1540            PositionSide::Short => PriceType::Ask,
1541            _ => panic!("invalid `PositionSide`, was {}", position.side),
1542        };
1543
1544        cache
1545            .price(instrument_id, price_type)
1546            .or_else(|| cache.price(instrument_id, PriceType::Last))
1547            .or_else(|| {
1548                self.inner
1549                    .borrow()
1550                    .bar_close_prices
1551                    .get(instrument_id)
1552                    .copied()
1553            })
1554    }
1555
1556    fn calculate_xrate_to_base(
1557        &self,
1558        instrument: &InstrumentAny,
1559        account: &AccountAny,
1560        side: OrderSide,
1561    ) -> Option<f64> {
1562        if !self.config.convert_to_account_base_currency {
1563            return Some(1.0); // No conversion needed
1564        }
1565
1566        match account.base_currency() {
1567            None => Some(1.0), // No conversion needed
1568            Some(base_currency) => {
1569                let cache = self.cache.borrow();
1570
1571                if self.config.use_mark_xrates {
1572                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1573                }
1574
1575                let price_type = if side == OrderSide::Buy {
1576                    PriceType::Bid
1577                } else {
1578                    PriceType::Ask
1579                };
1580
1581                cache.get_xrate(
1582                    instrument.id().venue,
1583                    instrument.settlement_currency(),
1584                    base_currency,
1585                    price_type,
1586                )
1587            }
1588        }
1589    }
1590}
1591
1592// Helper functions
1593fn update_quote_tick(
1594    cache: Rc<RefCell<Cache>>,
1595    clock: Rc<RefCell<dyn Clock>>,
1596    inner: Rc<RefCell<PortfolioState>>,
1597    config: PortfolioConfig,
1598    quote: &QuoteTick,
1599) {
1600    update_instrument_id(cache, clock.clone(), inner, config, &quote.instrument_id);
1601}
1602
1603fn update_bar(
1604    cache: Rc<RefCell<Cache>>,
1605    clock: Rc<RefCell<dyn Clock>>,
1606    inner: Rc<RefCell<PortfolioState>>,
1607    config: PortfolioConfig,
1608    bar: &Bar,
1609) {
1610    let instrument_id = bar.bar_type.instrument_id();
1611    inner
1612        .borrow_mut()
1613        .bar_close_prices
1614        .insert(instrument_id, bar.close);
1615    update_instrument_id(cache, clock.clone(), inner, config, &instrument_id);
1616}
1617
1618fn update_instrument_id(
1619    cache: Rc<RefCell<Cache>>,
1620    clock: Rc<RefCell<dyn Clock>>,
1621    inner: Rc<RefCell<PortfolioState>>,
1622    config: PortfolioConfig,
1623    instrument_id: &InstrumentId,
1624) {
1625    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1626
1627    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1628        return;
1629    }
1630
1631    let result_init;
1632    let mut result_maint = None;
1633
1634    let account = {
1635        let cache_ref = cache.borrow();
1636        let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1637            account
1638        } else {
1639            log::error!(
1640                "Cannot update tick: no account registered for {}",
1641                instrument_id.venue
1642            );
1643            return;
1644        };
1645
1646        let mut cache_ref = cache.borrow_mut();
1647        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1648            instrument.clone()
1649        } else {
1650            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1651            return;
1652        };
1653
1654        // Clone the orders and positions to own the data
1655        let orders_open: Vec<OrderAny> = cache_ref
1656            .orders_open(None, Some(instrument_id), None, None)
1657            .iter()
1658            .map(|o| (*o).clone())
1659            .collect();
1660
1661        let positions_open: Vec<Position> = cache_ref
1662            .positions_open(None, Some(instrument_id), None, None)
1663            .iter()
1664            .map(|p| (*p).clone())
1665            .collect();
1666
1667        result_init = inner.borrow().accounts.update_orders(
1668            account,
1669            instrument.clone(),
1670            orders_open.iter().collect(),
1671            clock.borrow().timestamp_ns(),
1672        );
1673
1674        if let AccountAny::Margin(margin_account) = account {
1675            result_maint = inner.borrow().accounts.update_positions(
1676                margin_account,
1677                instrument,
1678                positions_open.iter().collect(),
1679                clock.borrow().timestamp_ns(),
1680            );
1681        }
1682
1683        if let Some((ref updated_account, _)) = result_init {
1684            cache_ref.update_account(updated_account.clone()).unwrap();
1685        }
1686        account.clone()
1687    };
1688
1689    let mut portfolio_clone = Portfolio {
1690        clock: clock.clone(),
1691        cache,
1692        inner: inner.clone(),
1693        config,
1694    };
1695
1696    let result_unrealized_pnl: Option<Money> =
1697        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1698
1699    if result_init.is_some()
1700        && (matches!(account, AccountAny::Cash(_))
1701            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1702    {
1703        inner.borrow_mut().pending_calcs.remove(instrument_id);
1704        if inner.borrow().pending_calcs.is_empty() {
1705            inner.borrow_mut().initialized = true;
1706        }
1707    }
1708}
1709
1710fn update_order(
1711    cache: Rc<RefCell<Cache>>,
1712    clock: Rc<RefCell<dyn Clock>>,
1713    inner: Rc<RefCell<PortfolioState>>,
1714    _config: PortfolioConfig,
1715    event: &OrderEventAny,
1716) {
1717    let cache_ref = cache.borrow();
1718    let account_id = match event.account_id() {
1719        Some(account_id) => account_id,
1720        None => {
1721            return; // No Account Assigned
1722        }
1723    };
1724
1725    let account = if let Some(account) = cache_ref.account(&account_id) {
1726        account
1727    } else {
1728        log::error!("Cannot update order: no account registered for {account_id}");
1729        return;
1730    };
1731
1732    match account {
1733        AccountAny::Cash(cash_account) => {
1734            if !cash_account.base.calculate_account_state {
1735                return;
1736            }
1737        }
1738        AccountAny::Margin(margin_account) => {
1739            if !margin_account.base.calculate_account_state {
1740                return;
1741            }
1742        }
1743    }
1744
1745    match event {
1746        OrderEventAny::Accepted(_)
1747        | OrderEventAny::Canceled(_)
1748        | OrderEventAny::Rejected(_)
1749        | OrderEventAny::Updated(_)
1750        | OrderEventAny::Filled(_) => {}
1751        _ => {
1752            return;
1753        }
1754    }
1755
1756    let cache_ref = cache.borrow();
1757    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1758        order
1759    } else {
1760        log::error!(
1761            "Cannot update order: {} not found in the cache",
1762            event.client_order_id()
1763        );
1764        return; // No Order Found
1765    };
1766
1767    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1768        return; // No change to account state
1769    }
1770
1771    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1772        instrument_id
1773    } else {
1774        log::error!(
1775            "Cannot update order: no instrument found for {}",
1776            event.instrument_id()
1777        );
1778        return;
1779    };
1780
1781    if let OrderEventAny::Filled(order_filled) = event {
1782        let _ = inner.borrow().accounts.update_balances(
1783            account.clone(),
1784            instrument.clone(),
1785            *order_filled,
1786        );
1787
1788        let mut portfolio_clone = Portfolio {
1789            clock: clock.clone(),
1790            cache: cache.clone(),
1791            inner: inner.clone(),
1792            config: PortfolioConfig::default(), // TODO: TBD
1793        };
1794
1795        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1796            Some(unrealized_pnl) => {
1797                inner
1798                    .borrow_mut()
1799                    .unrealized_pnls
1800                    .insert(event.instrument_id(), unrealized_pnl);
1801            }
1802            None => {
1803                log::error!(
1804                    "Failed to calculate unrealized PnL for instrument {}",
1805                    event.instrument_id()
1806                );
1807            }
1808        }
1809    }
1810
1811    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1812
1813    let account_state = inner.borrow_mut().accounts.update_orders(
1814        account,
1815        instrument.clone(),
1816        orders_open,
1817        clock.borrow().timestamp_ns(),
1818    );
1819
1820    let mut cache_ref = cache.borrow_mut();
1821    cache_ref.update_account(account.clone()).unwrap();
1822
1823    if let Some(account_state) = account_state {
1824        msgbus::publish(
1825            format!("events.account.{}", account.id()).into(),
1826            &account_state,
1827        );
1828    } else {
1829        log::debug!("Added pending calculation for {}", instrument.id());
1830        inner.borrow_mut().pending_calcs.insert(instrument.id());
1831    }
1832
1833    log::debug!("Updated {event}");
1834}
1835
1836fn update_position(
1837    cache: Rc<RefCell<Cache>>,
1838    clock: Rc<RefCell<dyn Clock>>,
1839    inner: Rc<RefCell<PortfolioState>>,
1840    _config: PortfolioConfig,
1841    event: &PositionEvent,
1842) {
1843    let instrument_id = event.instrument_id();
1844
1845    let positions_open: Vec<Position> = {
1846        let cache_ref = cache.borrow();
1847
1848        cache_ref
1849            .positions_open(None, Some(&instrument_id), None, None)
1850            .iter()
1851            .map(|o| (*o).clone())
1852            .collect()
1853    };
1854
1855    log::debug!("postion fresh from cache -> {positions_open:?}");
1856
1857    let mut portfolio_clone = Portfolio {
1858        clock: clock.clone(),
1859        cache: cache.clone(),
1860        inner: inner.clone(),
1861        config: PortfolioConfig::default(), // TODO: TBD
1862    };
1863
1864    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1865
1866    if let Some(calculated_unrealized_pnl) =
1867        portfolio_clone.calculate_unrealized_pnl(&instrument_id)
1868    {
1869        inner
1870            .borrow_mut()
1871            .unrealized_pnls
1872            .insert(event.instrument_id(), calculated_unrealized_pnl);
1873    } else {
1874        log::warn!(
1875            "Failed to calculate unrealized PnL for {}, marking as pending",
1876            event.instrument_id()
1877        );
1878        inner
1879            .borrow_mut()
1880            .pending_calcs
1881            .insert(event.instrument_id());
1882    }
1883
1884    if let Some(calculated_realized_pnl) = portfolio_clone.calculate_realized_pnl(&instrument_id) {
1885        inner
1886            .borrow_mut()
1887            .realized_pnls
1888            .insert(event.instrument_id(), calculated_realized_pnl);
1889    } else {
1890        log::warn!(
1891            "Failed to calculate realized PnL for {}, marking as pending",
1892            event.instrument_id()
1893        );
1894        inner
1895            .borrow_mut()
1896            .pending_calcs
1897            .insert(event.instrument_id());
1898    }
1899
1900    let cache_ref = cache.borrow();
1901    let account = cache_ref.account(&event.account_id());
1902
1903    if let Some(AccountAny::Margin(margin_account)) = account {
1904        if !margin_account.calculate_account_state {
1905            return; // Nothing to calculate
1906        }
1907
1908        let cache_ref = cache.borrow();
1909        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1910            instrument
1911        } else {
1912            log::error!("Cannot update position: no instrument found for {instrument_id}");
1913            return;
1914        };
1915
1916        let result = inner.borrow_mut().accounts.update_positions(
1917            margin_account,
1918            instrument.clone(),
1919            positions_open.iter().collect(),
1920            clock.borrow().timestamp_ns(),
1921        );
1922        let mut cache_ref = cache.borrow_mut();
1923        if let Some((margin_account, _)) = result {
1924            cache_ref
1925                .update_account(AccountAny::Margin(margin_account))
1926                .unwrap();
1927        }
1928    } else if account.is_none() {
1929        log::error!(
1930            "Cannot update position: no account registered for {}",
1931            event.account_id()
1932        );
1933    }
1934}
1935
1936fn update_account(
1937    cache: Rc<RefCell<Cache>>,
1938    inner: Rc<RefCell<PortfolioState>>,
1939    event: &AccountState,
1940) {
1941    let mut cache_ref = cache.borrow_mut();
1942
1943    if let Some(existing) = cache_ref.account(&event.account_id) {
1944        let mut account = existing.clone();
1945        account.apply(event.clone());
1946
1947        if let Err(e) = cache_ref.update_account(account.clone()) {
1948            log::error!("Failed to update account: {e}");
1949            return;
1950        }
1951    } else {
1952        let account = match AccountAny::from_events(vec![event.clone()]) {
1953            Ok(account) => account,
1954            Err(e) => {
1955                log::error!("Failed to create account: {e}");
1956                return;
1957            }
1958        };
1959
1960        if let Err(e) = cache_ref.add_account(account) {
1961            log::error!("Failed to add account: {e}");
1962            return;
1963        }
1964    }
1965
1966    // Throttled logging logic
1967    let mut inner_ref = inner.borrow_mut();
1968    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1969        let current_ts = event.ts_init.as_u64();
1970        let last_ts = inner_ref
1971            .last_account_state_log_ts
1972            .get(&event.account_id)
1973            .copied()
1974            .unwrap_or(0);
1975
1976        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1977        {
1978            inner_ref
1979                .last_account_state_log_ts
1980                .insert(event.account_id, current_ts);
1981            true
1982        } else {
1983            false
1984        }
1985    } else {
1986        true // Throttling disabled, always log
1987    };
1988
1989    if should_log {
1990        log::info!("Updated {event}");
1991    }
1992}