1use std::{collections::VecDeque, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21 cache::database::{CacheDatabaseAdapter, CacheMap},
22 custom::CustomData,
23 logging::{log_task_awaiting, log_task_started, log_task_stopped},
24 runtime::get_runtime,
25 signal::Signal,
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29 accounts::AccountAny,
30 data::{Bar, DataType, QuoteTick, TradeTick},
31 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
32 identifiers::{
33 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
34 VenueOrderId,
35 },
36 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
37 orderbook::OrderBook,
38 orders::{Order, OrderAny},
39 position::Position,
40 types::Currency,
41};
42use sqlx::{PgPool, postgres::PgConnectOptions};
43use tokio::{time::Instant, try_join};
44use ustr::Ustr;
45
46use crate::sql::{
47 pg::{connect_pg, get_postgres_connect_options},
48 queries::DatabaseQueries,
49};
50
51const CACHE_PROCESS: &str = "cache-process";
53
54#[derive(Debug)]
55#[cfg_attr(
56 feature = "python",
57 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
58)]
59pub struct PostgresCacheDatabase {
60 pub pool: PgPool,
61 tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
62 handle: tokio::task::JoinHandle<()>,
63}
64
65#[allow(clippy::large_enum_variant)]
66#[derive(Debug, Clone)]
67pub enum DatabaseQuery {
68 Close,
69 Add(String, Vec<u8>),
70 AddCurrency(Currency),
71 AddInstrument(InstrumentAny),
72 AddOrder(OrderAny, Option<ClientId>, bool),
73 AddOrderSnapshot(OrderSnapshot),
74 AddPositionSnapshot(PositionSnapshot),
75 AddAccount(AccountAny, bool),
76 AddSignal(Signal),
77 AddCustom(CustomData),
78 AddQuote(QuoteTick),
79 AddTrade(TradeTick),
80 AddBar(Bar),
81 UpdateOrder(OrderEventAny),
82}
83
84impl PostgresCacheDatabase {
85 pub async fn connect(
95 host: Option<String>,
96 port: Option<u16>,
97 username: Option<String>,
98 password: Option<String>,
99 database: Option<String>,
100 ) -> Result<Self, sqlx::Error> {
101 let pg_connect_options =
102 get_postgres_connect_options(host, port, username, password, database);
103 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
104 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
105
106 let handle = tokio::spawn(async move {
108 Self::process_commands(rx, pg_connect_options.clone().into()).await;
109 });
110 Ok(Self { pool, tx, handle })
111 }
112
113 async fn process_commands(
114 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
115 pg_connect_options: PgConnectOptions,
116 ) {
117 log_task_started(CACHE_PROCESS);
118
119 let pool = connect_pg(pg_connect_options).await.unwrap();
120
121 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
123
124 let buffer_interval = Duration::from_millis(0);
126
127 let flush_timer = tokio::time::sleep(buffer_interval);
129 tokio::pin!(flush_timer);
130
131 loop {
133 tokio::select! {
134 maybe_msg = rx.recv() => {
135 if let Some(msg) = maybe_msg {
136 tracing::debug!("Received {msg:?}");
137 if matches!(msg, DatabaseQuery::Close) {
138 break;
139 }
140 buffer.push_back(msg);
141
142 if buffer_interval.is_zero() {
144 drain_buffer(&pool, &mut buffer).await;
145 }
146 } else {
147 tracing::debug!("Command channel closed");
148 break;
149 }
150 }
151 () = &mut flush_timer, if !buffer_interval.is_zero() => {
152 if !buffer.is_empty() {
153 drain_buffer(&pool, &mut buffer).await;
154 }
155
156 flush_timer.as_mut().reset(Instant::now() + buffer_interval);
157 }
158 }
159 }
160
161 if !buffer.is_empty() {
163 drain_buffer(&pool, &mut buffer).await;
164 }
165
166 log_task_stopped(CACHE_PROCESS);
167 }
168}
169
170pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
176 let connect_options = get_postgres_connect_options(None, None, None, None, None);
177 Ok(PostgresCacheDatabase::connect(
178 Some(connect_options.host),
179 Some(connect_options.port),
180 Some(connect_options.username),
181 Some(connect_options.password),
182 Some(connect_options.database),
183 )
184 .await?)
185}
186
187#[allow(dead_code)]
188#[allow(unused)]
189#[async_trait::async_trait]
190impl CacheDatabaseAdapter for PostgresCacheDatabase {
191 fn close(&mut self) -> anyhow::Result<()> {
192 let pool = self.pool.clone();
193 let (tx, rx) = std::sync::mpsc::channel();
194
195 log::debug!("Closing connection pool");
196
197 tokio::task::block_in_place(|| {
198 get_runtime().block_on(async {
199 pool.close().await;
200 if let Err(e) = tx.send(()) {
201 log::error!("Error closing pool: {e:?}");
202 }
203 });
204 });
205
206 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
208 log::error!("Error sending close: {e:?}");
209 }
210
211 log_task_awaiting("cache-write");
212
213 tokio::task::block_in_place(|| {
214 if let Err(e) = get_runtime().block_on(&mut self.handle) {
215 log::error!("Error awaiting task 'cache-write': {e:?}");
216 }
217 });
218
219 log::debug!("Closed");
220
221 Ok(rx.recv()?)
222 }
223
224 fn flush(&mut self) -> anyhow::Result<()> {
225 let pool = self.pool.clone();
226 let (tx, rx) = std::sync::mpsc::channel();
227
228 tokio::task::block_in_place(|| {
229 get_runtime().block_on(async {
230 if let Err(e) = DatabaseQueries::truncate(&pool).await {
231 log::error!("Error flushing pool: {e:?}");
232 }
233 if let Err(e) = tx.send(()) {
234 log::error!("Error sending flush result: {e:?}");
235 }
236 });
237 });
238
239 Ok(rx.recv()?)
240 }
241
242 async fn load_all(&self) -> anyhow::Result<CacheMap> {
243 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
244 self.load_currencies(),
245 self.load_instruments(),
246 self.load_synthetics(),
247 self.load_accounts(),
248 self.load_orders(),
249 self.load_positions()
250 )
251 .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
252
253 let greeks = AHashMap::new();
256 let yield_curves = AHashMap::new();
257
258 Ok(CacheMap {
259 currencies,
260 instruments,
261 synthetics,
262 accounts,
263 orders,
264 positions,
265 greeks,
266 yield_curves,
267 })
268 }
269
270 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
271 let pool = self.pool.clone();
272 let (tx, rx) = std::sync::mpsc::channel();
273
274 tokio::spawn(async move {
275 let result = DatabaseQueries::load(&pool).await;
276 match result {
277 Ok(items) => {
278 let mapping = items
279 .into_iter()
280 .map(|(k, v)| (k, Bytes::from(v)))
281 .collect();
282 if let Err(e) = tx.send(mapping) {
283 log::error!("Failed to send general items: {e:?}");
284 }
285 }
286 Err(e) => {
287 log::error!("Failed to load general items: {e:?}");
288 if let Err(e) = tx.send(AHashMap::new()) {
289 log::error!("Failed to send empty general items: {e:?}");
290 }
291 }
292 }
293 });
294 Ok(rx.recv()?)
295 }
296
297 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
298 let pool = self.pool.clone();
299 let (tx, rx) = std::sync::mpsc::channel();
300
301 tokio::spawn(async move {
302 let result = DatabaseQueries::load_currencies(&pool).await;
303 match result {
304 Ok(currencies) => {
305 let mapping = currencies
306 .into_iter()
307 .map(|currency| (currency.code, currency))
308 .collect();
309 if let Err(e) = tx.send(mapping) {
310 log::error!("Failed to send currencies: {e:?}");
311 }
312 }
313 Err(e) => {
314 log::error!("Failed to load currencies: {e:?}");
315 if let Err(e) = tx.send(AHashMap::new()) {
316 log::error!("Failed to send empty currencies: {e:?}");
317 }
318 }
319 }
320 });
321 Ok(rx.recv()?)
322 }
323
324 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
325 let pool = self.pool.clone();
326 let (tx, rx) = std::sync::mpsc::channel();
327
328 tokio::spawn(async move {
329 let result = DatabaseQueries::load_instruments(&pool).await;
330 match result {
331 Ok(instruments) => {
332 let mapping = instruments
333 .into_iter()
334 .map(|instrument| (instrument.id(), instrument))
335 .collect();
336 if let Err(e) = tx.send(mapping) {
337 log::error!("Failed to send instruments: {e:?}");
338 }
339 }
340 Err(e) => {
341 log::error!("Failed to load instruments: {e:?}");
342 if let Err(e) = tx.send(AHashMap::new()) {
343 log::error!("Failed to send empty instruments: {e:?}");
344 }
345 }
346 }
347 });
348 Ok(rx.recv()?)
349 }
350
351 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
352 todo!()
353 }
354
355 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
356 let pool = self.pool.clone();
357 let (tx, rx) = std::sync::mpsc::channel();
358
359 tokio::spawn(async move {
360 let result = DatabaseQueries::load_accounts(&pool).await;
361 match result {
362 Ok(accounts) => {
363 let mapping = accounts
364 .into_iter()
365 .map(|account| (account.id(), account))
366 .collect();
367 if let Err(e) = tx.send(mapping) {
368 log::error!("Failed to send accounts: {e:?}");
369 }
370 }
371 Err(e) => {
372 log::error!("Failed to load accounts: {e:?}");
373 if let Err(e) = tx.send(AHashMap::new()) {
374 log::error!("Failed to send empty accounts: {e:?}");
375 }
376 }
377 }
378 });
379 Ok(rx.recv()?)
380 }
381
382 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
383 let pool = self.pool.clone();
384 let (tx, rx) = std::sync::mpsc::channel();
385
386 tokio::spawn(async move {
387 let result = DatabaseQueries::load_orders(&pool).await;
388 match result {
389 Ok(orders) => {
390 let mapping = orders
391 .into_iter()
392 .map(|order| (order.client_order_id(), order))
393 .collect();
394 if let Err(e) = tx.send(mapping) {
395 log::error!("Failed to send orders: {e:?}");
396 }
397 }
398 Err(e) => {
399 log::error!("Failed to load orders: {e:?}");
400 if let Err(e) = tx.send(AHashMap::new()) {
401 log::error!("Failed to send empty orders: {e:?}");
402 }
403 }
404 }
405 });
406 Ok(rx.recv()?)
407 }
408
409 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
410 todo!()
411 }
412
413 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
414 todo!()
415 }
416
417 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
418 let pool = self.pool.clone();
419 let (tx, rx) = std::sync::mpsc::channel();
420
421 tokio::spawn(async move {
422 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
423 match result {
424 Ok(currency) => {
425 if let Err(e) = tx.send(currency) {
426 log::error!("Failed to send load_index_order_client result: {e:?}");
427 }
428 }
429 Err(e) => {
430 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
431 if let Err(e) = tx.send(AHashMap::new()) {
432 log::error!("Failed to send empty load_index_order_client result: {e:?}");
433 }
434 }
435 }
436 });
437 Ok(rx.recv()?)
438 }
439
440 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
441 let pool = self.pool.clone();
442 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
444
445 tokio::spawn(async move {
446 let result = DatabaseQueries::load_currency(&pool, &code).await;
447 match result {
448 Ok(currency) => {
449 if let Err(e) = tx.send(currency) {
450 log::error!("Failed to send currency {code}: {e:?}");
451 }
452 }
453 Err(e) => {
454 log::error!("Failed to load currency {code}: {e:?}");
455 if let Err(e) = tx.send(None) {
456 log::error!("Failed to send None for currency {code}: {e:?}");
457 }
458 }
459 }
460 });
461 Ok(rx.recv()?)
462 }
463
464 async fn load_instrument(
465 &self,
466 instrument_id: &InstrumentId,
467 ) -> anyhow::Result<Option<InstrumentAny>> {
468 let pool = self.pool.clone();
469 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
471
472 tokio::spawn(async move {
473 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
474 match result {
475 Ok(instrument) => {
476 if let Err(e) = tx.send(instrument) {
477 log::error!("Failed to send instrument {instrument_id}: {e:?}");
478 }
479 }
480 Err(e) => {
481 log::error!("Failed to load instrument {instrument_id}: {e:?}");
482 if let Err(e) = tx.send(None) {
483 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
484 }
485 }
486 }
487 });
488 Ok(rx.recv()?)
489 }
490
491 async fn load_synthetic(
492 &self,
493 instrument_id: &InstrumentId,
494 ) -> anyhow::Result<Option<SyntheticInstrument>> {
495 todo!()
496 }
497
498 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
499 let pool = self.pool.clone();
500 let account_id = account_id.to_owned();
501 let (tx, rx) = std::sync::mpsc::channel();
502
503 tokio::spawn(async move {
504 let result = DatabaseQueries::load_account(&pool, &account_id).await;
505 match result {
506 Ok(account) => {
507 if let Err(e) = tx.send(account) {
508 log::error!("Failed to send account {account_id}: {e:?}");
509 }
510 }
511 Err(e) => {
512 log::error!("Failed to load account {account_id}: {e:?}");
513 if let Err(e) = tx.send(None) {
514 log::error!("Failed to send None for account {account_id}: {e:?}");
515 }
516 }
517 }
518 });
519 Ok(rx.recv()?)
520 }
521
522 async fn load_order(
523 &self,
524 client_order_id: &ClientOrderId,
525 ) -> anyhow::Result<Option<OrderAny>> {
526 let pool = self.pool.clone();
527 let client_order_id = client_order_id.to_owned();
528 let (tx, rx) = std::sync::mpsc::channel();
529
530 tokio::spawn(async move {
531 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
532 match result {
533 Ok(order) => {
534 if let Err(e) = tx.send(order) {
535 log::error!("Failed to send order {client_order_id}: {e:?}");
536 }
537 }
538 Err(e) => {
539 log::error!("Failed to load order {client_order_id}: {e:?}");
540 let _ = tx.send(None);
541 }
542 }
543 });
544 Ok(rx.recv()?)
545 }
546
547 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
548 todo!()
549 }
550
551 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
552 todo!()
553 }
554
555 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
556 todo!()
557 }
558
559 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
560 todo!()
561 }
562
563 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
564 todo!()
565 }
566
567 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
568 anyhow::bail!(
569 "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
570 )
571 }
572
573 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
574 anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
575 }
576
577 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
578 anyhow::bail!(
579 "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
580 )
581 }
582
583 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
584 let query = DatabaseQuery::Add(key, value.into());
585 self.tx
586 .send(query)
587 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
588 }
589
590 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
591 let query = DatabaseQuery::AddCurrency(*currency);
592 self.tx.send(query).map_err(|e| {
593 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
594 })
595 }
596
597 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
598 let query = DatabaseQuery::AddInstrument(instrument.clone());
599 self.tx.send(query).map_err(|e| {
600 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
601 })
602 }
603
604 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
605 todo!()
606 }
607
608 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
609 let query = DatabaseQuery::AddAccount(account.clone(), false);
610 self.tx.send(query).map_err(|e| {
611 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
612 })
613 }
614
615 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
616 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
617 self.tx.send(query).map_err(|e| {
618 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
619 })
620 }
621
622 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
623 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
624 self.tx.send(query).map_err(|e| {
625 anyhow::anyhow!(
626 "Failed to send query add_order_snapshot to database message handler: {e}"
627 )
628 })
629 }
630
631 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
632 todo!()
633 }
634
635 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
636 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
637 self.tx.send(query).map_err(|e| {
638 anyhow::anyhow!(
639 "Failed to send query add_position_snapshot to database message handler: {e}"
640 )
641 })
642 }
643
644 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
645 todo!()
646 }
647
648 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
649 let query = DatabaseQuery::AddQuote(quote.to_owned());
650 self.tx.send(query).map_err(|e| {
651 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
652 })
653 }
654
655 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
656 let pool = self.pool.clone();
657 let instrument_id = instrument_id.to_owned();
658 let (tx, rx) = std::sync::mpsc::channel();
659
660 tokio::spawn(async move {
661 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
662 match result {
663 Ok(quotes) => {
664 if let Err(e) = tx.send(quotes) {
665 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
666 }
667 }
668 Err(e) => {
669 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
670 if let Err(e) = tx.send(Vec::new()) {
671 log::error!(
672 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
673 );
674 }
675 }
676 }
677 });
678 Ok(rx.recv()?)
679 }
680
681 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
682 let query = DatabaseQuery::AddTrade(trade.to_owned());
683 self.tx.send(query).map_err(|e| {
684 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
685 })
686 }
687
688 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
689 let pool = self.pool.clone();
690 let instrument_id = instrument_id.to_owned();
691 let (tx, rx) = std::sync::mpsc::channel();
692
693 tokio::spawn(async move {
694 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
695 match result {
696 Ok(trades) => {
697 if let Err(e) = tx.send(trades) {
698 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
699 }
700 }
701 Err(e) => {
702 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
703 if let Err(e) = tx.send(Vec::new()) {
704 log::error!(
705 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
706 );
707 }
708 }
709 }
710 });
711 Ok(rx.recv()?)
712 }
713
714 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
715 let query = DatabaseQuery::AddBar(bar.to_owned());
716 self.tx.send(query).map_err(|e| {
717 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
718 })
719 }
720
721 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
722 let pool = self.pool.clone();
723 let instrument_id = instrument_id.to_owned();
724 let (tx, rx) = std::sync::mpsc::channel();
725
726 tokio::spawn(async move {
727 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
728 match result {
729 Ok(bars) => {
730 if let Err(e) = tx.send(bars) {
731 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
732 }
733 }
734 Err(e) => {
735 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
736 if let Err(e) = tx.send(Vec::new()) {
737 log::error!(
738 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
739 );
740 }
741 }
742 }
743 });
744 Ok(rx.recv()?)
745 }
746
747 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
748 let query = DatabaseQuery::AddSignal(signal.to_owned());
749 self.tx.send(query).map_err(|e| {
750 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
751 })
752 }
753
754 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
755 let pool = self.pool.clone();
756 let name = name.to_owned();
757 let (tx, rx) = std::sync::mpsc::channel();
758
759 tokio::spawn(async move {
760 let result = DatabaseQueries::load_signals(&pool, &name).await;
761 match result {
762 Ok(signals) => {
763 if let Err(e) = tx.send(signals) {
764 log::error!("Failed to send signals for '{name}': {e:?}");
765 }
766 }
767 Err(e) => {
768 log::error!("Failed to load signals for '{name}': {e:?}");
769 if let Err(e) = tx.send(Vec::new()) {
770 log::error!("Failed to send empty signals for '{name}': {e:?}");
771 }
772 }
773 }
774 });
775 Ok(rx.recv()?)
776 }
777
778 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
779 let query = DatabaseQuery::AddCustom(data.to_owned());
780 self.tx.send(query).map_err(|e| {
781 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
782 })
783 }
784
785 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
786 let pool = self.pool.clone();
787 let data_type = data_type.to_owned();
788 let (tx, rx) = std::sync::mpsc::channel();
789
790 tokio::spawn(async move {
791 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
792 match result {
793 Ok(signals) => {
794 if let Err(e) = tx.send(signals) {
795 log::error!("Failed to send custom data for '{data_type}': {e:?}");
796 }
797 }
798 Err(e) => {
799 log::error!("Failed to load custom data for '{data_type}': {e:?}");
800 if let Err(e) = tx.send(Vec::new()) {
801 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
802 }
803 }
804 }
805 });
806 Ok(rx.recv()?)
807 }
808
809 fn load_order_snapshot(
810 &self,
811 client_order_id: &ClientOrderId,
812 ) -> anyhow::Result<Option<OrderSnapshot>> {
813 let pool = self.pool.clone();
814 let client_order_id = client_order_id.to_owned();
815 let (tx, rx) = std::sync::mpsc::channel();
816
817 tokio::spawn(async move {
818 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
819 match result {
820 Ok(snapshot) => {
821 if let Err(e) = tx.send(snapshot) {
822 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
823 }
824 }
825 Err(e) => {
826 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
827 if let Err(e) = tx.send(None) {
828 log::error!(
829 "Failed to send None for order snapshot {client_order_id}: {e:?}"
830 );
831 }
832 }
833 }
834 });
835 Ok(rx.recv()?)
836 }
837
838 fn load_position_snapshot(
839 &self,
840 position_id: &PositionId,
841 ) -> anyhow::Result<Option<PositionSnapshot>> {
842 let pool = self.pool.clone();
843 let position_id = position_id.to_owned();
844 let (tx, rx) = std::sync::mpsc::channel();
845
846 tokio::spawn(async move {
847 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
848 match result {
849 Ok(snapshot) => {
850 if let Err(e) = tx.send(snapshot) {
851 log::error!("Failed to send position snapshot {position_id}: {e:?}");
852 }
853 }
854 Err(e) => {
855 log::error!("Failed to load position snapshot {position_id}: {e:?}");
856 if let Err(e) = tx.send(None) {
857 log::error!(
858 "Failed to send None for position snapshot {position_id}: {e:?}"
859 );
860 }
861 }
862 }
863 });
864 Ok(rx.recv()?)
865 }
866
867 fn index_venue_order_id(
868 &self,
869 client_order_id: ClientOrderId,
870 venue_order_id: VenueOrderId,
871 ) -> anyhow::Result<()> {
872 todo!()
873 }
874
875 fn index_order_position(
876 &self,
877 client_order_id: ClientOrderId,
878 position_id: PositionId,
879 ) -> anyhow::Result<()> {
880 todo!()
881 }
882
883 fn update_actor(&self) -> anyhow::Result<()> {
884 todo!()
885 }
886
887 fn update_strategy(&self) -> anyhow::Result<()> {
888 todo!()
889 }
890
891 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
892 let query = DatabaseQuery::AddAccount(account.clone(), true);
893 self.tx.send(query).map_err(|e| {
894 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
895 })
896 }
897
898 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
899 let query = DatabaseQuery::UpdateOrder(event.clone());
900 self.tx.send(query).map_err(|e| {
901 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
902 })
903 }
904
905 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
906 todo!()
907 }
908
909 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
910 todo!()
911 }
912
913 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
914 todo!()
915 }
916
917 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
918 todo!()
919 }
920}
921
922async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
923 for cmd in buffer.drain(..) {
924 let result: anyhow::Result<()> = match cmd {
925 DatabaseQuery::Close => Ok(()),
926 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
927 DatabaseQuery::AddCurrency(currency) => {
928 DatabaseQueries::add_currency(pool, currency).await
929 }
930 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
931 InstrumentAny::Betting(instrument) => {
932 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
933 }
934 InstrumentAny::BinaryOption(instrument) => {
935 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
936 .await
937 }
938 InstrumentAny::CryptoFuture(instrument) => {
939 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
940 .await
941 }
942 InstrumentAny::CryptoOption(instrument) => {
943 DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
944 .await
945 }
946 InstrumentAny::CryptoPerpetual(instrument) => {
947 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
948 .await
949 }
950 InstrumentAny::CurrencyPair(instrument) => {
951 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
952 .await
953 }
954 InstrumentAny::Equity(equity) => {
955 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
956 }
957 InstrumentAny::FuturesContract(instrument) => {
958 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
959 .await
960 }
961 InstrumentAny::FuturesSpread(instrument) => {
962 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
963 .await
964 }
965 InstrumentAny::OptionContract(instrument) => {
966 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
967 .await
968 }
969 InstrumentAny::OptionSpread(instrument) => {
970 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
971 .await
972 }
973 },
974 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
975 OrderAny::Limit(order) => {
976 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
977 .await
978 }
979 OrderAny::LimitIfTouched(order) => {
980 DatabaseQueries::add_order(
981 pool,
982 "LIMIT_IF_TOUCHED",
983 updated,
984 Box::new(order),
985 client_id,
986 )
987 .await
988 }
989 OrderAny::Market(order) => {
990 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
991 .await
992 }
993 OrderAny::MarketIfTouched(order) => {
994 DatabaseQueries::add_order(
995 pool,
996 "MARKET_IF_TOUCHED",
997 updated,
998 Box::new(order),
999 client_id,
1000 )
1001 .await
1002 }
1003 OrderAny::MarketToLimit(order) => {
1004 DatabaseQueries::add_order(
1005 pool,
1006 "MARKET_TO_LIMIT",
1007 updated,
1008 Box::new(order),
1009 client_id,
1010 )
1011 .await
1012 }
1013 OrderAny::StopLimit(order) => {
1014 DatabaseQueries::add_order(
1015 pool,
1016 "STOP_LIMIT",
1017 updated,
1018 Box::new(order),
1019 client_id,
1020 )
1021 .await
1022 }
1023 OrderAny::StopMarket(order) => {
1024 DatabaseQueries::add_order(
1025 pool,
1026 "STOP_MARKET",
1027 updated,
1028 Box::new(order),
1029 client_id,
1030 )
1031 .await
1032 }
1033 OrderAny::TrailingStopLimit(order) => {
1034 DatabaseQueries::add_order(
1035 pool,
1036 "TRAILING_STOP_LIMIT",
1037 updated,
1038 Box::new(order),
1039 client_id,
1040 )
1041 .await
1042 }
1043 OrderAny::TrailingStopMarket(order) => {
1044 DatabaseQueries::add_order(
1045 pool,
1046 "TRAILING_STOP_MARKET",
1047 updated,
1048 Box::new(order),
1049 client_id,
1050 )
1051 .await
1052 }
1053 },
1054 DatabaseQuery::AddOrderSnapshot(snapshot) => {
1055 DatabaseQueries::add_order_snapshot(pool, snapshot).await
1056 }
1057 DatabaseQuery::AddPositionSnapshot(snapshot) => {
1058 DatabaseQueries::add_position_snapshot(pool, snapshot).await
1059 }
1060 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1061 AccountAny::Cash(account) => {
1062 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1063 }
1064 AccountAny::Margin(account) => {
1065 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1066 }
1067 },
1068 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1069 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1070 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
1071 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1072 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1073 DatabaseQuery::UpdateOrder(event) => {
1074 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1075 }
1076 };
1077
1078 if let Err(e) = result {
1079 tracing::error!("Error on query: {e:?}");
1080 }
1081 }
1082}