1use std::{collections::VecDeque, ops::ControlFlow, pin::Pin, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21 cache::database::{CacheDatabaseAdapter, CacheMap},
22 custom::CustomData,
23 live::get_runtime,
24 logging::{log_task_awaiting, log_task_started, log_task_stopped},
25 signal::Signal,
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29 accounts::AccountAny,
30 data::{Bar, DataType, FundingRateUpdate, QuoteTick, TradeTick},
31 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
32 identifiers::{
33 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
34 VenueOrderId,
35 },
36 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
37 orderbook::OrderBook,
38 orders::{Order, OrderAny},
39 position::Position,
40 types::Currency,
41};
42use sqlx::{PgPool, postgres::PgConnectOptions};
43use tokio::{time::Instant, try_join};
44use ustr::Ustr;
45
46use crate::sql::{
47 pg::{connect_pg, get_postgres_connect_options},
48 queries::DatabaseQueries,
49};
50
51const CACHE_PROCESS: &str = "cache-process";
53
54#[derive(Debug)]
55#[cfg_attr(
56 feature = "python",
57 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
58)]
59pub struct PostgresCacheDatabase {
60 pub pool: PgPool,
61 tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
62 handle: tokio::task::JoinHandle<()>,
63}
64
65#[allow(clippy::large_enum_variant)]
66#[derive(Debug, Clone)]
67pub enum DatabaseQuery {
68 Close,
69 Add(String, Vec<u8>),
70 AddCurrency(Currency),
71 AddInstrument(InstrumentAny),
72 AddOrder(OrderAny, Option<ClientId>, bool),
73 AddOrderSnapshot(OrderSnapshot),
74 AddPositionSnapshot(PositionSnapshot),
75 AddAccount(AccountAny, bool),
76 AddSignal(Signal),
77 AddCustom(CustomData),
78 AddQuote(QuoteTick),
79 AddTrade(TradeTick),
80 AddBar(Bar),
81 UpdateOrder(OrderEventAny),
82}
83
84impl PostgresCacheDatabase {
85 pub async fn connect(
95 host: Option<String>,
96 port: Option<u16>,
97 username: Option<String>,
98 password: Option<String>,
99 database: Option<String>,
100 ) -> Result<Self, sqlx::Error> {
101 let pg_connect_options =
102 get_postgres_connect_options(host, port, username, password, database);
103 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
104 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
105
106 let handle = tokio::spawn(async move {
108 Self::process_commands(rx, pg_connect_options.clone().into()).await;
109 });
110 Ok(Self { pool, tx, handle })
111 }
112
113 async fn process_commands(
114 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
115 pg_connect_options: PgConnectOptions,
116 ) {
117 log_task_started(CACHE_PROCESS);
118
119 let pool = connect_pg(pg_connect_options).await.unwrap();
120
121 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
123
124 let buffer_interval = Duration::from_millis(0);
126
127 let flush_timer = tokio::time::sleep(buffer_interval);
131 tokio::pin!(flush_timer);
132
133 loop {
135 tokio::select! {
136 maybe_msg = rx.recv() => {
137 let result = handle_query(
138 maybe_msg,
139 &mut buffer,
140 buffer_interval,
141 &pool,
142 ).await;
143 if result.is_break() {
144 break;
145 }
146 }
147 () = &mut flush_timer, if !buffer_interval.is_zero() => {
148 flush_buffer(&mut buffer, &pool, &mut flush_timer, buffer_interval).await;
149 }
150 }
151 }
152
153 if !buffer.is_empty() {
154 drain_buffer(&pool, &mut buffer).await;
155 }
156
157 log_task_stopped(CACHE_PROCESS);
158 }
159}
160
161async fn handle_query(
162 maybe_msg: Option<DatabaseQuery>,
163 buffer: &mut VecDeque<DatabaseQuery>,
164 buffer_interval: Duration,
165 pool: &PgPool,
166) -> ControlFlow<()> {
167 let Some(msg) = maybe_msg else {
168 log::debug!("Command channel closed");
169 return ControlFlow::Break(());
170 };
171
172 log::debug!("Received {msg:?}");
173
174 if matches!(msg, DatabaseQuery::Close) {
175 if !buffer.is_empty() {
176 drain_buffer(pool, buffer).await;
177 }
178 return ControlFlow::Break(());
179 }
180
181 buffer.push_back(msg);
182
183 if buffer_interval.is_zero() {
184 drain_buffer(pool, buffer).await;
185 }
186
187 ControlFlow::Continue(())
188}
189
190async fn flush_buffer(
191 buffer: &mut VecDeque<DatabaseQuery>,
192 pool: &PgPool,
193 flush_timer: &mut Pin<&mut tokio::time::Sleep>,
194 buffer_interval: Duration,
195) {
196 if !buffer.is_empty() {
197 drain_buffer(pool, buffer).await;
198 }
199 flush_timer.as_mut().reset(Instant::now() + buffer_interval);
200}
201
202pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
208 let connect_options = get_postgres_connect_options(None, None, None, None, None);
209 Ok(PostgresCacheDatabase::connect(
210 Some(connect_options.host),
211 Some(connect_options.port),
212 Some(connect_options.username),
213 Some(connect_options.password),
214 Some(connect_options.database),
215 )
216 .await?)
217}
218
219#[allow(dead_code)]
220#[allow(unused)]
221#[async_trait::async_trait]
222impl CacheDatabaseAdapter for PostgresCacheDatabase {
223 fn close(&mut self) -> anyhow::Result<()> {
224 let pool = self.pool.clone();
225 let (tx, rx) = std::sync::mpsc::channel();
226
227 log::debug!("Closing connection pool");
228
229 tokio::task::block_in_place(|| {
230 get_runtime().block_on(async {
231 pool.close().await;
232 if let Err(e) = tx.send(()) {
233 log::error!("Error closing pool: {e:?}");
234 }
235 });
236 });
237
238 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
240 log::error!("Error sending close: {e:?}");
241 }
242
243 log_task_awaiting("cache-write");
244
245 tokio::task::block_in_place(|| {
246 if let Err(e) = get_runtime().block_on(&mut self.handle) {
247 log::error!("Error awaiting task 'cache-write': {e:?}");
248 }
249 });
250
251 log::debug!("Closed");
252
253 Ok(rx.recv()?)
254 }
255
256 fn flush(&mut self) -> anyhow::Result<()> {
257 let pool = self.pool.clone();
258 let (tx, rx) = std::sync::mpsc::channel();
259
260 tokio::task::block_in_place(|| {
261 get_runtime().block_on(async {
262 if let Err(e) = DatabaseQueries::truncate(&pool).await {
263 log::error!("Error flushing pool: {e:?}");
264 }
265 if let Err(e) = tx.send(()) {
266 log::error!("Error sending flush result: {e:?}");
267 }
268 });
269 });
270
271 Ok(rx.recv()?)
272 }
273
274 async fn load_all(&self) -> anyhow::Result<CacheMap> {
275 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
276 self.load_currencies(),
277 self.load_instruments(),
278 self.load_synthetics(),
279 self.load_accounts(),
280 self.load_orders(),
281 self.load_positions()
282 )
283 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
284
285 let greeks = AHashMap::new();
288 let yield_curves = AHashMap::new();
289
290 Ok(CacheMap {
291 currencies,
292 instruments,
293 synthetics,
294 accounts,
295 orders,
296 positions,
297 greeks,
298 yield_curves,
299 })
300 }
301
302 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
303 let pool = self.pool.clone();
304 let (tx, rx) = std::sync::mpsc::channel();
305
306 tokio::spawn(async move {
307 let result = DatabaseQueries::load(&pool).await;
308 match result {
309 Ok(items) => {
310 let mapping = items
311 .into_iter()
312 .map(|(k, v)| (k, Bytes::from(v)))
313 .collect();
314 if let Err(e) = tx.send(mapping) {
315 log::error!("Failed to send general items: {e:?}");
316 }
317 }
318 Err(e) => {
319 log::error!("Failed to load general items: {e:?}");
320 if let Err(e) = tx.send(AHashMap::new()) {
321 log::error!("Failed to send empty general items: {e:?}");
322 }
323 }
324 }
325 });
326 Ok(rx.recv()?)
327 }
328
329 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
330 let pool = self.pool.clone();
331 let (tx, rx) = std::sync::mpsc::channel();
332
333 tokio::spawn(async move {
334 let result = DatabaseQueries::load_currencies(&pool).await;
335 match result {
336 Ok(currencies) => {
337 let mapping = currencies
338 .into_iter()
339 .map(|currency| (currency.code, currency))
340 .collect();
341 if let Err(e) = tx.send(mapping) {
342 log::error!("Failed to send currencies: {e:?}");
343 }
344 }
345 Err(e) => {
346 log::error!("Failed to load currencies: {e:?}");
347 if let Err(e) = tx.send(AHashMap::new()) {
348 log::error!("Failed to send empty currencies: {e:?}");
349 }
350 }
351 }
352 });
353 Ok(rx.recv()?)
354 }
355
356 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
357 let pool = self.pool.clone();
358 let (tx, rx) = std::sync::mpsc::channel();
359
360 tokio::spawn(async move {
361 let result = DatabaseQueries::load_instruments(&pool).await;
362 match result {
363 Ok(instruments) => {
364 let mapping = instruments
365 .into_iter()
366 .map(|instrument| (instrument.id(), instrument))
367 .collect();
368 if let Err(e) = tx.send(mapping) {
369 log::error!("Failed to send instruments: {e:?}");
370 }
371 }
372 Err(e) => {
373 log::error!("Failed to load instruments: {e:?}");
374 if let Err(e) = tx.send(AHashMap::new()) {
375 log::error!("Failed to send empty instruments: {e:?}");
376 }
377 }
378 }
379 });
380 Ok(rx.recv()?)
381 }
382
383 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
384 todo!()
385 }
386
387 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
388 let pool = self.pool.clone();
389 let (tx, rx) = std::sync::mpsc::channel();
390
391 tokio::spawn(async move {
392 let result = DatabaseQueries::load_accounts(&pool).await;
393 match result {
394 Ok(accounts) => {
395 let mapping = accounts
396 .into_iter()
397 .map(|account| (account.id(), account))
398 .collect();
399 if let Err(e) = tx.send(mapping) {
400 log::error!("Failed to send accounts: {e:?}");
401 }
402 }
403 Err(e) => {
404 log::error!("Failed to load accounts: {e:?}");
405 if let Err(e) = tx.send(AHashMap::new()) {
406 log::error!("Failed to send empty accounts: {e:?}");
407 }
408 }
409 }
410 });
411 Ok(rx.recv()?)
412 }
413
414 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
415 let pool = self.pool.clone();
416 let (tx, rx) = std::sync::mpsc::channel();
417
418 tokio::spawn(async move {
419 let result = DatabaseQueries::load_orders(&pool).await;
420 match result {
421 Ok(orders) => {
422 let mapping = orders
423 .into_iter()
424 .map(|order| (order.client_order_id(), order))
425 .collect();
426 if let Err(e) = tx.send(mapping) {
427 log::error!("Failed to send orders: {e:?}");
428 }
429 }
430 Err(e) => {
431 log::error!("Failed to load orders: {e:?}");
432 if let Err(e) = tx.send(AHashMap::new()) {
433 log::error!("Failed to send empty orders: {e:?}");
434 }
435 }
436 }
437 });
438 Ok(rx.recv()?)
439 }
440
441 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
442 todo!()
443 }
444
445 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
446 todo!()
447 }
448
449 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
450 let pool = self.pool.clone();
451 let (tx, rx) = std::sync::mpsc::channel();
452
453 tokio::spawn(async move {
454 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
455 match result {
456 Ok(currency) => {
457 if let Err(e) = tx.send(currency) {
458 log::error!("Failed to send load_index_order_client result: {e:?}");
459 }
460 }
461 Err(e) => {
462 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
463 if let Err(e) = tx.send(AHashMap::new()) {
464 log::error!("Failed to send empty load_index_order_client result: {e:?}");
465 }
466 }
467 }
468 });
469 Ok(rx.recv()?)
470 }
471
472 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
473 let pool = self.pool.clone();
474 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
476
477 tokio::spawn(async move {
478 let result = DatabaseQueries::load_currency(&pool, &code).await;
479 match result {
480 Ok(currency) => {
481 if let Err(e) = tx.send(currency) {
482 log::error!("Failed to send currency {code}: {e:?}");
483 }
484 }
485 Err(e) => {
486 log::error!("Failed to load currency {code}: {e:?}");
487 if let Err(e) = tx.send(None) {
488 log::error!("Failed to send None for currency {code}: {e:?}");
489 }
490 }
491 }
492 });
493 Ok(rx.recv()?)
494 }
495
496 async fn load_instrument(
497 &self,
498 instrument_id: &InstrumentId,
499 ) -> anyhow::Result<Option<InstrumentAny>> {
500 let pool = self.pool.clone();
501 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
503
504 tokio::spawn(async move {
505 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
506 match result {
507 Ok(instrument) => {
508 if let Err(e) = tx.send(instrument) {
509 log::error!("Failed to send instrument {instrument_id}: {e:?}");
510 }
511 }
512 Err(e) => {
513 log::error!("Failed to load instrument {instrument_id}: {e:?}");
514 if let Err(e) = tx.send(None) {
515 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
516 }
517 }
518 }
519 });
520 Ok(rx.recv()?)
521 }
522
523 async fn load_synthetic(
524 &self,
525 instrument_id: &InstrumentId,
526 ) -> anyhow::Result<Option<SyntheticInstrument>> {
527 todo!()
528 }
529
530 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
531 let pool = self.pool.clone();
532 let account_id = account_id.to_owned();
533 let (tx, rx) = std::sync::mpsc::channel();
534
535 tokio::spawn(async move {
536 let result = DatabaseQueries::load_account(&pool, &account_id).await;
537 match result {
538 Ok(account) => {
539 if let Err(e) = tx.send(account) {
540 log::error!("Failed to send account {account_id}: {e:?}");
541 }
542 }
543 Err(e) => {
544 log::error!("Failed to load account {account_id}: {e:?}");
545 if let Err(e) = tx.send(None) {
546 log::error!("Failed to send None for account {account_id}: {e:?}");
547 }
548 }
549 }
550 });
551 Ok(rx.recv()?)
552 }
553
554 async fn load_order(
555 &self,
556 client_order_id: &ClientOrderId,
557 ) -> anyhow::Result<Option<OrderAny>> {
558 let pool = self.pool.clone();
559 let client_order_id = client_order_id.to_owned();
560 let (tx, rx) = std::sync::mpsc::channel();
561
562 tokio::spawn(async move {
563 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
564 match result {
565 Ok(order) => {
566 if let Err(e) = tx.send(order) {
567 log::error!("Failed to send order {client_order_id}: {e:?}");
568 }
569 }
570 Err(e) => {
571 log::error!("Failed to load order {client_order_id}: {e:?}");
572 let _ = tx.send(None);
573 }
574 }
575 });
576 Ok(rx.recv()?)
577 }
578
579 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
580 todo!()
581 }
582
583 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
584 todo!()
585 }
586
587 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
588 todo!()
589 }
590
591 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
592 todo!()
593 }
594
595 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
596 todo!()
597 }
598
599 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
600 anyhow::bail!(
601 "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
602 )
603 }
604
605 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
606 anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
607 }
608
609 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
610 anyhow::bail!(
611 "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
612 )
613 }
614
615 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
616 let query = DatabaseQuery::Add(key, value.into());
617 self.tx
618 .send(query)
619 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
620 }
621
622 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
623 let query = DatabaseQuery::AddCurrency(*currency);
624 self.tx.send(query).map_err(|e| {
625 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
626 })
627 }
628
629 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
630 let query = DatabaseQuery::AddInstrument(instrument.clone());
631 self.tx.send(query).map_err(|e| {
632 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
633 })
634 }
635
636 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
637 todo!()
638 }
639
640 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
641 let query = DatabaseQuery::AddAccount(account.clone(), false);
642 self.tx.send(query).map_err(|e| {
643 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
644 })
645 }
646
647 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
648 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
649 self.tx.send(query).map_err(|e| {
650 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
651 })
652 }
653
654 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
655 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
656 self.tx.send(query).map_err(|e| {
657 anyhow::anyhow!(
658 "Failed to send query add_order_snapshot to database message handler: {e}"
659 )
660 })
661 }
662
663 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
664 todo!()
665 }
666
667 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
668 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
669 self.tx.send(query).map_err(|e| {
670 anyhow::anyhow!(
671 "Failed to send query add_position_snapshot to database message handler: {e}"
672 )
673 })
674 }
675
676 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
677 todo!()
678 }
679
680 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
681 let query = DatabaseQuery::AddQuote(quote.to_owned());
682 self.tx.send(query).map_err(|e| {
683 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
684 })
685 }
686
687 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
688 let pool = self.pool.clone();
689 let instrument_id = instrument_id.to_owned();
690 let (tx, rx) = std::sync::mpsc::channel();
691
692 tokio::spawn(async move {
693 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
694 match result {
695 Ok(quotes) => {
696 if let Err(e) = tx.send(quotes) {
697 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
698 }
699 }
700 Err(e) => {
701 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
702 if let Err(e) = tx.send(Vec::new()) {
703 log::error!(
704 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
705 );
706 }
707 }
708 }
709 });
710 Ok(rx.recv()?)
711 }
712
713 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
714 let query = DatabaseQuery::AddTrade(trade.to_owned());
715 self.tx.send(query).map_err(|e| {
716 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
717 })
718 }
719
720 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
721 let pool = self.pool.clone();
722 let instrument_id = instrument_id.to_owned();
723 let (tx, rx) = std::sync::mpsc::channel();
724
725 tokio::spawn(async move {
726 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
727 match result {
728 Ok(trades) => {
729 if let Err(e) = tx.send(trades) {
730 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
731 }
732 }
733 Err(e) => {
734 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
735 if let Err(e) = tx.send(Vec::new()) {
736 log::error!(
737 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
738 );
739 }
740 }
741 }
742 });
743 Ok(rx.recv()?)
744 }
745
746 fn add_funding_rate(&self, _funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
747 anyhow::bail!("add_funding_rate not implemented for PostgreSQL cache adapter")
748 }
749
750 fn load_funding_rates(
751 &self,
752 _instrument_id: &InstrumentId,
753 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
754 anyhow::bail!("load_funding_rates not implemented for PostgreSQL cache adapter")
755 }
756
757 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
758 let query = DatabaseQuery::AddBar(bar.to_owned());
759 self.tx.send(query).map_err(|e| {
760 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
761 })
762 }
763
764 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
765 let pool = self.pool.clone();
766 let instrument_id = instrument_id.to_owned();
767 let (tx, rx) = std::sync::mpsc::channel();
768
769 tokio::spawn(async move {
770 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
771 match result {
772 Ok(bars) => {
773 if let Err(e) = tx.send(bars) {
774 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
775 }
776 }
777 Err(e) => {
778 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
779 if let Err(e) = tx.send(Vec::new()) {
780 log::error!(
781 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
782 );
783 }
784 }
785 }
786 });
787 Ok(rx.recv()?)
788 }
789
790 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
791 let query = DatabaseQuery::AddSignal(signal.to_owned());
792 self.tx.send(query).map_err(|e| {
793 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
794 })
795 }
796
797 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
798 let pool = self.pool.clone();
799 let name = name.to_owned();
800 let (tx, rx) = std::sync::mpsc::channel();
801
802 tokio::spawn(async move {
803 let result = DatabaseQueries::load_signals(&pool, &name).await;
804 match result {
805 Ok(signals) => {
806 if let Err(e) = tx.send(signals) {
807 log::error!("Failed to send signals for '{name}': {e:?}");
808 }
809 }
810 Err(e) => {
811 log::error!("Failed to load signals for '{name}': {e:?}");
812 if let Err(e) = tx.send(Vec::new()) {
813 log::error!("Failed to send empty signals for '{name}': {e:?}");
814 }
815 }
816 }
817 });
818 Ok(rx.recv()?)
819 }
820
821 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
822 let query = DatabaseQuery::AddCustom(data.to_owned());
823 self.tx.send(query).map_err(|e| {
824 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
825 })
826 }
827
828 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
829 let pool = self.pool.clone();
830 let data_type = data_type.to_owned();
831 let (tx, rx) = std::sync::mpsc::channel();
832
833 tokio::spawn(async move {
834 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
835 match result {
836 Ok(signals) => {
837 if let Err(e) = tx.send(signals) {
838 log::error!("Failed to send custom data for '{data_type}': {e:?}");
839 }
840 }
841 Err(e) => {
842 log::error!("Failed to load custom data for '{data_type}': {e:?}");
843 if let Err(e) = tx.send(Vec::new()) {
844 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
845 }
846 }
847 }
848 });
849 Ok(rx.recv()?)
850 }
851
852 fn load_order_snapshot(
853 &self,
854 client_order_id: &ClientOrderId,
855 ) -> anyhow::Result<Option<OrderSnapshot>> {
856 let pool = self.pool.clone();
857 let client_order_id = client_order_id.to_owned();
858 let (tx, rx) = std::sync::mpsc::channel();
859
860 tokio::spawn(async move {
861 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
862 match result {
863 Ok(snapshot) => {
864 if let Err(e) = tx.send(snapshot) {
865 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
866 }
867 }
868 Err(e) => {
869 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
870 if let Err(e) = tx.send(None) {
871 log::error!(
872 "Failed to send None for order snapshot {client_order_id}: {e:?}"
873 );
874 }
875 }
876 }
877 });
878 Ok(rx.recv()?)
879 }
880
881 fn load_position_snapshot(
882 &self,
883 position_id: &PositionId,
884 ) -> anyhow::Result<Option<PositionSnapshot>> {
885 let pool = self.pool.clone();
886 let position_id = position_id.to_owned();
887 let (tx, rx) = std::sync::mpsc::channel();
888
889 tokio::spawn(async move {
890 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
891 match result {
892 Ok(snapshot) => {
893 if let Err(e) = tx.send(snapshot) {
894 log::error!("Failed to send position snapshot {position_id}: {e:?}");
895 }
896 }
897 Err(e) => {
898 log::error!("Failed to load position snapshot {position_id}: {e:?}");
899 if let Err(e) = tx.send(None) {
900 log::error!(
901 "Failed to send None for position snapshot {position_id}: {e:?}"
902 );
903 }
904 }
905 }
906 });
907 Ok(rx.recv()?)
908 }
909
910 fn index_venue_order_id(
911 &self,
912 client_order_id: ClientOrderId,
913 venue_order_id: VenueOrderId,
914 ) -> anyhow::Result<()> {
915 todo!()
916 }
917
918 fn index_order_position(
919 &self,
920 client_order_id: ClientOrderId,
921 position_id: PositionId,
922 ) -> anyhow::Result<()> {
923 todo!()
924 }
925
926 fn update_actor(&self) -> anyhow::Result<()> {
927 todo!()
928 }
929
930 fn update_strategy(&self) -> anyhow::Result<()> {
931 todo!()
932 }
933
934 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
935 let query = DatabaseQuery::AddAccount(account.clone(), true);
936 self.tx.send(query).map_err(|e| {
937 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
938 })
939 }
940
941 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
942 let query = DatabaseQuery::UpdateOrder(event.clone());
943 self.tx.send(query).map_err(|e| {
944 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
945 })
946 }
947
948 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
949 todo!()
950 }
951
952 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
953 todo!()
954 }
955
956 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
957 todo!()
958 }
959
960 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
961 todo!()
962 }
963}
964
965async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
966 for cmd in buffer.drain(..) {
967 let result: anyhow::Result<()> = match cmd {
968 DatabaseQuery::Close => Ok(()),
969 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
970 DatabaseQuery::AddCurrency(currency) => {
971 DatabaseQueries::add_currency(pool, currency).await
972 }
973 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
974 InstrumentAny::Betting(instrument) => {
975 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
976 }
977 InstrumentAny::BinaryOption(instrument) => {
978 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
979 .await
980 }
981 InstrumentAny::CryptoFuture(instrument) => {
982 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
983 .await
984 }
985 InstrumentAny::CryptoOption(instrument) => {
986 DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
987 .await
988 }
989 InstrumentAny::CryptoPerpetual(instrument) => {
990 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
991 .await
992 }
993 InstrumentAny::CurrencyPair(instrument) => {
994 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
995 .await
996 }
997 InstrumentAny::Equity(equity) => {
998 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
999 }
1000 InstrumentAny::FuturesContract(instrument) => {
1001 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
1002 .await
1003 }
1004 InstrumentAny::FuturesSpread(instrument) => {
1005 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
1006 .await
1007 }
1008 InstrumentAny::OptionContract(instrument) => {
1009 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
1010 .await
1011 }
1012 InstrumentAny::OptionSpread(instrument) => {
1013 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
1014 .await
1015 }
1016 },
1017 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
1018 OrderAny::Limit(order) => {
1019 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
1020 .await
1021 }
1022 OrderAny::LimitIfTouched(order) => {
1023 DatabaseQueries::add_order(
1024 pool,
1025 "LIMIT_IF_TOUCHED",
1026 updated,
1027 Box::new(order),
1028 client_id,
1029 )
1030 .await
1031 }
1032 OrderAny::Market(order) => {
1033 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
1034 .await
1035 }
1036 OrderAny::MarketIfTouched(order) => {
1037 DatabaseQueries::add_order(
1038 pool,
1039 "MARKET_IF_TOUCHED",
1040 updated,
1041 Box::new(order),
1042 client_id,
1043 )
1044 .await
1045 }
1046 OrderAny::MarketToLimit(order) => {
1047 DatabaseQueries::add_order(
1048 pool,
1049 "MARKET_TO_LIMIT",
1050 updated,
1051 Box::new(order),
1052 client_id,
1053 )
1054 .await
1055 }
1056 OrderAny::StopLimit(order) => {
1057 DatabaseQueries::add_order(
1058 pool,
1059 "STOP_LIMIT",
1060 updated,
1061 Box::new(order),
1062 client_id,
1063 )
1064 .await
1065 }
1066 OrderAny::StopMarket(order) => {
1067 DatabaseQueries::add_order(
1068 pool,
1069 "STOP_MARKET",
1070 updated,
1071 Box::new(order),
1072 client_id,
1073 )
1074 .await
1075 }
1076 OrderAny::TrailingStopLimit(order) => {
1077 DatabaseQueries::add_order(
1078 pool,
1079 "TRAILING_STOP_LIMIT",
1080 updated,
1081 Box::new(order),
1082 client_id,
1083 )
1084 .await
1085 }
1086 OrderAny::TrailingStopMarket(order) => {
1087 DatabaseQueries::add_order(
1088 pool,
1089 "TRAILING_STOP_MARKET",
1090 updated,
1091 Box::new(order),
1092 client_id,
1093 )
1094 .await
1095 }
1096 },
1097 DatabaseQuery::AddOrderSnapshot(snapshot) => {
1098 DatabaseQueries::add_order_snapshot(pool, snapshot).await
1099 }
1100 DatabaseQuery::AddPositionSnapshot(snapshot) => {
1101 DatabaseQueries::add_position_snapshot(pool, snapshot).await
1102 }
1103 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1104 AccountAny::Cash(account) => {
1105 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1106 }
1107 AccountAny::Margin(account) => {
1108 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1109 }
1110 },
1111 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1112 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1113 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
1114 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1115 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1116 DatabaseQuery::UpdateOrder(event) => {
1117 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1118 }
1119 };
1120
1121 if let Err(e) = result {
1122 log::error!("Error on query: {e:?}");
1123 }
1124 }
1125}