nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// TODO: Under development
17#![allow(dead_code)] // For PortfolioConfig
18
19//! Provides a generic `Portfolio` for all environments.
20use std::{
21    cell::RefCell,
22    collections::{HashMap, HashSet},
23    rc::Rc,
24};
25
26use nautilus_analysis::analyzer::PortfolioAnalyzer;
27use nautilus_common::{
28    cache::Cache,
29    clock::Clock,
30    msgbus::{
31        self,
32        handler::{ShareableMessageHandler, TypedMessageHandler},
33    },
34};
35use nautilus_model::{
36    accounts::AccountAny,
37    data::{Bar, QuoteTick},
38    enums::{OrderSide, OrderType, PositionSide, PriceType},
39    events::{AccountState, OrderEventAny, position::PositionEvent},
40    identifiers::{InstrumentId, Venue},
41    instruments::{Instrument, InstrumentAny},
42    orders::{Order, OrderAny},
43    position::Position,
44    types::{Currency, Money, Price},
45};
46use rust_decimal::{Decimal, prelude::FromPrimitive};
47use ustr::Ustr;
48
49use crate::{config::PortfolioConfig, manager::AccountsManager};
50
51struct PortfolioState {
52    accounts: AccountsManager,
53    analyzer: PortfolioAnalyzer,
54    unrealized_pnls: HashMap<InstrumentId, Money>,
55    realized_pnls: HashMap<InstrumentId, Money>,
56    net_positions: HashMap<InstrumentId, Decimal>,
57    pending_calcs: HashSet<InstrumentId>,
58    bar_close_prices: HashMap<InstrumentId, Price>,
59    initialized: bool,
60}
61
62impl PortfolioState {
63    fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
64        Self {
65            accounts: AccountsManager::new(clock, cache),
66            analyzer: PortfolioAnalyzer::default(),
67            unrealized_pnls: HashMap::new(),
68            realized_pnls: HashMap::new(),
69            net_positions: HashMap::new(),
70            pending_calcs: HashSet::new(),
71            bar_close_prices: HashMap::new(),
72            initialized: false,
73        }
74    }
75
76    fn reset(&mut self) {
77        log::debug!("RESETTING");
78        self.net_positions.clear();
79        self.unrealized_pnls.clear();
80        self.realized_pnls.clear();
81        self.pending_calcs.clear();
82        self.analyzer.reset();
83        log::debug!("READY");
84    }
85}
86
87pub struct Portfolio {
88    pub(crate) clock: Rc<RefCell<dyn Clock>>,
89    pub(crate) cache: Rc<RefCell<Cache>>,
90    inner: Rc<RefCell<PortfolioState>>,
91    config: PortfolioConfig,
92}
93
94impl Portfolio {
95    pub fn new(
96        cache: Rc<RefCell<Cache>>,
97        clock: Rc<RefCell<dyn Clock>>,
98        config: Option<PortfolioConfig>,
99    ) -> Self {
100        let inner = Rc::new(RefCell::new(PortfolioState::new(
101            clock.clone(),
102            cache.clone(),
103        )));
104        let config = config.unwrap_or_default();
105
106        Self::register_message_handlers(
107            cache.clone(),
108            clock.clone(),
109            inner.clone(),
110            config.bar_updates,
111        );
112
113        Self {
114            clock,
115            cache,
116            inner,
117            config,
118        }
119    }
120
121    fn register_message_handlers(
122        cache: Rc<RefCell<Cache>>,
123        clock: Rc<RefCell<dyn Clock>>,
124        inner: Rc<RefCell<PortfolioState>>,
125        bar_updates: bool,
126    ) {
127        let update_account_handler = {
128            let cache = cache.clone();
129            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
130                move |event: &AccountState| {
131                    update_account(cache.clone(), event);
132                },
133            )))
134        };
135
136        let update_position_handler = {
137            let cache = cache.clone();
138            let clock = clock.clone();
139            let inner = inner.clone();
140            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
141                move |event: &PositionEvent| {
142                    update_position(cache.clone(), clock.clone(), inner.clone(), event);
143                },
144            )))
145        };
146
147        let update_quote_handler = {
148            let cache = cache.clone();
149            let clock = clock.clone();
150            let inner = inner.clone();
151            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
152                move |quote: &QuoteTick| {
153                    update_quote_tick(cache.clone(), clock.clone(), inner.clone(), quote);
154                },
155            )))
156        };
157
158        let update_bar_handler = {
159            let cache = cache.clone();
160            let clock = clock.clone();
161            let inner = inner.clone();
162            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
163                update_bar(cache.clone(), clock.clone(), inner.clone(), bar);
164            })))
165        };
166
167        let update_order_handler = {
168            let cache = cache;
169            let clock = clock.clone();
170            let inner = inner;
171            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
172                move |event: &OrderEventAny| {
173                    update_order(cache.clone(), clock.clone(), inner.clone(), event);
174                },
175            )))
176        };
177
178        msgbus::register("Portfolio.update_account", update_account_handler.clone());
179
180        msgbus::subscribe("data.quotes.*", update_quote_handler, Some(10));
181        if bar_updates {
182            msgbus::subscribe("data.quotes.*EXTERNAL", update_bar_handler, Some(10));
183        }
184        msgbus::subscribe("events.order.*", update_order_handler, Some(10));
185        msgbus::subscribe("events.position.*", update_position_handler, Some(10));
186        msgbus::subscribe("events.account.*", update_account_handler, Some(10));
187    }
188
189    pub fn reset(&mut self) {
190        log::debug!("RESETTING");
191        self.inner.borrow_mut().reset();
192        log::debug!("READY");
193    }
194
195    // -- QUERIES ---------------------------------------------------------------------------------
196
197    #[must_use]
198    pub fn is_initialized(&self) -> bool {
199        self.inner.borrow().initialized
200    }
201
202    #[must_use]
203    pub fn balances_locked(&self, venue: &Venue) -> HashMap<Currency, Money> {
204        self.cache.borrow().account_for_venue(venue).map_or_else(
205            || {
206                log::error!("Cannot get balances locked: no account generated for {venue}");
207                HashMap::new()
208            },
209            AccountAny::balances_locked,
210        )
211    }
212
213    #[must_use]
214    pub fn margins_init(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
215        self.cache.borrow().account_for_venue(venue).map_or_else(
216            || {
217                log::error!(
218                    "Cannot get initial (order) margins: no account registered for {venue}"
219                );
220                HashMap::new()
221            },
222            |account| match account {
223                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
224                AccountAny::Cash(_) => {
225                    log::warn!("Initial margins not applicable for cash account");
226                    HashMap::new()
227                }
228            },
229        )
230    }
231
232    #[must_use]
233    pub fn margins_maint(&self, venue: &Venue) -> HashMap<InstrumentId, Money> {
234        self.cache.borrow().account_for_venue(venue).map_or_else(
235            || {
236                log::error!(
237                    "Cannot get maintenance (position) margins: no account registered for {venue}"
238                );
239                HashMap::new()
240            },
241            |account| match account {
242                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
243                AccountAny::Cash(_) => {
244                    log::warn!("Maintenance margins not applicable for cash account");
245                    HashMap::new()
246                }
247            },
248        )
249    }
250
251    #[must_use]
252    pub fn unrealized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
253        let instrument_ids = {
254            let cache = self.cache.borrow();
255            let positions = cache.positions(Some(venue), None, None, None);
256
257            if positions.is_empty() {
258                return HashMap::new(); // Nothing to calculate
259            }
260
261            let instrument_ids: HashSet<InstrumentId> =
262                positions.iter().map(|p| p.instrument_id).collect();
263
264            instrument_ids
265        };
266
267        let mut unrealized_pnls: HashMap<Currency, f64> = HashMap::new();
268
269        for instrument_id in instrument_ids {
270            if let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id) {
271                // PnL already calculated
272                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
273                continue;
274            }
275
276            // Calculate PnL
277            match self.calculate_unrealized_pnl(&instrument_id) {
278                Some(pnl) => *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
279                None => continue,
280            }
281        }
282
283        unrealized_pnls
284            .into_iter()
285            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
286            .collect()
287    }
288
289    #[must_use]
290    pub fn realized_pnls(&mut self, venue: &Venue) -> HashMap<Currency, Money> {
291        let instrument_ids = {
292            let cache = self.cache.borrow();
293            let positions = cache.positions(Some(venue), None, None, None);
294
295            if positions.is_empty() {
296                return HashMap::new(); // Nothing to calculate
297            }
298
299            let instrument_ids: HashSet<InstrumentId> =
300                positions.iter().map(|p| p.instrument_id).collect();
301
302            instrument_ids
303        };
304
305        let mut realized_pnls: HashMap<Currency, f64> = HashMap::new();
306
307        for instrument_id in instrument_ids {
308            if let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id) {
309                // PnL already calculated
310                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
311                continue;
312            }
313
314            // Calculate PnL
315            match self.calculate_realized_pnl(&instrument_id) {
316                Some(pnl) => *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64(),
317                None => continue,
318            }
319        }
320
321        realized_pnls
322            .into_iter()
323            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
324            .collect()
325    }
326
327    #[must_use]
328    pub fn net_exposures(&self, venue: &Venue) -> Option<HashMap<Currency, Money>> {
329        let cache = self.cache.borrow();
330        let account = if let Some(account) = cache.account_for_venue(venue) {
331            account
332        } else {
333            log::error!("Cannot calculate net exposures: no account registered for {venue}");
334            return None; // Cannot calculate
335        };
336
337        let positions_open = cache.positions_open(Some(venue), None, None, None);
338        if positions_open.is_empty() {
339            return Some(HashMap::new()); // Nothing to calculate
340        }
341
342        let mut net_exposures: HashMap<Currency, f64> = HashMap::new();
343
344        for position in positions_open {
345            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
346                instrument
347            } else {
348                log::error!(
349                    "Cannot calculate net exposures: no instrument for {}",
350                    position.instrument_id
351                );
352                return None; // Cannot calculate
353            };
354
355            if position.side == PositionSide::Flat {
356                log::error!(
357                    "Cannot calculate net exposures: position is flat for {}",
358                    position.instrument_id
359                );
360                continue; // Nothing to calculate
361            }
362
363            let price = self.get_price(position)?;
364            let xrate = if let Some(xrate) =
365                self.calculate_xrate_to_base(instrument, account, position.entry)
366            {
367                xrate
368            } else {
369                log::error!(
370                    // TODO: Improve logging
371                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
372                    instrument.settlement_currency(),
373                    account.base_currency()
374                );
375                return None; // Cannot calculate
376            };
377
378            let settlement_currency = account
379                .base_currency()
380                .unwrap_or_else(|| instrument.settlement_currency());
381
382            let net_exposure = instrument
383                .calculate_notional_value(position.quantity, price, None)
384                .as_f64()
385                * xrate;
386
387            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
388                .round()
389                / 10f64.powi(settlement_currency.precision.into());
390
391            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
392        }
393
394        Some(
395            net_exposures
396                .into_iter()
397                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
398                .collect(),
399        )
400    }
401
402    #[must_use]
403    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
404        if let Some(pnl) = self
405            .inner
406            .borrow()
407            .unrealized_pnls
408            .get(instrument_id)
409            .copied()
410        {
411            return Some(pnl);
412        }
413
414        let pnl = self.calculate_unrealized_pnl(instrument_id)?;
415        self.inner
416            .borrow_mut()
417            .unrealized_pnls
418            .insert(*instrument_id, pnl);
419        Some(pnl)
420    }
421
422    #[must_use]
423    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
424        if let Some(pnl) = self
425            .inner
426            .borrow()
427            .realized_pnls
428            .get(instrument_id)
429            .copied()
430        {
431            return Some(pnl);
432        }
433
434        let pnl = self.calculate_realized_pnl(instrument_id)?;
435        self.inner
436            .borrow_mut()
437            .realized_pnls
438            .insert(*instrument_id, pnl);
439        Some(pnl)
440    }
441
442    #[must_use]
443    pub fn net_exposure(&self, instrument_id: &InstrumentId) -> Option<Money> {
444        let cache = self.cache.borrow();
445        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
446            account
447        } else {
448            log::error!(
449                "Cannot calculate net exposure: no account registered for {}",
450                instrument_id.venue
451            );
452            return None;
453        };
454
455        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
456            instrument
457        } else {
458            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
459            return None;
460        };
461
462        let positions_open = cache.positions_open(
463            None, // Faster query filtering
464            Some(instrument_id),
465            None,
466            None,
467        );
468
469        if positions_open.is_empty() {
470            return Some(Money::new(0.0, instrument.settlement_currency()));
471        }
472
473        let mut net_exposure = 0.0;
474
475        for position in positions_open {
476            let price = self.get_price(position)?;
477            let xrate = if let Some(xrate) =
478                self.calculate_xrate_to_base(instrument, account, position.entry)
479            {
480                xrate
481            } else {
482                log::error!(
483                    // TODO: Improve logging
484                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
485                    instrument.settlement_currency(),
486                    account.base_currency()
487                );
488                return None; // Cannot calculate
489            };
490
491            let notional_value =
492                instrument.calculate_notional_value(position.quantity, price, None);
493            net_exposure += notional_value.as_f64() * xrate;
494        }
495
496        let settlement_currency = account
497            .base_currency()
498            .unwrap_or_else(|| instrument.settlement_currency());
499
500        Some(Money::new(net_exposure, settlement_currency))
501    }
502
503    #[must_use]
504    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
505        self.inner
506            .borrow()
507            .net_positions
508            .get(instrument_id)
509            .copied()
510            .unwrap_or(Decimal::ZERO)
511    }
512
513    #[must_use]
514    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
515        self.inner
516            .borrow()
517            .net_positions
518            .get(instrument_id)
519            .copied()
520            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
521    }
522
523    #[must_use]
524    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
525        self.inner
526            .borrow()
527            .net_positions
528            .get(instrument_id)
529            .copied()
530            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
531    }
532
533    #[must_use]
534    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
535        self.inner
536            .borrow()
537            .net_positions
538            .get(instrument_id)
539            .copied()
540            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
541    }
542
543    #[must_use]
544    pub fn is_completely_flat(&self) -> bool {
545        for net_position in self.inner.borrow().net_positions.values() {
546            if *net_position != Decimal::ZERO {
547                return false;
548            }
549        }
550        true
551    }
552
553    // -- COMMANDS --------------------------------------------------------------------------------
554
555    pub fn initialize_orders(&mut self) {
556        let mut initialized = true;
557        let orders_and_instruments = {
558            let cache = self.cache.borrow();
559            let all_orders_open = cache.orders_open(None, None, None, None);
560
561            let mut instruments_with_orders = Vec::new();
562            let mut instruments = HashSet::new();
563
564            for order in &all_orders_open {
565                instruments.insert(order.instrument_id());
566            }
567
568            for instrument_id in instruments {
569                if let Some(instrument) = cache.instrument(&instrument_id) {
570                    let orders = cache
571                        .orders_open(None, Some(&instrument_id), None, None)
572                        .into_iter()
573                        .cloned()
574                        .collect::<Vec<OrderAny>>();
575                    instruments_with_orders.push((instrument.clone(), orders));
576                } else {
577                    log::error!(
578                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
579                    );
580                    initialized = false;
581                    break;
582                }
583            }
584            instruments_with_orders
585        };
586
587        for (instrument, orders_open) in &orders_and_instruments {
588            let mut cache = self.cache.borrow_mut();
589            let account = if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
590                account
591            } else {
592                log::error!(
593                    "Cannot update initial (order) margin: no account registered for {}",
594                    instrument.id().venue
595                );
596                initialized = false;
597                break;
598            };
599
600            let result = self.inner.borrow_mut().accounts.update_orders(
601                account,
602                instrument.clone(),
603                orders_open.iter().collect(),
604                self.clock.borrow().timestamp_ns(),
605            );
606
607            match result {
608                Some((updated_account, _)) => {
609                    cache.add_account(updated_account).unwrap(); // Temp Fix to update the mutated account
610                }
611                None => {
612                    initialized = false;
613                }
614            }
615        }
616
617        let total_orders = orders_and_instruments
618            .into_iter()
619            .map(|(_, orders)| orders.len())
620            .sum::<usize>();
621
622        log::info!(
623            "Initialized {} open order{}",
624            total_orders,
625            if total_orders == 1 { "" } else { "s" }
626        );
627
628        self.inner.borrow_mut().initialized = initialized;
629    }
630
631    pub fn initialize_positions(&mut self) {
632        self.inner.borrow_mut().unrealized_pnls.clear();
633        self.inner.borrow_mut().realized_pnls.clear();
634        let all_positions_open: Vec<Position>;
635        let mut instruments = HashSet::new();
636        {
637            let cache = self.cache.borrow();
638            all_positions_open = cache
639                .positions_open(None, None, None, None)
640                .into_iter()
641                .cloned()
642                .collect();
643            for position in &all_positions_open {
644                instruments.insert(position.instrument_id);
645            }
646        }
647
648        let mut initialized = true;
649
650        for instrument_id in instruments {
651            let positions_open: Vec<Position> = {
652                let cache = self.cache.borrow();
653                cache
654                    .positions_open(None, Some(&instrument_id), None, None)
655                    .into_iter()
656                    .cloned()
657                    .collect()
658            };
659
660            self.update_net_position(&instrument_id, positions_open);
661
662            let calculated_unrealized_pnl = self
663                .calculate_unrealized_pnl(&instrument_id)
664                .expect("Failed to calculate unrealized PnL");
665            let calculated_realized_pnl = self
666                .calculate_realized_pnl(&instrument_id)
667                .expect("Failed to calculate realized PnL");
668
669            self.inner
670                .borrow_mut()
671                .unrealized_pnls
672                .insert(instrument_id, calculated_unrealized_pnl);
673            self.inner
674                .borrow_mut()
675                .realized_pnls
676                .insert(instrument_id, calculated_realized_pnl);
677
678            let cache = self.cache.borrow();
679            let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
680                account
681            } else {
682                log::error!(
683                    "Cannot update maintenance (position) margin: no account registered for {}",
684                    instrument_id.venue
685                );
686                initialized = false;
687                break;
688            };
689
690            let account = match account {
691                AccountAny::Cash(_) => continue,
692                AccountAny::Margin(margin_account) => margin_account,
693            };
694
695            let mut cache = self.cache.borrow_mut();
696            let instrument = if let Some(instrument) = cache.instrument(&instrument_id) {
697                instrument
698            } else {
699                log::error!(
700                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
701                );
702                initialized = false;
703                break;
704            };
705
706            let result = self.inner.borrow_mut().accounts.update_positions(
707                account,
708                instrument.clone(),
709                self.cache
710                    .borrow()
711                    .positions_open(None, Some(&instrument_id), None, None),
712                self.clock.borrow().timestamp_ns(),
713            );
714
715            match result {
716                Some((updated_account, _)) => {
717                    cache
718                        .add_account(AccountAny::Margin(updated_account)) // Temp Fix to update the mutated account
719                        .unwrap();
720                }
721                None => {
722                    initialized = false;
723                }
724            }
725        }
726
727        let open_count = all_positions_open.len();
728        self.inner.borrow_mut().initialized = initialized;
729        log::info!(
730            "Initialized {} open position{}",
731            open_count,
732            if open_count == 1 { "" } else { "s" }
733        );
734    }
735
736    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
737        update_quote_tick(
738            self.cache.clone(),
739            self.clock.clone(),
740            self.inner.clone(),
741            quote,
742        );
743    }
744
745    pub fn update_bar(&mut self, bar: &Bar) {
746        update_bar(
747            self.cache.clone(),
748            self.clock.clone(),
749            self.inner.clone(),
750            bar,
751        );
752    }
753
754    pub fn update_account(&mut self, event: &AccountState) {
755        update_account(self.cache.clone(), event);
756    }
757
758    pub fn update_order(&mut self, event: &OrderEventAny) {
759        update_order(
760            self.cache.clone(),
761            self.clock.clone(),
762            self.inner.clone(),
763            event,
764        );
765    }
766
767    pub fn update_position(&mut self, event: &PositionEvent) {
768        update_position(
769            self.cache.clone(),
770            self.clock.clone(),
771            self.inner.clone(),
772            event,
773        );
774    }
775
776    // -- INTERNAL --------------------------------------------------------------------------------
777
778    fn update_net_position(&mut self, instrument_id: &InstrumentId, positions_open: Vec<Position>) {
779        let mut net_position = Decimal::ZERO;
780
781        for open_position in positions_open {
782            log::debug!("open_position: {open_position}");
783            net_position += Decimal::from_f64(open_position.signed_qty).unwrap_or(Decimal::ZERO);
784        }
785
786        let existing_position = self.net_position(instrument_id);
787        if existing_position != net_position {
788            self.inner
789                .borrow_mut()
790                .net_positions
791                .insert(*instrument_id, net_position);
792            log::info!("{instrument_id} net_position={net_position}");
793        }
794    }
795
796    fn calculate_unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
797        let cache = self.cache.borrow();
798        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
799            account
800        } else {
801            log::error!(
802                "Cannot calculate unrealized PnL: no account registered for {}",
803                instrument_id.venue
804            );
805            return None;
806        };
807
808        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
809            instrument
810        } else {
811            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
812            return None;
813        };
814
815        let currency = account
816            .base_currency()
817            .unwrap_or_else(|| instrument.settlement_currency());
818
819        let positions_open = cache.positions_open(
820            None, // Faster query filtering
821            Some(instrument_id),
822            None,
823            None,
824        );
825
826        if positions_open.is_empty() {
827            return Some(Money::new(0.0, currency));
828        }
829
830        let mut total_pnl = 0.0;
831
832        for position in positions_open {
833            if position.instrument_id != *instrument_id {
834                continue; // Nothing to calculate
835            }
836
837            if position.side == PositionSide::Flat {
838                continue; // Nothing to calculate
839            }
840
841            let price = if let Some(price) = self.get_price(position) {
842                price
843            } else {
844                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
845                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
846                return None; // Cannot calculate
847            };
848
849            let mut pnl = position.unrealized_pnl(price).as_f64();
850
851            if let Some(base_currency) = account.base_currency() {
852                let xrate = if let Some(xrate) =
853                    self.calculate_xrate_to_base(instrument, account, position.entry)
854                {
855                    xrate
856                } else {
857                    log::error!(
858                        // TODO: Improve logging
859                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
860                        instrument.settlement_currency(),
861                        base_currency
862                    );
863                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
864                    return None; // Cannot calculate
865                };
866
867                let scale = 10f64.powi(currency.precision.into());
868                pnl = ((pnl * xrate) * scale).round() / scale;
869            }
870
871            total_pnl += pnl;
872        }
873
874        Some(Money::new(total_pnl, currency))
875    }
876
877    fn calculate_realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
878        let cache = self.cache.borrow();
879        let account = if let Some(account) = cache.account_for_venue(&instrument_id.venue) {
880            account
881        } else {
882            log::error!(
883                "Cannot calculate realized PnL: no account registered for {}",
884                instrument_id.venue
885            );
886            return None;
887        };
888
889        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
890            instrument
891        } else {
892            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
893            return None;
894        };
895
896        let currency = account
897            .base_currency()
898            .unwrap_or_else(|| instrument.settlement_currency());
899
900        let positions = cache.positions(
901            None, // Faster query filtering
902            Some(instrument_id),
903            None,
904            None,
905        );
906
907        if positions.is_empty() {
908            return Some(Money::new(0.0, currency));
909        }
910
911        let mut total_pnl = 0.0;
912
913        for position in positions {
914            if position.instrument_id != *instrument_id {
915                continue; // Nothing to calculate
916            }
917
918            if position.realized_pnl.is_none() {
919                continue; // Nothing to calculate
920            }
921
922            let mut pnl = position.realized_pnl?.as_f64();
923
924            if let Some(base_currency) = account.base_currency() {
925                let xrate = if let Some(xrate) =
926                    self.calculate_xrate_to_base(instrument, account, position.entry)
927                {
928                    xrate
929                } else {
930                    log::error!(
931                        // TODO: Improve logging
932                        "Cannot calculate realized PnL: insufficient data for {}/{}",
933                        instrument.settlement_currency(),
934                        base_currency
935                    );
936                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
937                    return None; // Cannot calculate
938                };
939
940                let scale = 10f64.powi(currency.precision.into());
941                pnl = ((pnl * xrate) * scale).round() / scale;
942            }
943
944            total_pnl += pnl;
945        }
946
947        Some(Money::new(total_pnl, currency))
948    }
949
950    fn get_price(&self, position: &Position) -> Option<Price> {
951        let price_type = match position.side {
952            PositionSide::Long => PriceType::Bid,
953            PositionSide::Short => PriceType::Ask,
954            _ => panic!("invalid `PositionSide`, was {}", position.side),
955        };
956
957        let cache = self.cache.borrow();
958
959        let instrument_id = &position.instrument_id;
960        cache
961            .price(instrument_id, price_type)
962            .or_else(|| cache.price(instrument_id, PriceType::Last))
963            .or_else(|| {
964                self.inner
965                    .borrow()
966                    .bar_close_prices
967                    .get(instrument_id)
968                    .copied()
969            })
970    }
971
972    fn calculate_xrate_to_base(
973        &self,
974        instrument: &InstrumentAny,
975        account: &AccountAny,
976        side: OrderSide,
977    ) -> Option<f64> {
978        if !self.config.convert_to_account_base_currency {
979            return Some(1.0); // No conversion needed
980        }
981
982        match account.base_currency() {
983            None => Some(1.0), // No conversion needed
984            Some(base_currency) => {
985                let cache = self.cache.borrow();
986
987                if self.config.use_mark_xrates {
988                    return cache.get_mark_xrate(instrument.settlement_currency(), base_currency);
989                }
990
991                let price_type = if side == OrderSide::Buy {
992                    PriceType::Bid
993                } else {
994                    PriceType::Ask
995                };
996
997                cache.get_xrate(
998                    instrument.id().venue,
999                    instrument.settlement_currency(),
1000                    base_currency,
1001                    price_type,
1002                )
1003            }
1004        }
1005    }
1006}
1007
1008// Helper functions
1009fn update_quote_tick(
1010    cache: Rc<RefCell<Cache>>,
1011    clock: Rc<RefCell<dyn Clock>>,
1012    inner: Rc<RefCell<PortfolioState>>,
1013    quote: &QuoteTick,
1014) {
1015    update_instrument_id(cache, clock.clone(), inner, &quote.instrument_id);
1016}
1017
1018fn update_bar(
1019    cache: Rc<RefCell<Cache>>,
1020    clock: Rc<RefCell<dyn Clock>>,
1021    inner: Rc<RefCell<PortfolioState>>,
1022    bar: &Bar,
1023) {
1024    let instrument_id = bar.bar_type.instrument_id();
1025    inner
1026        .borrow_mut()
1027        .bar_close_prices
1028        .insert(instrument_id, bar.close);
1029    update_instrument_id(cache, clock.clone(), inner, &instrument_id);
1030}
1031
1032fn update_instrument_id(
1033    cache: Rc<RefCell<Cache>>,
1034    clock: Rc<RefCell<dyn Clock>>,
1035    inner: Rc<RefCell<PortfolioState>>,
1036    instrument_id: &InstrumentId,
1037) {
1038    inner.borrow_mut().unrealized_pnls.remove(instrument_id);
1039
1040    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1041        return;
1042    }
1043
1044    let result_init;
1045    let mut result_maint = None;
1046
1047    let account = {
1048        let borrowed_cache = cache.borrow();
1049        let account = if let Some(account) = borrowed_cache.account_for_venue(&instrument_id.venue)
1050        {
1051            account
1052        } else {
1053            log::error!(
1054                "Cannot update tick: no account registered for {}",
1055                instrument_id.venue
1056            );
1057            return;
1058        };
1059
1060        let mut borrowed_cache = cache.borrow_mut();
1061        let instrument = if let Some(instrument) = borrowed_cache.instrument(instrument_id) {
1062            instrument.clone()
1063        } else {
1064            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1065            return;
1066        };
1067
1068        // Clone the orders and positions to own the data
1069        let orders_open: Vec<OrderAny> = borrowed_cache
1070            .orders_open(None, Some(instrument_id), None, None)
1071            .iter()
1072            .map(|o| (*o).clone())
1073            .collect();
1074
1075        let positions_open: Vec<Position> = borrowed_cache
1076            .positions_open(None, Some(instrument_id), None, None)
1077            .iter()
1078            .map(|p| (*p).clone())
1079            .collect();
1080
1081        result_init = inner.borrow().accounts.update_orders(
1082            account,
1083            instrument.clone(),
1084            orders_open.iter().collect(),
1085            clock.borrow().timestamp_ns(),
1086        );
1087
1088        if let AccountAny::Margin(margin_account) = account {
1089            result_maint = inner.borrow().accounts.update_positions(
1090                margin_account,
1091                instrument,
1092                positions_open.iter().collect(),
1093                clock.borrow().timestamp_ns(),
1094            );
1095        }
1096
1097        if let Some((ref updated_account, _)) = result_init {
1098            borrowed_cache.add_account(updated_account.clone()).unwrap(); // Temp Fix to update the mutated account
1099        }
1100        account.clone()
1101    };
1102
1103    let mut portfolio_clone = Portfolio {
1104        clock: clock.clone(),
1105        cache,
1106        inner: inner.clone(),
1107        config: PortfolioConfig::default(), // TODO: TBD
1108    };
1109
1110    let result_unrealized_pnl: Option<Money> =
1111        portfolio_clone.calculate_unrealized_pnl(instrument_id);
1112
1113    if result_init.is_some()
1114        && (matches!(account, AccountAny::Cash(_))
1115            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1116    {
1117        inner.borrow_mut().pending_calcs.remove(instrument_id);
1118        if inner.borrow().pending_calcs.is_empty() {
1119            inner.borrow_mut().initialized = true;
1120        }
1121    }
1122}
1123
1124fn update_order(
1125    cache: Rc<RefCell<Cache>>,
1126    clock: Rc<RefCell<dyn Clock>>,
1127    inner: Rc<RefCell<PortfolioState>>,
1128    event: &OrderEventAny,
1129) {
1130    let borrowed_cache = cache.borrow();
1131    let account_id = match event.account_id() {
1132        Some(account_id) => account_id,
1133        None => {
1134            return; // No Account Assigned
1135        }
1136    };
1137
1138    let account = if let Some(account) = borrowed_cache.account(&account_id) {
1139        account
1140    } else {
1141        log::error!("Cannot update order: no account registered for {account_id}");
1142        return;
1143    };
1144
1145    match account {
1146        AccountAny::Cash(cash_account) => {
1147            if !cash_account.base.calculate_account_state {
1148                return;
1149            }
1150        }
1151        AccountAny::Margin(margin_account) => {
1152            if !margin_account.base.calculate_account_state {
1153                return;
1154            }
1155        }
1156    }
1157
1158    match event {
1159        OrderEventAny::Accepted(_)
1160        | OrderEventAny::Canceled(_)
1161        | OrderEventAny::Rejected(_)
1162        | OrderEventAny::Updated(_)
1163        | OrderEventAny::Filled(_) => {}
1164        _ => {
1165            return;
1166        }
1167    }
1168
1169    let borrowed_cache = cache.borrow();
1170    let order = if let Some(order) = borrowed_cache.order(&event.client_order_id()) {
1171        order
1172    } else {
1173        log::error!(
1174            "Cannot update order: {} not found in the cache",
1175            event.client_order_id()
1176        );
1177        return; // No Order Found
1178    };
1179
1180    if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit {
1181        return; // No change to account state
1182    }
1183
1184    let instrument = if let Some(instrument_id) = borrowed_cache.instrument(&event.instrument_id())
1185    {
1186        instrument_id
1187    } else {
1188        log::error!(
1189            "Cannot update order: no instrument found for {}",
1190            event.instrument_id()
1191        );
1192        return;
1193    };
1194
1195    if let OrderEventAny::Filled(order_filled) = event {
1196        let _ = inner.borrow().accounts.update_balances(
1197            account.clone(),
1198            instrument.clone(),
1199            *order_filled,
1200        );
1201
1202        let mut portfolio_clone = Portfolio {
1203            clock: clock.clone(),
1204            cache: cache.clone(),
1205            inner: inner.clone(),
1206            config: PortfolioConfig::default(), // TODO: TBD
1207        };
1208
1209        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id) {
1210            Some(unrealized_pnl) => {
1211                inner
1212                    .borrow_mut()
1213                    .unrealized_pnls
1214                    .insert(event.instrument_id(), unrealized_pnl);
1215            }
1216            None => {
1217                log::error!(
1218                    "Failed to calculate unrealized PnL for instrument {}",
1219                    event.instrument_id()
1220                );
1221            }
1222        }
1223    }
1224
1225    let orders_open = borrowed_cache.orders_open(None, Some(&event.instrument_id()), None, None);
1226
1227    let account_state = inner.borrow_mut().accounts.update_orders(
1228        account,
1229        instrument.clone(),
1230        orders_open,
1231        clock.borrow().timestamp_ns(),
1232    );
1233
1234    let mut borrowed_cache = cache.borrow_mut();
1235    borrowed_cache.update_account(account.clone()).unwrap();
1236
1237    if let Some(account_state) = account_state {
1238        msgbus::publish(
1239            &Ustr::from(&format!("events.account.{}", account.id())),
1240            &account_state,
1241        );
1242    } else {
1243        log::debug!("Added pending calculation for {}", instrument.id());
1244        inner.borrow_mut().pending_calcs.insert(instrument.id());
1245    }
1246
1247    log::debug!("Updated {event}");
1248}
1249
1250fn update_position(
1251    cache: Rc<RefCell<Cache>>,
1252    clock: Rc<RefCell<dyn Clock>>,
1253    inner: Rc<RefCell<PortfolioState>>,
1254    event: &PositionEvent,
1255) {
1256    let instrument_id = event.instrument_id();
1257
1258    let positions_open: Vec<Position> = {
1259        let borrowed_cache = cache.borrow();
1260
1261        borrowed_cache
1262            .positions_open(None, Some(&instrument_id), None, None)
1263            .iter()
1264            .map(|o| (*o).clone())
1265            .collect()
1266    };
1267
1268    log::debug!("postion fresh from cache -> {positions_open:?}");
1269
1270    let mut portfolio_clone = Portfolio {
1271        clock: clock.clone(),
1272        cache: cache.clone(),
1273        inner: inner.clone(),
1274        config: PortfolioConfig::default(), // TODO: TBD
1275    };
1276
1277    portfolio_clone.update_net_position(&instrument_id, positions_open.clone());
1278
1279    let calculated_unrealized_pnl = portfolio_clone
1280        .calculate_unrealized_pnl(&instrument_id)
1281        .expect("Failed to calculate unrealized PnL");
1282    let calculated_realized_pnl = portfolio_clone
1283        .calculate_realized_pnl(&instrument_id)
1284        .expect("Failed to calculate realized PnL");
1285
1286    inner
1287        .borrow_mut()
1288        .unrealized_pnls
1289        .insert(event.instrument_id(), calculated_unrealized_pnl);
1290    inner
1291        .borrow_mut()
1292        .realized_pnls
1293        .insert(event.instrument_id(), calculated_realized_pnl);
1294
1295    let borrowed_cache = cache.borrow();
1296    let account = borrowed_cache.account(&event.account_id());
1297
1298    if let Some(AccountAny::Margin(margin_account)) = account {
1299        if !margin_account.calculate_account_state {
1300            return; // Nothing to calculate
1301        }
1302
1303        let borrowed_cache = cache.borrow();
1304        let instrument = if let Some(instrument) = borrowed_cache.instrument(&instrument_id) {
1305            instrument
1306        } else {
1307            log::error!("Cannot update position: no instrument found for {instrument_id}");
1308            return;
1309        };
1310
1311        let result = inner.borrow_mut().accounts.update_positions(
1312            margin_account,
1313            instrument.clone(),
1314            positions_open.iter().collect(),
1315            clock.borrow().timestamp_ns(),
1316        );
1317        let mut borrowed_cache = cache.borrow_mut();
1318        if let Some((margin_account, _)) = result {
1319            borrowed_cache
1320                .add_account(AccountAny::Margin(margin_account)) // Temp Fix to update the mutated account
1321                .unwrap();
1322        }
1323    } else if account.is_none() {
1324        log::error!(
1325            "Cannot update position: no account registered for {}",
1326            event.account_id()
1327        );
1328    }
1329}
1330
1331pub fn update_account(cache: Rc<RefCell<Cache>>, event: &AccountState) {
1332    let mut borrowed_cache = cache.borrow_mut();
1333
1334    if let Some(existing) = borrowed_cache.account(&event.account_id) {
1335        let mut account = existing.clone();
1336        account.apply(event.clone());
1337
1338        if let Err(e) = borrowed_cache.update_account(account.clone()) {
1339            log::error!("Failed to update account: {e}");
1340            return;
1341        }
1342    } else {
1343        let account = match AccountAny::from_events(vec![event.clone()]) {
1344            Ok(account) => account,
1345            Err(e) => {
1346                log::error!("Failed to create account: {e}");
1347                return;
1348            }
1349        };
1350
1351        if let Err(e) = borrowed_cache.add_account(account) {
1352            log::error!("Failed to add account: {e}");
1353            return;
1354        }
1355    }
1356
1357    log::info!("Updated {event}");
1358}