Skip to main content

nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::VecDeque, ops::ControlFlow, pin::Pin, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21    cache::database::{CacheDatabaseAdapter, CacheMap},
22    custom::CustomData,
23    live::get_runtime,
24    logging::{log_task_awaiting, log_task_started, log_task_stopped},
25    signal::Signal,
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    accounts::AccountAny,
30    data::{Bar, DataType, FundingRateUpdate, QuoteTick, TradeTick},
31    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
32    identifiers::{
33        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
34        VenueOrderId,
35    },
36    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
37    orderbook::OrderBook,
38    orders::{Order, OrderAny},
39    position::Position,
40    types::Currency,
41};
42use sqlx::{PgPool, postgres::PgConnectOptions};
43use tokio::{time::Instant, try_join};
44use ustr::Ustr;
45
46use crate::sql::{
47    pg::{connect_pg, get_postgres_connect_options},
48    queries::DatabaseQueries,
49};
50
51// Task and connection names
52const CACHE_PROCESS: &str = "cache-process";
53
54#[derive(Debug)]
55#[cfg_attr(
56    feature = "python",
57    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
58)]
59pub struct PostgresCacheDatabase {
60    pub pool: PgPool,
61    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
62    handle: tokio::task::JoinHandle<()>,
63}
64
65#[allow(clippy::large_enum_variant)]
66#[derive(Debug, Clone)]
67pub enum DatabaseQuery {
68    Close,
69    Add(String, Vec<u8>),
70    AddCurrency(Currency),
71    AddInstrument(InstrumentAny),
72    AddOrder(OrderAny, Option<ClientId>, bool),
73    AddOrderSnapshot(OrderSnapshot),
74    AddPositionSnapshot(PositionSnapshot),
75    AddAccount(AccountAny, bool),
76    AddSignal(Signal),
77    AddCustom(CustomData),
78    AddQuote(QuoteTick),
79    AddTrade(TradeTick),
80    AddBar(Bar),
81    UpdateOrder(OrderEventAny),
82}
83
84impl PostgresCacheDatabase {
85    /// Connects to the Postgres cache database using the provided connection parameters.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if establishing the database connection fails.
90    ///
91    /// # Panics
92    ///
93    /// Panics if the internal Postgres pool connection attempt (`connect_pg`) unwraps on error.
94    pub async fn connect(
95        host: Option<String>,
96        port: Option<u16>,
97        username: Option<String>,
98        password: Option<String>,
99        database: Option<String>,
100    ) -> Result<Self, sqlx::Error> {
101        let pg_connect_options =
102            get_postgres_connect_options(host, port, username, password, database);
103        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
104        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
105
106        // Spawn a task to handle messages
107        let handle = tokio::spawn(async move {
108            Self::process_commands(rx, pg_connect_options.clone().into()).await;
109        });
110        Ok(Self { pool, tx, handle })
111    }
112
113    async fn process_commands(
114        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
115        pg_connect_options: PgConnectOptions,
116    ) {
117        log_task_started(CACHE_PROCESS);
118
119        let pool = connect_pg(pg_connect_options).await.unwrap();
120
121        // Buffering
122        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
123
124        // TODO: expose this via configuration once tests are fixed
125        let buffer_interval = Duration::from_millis(0);
126
127        // A sleep used to trigger periodic flushing of the buffer.
128        // When `buffer_interval` is zero we skip using the timer and flush immediately
129        // after every message.
130        let flush_timer = tokio::time::sleep(buffer_interval);
131        tokio::pin!(flush_timer);
132
133        // Continue to receive and handle messages until channel is hung up
134        loop {
135            tokio::select! {
136                maybe_msg = rx.recv() => {
137                    let result = handle_query(
138                        maybe_msg,
139                        &mut buffer,
140                        buffer_interval,
141                        &pool,
142                    ).await;
143                    if result.is_break() {
144                        break;
145                    }
146                }
147                () = &mut flush_timer, if !buffer_interval.is_zero() => {
148                    flush_buffer(&mut buffer, &pool, &mut flush_timer, buffer_interval).await;
149                }
150            }
151        }
152
153        if !buffer.is_empty() {
154            drain_buffer(&pool, &mut buffer).await;
155        }
156
157        log_task_stopped(CACHE_PROCESS);
158    }
159}
160
161async fn handle_query(
162    maybe_msg: Option<DatabaseQuery>,
163    buffer: &mut VecDeque<DatabaseQuery>,
164    buffer_interval: Duration,
165    pool: &PgPool,
166) -> ControlFlow<()> {
167    let Some(msg) = maybe_msg else {
168        log::debug!("Command channel closed");
169        return ControlFlow::Break(());
170    };
171
172    log::debug!("Received {msg:?}");
173
174    if matches!(msg, DatabaseQuery::Close) {
175        if !buffer.is_empty() {
176            drain_buffer(pool, buffer).await;
177        }
178        return ControlFlow::Break(());
179    }
180
181    buffer.push_back(msg);
182
183    if buffer_interval.is_zero() {
184        drain_buffer(pool, buffer).await;
185    }
186
187    ControlFlow::Continue(())
188}
189
190async fn flush_buffer(
191    buffer: &mut VecDeque<DatabaseQuery>,
192    pool: &PgPool,
193    flush_timer: &mut Pin<&mut tokio::time::Sleep>,
194    buffer_interval: Duration,
195) {
196    if !buffer.is_empty() {
197        drain_buffer(pool, buffer).await;
198    }
199    flush_timer.as_mut().reset(Instant::now() + buffer_interval);
200}
201
202/// Retrieves a `PostgresCacheDatabase` using default connection options.
203///
204/// # Errors
205///
206/// Returns an error if connecting to the database or initializing the cache adapter fails.
207pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
208    let connect_options = get_postgres_connect_options(None, None, None, None, None);
209    Ok(PostgresCacheDatabase::connect(
210        Some(connect_options.host),
211        Some(connect_options.port),
212        Some(connect_options.username),
213        Some(connect_options.password),
214        Some(connect_options.database),
215    )
216    .await?)
217}
218
219#[allow(dead_code)]
220#[allow(unused)]
221#[async_trait::async_trait]
222impl CacheDatabaseAdapter for PostgresCacheDatabase {
223    fn close(&mut self) -> anyhow::Result<()> {
224        let pool = self.pool.clone();
225        let (tx, rx) = std::sync::mpsc::channel();
226
227        log::debug!("Closing connection pool");
228
229        tokio::task::block_in_place(|| {
230            get_runtime().block_on(async {
231                pool.close().await;
232                if let Err(e) = tx.send(()) {
233                    log::error!("Error closing pool: {e:?}");
234                }
235            });
236        });
237
238        // Cancel message handling task
239        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
240            log::error!("Error sending close: {e:?}");
241        }
242
243        log_task_awaiting("cache-write");
244
245        tokio::task::block_in_place(|| {
246            if let Err(e) = get_runtime().block_on(&mut self.handle) {
247                log::error!("Error awaiting task 'cache-write': {e:?}");
248            }
249        });
250
251        log::debug!("Closed");
252
253        Ok(rx.recv()?)
254    }
255
256    fn flush(&mut self) -> anyhow::Result<()> {
257        let pool = self.pool.clone();
258        let (tx, rx) = std::sync::mpsc::channel();
259
260        tokio::task::block_in_place(|| {
261            get_runtime().block_on(async {
262                if let Err(e) = DatabaseQueries::truncate(&pool).await {
263                    log::error!("Error flushing pool: {e:?}");
264                }
265                if let Err(e) = tx.send(()) {
266                    log::error!("Error sending flush result: {e:?}");
267                }
268            });
269        });
270
271        Ok(rx.recv()?)
272    }
273
274    async fn load_all(&self) -> anyhow::Result<CacheMap> {
275        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
276            self.load_currencies(),
277            self.load_instruments(),
278            self.load_synthetics(),
279            self.load_accounts(),
280            self.load_orders(),
281            self.load_positions()
282        )
283        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
284
285        // For now, we don't load greeks and yield curves from the database
286        // This will be implemented in the future
287        let greeks = AHashMap::new();
288        let yield_curves = AHashMap::new();
289
290        Ok(CacheMap {
291            currencies,
292            instruments,
293            synthetics,
294            accounts,
295            orders,
296            positions,
297            greeks,
298            yield_curves,
299        })
300    }
301
302    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
303        let pool = self.pool.clone();
304        let (tx, rx) = std::sync::mpsc::channel();
305
306        tokio::spawn(async move {
307            let result = DatabaseQueries::load(&pool).await;
308            match result {
309                Ok(items) => {
310                    let mapping = items
311                        .into_iter()
312                        .map(|(k, v)| (k, Bytes::from(v)))
313                        .collect();
314                    if let Err(e) = tx.send(mapping) {
315                        log::error!("Failed to send general items: {e:?}");
316                    }
317                }
318                Err(e) => {
319                    log::error!("Failed to load general items: {e:?}");
320                    if let Err(e) = tx.send(AHashMap::new()) {
321                        log::error!("Failed to send empty general items: {e:?}");
322                    }
323                }
324            }
325        });
326        Ok(rx.recv()?)
327    }
328
329    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
330        let pool = self.pool.clone();
331        let (tx, rx) = std::sync::mpsc::channel();
332
333        tokio::spawn(async move {
334            let result = DatabaseQueries::load_currencies(&pool).await;
335            match result {
336                Ok(currencies) => {
337                    let mapping = currencies
338                        .into_iter()
339                        .map(|currency| (currency.code, currency))
340                        .collect();
341                    if let Err(e) = tx.send(mapping) {
342                        log::error!("Failed to send currencies: {e:?}");
343                    }
344                }
345                Err(e) => {
346                    log::error!("Failed to load currencies: {e:?}");
347                    if let Err(e) = tx.send(AHashMap::new()) {
348                        log::error!("Failed to send empty currencies: {e:?}");
349                    }
350                }
351            }
352        });
353        Ok(rx.recv()?)
354    }
355
356    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
357        let pool = self.pool.clone();
358        let (tx, rx) = std::sync::mpsc::channel();
359
360        tokio::spawn(async move {
361            let result = DatabaseQueries::load_instruments(&pool).await;
362            match result {
363                Ok(instruments) => {
364                    let mapping = instruments
365                        .into_iter()
366                        .map(|instrument| (instrument.id(), instrument))
367                        .collect();
368                    if let Err(e) = tx.send(mapping) {
369                        log::error!("Failed to send instruments: {e:?}");
370                    }
371                }
372                Err(e) => {
373                    log::error!("Failed to load instruments: {e:?}");
374                    if let Err(e) = tx.send(AHashMap::new()) {
375                        log::error!("Failed to send empty instruments: {e:?}");
376                    }
377                }
378            }
379        });
380        Ok(rx.recv()?)
381    }
382
383    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
384        todo!()
385    }
386
387    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
388        let pool = self.pool.clone();
389        let (tx, rx) = std::sync::mpsc::channel();
390
391        tokio::spawn(async move {
392            let result = DatabaseQueries::load_accounts(&pool).await;
393            match result {
394                Ok(accounts) => {
395                    let mapping = accounts
396                        .into_iter()
397                        .map(|account| (account.id(), account))
398                        .collect();
399                    if let Err(e) = tx.send(mapping) {
400                        log::error!("Failed to send accounts: {e:?}");
401                    }
402                }
403                Err(e) => {
404                    log::error!("Failed to load accounts: {e:?}");
405                    if let Err(e) = tx.send(AHashMap::new()) {
406                        log::error!("Failed to send empty accounts: {e:?}");
407                    }
408                }
409            }
410        });
411        Ok(rx.recv()?)
412    }
413
414    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
415        let pool = self.pool.clone();
416        let (tx, rx) = std::sync::mpsc::channel();
417
418        tokio::spawn(async move {
419            let result = DatabaseQueries::load_orders(&pool).await;
420            match result {
421                Ok(orders) => {
422                    let mapping = orders
423                        .into_iter()
424                        .map(|order| (order.client_order_id(), order))
425                        .collect();
426                    if let Err(e) = tx.send(mapping) {
427                        log::error!("Failed to send orders: {e:?}");
428                    }
429                }
430                Err(e) => {
431                    log::error!("Failed to load orders: {e:?}");
432                    if let Err(e) = tx.send(AHashMap::new()) {
433                        log::error!("Failed to send empty orders: {e:?}");
434                    }
435                }
436            }
437        });
438        Ok(rx.recv()?)
439    }
440
441    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
442        todo!()
443    }
444
445    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
446        todo!()
447    }
448
449    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
450        let pool = self.pool.clone();
451        let (tx, rx) = std::sync::mpsc::channel();
452
453        tokio::spawn(async move {
454            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
455            match result {
456                Ok(currency) => {
457                    if let Err(e) = tx.send(currency) {
458                        log::error!("Failed to send load_index_order_client result: {e:?}");
459                    }
460                }
461                Err(e) => {
462                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
463                    if let Err(e) = tx.send(AHashMap::new()) {
464                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
465                    }
466                }
467            }
468        });
469        Ok(rx.recv()?)
470    }
471
472    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
473        let pool = self.pool.clone();
474        let code = code.to_owned(); // Clone the code
475        let (tx, rx) = std::sync::mpsc::channel();
476
477        tokio::spawn(async move {
478            let result = DatabaseQueries::load_currency(&pool, &code).await;
479            match result {
480                Ok(currency) => {
481                    if let Err(e) = tx.send(currency) {
482                        log::error!("Failed to send currency {code}: {e:?}");
483                    }
484                }
485                Err(e) => {
486                    log::error!("Failed to load currency {code}: {e:?}");
487                    if let Err(e) = tx.send(None) {
488                        log::error!("Failed to send None for currency {code}: {e:?}");
489                    }
490                }
491            }
492        });
493        Ok(rx.recv()?)
494    }
495
496    async fn load_instrument(
497        &self,
498        instrument_id: &InstrumentId,
499    ) -> anyhow::Result<Option<InstrumentAny>> {
500        let pool = self.pool.clone();
501        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
502        let (tx, rx) = std::sync::mpsc::channel();
503
504        tokio::spawn(async move {
505            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
506            match result {
507                Ok(instrument) => {
508                    if let Err(e) = tx.send(instrument) {
509                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
510                    }
511                }
512                Err(e) => {
513                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
514                    if let Err(e) = tx.send(None) {
515                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
516                    }
517                }
518            }
519        });
520        Ok(rx.recv()?)
521    }
522
523    async fn load_synthetic(
524        &self,
525        instrument_id: &InstrumentId,
526    ) -> anyhow::Result<Option<SyntheticInstrument>> {
527        todo!()
528    }
529
530    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
531        let pool = self.pool.clone();
532        let account_id = account_id.to_owned();
533        let (tx, rx) = std::sync::mpsc::channel();
534
535        tokio::spawn(async move {
536            let result = DatabaseQueries::load_account(&pool, &account_id).await;
537            match result {
538                Ok(account) => {
539                    if let Err(e) = tx.send(account) {
540                        log::error!("Failed to send account {account_id}: {e:?}");
541                    }
542                }
543                Err(e) => {
544                    log::error!("Failed to load account {account_id}: {e:?}");
545                    if let Err(e) = tx.send(None) {
546                        log::error!("Failed to send None for account {account_id}: {e:?}");
547                    }
548                }
549            }
550        });
551        Ok(rx.recv()?)
552    }
553
554    async fn load_order(
555        &self,
556        client_order_id: &ClientOrderId,
557    ) -> anyhow::Result<Option<OrderAny>> {
558        let pool = self.pool.clone();
559        let client_order_id = client_order_id.to_owned();
560        let (tx, rx) = std::sync::mpsc::channel();
561
562        tokio::spawn(async move {
563            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
564            match result {
565                Ok(order) => {
566                    if let Err(e) = tx.send(order) {
567                        log::error!("Failed to send order {client_order_id}: {e:?}");
568                    }
569                }
570                Err(e) => {
571                    log::error!("Failed to load order {client_order_id}: {e:?}");
572                    let _ = tx.send(None);
573                }
574            }
575        });
576        Ok(rx.recv()?)
577    }
578
579    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
580        todo!()
581    }
582
583    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
584        todo!()
585    }
586
587    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
588        todo!()
589    }
590
591    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
592        todo!()
593    }
594
595    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
596        todo!()
597    }
598
599    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
600        anyhow::bail!(
601            "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
602        )
603    }
604
605    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
606        anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
607    }
608
609    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
610        anyhow::bail!(
611            "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
612        )
613    }
614
615    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
616        let query = DatabaseQuery::Add(key, value.into());
617        self.tx
618            .send(query)
619            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
620    }
621
622    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
623        let query = DatabaseQuery::AddCurrency(*currency);
624        self.tx.send(query).map_err(|e| {
625            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
626        })
627    }
628
629    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
630        let query = DatabaseQuery::AddInstrument(instrument.clone());
631        self.tx.send(query).map_err(|e| {
632            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
633        })
634    }
635
636    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
637        todo!()
638    }
639
640    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
641        let query = DatabaseQuery::AddAccount(account.clone(), false);
642        self.tx.send(query).map_err(|e| {
643            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
644        })
645    }
646
647    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
648        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
649        self.tx.send(query).map_err(|e| {
650            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
651        })
652    }
653
654    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
655        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
656        self.tx.send(query).map_err(|e| {
657            anyhow::anyhow!(
658                "Failed to send query add_order_snapshot to database message handler: {e}"
659            )
660        })
661    }
662
663    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
664        todo!()
665    }
666
667    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
668        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
669        self.tx.send(query).map_err(|e| {
670            anyhow::anyhow!(
671                "Failed to send query add_position_snapshot to database message handler: {e}"
672            )
673        })
674    }
675
676    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
677        todo!()
678    }
679
680    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
681        let query = DatabaseQuery::AddQuote(quote.to_owned());
682        self.tx.send(query).map_err(|e| {
683            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
684        })
685    }
686
687    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
688        let pool = self.pool.clone();
689        let instrument_id = instrument_id.to_owned();
690        let (tx, rx) = std::sync::mpsc::channel();
691
692        tokio::spawn(async move {
693            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
694            match result {
695                Ok(quotes) => {
696                    if let Err(e) = tx.send(quotes) {
697                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
698                    }
699                }
700                Err(e) => {
701                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
702                    if let Err(e) = tx.send(Vec::new()) {
703                        log::error!(
704                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
705                        );
706                    }
707                }
708            }
709        });
710        Ok(rx.recv()?)
711    }
712
713    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
714        let query = DatabaseQuery::AddTrade(trade.to_owned());
715        self.tx.send(query).map_err(|e| {
716            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
717        })
718    }
719
720    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
721        let pool = self.pool.clone();
722        let instrument_id = instrument_id.to_owned();
723        let (tx, rx) = std::sync::mpsc::channel();
724
725        tokio::spawn(async move {
726            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
727            match result {
728                Ok(trades) => {
729                    if let Err(e) = tx.send(trades) {
730                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
731                    }
732                }
733                Err(e) => {
734                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
735                    if let Err(e) = tx.send(Vec::new()) {
736                        log::error!(
737                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
738                        );
739                    }
740                }
741            }
742        });
743        Ok(rx.recv()?)
744    }
745
746    fn add_funding_rate(&self, _funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
747        anyhow::bail!("add_funding_rate not implemented for PostgreSQL cache adapter")
748    }
749
750    fn load_funding_rates(
751        &self,
752        _instrument_id: &InstrumentId,
753    ) -> anyhow::Result<Vec<FundingRateUpdate>> {
754        anyhow::bail!("load_funding_rates not implemented for PostgreSQL cache adapter")
755    }
756
757    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
758        let query = DatabaseQuery::AddBar(bar.to_owned());
759        self.tx.send(query).map_err(|e| {
760            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
761        })
762    }
763
764    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
765        let pool = self.pool.clone();
766        let instrument_id = instrument_id.to_owned();
767        let (tx, rx) = std::sync::mpsc::channel();
768
769        tokio::spawn(async move {
770            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
771            match result {
772                Ok(bars) => {
773                    if let Err(e) = tx.send(bars) {
774                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
775                    }
776                }
777                Err(e) => {
778                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
779                    if let Err(e) = tx.send(Vec::new()) {
780                        log::error!(
781                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
782                        );
783                    }
784                }
785            }
786        });
787        Ok(rx.recv()?)
788    }
789
790    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
791        let query = DatabaseQuery::AddSignal(signal.to_owned());
792        self.tx.send(query).map_err(|e| {
793            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
794        })
795    }
796
797    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
798        let pool = self.pool.clone();
799        let name = name.to_owned();
800        let (tx, rx) = std::sync::mpsc::channel();
801
802        tokio::spawn(async move {
803            let result = DatabaseQueries::load_signals(&pool, &name).await;
804            match result {
805                Ok(signals) => {
806                    if let Err(e) = tx.send(signals) {
807                        log::error!("Failed to send signals for '{name}': {e:?}");
808                    }
809                }
810                Err(e) => {
811                    log::error!("Failed to load signals for '{name}': {e:?}");
812                    if let Err(e) = tx.send(Vec::new()) {
813                        log::error!("Failed to send empty signals for '{name}': {e:?}");
814                    }
815                }
816            }
817        });
818        Ok(rx.recv()?)
819    }
820
821    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
822        let query = DatabaseQuery::AddCustom(data.to_owned());
823        self.tx.send(query).map_err(|e| {
824            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
825        })
826    }
827
828    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
829        let pool = self.pool.clone();
830        let data_type = data_type.to_owned();
831        let (tx, rx) = std::sync::mpsc::channel();
832
833        tokio::spawn(async move {
834            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
835            match result {
836                Ok(signals) => {
837                    if let Err(e) = tx.send(signals) {
838                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
839                    }
840                }
841                Err(e) => {
842                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
843                    if let Err(e) = tx.send(Vec::new()) {
844                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
845                    }
846                }
847            }
848        });
849        Ok(rx.recv()?)
850    }
851
852    fn load_order_snapshot(
853        &self,
854        client_order_id: &ClientOrderId,
855    ) -> anyhow::Result<Option<OrderSnapshot>> {
856        let pool = self.pool.clone();
857        let client_order_id = client_order_id.to_owned();
858        let (tx, rx) = std::sync::mpsc::channel();
859
860        tokio::spawn(async move {
861            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
862            match result {
863                Ok(snapshot) => {
864                    if let Err(e) = tx.send(snapshot) {
865                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
866                    }
867                }
868                Err(e) => {
869                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
870                    if let Err(e) = tx.send(None) {
871                        log::error!(
872                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
873                        );
874                    }
875                }
876            }
877        });
878        Ok(rx.recv()?)
879    }
880
881    fn load_position_snapshot(
882        &self,
883        position_id: &PositionId,
884    ) -> anyhow::Result<Option<PositionSnapshot>> {
885        let pool = self.pool.clone();
886        let position_id = position_id.to_owned();
887        let (tx, rx) = std::sync::mpsc::channel();
888
889        tokio::spawn(async move {
890            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
891            match result {
892                Ok(snapshot) => {
893                    if let Err(e) = tx.send(snapshot) {
894                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
895                    }
896                }
897                Err(e) => {
898                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
899                    if let Err(e) = tx.send(None) {
900                        log::error!(
901                            "Failed to send None for position snapshot {position_id}: {e:?}"
902                        );
903                    }
904                }
905            }
906        });
907        Ok(rx.recv()?)
908    }
909
910    fn index_venue_order_id(
911        &self,
912        client_order_id: ClientOrderId,
913        venue_order_id: VenueOrderId,
914    ) -> anyhow::Result<()> {
915        todo!()
916    }
917
918    fn index_order_position(
919        &self,
920        client_order_id: ClientOrderId,
921        position_id: PositionId,
922    ) -> anyhow::Result<()> {
923        todo!()
924    }
925
926    fn update_actor(&self) -> anyhow::Result<()> {
927        todo!()
928    }
929
930    fn update_strategy(&self) -> anyhow::Result<()> {
931        todo!()
932    }
933
934    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
935        let query = DatabaseQuery::AddAccount(account.clone(), true);
936        self.tx.send(query).map_err(|e| {
937            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
938        })
939    }
940
941    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
942        let query = DatabaseQuery::UpdateOrder(event.clone());
943        self.tx.send(query).map_err(|e| {
944            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
945        })
946    }
947
948    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
949        todo!()
950    }
951
952    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
953        todo!()
954    }
955
956    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
957        todo!()
958    }
959
960    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
961        todo!()
962    }
963}
964
965async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
966    for cmd in buffer.drain(..) {
967        let result: anyhow::Result<()> = match cmd {
968            DatabaseQuery::Close => Ok(()),
969            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
970            DatabaseQuery::AddCurrency(currency) => {
971                DatabaseQueries::add_currency(pool, currency).await
972            }
973            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
974                InstrumentAny::Betting(instrument) => {
975                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
976                }
977                InstrumentAny::BinaryOption(instrument) => {
978                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
979                        .await
980                }
981                InstrumentAny::CryptoFuture(instrument) => {
982                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
983                        .await
984                }
985                InstrumentAny::CryptoOption(instrument) => {
986                    DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
987                        .await
988                }
989                InstrumentAny::CryptoPerpetual(instrument) => {
990                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
991                        .await
992                }
993                InstrumentAny::CurrencyPair(instrument) => {
994                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
995                        .await
996                }
997                InstrumentAny::Equity(equity) => {
998                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
999                }
1000                InstrumentAny::FuturesContract(instrument) => {
1001                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
1002                        .await
1003                }
1004                InstrumentAny::FuturesSpread(instrument) => {
1005                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
1006                        .await
1007                }
1008                InstrumentAny::OptionContract(instrument) => {
1009                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
1010                        .await
1011                }
1012                InstrumentAny::OptionSpread(instrument) => {
1013                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
1014                        .await
1015                }
1016            },
1017            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
1018                OrderAny::Limit(order) => {
1019                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
1020                        .await
1021                }
1022                OrderAny::LimitIfTouched(order) => {
1023                    DatabaseQueries::add_order(
1024                        pool,
1025                        "LIMIT_IF_TOUCHED",
1026                        updated,
1027                        Box::new(order),
1028                        client_id,
1029                    )
1030                    .await
1031                }
1032                OrderAny::Market(order) => {
1033                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
1034                        .await
1035                }
1036                OrderAny::MarketIfTouched(order) => {
1037                    DatabaseQueries::add_order(
1038                        pool,
1039                        "MARKET_IF_TOUCHED",
1040                        updated,
1041                        Box::new(order),
1042                        client_id,
1043                    )
1044                    .await
1045                }
1046                OrderAny::MarketToLimit(order) => {
1047                    DatabaseQueries::add_order(
1048                        pool,
1049                        "MARKET_TO_LIMIT",
1050                        updated,
1051                        Box::new(order),
1052                        client_id,
1053                    )
1054                    .await
1055                }
1056                OrderAny::StopLimit(order) => {
1057                    DatabaseQueries::add_order(
1058                        pool,
1059                        "STOP_LIMIT",
1060                        updated,
1061                        Box::new(order),
1062                        client_id,
1063                    )
1064                    .await
1065                }
1066                OrderAny::StopMarket(order) => {
1067                    DatabaseQueries::add_order(
1068                        pool,
1069                        "STOP_MARKET",
1070                        updated,
1071                        Box::new(order),
1072                        client_id,
1073                    )
1074                    .await
1075                }
1076                OrderAny::TrailingStopLimit(order) => {
1077                    DatabaseQueries::add_order(
1078                        pool,
1079                        "TRAILING_STOP_LIMIT",
1080                        updated,
1081                        Box::new(order),
1082                        client_id,
1083                    )
1084                    .await
1085                }
1086                OrderAny::TrailingStopMarket(order) => {
1087                    DatabaseQueries::add_order(
1088                        pool,
1089                        "TRAILING_STOP_MARKET",
1090                        updated,
1091                        Box::new(order),
1092                        client_id,
1093                    )
1094                    .await
1095                }
1096            },
1097            DatabaseQuery::AddOrderSnapshot(snapshot) => {
1098                DatabaseQueries::add_order_snapshot(pool, snapshot).await
1099            }
1100            DatabaseQuery::AddPositionSnapshot(snapshot) => {
1101                DatabaseQueries::add_position_snapshot(pool, snapshot).await
1102            }
1103            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1104                AccountAny::Cash(account) => {
1105                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1106                }
1107                AccountAny::Margin(account) => {
1108                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1109                }
1110            },
1111            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1112            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1113            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
1114            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1115            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1116            DatabaseQuery::UpdateOrder(event) => {
1117                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1118            }
1119        };
1120
1121        if let Err(e) = result {
1122            log::error!("Error on query: {e:?}");
1123        }
1124    }
1125}