nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::VecDeque, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21    cache::database::{CacheDatabaseAdapter, CacheMap},
22    custom::CustomData,
23    logging::{log_task_awaiting, log_task_started, log_task_stopped},
24    runtime::get_runtime,
25    signal::Signal,
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    accounts::AccountAny,
30    data::{Bar, DataType, QuoteTick, TradeTick},
31    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
32    identifiers::{
33        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
34        VenueOrderId,
35    },
36    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
37    orderbook::OrderBook,
38    orders::{Order, OrderAny},
39    position::Position,
40    types::Currency,
41};
42use sqlx::{PgPool, postgres::PgConnectOptions};
43use tokio::{time::Instant, try_join};
44use ustr::Ustr;
45
46use crate::sql::{
47    pg::{connect_pg, get_postgres_connect_options},
48    queries::DatabaseQueries,
49};
50
// Task and connection names
/// Name passed to the task logging helpers when the command-processing task starts and stops.
const CACHE_PROCESS: &str = "cache-process";
53
/// Postgres-backed cache database handle.
///
/// Holds a connection pool for direct queries, the sending half of the channel on which
/// [`DatabaseQuery`] messages are queued, and the join handle of the task that consumes them.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct PostgresCacheDatabase {
    /// Connection pool used for direct (non-queued) queries.
    pub pool: PgPool,
    /// Sender used to queue commands for the processing task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
    /// Handle of the processing task spawned in `connect`.
    handle: tokio::task::JoinHandle<()>,
}
64
/// Commands sent over the channel to the cache processing task.
///
/// `Close` makes the processing loop stop; every other variant is pushed onto the
/// task's buffer and handed to `drain_buffer` (defined outside this section).
// NOTE(review): the meaning of the trailing `bool` on `AddOrder` / `AddAccount` is not
// visible here (the adapter always passes `false`) - presumably an "updated" flag; confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum DatabaseQuery {
    /// Stops the processing task.
    Close,
    /// General key/value item.
    Add(String, Vec<u8>),
    AddCurrency(Currency),
    AddInstrument(InstrumentAny),
    AddOrder(OrderAny, Option<ClientId>, bool),
    AddOrderSnapshot(OrderSnapshot),
    AddPositionSnapshot(PositionSnapshot),
    AddAccount(AccountAny, bool),
    AddSignal(Signal),
    AddCustom(CustomData),
    AddQuote(QuoteTick),
    AddTrade(TradeTick),
    AddBar(Bar),
    UpdateOrder(OrderEventAny),
}
83
84impl PostgresCacheDatabase {
85    /// Connects to the Postgres cache database using the provided connection parameters.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if establishing the database connection fails.
90    ///
91    /// # Panics
92    ///
93    /// Panics if the internal Postgres pool connection attempt (`connect_pg`) unwraps on error.
94    pub async fn connect(
95        host: Option<String>,
96        port: Option<u16>,
97        username: Option<String>,
98        password: Option<String>,
99        database: Option<String>,
100    ) -> Result<Self, sqlx::Error> {
101        let pg_connect_options =
102            get_postgres_connect_options(host, port, username, password, database);
103        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
104        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
105
106        // Spawn a task to handle messages
107        let handle = tokio::spawn(async move {
108            Self::process_commands(rx, pg_connect_options.clone().into()).await;
109        });
110        Ok(Self { pool, tx, handle })
111    }
112
113    async fn process_commands(
114        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
115        pg_connect_options: PgConnectOptions,
116    ) {
117        log_task_started(CACHE_PROCESS);
118
119        let pool = connect_pg(pg_connect_options).await.unwrap();
120
121        // Buffering
122        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
123
124        // TODO: expose this via configuration once tests are fixed
125        let buffer_interval = Duration::from_millis(0);
126
127        // Use a timer so the task wakes up even when no new message arrives
128        let flush_timer = tokio::time::sleep(buffer_interval);
129        tokio::pin!(flush_timer);
130
131        // Continue to receive and handle messages until channel is hung up
132        loop {
133            tokio::select! {
134                maybe_msg = rx.recv() => {
135                    if let Some(msg) = maybe_msg {
136                        tracing::debug!("Received {msg:?}");
137                        if matches!(msg, DatabaseQuery::Close) {
138                            break;
139                        }
140                        buffer.push_back(msg);
141
142                        // If interval is zero flush straight away so tests remain fast
143                        if buffer_interval.is_zero() {
144                            drain_buffer(&pool, &mut buffer).await;
145                        }
146                    } else {
147                        tracing::debug!("Command channel closed");
148                        break;
149                    }
150                }
151                () = &mut flush_timer, if !buffer_interval.is_zero() => {
152                    if !buffer.is_empty() {
153                        drain_buffer(&pool, &mut buffer).await;
154                    }
155
156                    flush_timer.as_mut().reset(Instant::now() + buffer_interval);
157                }
158            }
159        }
160
161        // Drain any remaining message
162        if !buffer.is_empty() {
163            drain_buffer(&pool, &mut buffer).await;
164        }
165
166        log_task_stopped(CACHE_PROCESS);
167    }
168}
169
170/// Retrieves a `PostgresCacheDatabase` using default connection options.
171///
172/// # Errors
173///
174/// Returns an error if connecting to the database or initializing the cache adapter fails.
175pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
176    let connect_options = get_postgres_connect_options(None, None, None, None, None);
177    Ok(PostgresCacheDatabase::connect(
178        Some(connect_options.host),
179        Some(connect_options.port),
180        Some(connect_options.username),
181        Some(connect_options.password),
182        Some(connect_options.database),
183    )
184    .await?)
185}
186
187#[allow(dead_code)]
188#[allow(unused)]
189#[async_trait::async_trait]
190impl CacheDatabaseAdapter for PostgresCacheDatabase {
191    fn close(&mut self) -> anyhow::Result<()> {
192        let pool = self.pool.clone();
193        let (tx, rx) = std::sync::mpsc::channel();
194
195        log::debug!("Closing connection pool");
196
197        tokio::task::block_in_place(|| {
198            get_runtime().block_on(async {
199                pool.close().await;
200                if let Err(e) = tx.send(()) {
201                    log::error!("Error closing pool: {e:?}");
202                }
203            });
204        });
205
206        // Cancel message handling task
207        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
208            log::error!("Error sending close: {e:?}");
209        }
210
211        log_task_awaiting("cache-write");
212
213        tokio::task::block_in_place(|| {
214            if let Err(e) = get_runtime().block_on(&mut self.handle) {
215                log::error!("Error awaiting task 'cache-write': {e:?}");
216            }
217        });
218
219        log::debug!("Closed");
220
221        Ok(rx.recv()?)
222    }
223
224    fn flush(&mut self) -> anyhow::Result<()> {
225        let pool = self.pool.clone();
226        let (tx, rx) = std::sync::mpsc::channel();
227
228        tokio::task::block_in_place(|| {
229            get_runtime().block_on(async {
230                if let Err(e) = DatabaseQueries::truncate(&pool).await {
231                    log::error!("Error flushing pool: {e:?}");
232                }
233                if let Err(e) = tx.send(()) {
234                    log::error!("Error sending flush result: {e:?}");
235                }
236            });
237        });
238
239        Ok(rx.recv()?)
240    }
241
242    async fn load_all(&self) -> anyhow::Result<CacheMap> {
243        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
244            self.load_currencies(),
245            self.load_instruments(),
246            self.load_synthetics(),
247            self.load_accounts(),
248            self.load_orders(),
249            self.load_positions()
250        )
251        .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
252
253        // For now, we don't load greeks and yield curves from the database
254        // This will be implemented in the future
255        let greeks = AHashMap::new();
256        let yield_curves = AHashMap::new();
257
258        Ok(CacheMap {
259            currencies,
260            instruments,
261            synthetics,
262            accounts,
263            orders,
264            positions,
265            greeks,
266            yield_curves,
267        })
268    }
269
270    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
271        let pool = self.pool.clone();
272        let (tx, rx) = std::sync::mpsc::channel();
273
274        tokio::spawn(async move {
275            let result = DatabaseQueries::load(&pool).await;
276            match result {
277                Ok(items) => {
278                    let mapping = items
279                        .into_iter()
280                        .map(|(k, v)| (k, Bytes::from(v)))
281                        .collect();
282                    if let Err(e) = tx.send(mapping) {
283                        log::error!("Failed to send general items: {e:?}");
284                    }
285                }
286                Err(e) => {
287                    log::error!("Failed to load general items: {e:?}");
288                    if let Err(e) = tx.send(AHashMap::new()) {
289                        log::error!("Failed to send empty general items: {e:?}");
290                    }
291                }
292            }
293        });
294        Ok(rx.recv()?)
295    }
296
297    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
298        let pool = self.pool.clone();
299        let (tx, rx) = std::sync::mpsc::channel();
300
301        tokio::spawn(async move {
302            let result = DatabaseQueries::load_currencies(&pool).await;
303            match result {
304                Ok(currencies) => {
305                    let mapping = currencies
306                        .into_iter()
307                        .map(|currency| (currency.code, currency))
308                        .collect();
309                    if let Err(e) = tx.send(mapping) {
310                        log::error!("Failed to send currencies: {e:?}");
311                    }
312                }
313                Err(e) => {
314                    log::error!("Failed to load currencies: {e:?}");
315                    if let Err(e) = tx.send(AHashMap::new()) {
316                        log::error!("Failed to send empty currencies: {e:?}");
317                    }
318                }
319            }
320        });
321        Ok(rx.recv()?)
322    }
323
324    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
325        let pool = self.pool.clone();
326        let (tx, rx) = std::sync::mpsc::channel();
327
328        tokio::spawn(async move {
329            let result = DatabaseQueries::load_instruments(&pool).await;
330            match result {
331                Ok(instruments) => {
332                    let mapping = instruments
333                        .into_iter()
334                        .map(|instrument| (instrument.id(), instrument))
335                        .collect();
336                    if let Err(e) = tx.send(mapping) {
337                        log::error!("Failed to send instruments: {e:?}");
338                    }
339                }
340                Err(e) => {
341                    log::error!("Failed to load instruments: {e:?}");
342                    if let Err(e) = tx.send(AHashMap::new()) {
343                        log::error!("Failed to send empty instruments: {e:?}");
344                    }
345                }
346            }
347        });
348        Ok(rx.recv()?)
349    }
350
    /// Intended to load all synthetic instruments.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
        todo!()
    }
354
355    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
356        let pool = self.pool.clone();
357        let (tx, rx) = std::sync::mpsc::channel();
358
359        tokio::spawn(async move {
360            let result = DatabaseQueries::load_accounts(&pool).await;
361            match result {
362                Ok(accounts) => {
363                    let mapping = accounts
364                        .into_iter()
365                        .map(|account| (account.id(), account))
366                        .collect();
367                    if let Err(e) = tx.send(mapping) {
368                        log::error!("Failed to send accounts: {e:?}");
369                    }
370                }
371                Err(e) => {
372                    log::error!("Failed to load accounts: {e:?}");
373                    if let Err(e) = tx.send(AHashMap::new()) {
374                        log::error!("Failed to send empty accounts: {e:?}");
375                    }
376                }
377            }
378        });
379        Ok(rx.recv()?)
380    }
381
382    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
383        let pool = self.pool.clone();
384        let (tx, rx) = std::sync::mpsc::channel();
385
386        tokio::spawn(async move {
387            let result = DatabaseQueries::load_orders(&pool).await;
388            match result {
389                Ok(orders) => {
390                    let mapping = orders
391                        .into_iter()
392                        .map(|order| (order.client_order_id(), order))
393                        .collect();
394                    if let Err(e) = tx.send(mapping) {
395                        log::error!("Failed to send orders: {e:?}");
396                    }
397                }
398                Err(e) => {
399                    log::error!("Failed to load orders: {e:?}");
400                    if let Err(e) = tx.send(AHashMap::new()) {
401                        log::error!("Failed to send empty orders: {e:?}");
402                    }
403                }
404            }
405        });
406        Ok(rx.recv()?)
407    }
408
    /// Intended to load all positions.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
        todo!()
    }
412
    /// Intended to load the client order ID to position index.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
        todo!()
    }
416
417    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
418        let pool = self.pool.clone();
419        let (tx, rx) = std::sync::mpsc::channel();
420
421        tokio::spawn(async move {
422            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
423            match result {
424                Ok(currency) => {
425                    if let Err(e) = tx.send(currency) {
426                        log::error!("Failed to send load_index_order_client result: {e:?}");
427                    }
428                }
429                Err(e) => {
430                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
431                    if let Err(e) = tx.send(AHashMap::new()) {
432                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
433                    }
434                }
435            }
436        });
437        Ok(rx.recv()?)
438    }
439
440    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
441        let pool = self.pool.clone();
442        let code = code.to_owned(); // Clone the code
443        let (tx, rx) = std::sync::mpsc::channel();
444
445        tokio::spawn(async move {
446            let result = DatabaseQueries::load_currency(&pool, &code).await;
447            match result {
448                Ok(currency) => {
449                    if let Err(e) = tx.send(currency) {
450                        log::error!("Failed to send currency {code}: {e:?}");
451                    }
452                }
453                Err(e) => {
454                    log::error!("Failed to load currency {code}: {e:?}");
455                    if let Err(e) = tx.send(None) {
456                        log::error!("Failed to send None for currency {code}: {e:?}");
457                    }
458                }
459            }
460        });
461        Ok(rx.recv()?)
462    }
463
464    async fn load_instrument(
465        &self,
466        instrument_id: &InstrumentId,
467    ) -> anyhow::Result<Option<InstrumentAny>> {
468        let pool = self.pool.clone();
469        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
470        let (tx, rx) = std::sync::mpsc::channel();
471
472        tokio::spawn(async move {
473            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
474            match result {
475                Ok(instrument) => {
476                    if let Err(e) = tx.send(instrument) {
477                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
478                    }
479                }
480                Err(e) => {
481                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
482                    if let Err(e) = tx.send(None) {
483                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
484                    }
485                }
486            }
487        });
488        Ok(rx.recv()?)
489    }
490
    /// Intended to load a single synthetic instrument by ID.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    async fn load_synthetic(
        &self,
        instrument_id: &InstrumentId,
    ) -> anyhow::Result<Option<SyntheticInstrument>> {
        todo!()
    }
497
498    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
499        let pool = self.pool.clone();
500        let account_id = account_id.to_owned();
501        let (tx, rx) = std::sync::mpsc::channel();
502
503        tokio::spawn(async move {
504            let result = DatabaseQueries::load_account(&pool, &account_id).await;
505            match result {
506                Ok(account) => {
507                    if let Err(e) = tx.send(account) {
508                        log::error!("Failed to send account {account_id}: {e:?}");
509                    }
510                }
511                Err(e) => {
512                    log::error!("Failed to load account {account_id}: {e:?}");
513                    if let Err(e) = tx.send(None) {
514                        log::error!("Failed to send None for account {account_id}: {e:?}");
515                    }
516                }
517            }
518        });
519        Ok(rx.recv()?)
520    }
521
522    async fn load_order(
523        &self,
524        client_order_id: &ClientOrderId,
525    ) -> anyhow::Result<Option<OrderAny>> {
526        let pool = self.pool.clone();
527        let client_order_id = client_order_id.to_owned();
528        let (tx, rx) = std::sync::mpsc::channel();
529
530        tokio::spawn(async move {
531            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
532            match result {
533                Ok(order) => {
534                    if let Err(e) = tx.send(order) {
535                        log::error!("Failed to send order {client_order_id}: {e:?}");
536                    }
537                }
538                Err(e) => {
539                    log::error!("Failed to load order {client_order_id}: {e:?}");
540                    let _ = tx.send(None);
541                }
542            }
543        });
544        Ok(rx.recv()?)
545    }
546
    /// Intended to load a single position by ID.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
        todo!()
    }
550
    /// Intended to load the stored state for the given actor component.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
        todo!()
    }
554
    /// Intended to delete the stored state for the given actor component.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
        todo!()
    }
558
    /// Intended to load the stored state for the given strategy.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
        todo!()
    }
562
    /// Intended to delete the stored state for the given strategy.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
        todo!()
    }
566
567    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
568        anyhow::bail!(
569            "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
570        )
571    }
572
573    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
574        anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
575    }
576
577    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
578        anyhow::bail!(
579            "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
580        )
581    }
582
583    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
584        let query = DatabaseQuery::Add(key, value.into());
585        self.tx
586            .send(query)
587            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
588    }
589
590    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
591        let query = DatabaseQuery::AddCurrency(*currency);
592        self.tx.send(query).map_err(|e| {
593            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
594        })
595    }
596
597    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
598        let query = DatabaseQuery::AddInstrument(instrument.clone());
599        self.tx.send(query).map_err(|e| {
600            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
601        })
602    }
603
    /// Intended to store a synthetic instrument.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
        todo!()
    }
607
608    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
609        let query = DatabaseQuery::AddAccount(account.clone(), false);
610        self.tx.send(query).map_err(|e| {
611            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
612        })
613    }
614
615    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
616        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
617        self.tx.send(query).map_err(|e| {
618            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
619        })
620    }
621
622    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
623        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
624        self.tx.send(query).map_err(|e| {
625            anyhow::anyhow!(
626                "Failed to send query add_order_snapshot to database message handler: {e}"
627            )
628        })
629    }
630
    /// Intended to store a position.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
634
635    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
636        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
637        self.tx.send(query).map_err(|e| {
638            anyhow::anyhow!(
639                "Failed to send query add_position_snapshot to database message handler: {e}"
640            )
641        })
642    }
643
    /// Intended to store an order book.
    ///
    /// Not yet implemented: the body is `todo!()`, so calling this panics.
    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
        todo!()
    }
647
648    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
649        let query = DatabaseQuery::AddQuote(quote.to_owned());
650        self.tx.send(query).map_err(|e| {
651            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
652        })
653    }
654
655    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
656        let pool = self.pool.clone();
657        let instrument_id = instrument_id.to_owned();
658        let (tx, rx) = std::sync::mpsc::channel();
659
660        tokio::spawn(async move {
661            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
662            match result {
663                Ok(quotes) => {
664                    if let Err(e) = tx.send(quotes) {
665                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
666                    }
667                }
668                Err(e) => {
669                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
670                    if let Err(e) = tx.send(Vec::new()) {
671                        log::error!(
672                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
673                        );
674                    }
675                }
676            }
677        });
678        Ok(rx.recv()?)
679    }
680
681    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
682        let query = DatabaseQuery::AddTrade(trade.to_owned());
683        self.tx.send(query).map_err(|e| {
684            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
685        })
686    }
687
688    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
689        let pool = self.pool.clone();
690        let instrument_id = instrument_id.to_owned();
691        let (tx, rx) = std::sync::mpsc::channel();
692
693        tokio::spawn(async move {
694            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
695            match result {
696                Ok(trades) => {
697                    if let Err(e) = tx.send(trades) {
698                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
699                    }
700                }
701                Err(e) => {
702                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
703                    if let Err(e) = tx.send(Vec::new()) {
704                        log::error!(
705                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
706                        );
707                    }
708                }
709            }
710        });
711        Ok(rx.recv()?)
712    }
713
714    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
715        let query = DatabaseQuery::AddBar(bar.to_owned());
716        self.tx.send(query).map_err(|e| {
717            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
718        })
719    }
720
721    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
722        let pool = self.pool.clone();
723        let instrument_id = instrument_id.to_owned();
724        let (tx, rx) = std::sync::mpsc::channel();
725
726        tokio::spawn(async move {
727            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
728            match result {
729                Ok(bars) => {
730                    if let Err(e) = tx.send(bars) {
731                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
732                    }
733                }
734                Err(e) => {
735                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
736                    if let Err(e) = tx.send(Vec::new()) {
737                        log::error!(
738                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
739                        );
740                    }
741                }
742            }
743        });
744        Ok(rx.recv()?)
745    }
746
747    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
748        let query = DatabaseQuery::AddSignal(signal.to_owned());
749        self.tx.send(query).map_err(|e| {
750            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
751        })
752    }
753
754    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
755        let pool = self.pool.clone();
756        let name = name.to_owned();
757        let (tx, rx) = std::sync::mpsc::channel();
758
759        tokio::spawn(async move {
760            let result = DatabaseQueries::load_signals(&pool, &name).await;
761            match result {
762                Ok(signals) => {
763                    if let Err(e) = tx.send(signals) {
764                        log::error!("Failed to send signals for '{name}': {e:?}");
765                    }
766                }
767                Err(e) => {
768                    log::error!("Failed to load signals for '{name}': {e:?}");
769                    if let Err(e) = tx.send(Vec::new()) {
770                        log::error!("Failed to send empty signals for '{name}': {e:?}");
771                    }
772                }
773            }
774        });
775        Ok(rx.recv()?)
776    }
777
778    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
779        let query = DatabaseQuery::AddCustom(data.to_owned());
780        self.tx.send(query).map_err(|e| {
781            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
782        })
783    }
784
785    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
786        let pool = self.pool.clone();
787        let data_type = data_type.to_owned();
788        let (tx, rx) = std::sync::mpsc::channel();
789
790        tokio::spawn(async move {
791            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
792            match result {
793                Ok(signals) => {
794                    if let Err(e) = tx.send(signals) {
795                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
796                    }
797                }
798                Err(e) => {
799                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
800                    if let Err(e) = tx.send(Vec::new()) {
801                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
802                    }
803                }
804            }
805        });
806        Ok(rx.recv()?)
807    }
808
809    fn load_order_snapshot(
810        &self,
811        client_order_id: &ClientOrderId,
812    ) -> anyhow::Result<Option<OrderSnapshot>> {
813        let pool = self.pool.clone();
814        let client_order_id = client_order_id.to_owned();
815        let (tx, rx) = std::sync::mpsc::channel();
816
817        tokio::spawn(async move {
818            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
819            match result {
820                Ok(snapshot) => {
821                    if let Err(e) = tx.send(snapshot) {
822                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
823                    }
824                }
825                Err(e) => {
826                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
827                    if let Err(e) = tx.send(None) {
828                        log::error!(
829                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
830                        );
831                    }
832                }
833            }
834        });
835        Ok(rx.recv()?)
836    }
837
838    fn load_position_snapshot(
839        &self,
840        position_id: &PositionId,
841    ) -> anyhow::Result<Option<PositionSnapshot>> {
842        let pool = self.pool.clone();
843        let position_id = position_id.to_owned();
844        let (tx, rx) = std::sync::mpsc::channel();
845
846        tokio::spawn(async move {
847            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
848            match result {
849                Ok(snapshot) => {
850                    if let Err(e) = tx.send(snapshot) {
851                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
852                    }
853                }
854                Err(e) => {
855                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
856                    if let Err(e) = tx.send(None) {
857                        log::error!(
858                            "Failed to send None for position snapshot {position_id}: {e:?}"
859                        );
860                    }
861                }
862            }
863        });
864        Ok(rx.recv()?)
865    }
866
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    ///
    /// Neither `client_order_id` nor `venue_order_id` is read.
    fn index_venue_order_id(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> anyhow::Result<()> {
        todo!()
    }
874
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    ///
    /// Neither `client_order_id` nor `position_id` is read.
    fn index_order_position(
        &self,
        client_order_id: ClientOrderId,
        position_id: PositionId,
    ) -> anyhow::Result<()> {
        todo!()
    }
882
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    fn update_actor(&self) -> anyhow::Result<()> {
        todo!()
    }
886
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    fn update_strategy(&self) -> anyhow::Result<()> {
        todo!()
    }
890
891    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
892        let query = DatabaseQuery::AddAccount(account.clone(), true);
893        self.tx.send(query).map_err(|e| {
894            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
895        })
896    }
897
898    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
899        let query = DatabaseQuery::UpdateOrder(event.clone());
900        self.tx.send(query).map_err(|e| {
901            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
902        })
903    }
904
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    ///
    /// `position` is not read.
    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
908
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    ///
    /// `order` is not read.
    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
        todo!()
    }
912
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    ///
    /// `position` is not read.
    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
916
    /// Not yet implemented: the body is `todo!()`, so calling this method panics.
    ///
    /// `timestamp` is not read.
    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
        todo!()
    }
920}
921
922async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
923    for cmd in buffer.drain(..) {
924        let result: anyhow::Result<()> = match cmd {
925            DatabaseQuery::Close => Ok(()),
926            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
927            DatabaseQuery::AddCurrency(currency) => {
928                DatabaseQueries::add_currency(pool, currency).await
929            }
930            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
931                InstrumentAny::Betting(instrument) => {
932                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
933                }
934                InstrumentAny::BinaryOption(instrument) => {
935                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
936                        .await
937                }
938                InstrumentAny::CryptoFuture(instrument) => {
939                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
940                        .await
941                }
942                InstrumentAny::CryptoOption(instrument) => {
943                    DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
944                        .await
945                }
946                InstrumentAny::CryptoPerpetual(instrument) => {
947                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
948                        .await
949                }
950                InstrumentAny::CurrencyPair(instrument) => {
951                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
952                        .await
953                }
954                InstrumentAny::Equity(equity) => {
955                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
956                }
957                InstrumentAny::FuturesContract(instrument) => {
958                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
959                        .await
960                }
961                InstrumentAny::FuturesSpread(instrument) => {
962                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
963                        .await
964                }
965                InstrumentAny::OptionContract(instrument) => {
966                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
967                        .await
968                }
969                InstrumentAny::OptionSpread(instrument) => {
970                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
971                        .await
972                }
973            },
974            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
975                OrderAny::Limit(order) => {
976                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
977                        .await
978                }
979                OrderAny::LimitIfTouched(order) => {
980                    DatabaseQueries::add_order(
981                        pool,
982                        "LIMIT_IF_TOUCHED",
983                        updated,
984                        Box::new(order),
985                        client_id,
986                    )
987                    .await
988                }
989                OrderAny::Market(order) => {
990                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
991                        .await
992                }
993                OrderAny::MarketIfTouched(order) => {
994                    DatabaseQueries::add_order(
995                        pool,
996                        "MARKET_IF_TOUCHED",
997                        updated,
998                        Box::new(order),
999                        client_id,
1000                    )
1001                    .await
1002                }
1003                OrderAny::MarketToLimit(order) => {
1004                    DatabaseQueries::add_order(
1005                        pool,
1006                        "MARKET_TO_LIMIT",
1007                        updated,
1008                        Box::new(order),
1009                        client_id,
1010                    )
1011                    .await
1012                }
1013                OrderAny::StopLimit(order) => {
1014                    DatabaseQueries::add_order(
1015                        pool,
1016                        "STOP_LIMIT",
1017                        updated,
1018                        Box::new(order),
1019                        client_id,
1020                    )
1021                    .await
1022                }
1023                OrderAny::StopMarket(order) => {
1024                    DatabaseQueries::add_order(
1025                        pool,
1026                        "STOP_MARKET",
1027                        updated,
1028                        Box::new(order),
1029                        client_id,
1030                    )
1031                    .await
1032                }
1033                OrderAny::TrailingStopLimit(order) => {
1034                    DatabaseQueries::add_order(
1035                        pool,
1036                        "TRAILING_STOP_LIMIT",
1037                        updated,
1038                        Box::new(order),
1039                        client_id,
1040                    )
1041                    .await
1042                }
1043                OrderAny::TrailingStopMarket(order) => {
1044                    DatabaseQueries::add_order(
1045                        pool,
1046                        "TRAILING_STOP_MARKET",
1047                        updated,
1048                        Box::new(order),
1049                        client_id,
1050                    )
1051                    .await
1052                }
1053            },
1054            DatabaseQuery::AddOrderSnapshot(snapshot) => {
1055                DatabaseQueries::add_order_snapshot(pool, snapshot).await
1056            }
1057            DatabaseQuery::AddPositionSnapshot(snapshot) => {
1058                DatabaseQueries::add_position_snapshot(pool, snapshot).await
1059            }
1060            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1061                AccountAny::Cash(account) => {
1062                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1063                }
1064                AccountAny::Margin(account) => {
1065                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1066                }
1067            },
1068            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1069            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1070            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
1071            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1072            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1073            DatabaseQuery::UpdateOrder(event) => {
1074                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1075            }
1076        };
1077
1078        if let Err(e) = result {
1079            tracing::error!("Error on query: {e:?}");
1080        }
1081    }
1082}