1use std::{
17 collections::{HashMap, VecDeque},
18 time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23 cache::database::{CacheDatabaseAdapter, CacheMap},
24 custom::CustomData,
25 logging::{log_task_awaiting, log_task_started, log_task_stopped},
26 runtime::get_runtime,
27 signal::Signal,
28};
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31 accounts::AccountAny,
32 data::{Bar, DataType, QuoteTick, TradeTick},
33 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
34 identifiers::{
35 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
36 VenueOrderId,
37 },
38 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
39 orderbook::OrderBook,
40 orders::{Order, OrderAny},
41 position::Position,
42 types::Currency,
43};
44use sqlx::{PgPool, postgres::PgConnectOptions};
45use tokio::try_join;
46use ustr::Ustr;
47
48use crate::sql::{
49 pg::{connect_pg, get_postgres_connect_options},
50 queries::DatabaseQueries,
51};
52
53const CACHE_PROCESS: &str = "cache-process";
55
56#[derive(Debug)]
57#[cfg_attr(
58 feature = "python",
59 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
60)]
61pub struct PostgresCacheDatabase {
62 pub pool: PgPool,
63 tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
64 handle: tokio::task::JoinHandle<()>,
65}
66
67#[allow(clippy::large_enum_variant)]
68#[derive(Debug, Clone)]
69pub enum DatabaseQuery {
70 Close,
71 Add(String, Vec<u8>),
72 AddCurrency(Currency),
73 AddInstrument(InstrumentAny),
74 AddOrder(OrderAny, Option<ClientId>, bool),
75 AddOrderSnapshot(OrderSnapshot),
76 AddPositionSnapshot(PositionSnapshot),
77 AddAccount(AccountAny, bool),
78 AddSignal(Signal),
79 AddCustom(CustomData),
80 AddQuote(QuoteTick),
81 AddTrade(TradeTick),
82 AddBar(Bar),
83 UpdateOrder(OrderEventAny),
84}
85
86impl PostgresCacheDatabase {
87 pub async fn connect(
97 host: Option<String>,
98 port: Option<u16>,
99 username: Option<String>,
100 password: Option<String>,
101 database: Option<String>,
102 ) -> Result<Self, sqlx::Error> {
103 let pg_connect_options =
104 get_postgres_connect_options(host, port, username, password, database);
105 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
106 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
107
108 let handle = tokio::spawn(async move {
110 Self::process_commands(rx, pg_connect_options.clone().into()).await;
111 });
112 Ok(Self { pool, tx, handle })
113 }
114
115 async fn process_commands(
116 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
117 pg_connect_options: PgConnectOptions,
118 ) {
119 log_task_started(CACHE_PROCESS);
120
121 let pool = connect_pg(pg_connect_options).await.unwrap();
122
123 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
125 let mut last_drain = Instant::now();
126
127 let buffer_interval = Duration::from_millis(0);
129
130 loop {
132 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
133 drain_buffer(&pool, &mut buffer).await;
134 last_drain = Instant::now();
135 } else if let Some(msg) = rx.recv().await {
136 tracing::debug!("Received {msg:?}");
137 match msg {
138 DatabaseQuery::Close => break,
139 _ => buffer.push_back(msg),
140 }
141 } else {
142 tracing::debug!("Command channel closed");
143 break;
144 }
145 }
146
147 if !buffer.is_empty() {
149 drain_buffer(&pool, &mut buffer).await;
150 }
151
152 log_task_stopped(CACHE_PROCESS);
153 }
154}
155
156pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
162 let connect_options = get_postgres_connect_options(None, None, None, None, None);
163 Ok(PostgresCacheDatabase::connect(
164 Some(connect_options.host),
165 Some(connect_options.port),
166 Some(connect_options.username),
167 Some(connect_options.password),
168 Some(connect_options.database),
169 )
170 .await?)
171}
172
173#[allow(dead_code)]
174#[allow(unused)]
175#[async_trait::async_trait]
176impl CacheDatabaseAdapter for PostgresCacheDatabase {
177 fn close(&mut self) -> anyhow::Result<()> {
178 let pool = self.pool.clone();
179 let (tx, rx) = std::sync::mpsc::channel();
180
181 log::debug!("Closing connection pool");
182
183 tokio::task::block_in_place(|| {
184 get_runtime().block_on(async {
185 pool.close().await;
186 if let Err(e) = tx.send(()) {
187 log::error!("Error closing pool: {e:?}");
188 }
189 });
190 });
191
192 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
194 log::error!("Error sending close: {e:?}");
195 }
196
197 log_task_awaiting("cache-write");
198
199 tokio::task::block_in_place(|| {
200 if let Err(e) = get_runtime().block_on(&mut self.handle) {
201 log::error!("Error awaiting task 'cache-write': {e:?}");
202 }
203 });
204
205 log::debug!("Closed");
206
207 Ok(rx.recv()?)
208 }
209
210 fn flush(&mut self) -> anyhow::Result<()> {
211 let pool = self.pool.clone();
212 let (tx, rx) = std::sync::mpsc::channel();
213
214 tokio::task::block_in_place(|| {
215 get_runtime().block_on(async {
216 if let Err(e) = DatabaseQueries::truncate(&pool).await {
217 log::error!("Error flushing pool: {e:?}");
218 }
219 if let Err(e) = tx.send(()) {
220 log::error!("Error sending flush result: {e:?}");
221 }
222 });
223 });
224
225 Ok(rx.recv()?)
226 }
227
228 async fn load_all(&self) -> anyhow::Result<CacheMap> {
229 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
230 self.load_currencies(),
231 self.load_instruments(),
232 self.load_synthetics(),
233 self.load_accounts(),
234 self.load_orders(),
235 self.load_positions()
236 )
237 .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
238
239 let greeks = HashMap::new();
242 let yield_curves = HashMap::new();
243
244 Ok(CacheMap {
245 currencies,
246 instruments,
247 synthetics,
248 accounts,
249 orders,
250 positions,
251 greeks,
252 yield_curves,
253 })
254 }
255
256 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
257 let pool = self.pool.clone();
258 let (tx, rx) = std::sync::mpsc::channel();
259 tokio::spawn(async move {
260 let result = DatabaseQueries::load(&pool).await;
261 match result {
262 Ok(items) => {
263 let mapping = items
264 .into_iter()
265 .map(|(k, v)| (k, Bytes::from(v)))
266 .collect();
267 if let Err(e) = tx.send(mapping) {
268 log::error!("Failed to send general items: {e:?}");
269 }
270 }
271 Err(e) => {
272 log::error!("Failed to load general items: {e:?}");
273 if let Err(e) = tx.send(HashMap::new()) {
274 log::error!("Failed to send empty general items: {e:?}");
275 }
276 }
277 }
278 });
279 Ok(rx.recv()?)
280 }
281
282 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
283 let pool = self.pool.clone();
284 let (tx, rx) = std::sync::mpsc::channel();
285 tokio::spawn(async move {
286 let result = DatabaseQueries::load_currencies(&pool).await;
287 match result {
288 Ok(currencies) => {
289 let mapping = currencies
290 .into_iter()
291 .map(|currency| (currency.code, currency))
292 .collect();
293 if let Err(e) = tx.send(mapping) {
294 log::error!("Failed to send currencies: {e:?}");
295 }
296 }
297 Err(e) => {
298 log::error!("Failed to load currencies: {e:?}");
299 if let Err(e) = tx.send(HashMap::new()) {
300 log::error!("Failed to send empty currencies: {e:?}");
301 }
302 }
303 }
304 });
305 Ok(rx.recv()?)
306 }
307
308 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
309 let pool = self.pool.clone();
310 let (tx, rx) = std::sync::mpsc::channel();
311 tokio::spawn(async move {
312 let result = DatabaseQueries::load_instruments(&pool).await;
313 match result {
314 Ok(instruments) => {
315 let mapping = instruments
316 .into_iter()
317 .map(|instrument| (instrument.id(), instrument))
318 .collect();
319 if let Err(e) = tx.send(mapping) {
320 log::error!("Failed to send instruments: {e:?}");
321 }
322 }
323 Err(e) => {
324 log::error!("Failed to load instruments: {e:?}");
325 if let Err(e) = tx.send(HashMap::new()) {
326 log::error!("Failed to send empty instruments: {e:?}");
327 }
328 }
329 }
330 });
331 Ok(rx.recv()?)
332 }
333
334 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
335 todo!()
336 }
337
338 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
339 let pool = self.pool.clone();
340 let (tx, rx) = std::sync::mpsc::channel();
341 tokio::spawn(async move {
342 let result = DatabaseQueries::load_accounts(&pool).await;
343 match result {
344 Ok(accounts) => {
345 let mapping = accounts
346 .into_iter()
347 .map(|account| (account.id(), account))
348 .collect();
349 if let Err(e) = tx.send(mapping) {
350 log::error!("Failed to send accounts: {e:?}");
351 }
352 }
353 Err(e) => {
354 log::error!("Failed to load accounts: {e:?}");
355 if let Err(e) = tx.send(HashMap::new()) {
356 log::error!("Failed to send empty accounts: {e:?}");
357 }
358 }
359 }
360 });
361 Ok(rx.recv()?)
362 }
363
364 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
365 let pool = self.pool.clone();
366 let (tx, rx) = std::sync::mpsc::channel();
367 tokio::spawn(async move {
368 let result = DatabaseQueries::load_orders(&pool).await;
369 match result {
370 Ok(orders) => {
371 let mapping = orders
372 .into_iter()
373 .map(|order| (order.client_order_id(), order))
374 .collect();
375 if let Err(e) = tx.send(mapping) {
376 log::error!("Failed to send orders: {e:?}");
377 }
378 }
379 Err(e) => {
380 log::error!("Failed to load orders: {e:?}");
381 if let Err(e) = tx.send(HashMap::new()) {
382 log::error!("Failed to send empty orders: {e:?}");
383 }
384 }
385 }
386 });
387 Ok(rx.recv()?)
388 }
389
390 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
391 todo!()
392 }
393
394 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
395 todo!()
396 }
397
398 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
399 let pool = self.pool.clone();
400 let (tx, rx) = std::sync::mpsc::channel();
401 tokio::spawn(async move {
402 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
403 match result {
404 Ok(currency) => {
405 if let Err(e) = tx.send(currency) {
406 log::error!("Failed to send load_index_order_client result: {e:?}");
407 }
408 }
409 Err(e) => {
410 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
411 if let Err(e) = tx.send(HashMap::new()) {
412 log::error!("Failed to send empty load_index_order_client result: {e:?}");
413 }
414 }
415 }
416 });
417 Ok(rx.recv()?)
418 }
419
420 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
421 let pool = self.pool.clone();
422 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
424 tokio::spawn(async move {
425 let result = DatabaseQueries::load_currency(&pool, &code).await;
426 match result {
427 Ok(currency) => {
428 if let Err(e) = tx.send(currency) {
429 log::error!("Failed to send currency {code}: {e:?}");
430 }
431 }
432 Err(e) => {
433 log::error!("Failed to load currency {code}: {e:?}");
434 if let Err(e) = tx.send(None) {
435 log::error!("Failed to send None for currency {code}: {e:?}");
436 }
437 }
438 }
439 });
440 Ok(rx.recv()?)
441 }
442
443 async fn load_instrument(
444 &self,
445 instrument_id: &InstrumentId,
446 ) -> anyhow::Result<Option<InstrumentAny>> {
447 let pool = self.pool.clone();
448 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
450 tokio::spawn(async move {
451 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
452 match result {
453 Ok(instrument) => {
454 if let Err(e) = tx.send(instrument) {
455 log::error!("Failed to send instrument {instrument_id}: {e:?}");
456 }
457 }
458 Err(e) => {
459 log::error!("Failed to load instrument {instrument_id}: {e:?}");
460 if let Err(e) = tx.send(None) {
461 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
462 }
463 }
464 }
465 });
466 Ok(rx.recv()?)
467 }
468
469 async fn load_synthetic(
470 &self,
471 instrument_id: &InstrumentId,
472 ) -> anyhow::Result<Option<SyntheticInstrument>> {
473 todo!()
474 }
475
476 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
477 let pool = self.pool.clone();
478 let account_id = account_id.to_owned();
479 let (tx, rx) = std::sync::mpsc::channel();
480 tokio::spawn(async move {
481 let result = DatabaseQueries::load_account(&pool, &account_id).await;
482 match result {
483 Ok(account) => {
484 if let Err(e) = tx.send(account) {
485 log::error!("Failed to send account {account_id}: {e:?}");
486 }
487 }
488 Err(e) => {
489 log::error!("Failed to load account {account_id}: {e:?}");
490 if let Err(e) = tx.send(None) {
491 log::error!("Failed to send None for account {account_id}: {e:?}");
492 }
493 }
494 }
495 });
496 Ok(rx.recv()?)
497 }
498
499 async fn load_order(
500 &self,
501 client_order_id: &ClientOrderId,
502 ) -> anyhow::Result<Option<OrderAny>> {
503 let pool = self.pool.clone();
504 let client_order_id = client_order_id.to_owned();
505 let (tx, rx) = std::sync::mpsc::channel();
506 tokio::spawn(async move {
507 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
508 match result {
509 Ok(order) => {
510 if let Err(e) = tx.send(order) {
511 log::error!("Failed to send order {client_order_id}: {e:?}");
512 }
513 }
514 Err(e) => {
515 log::error!("Failed to load order {client_order_id}: {e:?}");
516 let _ = tx.send(None);
517 }
518 }
519 });
520 Ok(rx.recv()?)
521 }
522
523 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
524 todo!()
525 }
526
527 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
528 todo!()
529 }
530
531 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
532 todo!()
533 }
534
535 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
536 todo!()
537 }
538
539 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
540 todo!()
541 }
542
543 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
544 anyhow::bail!(
545 "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
546 )
547 }
548
549 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
550 anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
551 }
552
553 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
554 anyhow::bail!(
555 "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
556 )
557 }
558
559 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
560 let query = DatabaseQuery::Add(key, value.into());
561 self.tx
562 .send(query)
563 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
564 }
565
566 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
567 let query = DatabaseQuery::AddCurrency(*currency);
568 self.tx.send(query).map_err(|e| {
569 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
570 })
571 }
572
573 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
574 let query = DatabaseQuery::AddInstrument(instrument.clone());
575 self.tx.send(query).map_err(|e| {
576 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
577 })
578 }
579
580 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
581 todo!()
582 }
583
584 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
585 let query = DatabaseQuery::AddAccount(account.clone(), false);
586 self.tx.send(query).map_err(|e| {
587 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
588 })
589 }
590
591 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
592 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
593 self.tx.send(query).map_err(|e| {
594 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
595 })
596 }
597
598 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
599 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
600 self.tx.send(query).map_err(|e| {
601 anyhow::anyhow!(
602 "Failed to send query add_order_snapshot to database message handler: {e}"
603 )
604 })
605 }
606
607 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
608 todo!()
609 }
610
611 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
612 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
613 self.tx.send(query).map_err(|e| {
614 anyhow::anyhow!(
615 "Failed to send query add_position_snapshot to database message handler: {e}"
616 )
617 })
618 }
619
620 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
621 todo!()
622 }
623
624 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
625 let query = DatabaseQuery::AddQuote(quote.to_owned());
626 self.tx.send(query).map_err(|e| {
627 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
628 })
629 }
630
631 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
632 let pool = self.pool.clone();
633 let instrument_id = instrument_id.to_owned();
634 let (tx, rx) = std::sync::mpsc::channel();
635 tokio::spawn(async move {
636 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
637 match result {
638 Ok(quotes) => {
639 if let Err(e) = tx.send(quotes) {
640 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
641 }
642 }
643 Err(e) => {
644 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
645 if let Err(e) = tx.send(Vec::new()) {
646 log::error!(
647 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
648 );
649 }
650 }
651 }
652 });
653 Ok(rx.recv()?)
654 }
655
656 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
657 let query = DatabaseQuery::AddTrade(trade.to_owned());
658 self.tx.send(query).map_err(|e| {
659 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
660 })
661 }
662
663 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
664 let pool = self.pool.clone();
665 let instrument_id = instrument_id.to_owned();
666 let (tx, rx) = std::sync::mpsc::channel();
667 tokio::spawn(async move {
668 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
669 match result {
670 Ok(trades) => {
671 if let Err(e) = tx.send(trades) {
672 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
673 }
674 }
675 Err(e) => {
676 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
677 if let Err(e) = tx.send(Vec::new()) {
678 log::error!(
679 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
680 );
681 }
682 }
683 }
684 });
685 Ok(rx.recv()?)
686 }
687
688 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
689 let query = DatabaseQuery::AddBar(bar.to_owned());
690 self.tx.send(query).map_err(|e| {
691 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
692 })
693 }
694
695 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
696 let pool = self.pool.clone();
697 let instrument_id = instrument_id.to_owned();
698 let (tx, rx) = std::sync::mpsc::channel();
699 tokio::spawn(async move {
700 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
701 match result {
702 Ok(bars) => {
703 if let Err(e) = tx.send(bars) {
704 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
705 }
706 }
707 Err(e) => {
708 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
709 if let Err(e) = tx.send(Vec::new()) {
710 log::error!(
711 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
712 );
713 }
714 }
715 }
716 });
717 Ok(rx.recv()?)
718 }
719
720 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
721 let query = DatabaseQuery::AddSignal(signal.to_owned());
722 self.tx.send(query).map_err(|e| {
723 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
724 })
725 }
726
727 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
728 let pool = self.pool.clone();
729 let name = name.to_owned();
730 let (tx, rx) = std::sync::mpsc::channel();
731 tokio::spawn(async move {
732 let result = DatabaseQueries::load_signals(&pool, &name).await;
733 match result {
734 Ok(signals) => {
735 if let Err(e) = tx.send(signals) {
736 log::error!("Failed to send signals for '{name}': {e:?}");
737 }
738 }
739 Err(e) => {
740 log::error!("Failed to load signals for '{name}': {e:?}");
741 if let Err(e) = tx.send(Vec::new()) {
742 log::error!("Failed to send empty signals for '{name}': {e:?}");
743 }
744 }
745 }
746 });
747 Ok(rx.recv()?)
748 }
749
750 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
751 let query = DatabaseQuery::AddCustom(data.to_owned());
752 self.tx.send(query).map_err(|e| {
753 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
754 })
755 }
756
757 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
758 let pool = self.pool.clone();
759 let data_type = data_type.to_owned();
760 let (tx, rx) = std::sync::mpsc::channel();
761 tokio::spawn(async move {
762 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
763 match result {
764 Ok(signals) => {
765 if let Err(e) = tx.send(signals) {
766 log::error!("Failed to send custom data for '{data_type}': {e:?}");
767 }
768 }
769 Err(e) => {
770 log::error!("Failed to load custom data for '{data_type}': {e:?}");
771 if let Err(e) = tx.send(Vec::new()) {
772 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
773 }
774 }
775 }
776 });
777 Ok(rx.recv()?)
778 }
779
780 fn load_order_snapshot(
781 &self,
782 client_order_id: &ClientOrderId,
783 ) -> anyhow::Result<Option<OrderSnapshot>> {
784 let pool = self.pool.clone();
785 let client_order_id = client_order_id.to_owned();
786 let (tx, rx) = std::sync::mpsc::channel();
787 tokio::spawn(async move {
788 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
789 match result {
790 Ok(snapshot) => {
791 if let Err(e) = tx.send(snapshot) {
792 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
793 }
794 }
795 Err(e) => {
796 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
797 if let Err(e) = tx.send(None) {
798 log::error!(
799 "Failed to send None for order snapshot {client_order_id}: {e:?}"
800 );
801 }
802 }
803 }
804 });
805 Ok(rx.recv()?)
806 }
807
808 fn load_position_snapshot(
809 &self,
810 position_id: &PositionId,
811 ) -> anyhow::Result<Option<PositionSnapshot>> {
812 let pool = self.pool.clone();
813 let position_id = position_id.to_owned();
814 let (tx, rx) = std::sync::mpsc::channel();
815 tokio::spawn(async move {
816 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
817 match result {
818 Ok(snapshot) => {
819 if let Err(e) = tx.send(snapshot) {
820 log::error!("Failed to send position snapshot {position_id}: {e:?}");
821 }
822 }
823 Err(e) => {
824 log::error!("Failed to load position snapshot {position_id}: {e:?}");
825 if let Err(e) = tx.send(None) {
826 log::error!(
827 "Failed to send None for position snapshot {position_id}: {e:?}"
828 );
829 }
830 }
831 }
832 });
833 Ok(rx.recv()?)
834 }
835
836 fn index_venue_order_id(
837 &self,
838 client_order_id: ClientOrderId,
839 venue_order_id: VenueOrderId,
840 ) -> anyhow::Result<()> {
841 todo!()
842 }
843
844 fn index_order_position(
845 &self,
846 client_order_id: ClientOrderId,
847 position_id: PositionId,
848 ) -> anyhow::Result<()> {
849 todo!()
850 }
851
852 fn update_actor(&self) -> anyhow::Result<()> {
853 todo!()
854 }
855
856 fn update_strategy(&self) -> anyhow::Result<()> {
857 todo!()
858 }
859
860 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
861 let query = DatabaseQuery::AddAccount(account.clone(), true);
862 self.tx.send(query).map_err(|e| {
863 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
864 })
865 }
866
867 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
868 let query = DatabaseQuery::UpdateOrder(event.clone());
869 self.tx.send(query).map_err(|e| {
870 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
871 })
872 }
873
874 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
875 todo!()
876 }
877
878 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
879 todo!()
880 }
881
882 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
883 todo!()
884 }
885
886 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
887 todo!()
888 }
889}
890
891async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
892 for cmd in buffer.drain(..) {
893 let result: anyhow::Result<()> = match cmd {
894 DatabaseQuery::Close => Ok(()),
895 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
896 DatabaseQuery::AddCurrency(currency) => {
897 DatabaseQueries::add_currency(pool, currency).await
898 }
899 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
900 InstrumentAny::Betting(instrument) => {
901 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
902 }
903 InstrumentAny::BinaryOption(instrument) => {
904 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
905 .await
906 }
907 InstrumentAny::CryptoFuture(instrument) => {
908 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
909 .await
910 }
911 InstrumentAny::CryptoOption(instrument) => {
912 DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
913 .await
914 }
915 InstrumentAny::CryptoPerpetual(instrument) => {
916 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
917 .await
918 }
919 InstrumentAny::CurrencyPair(instrument) => {
920 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
921 .await
922 }
923 InstrumentAny::Equity(equity) => {
924 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
925 }
926 InstrumentAny::FuturesContract(instrument) => {
927 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
928 .await
929 }
930 InstrumentAny::FuturesSpread(instrument) => {
931 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
932 .await
933 }
934 InstrumentAny::OptionContract(instrument) => {
935 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
936 .await
937 }
938 InstrumentAny::OptionSpread(instrument) => {
939 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
940 .await
941 }
942 },
943 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
944 OrderAny::Limit(order) => {
945 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
946 .await
947 }
948 OrderAny::LimitIfTouched(order) => {
949 DatabaseQueries::add_order(
950 pool,
951 "LIMIT_IF_TOUCHED",
952 updated,
953 Box::new(order),
954 client_id,
955 )
956 .await
957 }
958 OrderAny::Market(order) => {
959 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
960 .await
961 }
962 OrderAny::MarketIfTouched(order) => {
963 DatabaseQueries::add_order(
964 pool,
965 "MARKET_IF_TOUCHED",
966 updated,
967 Box::new(order),
968 client_id,
969 )
970 .await
971 }
972 OrderAny::MarketToLimit(order) => {
973 DatabaseQueries::add_order(
974 pool,
975 "MARKET_TO_LIMIT",
976 updated,
977 Box::new(order),
978 client_id,
979 )
980 .await
981 }
982 OrderAny::StopLimit(order) => {
983 DatabaseQueries::add_order(
984 pool,
985 "STOP_LIMIT",
986 updated,
987 Box::new(order),
988 client_id,
989 )
990 .await
991 }
992 OrderAny::StopMarket(order) => {
993 DatabaseQueries::add_order(
994 pool,
995 "STOP_MARKET",
996 updated,
997 Box::new(order),
998 client_id,
999 )
1000 .await
1001 }
1002 OrderAny::TrailingStopLimit(order) => {
1003 DatabaseQueries::add_order(
1004 pool,
1005 "TRAILING_STOP_LIMIT",
1006 updated,
1007 Box::new(order),
1008 client_id,
1009 )
1010 .await
1011 }
1012 OrderAny::TrailingStopMarket(order) => {
1013 DatabaseQueries::add_order(
1014 pool,
1015 "TRAILING_STOP_MARKET",
1016 updated,
1017 Box::new(order),
1018 client_id,
1019 )
1020 .await
1021 }
1022 },
1023 DatabaseQuery::AddOrderSnapshot(snapshot) => {
1024 DatabaseQueries::add_order_snapshot(pool, snapshot).await
1025 }
1026 DatabaseQuery::AddPositionSnapshot(snapshot) => {
1027 DatabaseQueries::add_position_snapshot(pool, snapshot).await
1028 }
1029 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1030 AccountAny::Cash(account) => {
1031 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1032 }
1033 AccountAny::Margin(account) => {
1034 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1035 }
1036 },
1037 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1038 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1039 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
1040 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1041 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1042 DatabaseQuery::UpdateOrder(event) => {
1043 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1044 }
1045 };
1046
1047 if let Err(e) = result {
1048 tracing::error!("Error on query: {e:?}");
1049 }
1050 }
1051}