1use std::{collections::VecDeque, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21 cache::database::{CacheDatabaseAdapter, CacheMap},
22 custom::CustomData,
23 logging::{log_task_awaiting, log_task_started, log_task_stopped},
24 runtime::get_runtime,
25 signal::Signal,
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29 accounts::AccountAny,
30 data::{Bar, DataType, QuoteTick, TradeTick},
31 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
32 identifiers::{
33 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
34 VenueOrderId,
35 },
36 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
37 orderbook::OrderBook,
38 orders::{Order, OrderAny},
39 position::Position,
40 types::Currency,
41};
42use sqlx::{PgPool, postgres::PgConnectOptions};
43use tokio::{time::Instant, try_join};
44use ustr::Ustr;
45
46use crate::sql::{
47 pg::{connect_pg, get_postgres_connect_options},
48 queries::DatabaseQueries,
49};
50
/// Task name passed to `log_task_started` / `log_task_stopped` by the command-processing task.
const CACHE_PROCESS: &str = "cache-process";
53
/// Cache database handle backed by PostgreSQL.
///
/// Holds a connection pool used by the adapter methods that query directly,
/// the sending half of an unbounded channel carrying [`DatabaseQuery`] commands,
/// and the join handle of the background task that consumes those commands.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct PostgresCacheDatabase {
    /// Connection pool used for direct (non-queued) queries.
    pub pool: PgPool,
    /// Sender used to enqueue commands for the background task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
    /// Join handle of the background command-processing task.
    handle: tokio::task::JoinHandle<()>,
}
64
/// Commands sent over the channel to the background command-processing task.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum DatabaseQuery {
    /// Stops the processing loop.
    Close,
    /// Stores a general key/value pair (key, raw value bytes).
    Add(String, Vec<u8>),
    /// Stores a currency.
    AddCurrency(Currency),
    /// Stores an instrument.
    AddInstrument(InstrumentAny),
    /// Stores an order: the order, an optional client ID, and the `updated`
    /// flag forwarded to the order insert query.
    AddOrder(OrderAny, Option<ClientId>, bool),
    /// Stores an order snapshot.
    AddOrderSnapshot(OrderSnapshot),
    /// Stores a position snapshot.
    AddPositionSnapshot(PositionSnapshot),
    /// Stores an account together with an `updated` flag
    /// (`add_account` sends `false`, `update_account` sends `true`).
    AddAccount(AccountAny, bool),
    /// Stores a signal.
    AddSignal(Signal),
    /// Stores a custom data item.
    AddCustom(CustomData),
    /// Stores a quote tick.
    AddQuote(QuoteTick),
    /// Stores a trade tick.
    AddTrade(TradeTick),
    /// Stores a bar.
    AddBar(Bar),
    /// Stores an order event.
    UpdateOrder(OrderEventAny),
}
83
84impl PostgresCacheDatabase {
85 pub async fn connect(
95 host: Option<String>,
96 port: Option<u16>,
97 username: Option<String>,
98 password: Option<String>,
99 database: Option<String>,
100 ) -> Result<Self, sqlx::Error> {
101 let pg_connect_options =
102 get_postgres_connect_options(host, port, username, password, database);
103 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
104 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
105
106 let handle = tokio::spawn(async move {
108 Self::process_commands(rx, pg_connect_options.clone().into()).await;
109 });
110 Ok(Self { pool, tx, handle })
111 }
112
113 async fn process_commands(
114 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
115 pg_connect_options: PgConnectOptions,
116 ) {
117 log_task_started(CACHE_PROCESS);
118
119 let pool = connect_pg(pg_connect_options).await.unwrap();
120
121 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
123
124 let buffer_interval = Duration::from_millis(0);
126
127 let flush_timer = tokio::time::sleep(buffer_interval);
129 tokio::pin!(flush_timer);
130
131 loop {
133 tokio::select! {
134 maybe_msg = rx.recv() => {
135 if let Some(msg) = maybe_msg {
136 tracing::debug!("Received {msg:?}");
137 if matches!(msg, DatabaseQuery::Close) {
138 break;
139 }
140 buffer.push_back(msg);
141
142 if buffer_interval.is_zero() {
144 drain_buffer(&pool, &mut buffer).await;
145 }
146 } else {
147 tracing::debug!("Command channel closed");
148 break;
149 }
150 }
151 () = &mut flush_timer, if !buffer_interval.is_zero() => {
152 if !buffer.is_empty() {
153 drain_buffer(&pool, &mut buffer).await;
154 }
155
156 flush_timer.as_mut().reset(Instant::now() + buffer_interval);
157 }
158 }
159 }
160
161 if !buffer.is_empty() {
163 drain_buffer(&pool, &mut buffer).await;
164 }
165
166 log_task_stopped(CACHE_PROCESS);
167 }
168}
169
170pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
176 let connect_options = get_postgres_connect_options(None, None, None, None, None);
177 Ok(PostgresCacheDatabase::connect(
178 Some(connect_options.host),
179 Some(connect_options.port),
180 Some(connect_options.username),
181 Some(connect_options.password),
182 Some(connect_options.database),
183 )
184 .await?)
185}
186
187#[allow(dead_code)]
188#[allow(unused)]
189#[async_trait::async_trait]
190impl CacheDatabaseAdapter for PostgresCacheDatabase {
191 fn close(&mut self) -> anyhow::Result<()> {
192 let pool = self.pool.clone();
193 let (tx, rx) = std::sync::mpsc::channel();
194
195 log::debug!("Closing connection pool");
196
197 tokio::task::block_in_place(|| {
198 get_runtime().block_on(async {
199 pool.close().await;
200 if let Err(e) = tx.send(()) {
201 log::error!("Error closing pool: {e:?}");
202 }
203 });
204 });
205
206 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
208 log::error!("Error sending close: {e:?}");
209 }
210
211 log_task_awaiting("cache-write");
212
213 tokio::task::block_in_place(|| {
214 if let Err(e) = get_runtime().block_on(&mut self.handle) {
215 log::error!("Error awaiting task 'cache-write': {e:?}");
216 }
217 });
218
219 log::debug!("Closed");
220
221 Ok(rx.recv()?)
222 }
223
224 fn flush(&mut self) -> anyhow::Result<()> {
225 let pool = self.pool.clone();
226 let (tx, rx) = std::sync::mpsc::channel();
227
228 tokio::task::block_in_place(|| {
229 get_runtime().block_on(async {
230 if let Err(e) = DatabaseQueries::truncate(&pool).await {
231 log::error!("Error flushing pool: {e:?}");
232 }
233 if let Err(e) = tx.send(()) {
234 log::error!("Error sending flush result: {e:?}");
235 }
236 });
237 });
238
239 Ok(rx.recv()?)
240 }
241
242 async fn load_all(&self) -> anyhow::Result<CacheMap> {
243 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
244 self.load_currencies(),
245 self.load_instruments(),
246 self.load_synthetics(),
247 self.load_accounts(),
248 self.load_orders(),
249 self.load_positions()
250 )
251 .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
252
253 let greeks = AHashMap::new();
256 let yield_curves = AHashMap::new();
257
258 Ok(CacheMap {
259 currencies,
260 instruments,
261 synthetics,
262 accounts,
263 orders,
264 positions,
265 greeks,
266 yield_curves,
267 })
268 }
269
270 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
271 let pool = self.pool.clone();
272 let (tx, rx) = std::sync::mpsc::channel();
273 tokio::spawn(async move {
274 let result = DatabaseQueries::load(&pool).await;
275 match result {
276 Ok(items) => {
277 let mapping = items
278 .into_iter()
279 .map(|(k, v)| (k, Bytes::from(v)))
280 .collect();
281 if let Err(e) = tx.send(mapping) {
282 log::error!("Failed to send general items: {e:?}");
283 }
284 }
285 Err(e) => {
286 log::error!("Failed to load general items: {e:?}");
287 if let Err(e) = tx.send(AHashMap::new()) {
288 log::error!("Failed to send empty general items: {e:?}");
289 }
290 }
291 }
292 });
293 Ok(rx.recv()?)
294 }
295
296 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
297 let pool = self.pool.clone();
298 let (tx, rx) = std::sync::mpsc::channel();
299 tokio::spawn(async move {
300 let result = DatabaseQueries::load_currencies(&pool).await;
301 match result {
302 Ok(currencies) => {
303 let mapping = currencies
304 .into_iter()
305 .map(|currency| (currency.code, currency))
306 .collect();
307 if let Err(e) = tx.send(mapping) {
308 log::error!("Failed to send currencies: {e:?}");
309 }
310 }
311 Err(e) => {
312 log::error!("Failed to load currencies: {e:?}");
313 if let Err(e) = tx.send(AHashMap::new()) {
314 log::error!("Failed to send empty currencies: {e:?}");
315 }
316 }
317 }
318 });
319 Ok(rx.recv()?)
320 }
321
322 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
323 let pool = self.pool.clone();
324 let (tx, rx) = std::sync::mpsc::channel();
325 tokio::spawn(async move {
326 let result = DatabaseQueries::load_instruments(&pool).await;
327 match result {
328 Ok(instruments) => {
329 let mapping = instruments
330 .into_iter()
331 .map(|instrument| (instrument.id(), instrument))
332 .collect();
333 if let Err(e) = tx.send(mapping) {
334 log::error!("Failed to send instruments: {e:?}");
335 }
336 }
337 Err(e) => {
338 log::error!("Failed to load instruments: {e:?}");
339 if let Err(e) = tx.send(AHashMap::new()) {
340 log::error!("Failed to send empty instruments: {e:?}");
341 }
342 }
343 }
344 });
345 Ok(rx.recv()?)
346 }
347
348 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
349 todo!()
350 }
351
352 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
353 let pool = self.pool.clone();
354 let (tx, rx) = std::sync::mpsc::channel();
355 tokio::spawn(async move {
356 let result = DatabaseQueries::load_accounts(&pool).await;
357 match result {
358 Ok(accounts) => {
359 let mapping = accounts
360 .into_iter()
361 .map(|account| (account.id(), account))
362 .collect();
363 if let Err(e) = tx.send(mapping) {
364 log::error!("Failed to send accounts: {e:?}");
365 }
366 }
367 Err(e) => {
368 log::error!("Failed to load accounts: {e:?}");
369 if let Err(e) = tx.send(AHashMap::new()) {
370 log::error!("Failed to send empty accounts: {e:?}");
371 }
372 }
373 }
374 });
375 Ok(rx.recv()?)
376 }
377
378 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
379 let pool = self.pool.clone();
380 let (tx, rx) = std::sync::mpsc::channel();
381 tokio::spawn(async move {
382 let result = DatabaseQueries::load_orders(&pool).await;
383 match result {
384 Ok(orders) => {
385 let mapping = orders
386 .into_iter()
387 .map(|order| (order.client_order_id(), order))
388 .collect();
389 if let Err(e) = tx.send(mapping) {
390 log::error!("Failed to send orders: {e:?}");
391 }
392 }
393 Err(e) => {
394 log::error!("Failed to load orders: {e:?}");
395 if let Err(e) = tx.send(AHashMap::new()) {
396 log::error!("Failed to send empty orders: {e:?}");
397 }
398 }
399 }
400 });
401 Ok(rx.recv()?)
402 }
403
404 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
405 todo!()
406 }
407
408 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
409 todo!()
410 }
411
412 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
413 let pool = self.pool.clone();
414 let (tx, rx) = std::sync::mpsc::channel();
415 tokio::spawn(async move {
416 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
417 match result {
418 Ok(currency) => {
419 if let Err(e) = tx.send(currency) {
420 log::error!("Failed to send load_index_order_client result: {e:?}");
421 }
422 }
423 Err(e) => {
424 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
425 if let Err(e) = tx.send(AHashMap::new()) {
426 log::error!("Failed to send empty load_index_order_client result: {e:?}");
427 }
428 }
429 }
430 });
431 Ok(rx.recv()?)
432 }
433
434 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
435 let pool = self.pool.clone();
436 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
438 tokio::spawn(async move {
439 let result = DatabaseQueries::load_currency(&pool, &code).await;
440 match result {
441 Ok(currency) => {
442 if let Err(e) = tx.send(currency) {
443 log::error!("Failed to send currency {code}: {e:?}");
444 }
445 }
446 Err(e) => {
447 log::error!("Failed to load currency {code}: {e:?}");
448 if let Err(e) = tx.send(None) {
449 log::error!("Failed to send None for currency {code}: {e:?}");
450 }
451 }
452 }
453 });
454 Ok(rx.recv()?)
455 }
456
457 async fn load_instrument(
458 &self,
459 instrument_id: &InstrumentId,
460 ) -> anyhow::Result<Option<InstrumentAny>> {
461 let pool = self.pool.clone();
462 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
464 tokio::spawn(async move {
465 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
466 match result {
467 Ok(instrument) => {
468 if let Err(e) = tx.send(instrument) {
469 log::error!("Failed to send instrument {instrument_id}: {e:?}");
470 }
471 }
472 Err(e) => {
473 log::error!("Failed to load instrument {instrument_id}: {e:?}");
474 if let Err(e) = tx.send(None) {
475 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
476 }
477 }
478 }
479 });
480 Ok(rx.recv()?)
481 }
482
483 async fn load_synthetic(
484 &self,
485 instrument_id: &InstrumentId,
486 ) -> anyhow::Result<Option<SyntheticInstrument>> {
487 todo!()
488 }
489
490 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
491 let pool = self.pool.clone();
492 let account_id = account_id.to_owned();
493 let (tx, rx) = std::sync::mpsc::channel();
494 tokio::spawn(async move {
495 let result = DatabaseQueries::load_account(&pool, &account_id).await;
496 match result {
497 Ok(account) => {
498 if let Err(e) = tx.send(account) {
499 log::error!("Failed to send account {account_id}: {e:?}");
500 }
501 }
502 Err(e) => {
503 log::error!("Failed to load account {account_id}: {e:?}");
504 if let Err(e) = tx.send(None) {
505 log::error!("Failed to send None for account {account_id}: {e:?}");
506 }
507 }
508 }
509 });
510 Ok(rx.recv()?)
511 }
512
513 async fn load_order(
514 &self,
515 client_order_id: &ClientOrderId,
516 ) -> anyhow::Result<Option<OrderAny>> {
517 let pool = self.pool.clone();
518 let client_order_id = client_order_id.to_owned();
519 let (tx, rx) = std::sync::mpsc::channel();
520 tokio::spawn(async move {
521 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
522 match result {
523 Ok(order) => {
524 if let Err(e) = tx.send(order) {
525 log::error!("Failed to send order {client_order_id}: {e:?}");
526 }
527 }
528 Err(e) => {
529 log::error!("Failed to load order {client_order_id}: {e:?}");
530 let _ = tx.send(None);
531 }
532 }
533 });
534 Ok(rx.recv()?)
535 }
536
537 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
538 todo!()
539 }
540
541 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
542 todo!()
543 }
544
545 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
546 todo!()
547 }
548
549 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
550 todo!()
551 }
552
553 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
554 todo!()
555 }
556
557 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
558 anyhow::bail!(
559 "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
560 )
561 }
562
563 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
564 anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
565 }
566
567 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
568 anyhow::bail!(
569 "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
570 )
571 }
572
573 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
574 let query = DatabaseQuery::Add(key, value.into());
575 self.tx
576 .send(query)
577 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
578 }
579
580 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
581 let query = DatabaseQuery::AddCurrency(*currency);
582 self.tx.send(query).map_err(|e| {
583 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
584 })
585 }
586
587 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
588 let query = DatabaseQuery::AddInstrument(instrument.clone());
589 self.tx.send(query).map_err(|e| {
590 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
591 })
592 }
593
594 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
595 todo!()
596 }
597
598 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
599 let query = DatabaseQuery::AddAccount(account.clone(), false);
600 self.tx.send(query).map_err(|e| {
601 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
602 })
603 }
604
605 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
606 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
607 self.tx.send(query).map_err(|e| {
608 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
609 })
610 }
611
612 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
613 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
614 self.tx.send(query).map_err(|e| {
615 anyhow::anyhow!(
616 "Failed to send query add_order_snapshot to database message handler: {e}"
617 )
618 })
619 }
620
621 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
622 todo!()
623 }
624
625 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
626 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
627 self.tx.send(query).map_err(|e| {
628 anyhow::anyhow!(
629 "Failed to send query add_position_snapshot to database message handler: {e}"
630 )
631 })
632 }
633
634 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
635 todo!()
636 }
637
638 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
639 let query = DatabaseQuery::AddQuote(quote.to_owned());
640 self.tx.send(query).map_err(|e| {
641 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
642 })
643 }
644
645 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
646 let pool = self.pool.clone();
647 let instrument_id = instrument_id.to_owned();
648 let (tx, rx) = std::sync::mpsc::channel();
649 tokio::spawn(async move {
650 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
651 match result {
652 Ok(quotes) => {
653 if let Err(e) = tx.send(quotes) {
654 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
655 }
656 }
657 Err(e) => {
658 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
659 if let Err(e) = tx.send(Vec::new()) {
660 log::error!(
661 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
662 );
663 }
664 }
665 }
666 });
667 Ok(rx.recv()?)
668 }
669
670 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
671 let query = DatabaseQuery::AddTrade(trade.to_owned());
672 self.tx.send(query).map_err(|e| {
673 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
674 })
675 }
676
677 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
678 let pool = self.pool.clone();
679 let instrument_id = instrument_id.to_owned();
680 let (tx, rx) = std::sync::mpsc::channel();
681 tokio::spawn(async move {
682 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
683 match result {
684 Ok(trades) => {
685 if let Err(e) = tx.send(trades) {
686 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
687 }
688 }
689 Err(e) => {
690 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
691 if let Err(e) = tx.send(Vec::new()) {
692 log::error!(
693 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
694 );
695 }
696 }
697 }
698 });
699 Ok(rx.recv()?)
700 }
701
702 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
703 let query = DatabaseQuery::AddBar(bar.to_owned());
704 self.tx.send(query).map_err(|e| {
705 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
706 })
707 }
708
709 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
710 let pool = self.pool.clone();
711 let instrument_id = instrument_id.to_owned();
712 let (tx, rx) = std::sync::mpsc::channel();
713 tokio::spawn(async move {
714 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
715 match result {
716 Ok(bars) => {
717 if let Err(e) = tx.send(bars) {
718 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
719 }
720 }
721 Err(e) => {
722 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
723 if let Err(e) = tx.send(Vec::new()) {
724 log::error!(
725 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
726 );
727 }
728 }
729 }
730 });
731 Ok(rx.recv()?)
732 }
733
734 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
735 let query = DatabaseQuery::AddSignal(signal.to_owned());
736 self.tx.send(query).map_err(|e| {
737 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
738 })
739 }
740
741 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
742 let pool = self.pool.clone();
743 let name = name.to_owned();
744 let (tx, rx) = std::sync::mpsc::channel();
745 tokio::spawn(async move {
746 let result = DatabaseQueries::load_signals(&pool, &name).await;
747 match result {
748 Ok(signals) => {
749 if let Err(e) = tx.send(signals) {
750 log::error!("Failed to send signals for '{name}': {e:?}");
751 }
752 }
753 Err(e) => {
754 log::error!("Failed to load signals for '{name}': {e:?}");
755 if let Err(e) = tx.send(Vec::new()) {
756 log::error!("Failed to send empty signals for '{name}': {e:?}");
757 }
758 }
759 }
760 });
761 Ok(rx.recv()?)
762 }
763
764 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
765 let query = DatabaseQuery::AddCustom(data.to_owned());
766 self.tx.send(query).map_err(|e| {
767 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
768 })
769 }
770
771 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
772 let pool = self.pool.clone();
773 let data_type = data_type.to_owned();
774 let (tx, rx) = std::sync::mpsc::channel();
775 tokio::spawn(async move {
776 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
777 match result {
778 Ok(signals) => {
779 if let Err(e) = tx.send(signals) {
780 log::error!("Failed to send custom data for '{data_type}': {e:?}");
781 }
782 }
783 Err(e) => {
784 log::error!("Failed to load custom data for '{data_type}': {e:?}");
785 if let Err(e) = tx.send(Vec::new()) {
786 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
787 }
788 }
789 }
790 });
791 Ok(rx.recv()?)
792 }
793
794 fn load_order_snapshot(
795 &self,
796 client_order_id: &ClientOrderId,
797 ) -> anyhow::Result<Option<OrderSnapshot>> {
798 let pool = self.pool.clone();
799 let client_order_id = client_order_id.to_owned();
800 let (tx, rx) = std::sync::mpsc::channel();
801 tokio::spawn(async move {
802 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
803 match result {
804 Ok(snapshot) => {
805 if let Err(e) = tx.send(snapshot) {
806 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
807 }
808 }
809 Err(e) => {
810 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
811 if let Err(e) = tx.send(None) {
812 log::error!(
813 "Failed to send None for order snapshot {client_order_id}: {e:?}"
814 );
815 }
816 }
817 }
818 });
819 Ok(rx.recv()?)
820 }
821
822 fn load_position_snapshot(
823 &self,
824 position_id: &PositionId,
825 ) -> anyhow::Result<Option<PositionSnapshot>> {
826 let pool = self.pool.clone();
827 let position_id = position_id.to_owned();
828 let (tx, rx) = std::sync::mpsc::channel();
829 tokio::spawn(async move {
830 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
831 match result {
832 Ok(snapshot) => {
833 if let Err(e) = tx.send(snapshot) {
834 log::error!("Failed to send position snapshot {position_id}: {e:?}");
835 }
836 }
837 Err(e) => {
838 log::error!("Failed to load position snapshot {position_id}: {e:?}");
839 if let Err(e) = tx.send(None) {
840 log::error!(
841 "Failed to send None for position snapshot {position_id}: {e:?}"
842 );
843 }
844 }
845 }
846 });
847 Ok(rx.recv()?)
848 }
849
850 fn index_venue_order_id(
851 &self,
852 client_order_id: ClientOrderId,
853 venue_order_id: VenueOrderId,
854 ) -> anyhow::Result<()> {
855 todo!()
856 }
857
858 fn index_order_position(
859 &self,
860 client_order_id: ClientOrderId,
861 position_id: PositionId,
862 ) -> anyhow::Result<()> {
863 todo!()
864 }
865
866 fn update_actor(&self) -> anyhow::Result<()> {
867 todo!()
868 }
869
870 fn update_strategy(&self) -> anyhow::Result<()> {
871 todo!()
872 }
873
874 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
875 let query = DatabaseQuery::AddAccount(account.clone(), true);
876 self.tx.send(query).map_err(|e| {
877 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
878 })
879 }
880
881 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
882 let query = DatabaseQuery::UpdateOrder(event.clone());
883 self.tx.send(query).map_err(|e| {
884 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
885 })
886 }
887
888 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
889 todo!()
890 }
891
892 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
893 todo!()
894 }
895
896 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
897 todo!()
898 }
899
900 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
901 todo!()
902 }
903}
904
905async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
906 for cmd in buffer.drain(..) {
907 let result: anyhow::Result<()> = match cmd {
908 DatabaseQuery::Close => Ok(()),
909 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
910 DatabaseQuery::AddCurrency(currency) => {
911 DatabaseQueries::add_currency(pool, currency).await
912 }
913 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
914 InstrumentAny::Betting(instrument) => {
915 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
916 }
917 InstrumentAny::BinaryOption(instrument) => {
918 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
919 .await
920 }
921 InstrumentAny::CryptoFuture(instrument) => {
922 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
923 .await
924 }
925 InstrumentAny::CryptoOption(instrument) => {
926 DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
927 .await
928 }
929 InstrumentAny::CryptoPerpetual(instrument) => {
930 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
931 .await
932 }
933 InstrumentAny::CurrencyPair(instrument) => {
934 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
935 .await
936 }
937 InstrumentAny::Equity(equity) => {
938 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
939 }
940 InstrumentAny::FuturesContract(instrument) => {
941 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
942 .await
943 }
944 InstrumentAny::FuturesSpread(instrument) => {
945 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
946 .await
947 }
948 InstrumentAny::OptionContract(instrument) => {
949 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
950 .await
951 }
952 InstrumentAny::OptionSpread(instrument) => {
953 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
954 .await
955 }
956 },
957 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
958 OrderAny::Limit(order) => {
959 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
960 .await
961 }
962 OrderAny::LimitIfTouched(order) => {
963 DatabaseQueries::add_order(
964 pool,
965 "LIMIT_IF_TOUCHED",
966 updated,
967 Box::new(order),
968 client_id,
969 )
970 .await
971 }
972 OrderAny::Market(order) => {
973 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
974 .await
975 }
976 OrderAny::MarketIfTouched(order) => {
977 DatabaseQueries::add_order(
978 pool,
979 "MARKET_IF_TOUCHED",
980 updated,
981 Box::new(order),
982 client_id,
983 )
984 .await
985 }
986 OrderAny::MarketToLimit(order) => {
987 DatabaseQueries::add_order(
988 pool,
989 "MARKET_TO_LIMIT",
990 updated,
991 Box::new(order),
992 client_id,
993 )
994 .await
995 }
996 OrderAny::StopLimit(order) => {
997 DatabaseQueries::add_order(
998 pool,
999 "STOP_LIMIT",
1000 updated,
1001 Box::new(order),
1002 client_id,
1003 )
1004 .await
1005 }
1006 OrderAny::StopMarket(order) => {
1007 DatabaseQueries::add_order(
1008 pool,
1009 "STOP_MARKET",
1010 updated,
1011 Box::new(order),
1012 client_id,
1013 )
1014 .await
1015 }
1016 OrderAny::TrailingStopLimit(order) => {
1017 DatabaseQueries::add_order(
1018 pool,
1019 "TRAILING_STOP_LIMIT",
1020 updated,
1021 Box::new(order),
1022 client_id,
1023 )
1024 .await
1025 }
1026 OrderAny::TrailingStopMarket(order) => {
1027 DatabaseQueries::add_order(
1028 pool,
1029 "TRAILING_STOP_MARKET",
1030 updated,
1031 Box::new(order),
1032 client_id,
1033 )
1034 .await
1035 }
1036 },
1037 DatabaseQuery::AddOrderSnapshot(snapshot) => {
1038 DatabaseQueries::add_order_snapshot(pool, snapshot).await
1039 }
1040 DatabaseQuery::AddPositionSnapshot(snapshot) => {
1041 DatabaseQueries::add_position_snapshot(pool, snapshot).await
1042 }
1043 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1044 AccountAny::Cash(account) => {
1045 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1046 }
1047 AccountAny::Margin(account) => {
1048 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1049 }
1050 },
1051 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1052 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1053 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
1054 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1055 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1056 DatabaseQuery::UpdateOrder(event) => {
1057 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1058 }
1059 };
1060
1061 if let Err(e) = result {
1062 tracing::error!("Error on query: {e:?}");
1063 }
1064 }
1065}