nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23    cache::database::{CacheDatabaseAdapter, CacheMap},
24    custom::CustomData,
25    logging::{log_task_awaiting, log_task_started, log_task_stopped},
26    runtime::get_runtime,
27    signal::Signal,
28};
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31    accounts::AccountAny,
32    data::{Bar, DataType, QuoteTick, TradeTick},
33    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
34    identifiers::{
35        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
36        VenueOrderId,
37    },
38    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
39    orderbook::OrderBook,
40    orders::{Order, OrderAny},
41    position::Position,
42    types::Currency,
43};
44use sqlx::{PgPool, postgres::PgConnectOptions};
45use tokio::try_join;
46use ustr::Ustr;
47
48use crate::sql::{
49    pg::{connect_pg, get_postgres_connect_options},
50    queries::DatabaseQueries,
51};
52
/// Label under which the background command-processing task is logged.
const CACHE_PROCESS: &str = "cache-process";
55
56#[derive(Debug)]
57#[cfg_attr(
58    feature = "python",
59    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
60)]
61pub struct PostgresCacheDatabase {
62    pub pool: PgPool,
63    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
64    handle: tokio::task::JoinHandle<()>,
65}
66
67#[allow(clippy::large_enum_variant)]
68#[derive(Debug, Clone)]
69pub enum DatabaseQuery {
70    Close,
71    Add(String, Vec<u8>),
72    AddCurrency(Currency),
73    AddInstrument(InstrumentAny),
74    AddOrder(OrderAny, Option<ClientId>, bool),
75    AddOrderSnapshot(OrderSnapshot),
76    AddPositionSnapshot(PositionSnapshot),
77    AddAccount(AccountAny, bool),
78    AddSignal(Signal),
79    AddCustom(CustomData),
80    AddQuote(QuoteTick),
81    AddTrade(TradeTick),
82    AddBar(Bar),
83    UpdateOrder(OrderEventAny),
84}
85
86impl PostgresCacheDatabase {
87    /// Connects to the Postgres cache database using the provided connection parameters.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if establishing the database connection fails.
92    ///
93    /// # Panics
94    ///
95    /// Panics if the internal Postgres pool connection attempt (`connect_pg`) unwraps on error.
96    pub async fn connect(
97        host: Option<String>,
98        port: Option<u16>,
99        username: Option<String>,
100        password: Option<String>,
101        database: Option<String>,
102    ) -> Result<Self, sqlx::Error> {
103        let pg_connect_options =
104            get_postgres_connect_options(host, port, username, password, database);
105        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
106        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
107
108        // Spawn a task to handle messages
109        let handle = tokio::spawn(async move {
110            Self::process_commands(rx, pg_connect_options.clone().into()).await;
111        });
112        Ok(Self { pool, tx, handle })
113    }
114
115    async fn process_commands(
116        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
117        pg_connect_options: PgConnectOptions,
118    ) {
119        log_task_started(CACHE_PROCESS);
120
121        let pool = connect_pg(pg_connect_options).await.unwrap();
122
123        // Buffering
124        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
125        let mut last_drain = Instant::now();
126
127        // TODO: Add `buffer_interval_ms` to config, setting this above 0 currently fails tests
128        let buffer_interval = Duration::from_millis(0);
129
130        // Continue to receive and handle messages until channel is hung up
131        loop {
132            if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
133                drain_buffer(&pool, &mut buffer).await;
134                last_drain = Instant::now();
135            } else if let Some(msg) = rx.recv().await {
136                tracing::debug!("Received {msg:?}");
137                match msg {
138                    DatabaseQuery::Close => break,
139                    _ => buffer.push_back(msg),
140                }
141            } else {
142                tracing::debug!("Command channel closed");
143                break;
144            }
145        }
146
147        // Drain any remaining message
148        if !buffer.is_empty() {
149            drain_buffer(&pool, &mut buffer).await;
150        }
151
152        log_task_stopped(CACHE_PROCESS);
153    }
154}
155
156/// Retrieves a `PostgresCacheDatabase` using default connection options.
157///
158/// # Errors
159///
160/// Returns an error if connecting to the database or initializing the cache adapter fails.
161pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
162    let connect_options = get_postgres_connect_options(None, None, None, None, None);
163    Ok(PostgresCacheDatabase::connect(
164        Some(connect_options.host),
165        Some(connect_options.port),
166        Some(connect_options.username),
167        Some(connect_options.password),
168        Some(connect_options.database),
169    )
170    .await?)
171}
172
173#[allow(dead_code)]
174#[allow(unused)]
175#[async_trait::async_trait]
176impl CacheDatabaseAdapter for PostgresCacheDatabase {
177    fn close(&mut self) -> anyhow::Result<()> {
178        let pool = self.pool.clone();
179        let (tx, rx) = std::sync::mpsc::channel();
180
181        log::debug!("Closing connection pool");
182
183        tokio::task::block_in_place(|| {
184            get_runtime().block_on(async {
185                pool.close().await;
186                if let Err(e) = tx.send(()) {
187                    log::error!("Error closing pool: {e:?}");
188                }
189            });
190        });
191
192        // Cancel message handling task
193        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
194            log::error!("Error sending close: {e:?}");
195        }
196
197        log_task_awaiting("cache-write");
198
199        tokio::task::block_in_place(|| {
200            if let Err(e) = get_runtime().block_on(&mut self.handle) {
201                log::error!("Error awaiting task 'cache-write': {e:?}");
202            }
203        });
204
205        log::debug!("Closed");
206
207        Ok(rx.recv()?)
208    }
209
210    fn flush(&mut self) -> anyhow::Result<()> {
211        let pool = self.pool.clone();
212        let (tx, rx) = std::sync::mpsc::channel();
213
214        tokio::task::block_in_place(|| {
215            get_runtime().block_on(async {
216                if let Err(e) = DatabaseQueries::truncate(&pool).await {
217                    log::error!("Error flushing pool: {e:?}");
218                }
219                if let Err(e) = tx.send(()) {
220                    log::error!("Error sending flush result: {e:?}");
221                }
222            });
223        });
224
225        Ok(rx.recv()?)
226    }
227
228    async fn load_all(&self) -> anyhow::Result<CacheMap> {
229        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
230            self.load_currencies(),
231            self.load_instruments(),
232            self.load_synthetics(),
233            self.load_accounts(),
234            self.load_orders(),
235            self.load_positions()
236        )
237        .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
238
239        // For now, we don't load greeks and yield curves from the database
240        // This will be implemented in the future
241        let greeks = HashMap::new();
242        let yield_curves = HashMap::new();
243
244        Ok(CacheMap {
245            currencies,
246            instruments,
247            synthetics,
248            accounts,
249            orders,
250            positions,
251            greeks,
252            yield_curves,
253        })
254    }
255
256    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
257        let pool = self.pool.clone();
258        let (tx, rx) = std::sync::mpsc::channel();
259        tokio::spawn(async move {
260            let result = DatabaseQueries::load(&pool).await;
261            match result {
262                Ok(items) => {
263                    let mapping = items
264                        .into_iter()
265                        .map(|(k, v)| (k, Bytes::from(v)))
266                        .collect();
267                    if let Err(e) = tx.send(mapping) {
268                        log::error!("Failed to send general items: {e:?}");
269                    }
270                }
271                Err(e) => {
272                    log::error!("Failed to load general items: {e:?}");
273                    if let Err(e) = tx.send(HashMap::new()) {
274                        log::error!("Failed to send empty general items: {e:?}");
275                    }
276                }
277            }
278        });
279        Ok(rx.recv()?)
280    }
281
282    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
283        let pool = self.pool.clone();
284        let (tx, rx) = std::sync::mpsc::channel();
285        tokio::spawn(async move {
286            let result = DatabaseQueries::load_currencies(&pool).await;
287            match result {
288                Ok(currencies) => {
289                    let mapping = currencies
290                        .into_iter()
291                        .map(|currency| (currency.code, currency))
292                        .collect();
293                    if let Err(e) = tx.send(mapping) {
294                        log::error!("Failed to send currencies: {e:?}");
295                    }
296                }
297                Err(e) => {
298                    log::error!("Failed to load currencies: {e:?}");
299                    if let Err(e) = tx.send(HashMap::new()) {
300                        log::error!("Failed to send empty currencies: {e:?}");
301                    }
302                }
303            }
304        });
305        Ok(rx.recv()?)
306    }
307
308    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
309        let pool = self.pool.clone();
310        let (tx, rx) = std::sync::mpsc::channel();
311        tokio::spawn(async move {
312            let result = DatabaseQueries::load_instruments(&pool).await;
313            match result {
314                Ok(instruments) => {
315                    let mapping = instruments
316                        .into_iter()
317                        .map(|instrument| (instrument.id(), instrument))
318                        .collect();
319                    if let Err(e) = tx.send(mapping) {
320                        log::error!("Failed to send instruments: {e:?}");
321                    }
322                }
323                Err(e) => {
324                    log::error!("Failed to load instruments: {e:?}");
325                    if let Err(e) = tx.send(HashMap::new()) {
326                        log::error!("Failed to send empty instruments: {e:?}");
327                    }
328                }
329            }
330        });
331        Ok(rx.recv()?)
332    }
333
334    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
335        todo!()
336    }
337
338    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
339        let pool = self.pool.clone();
340        let (tx, rx) = std::sync::mpsc::channel();
341        tokio::spawn(async move {
342            let result = DatabaseQueries::load_accounts(&pool).await;
343            match result {
344                Ok(accounts) => {
345                    let mapping = accounts
346                        .into_iter()
347                        .map(|account| (account.id(), account))
348                        .collect();
349                    if let Err(e) = tx.send(mapping) {
350                        log::error!("Failed to send accounts: {e:?}");
351                    }
352                }
353                Err(e) => {
354                    log::error!("Failed to load accounts: {e:?}");
355                    if let Err(e) = tx.send(HashMap::new()) {
356                        log::error!("Failed to send empty accounts: {e:?}");
357                    }
358                }
359            }
360        });
361        Ok(rx.recv()?)
362    }
363
364    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
365        let pool = self.pool.clone();
366        let (tx, rx) = std::sync::mpsc::channel();
367        tokio::spawn(async move {
368            let result = DatabaseQueries::load_orders(&pool).await;
369            match result {
370                Ok(orders) => {
371                    let mapping = orders
372                        .into_iter()
373                        .map(|order| (order.client_order_id(), order))
374                        .collect();
375                    if let Err(e) = tx.send(mapping) {
376                        log::error!("Failed to send orders: {e:?}");
377                    }
378                }
379                Err(e) => {
380                    log::error!("Failed to load orders: {e:?}");
381                    if let Err(e) = tx.send(HashMap::new()) {
382                        log::error!("Failed to send empty orders: {e:?}");
383                    }
384                }
385            }
386        });
387        Ok(rx.recv()?)
388    }
389
390    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
391        todo!()
392    }
393
394    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
395        todo!()
396    }
397
398    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
399        let pool = self.pool.clone();
400        let (tx, rx) = std::sync::mpsc::channel();
401        tokio::spawn(async move {
402            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
403            match result {
404                Ok(currency) => {
405                    if let Err(e) = tx.send(currency) {
406                        log::error!("Failed to send load_index_order_client result: {e:?}");
407                    }
408                }
409                Err(e) => {
410                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
411                    if let Err(e) = tx.send(HashMap::new()) {
412                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
413                    }
414                }
415            }
416        });
417        Ok(rx.recv()?)
418    }
419
420    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
421        let pool = self.pool.clone();
422        let code = code.to_owned(); // Clone the code
423        let (tx, rx) = std::sync::mpsc::channel();
424        tokio::spawn(async move {
425            let result = DatabaseQueries::load_currency(&pool, &code).await;
426            match result {
427                Ok(currency) => {
428                    if let Err(e) = tx.send(currency) {
429                        log::error!("Failed to send currency {code}: {e:?}");
430                    }
431                }
432                Err(e) => {
433                    log::error!("Failed to load currency {code}: {e:?}");
434                    if let Err(e) = tx.send(None) {
435                        log::error!("Failed to send None for currency {code}: {e:?}");
436                    }
437                }
438            }
439        });
440        Ok(rx.recv()?)
441    }
442
443    async fn load_instrument(
444        &self,
445        instrument_id: &InstrumentId,
446    ) -> anyhow::Result<Option<InstrumentAny>> {
447        let pool = self.pool.clone();
448        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
449        let (tx, rx) = std::sync::mpsc::channel();
450        tokio::spawn(async move {
451            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
452            match result {
453                Ok(instrument) => {
454                    if let Err(e) = tx.send(instrument) {
455                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
456                    }
457                }
458                Err(e) => {
459                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
460                    if let Err(e) = tx.send(None) {
461                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
462                    }
463                }
464            }
465        });
466        Ok(rx.recv()?)
467    }
468
469    async fn load_synthetic(
470        &self,
471        instrument_id: &InstrumentId,
472    ) -> anyhow::Result<Option<SyntheticInstrument>> {
473        todo!()
474    }
475
476    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
477        let pool = self.pool.clone();
478        let account_id = account_id.to_owned();
479        let (tx, rx) = std::sync::mpsc::channel();
480        tokio::spawn(async move {
481            let result = DatabaseQueries::load_account(&pool, &account_id).await;
482            match result {
483                Ok(account) => {
484                    if let Err(e) = tx.send(account) {
485                        log::error!("Failed to send account {account_id}: {e:?}");
486                    }
487                }
488                Err(e) => {
489                    log::error!("Failed to load account {account_id}: {e:?}");
490                    if let Err(e) = tx.send(None) {
491                        log::error!("Failed to send None for account {account_id}: {e:?}");
492                    }
493                }
494            }
495        });
496        Ok(rx.recv()?)
497    }
498
499    async fn load_order(
500        &self,
501        client_order_id: &ClientOrderId,
502    ) -> anyhow::Result<Option<OrderAny>> {
503        let pool = self.pool.clone();
504        let client_order_id = client_order_id.to_owned();
505        let (tx, rx) = std::sync::mpsc::channel();
506        tokio::spawn(async move {
507            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
508            match result {
509                Ok(order) => {
510                    if let Err(e) = tx.send(order) {
511                        log::error!("Failed to send order {client_order_id}: {e:?}");
512                    }
513                }
514                Err(e) => {
515                    log::error!("Failed to load order {client_order_id}: {e:?}");
516                    let _ = tx.send(None);
517                }
518            }
519        });
520        Ok(rx.recv()?)
521    }
522
523    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
524        todo!()
525    }
526
527    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
528        todo!()
529    }
530
531    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
532        todo!()
533    }
534
535    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
536        todo!()
537    }
538
539    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
540        todo!()
541    }
542
543    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
544        anyhow::bail!(
545            "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
546        )
547    }
548
549    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
550        anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
551    }
552
553    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
554        anyhow::bail!(
555            "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
556        )
557    }
558
559    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
560        let query = DatabaseQuery::Add(key, value.into());
561        self.tx
562            .send(query)
563            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
564    }
565
566    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
567        let query = DatabaseQuery::AddCurrency(*currency);
568        self.tx.send(query).map_err(|e| {
569            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
570        })
571    }
572
573    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
574        let query = DatabaseQuery::AddInstrument(instrument.clone());
575        self.tx.send(query).map_err(|e| {
576            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
577        })
578    }
579
580    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
581        todo!()
582    }
583
584    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
585        let query = DatabaseQuery::AddAccount(account.clone(), false);
586        self.tx.send(query).map_err(|e| {
587            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
588        })
589    }
590
591    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
592        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
593        self.tx.send(query).map_err(|e| {
594            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
595        })
596    }
597
598    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
599        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
600        self.tx.send(query).map_err(|e| {
601            anyhow::anyhow!(
602                "Failed to send query add_order_snapshot to database message handler: {e}"
603            )
604        })
605    }
606
607    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
608        todo!()
609    }
610
611    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
612        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
613        self.tx.send(query).map_err(|e| {
614            anyhow::anyhow!(
615                "Failed to send query add_position_snapshot to database message handler: {e}"
616            )
617        })
618    }
619
620    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
621        todo!()
622    }
623
624    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
625        let query = DatabaseQuery::AddQuote(quote.to_owned());
626        self.tx.send(query).map_err(|e| {
627            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
628        })
629    }
630
631    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
632        let pool = self.pool.clone();
633        let instrument_id = instrument_id.to_owned();
634        let (tx, rx) = std::sync::mpsc::channel();
635        tokio::spawn(async move {
636            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
637            match result {
638                Ok(quotes) => {
639                    if let Err(e) = tx.send(quotes) {
640                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
641                    }
642                }
643                Err(e) => {
644                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
645                    if let Err(e) = tx.send(Vec::new()) {
646                        log::error!(
647                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
648                        );
649                    }
650                }
651            }
652        });
653        Ok(rx.recv()?)
654    }
655
656    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
657        let query = DatabaseQuery::AddTrade(trade.to_owned());
658        self.tx.send(query).map_err(|e| {
659            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
660        })
661    }
662
663    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
664        let pool = self.pool.clone();
665        let instrument_id = instrument_id.to_owned();
666        let (tx, rx) = std::sync::mpsc::channel();
667        tokio::spawn(async move {
668            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
669            match result {
670                Ok(trades) => {
671                    if let Err(e) = tx.send(trades) {
672                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
673                    }
674                }
675                Err(e) => {
676                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
677                    if let Err(e) = tx.send(Vec::new()) {
678                        log::error!(
679                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
680                        );
681                    }
682                }
683            }
684        });
685        Ok(rx.recv()?)
686    }
687
688    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
689        let query = DatabaseQuery::AddBar(bar.to_owned());
690        self.tx.send(query).map_err(|e| {
691            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
692        })
693    }
694
695    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
696        let pool = self.pool.clone();
697        let instrument_id = instrument_id.to_owned();
698        let (tx, rx) = std::sync::mpsc::channel();
699        tokio::spawn(async move {
700            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
701            match result {
702                Ok(bars) => {
703                    if let Err(e) = tx.send(bars) {
704                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
705                    }
706                }
707                Err(e) => {
708                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
709                    if let Err(e) = tx.send(Vec::new()) {
710                        log::error!(
711                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
712                        );
713                    }
714                }
715            }
716        });
717        Ok(rx.recv()?)
718    }
719
720    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
721        let query = DatabaseQuery::AddSignal(signal.to_owned());
722        self.tx.send(query).map_err(|e| {
723            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
724        })
725    }
726
727    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
728        let pool = self.pool.clone();
729        let name = name.to_owned();
730        let (tx, rx) = std::sync::mpsc::channel();
731        tokio::spawn(async move {
732            let result = DatabaseQueries::load_signals(&pool, &name).await;
733            match result {
734                Ok(signals) => {
735                    if let Err(e) = tx.send(signals) {
736                        log::error!("Failed to send signals for '{name}': {e:?}");
737                    }
738                }
739                Err(e) => {
740                    log::error!("Failed to load signals for '{name}': {e:?}");
741                    if let Err(e) = tx.send(Vec::new()) {
742                        log::error!("Failed to send empty signals for '{name}': {e:?}");
743                    }
744                }
745            }
746        });
747        Ok(rx.recv()?)
748    }
749
750    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
751        let query = DatabaseQuery::AddCustom(data.to_owned());
752        self.tx.send(query).map_err(|e| {
753            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
754        })
755    }
756
757    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
758        let pool = self.pool.clone();
759        let data_type = data_type.to_owned();
760        let (tx, rx) = std::sync::mpsc::channel();
761        tokio::spawn(async move {
762            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
763            match result {
764                Ok(signals) => {
765                    if let Err(e) = tx.send(signals) {
766                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
767                    }
768                }
769                Err(e) => {
770                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
771                    if let Err(e) = tx.send(Vec::new()) {
772                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
773                    }
774                }
775            }
776        });
777        Ok(rx.recv()?)
778    }
779
780    fn load_order_snapshot(
781        &self,
782        client_order_id: &ClientOrderId,
783    ) -> anyhow::Result<Option<OrderSnapshot>> {
784        let pool = self.pool.clone();
785        let client_order_id = client_order_id.to_owned();
786        let (tx, rx) = std::sync::mpsc::channel();
787        tokio::spawn(async move {
788            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
789            match result {
790                Ok(snapshot) => {
791                    if let Err(e) = tx.send(snapshot) {
792                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
793                    }
794                }
795                Err(e) => {
796                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
797                    if let Err(e) = tx.send(None) {
798                        log::error!(
799                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
800                        );
801                    }
802                }
803            }
804        });
805        Ok(rx.recv()?)
806    }
807
808    fn load_position_snapshot(
809        &self,
810        position_id: &PositionId,
811    ) -> anyhow::Result<Option<PositionSnapshot>> {
812        let pool = self.pool.clone();
813        let position_id = position_id.to_owned();
814        let (tx, rx) = std::sync::mpsc::channel();
815        tokio::spawn(async move {
816            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
817            match result {
818                Ok(snapshot) => {
819                    if let Err(e) = tx.send(snapshot) {
820                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
821                    }
822                }
823                Err(e) => {
824                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
825                    if let Err(e) = tx.send(None) {
826                        log::error!(
827                            "Failed to send None for position snapshot {position_id}: {e:?}"
828                        );
829                    }
830                }
831            }
832        });
833        Ok(rx.recv()?)
834    }
835
    /// Placeholder taking a `client_order_id` and a `venue_order_id`; not yet implemented.
    ///
    /// Both arguments are currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn index_venue_order_id(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> anyhow::Result<()> {
        todo!()
    }
843
    /// Placeholder taking a `client_order_id` and a `position_id`; not yet implemented.
    ///
    /// Both arguments are currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn index_order_position(
        &self,
        client_order_id: ClientOrderId,
        position_id: PositionId,
    ) -> anyhow::Result<()> {
        todo!()
    }
851
    /// Placeholder; not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn update_actor(&self) -> anyhow::Result<()> {
        todo!()
    }
855
    /// Placeholder; not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn update_strategy(&self) -> anyhow::Result<()> {
        todo!()
    }
859
860    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
861        let query = DatabaseQuery::AddAccount(account.clone(), true);
862        self.tx.send(query).map_err(|e| {
863            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
864        })
865    }
866
867    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
868        let query = DatabaseQuery::UpdateOrder(event.clone());
869        self.tx.send(query).map_err(|e| {
870            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
871        })
872    }
873
    /// Placeholder taking a `position`; not yet implemented.
    ///
    /// The argument is currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
877
    /// Placeholder taking an `order`; not yet implemented.
    ///
    /// The argument is currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
        todo!()
    }
881
    /// Placeholder taking a `position`; not yet implemented.
    ///
    /// The argument is currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
885
    /// Placeholder taking a `timestamp`; not yet implemented.
    ///
    /// The argument is currently unused.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is a `todo!()` stub.
    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
        todo!()
    }
889}
890
891async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
892    for cmd in buffer.drain(..) {
893        let result: anyhow::Result<()> = match cmd {
894            DatabaseQuery::Close => Ok(()),
895            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
896            DatabaseQuery::AddCurrency(currency) => {
897                DatabaseQueries::add_currency(pool, currency).await
898            }
899            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
900                InstrumentAny::Betting(instrument) => {
901                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
902                }
903                InstrumentAny::BinaryOption(instrument) => {
904                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
905                        .await
906                }
907                InstrumentAny::CryptoFuture(instrument) => {
908                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
909                        .await
910                }
911                InstrumentAny::CryptoOption(instrument) => {
912                    DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
913                        .await
914                }
915                InstrumentAny::CryptoPerpetual(instrument) => {
916                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
917                        .await
918                }
919                InstrumentAny::CurrencyPair(instrument) => {
920                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
921                        .await
922                }
923                InstrumentAny::Equity(equity) => {
924                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
925                }
926                InstrumentAny::FuturesContract(instrument) => {
927                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
928                        .await
929                }
930                InstrumentAny::FuturesSpread(instrument) => {
931                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
932                        .await
933                }
934                InstrumentAny::OptionContract(instrument) => {
935                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
936                        .await
937                }
938                InstrumentAny::OptionSpread(instrument) => {
939                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
940                        .await
941                }
942            },
943            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
944                OrderAny::Limit(order) => {
945                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
946                        .await
947                }
948                OrderAny::LimitIfTouched(order) => {
949                    DatabaseQueries::add_order(
950                        pool,
951                        "LIMIT_IF_TOUCHED",
952                        updated,
953                        Box::new(order),
954                        client_id,
955                    )
956                    .await
957                }
958                OrderAny::Market(order) => {
959                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
960                        .await
961                }
962                OrderAny::MarketIfTouched(order) => {
963                    DatabaseQueries::add_order(
964                        pool,
965                        "MARKET_IF_TOUCHED",
966                        updated,
967                        Box::new(order),
968                        client_id,
969                    )
970                    .await
971                }
972                OrderAny::MarketToLimit(order) => {
973                    DatabaseQueries::add_order(
974                        pool,
975                        "MARKET_TO_LIMIT",
976                        updated,
977                        Box::new(order),
978                        client_id,
979                    )
980                    .await
981                }
982                OrderAny::StopLimit(order) => {
983                    DatabaseQueries::add_order(
984                        pool,
985                        "STOP_LIMIT",
986                        updated,
987                        Box::new(order),
988                        client_id,
989                    )
990                    .await
991                }
992                OrderAny::StopMarket(order) => {
993                    DatabaseQueries::add_order(
994                        pool,
995                        "STOP_MARKET",
996                        updated,
997                        Box::new(order),
998                        client_id,
999                    )
1000                    .await
1001                }
1002                OrderAny::TrailingStopLimit(order) => {
1003                    DatabaseQueries::add_order(
1004                        pool,
1005                        "TRAILING_STOP_LIMIT",
1006                        updated,
1007                        Box::new(order),
1008                        client_id,
1009                    )
1010                    .await
1011                }
1012                OrderAny::TrailingStopMarket(order) => {
1013                    DatabaseQueries::add_order(
1014                        pool,
1015                        "TRAILING_STOP_MARKET",
1016                        updated,
1017                        Box::new(order),
1018                        client_id,
1019                    )
1020                    .await
1021                }
1022            },
1023            DatabaseQuery::AddOrderSnapshot(snapshot) => {
1024                DatabaseQueries::add_order_snapshot(pool, snapshot).await
1025            }
1026            DatabaseQuery::AddPositionSnapshot(snapshot) => {
1027                DatabaseQueries::add_position_snapshot(pool, snapshot).await
1028            }
1029            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1030                AccountAny::Cash(account) => {
1031                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1032                }
1033                AccountAny::Margin(account) => {
1034                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1035                }
1036            },
1037            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1038            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1039            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
1040            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1041            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1042            DatabaseQuery::UpdateOrder(event) => {
1043                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1044            }
1045        };
1046
1047        if let Err(e) = result {
1048            tracing::error!("Error on query: {e:?}");
1049        }
1050    }
1051}