nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::{HashMap, VecDeque},
18    time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23    cache::database::CacheDatabaseAdapter, custom::CustomData, runtime::get_runtime, signal::Signal,
24};
25use nautilus_core::UnixNanos;
26use nautilus_model::{
27    accounts::AccountAny,
28    data::{Bar, DataType, QuoteTick, TradeTick},
29    events::{position::snapshot::PositionSnapshot, OrderEventAny, OrderSnapshot},
30    identifiers::{
31        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
32        VenueOrderId,
33    },
34    instruments::{InstrumentAny, SyntheticInstrument},
35    orderbook::OrderBook,
36    orders::OrderAny,
37    position::Position,
38    types::Currency,
39};
40use sqlx::{postgres::PgConnectOptions, PgPool};
41use ustr::Ustr;
42
43use crate::sql::{
44    pg::{connect_pg, get_postgres_connect_options},
45    queries::DatabaseQueries,
46};
47
/// PostgreSQL-backed cache database handle.
///
/// The `load_*` methods in this file query through `pool`; the `add_*`/`update_*`
/// methods send a `DatabaseQuery` over `tx` to a background task started in
/// `PostgresCacheDatabase::connect`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct PostgresCacheDatabase {
    /// Connection pool owned by this handle (the background task opens its own pool).
    pub pool: PgPool,
    /// Sending half of the unbounded channel that feeds the background task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
    /// Join handle of the background task; awaited when the database is closed.
    handle: tokio::task::JoinHandle<()>,
}
58
/// Message sent to the background task that buffers and executes database writes.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum DatabaseQuery {
    /// Stops the background processing loop.
    Close,
    /// General entry: key and raw value bytes.
    Add(String, Vec<u8>),
    /// Currency to write.
    AddCurrency(Currency),
    /// Instrument to write.
    AddInstrument(InstrumentAny),
    /// Order, optional client ID and a flag; `add_order` in this file always passes `false`.
    // NOTE(review): the flag presumably marks an update rather than an insert — confirm
    // against the handler that consumes this variant.
    AddOrder(OrderAny, Option<ClientId>, bool),
    /// Order snapshot to write.
    AddOrderSnapshot(OrderSnapshot),
    /// Position snapshot to write.
    AddPositionSnapshot(PositionSnapshot),
    /// Account and a flag; `add_account` passes `false`, `update_account` passes `true`.
    AddAccount(AccountAny, bool),
    /// Signal to write.
    AddSignal(Signal),
    /// Custom data to write.
    AddCustom(CustomData),
    /// Quote tick to write.
    AddQuote(QuoteTick),
    /// Trade tick to write.
    AddTrade(TradeTick),
    /// Bar to write.
    AddBar(Bar),
    /// Order event to apply.
    UpdateOrder(OrderEventAny),
}
77
78impl PostgresCacheDatabase {
79    pub async fn connect(
80        host: Option<String>,
81        port: Option<u16>,
82        username: Option<String>,
83        password: Option<String>,
84        database: Option<String>,
85    ) -> Result<Self, sqlx::Error> {
86        let pg_connect_options =
87            get_postgres_connect_options(host, port, username, password, database);
88        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
89        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
90
91        // Spawn a task to handle messages
92        let handle = tokio::spawn(async move {
93            PostgresCacheDatabase::process_commands(rx, pg_connect_options.clone().into()).await;
94        });
95        Ok(PostgresCacheDatabase { pool, tx, handle })
96    }
97
98    async fn process_commands(
99        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
100        pg_connect_options: PgConnectOptions,
101    ) {
102        tracing::debug!("Starting cache processing");
103
104        let pool = connect_pg(pg_connect_options).await.unwrap();
105
106        // Buffering
107        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
108        let mut last_drain = Instant::now();
109
110        // TODO: Add `buffer_interval_ms` to config, setting this above 0 currently fails tests
111        let buffer_interval = Duration::from_millis(0);
112
113        // Continue to receive and handle messages until channel is hung up
114        loop {
115            if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
116                drain_buffer(&pool, &mut buffer).await;
117                last_drain = Instant::now();
118            } else {
119                match rx.recv().await {
120                    Some(msg) => match msg {
121                        DatabaseQuery::Close => break,
122                        _ => buffer.push_back(msg),
123                    },
124                    None => break, // Channel hung up
125                }
126            }
127        }
128
129        // Drain any remaining message
130        if !buffer.is_empty() {
131            drain_buffer(&pool, &mut buffer).await;
132        }
133
134        tracing::debug!("Stopped cache processing");
135    }
136}
137
138pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
139    let connect_options = get_postgres_connect_options(None, None, None, None, None);
140    Ok(PostgresCacheDatabase::connect(
141        Some(connect_options.host),
142        Some(connect_options.port),
143        Some(connect_options.username),
144        Some(connect_options.password),
145        Some(connect_options.database),
146    )
147    .await?)
148}
149
150#[allow(dead_code)]
151#[allow(unused)]
152impl CacheDatabaseAdapter for PostgresCacheDatabase {
153    fn close(&mut self) -> anyhow::Result<()> {
154        let pool = self.pool.clone();
155        let (tx, rx) = std::sync::mpsc::channel();
156
157        log::debug!("Closing connection pool");
158        tokio::task::block_in_place(|| {
159            get_runtime().block_on(async {
160                pool.close().await;
161                if let Err(e) = tx.send(()) {
162                    log::error!("Error closing pool: {e:?}");
163                }
164            });
165        });
166
167        // Cancel message handling task
168        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
169            log::error!("Error sending close: {e:?}");
170        }
171
172        log::debug!("Awaiting task 'cache-write'"); // Naming tasks will soon be stablized
173        tokio::task::block_in_place(|| {
174            if let Err(e) = get_runtime().block_on(&mut self.handle) {
175                log::error!("Error awaiting task 'cache-write': {e:?}");
176            }
177        });
178
179        log::debug!("Closed");
180
181        Ok(rx.recv()?)
182    }
183
184    fn flush(&mut self) -> anyhow::Result<()> {
185        let pool = self.pool.clone();
186        let (tx, rx) = std::sync::mpsc::channel();
187
188        tokio::task::block_in_place(|| {
189            get_runtime().block_on(async {
190                if let Err(e) = DatabaseQueries::truncate(&pool).await {
191                    log::error!("Error flushing pool: {e:?}");
192                }
193                if let Err(e) = tx.send(()) {
194                    log::error!("Error sending flush result: {e:?}");
195                }
196            });
197        });
198
199        Ok(rx.recv()?)
200    }
201
202    fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
203        let pool = self.pool.clone();
204        let (tx, rx) = std::sync::mpsc::channel();
205        tokio::spawn(async move {
206            let result = DatabaseQueries::load(&pool).await;
207            match result {
208                Ok(items) => {
209                    let mapping = items
210                        .into_iter()
211                        .map(|(k, v)| (k, Bytes::from(v)))
212                        .collect();
213                    if let Err(e) = tx.send(mapping) {
214                        log::error!("Failed to send general items: {e:?}");
215                    }
216                }
217                Err(e) => {
218                    log::error!("Failed to load general items: {e:?}");
219                    if let Err(e) = tx.send(HashMap::new()) {
220                        log::error!("Failed to send empty general items: {e:?}");
221                    }
222                }
223            }
224        });
225        Ok(rx.recv()?)
226    }
227
228    fn load_currencies(&mut self) -> anyhow::Result<HashMap<Ustr, Currency>> {
229        let pool = self.pool.clone();
230        let (tx, rx) = std::sync::mpsc::channel();
231        tokio::spawn(async move {
232            let result = DatabaseQueries::load_currencies(&pool).await;
233            match result {
234                Ok(currencies) => {
235                    let mapping = currencies
236                        .into_iter()
237                        .map(|currency| (currency.code, currency))
238                        .collect();
239                    if let Err(e) = tx.send(mapping) {
240                        log::error!("Failed to send currencies: {e:?}");
241                    }
242                }
243                Err(e) => {
244                    log::error!("Failed to load currencies: {e:?}");
245                    if let Err(e) = tx.send(HashMap::new()) {
246                        log::error!("Failed to send empty currencies: {e:?}");
247                    }
248                }
249            }
250        });
251        Ok(rx.recv()?)
252    }
253
254    fn load_instruments(&mut self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
255        let pool = self.pool.clone();
256        let (tx, rx) = std::sync::mpsc::channel();
257        tokio::spawn(async move {
258            let result = DatabaseQueries::load_instruments(&pool).await;
259            match result {
260                Ok(instruments) => {
261                    let mapping = instruments
262                        .into_iter()
263                        .map(|instrument| (instrument.id(), instrument))
264                        .collect();
265                    if let Err(e) = tx.send(mapping) {
266                        log::error!("Failed to send instruments: {e:?}");
267                    }
268                }
269                Err(e) => {
270                    log::error!("Failed to load instruments: {e:?}");
271                    if let Err(e) = tx.send(HashMap::new()) {
272                        log::error!("Failed to send empty instruments: {e:?}");
273                    }
274                }
275            }
276        });
277        Ok(rx.recv()?)
278    }
279
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_synthetics(&mut self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
        todo!()
    }
283
284    fn load_accounts(&mut self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
285        let pool = self.pool.clone();
286        let (tx, rx) = std::sync::mpsc::channel();
287        tokio::spawn(async move {
288            let result = DatabaseQueries::load_accounts(&pool).await;
289            match result {
290                Ok(accounts) => {
291                    let mapping = accounts
292                        .into_iter()
293                        .map(|account| (account.id(), account))
294                        .collect();
295                    if let Err(e) = tx.send(mapping) {
296                        log::error!("Failed to send accounts: {e:?}");
297                    }
298                }
299                Err(e) => {
300                    log::error!("Failed to load accounts: {e:?}");
301                    if let Err(e) = tx.send(HashMap::new()) {
302                        log::error!("Failed to send empty accounts: {e:?}");
303                    }
304                }
305            }
306        });
307        Ok(rx.recv()?)
308    }
309
310    fn load_orders(&mut self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
311        let pool = self.pool.clone();
312        let (tx, rx) = std::sync::mpsc::channel();
313        tokio::spawn(async move {
314            let result = DatabaseQueries::load_orders(&pool).await;
315            match result {
316                Ok(orders) => {
317                    let mapping = orders
318                        .into_iter()
319                        .map(|order| (order.client_order_id(), order))
320                        .collect();
321                    if let Err(e) = tx.send(mapping) {
322                        log::error!("Failed to send orders: {e:?}");
323                    }
324                }
325                Err(e) => {
326                    log::error!("Failed to load orders: {e:?}");
327                    if let Err(e) = tx.send(HashMap::new()) {
328                        log::error!("Failed to send empty orders: {e:?}");
329                    }
330                }
331            }
332        });
333        Ok(rx.recv()?)
334    }
335
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_positions(&mut self) -> anyhow::Result<HashMap<PositionId, Position>> {
        todo!()
    }
339
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
        todo!()
    }
343
344    fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
345        let pool = self.pool.clone();
346        let (tx, rx) = std::sync::mpsc::channel();
347        tokio::spawn(async move {
348            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
349            match result {
350                Ok(currency) => {
351                    if let Err(e) = tx.send(currency) {
352                        log::error!("Failed to send load_index_order_client result: {e:?}");
353                    }
354                }
355                Err(e) => {
356                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
357                    if let Err(e) = tx.send(HashMap::new()) {
358                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
359                    }
360                }
361            }
362        });
363        Ok(rx.recv()?)
364    }
365
366    fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
367        let pool = self.pool.clone();
368        let code = code.to_owned(); // Clone the code
369        let (tx, rx) = std::sync::mpsc::channel();
370        tokio::spawn(async move {
371            let result = DatabaseQueries::load_currency(&pool, &code).await;
372            match result {
373                Ok(currency) => {
374                    if let Err(e) = tx.send(currency) {
375                        log::error!("Failed to send currency {code}: {e:?}");
376                    }
377                }
378                Err(e) => {
379                    log::error!("Failed to load currency {code}: {e:?}");
380                    if let Err(e) = tx.send(None) {
381                        log::error!("Failed to send None for currency {code}: {e:?}");
382                    }
383                }
384            }
385        });
386        Ok(rx.recv()?)
387    }
388
389    fn load_instrument(
390        &self,
391        instrument_id: &InstrumentId,
392    ) -> anyhow::Result<Option<InstrumentAny>> {
393        let pool = self.pool.clone();
394        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
395        let (tx, rx) = std::sync::mpsc::channel();
396        tokio::spawn(async move {
397            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
398            match result {
399                Ok(instrument) => {
400                    if let Err(e) = tx.send(instrument) {
401                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
402                    }
403                }
404                Err(e) => {
405                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
406                    if let Err(e) = tx.send(None) {
407                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
408                    }
409                }
410            }
411        });
412        Ok(rx.recv()?)
413    }
414
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_synthetic(&self, instrument_id: &InstrumentId) -> anyhow::Result<SyntheticInstrument> {
        todo!()
    }
418
419    fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
420        let pool = self.pool.clone();
421        let account_id = account_id.to_owned();
422        let (tx, rx) = std::sync::mpsc::channel();
423        tokio::spawn(async move {
424            let result = DatabaseQueries::load_account(&pool, &account_id).await;
425            match result {
426                Ok(account) => {
427                    if let Err(e) = tx.send(account) {
428                        log::error!("Failed to send account {account_id}: {e:?}");
429                    }
430                }
431                Err(e) => {
432                    log::error!("Failed to load account {account_id}: {e:?}");
433                    if let Err(e) = tx.send(None) {
434                        log::error!("Failed to send None for account {account_id}: {e:?}");
435                    }
436                }
437            }
438        });
439        Ok(rx.recv()?)
440    }
441
442    fn load_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<Option<OrderAny>> {
443        let pool = self.pool.clone();
444        let client_order_id = client_order_id.to_owned();
445        let (tx, rx) = std::sync::mpsc::channel();
446        tokio::spawn(async move {
447            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
448            match result {
449                Ok(order) => {
450                    if let Err(e) = tx.send(order) {
451                        log::error!("Failed to send order {client_order_id}: {e:?}");
452                    }
453                }
454                Err(e) => {
455                    log::error!("Failed to load order {client_order_id}: {e:?}");
456                    let _ = tx.send(None);
457                }
458            }
459        });
460        Ok(rx.recv()?)
461    }
462
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Position> {
        todo!()
    }
466
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
        todo!()
    }
470
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
        todo!()
    }
474
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
        todo!()
    }
478
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
        todo!()
    }
482
483    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
484        let query = DatabaseQuery::Add(key, value.into());
485        self.tx
486            .send(query)
487            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
488    }
489
490    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
491        let query = DatabaseQuery::AddCurrency(*currency);
492        self.tx.send(query).map_err(|e| {
493            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
494        })
495    }
496
497    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
498        let query = DatabaseQuery::AddInstrument(instrument.clone());
499        self.tx.send(query).map_err(|e| {
500            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
501        })
502    }
503
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
        todo!()
    }
507
508    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
509        let query = DatabaseQuery::AddAccount(account.clone(), false);
510        self.tx.send(query).map_err(|e| {
511            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
512        })
513    }
514
515    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
516        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
517        self.tx.send(query).map_err(|e| {
518            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
519        })
520    }
521
522    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
523        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
524        self.tx.send(query).map_err(|e| {
525            anyhow::anyhow!(
526                "Failed to send query add_order_snapshot to database message handler: {e}"
527            )
528        })
529    }
530
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
534
535    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
536        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
537        self.tx.send(query).map_err(|e| {
538            anyhow::anyhow!(
539                "Failed to send query add_position_snapshot to database message handler: {e}"
540            )
541        })
542    }
543
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
        todo!()
    }
547
548    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
549        let query = DatabaseQuery::AddQuote(quote.to_owned());
550        self.tx.send(query).map_err(|e| {
551            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
552        })
553    }
554
555    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
556        let pool = self.pool.clone();
557        let instrument_id = instrument_id.to_owned();
558        let (tx, rx) = std::sync::mpsc::channel();
559        tokio::spawn(async move {
560            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
561            match result {
562                Ok(quotes) => {
563                    if let Err(e) = tx.send(quotes) {
564                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
565                    }
566                }
567                Err(e) => {
568                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
569                    if let Err(e) = tx.send(Vec::new()) {
570                        log::error!(
571                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
572                        );
573                    }
574                }
575            }
576        });
577        Ok(rx.recv()?)
578    }
579
580    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
581        let query = DatabaseQuery::AddTrade(trade.to_owned());
582        self.tx.send(query).map_err(|e| {
583            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
584        })
585    }
586
587    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
588        let pool = self.pool.clone();
589        let instrument_id = instrument_id.to_owned();
590        let (tx, rx) = std::sync::mpsc::channel();
591        tokio::spawn(async move {
592            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
593            match result {
594                Ok(trades) => {
595                    if let Err(e) = tx.send(trades) {
596                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
597                    }
598                }
599                Err(e) => {
600                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
601                    if let Err(e) = tx.send(Vec::new()) {
602                        log::error!(
603                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
604                        );
605                    }
606                }
607            }
608        });
609        Ok(rx.recv()?)
610    }
611
612    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
613        let query = DatabaseQuery::AddBar(bar.to_owned());
614        self.tx.send(query).map_err(|e| {
615            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
616        })
617    }
618
619    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
620        let pool = self.pool.clone();
621        let instrument_id = instrument_id.to_owned();
622        let (tx, rx) = std::sync::mpsc::channel();
623        tokio::spawn(async move {
624            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
625            match result {
626                Ok(bars) => {
627                    if let Err(e) = tx.send(bars) {
628                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
629                    }
630                }
631                Err(e) => {
632                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
633                    if let Err(e) = tx.send(Vec::new()) {
634                        log::error!(
635                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
636                        );
637                    }
638                }
639            }
640        });
641        Ok(rx.recv()?)
642    }
643
644    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
645        let query = DatabaseQuery::AddSignal(signal.to_owned());
646        self.tx.send(query).map_err(|e| {
647            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
648        })
649    }
650
651    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
652        let pool = self.pool.clone();
653        let name = name.to_owned();
654        let (tx, rx) = std::sync::mpsc::channel();
655        tokio::spawn(async move {
656            let result = DatabaseQueries::load_signals(&pool, &name).await;
657            match result {
658                Ok(signals) => {
659                    if let Err(e) = tx.send(signals) {
660                        log::error!("Failed to send signals for '{name}': {e:?}");
661                    }
662                }
663                Err(e) => {
664                    log::error!("Failed to load signals for '{name}': {e:?}");
665                    if let Err(e) = tx.send(Vec::new()) {
666                        log::error!("Failed to send empty signals for '{name}': {e:?}");
667                    }
668                }
669            }
670        });
671        Ok(rx.recv()?)
672    }
673
674    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
675        let query = DatabaseQuery::AddCustom(data.to_owned());
676        self.tx.send(query).map_err(|e| {
677            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
678        })
679    }
680
681    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
682        let pool = self.pool.clone();
683        let data_type = data_type.to_owned();
684        let (tx, rx) = std::sync::mpsc::channel();
685        tokio::spawn(async move {
686            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
687            match result {
688                Ok(signals) => {
689                    if let Err(e) = tx.send(signals) {
690                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
691                    }
692                }
693                Err(e) => {
694                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
695                    if let Err(e) = tx.send(Vec::new()) {
696                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
697                    }
698                }
699            }
700        });
701        Ok(rx.recv()?)
702    }
703
704    fn load_order_snapshot(
705        &self,
706        client_order_id: &ClientOrderId,
707    ) -> anyhow::Result<Option<OrderSnapshot>> {
708        let pool = self.pool.clone();
709        let client_order_id = client_order_id.to_owned();
710        let (tx, rx) = std::sync::mpsc::channel();
711        tokio::spawn(async move {
712            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
713            match result {
714                Ok(snapshot) => {
715                    if let Err(e) = tx.send(snapshot) {
716                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
717                    }
718                }
719                Err(e) => {
720                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
721                    if let Err(e) = tx.send(None) {
722                        log::error!(
723                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
724                        );
725                    }
726                }
727            }
728        });
729        Ok(rx.recv()?)
730    }
731
732    fn load_position_snapshot(
733        &self,
734        position_id: &PositionId,
735    ) -> anyhow::Result<Option<PositionSnapshot>> {
736        let pool = self.pool.clone();
737        let position_id = position_id.to_owned();
738        let (tx, rx) = std::sync::mpsc::channel();
739        tokio::spawn(async move {
740            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
741            match result {
742                Ok(snapshot) => {
743                    if let Err(e) = tx.send(snapshot) {
744                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
745                    }
746                }
747                Err(e) => {
748                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
749                    if let Err(e) = tx.send(None) {
750                        log::error!(
751                            "Failed to send None for position snapshot {position_id}: {e:?}"
752                        );
753                    }
754                }
755            }
756        });
757        Ok(rx.recv()?)
758    }
759
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn index_venue_order_id(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> anyhow::Result<()> {
        todo!()
    }
767
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn index_order_position(
        &self,
        client_order_id: ClientOrderId,
        position_id: PositionId,
    ) -> anyhow::Result<()> {
        todo!()
    }
775
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn update_actor(&self) -> anyhow::Result<()> {
        todo!()
    }
779
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn update_strategy(&self) -> anyhow::Result<()> {
        todo!()
    }
783
784    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
785        let query = DatabaseQuery::AddAccount(account.clone(), true);
786        self.tx.send(query).map_err(|e| {
787            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
788        })
789    }
790
791    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
792        let query = DatabaseQuery::UpdateOrder(event.clone());
793        self.tx.send(query).map_err(|e| {
794            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
795        })
796    }
797
    /// Not yet implemented: the body is a `todo!()` placeholder.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
801
    /// Not yet implemented: the body is a `todo!()` placeholder.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
        todo!()
    }
805
    /// Not yet implemented: the body is a `todo!()` placeholder.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
809
    /// Not yet implemented: the body is a `todo!()` placeholder.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
        todo!()
    }
813}
814
815async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
816    for cmd in buffer.drain(..) {
817        let result: anyhow::Result<()> = match cmd {
818            DatabaseQuery::Close => Ok(()),
819            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value)
820                .await
821                .map_err(anyhow::Error::from),
822            DatabaseQuery::AddCurrency(currency) => DatabaseQueries::add_currency(pool, currency)
823                .await
824                .map_err(anyhow::Error::from),
825            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
826                InstrumentAny::Betting(instrument) => {
827                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
828                }
829                InstrumentAny::BinaryOption(instrument) => {
830                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
831                        .await
832                }
833                InstrumentAny::CryptoFuture(instrument) => {
834                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
835                        .await
836                }
837                InstrumentAny::CryptoPerpetual(instrument) => {
838                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
839                        .await
840                }
841                InstrumentAny::CurrencyPair(instrument) => {
842                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
843                        .await
844                }
845                InstrumentAny::Equity(equity) => {
846                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
847                }
848                InstrumentAny::FuturesContract(instrument) => {
849                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
850                        .await
851                }
852                InstrumentAny::FuturesSpread(instrument) => {
853                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
854                        .await
855                }
856                InstrumentAny::OptionContract(instrument) => {
857                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
858                        .await
859                }
860                InstrumentAny::OptionSpread(instrument) => {
861                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
862                        .await
863                }
864            },
865            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
866                OrderAny::Limit(order) => {
867                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
868                        .await
869                }
870                OrderAny::LimitIfTouched(order) => {
871                    DatabaseQueries::add_order(
872                        pool,
873                        "LIMIT_IF_TOUCHED",
874                        updated,
875                        Box::new(order),
876                        client_id,
877                    )
878                    .await
879                }
880                OrderAny::Market(order) => {
881                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
882                        .await
883                }
884                OrderAny::MarketIfTouched(order) => {
885                    DatabaseQueries::add_order(
886                        pool,
887                        "MARKET_IF_TOUCHED",
888                        updated,
889                        Box::new(order),
890                        client_id,
891                    )
892                    .await
893                }
894                OrderAny::MarketToLimit(order) => {
895                    DatabaseQueries::add_order(
896                        pool,
897                        "MARKET_TO_LIMIT",
898                        updated,
899                        Box::new(order),
900                        client_id,
901                    )
902                    .await
903                }
904                OrderAny::StopLimit(order) => {
905                    DatabaseQueries::add_order(
906                        pool,
907                        "STOP_LIMIT",
908                        updated,
909                        Box::new(order),
910                        client_id,
911                    )
912                    .await
913                }
914                OrderAny::StopMarket(order) => {
915                    DatabaseQueries::add_order(
916                        pool,
917                        "STOP_MARKET",
918                        updated,
919                        Box::new(order),
920                        client_id,
921                    )
922                    .await
923                }
924                OrderAny::TrailingStopLimit(order) => {
925                    DatabaseQueries::add_order(
926                        pool,
927                        "TRAILING_STOP_LIMIT",
928                        updated,
929                        Box::new(order),
930                        client_id,
931                    )
932                    .await
933                }
934                OrderAny::TrailingStopMarket(order) => {
935                    DatabaseQueries::add_order(
936                        pool,
937                        "TRAILING_STOP_MARKET",
938                        updated,
939                        Box::new(order),
940                        client_id,
941                    )
942                    .await
943                }
944            },
945            DatabaseQuery::AddOrderSnapshot(snapshot) => {
946                DatabaseQueries::add_order_snapshot(pool, snapshot).await
947            }
948            DatabaseQuery::AddPositionSnapshot(snapshot) => {
949                DatabaseQueries::add_position_snapshot(pool, snapshot).await
950            }
951            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
952                AccountAny::Cash(account) => {
953                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
954                }
955                AccountAny::Margin(account) => {
956                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
957                }
958            },
959            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
960            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
961            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
962            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
963            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
964            DatabaseQuery::UpdateOrder(event) => {
965                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
966            }
967        };
968
969        if let Err(e) = result {
970            tracing::error!("Error on query: {e:?}");
971        }
972    }
973}