nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23    cache::database::{CacheDatabaseAdapter, CacheMap},
24    custom::CustomData,
25    runtime::get_runtime,
26    signal::Signal,
27};
28use nautilus_core::UnixNanos;
29use nautilus_model::{
30    accounts::AccountAny,
31    data::{Bar, DataType, QuoteTick, TradeTick},
32    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
33    identifiers::{
34        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
35        VenueOrderId,
36    },
37    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
38    orderbook::OrderBook,
39    orders::{Order, OrderAny},
40    position::Position,
41    types::Currency,
42};
43use sqlx::{PgPool, postgres::PgConnectOptions};
44use tokio::try_join;
45use ustr::Ustr;
46
47use crate::sql::{
48    pg::{connect_pg, get_postgres_connect_options},
49    queries::DatabaseQueries,
50};
51
52#[derive(Debug)]
53#[cfg_attr(
54    feature = "python",
55    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
56)]
57pub struct PostgresCacheDatabase {
58    pub pool: PgPool,
59    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
60    handle: tokio::task::JoinHandle<()>,
61}
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug, Clone)]
65pub enum DatabaseQuery {
66    Close,
67    Add(String, Vec<u8>),
68    AddCurrency(Currency),
69    AddInstrument(InstrumentAny),
70    AddOrder(OrderAny, Option<ClientId>, bool),
71    AddOrderSnapshot(OrderSnapshot),
72    AddPositionSnapshot(PositionSnapshot),
73    AddAccount(AccountAny, bool),
74    AddSignal(Signal),
75    AddCustom(CustomData),
76    AddQuote(QuoteTick),
77    AddTrade(TradeTick),
78    AddBar(Bar),
79    UpdateOrder(OrderEventAny),
80}
81
82impl PostgresCacheDatabase {
83    pub async fn connect(
84        host: Option<String>,
85        port: Option<u16>,
86        username: Option<String>,
87        password: Option<String>,
88        database: Option<String>,
89    ) -> Result<Self, sqlx::Error> {
90        let pg_connect_options =
91            get_postgres_connect_options(host, port, username, password, database);
92        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
93        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
94
95        // Spawn a task to handle messages
96        let handle = tokio::spawn(async move {
97            PostgresCacheDatabase::process_commands(rx, pg_connect_options.clone().into()).await;
98        });
99        Ok(PostgresCacheDatabase { pool, tx, handle })
100    }
101
102    async fn process_commands(
103        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
104        pg_connect_options: PgConnectOptions,
105    ) {
106        tracing::debug!("Starting cache processing");
107
108        let pool = connect_pg(pg_connect_options).await.unwrap();
109
110        // Buffering
111        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
112        let mut last_drain = Instant::now();
113
114        // TODO: Add `buffer_interval_ms` to config, setting this above 0 currently fails tests
115        let buffer_interval = Duration::from_millis(0);
116
117        // Continue to receive and handle messages until channel is hung up
118        loop {
119            if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
120                drain_buffer(&pool, &mut buffer).await;
121                last_drain = Instant::now();
122            } else {
123                match rx.recv().await {
124                    Some(msg) => {
125                        tracing::debug!("Received {msg:?}");
126                        match msg {
127                            DatabaseQuery::Close => break,
128                            _ => buffer.push_back(msg),
129                        }
130                    }
131                    None => {
132                        tracing::debug!("Command channel closed");
133                        break;
134                    }
135                }
136            }
137        }
138
139        // Drain any remaining message
140        if !buffer.is_empty() {
141            drain_buffer(&pool, &mut buffer).await;
142        }
143
144        tracing::debug!("Stopped cache processing");
145    }
146}
147
148pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
149    let connect_options = get_postgres_connect_options(None, None, None, None, None);
150    Ok(PostgresCacheDatabase::connect(
151        Some(connect_options.host),
152        Some(connect_options.port),
153        Some(connect_options.username),
154        Some(connect_options.password),
155        Some(connect_options.database),
156    )
157    .await?)
158}
159
160#[allow(dead_code)]
161#[allow(unused)]
162#[async_trait::async_trait]
163impl CacheDatabaseAdapter for PostgresCacheDatabase {
164    fn close(&mut self) -> anyhow::Result<()> {
165        let pool = self.pool.clone();
166        let (tx, rx) = std::sync::mpsc::channel();
167
168        log::debug!("Closing connection pool");
169        tokio::task::block_in_place(|| {
170            get_runtime().block_on(async {
171                pool.close().await;
172                if let Err(e) = tx.send(()) {
173                    log::error!("Error closing pool: {e:?}");
174                }
175            });
176        });
177
178        // Cancel message handling task
179        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
180            log::error!("Error sending close: {e:?}");
181        }
182
183        log::debug!("Awaiting task 'cache-write'"); // Naming tasks will soon be stablized
184        tokio::task::block_in_place(|| {
185            if let Err(e) = get_runtime().block_on(&mut self.handle) {
186                log::error!("Error awaiting task 'cache-write': {e:?}");
187            }
188        });
189
190        log::debug!("Closed");
191
192        Ok(rx.recv()?)
193    }
194
195    fn flush(&mut self) -> anyhow::Result<()> {
196        let pool = self.pool.clone();
197        let (tx, rx) = std::sync::mpsc::channel();
198
199        tokio::task::block_in_place(|| {
200            get_runtime().block_on(async {
201                if let Err(e) = DatabaseQueries::truncate(&pool).await {
202                    log::error!("Error flushing pool: {e:?}");
203                }
204                if let Err(e) = tx.send(()) {
205                    log::error!("Error sending flush result: {e:?}");
206                }
207            });
208        });
209
210        Ok(rx.recv()?)
211    }
212
213    async fn load_all(&self) -> anyhow::Result<CacheMap> {
214        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
215            self.load_currencies(),
216            self.load_instruments(),
217            self.load_synthetics(),
218            self.load_accounts(),
219            self.load_orders(),
220            self.load_positions()
221        )
222        .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
223
224        Ok(CacheMap {
225            currencies,
226            instruments,
227            synthetics,
228            accounts,
229            orders,
230            positions,
231        })
232    }
233
234    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
235        let pool = self.pool.clone();
236        let (tx, rx) = std::sync::mpsc::channel();
237        tokio::spawn(async move {
238            let result = DatabaseQueries::load(&pool).await;
239            match result {
240                Ok(items) => {
241                    let mapping = items
242                        .into_iter()
243                        .map(|(k, v)| (k, Bytes::from(v)))
244                        .collect();
245                    if let Err(e) = tx.send(mapping) {
246                        log::error!("Failed to send general items: {e:?}");
247                    }
248                }
249                Err(e) => {
250                    log::error!("Failed to load general items: {e:?}");
251                    if let Err(e) = tx.send(HashMap::new()) {
252                        log::error!("Failed to send empty general items: {e:?}");
253                    }
254                }
255            }
256        });
257        Ok(rx.recv()?)
258    }
259
260    async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
261        let pool = self.pool.clone();
262        let (tx, rx) = std::sync::mpsc::channel();
263        tokio::spawn(async move {
264            let result = DatabaseQueries::load_currencies(&pool).await;
265            match result {
266                Ok(currencies) => {
267                    let mapping = currencies
268                        .into_iter()
269                        .map(|currency| (currency.code, currency))
270                        .collect();
271                    if let Err(e) = tx.send(mapping) {
272                        log::error!("Failed to send currencies: {e:?}");
273                    }
274                }
275                Err(e) => {
276                    log::error!("Failed to load currencies: {e:?}");
277                    if let Err(e) = tx.send(HashMap::new()) {
278                        log::error!("Failed to send empty currencies: {e:?}");
279                    }
280                }
281            }
282        });
283        Ok(rx.recv()?)
284    }
285
286    async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
287        let pool = self.pool.clone();
288        let (tx, rx) = std::sync::mpsc::channel();
289        tokio::spawn(async move {
290            let result = DatabaseQueries::load_instruments(&pool).await;
291            match result {
292                Ok(instruments) => {
293                    let mapping = instruments
294                        .into_iter()
295                        .map(|instrument| (instrument.id(), instrument))
296                        .collect();
297                    if let Err(e) = tx.send(mapping) {
298                        log::error!("Failed to send instruments: {e:?}");
299                    }
300                }
301                Err(e) => {
302                    log::error!("Failed to load instruments: {e:?}");
303                    if let Err(e) = tx.send(HashMap::new()) {
304                        log::error!("Failed to send empty instruments: {e:?}");
305                    }
306                }
307            }
308        });
309        Ok(rx.recv()?)
310    }
311
312    async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
313        todo!()
314    }
315
316    async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
317        let pool = self.pool.clone();
318        let (tx, rx) = std::sync::mpsc::channel();
319        tokio::spawn(async move {
320            let result = DatabaseQueries::load_accounts(&pool).await;
321            match result {
322                Ok(accounts) => {
323                    let mapping = accounts
324                        .into_iter()
325                        .map(|account| (account.id(), account))
326                        .collect();
327                    if let Err(e) = tx.send(mapping) {
328                        log::error!("Failed to send accounts: {e:?}");
329                    }
330                }
331                Err(e) => {
332                    log::error!("Failed to load accounts: {e:?}");
333                    if let Err(e) = tx.send(HashMap::new()) {
334                        log::error!("Failed to send empty accounts: {e:?}");
335                    }
336                }
337            }
338        });
339        Ok(rx.recv()?)
340    }
341
342    async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
343        let pool = self.pool.clone();
344        let (tx, rx) = std::sync::mpsc::channel();
345        tokio::spawn(async move {
346            let result = DatabaseQueries::load_orders(&pool).await;
347            match result {
348                Ok(orders) => {
349                    let mapping = orders
350                        .into_iter()
351                        .map(|order| (order.client_order_id(), order))
352                        .collect();
353                    if let Err(e) = tx.send(mapping) {
354                        log::error!("Failed to send orders: {e:?}");
355                    }
356                }
357                Err(e) => {
358                    log::error!("Failed to load orders: {e:?}");
359                    if let Err(e) = tx.send(HashMap::new()) {
360                        log::error!("Failed to send empty orders: {e:?}");
361                    }
362                }
363            }
364        });
365        Ok(rx.recv()?)
366    }
367
368    async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
369        todo!()
370    }
371
372    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
373        todo!()
374    }
375
376    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
377        let pool = self.pool.clone();
378        let (tx, rx) = std::sync::mpsc::channel();
379        tokio::spawn(async move {
380            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
381            match result {
382                Ok(currency) => {
383                    if let Err(e) = tx.send(currency) {
384                        log::error!("Failed to send load_index_order_client result: {e:?}");
385                    }
386                }
387                Err(e) => {
388                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
389                    if let Err(e) = tx.send(HashMap::new()) {
390                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
391                    }
392                }
393            }
394        });
395        Ok(rx.recv()?)
396    }
397
398    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
399        let pool = self.pool.clone();
400        let code = code.to_owned(); // Clone the code
401        let (tx, rx) = std::sync::mpsc::channel();
402        tokio::spawn(async move {
403            let result = DatabaseQueries::load_currency(&pool, &code).await;
404            match result {
405                Ok(currency) => {
406                    if let Err(e) = tx.send(currency) {
407                        log::error!("Failed to send currency {code}: {e:?}");
408                    }
409                }
410                Err(e) => {
411                    log::error!("Failed to load currency {code}: {e:?}");
412                    if let Err(e) = tx.send(None) {
413                        log::error!("Failed to send None for currency {code}: {e:?}");
414                    }
415                }
416            }
417        });
418        Ok(rx.recv()?)
419    }
420
421    async fn load_instrument(
422        &self,
423        instrument_id: &InstrumentId,
424    ) -> anyhow::Result<Option<InstrumentAny>> {
425        let pool = self.pool.clone();
426        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
427        let (tx, rx) = std::sync::mpsc::channel();
428        tokio::spawn(async move {
429            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
430            match result {
431                Ok(instrument) => {
432                    if let Err(e) = tx.send(instrument) {
433                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
434                    }
435                }
436                Err(e) => {
437                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
438                    if let Err(e) = tx.send(None) {
439                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
440                    }
441                }
442            }
443        });
444        Ok(rx.recv()?)
445    }
446
447    async fn load_synthetic(
448        &self,
449        instrument_id: &InstrumentId,
450    ) -> anyhow::Result<Option<SyntheticInstrument>> {
451        todo!()
452    }
453
454    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
455        let pool = self.pool.clone();
456        let account_id = account_id.to_owned();
457        let (tx, rx) = std::sync::mpsc::channel();
458        tokio::spawn(async move {
459            let result = DatabaseQueries::load_account(&pool, &account_id).await;
460            match result {
461                Ok(account) => {
462                    if let Err(e) = tx.send(account) {
463                        log::error!("Failed to send account {account_id}: {e:?}");
464                    }
465                }
466                Err(e) => {
467                    log::error!("Failed to load account {account_id}: {e:?}");
468                    if let Err(e) = tx.send(None) {
469                        log::error!("Failed to send None for account {account_id}: {e:?}");
470                    }
471                }
472            }
473        });
474        Ok(rx.recv()?)
475    }
476
477    async fn load_order(
478        &self,
479        client_order_id: &ClientOrderId,
480    ) -> anyhow::Result<Option<OrderAny>> {
481        let pool = self.pool.clone();
482        let client_order_id = client_order_id.to_owned();
483        let (tx, rx) = std::sync::mpsc::channel();
484        tokio::spawn(async move {
485            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
486            match result {
487                Ok(order) => {
488                    if let Err(e) = tx.send(order) {
489                        log::error!("Failed to send order {client_order_id}: {e:?}");
490                    }
491                }
492                Err(e) => {
493                    log::error!("Failed to load order {client_order_id}: {e:?}");
494                    let _ = tx.send(None);
495                }
496            }
497        });
498        Ok(rx.recv()?)
499    }
500
501    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
502        todo!()
503    }
504
505    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
506        todo!()
507    }
508
509    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
510        todo!()
511    }
512
513    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
514        todo!()
515    }
516
517    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
518        todo!()
519    }
520
521    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
522        let query = DatabaseQuery::Add(key, value.into());
523        self.tx
524            .send(query)
525            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
526    }
527
528    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
529        let query = DatabaseQuery::AddCurrency(*currency);
530        self.tx.send(query).map_err(|e| {
531            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
532        })
533    }
534
535    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
536        let query = DatabaseQuery::AddInstrument(instrument.clone());
537        self.tx.send(query).map_err(|e| {
538            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
539        })
540    }
541
542    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
543        todo!()
544    }
545
546    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
547        let query = DatabaseQuery::AddAccount(account.clone(), false);
548        self.tx.send(query).map_err(|e| {
549            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
550        })
551    }
552
553    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
554        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
555        self.tx.send(query).map_err(|e| {
556            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
557        })
558    }
559
560    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
561        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
562        self.tx.send(query).map_err(|e| {
563            anyhow::anyhow!(
564                "Failed to send query add_order_snapshot to database message handler: {e}"
565            )
566        })
567    }
568
569    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
570        todo!()
571    }
572
573    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
574        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
575        self.tx.send(query).map_err(|e| {
576            anyhow::anyhow!(
577                "Failed to send query add_position_snapshot to database message handler: {e}"
578            )
579        })
580    }
581
582    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
583        todo!()
584    }
585
586    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
587        let query = DatabaseQuery::AddQuote(quote.to_owned());
588        self.tx.send(query).map_err(|e| {
589            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
590        })
591    }
592
593    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
594        let pool = self.pool.clone();
595        let instrument_id = instrument_id.to_owned();
596        let (tx, rx) = std::sync::mpsc::channel();
597        tokio::spawn(async move {
598            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
599            match result {
600                Ok(quotes) => {
601                    if let Err(e) = tx.send(quotes) {
602                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
603                    }
604                }
605                Err(e) => {
606                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
607                    if let Err(e) = tx.send(Vec::new()) {
608                        log::error!(
609                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
610                        );
611                    }
612                }
613            }
614        });
615        Ok(rx.recv()?)
616    }
617
618    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
619        let query = DatabaseQuery::AddTrade(trade.to_owned());
620        self.tx.send(query).map_err(|e| {
621            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
622        })
623    }
624
625    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
626        let pool = self.pool.clone();
627        let instrument_id = instrument_id.to_owned();
628        let (tx, rx) = std::sync::mpsc::channel();
629        tokio::spawn(async move {
630            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
631            match result {
632                Ok(trades) => {
633                    if let Err(e) = tx.send(trades) {
634                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
635                    }
636                }
637                Err(e) => {
638                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
639                    if let Err(e) = tx.send(Vec::new()) {
640                        log::error!(
641                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
642                        );
643                    }
644                }
645            }
646        });
647        Ok(rx.recv()?)
648    }
649
650    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
651        let query = DatabaseQuery::AddBar(bar.to_owned());
652        self.tx.send(query).map_err(|e| {
653            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
654        })
655    }
656
657    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
658        let pool = self.pool.clone();
659        let instrument_id = instrument_id.to_owned();
660        let (tx, rx) = std::sync::mpsc::channel();
661        tokio::spawn(async move {
662            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
663            match result {
664                Ok(bars) => {
665                    if let Err(e) = tx.send(bars) {
666                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
667                    }
668                }
669                Err(e) => {
670                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
671                    if let Err(e) = tx.send(Vec::new()) {
672                        log::error!(
673                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
674                        );
675                    }
676                }
677            }
678        });
679        Ok(rx.recv()?)
680    }
681
682    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
683        let query = DatabaseQuery::AddSignal(signal.to_owned());
684        self.tx.send(query).map_err(|e| {
685            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
686        })
687    }
688
689    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
690        let pool = self.pool.clone();
691        let name = name.to_owned();
692        let (tx, rx) = std::sync::mpsc::channel();
693        tokio::spawn(async move {
694            let result = DatabaseQueries::load_signals(&pool, &name).await;
695            match result {
696                Ok(signals) => {
697                    if let Err(e) = tx.send(signals) {
698                        log::error!("Failed to send signals for '{name}': {e:?}");
699                    }
700                }
701                Err(e) => {
702                    log::error!("Failed to load signals for '{name}': {e:?}");
703                    if let Err(e) = tx.send(Vec::new()) {
704                        log::error!("Failed to send empty signals for '{name}': {e:?}");
705                    }
706                }
707            }
708        });
709        Ok(rx.recv()?)
710    }
711
712    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
713        let query = DatabaseQuery::AddCustom(data.to_owned());
714        self.tx.send(query).map_err(|e| {
715            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
716        })
717    }
718
719    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
720        let pool = self.pool.clone();
721        let data_type = data_type.to_owned();
722        let (tx, rx) = std::sync::mpsc::channel();
723        tokio::spawn(async move {
724            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
725            match result {
726                Ok(signals) => {
727                    if let Err(e) = tx.send(signals) {
728                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
729                    }
730                }
731                Err(e) => {
732                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
733                    if let Err(e) = tx.send(Vec::new()) {
734                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
735                    }
736                }
737            }
738        });
739        Ok(rx.recv()?)
740    }
741
742    fn load_order_snapshot(
743        &self,
744        client_order_id: &ClientOrderId,
745    ) -> anyhow::Result<Option<OrderSnapshot>> {
746        let pool = self.pool.clone();
747        let client_order_id = client_order_id.to_owned();
748        let (tx, rx) = std::sync::mpsc::channel();
749        tokio::spawn(async move {
750            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
751            match result {
752                Ok(snapshot) => {
753                    if let Err(e) = tx.send(snapshot) {
754                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
755                    }
756                }
757                Err(e) => {
758                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
759                    if let Err(e) = tx.send(None) {
760                        log::error!(
761                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
762                        );
763                    }
764                }
765            }
766        });
767        Ok(rx.recv()?)
768    }
769
770    fn load_position_snapshot(
771        &self,
772        position_id: &PositionId,
773    ) -> anyhow::Result<Option<PositionSnapshot>> {
774        let pool = self.pool.clone();
775        let position_id = position_id.to_owned();
776        let (tx, rx) = std::sync::mpsc::channel();
777        tokio::spawn(async move {
778            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
779            match result {
780                Ok(snapshot) => {
781                    if let Err(e) = tx.send(snapshot) {
782                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
783                    }
784                }
785                Err(e) => {
786                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
787                    if let Err(e) = tx.send(None) {
788                        log::error!(
789                            "Failed to send None for position snapshot {position_id}: {e:?}"
790                        );
791                    }
792                }
793            }
794        });
795        Ok(rx.recv()?)
796    }
797
798    fn index_venue_order_id(
799        &self,
800        client_order_id: ClientOrderId,
801        venue_order_id: VenueOrderId,
802    ) -> anyhow::Result<()> {
803        todo!()
804    }
805
806    fn index_order_position(
807        &self,
808        client_order_id: ClientOrderId,
809        position_id: PositionId,
810    ) -> anyhow::Result<()> {
811        todo!()
812    }
813
814    fn update_actor(&self) -> anyhow::Result<()> {
815        todo!()
816    }
817
818    fn update_strategy(&self) -> anyhow::Result<()> {
819        todo!()
820    }
821
822    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
823        let query = DatabaseQuery::AddAccount(account.clone(), true);
824        self.tx.send(query).map_err(|e| {
825            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
826        })
827    }
828
829    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
830        let query = DatabaseQuery::UpdateOrder(event.clone());
831        self.tx.send(query).map_err(|e| {
832            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
833        })
834    }
835
836    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
837        todo!()
838    }
839
840    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
841        todo!()
842    }
843
844    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
845        todo!()
846    }
847
848    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
849        todo!()
850    }
851}
852
853async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
854    for cmd in buffer.drain(..) {
855        let result: anyhow::Result<()> = match cmd {
856            DatabaseQuery::Close => Ok(()),
857            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
858            DatabaseQuery::AddCurrency(currency) => {
859                DatabaseQueries::add_currency(pool, currency).await
860            }
861            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
862                InstrumentAny::Betting(instrument) => {
863                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
864                }
865                InstrumentAny::BinaryOption(instrument) => {
866                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
867                        .await
868                }
869                InstrumentAny::CryptoFuture(instrument) => {
870                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
871                        .await
872                }
873                InstrumentAny::CryptoOption(instrument) => {
874                    DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
875                        .await
876                }
877                InstrumentAny::CryptoPerpetual(instrument) => {
878                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
879                        .await
880                }
881                InstrumentAny::CurrencyPair(instrument) => {
882                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
883                        .await
884                }
885                InstrumentAny::Equity(equity) => {
886                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
887                }
888                InstrumentAny::FuturesContract(instrument) => {
889                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
890                        .await
891                }
892                InstrumentAny::FuturesSpread(instrument) => {
893                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
894                        .await
895                }
896                InstrumentAny::OptionContract(instrument) => {
897                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
898                        .await
899                }
900                InstrumentAny::OptionSpread(instrument) => {
901                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
902                        .await
903                }
904            },
905            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
906                OrderAny::Limit(order) => {
907                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
908                        .await
909                }
910                OrderAny::LimitIfTouched(order) => {
911                    DatabaseQueries::add_order(
912                        pool,
913                        "LIMIT_IF_TOUCHED",
914                        updated,
915                        Box::new(order),
916                        client_id,
917                    )
918                    .await
919                }
920                OrderAny::Market(order) => {
921                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
922                        .await
923                }
924                OrderAny::MarketIfTouched(order) => {
925                    DatabaseQueries::add_order(
926                        pool,
927                        "MARKET_IF_TOUCHED",
928                        updated,
929                        Box::new(order),
930                        client_id,
931                    )
932                    .await
933                }
934                OrderAny::MarketToLimit(order) => {
935                    DatabaseQueries::add_order(
936                        pool,
937                        "MARKET_TO_LIMIT",
938                        updated,
939                        Box::new(order),
940                        client_id,
941                    )
942                    .await
943                }
944                OrderAny::StopLimit(order) => {
945                    DatabaseQueries::add_order(
946                        pool,
947                        "STOP_LIMIT",
948                        updated,
949                        Box::new(order),
950                        client_id,
951                    )
952                    .await
953                }
954                OrderAny::StopMarket(order) => {
955                    DatabaseQueries::add_order(
956                        pool,
957                        "STOP_MARKET",
958                        updated,
959                        Box::new(order),
960                        client_id,
961                    )
962                    .await
963                }
964                OrderAny::TrailingStopLimit(order) => {
965                    DatabaseQueries::add_order(
966                        pool,
967                        "TRAILING_STOP_LIMIT",
968                        updated,
969                        Box::new(order),
970                        client_id,
971                    )
972                    .await
973                }
974                OrderAny::TrailingStopMarket(order) => {
975                    DatabaseQueries::add_order(
976                        pool,
977                        "TRAILING_STOP_MARKET",
978                        updated,
979                        Box::new(order),
980                        client_id,
981                    )
982                    .await
983                }
984            },
985            DatabaseQuery::AddOrderSnapshot(snapshot) => {
986                DatabaseQueries::add_order_snapshot(pool, snapshot).await
987            }
988            DatabaseQuery::AddPositionSnapshot(snapshot) => {
989                DatabaseQueries::add_position_snapshot(pool, snapshot).await
990            }
991            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
992                AccountAny::Cash(account) => {
993                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
994                }
995                AccountAny::Margin(account) => {
996                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
997                }
998            },
999            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1000            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1001            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
1002            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1003            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1004            DatabaseQuery::UpdateOrder(event) => {
1005                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1006            }
1007        };
1008
1009        if let Err(e) = result {
1010            tracing::error!("Error on query: {e:?}");
1011        }
1012    }
1013}