nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23    cache::database::{CacheDatabaseAdapter, CacheMap},
24    custom::CustomData,
25    runtime::get_runtime,
26    signal::Signal,
27};
28use nautilus_core::UnixNanos;
29use nautilus_model::{
30    accounts::AccountAny,
31    data::{Bar, DataType, QuoteTick, TradeTick},
32    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
33    identifiers::{
34        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
35        VenueOrderId,
36    },
37    instruments::{InstrumentAny, SyntheticInstrument},
38    orderbook::OrderBook,
39    orders::OrderAny,
40    position::Position,
41    types::Currency,
42};
43use sqlx::{PgPool, postgres::PgConnectOptions};
44use tokio::try_join;
45use ustr::Ustr;
46
47use crate::sql::{
48    pg::{connect_pg, get_postgres_connect_options},
49    queries::DatabaseQueries,
50};
51
52#[derive(Debug)]
53#[cfg_attr(
54    feature = "python",
55    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
56)]
57pub struct PostgresCacheDatabase {
58    pub pool: PgPool,
59    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
60    handle: tokio::task::JoinHandle<()>,
61}
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug, Clone)]
65pub enum DatabaseQuery {
66    Close,
67    Add(String, Vec<u8>),
68    AddCurrency(Currency),
69    AddInstrument(InstrumentAny),
70    AddOrder(OrderAny, Option<ClientId>, bool),
71    AddOrderSnapshot(OrderSnapshot),
72    AddPositionSnapshot(PositionSnapshot),
73    AddAccount(AccountAny, bool),
74    AddSignal(Signal),
75    AddCustom(CustomData),
76    AddQuote(QuoteTick),
77    AddTrade(TradeTick),
78    AddBar(Bar),
79    UpdateOrder(OrderEventAny),
80}
81
82impl PostgresCacheDatabase {
83    pub async fn connect(
84        host: Option<String>,
85        port: Option<u16>,
86        username: Option<String>,
87        password: Option<String>,
88        database: Option<String>,
89    ) -> Result<Self, sqlx::Error> {
90        let pg_connect_options =
91            get_postgres_connect_options(host, port, username, password, database);
92        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
93        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
94
95        // Spawn a task to handle messages
96        let handle = tokio::spawn(async move {
97            PostgresCacheDatabase::process_commands(rx, pg_connect_options.clone().into()).await;
98        });
99        Ok(PostgresCacheDatabase { pool, tx, handle })
100    }
101
102    async fn process_commands(
103        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
104        pg_connect_options: PgConnectOptions,
105    ) {
106        tracing::debug!("Starting cache processing");
107
108        let pool = connect_pg(pg_connect_options).await.unwrap();
109
110        // Buffering
111        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
112        let mut last_drain = Instant::now();
113
114        // TODO: Add `buffer_interval_ms` to config, setting this above 0 currently fails tests
115        let buffer_interval = Duration::from_millis(0);
116
117        // Continue to receive and handle messages until channel is hung up
118        loop {
119            if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
120                drain_buffer(&pool, &mut buffer).await;
121                last_drain = Instant::now();
122            } else {
123                match rx.recv().await {
124                    Some(msg) => match msg {
125                        DatabaseQuery::Close => break,
126                        _ => buffer.push_back(msg),
127                    },
128                    None => break, // Channel hung up
129                }
130            }
131        }
132
133        // Drain any remaining message
134        if !buffer.is_empty() {
135            drain_buffer(&pool, &mut buffer).await;
136        }
137
138        tracing::debug!("Stopped cache processing");
139    }
140}
141
142pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
143    let connect_options = get_postgres_connect_options(None, None, None, None, None);
144    Ok(PostgresCacheDatabase::connect(
145        Some(connect_options.host),
146        Some(connect_options.port),
147        Some(connect_options.username),
148        Some(connect_options.password),
149        Some(connect_options.database),
150    )
151    .await?)
152}
153
154#[allow(dead_code)]
155#[allow(unused)]
156#[async_trait::async_trait]
157impl CacheDatabaseAdapter for PostgresCacheDatabase {
158    fn close(&mut self) -> anyhow::Result<()> {
159        let pool = self.pool.clone();
160        let (tx, rx) = std::sync::mpsc::channel();
161
162        log::debug!("Closing connection pool");
163        tokio::task::block_in_place(|| {
164            get_runtime().block_on(async {
165                pool.close().await;
166                if let Err(e) = tx.send(()) {
167                    log::error!("Error closing pool: {e:?}");
168                }
169            });
170        });
171
172        // Cancel message handling task
173        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
174            log::error!("Error sending close: {e:?}");
175        }
176
177        log::debug!("Awaiting task 'cache-write'"); // Naming tasks will soon be stablized
178        tokio::task::block_in_place(|| {
179            if let Err(e) = get_runtime().block_on(&mut self.handle) {
180                log::error!("Error awaiting task 'cache-write': {e:?}");
181            }
182        });
183
184        log::debug!("Closed");
185
186        Ok(rx.recv()?)
187    }
188
189    fn flush(&mut self) -> anyhow::Result<()> {
190        let pool = self.pool.clone();
191        let (tx, rx) = std::sync::mpsc::channel();
192
193        tokio::task::block_in_place(|| {
194            get_runtime().block_on(async {
195                if let Err(e) = DatabaseQueries::truncate(&pool).await {
196                    log::error!("Error flushing pool: {e:?}");
197                }
198                if let Err(e) = tx.send(()) {
199                    log::error!("Error sending flush result: {e:?}");
200                }
201            });
202        });
203
204        Ok(rx.recv()?)
205    }
206
207    async fn load_all(&self) -> anyhow::Result<CacheMap> {
208        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
209            self.load_currencies(),
210            self.load_instruments(),
211            self.load_synthetics(),
212            self.load_accounts(),
213            self.load_orders(),
214            self.load_positions()
215        )
216        .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
217
218        Ok(CacheMap {
219            currencies,
220            instruments,
221            synthetics,
222            accounts,
223            orders,
224            positions,
225        })
226    }
227
228    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
229        let pool = self.pool.clone();
230        let (tx, rx) = std::sync::mpsc::channel();
231        tokio::spawn(async move {
232            let result = DatabaseQueries::load(&pool).await;
233            match result {
234                Ok(items) => {
235                    let mapping = items
236                        .into_iter()
237                        .map(|(k, v)| (k, Bytes::from(v)))
238                        .collect();
239                    if let Err(e) = tx.send(mapping) {
240                        log::error!("Failed to send general items: {e:?}");
241                    }
242                }
243                Err(e) => {
244                    log::error!("Failed to load general items: {e:?}");
245                    if let Err(e) = tx.send(HashMap::new()) {
246                        log::error!("Failed to send empty general items: {e:?}");
247                    }
248                }
249            }
250        });
251        Ok(rx.recv()?)
252    }
253
254    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
255        let pool = self.pool.clone();
256        let (tx, rx) = std::sync::mpsc::channel();
257        tokio::spawn(async move {
258            let result = DatabaseQueries::load_currencies(&pool).await;
259            match result {
260                Ok(currencies) => {
261                    let mapping = currencies
262                        .into_iter()
263                        .map(|currency| (currency.code, currency))
264                        .collect();
265                    if let Err(e) = tx.send(mapping) {
266                        log::error!("Failed to send currencies: {e:?}");
267                    }
268                }
269                Err(e) => {
270                    log::error!("Failed to load currencies: {e:?}");
271                    if let Err(e) = tx.send(HashMap::new()) {
272                        log::error!("Failed to send empty currencies: {e:?}");
273                    }
274                }
275            }
276        });
277        Ok(rx.recv()?)
278    }
279
280    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
281        let pool = self.pool.clone();
282        let (tx, rx) = std::sync::mpsc::channel();
283        tokio::spawn(async move {
284            let result = DatabaseQueries::load_instruments(&pool).await;
285            match result {
286                Ok(instruments) => {
287                    let mapping = instruments
288                        .into_iter()
289                        .map(|instrument| (instrument.id(), instrument))
290                        .collect();
291                    if let Err(e) = tx.send(mapping) {
292                        log::error!("Failed to send instruments: {e:?}");
293                    }
294                }
295                Err(e) => {
296                    log::error!("Failed to load instruments: {e:?}");
297                    if let Err(e) = tx.send(HashMap::new()) {
298                        log::error!("Failed to send empty instruments: {e:?}");
299                    }
300                }
301            }
302        });
303        Ok(rx.recv()?)
304    }
305
306    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
307        todo!()
308    }
309
310    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
311        let pool = self.pool.clone();
312        let (tx, rx) = std::sync::mpsc::channel();
313        tokio::spawn(async move {
314            let result = DatabaseQueries::load_accounts(&pool).await;
315            match result {
316                Ok(accounts) => {
317                    let mapping = accounts
318                        .into_iter()
319                        .map(|account| (account.id(), account))
320                        .collect();
321                    if let Err(e) = tx.send(mapping) {
322                        log::error!("Failed to send accounts: {e:?}");
323                    }
324                }
325                Err(e) => {
326                    log::error!("Failed to load accounts: {e:?}");
327                    if let Err(e) = tx.send(HashMap::new()) {
328                        log::error!("Failed to send empty accounts: {e:?}");
329                    }
330                }
331            }
332        });
333        Ok(rx.recv()?)
334    }
335
336    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
337        let pool = self.pool.clone();
338        let (tx, rx) = std::sync::mpsc::channel();
339        tokio::spawn(async move {
340            let result = DatabaseQueries::load_orders(&pool).await;
341            match result {
342                Ok(orders) => {
343                    let mapping = orders
344                        .into_iter()
345                        .map(|order| (order.client_order_id(), order))
346                        .collect();
347                    if let Err(e) = tx.send(mapping) {
348                        log::error!("Failed to send orders: {e:?}");
349                    }
350                }
351                Err(e) => {
352                    log::error!("Failed to load orders: {e:?}");
353                    if let Err(e) = tx.send(HashMap::new()) {
354                        log::error!("Failed to send empty orders: {e:?}");
355                    }
356                }
357            }
358        });
359        Ok(rx.recv()?)
360    }
361
362    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
363        todo!()
364    }
365
366    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
367        todo!()
368    }
369
370    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
371        let pool = self.pool.clone();
372        let (tx, rx) = std::sync::mpsc::channel();
373        tokio::spawn(async move {
374            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
375            match result {
376                Ok(currency) => {
377                    if let Err(e) = tx.send(currency) {
378                        log::error!("Failed to send load_index_order_client result: {e:?}");
379                    }
380                }
381                Err(e) => {
382                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
383                    if let Err(e) = tx.send(HashMap::new()) {
384                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
385                    }
386                }
387            }
388        });
389        Ok(rx.recv()?)
390    }
391
392    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
393        let pool = self.pool.clone();
394        let code = code.to_owned(); // Clone the code
395        let (tx, rx) = std::sync::mpsc::channel();
396        tokio::spawn(async move {
397            let result = DatabaseQueries::load_currency(&pool, &code).await;
398            match result {
399                Ok(currency) => {
400                    if let Err(e) = tx.send(currency) {
401                        log::error!("Failed to send currency {code}: {e:?}");
402                    }
403                }
404                Err(e) => {
405                    log::error!("Failed to load currency {code}: {e:?}");
406                    if let Err(e) = tx.send(None) {
407                        log::error!("Failed to send None for currency {code}: {e:?}");
408                    }
409                }
410            }
411        });
412        Ok(rx.recv()?)
413    }
414
415    async fn load_instrument(
416        &self,
417        instrument_id: &InstrumentId,
418    ) -> anyhow::Result<Option<InstrumentAny>> {
419        let pool = self.pool.clone();
420        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
421        let (tx, rx) = std::sync::mpsc::channel();
422        tokio::spawn(async move {
423            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
424            match result {
425                Ok(instrument) => {
426                    if let Err(e) = tx.send(instrument) {
427                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
428                    }
429                }
430                Err(e) => {
431                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
432                    if let Err(e) = tx.send(None) {
433                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
434                    }
435                }
436            }
437        });
438        Ok(rx.recv()?)
439    }
440
441    async fn load_synthetic(
442        &self,
443        instrument_id: &InstrumentId,
444    ) -> anyhow::Result<Option<SyntheticInstrument>> {
445        todo!()
446    }
447
448    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
449        let pool = self.pool.clone();
450        let account_id = account_id.to_owned();
451        let (tx, rx) = std::sync::mpsc::channel();
452        tokio::spawn(async move {
453            let result = DatabaseQueries::load_account(&pool, &account_id).await;
454            match result {
455                Ok(account) => {
456                    if let Err(e) = tx.send(account) {
457                        log::error!("Failed to send account {account_id}: {e:?}");
458                    }
459                }
460                Err(e) => {
461                    log::error!("Failed to load account {account_id}: {e:?}");
462                    if let Err(e) = tx.send(None) {
463                        log::error!("Failed to send None for account {account_id}: {e:?}");
464                    }
465                }
466            }
467        });
468        Ok(rx.recv()?)
469    }
470
471    async fn load_order(
472        &self,
473        client_order_id: &ClientOrderId,
474    ) -> anyhow::Result<Option<OrderAny>> {
475        let pool = self.pool.clone();
476        let client_order_id = client_order_id.to_owned();
477        let (tx, rx) = std::sync::mpsc::channel();
478        tokio::spawn(async move {
479            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
480            match result {
481                Ok(order) => {
482                    if let Err(e) = tx.send(order) {
483                        log::error!("Failed to send order {client_order_id}: {e:?}");
484                    }
485                }
486                Err(e) => {
487                    log::error!("Failed to load order {client_order_id}: {e:?}");
488                    let _ = tx.send(None);
489                }
490            }
491        });
492        Ok(rx.recv()?)
493    }
494
495    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
496        todo!()
497    }
498
499    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
500        todo!()
501    }
502
503    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
504        todo!()
505    }
506
507    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
508        todo!()
509    }
510
511    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
512        todo!()
513    }
514
515    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
516        let query = DatabaseQuery::Add(key, value.into());
517        self.tx
518            .send(query)
519            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
520    }
521
522    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
523        let query = DatabaseQuery::AddCurrency(*currency);
524        self.tx.send(query).map_err(|e| {
525            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
526        })
527    }
528
529    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
530        let query = DatabaseQuery::AddInstrument(instrument.clone());
531        self.tx.send(query).map_err(|e| {
532            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
533        })
534    }
535
536    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
537        todo!()
538    }
539
540    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
541        let query = DatabaseQuery::AddAccount(account.clone(), false);
542        self.tx.send(query).map_err(|e| {
543            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
544        })
545    }
546
547    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
548        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
549        self.tx.send(query).map_err(|e| {
550            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
551        })
552    }
553
554    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
555        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
556        self.tx.send(query).map_err(|e| {
557            anyhow::anyhow!(
558                "Failed to send query add_order_snapshot to database message handler: {e}"
559            )
560        })
561    }
562
563    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
564        todo!()
565    }
566
567    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
568        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
569        self.tx.send(query).map_err(|e| {
570            anyhow::anyhow!(
571                "Failed to send query add_position_snapshot to database message handler: {e}"
572            )
573        })
574    }
575
576    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
577        todo!()
578    }
579
580    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
581        let query = DatabaseQuery::AddQuote(quote.to_owned());
582        self.tx.send(query).map_err(|e| {
583            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
584        })
585    }
586
587    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
588        let pool = self.pool.clone();
589        let instrument_id = instrument_id.to_owned();
590        let (tx, rx) = std::sync::mpsc::channel();
591        tokio::spawn(async move {
592            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
593            match result {
594                Ok(quotes) => {
595                    if let Err(e) = tx.send(quotes) {
596                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
597                    }
598                }
599                Err(e) => {
600                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
601                    if let Err(e) = tx.send(Vec::new()) {
602                        log::error!(
603                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
604                        );
605                    }
606                }
607            }
608        });
609        Ok(rx.recv()?)
610    }
611
612    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
613        let query = DatabaseQuery::AddTrade(trade.to_owned());
614        self.tx.send(query).map_err(|e| {
615            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
616        })
617    }
618
619    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
620        let pool = self.pool.clone();
621        let instrument_id = instrument_id.to_owned();
622        let (tx, rx) = std::sync::mpsc::channel();
623        tokio::spawn(async move {
624            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
625            match result {
626                Ok(trades) => {
627                    if let Err(e) = tx.send(trades) {
628                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
629                    }
630                }
631                Err(e) => {
632                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
633                    if let Err(e) = tx.send(Vec::new()) {
634                        log::error!(
635                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
636                        );
637                    }
638                }
639            }
640        });
641        Ok(rx.recv()?)
642    }
643
644    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
645        let query = DatabaseQuery::AddBar(bar.to_owned());
646        self.tx.send(query).map_err(|e| {
647            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
648        })
649    }
650
651    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
652        let pool = self.pool.clone();
653        let instrument_id = instrument_id.to_owned();
654        let (tx, rx) = std::sync::mpsc::channel();
655        tokio::spawn(async move {
656            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
657            match result {
658                Ok(bars) => {
659                    if let Err(e) = tx.send(bars) {
660                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
661                    }
662                }
663                Err(e) => {
664                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
665                    if let Err(e) = tx.send(Vec::new()) {
666                        log::error!(
667                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
668                        );
669                    }
670                }
671            }
672        });
673        Ok(rx.recv()?)
674    }
675
676    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
677        let query = DatabaseQuery::AddSignal(signal.to_owned());
678        self.tx.send(query).map_err(|e| {
679            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
680        })
681    }
682
683    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
684        let pool = self.pool.clone();
685        let name = name.to_owned();
686        let (tx, rx) = std::sync::mpsc::channel();
687        tokio::spawn(async move {
688            let result = DatabaseQueries::load_signals(&pool, &name).await;
689            match result {
690                Ok(signals) => {
691                    if let Err(e) = tx.send(signals) {
692                        log::error!("Failed to send signals for '{name}': {e:?}");
693                    }
694                }
695                Err(e) => {
696                    log::error!("Failed to load signals for '{name}': {e:?}");
697                    if let Err(e) = tx.send(Vec::new()) {
698                        log::error!("Failed to send empty signals for '{name}': {e:?}");
699                    }
700                }
701            }
702        });
703        Ok(rx.recv()?)
704    }
705
706    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
707        let query = DatabaseQuery::AddCustom(data.to_owned());
708        self.tx.send(query).map_err(|e| {
709            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
710        })
711    }
712
713    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
714        let pool = self.pool.clone();
715        let data_type = data_type.to_owned();
716        let (tx, rx) = std::sync::mpsc::channel();
717        tokio::spawn(async move {
718            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
719            match result {
720                Ok(signals) => {
721                    if let Err(e) = tx.send(signals) {
722                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
723                    }
724                }
725                Err(e) => {
726                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
727                    if let Err(e) = tx.send(Vec::new()) {
728                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
729                    }
730                }
731            }
732        });
733        Ok(rx.recv()?)
734    }
735
736    fn load_order_snapshot(
737        &self,
738        client_order_id: &ClientOrderId,
739    ) -> anyhow::Result<Option<OrderSnapshot>> {
740        let pool = self.pool.clone();
741        let client_order_id = client_order_id.to_owned();
742        let (tx, rx) = std::sync::mpsc::channel();
743        tokio::spawn(async move {
744            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
745            match result {
746                Ok(snapshot) => {
747                    if let Err(e) = tx.send(snapshot) {
748                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
749                    }
750                }
751                Err(e) => {
752                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
753                    if let Err(e) = tx.send(None) {
754                        log::error!(
755                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
756                        );
757                    }
758                }
759            }
760        });
761        Ok(rx.recv()?)
762    }
763
764    fn load_position_snapshot(
765        &self,
766        position_id: &PositionId,
767    ) -> anyhow::Result<Option<PositionSnapshot>> {
768        let pool = self.pool.clone();
769        let position_id = position_id.to_owned();
770        let (tx, rx) = std::sync::mpsc::channel();
771        tokio::spawn(async move {
772            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
773            match result {
774                Ok(snapshot) => {
775                    if let Err(e) = tx.send(snapshot) {
776                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
777                    }
778                }
779                Err(e) => {
780                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
781                    if let Err(e) = tx.send(None) {
782                        log::error!(
783                            "Failed to send None for position snapshot {position_id}: {e:?}"
784                        );
785                    }
786                }
787            }
788        });
789        Ok(rx.recv()?)
790    }
791
792    fn index_venue_order_id(
793        &self,
794        client_order_id: ClientOrderId,
795        venue_order_id: VenueOrderId,
796    ) -> anyhow::Result<()> {
797        todo!()
798    }
799
800    fn index_order_position(
801        &self,
802        client_order_id: ClientOrderId,
803        position_id: PositionId,
804    ) -> anyhow::Result<()> {
805        todo!()
806    }
807
808    fn update_actor(&self) -> anyhow::Result<()> {
809        todo!()
810    }
811
812    fn update_strategy(&self) -> anyhow::Result<()> {
813        todo!()
814    }
815
816    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
817        let query = DatabaseQuery::AddAccount(account.clone(), true);
818        self.tx.send(query).map_err(|e| {
819            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
820        })
821    }
822
823    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
824        let query = DatabaseQuery::UpdateOrder(event.clone());
825        self.tx.send(query).map_err(|e| {
826            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
827        })
828    }
829
830    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
831        todo!()
832    }
833
834    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
835        todo!()
836    }
837
838    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
839        todo!()
840    }
841
842    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
843        todo!()
844    }
845}
846
847async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
848    for cmd in buffer.drain(..) {
849        let result: anyhow::Result<()> = match cmd {
850            DatabaseQuery::Close => Ok(()),
851            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
852            DatabaseQuery::AddCurrency(currency) => {
853                DatabaseQueries::add_currency(pool, currency).await
854            }
855            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
856                InstrumentAny::Betting(instrument) => {
857                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
858                }
859                InstrumentAny::BinaryOption(instrument) => {
860                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
861                        .await
862                }
863                InstrumentAny::CryptoFuture(instrument) => {
864                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
865                        .await
866                }
867                InstrumentAny::CryptoPerpetual(instrument) => {
868                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
869                        .await
870                }
871                InstrumentAny::CurrencyPair(instrument) => {
872                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
873                        .await
874                }
875                InstrumentAny::Equity(equity) => {
876                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
877                }
878                InstrumentAny::FuturesContract(instrument) => {
879                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
880                        .await
881                }
882                InstrumentAny::FuturesSpread(instrument) => {
883                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
884                        .await
885                }
886                InstrumentAny::OptionContract(instrument) => {
887                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
888                        .await
889                }
890                InstrumentAny::OptionSpread(instrument) => {
891                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
892                        .await
893                }
894            },
895            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
896                OrderAny::Limit(order) => {
897                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
898                        .await
899                }
900                OrderAny::LimitIfTouched(order) => {
901                    DatabaseQueries::add_order(
902                        pool,
903                        "LIMIT_IF_TOUCHED",
904                        updated,
905                        Box::new(order),
906                        client_id,
907                    )
908                    .await
909                }
910                OrderAny::Market(order) => {
911                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
912                        .await
913                }
914                OrderAny::MarketIfTouched(order) => {
915                    DatabaseQueries::add_order(
916                        pool,
917                        "MARKET_IF_TOUCHED",
918                        updated,
919                        Box::new(order),
920                        client_id,
921                    )
922                    .await
923                }
924                OrderAny::MarketToLimit(order) => {
925                    DatabaseQueries::add_order(
926                        pool,
927                        "MARKET_TO_LIMIT",
928                        updated,
929                        Box::new(order),
930                        client_id,
931                    )
932                    .await
933                }
934                OrderAny::StopLimit(order) => {
935                    DatabaseQueries::add_order(
936                        pool,
937                        "STOP_LIMIT",
938                        updated,
939                        Box::new(order),
940                        client_id,
941                    )
942                    .await
943                }
944                OrderAny::StopMarket(order) => {
945                    DatabaseQueries::add_order(
946                        pool,
947                        "STOP_MARKET",
948                        updated,
949                        Box::new(order),
950                        client_id,
951                    )
952                    .await
953                }
954                OrderAny::TrailingStopLimit(order) => {
955                    DatabaseQueries::add_order(
956                        pool,
957                        "TRAILING_STOP_LIMIT",
958                        updated,
959                        Box::new(order),
960                        client_id,
961                    )
962                    .await
963                }
964                OrderAny::TrailingStopMarket(order) => {
965                    DatabaseQueries::add_order(
966                        pool,
967                        "TRAILING_STOP_MARKET",
968                        updated,
969                        Box::new(order),
970                        client_id,
971                    )
972                    .await
973                }
974            },
975            DatabaseQuery::AddOrderSnapshot(snapshot) => {
976                DatabaseQueries::add_order_snapshot(pool, snapshot).await
977            }
978            DatabaseQuery::AddPositionSnapshot(snapshot) => {
979                DatabaseQueries::add_position_snapshot(pool, snapshot).await
980            }
981            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
982                AccountAny::Cash(account) => {
983                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
984                }
985                AccountAny::Margin(account) => {
986                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
987                }
988            },
989            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
990            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
991            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
992            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
993            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
994            DatabaseQuery::UpdateOrder(event) => {
995                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
996            }
997        };
998
999        if let Err(e) = result {
1000            tracing::error!("Error on query: {e:?}");
1001        }
1002    }
1003}