nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{
19    cell::RefCell,
20    collections::{HashMap, HashSet},
21    fmt::Debug,
22    rc::Rc,
23};
24
25use nautilus_analysis::analyzer::PortfolioAnalyzer;
26use nautilus_common::{
27    cache::Cache,
28    clock::Clock,
29    msgbus::{
30        self,
31        handler::{ShareableMessageHandler, TypedMessageHandler},
32    },
33};
34use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
35use nautilus_model::{
36    accounts::AccountAny,
37    data::{Bar, QuoteTick},
38    enums::{OrderSide, OrderType, PositionSide, PriceType},
39    events::{AccountState, OrderEventAny, position::PositionEvent},
40    identifiers::{AccountId, InstrumentId, Venue},
41    instruments::{Instrument, InstrumentAny},
42    orders::{Order, OrderAny},
43    position::Position,
44    types::{Currency, Money, Price},
45};
46use rust_decimal::{Decimal, prelude::FromPrimitive};
47
48use crate::{config::PortfolioConfig, manager::AccountsManager};
49
/// Mutable portfolio state shared between the [`Portfolio`] facade and the
/// message bus handlers it registers.
struct PortfolioState {
    /// Accounts manager constructed from the same clock and cache as the portfolio;
    /// used to apply open order and open position updates to accounts.
    accounts: AccountsManager,
    /// Portfolio analyzer; reset together with this state.
    analyzer: PortfolioAnalyzer,
    /// Cached unrealized PnL per instrument.
    unrealized_pnls: HashMap<InstrumentId, Money>,
    /// Cached realized PnL per instrument.
    realized_pnls: HashMap<InstrumentId, Money>,
    /// Net position per instrument (sum of the signed quantities of open positions).
    net_positions: HashMap<InstrumentId, Decimal>,
    /// Instruments with calculations pending.
    // NOTE(review): producers/consumers of this set are outside this section - confirm semantics.
    pending_calcs: HashSet<InstrumentId>,
    /// Bar close price per instrument.
    // NOTE(review): presumably written by the bar update handler - confirm.
    bar_close_prices: HashMap<InstrumentId, Price>,
    /// Result of the most recent `initialize_orders` / `initialize_positions` run.
    initialized: bool,
    /// Timestamp recorded per account for account state logging.
    // NOTE(review): presumably the last log time in UNIX nanoseconds - confirm against the handler.
    last_account_state_log_ts: HashMap<AccountId, u64>,
    /// Minimum interval between account state logs in nanoseconds, converted from the
    /// configured milliseconds (zero when not configured).
    min_account_state_logging_interval_ns: u64,
}
62
63impl PortfolioState {
64    fn new(
65        clock: Rc<RefCell<dyn Clock>>,
66        cache: Rc<RefCell<Cache>>,
67        config: &PortfolioConfig,
68    ) -> Self {
69        let min_account_state_logging_interval_ns = config
70            .min_account_state_logging_interval_ms
71            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73        Self {
74            accounts: AccountsManager::new(clock, cache),
75            analyzer: PortfolioAnalyzer::default(),
76            unrealized_pnls: HashMap::new(),
77            realized_pnls: HashMap::new(),
78            net_positions: HashMap::new(),
79            pending_calcs: HashSet::new(),
80            bar_close_prices: HashMap::new(),
81            initialized: false,
82            last_account_state_log_ts: HashMap::new(),
83            min_account_state_logging_interval_ns,
84        }
85    }
86
87    fn reset(&mut self) {
88        log::debug!("RESETTING");
89        self.net_positions.clear();
90        self.unrealized_pnls.clear();
91        self.realized_pnls.clear();
92        self.pending_calcs.clear();
93        self.last_account_state_log_ts.clear();
94        self.analyzer.reset();
95        log::debug!("READY");
96    }
97}
98
/// Provides portfolio-level queries (balances, margins, PnLs, exposures, net positions)
/// backed by the shared cache, and keeps its state updated from message bus events.
pub struct Portfolio {
    /// Shared clock used to timestamp account updates.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache providing accounts, instruments, orders and positions.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Mutable state, also reachable (weakly) from the registered message handlers.
    inner: Rc<RefCell<PortfolioState>>,
    /// Configuration the portfolio was created with.
    config: PortfolioConfig,
}
105
106impl Debug for Portfolio {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct(stringify!(Portfolio)).finish()
109    }
110}
111
112impl Portfolio {
113    pub fn new(
114        cache: Rc<RefCell<Cache>>,
115        clock: Rc<RefCell<dyn Clock>>,
116        config: Option<PortfolioConfig>,
117    ) -> Self {
118        let config = config.unwrap_or_default();
119        let inner = Rc::new(RefCell::new(PortfolioState::new(
120            clock.clone(),
121            cache.clone(),
122            &config,
123        )));
124
125        Self::register_message_handlers(
126            cache.clone(),
127            clock.clone(),
128            inner.clone(),
129            config.bar_updates,
130        );
131
132        Self {
133            clock,
134            cache,
135            inner,
136            config,
137        }
138    }
139
140    fn register_message_handlers(
141        cache: Rc<RefCell<Cache>>,
142        clock: Rc<RefCell<dyn Clock>>,
143        inner: Rc<RefCell<PortfolioState>>,
144        bar_updates: bool,
145    ) {
146        let inner_weak = WeakCell::from(Rc::downgrade(&inner));
147
148        let update_account_handler = {
149            let cache = cache.clone();
150            let inner = inner_weak.clone();
151            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
152                move |event: &AccountState| {
153                    if let Some(inner_rc) = inner.upgrade() {
154                        update_account(cache.clone(), inner_rc.into(), event);
155                    }
156                },
157            )))
158        };
159
160        let update_position_handler = {
161            let cache = cache.clone();
162            let clock = clock.clone();
163            let inner = inner_weak.clone();
164            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
165                move |event: &PositionEvent| {
166                    if let Some(inner_rc) = inner.upgrade() {
167                        update_position(cache.clone(), clock.clone(), inner_rc.into(), event);
168                    }
169                },
170            )))
171        };
172
173        let update_quote_handler = {
174            let cache = cache.clone();
175            let clock = clock.clone();
176            let inner = inner_weak.clone();
177            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
178                move |quote: &QuoteTick| {
179                    if let Some(inner_rc) = inner.upgrade() {
180                        update_quote_tick(cache.clone(), clock.clone(), inner_rc.into(), quote);
181                    }
182                },
183            )))
184        };
185
186        let update_bar_handler = {
187            let cache = cache.clone();
188            let clock = clock.clone();
189            let inner = inner_weak.clone();
190            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
191                if let Some(inner_rc) = inner.upgrade() {
192                    update_bar(cache.clone(), clock.clone(), inner_rc.into(), bar);
193                }
194            })))
195        };
196
197        let update_order_handler = {
198            let cache = cache;
199            let clock = clock.clone();
200            let inner = inner_weak;
201            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
202                move |event: &OrderEventAny| {
203                    if let Some(inner_rc) = inner.upgrade() {
204                        update_order(cache.clone(), clock.clone(), inner_rc.into(), event);
205                    }
206                },
207            )))
208        };
209
210        msgbus::register(
211            "Portfolio.update_account".into(),
212            update_account_handler.clone(),
213        );
214
215        msgbus::subscribe("data.quotes.*".into(), update_quote_handler, Some(10));
216        if bar_updates {
217            msgbus::subscribe("data.quotes.*EXTERNAL".into(), update_bar_handler, Some(10));
218        }
219        msgbus::subscribe("events.order.*".into(), update_order_handler, Some(10));
220        msgbus::subscribe(
221            "events.position.*".into(),
222            update_position_handler,
223            Some(10),
224        );
225        msgbus::subscribe("events.account.*".into(), update_account_handler, Some(10));
226    }
227
228    pub fn reset(&mut self) {
229        log::debug!("RESETTING");
230        self.inner.borrow_mut().reset();
231        log::debug!("READY");
232    }
233
234    // -- QUERIES ---------------------------------------------------------------------------------
235
236    /// Returns `true` if the portfolio has been initialized.
237    #[must_use]
238    pub fn is_initialized(&self) -> bool {
239        self.inner.borrow().initialized
240    }
241
242    /// Returns the locked balances for the given venue.
243    ///
244    /// Locked balances represent funds reserved for open orders.
245    #[must_use]
246    pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
247        self.cache.borrow().account_for_venue(venue).map_or_else(
248            || {
249                log::error!("Cannot get balances locked: no account generated for {venue}");
250                HashMap::new()
251            },
252            AccountAny::balances_locked,
253        )
254    }
255
256    /// Returns the initial margin requirements for the given venue.
257    ///
258    /// Only applicable for margin accounts. Returns empty map for cash accounts.
259    #[must_use]
260    pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
261        self.cache.borrow().account_for_venue(venue).map_or_else(
262            || {
263                log::error!(
264                    "Cannot get initial (order) margins: no account registered for {venue}"
265                );
266                HashMap::new()
267            },
268            |account| match account {
269                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
270                AccountAny::Cash(_) => {
271                    log::warn!("Initial margins not applicable for cash account");
272                    HashMap::new()
273                }
274            },
275        )
276    }
277
278    /// Returns the maintenance margin requirements for the given venue.
279    ///
280    /// Only applicable for margin accounts. Returns empty map for cash accounts.
281    #[must_use]
282    pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
283        self.cache.borrow().account_for_venue(venue).map_or_else(
284            || {
285                log::error!(
286                    "Cannot get maintenance (position) margins: no account registered for {venue}"
287                );
288                HashMap::new()
289            },
290            |account| match account {
291                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
292                AccountAny::Cash(_) => {
293                    log::warn!("Maintenance margins not applicable for cash account");
294                    HashMap::new()
295                }
296            },
297        )
298    }
299
300    /// Returns the unrealized PnLs for all positions at the given venue.
301    ///
302    /// Calculates mark-to-market PnL based on current market prices.
303    #[must_use]
304    pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
305        let instrument_ids = {
306            let cache = self.cache.borrow();
307            let positions = cache.positions(Some(venue), None, None, None);
308
309            if positions.is_empty() {
310                return HashMap::new(); // Nothing to calculate
311            }
312
313            let instrument_ids: HashSet<InstrumentId> =
314                positions.iter().map(|p| p.instrument_id).collect();
315
316            instrument_ids
317        };
318
319        let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
320
321        for instrument_id in instrument_ids {
322            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
323                // PnL already calculated
324                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
325                continue;
326            }
327
328            // Calculate PnL
329            match self.calculate_unrealized_pnl(&instrument_id) {
330                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
331                None => continue,
332            }
333        }
334
335        unrealized_pnls
336            .into_iter()
337            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
338            .collect()
339    }
340
341    /// Returns the realized PnLs for all positions at the given venue.
342    ///
343    /// Calculates total realized profit and loss from closed positions.
344    #[must_use]
345    pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
346        let instrument_ids = {
347            let cache = self.cache.borrow();
348            let positions = cache.positions(Some(venue), None, None, None);
349
350            if positions.is_empty() {
351                return HashMap::new(); // Nothing to calculate
352            }
353
354            let instrument_ids: HashSet<InstrumentId> =
355                positions.iter().map(|p| p.instrument_id).collect();
356
357            instrument_ids
358        };
359
360        let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
361
362        for instrument_id in instrument_ids {
363            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
364                // PnL already calculated
365                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
366                continue;
367            }
368
369            // Calculate PnL
370            match self.calculate_realized_pnl(&instrument_id) {
371                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
372                None => continue,
373            }
374        }
375
376        realized_pnls
377            .into_iter()
378            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
379            .collect()
380    }
381
382    #[must_use]
383    pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
384        let cache = self.cache.borrow();
385        let account = if let Some(account) = cache.account_for_venue(venue) {
386            account
387        } else {
388            log::error!("Cannot calculate net exposures: no account registered for {venue}");
389            return None; // Cannot calculate
390        };
391
392        let positions_open = cache.positions_open(Some(venue), None, None, None);
393        if positions_open.is_empty() {
394            return Some(HashMap::new()); // Nothing to calculate
395        }
396
397        let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
398
399        for position in positions_open {
400            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
401                instrument
402            } else {
403                log::error!(
404                    "Cannot calculate net exposures: no instrument for {}",
405                    position.instrument_id
406                );
407                return None; // Cannot calculate
408            };
409
410            if position.side == PositionSide::Flat {
411                log::error!(
412                    "Cannot calculate net exposures: position is flat for {}",
413                    position.instrument_id
414                );
415                continue; // Nothing to calculate
416            }
417
418            let price = self.get_price(position)?;
419            let xrate = if let Some(xrate) =
420                self.calculate_xrate_to_base(instrument, account, position.entry)
421            {
422                xrate
423            } else {
424                log::error!(
425                    // TODO: Improve logging
426                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
427                    instrument.settlement_currency(),
428                    account.base_currency()
429                );
430                return None; // Cannot calculate
431            };
432
433            let settlement_currency = account
434                .base_currency()
435                .unwrap_or_else(|| instrument.settlement_currency());
436
437            let net_exposure = instrument
438                .calculate_notional_value(position.quantity, price, None)
439                .as_f64()
440                * xrate;
441
442            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
443                .round()
444                / 10f64.powi(settlement_currency.precision.into());
445
446            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
447        }
448
449        Some(
450            net_exposures
451                .into_iter()
452                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
453                .collect(),
454        )
455    }
456
457    #[must_use]
458    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
459        if let Some(pnl) = self
460            .inner
461            .borrow()
462            .unrealized_pnls
463            .get(instrument_id)
464            .copied()
465        {
466            return Some(pnl);
467        }
468
469        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
470        self.inner
471            .borrow_mut()
472            .unrealized_pnls
473            .insert(*instrument_id, pnl);
474        Some(pnl)
475    }
476
477    #[must_use]
478    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
479        if let Some(pnl) = self
480            .inner
481            .borrow()
482            .realized_pnls
483            .get(instrument_id)
484            .copied()
485        {
486            return Some(pnl);
487        }
488
489        let pnl = self.calculate_realized_pnl(instrument_id)?;
490        self.inner
491            .borrow_mut()
492            .realized_pnls
493            .insert(*instrument_id, pnl);
494        Some(pnl)
495    }
496
497    #[must_use]
498    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
499        let cache = self.cache.borrow();
500        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
501            account
502        } else {
503            log::error!(
504                "Cannot calculate net exposure: no account registered for {}",
505                instrument_id.venue
506            );
507            return None;
508        };
509
510        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
511            instrument
512        } else {
513            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
514            return None;
515        };
516
517        let positions_open = cache.positions_open(
518            None, // Faster query filtering
519            Some(instrument_id),
520            None,
521            None,
522        );
523
524        if positions_open.is_empty() {
525            return Some(Money::new(0.0, instrument.settlement_currency()));
526        }
527
528        let mut net_exposure = 0.0;
529
530        for position in positions_open {
531            let price = self.get_price(position)?;
532            let xrate = if let Some(xrate) =
533                self.calculate_xrate_to_base(instrument, account, position.entry)
534            {
535                xrate
536            } else {
537                log::error!(
538                    // TODO: Improve logging
539                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
540                    instrument.settlement_currency(),
541                    account.base_currency()
542                );
543                return None; // Cannot calculate
544            };
545
546            let notional_value =
547                instrument.calculate_notional_value(position.quantity, price, None);
548            net_exposure += notional_value.as_f64() * xrate;
549        }
550
551        let settlement_currency = account
552            .base_currency()
553            .unwrap_or_else(|| instrument.settlement_currency());
554
555        Some(Money::new(net_exposure, settlement_currency))
556    }
557
558    #[must_use]
559    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
560        self.inner
561            .borrow()
562            .net_positions
563            .get(instrument_id)
564            .copied()
565            .unwrap_or(Decimal::ZERO)
566    }
567
568    #[must_use]
569    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
570        self.inner
571            .borrow()
572            .net_positions
573            .get(instrument_id)
574            .copied()
575            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
576    }
577
578    #[must_use]
579    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
580        self.inner
581            .borrow()
582            .net_positions
583            .get(instrument_id)
584            .copied()
585            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
586    }
587
588    #[must_use]
589    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
590        self.inner
591            .borrow()
592            .net_positions
593            .get(instrument_id)
594            .copied()
595            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
596    }
597
598    #[must_use]
599    pub fn is_completely_flat(&self) -> bool {
600        for net_position in self.inner.borrow().net_positions.values() {
601            if *net_position != Decimal::ZERO {
602                return false;
603            }
604        }
605        true
606    }
607
608    // -- COMMANDS --------------------------------------------------------------------------------
609
610    /// Initializes account margin based on existing open orders.
611    ///
612    /// # Panics
613    ///
614    /// Panics if updating the cache with a mutated account fails.
615    pub fn initialize_orders(&mut self) {
616        let mut initialized = true;
617        let orders_and_instruments = {
618            let cache = self.cache.borrow();
619            let all_orders_open = cache.orders_open(None, None, None, None);
620
621            let mut instruments_with_orders = Vec::new();
622            let mut instruments = HashSet::new();
623
624            for order in &all_orders_open {
625                instruments.insert(order.instrument_id());
626            }
627
628            for instrument_id in instruments {
629                if let Some(instrument) = cache.instrument(&instrument_id) {
630                    let orders = cache
631                        .orders_open(None, Some(&instrument_id), None, None)
632                        .into_iter()
633                        .cloned()
634                        .collect::<Vec<OrderAny>>();
635                    instruments_with_orders.push((instrument.clone(), orders));
636                } else {
637                    log::error!(
638                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
639                    );
640                    initialized = false;
641                    break;
642                }
643            }
644            instruments_with_orders
645        };
646
647        for (instrument, orders_open) in &orders_and_instruments {
648            let mut cache = self.cache.borrow_mut();
649            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
650                account
651            } else {
652                log::error!(
653                    "Cannot update initial (order) margin: no account registered for {}",
654                    instrument.id().venue
655                );
656                initialized = false;
657                break;
658            };
659
660            let result = self.inner.borrow_mut().accounts.update_orders(
661                account,
662                instrument.clone(),
663                orders_open.iter().collect(),
664                self.clock.borrow().timestamp_ns(),
665            );
666
667            match result {
668                Some((updated_account, _)) => {
669                    cache.add_account(updated_account).unwrap(); // Temp Fix to update the mutated account
670                }
671                None => {
672                    initialized = false;
673                }
674            }
675        }
676
677        let total_orders = orders_and_instruments
678            .into_iter()
679            .map(|(_, orders)| orders.len())
680            .sum::<usize>();
681
682        log::info!(
683            "Initialized {} open order{}",
684            total_orders,
685            if total_orders == 1 { "" } else { "s" }
686        );
687
688        self.inner.borrow_mut().initialized = initialized;
689    }
690
691    /// Initializes account margin based on existing open positions.
692    ///
693    /// # Panics
694    ///
695    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
696    pub fn initialize_positions(&mut self) {
697        self.inner.borrow_mut().unrealized_pnls.clear();
698        self.inner.borrow_mut().realized_pnls.clear();
699        let all_positions_open: Vec<Position>;
700        let mut instruments = HashSet::new();
701        {
702            let cache = self.cache.borrow();
703            all_positions_open = cache
704                .positions_open(None, None, None, None)
705                .into_iter()
706                .cloned()
707                .collect();
708            for position in &all_positions_open {
709                instruments.insert(position.instrument_id);
710            }
711        }
712
713        let mut initialized = true;
714
715        for instrument_id in instruments {
716            let positions_open: Vec<Position> = {
717                let cache = self.cache.borrow();
718                cache
719                    .positions_open(None, Some(&instrument_id), None, None)
720                    .into_iter()
721                    .cloned()
722                    .collect()
723            };
724
725            self.update_net_position(&instrument_id, positions_open);
726
727            let calculated_unrealized_pnl = self
728                .calculate_unrealized_pnl(&instrument_id)
729                .expect("Failed to calculate unrealized PnL");
730            let calculated_realized_pnl = self
731                .calculate_realized_pnl(&instrument_id)
732                .expect("Failed to calculate realized PnL");
733
734            self.inner
735                .borrow_mut()
736                .unrealized_pnls
737                .insert(instrument_id, calculated_unrealized_pnl);
738            self.inner
739                .borrow_mut()
740                .realized_pnls
741                .insert(instrument_id, calculated_realized_pnl);
742
743            let cache = self.cache.borrow();
744            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
745                account
746            } else {
747                log::error!(
748                    "Cannot update maintenance (position) margin: no account registered for {}",
749                    instrument_id.venue
750                );
751                initialized = false;
752                break;
753            };
754
755            let account = match account {
756                AccountAny::Cash(_) => continue,
757                AccountAny::Margin(margin_account) => margin_account,
758            };
759
760            let mut cache = self.cache.borrow_mut();
761            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
762                instrument
763            } else {
764                log::error!(
765                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
766                );
767                initialized = false;
768                break;
769            };
770
771            let result = self.inner.borrow_mut().accounts.update_positions(
772                account,
773                instrument.clone(),
774                self.cache
775                    .borrow()
776                    .positions_open(None, Some(&instrument_id), None, None),
777                self.clock.borrow().timestamp_ns(),
778            );
779
780            match result {
781                Some((updated_account, _)) => {
782                    cache
783                        .add_account(AccountAny::Margin(updated_account)) // Temp Fix to update the mutated account
784                        .unwrap();
785                }
786                None => {
787                    initialized = false;
788                }
789            }
790        }
791
792        let open_count = all_positions_open.len();
793        self.inner.borrow_mut().initialized = initialized;
794        log::info!(
795            "Initialized {} open position{}",
796            open_count,
797            if open_count == 1 { "" } else { "s" }
798        );
799    }
800
801    /// Updates portfolio calculations based on a new quote tick.
802    ///
803    /// Recalculates unrealized PnL for positions affected by the quote update.
804    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
805        update_quote_tick(
806            self.cache.clone(),
807            self.clock.clone(),
808            self.inner.clone(),
809            quote,
810        );
811    }
812
813    /// Updates portfolio calculations based on a new bar.
814    ///
815    /// Updates cached bar close prices and recalculates unrealized PnL.
816    pub fn update_bar(&mut self, bar: &Bar) {
817        update_bar(
818            self.cache.clone(),
819            self.clock.clone(),
820            self.inner.clone(),
821            bar,
822        );
823    }
824
825    /// Updates portfolio with a new account state event.
826    pub fn update_account(&mut self, event: &AccountState) {
827        update_account(self.cache.clone(), self.inner.clone(), event);
828    }
829
830    /// Updates portfolio calculations based on an order event.
831    ///
832    /// Handles balance updates for order fills and margin calculations for order changes.
833    pub fn update_order(&mut self, event: &OrderEventAny) {
834        update_order(
835            self.cache.clone(),
836            self.clock.clone(),
837            self.inner.clone(),
838            event,
839        );
840    }
841
842    /// Updates portfolio calculations based on a position event.
843    ///
844    /// Recalculates net positions, unrealized PnL, and margin requirements.
845    pub fn update_position(&mut self, event: &PositionEvent) {
846        update_position(
847            self.cache.clone(),
848            self.clock.clone(),
849            self.inner.clone(),
850            event,
851        );
852    }
853
854    // -- INTERNAL --------------------------------------------------------------------------------
855
856    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
857        let mut net_position = Decimal::ZERO;
858
859        for open_position in positions_open {
860            log::debug!("open_position: {open_position}");
861            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
862        }
863
864        let existing_position = self.net_position(instrument_id);
865        if existing_position != net_position {
866            self.inner
867                .borrow_mut()
868                .net_positions
869                .insert(*instrument_id, net_position);
870            log::info!("{instrument_id} net_position={net_position}");
871        }
872    }
873
874    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
875        let cache = self.cache.borrow();
876        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
877            account
878        } else {
879            log::error!(
880                "Cannot calculate unrealized PnL: no account registered for {}",
881                instrument_id.venue
882            );
883            return None;
884        };
885
886        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
887            instrument
888        } else {
889            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
890            return None;
891        };
892
893        let currency = account
894            .base_currency()
895            .unwrap_or_else(|| instrument.settlement_currency());
896
897        let positions_open = cache.positions_open(
898            None, // Faster query filtering
899            Some(instrument_id),
900            None,
901            None,
902        );
903
904        if positions_open.is_empty() {
905            return Some(Money::new(0.0, currency));
906        }
907
908        let mut total_pnl = 0.0;
909
910        for position in positions_open {
911            if position.instrument_id != *instrument_id {
912                continue; // Nothing to calculate
913            }
914
915            if position.side == PositionSide::Flat {
916                continue; // Nothing to calculate
917            }
918
919            let price = if let Some(price) = self.get_price(position) {
920                price
921            } else {
922                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
923                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
924                return None; // Cannot calculate
925            };
926
927            let mut pnl = position.unrealized_pnl(price).as_f64();
928
929            if let Some(base_currency) = account.base_currency() {
930                let xrate = if let Some(xrate) =
931                    self.calculate_xrate_to_base(instrument, account, position.entry)
932                {
933                    xrate
934                } else {
935                    log::error!(
936                        // TODO: Improve logging
937                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
938                        instrument.settlement_currency(),
939                        base_currency
940                    );
941                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
942                    return None; // Cannot calculate
943                };
944
945                let scale = 10f64.powi(currency.precision.into());
946                pnl = ((pnl * xrate) * scale).round() / scale;
947            }
948
949            total_pnl += pnl;
950        }
951
952        Some(Money::new(total_pnl, currency))
953    }
954
955    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
956        let cache = self.cache.borrow();
957        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
958            account
959        } else {
960            log::error!(
961                "Cannot calculate realized PnL: no account registered for {}",
962                instrument_id.venue
963            );
964            return None;
965        };
966
967        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
968            instrument
969        } else {
970            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
971            return None;
972        };
973
974        let currency = account
975            .base_currency()
976            .unwrap_or_else(|| instrument.settlement_currency());
977
978        let positions = cache.positions(
979            None, // Faster query filtering
980            Some(instrument_id),
981            None,
982            None,
983        );
984
985        if positions.is_empty() {
986            return Some(Money::new(0.0, currency));
987        }
988
989        let mut total_pnl = 0.0;
990
991        for position in positions {
992            if position.instrument_id != *instrument_id {
993                continue; // Nothing to calculate
994            }
995
996            if position.realized_pnl.is_none() {
997                continue; // Nothing to calculate
998            }
999
1000            let mut pnl = position.realized_pnl?.as_f64();
1001
1002            if let Some(base_currency) = account.base_currency() {
1003                let xrate = if let Some(xrate) =
1004                    self.calculate_xrate_to_base(instrument, account, position.entry)
1005                {
1006                    xrate
1007                } else {
1008                    log::error!(
1009                        "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1010                        instrument.settlement_currency(),
1011                        base_currency
1012                    );
1013                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1014                    // Return zero instead of None to avoid null pointer dereference
1015                    // The pending calculation will be retried when exchange rate data becomes available
1016                    return Some(Money::new(0.0, currency));
1017                };
1018
1019                let scale = 10f64.powi(currency.precision.into());
1020                pnl = ((pnl * xrate) * scale).round() / scale;
1021            }
1022
1023            total_pnl += pnl;
1024        }
1025
1026        Some(Money::new(total_pnl, currency))
1027    }
1028
1029    fn get_price(&self, position: &Position) -> Option<Price> {
1030        let price_type = match position.side {
1031            PositionSide::Long => PriceType::Bid,
1032            PositionSide::Short => PriceType::Ask,
1033            _ => panic!("invalid `PositionSide`, was {}", position.side),
1034        };
1035
1036        let cache = self.cache.borrow();
1037
1038        let instrument_id = &position.instrument_id;
1039        cache
1040            .price(instrument_id, price_type)
1041            .or_else(|| cache.price(instrument_id, PriceType::Last))
1042            .or_else(|| {
1043                self.inner
1044                    .borrow()
1045                    .bar_close_prices
1046                    .get(instrument_id)
1047                    .copied()
1048            })
1049    }
1050
1051    fn calculate_xrate_to_base(
1052        &self,
1053        instrument: &InstrumentAny,
1054        account: &AccountAny,
1055        side: OrderSide,
1056    ) -> Option<f64> {
1057        if !self.config.convert_to_account_base_currency {
1058            return Some(1.0); // No conversion needed
1059        }
1060
1061        match account.base_currency() {
1062            None => Some(1.0), // No conversion needed
1063            Some(base_currency) => {
1064                let cache = self.cache.borrow();
1065
1066                if self.config.use_mark_xrates {
1067                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
1068                }
1069
1070                let price_type = if side == OrderSide::Buy {
1071                    PriceType::Bid
1072                } else {
1073                    PriceType::Ask
1074                };
1075
1076                cache.get_xrate(
1077                    instrument.id().venue,
1078                    instrument.settlement_currency(),
1079                    base_currency,
1080                    price_type,
1081                )
1082            }
1083        }
1084    }
1085}
1086
1087// Helper functions
1088fn update_quote_tick(
1089    cache: Rc<RefCell<Cache>>,
1090    clock: Rc<RefCell<dyn Clock>>,
1091    inner: Rc<RefCell<PortfolioState>>,
1092    quote: &QuoteTick,
1093) {
1094    update_instrument_id(cache, clock.clone(), inner, &quote.instrument_id);
1095}
1096
1097fn update_bar(
1098    cache: Rc<RefCell<Cache>>,
1099    clock: Rc<RefCell<dyn Clock>>,
1100    inner: Rc<RefCell<PortfolioState>>,
1101    bar: &Bar,
1102) {
1103    let instrument_id = bar.bar_type.instrument_id();
1104    inner
1105        .borrow_mut()
1106        .bar_close_prices
1107        .insert(instrument_id, bar.close);
1108    update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1109}
1110
1111fn update_instrument_id(
1112    cache: Rc<RefCell<Cache>>,
1113    clock: Rc<RefCell<dyn Clock>>,
1114    inner: Rc<RefCell<PortfolioState>>,
1115    instrument_id: &InstrumentId,
1116) {
1117    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1118
1119    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1120        return;
1121    }
1122
1123    let result_init;
1124    let mut result_maint = None;
1125
1126    let account = {
1127        let cache_ref = cache.borrow();
1128        let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1129            account
1130        } else {
1131            log::error!(
1132                "Cannot update tick: no account registered for {}",
1133                instrument_id.venue
1134            );
1135            return;
1136        };
1137
1138        let mut cache_ref = cache.borrow_mut();
1139        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1140            instrument.clone()
1141        } else {
1142            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1143            return;
1144        };
1145
1146        // Clone the orders and positions to own the data
1147        let orders_open: Vec<OrderAny> = cache_ref
1148            .orders_open(None, Some(instrument_id), None, None)
1149            .iter()
1150            .map(|o| (*o).clone())
1151            .collect();
1152
1153        let positions_open: Vec<Position> = cache_ref
1154            .positions_open(None, Some(instrument_id), None, None)
1155            .iter()
1156            .map(|p| (*p).clone())
1157            .collect();
1158
1159        result_init = inner.borrow().accounts.update_orders(
1160            account,
1161            instrument.clone(),
1162            orders_open.iter().collect(),
1163            clock.borrow().timestamp_ns(),
1164        );
1165
1166        if let AccountAny::Margin(margin_account) = account {
1167            result_maint = inner.borrow().accounts.update_positions(
1168                margin_account,
1169                instrument,
1170                positions_open.iter().collect(),
1171                clock.borrow().timestamp_ns(),
1172            );
1173        }
1174
1175        if let Some((ref updated_account, _)) = result_init {
1176            cache_ref.add_account(updated_account.clone()).unwrap(); // Temp Fix to update the mutated account
1177        }
1178        account.clone()
1179    };
1180
1181    let mut portfolio_clone = Portfolio {
1182        clock: clock.clone(),
1183        cache,
1184        inner: inner.clone(),
1185        config: PortfolioConfig::default(), // TODO: TBD
1186    };
1187
1188    let result_unrealized_pnl: Option<Money> =
1189        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1190
1191    if result_init.is_some()
1192        && (matches!(account, AccountAny::Cash(_))
1193            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1194    {
1195        inner.borrow_mut().pending_calcs.remove(instrument_id);
1196        if inner.borrow().pending_calcs.is_empty() {
1197            inner.borrow_mut().initialized = true;
1198        }
1199    }
1200}
1201
1202fn update_order(
1203    cache: Rc<RefCell<Cache>>,
1204    clock: Rc<RefCell<dyn Clock>>,
1205    inner: Rc<RefCell<PortfolioState>>,
1206    event: &OrderEventAny,
1207) {
1208    let cache_ref = cache.borrow();
1209    let account_id = match event.account_id() {
1210        Some(account_id) => account_id,
1211        None => {
1212            return; // No Account Assigned
1213        }
1214    };
1215
1216    let account = if let Some(account) = cache_ref.account(&account_id) {
1217        account
1218    } else {
1219        log::error!("Cannot update order: no account registered for {account_id}");
1220        return;
1221    };
1222
1223    match account {
1224        AccountAny::Cash(cash_account) => {
1225            if !cash_account.base.calculate_account_state {
1226                return;
1227            }
1228        }
1229        AccountAny::Margin(margin_account) => {
1230            if !margin_account.base.calculate_account_state {
1231                return;
1232            }
1233        }
1234    }
1235
1236    match event {
1237        OrderEventAny::Accepted(_)
1238        | OrderEventAny::Canceled(_)
1239        | OrderEventAny::Rejected(_)
1240        | OrderEventAny::Updated(_)
1241        | OrderEventAny::Filled(_) => {}
1242        _ => {
1243            return;
1244        }
1245    }
1246
1247    let cache_ref = cache.borrow();
1248    let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1249        order
1250    } else {
1251        log::error!(
1252            "Cannot update order: {} not found in the cache",
1253            event.client_order_id()
1254        );
1255        return; // No Order Found
1256    };
1257
1258    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1259        return; // No change to account state
1260    }
1261
1262    let instrument = if let Some(instrument_id) = cache_ref.instrument(&event.instrument_id()) {
1263        instrument_id
1264    } else {
1265        log::error!(
1266            "Cannot update order: no instrument found for {}",
1267            event.instrument_id()
1268        );
1269        return;
1270    };
1271
1272    if let OrderEventAny::Filled(order_filled) = event {
1273        let _ = inner.borrow().accounts.update_balances(
1274            account.clone(),
1275            instrument.clone(),
1276            *order_filled,
1277        );
1278
1279        let mut portfolio_clone = Portfolio {
1280            clock: clock.clone(),
1281            cache: cache.clone(),
1282            inner: inner.clone(),
1283            config: PortfolioConfig::default(), // TODO: TBD
1284        };
1285
1286        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1287            Some(unrealized_pnl) => {
1288                inner
1289                    .borrow_mut()
1290                    .unrealized_pnls
1291                    .insert(event.instrument_id(), unrealized_pnl);
1292            }
1293            None => {
1294                log::error!(
1295                    "Failed to calculate unrealized PnL for instrument {}",
1296                    event.instrument_id()
1297                );
1298            }
1299        }
1300    }
1301
1302    let orders_open = cache_ref.orders_open(None, Some(&event.instrument_id()), None, None);
1303
1304    let account_state = inner.borrow_mut().accounts.update_orders(
1305        account,
1306        instrument.clone(),
1307        orders_open,
1308        clock.borrow().timestamp_ns(),
1309    );
1310
1311    let mut cache_ref = cache.borrow_mut();
1312    cache_ref.update_account(account.clone()).unwrap();
1313
1314    if let Some(account_state) = account_state {
1315        msgbus::publish(
1316            format!("events.account.{}", account.id()).into(),
1317            &account_state,
1318        );
1319    } else {
1320        log::debug!("Added pending calculation for {}", instrument.id());
1321        inner.borrow_mut().pending_calcs.insert(instrument.id());
1322    }
1323
1324    log::debug!("Updated {event}");
1325}
1326
1327fn update_position(
1328    cache: Rc<RefCell<Cache>>,
1329    clock: Rc<RefCell<dyn Clock>>,
1330    inner: Rc<RefCell<PortfolioState>>,
1331    event: &PositionEvent,
1332) {
1333    let instrument_id = event.instrument_id();
1334
1335    let positions_open: Vec<Position> = {
1336        let cache_ref = cache.borrow();
1337
1338        cache_ref
1339            .positions_open(None, Some(&instrument_id), None, None)
1340            .iter()
1341            .map(|o| (*o).clone())
1342            .collect()
1343    };
1344
1345    log::debug!("postion fresh from cache -> {positions_open:?}");
1346
1347    let mut portfolio_clone = Portfolio {
1348        clock: clock.clone(),
1349        cache: cache.clone(),
1350        inner: inner.clone(),
1351        config: PortfolioConfig::default(), // TODO: TBD
1352    };
1353
1354    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1355
1356    let calculated_unrealized_pnl = portfolio_clone
1357        .calculate_unrealized_pnl(&instrument_id)
1358        .expect("Failed to calculate unrealized PnL");
1359    let calculated_realized_pnl = portfolio_clone
1360        .calculate_realized_pnl(&instrument_id)
1361        .expect("Failed to calculate realized PnL");
1362
1363    inner
1364        .borrow_mut()
1365        .unrealized_pnls
1366        .insert(event.instrument_id(), calculated_unrealized_pnl);
1367    inner
1368        .borrow_mut()
1369        .realized_pnls
1370        .insert(event.instrument_id(), calculated_realized_pnl);
1371
1372    let cache_ref = cache.borrow();
1373    let account = cache_ref.account(&event.account_id());
1374
1375    if let Some(AccountAny::Margin(margin_account)) = account {
1376        if !margin_account.calculate_account_state {
1377            return; // Nothing to calculate
1378        }
1379
1380        let cache_ref = cache.borrow();
1381        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
1382            instrument
1383        } else {
1384            log::error!("Cannot update position: no instrument found for {instrument_id}");
1385            return;
1386        };
1387
1388        let result = inner.borrow_mut().accounts.update_positions(
1389            margin_account,
1390            instrument.clone(),
1391            positions_open.iter().collect(),
1392            clock.borrow().timestamp_ns(),
1393        );
1394        let mut cache_ref = cache.borrow_mut();
1395        if let Some((margin_account, _)) = result {
1396            cache_ref
1397                .add_account(AccountAny::Margin(margin_account)) // Temp Fix to update the mutated account
1398                .unwrap();
1399        }
1400    } else if account.is_none() {
1401        log::error!(
1402            "Cannot update position: no account registered for {}",
1403            event.account_id()
1404        );
1405    }
1406}
1407
1408fn update_account(
1409    cache: Rc<RefCell<Cache>>,
1410    inner: Rc<RefCell<PortfolioState>>,
1411    event: &AccountState,
1412) {
1413    let mut cache_ref = cache.borrow_mut();
1414
1415    if let Some(existing) = cache_ref.account(&event.account_id) {
1416        let mut account = existing.clone();
1417        account.apply(event.clone());
1418
1419        if let Err(e) = cache_ref.update_account(account.clone()) {
1420            log::error!("Failed to update account: {e}");
1421            return;
1422        }
1423    } else {
1424        let account = match AccountAny::from_events(vec![event.clone()]) {
1425            Ok(account) => account,
1426            Err(e) => {
1427                log::error!("Failed to create account: {e}");
1428                return;
1429            }
1430        };
1431
1432        if let Err(e) = cache_ref.add_account(account) {
1433            log::error!("Failed to add account: {e}");
1434            return;
1435        }
1436    }
1437
1438    // Throttled logging logic
1439    let mut inner_ref = inner.borrow_mut();
1440    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
1441        let current_ts = event.ts_init.as_u64();
1442        let last_ts = inner_ref
1443            .last_account_state_log_ts
1444            .get(&event.account_id)
1445            .copied()
1446            .unwrap_or(0);
1447
1448        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
1449        {
1450            inner_ref
1451                .last_account_state_log_ts
1452                .insert(event.account_id, current_ts);
1453            true
1454        } else {
1455            false
1456        }
1457    } else {
1458        true // Throttling disabled, always log
1459    };
1460
1461    if should_log {
1462        log::info!("Updated {event}");
1463    }
1464}