nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::VecDeque, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21    cache::database::{CacheDatabaseAdapter, CacheMap},
22    custom::CustomData,
23    logging::{log_task_awaiting, log_task_started, log_task_stopped},
24    runtime::get_runtime,
25    signal::Signal,
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    accounts::AccountAny,
30    data::{Bar, DataType, QuoteTick, TradeTick},
31    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
32    identifiers::{
33        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
34        VenueOrderId,
35    },
36    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
37    orderbook::OrderBook,
38    orders::{Order, OrderAny},
39    position::Position,
40    types::Currency,
41};
42use sqlx::{PgPool, postgres::PgConnectOptions};
43use tokio::{time::Instant, try_join};
44use ustr::Ustr;
45
46use crate::sql::{
47    pg::{connect_pg, get_postgres_connect_options},
48    queries::DatabaseQueries,
49};
50
51// Task and connection names
52const CACHE_PROCESS: &str = "cache-process";
53
54#[derive(Debug)]
55#[cfg_attr(
56    feature = "python",
57    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
58)]
59pub struct PostgresCacheDatabase {
60    pub pool: PgPool,
61    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
62    handle: tokio::task::JoinHandle<()>,
63}
64
65#[allow(clippy::large_enum_variant)]
66#[derive(Debug, Clone)]
67pub enum DatabaseQuery {
68    Close,
69    Add(String, Vec<u8>),
70    AddCurrency(Currency),
71    AddInstrument(InstrumentAny),
72    AddOrder(OrderAny, Option<ClientId>, bool),
73    AddOrderSnapshot(OrderSnapshot),
74    AddPositionSnapshot(PositionSnapshot),
75    AddAccount(AccountAny, bool),
76    AddSignal(Signal),
77    AddCustom(CustomData),
78    AddQuote(QuoteTick),
79    AddTrade(TradeTick),
80    AddBar(Bar),
81    UpdateOrder(OrderEventAny),
82}
83
84impl PostgresCacheDatabase {
85    /// Connects to the Postgres cache database using the provided connection parameters.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if establishing the database connection fails.
90    ///
91    /// # Panics
92    ///
93    /// Panics if the internal Postgres pool connection attempt (`connect_pg`) unwraps on error.
94    pub async fn connect(
95        host: Option<String>,
96        port: Option<u16>,
97        username: Option<String>,
98        password: Option<String>,
99        database: Option<String>,
100    ) -> Result<Self, sqlx::Error> {
101        let pg_connect_options =
102            get_postgres_connect_options(host, port, username, password, database);
103        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
104        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
105
106        // Spawn a task to handle messages
107        let handle = tokio::spawn(async move {
108            Self::process_commands(rx, pg_connect_options.clone().into()).await;
109        });
110        Ok(Self { pool, tx, handle })
111    }
112
113    async fn process_commands(
114        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
115        pg_connect_options: PgConnectOptions,
116    ) {
117        log_task_started(CACHE_PROCESS);
118
119        let pool = connect_pg(pg_connect_options).await.unwrap();
120
121        // Buffering
122        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
123
124        // TODO: expose this via configuration once tests are fixed
125        let buffer_interval = Duration::from_millis(0);
126
127        // Use a timer so the task wakes up even when no new message arrives
128        let flush_timer = tokio::time::sleep(buffer_interval);
129        tokio::pin!(flush_timer);
130
131        // Continue to receive and handle messages until channel is hung up
132        loop {
133            tokio::select! {
134                maybe_msg = rx.recv() => {
135                    if let Some(msg) = maybe_msg {
136                        tracing::debug!("Received {msg:?}");
137                        if matches!(msg, DatabaseQuery::Close) {
138                            break;
139                        }
140                        buffer.push_back(msg);
141
142                        // If interval is zero flush straight away so tests remain fast
143                        if buffer_interval.is_zero() {
144                            drain_buffer(&pool, &mut buffer).await;
145                        }
146                    } else {
147                        tracing::debug!("Command channel closed");
148                        break;
149                    }
150                }
151                () = &mut flush_timer, if !buffer_interval.is_zero() => {
152                    if !buffer.is_empty() {
153                        drain_buffer(&pool, &mut buffer).await;
154                    }
155
156                    flush_timer.as_mut().reset(Instant::now() + buffer_interval);
157                }
158            }
159        }
160
161        // Drain any remaining message
162        if !buffer.is_empty() {
163            drain_buffer(&pool, &mut buffer).await;
164        }
165
166        log_task_stopped(CACHE_PROCESS);
167    }
168}
169
170/// Retrieves a `PostgresCacheDatabase` using default connection options.
171///
172/// # Errors
173///
174/// Returns an error if connecting to the database or initializing the cache adapter fails.
175pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
176    let connect_options = get_postgres_connect_options(None, None, None, None, None);
177    Ok(PostgresCacheDatabase::connect(
178        Some(connect_options.host),
179        Some(connect_options.port),
180        Some(connect_options.username),
181        Some(connect_options.password),
182        Some(connect_options.database),
183    )
184    .await?)
185}
186
187#[allow(dead_code)]
188#[allow(unused)]
189#[async_trait::async_trait]
190impl CacheDatabaseAdapter for PostgresCacheDatabase {
191    fn close(&mut self) -> anyhow::Result<()> {
192        let pool = self.pool.clone();
193        let (tx, rx) = std::sync::mpsc::channel();
194
195        log::debug!("Closing connection pool");
196
197        tokio::task::block_in_place(|| {
198            get_runtime().block_on(async {
199                pool.close().await;
200                if let Err(e) = tx.send(()) {
201                    log::error!("Error closing pool: {e:?}");
202                }
203            });
204        });
205
206        // Cancel message handling task
207        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
208            log::error!("Error sending close: {e:?}");
209        }
210
211        log_task_awaiting("cache-write");
212
213        tokio::task::block_in_place(|| {
214            if let Err(e) = get_runtime().block_on(&mut self.handle) {
215                log::error!("Error awaiting task 'cache-write': {e:?}");
216            }
217        });
218
219        log::debug!("Closed");
220
221        Ok(rx.recv()?)
222    }
223
224    fn flush(&mut self) -> anyhow::Result<()> {
225        let pool = self.pool.clone();
226        let (tx, rx) = std::sync::mpsc::channel();
227
228        tokio::task::block_in_place(|| {
229            get_runtime().block_on(async {
230                if let Err(e) = DatabaseQueries::truncate(&pool).await {
231                    log::error!("Error flushing pool: {e:?}");
232                }
233                if let Err(e) = tx.send(()) {
234                    log::error!("Error sending flush result: {e:?}");
235                }
236            });
237        });
238
239        Ok(rx.recv()?)
240    }
241
242    async fn load_all(&self) -> anyhow::Result<CacheMap> {
243        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
244            self.load_currencies(),
245            self.load_instruments(),
246            self.load_synthetics(),
247            self.load_accounts(),
248            self.load_orders(),
249            self.load_positions()
250        )
251        .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
252
253        // For now, we don't load greeks and yield curves from the database
254        // This will be implemented in the future
255        let greeks = AHashMap::new();
256        let yield_curves = AHashMap::new();
257
258        Ok(CacheMap {
259            currencies,
260            instruments,
261            synthetics,
262            accounts,
263            orders,
264            positions,
265            greeks,
266            yield_curves,
267        })
268    }
269
270    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
271        let pool = self.pool.clone();
272        let (tx, rx) = std::sync::mpsc::channel();
273        tokio::spawn(async move {
274            let result = DatabaseQueries::load(&pool).await;
275            match result {
276                Ok(items) => {
277                    let mapping = items
278                        .into_iter()
279                        .map(|(k, v)| (k, Bytes::from(v)))
280                        .collect();
281                    if let Err(e) = tx.send(mapping) {
282                        log::error!("Failed to send general items: {e:?}");
283                    }
284                }
285                Err(e) => {
286                    log::error!("Failed to load general items: {e:?}");
287                    if let Err(e) = tx.send(AHashMap::new()) {
288                        log::error!("Failed to send empty general items: {e:?}");
289                    }
290                }
291            }
292        });
293        Ok(rx.recv()?)
294    }
295
296    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
297        let pool = self.pool.clone();
298        let (tx, rx) = std::sync::mpsc::channel();
299        tokio::spawn(async move {
300            let result = DatabaseQueries::load_currencies(&pool).await;
301            match result {
302                Ok(currencies) => {
303                    let mapping = currencies
304                        .into_iter()
305                        .map(|currency| (currency.code, currency))
306                        .collect();
307                    if let Err(e) = tx.send(mapping) {
308                        log::error!("Failed to send currencies: {e:?}");
309                    }
310                }
311                Err(e) => {
312                    log::error!("Failed to load currencies: {e:?}");
313                    if let Err(e) = tx.send(AHashMap::new()) {
314                        log::error!("Failed to send empty currencies: {e:?}");
315                    }
316                }
317            }
318        });
319        Ok(rx.recv()?)
320    }
321
322    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
323        let pool = self.pool.clone();
324        let (tx, rx) = std::sync::mpsc::channel();
325        tokio::spawn(async move {
326            let result = DatabaseQueries::load_instruments(&pool).await;
327            match result {
328                Ok(instruments) => {
329                    let mapping = instruments
330                        .into_iter()
331                        .map(|instrument| (instrument.id(), instrument))
332                        .collect();
333                    if let Err(e) = tx.send(mapping) {
334                        log::error!("Failed to send instruments: {e:?}");
335                    }
336                }
337                Err(e) => {
338                    log::error!("Failed to load instruments: {e:?}");
339                    if let Err(e) = tx.send(AHashMap::new()) {
340                        log::error!("Failed to send empty instruments: {e:?}");
341                    }
342                }
343            }
344        });
345        Ok(rx.recv()?)
346    }
347
348    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
349        todo!()
350    }
351
352    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
353        let pool = self.pool.clone();
354        let (tx, rx) = std::sync::mpsc::channel();
355        tokio::spawn(async move {
356            let result = DatabaseQueries::load_accounts(&pool).await;
357            match result {
358                Ok(accounts) => {
359                    let mapping = accounts
360                        .into_iter()
361                        .map(|account| (account.id(), account))
362                        .collect();
363                    if let Err(e) = tx.send(mapping) {
364                        log::error!("Failed to send accounts: {e:?}");
365                    }
366                }
367                Err(e) => {
368                    log::error!("Failed to load accounts: {e:?}");
369                    if let Err(e) = tx.send(AHashMap::new()) {
370                        log::error!("Failed to send empty accounts: {e:?}");
371                    }
372                }
373            }
374        });
375        Ok(rx.recv()?)
376    }
377
378    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
379        let pool = self.pool.clone();
380        let (tx, rx) = std::sync::mpsc::channel();
381        tokio::spawn(async move {
382            let result = DatabaseQueries::load_orders(&pool).await;
383            match result {
384                Ok(orders) => {
385                    let mapping = orders
386                        .into_iter()
387                        .map(|order| (order.client_order_id(), order))
388                        .collect();
389                    if let Err(e) = tx.send(mapping) {
390                        log::error!("Failed to send orders: {e:?}");
391                    }
392                }
393                Err(e) => {
394                    log::error!("Failed to load orders: {e:?}");
395                    if let Err(e) = tx.send(AHashMap::new()) {
396                        log::error!("Failed to send empty orders: {e:?}");
397                    }
398                }
399            }
400        });
401        Ok(rx.recv()?)
402    }
403
404    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
405        todo!()
406    }
407
408    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
409        todo!()
410    }
411
412    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
413        let pool = self.pool.clone();
414        let (tx, rx) = std::sync::mpsc::channel();
415        tokio::spawn(async move {
416            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
417            match result {
418                Ok(currency) => {
419                    if let Err(e) = tx.send(currency) {
420                        log::error!("Failed to send load_index_order_client result: {e:?}");
421                    }
422                }
423                Err(e) => {
424                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
425                    if let Err(e) = tx.send(AHashMap::new()) {
426                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
427                    }
428                }
429            }
430        });
431        Ok(rx.recv()?)
432    }
433
434    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
435        let pool = self.pool.clone();
436        let code = code.to_owned(); // Clone the code
437        let (tx, rx) = std::sync::mpsc::channel();
438        tokio::spawn(async move {
439            let result = DatabaseQueries::load_currency(&pool, &code).await;
440            match result {
441                Ok(currency) => {
442                    if let Err(e) = tx.send(currency) {
443                        log::error!("Failed to send currency {code}: {e:?}");
444                    }
445                }
446                Err(e) => {
447                    log::error!("Failed to load currency {code}: {e:?}");
448                    if let Err(e) = tx.send(None) {
449                        log::error!("Failed to send None for currency {code}: {e:?}");
450                    }
451                }
452            }
453        });
454        Ok(rx.recv()?)
455    }
456
457    async fn load_instrument(
458        &self,
459        instrument_id: &InstrumentId,
460    ) -> anyhow::Result<Option<InstrumentAny>> {
461        let pool = self.pool.clone();
462        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
463        let (tx, rx) = std::sync::mpsc::channel();
464        tokio::spawn(async move {
465            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
466            match result {
467                Ok(instrument) => {
468                    if let Err(e) = tx.send(instrument) {
469                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
470                    }
471                }
472                Err(e) => {
473                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
474                    if let Err(e) = tx.send(None) {
475                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
476                    }
477                }
478            }
479        });
480        Ok(rx.recv()?)
481    }
482
483    async fn load_synthetic(
484        &self,
485        instrument_id: &InstrumentId,
486    ) -> anyhow::Result<Option<SyntheticInstrument>> {
487        todo!()
488    }
489
490    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
491        let pool = self.pool.clone();
492        let account_id = account_id.to_owned();
493        let (tx, rx) = std::sync::mpsc::channel();
494        tokio::spawn(async move {
495            let result = DatabaseQueries::load_account(&pool, &account_id).await;
496            match result {
497                Ok(account) => {
498                    if let Err(e) = tx.send(account) {
499                        log::error!("Failed to send account {account_id}: {e:?}");
500                    }
501                }
502                Err(e) => {
503                    log::error!("Failed to load account {account_id}: {e:?}");
504                    if let Err(e) = tx.send(None) {
505                        log::error!("Failed to send None for account {account_id}: {e:?}");
506                    }
507                }
508            }
509        });
510        Ok(rx.recv()?)
511    }
512
513    async fn load_order(
514        &self,
515        client_order_id: &ClientOrderId,
516    ) -> anyhow::Result<Option<OrderAny>> {
517        let pool = self.pool.clone();
518        let client_order_id = client_order_id.to_owned();
519        let (tx, rx) = std::sync::mpsc::channel();
520        tokio::spawn(async move {
521            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
522            match result {
523                Ok(order) => {
524                    if let Err(e) = tx.send(order) {
525                        log::error!("Failed to send order {client_order_id}: {e:?}");
526                    }
527                }
528                Err(e) => {
529                    log::error!("Failed to load order {client_order_id}: {e:?}");
530                    let _ = tx.send(None);
531                }
532            }
533        });
534        Ok(rx.recv()?)
535    }
536
537    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
538        todo!()
539    }
540
541    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
542        todo!()
543    }
544
545    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
546        todo!()
547    }
548
549    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
550        todo!()
551    }
552
553    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
554        todo!()
555    }
556
557    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
558        anyhow::bail!(
559            "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
560        )
561    }
562
563    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
564        anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
565    }
566
567    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
568        anyhow::bail!(
569            "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
570        )
571    }
572
573    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
574        let query = DatabaseQuery::Add(key, value.into());
575        self.tx
576            .send(query)
577            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
578    }
579
580    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
581        let query = DatabaseQuery::AddCurrency(*currency);
582        self.tx.send(query).map_err(|e| {
583            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
584        })
585    }
586
587    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
588        let query = DatabaseQuery::AddInstrument(instrument.clone());
589        self.tx.send(query).map_err(|e| {
590            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
591        })
592    }
593
594    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
595        todo!()
596    }
597
598    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
599        let query = DatabaseQuery::AddAccount(account.clone(), false);
600        self.tx.send(query).map_err(|e| {
601            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
602        })
603    }
604
605    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
606        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
607        self.tx.send(query).map_err(|e| {
608            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
609        })
610    }
611
612    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
613        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
614        self.tx.send(query).map_err(|e| {
615            anyhow::anyhow!(
616                "Failed to send query add_order_snapshot to database message handler: {e}"
617            )
618        })
619    }
620
621    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
622        todo!()
623    }
624
625    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
626        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
627        self.tx.send(query).map_err(|e| {
628            anyhow::anyhow!(
629                "Failed to send query add_position_snapshot to database message handler: {e}"
630            )
631        })
632    }
633
634    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
635        todo!()
636    }
637
638    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
639        let query = DatabaseQuery::AddQuote(quote.to_owned());
640        self.tx.send(query).map_err(|e| {
641            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
642        })
643    }
644
645    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
646        let pool = self.pool.clone();
647        let instrument_id = instrument_id.to_owned();
648        let (tx, rx) = std::sync::mpsc::channel();
649        tokio::spawn(async move {
650            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
651            match result {
652                Ok(quotes) => {
653                    if let Err(e) = tx.send(quotes) {
654                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
655                    }
656                }
657                Err(e) => {
658                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
659                    if let Err(e) = tx.send(Vec::new()) {
660                        log::error!(
661                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
662                        );
663                    }
664                }
665            }
666        });
667        Ok(rx.recv()?)
668    }
669
670    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
671        let query = DatabaseQuery::AddTrade(trade.to_owned());
672        self.tx.send(query).map_err(|e| {
673            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
674        })
675    }
676
677    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
678        let pool = self.pool.clone();
679        let instrument_id = instrument_id.to_owned();
680        let (tx, rx) = std::sync::mpsc::channel();
681        tokio::spawn(async move {
682            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
683            match result {
684                Ok(trades) => {
685                    if let Err(e) = tx.send(trades) {
686                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
687                    }
688                }
689                Err(e) => {
690                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
691                    if let Err(e) = tx.send(Vec::new()) {
692                        log::error!(
693                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
694                        );
695                    }
696                }
697            }
698        });
699        Ok(rx.recv()?)
700    }
701
702    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
703        let query = DatabaseQuery::AddBar(bar.to_owned());
704        self.tx.send(query).map_err(|e| {
705            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
706        })
707    }
708
709    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
710        let pool = self.pool.clone();
711        let instrument_id = instrument_id.to_owned();
712        let (tx, rx) = std::sync::mpsc::channel();
713        tokio::spawn(async move {
714            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
715            match result {
716                Ok(bars) => {
717                    if let Err(e) = tx.send(bars) {
718                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
719                    }
720                }
721                Err(e) => {
722                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
723                    if let Err(e) = tx.send(Vec::new()) {
724                        log::error!(
725                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
726                        );
727                    }
728                }
729            }
730        });
731        Ok(rx.recv()?)
732    }
733
734    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
735        let query = DatabaseQuery::AddSignal(signal.to_owned());
736        self.tx.send(query).map_err(|e| {
737            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
738        })
739    }
740
741    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
742        let pool = self.pool.clone();
743        let name = name.to_owned();
744        let (tx, rx) = std::sync::mpsc::channel();
745        tokio::spawn(async move {
746            let result = DatabaseQueries::load_signals(&pool, &name).await;
747            match result {
748                Ok(signals) => {
749                    if let Err(e) = tx.send(signals) {
750                        log::error!("Failed to send signals for '{name}': {e:?}");
751                    }
752                }
753                Err(e) => {
754                    log::error!("Failed to load signals for '{name}': {e:?}");
755                    if let Err(e) = tx.send(Vec::new()) {
756                        log::error!("Failed to send empty signals for '{name}': {e:?}");
757                    }
758                }
759            }
760        });
761        Ok(rx.recv()?)
762    }
763
764    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
765        let query = DatabaseQuery::AddCustom(data.to_owned());
766        self.tx.send(query).map_err(|e| {
767            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
768        })
769    }
770
771    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
772        let pool = self.pool.clone();
773        let data_type = data_type.to_owned();
774        let (tx, rx) = std::sync::mpsc::channel();
775        tokio::spawn(async move {
776            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
777            match result {
778                Ok(signals) => {
779                    if let Err(e) = tx.send(signals) {
780                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
781                    }
782                }
783                Err(e) => {
784                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
785                    if let Err(e) = tx.send(Vec::new()) {
786                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
787                    }
788                }
789            }
790        });
791        Ok(rx.recv()?)
792    }
793
794    fn load_order_snapshot(
795        &self,
796        client_order_id: &ClientOrderId,
797    ) -> anyhow::Result<Option<OrderSnapshot>> {
798        let pool = self.pool.clone();
799        let client_order_id = client_order_id.to_owned();
800        let (tx, rx) = std::sync::mpsc::channel();
801        tokio::spawn(async move {
802            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
803            match result {
804                Ok(snapshot) => {
805                    if let Err(e) = tx.send(snapshot) {
806                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
807                    }
808                }
809                Err(e) => {
810                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
811                    if let Err(e) = tx.send(None) {
812                        log::error!(
813                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
814                        );
815                    }
816                }
817            }
818        });
819        Ok(rx.recv()?)
820    }
821
822    fn load_position_snapshot(
823        &self,
824        position_id: &PositionId,
825    ) -> anyhow::Result<Option<PositionSnapshot>> {
826        let pool = self.pool.clone();
827        let position_id = position_id.to_owned();
828        let (tx, rx) = std::sync::mpsc::channel();
829        tokio::spawn(async move {
830            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
831            match result {
832                Ok(snapshot) => {
833                    if let Err(e) = tx.send(snapshot) {
834                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
835                    }
836                }
837                Err(e) => {
838                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
839                    if let Err(e) = tx.send(None) {
840                        log::error!(
841                            "Failed to send None for position snapshot {position_id}: {e:?}"
842                        );
843                    }
844                }
845            }
846        });
847        Ok(rx.recv()?)
848    }
849
850    fn index_venue_order_id(
851        &self,
852        client_order_id: ClientOrderId,
853        venue_order_id: VenueOrderId,
854    ) -> anyhow::Result<()> {
855        todo!()
856    }
857
858    fn index_order_position(
859        &self,
860        client_order_id: ClientOrderId,
861        position_id: PositionId,
862    ) -> anyhow::Result<()> {
863        todo!()
864    }
865
866    fn update_actor(&self) -> anyhow::Result<()> {
867        todo!()
868    }
869
870    fn update_strategy(&self) -> anyhow::Result<()> {
871        todo!()
872    }
873
874    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
875        let query = DatabaseQuery::AddAccount(account.clone(), true);
876        self.tx.send(query).map_err(|e| {
877            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
878        })
879    }
880
881    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
882        let query = DatabaseQuery::UpdateOrder(event.clone());
883        self.tx.send(query).map_err(|e| {
884            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
885        })
886    }
887
888    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
889        todo!()
890    }
891
892    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
893        todo!()
894    }
895
896    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
897        todo!()
898    }
899
900    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
901        todo!()
902    }
903}
904
905async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
906    for cmd in buffer.drain(..) {
907        let result: anyhow::Result<()> = match cmd {
908            DatabaseQuery::Close => Ok(()),
909            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
910            DatabaseQuery::AddCurrency(currency) => {
911                DatabaseQueries::add_currency(pool, currency).await
912            }
913            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
914                InstrumentAny::Betting(instrument) => {
915                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
916                }
917                InstrumentAny::BinaryOption(instrument) => {
918                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
919                        .await
920                }
921                InstrumentAny::CryptoFuture(instrument) => {
922                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
923                        .await
924                }
925                InstrumentAny::CryptoOption(instrument) => {
926                    DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
927                        .await
928                }
929                InstrumentAny::CryptoPerpetual(instrument) => {
930                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
931                        .await
932                }
933                InstrumentAny::CurrencyPair(instrument) => {
934                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
935                        .await
936                }
937                InstrumentAny::Equity(equity) => {
938                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
939                }
940                InstrumentAny::FuturesContract(instrument) => {
941                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
942                        .await
943                }
944                InstrumentAny::FuturesSpread(instrument) => {
945                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
946                        .await
947                }
948                InstrumentAny::OptionContract(instrument) => {
949                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
950                        .await
951                }
952                InstrumentAny::OptionSpread(instrument) => {
953                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
954                        .await
955                }
956            },
957            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
958                OrderAny::Limit(order) => {
959                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
960                        .await
961                }
962                OrderAny::LimitIfTouched(order) => {
963                    DatabaseQueries::add_order(
964                        pool,
965                        "LIMIT_IF_TOUCHED",
966                        updated,
967                        Box::new(order),
968                        client_id,
969                    )
970                    .await
971                }
972                OrderAny::Market(order) => {
973                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
974                        .await
975                }
976                OrderAny::MarketIfTouched(order) => {
977                    DatabaseQueries::add_order(
978                        pool,
979                        "MARKET_IF_TOUCHED",
980                        updated,
981                        Box::new(order),
982                        client_id,
983                    )
984                    .await
985                }
986                OrderAny::MarketToLimit(order) => {
987                    DatabaseQueries::add_order(
988                        pool,
989                        "MARKET_TO_LIMIT",
990                        updated,
991                        Box::new(order),
992                        client_id,
993                    )
994                    .await
995                }
996                OrderAny::StopLimit(order) => {
997                    DatabaseQueries::add_order(
998                        pool,
999                        "STOP_LIMIT",
1000                        updated,
1001                        Box::new(order),
1002                        client_id,
1003                    )
1004                    .await
1005                }
1006                OrderAny::StopMarket(order) => {
1007                    DatabaseQueries::add_order(
1008                        pool,
1009                        "STOP_MARKET",
1010                        updated,
1011                        Box::new(order),
1012                        client_id,
1013                    )
1014                    .await
1015                }
1016                OrderAny::TrailingStopLimit(order) => {
1017                    DatabaseQueries::add_order(
1018                        pool,
1019                        "TRAILING_STOP_LIMIT",
1020                        updated,
1021                        Box::new(order),
1022                        client_id,
1023                    )
1024                    .await
1025                }
1026                OrderAny::TrailingStopMarket(order) => {
1027                    DatabaseQueries::add_order(
1028                        pool,
1029                        "TRAILING_STOP_MARKET",
1030                        updated,
1031                        Box::new(order),
1032                        client_id,
1033                    )
1034                    .await
1035                }
1036            },
1037            DatabaseQuery::AddOrderSnapshot(snapshot) => {
1038                DatabaseQueries::add_order_snapshot(pool, snapshot).await
1039            }
1040            DatabaseQuery::AddPositionSnapshot(snapshot) => {
1041                DatabaseQueries::add_position_snapshot(pool, snapshot).await
1042            }
1043            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1044                AccountAny::Cash(account) => {
1045                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1046                }
1047                AccountAny::Margin(account) => {
1048                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1049                }
1050            },
1051            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1052            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1053            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
1054            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1055            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1056            DatabaseQuery::UpdateOrder(event) => {
1057                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1058            }
1059        };
1060
1061        if let Err(e) = result {
1062            tracing::error!("Error on query: {e:?}");
1063        }
1064    }
1065}