1use std::{
17 collections::{HashMap, VecDeque},
18 time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23 cache::database::CacheDatabaseAdapter, custom::CustomData, runtime::get_runtime, signal::Signal,
24};
25use nautilus_core::UnixNanos;
26use nautilus_model::{
27 accounts::AccountAny,
28 data::{Bar, DataType, QuoteTick, TradeTick},
29 events::{position::snapshot::PositionSnapshot, OrderEventAny, OrderSnapshot},
30 identifiers::{
31 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
32 VenueOrderId,
33 },
34 instruments::{InstrumentAny, SyntheticInstrument},
35 orderbook::OrderBook,
36 orders::OrderAny,
37 position::Position,
38 types::Currency,
39};
40use sqlx::{postgres::PgConnectOptions, PgPool};
41use ustr::Ustr;
42
43use crate::sql::{
44 pg::{connect_pg, get_postgres_connect_options},
45 queries::DatabaseQueries,
46};
47
48#[derive(Debug)]
49#[cfg_attr(
50 feature = "python",
51 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
52)]
53pub struct PostgresCacheDatabase {
54 pub pool: PgPool,
55 tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
56 handle: tokio::task::JoinHandle<()>,
57}
58
59#[allow(clippy::large_enum_variant)]
60#[derive(Debug, Clone)]
61pub enum DatabaseQuery {
62 Close,
63 Add(String, Vec<u8>),
64 AddCurrency(Currency),
65 AddInstrument(InstrumentAny),
66 AddOrder(OrderAny, Option<ClientId>, bool),
67 AddOrderSnapshot(OrderSnapshot),
68 AddPositionSnapshot(PositionSnapshot),
69 AddAccount(AccountAny, bool),
70 AddSignal(Signal),
71 AddCustom(CustomData),
72 AddQuote(QuoteTick),
73 AddTrade(TradeTick),
74 AddBar(Bar),
75 UpdateOrder(OrderEventAny),
76}
77
78impl PostgresCacheDatabase {
79 pub async fn connect(
80 host: Option<String>,
81 port: Option<u16>,
82 username: Option<String>,
83 password: Option<String>,
84 database: Option<String>,
85 ) -> Result<Self, sqlx::Error> {
86 let pg_connect_options =
87 get_postgres_connect_options(host, port, username, password, database);
88 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
89 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
90
91 let handle = tokio::spawn(async move {
93 PostgresCacheDatabase::process_commands(rx, pg_connect_options.clone().into()).await;
94 });
95 Ok(PostgresCacheDatabase { pool, tx, handle })
96 }
97
98 async fn process_commands(
99 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
100 pg_connect_options: PgConnectOptions,
101 ) {
102 tracing::debug!("Starting cache processing");
103
104 let pool = connect_pg(pg_connect_options).await.unwrap();
105
106 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
108 let mut last_drain = Instant::now();
109
110 let buffer_interval = Duration::from_millis(0);
112
113 loop {
115 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
116 drain_buffer(&pool, &mut buffer).await;
117 last_drain = Instant::now();
118 } else {
119 match rx.recv().await {
120 Some(msg) => match msg {
121 DatabaseQuery::Close => break,
122 _ => buffer.push_back(msg),
123 },
124 None => break, }
126 }
127 }
128
129 if !buffer.is_empty() {
131 drain_buffer(&pool, &mut buffer).await;
132 }
133
134 tracing::debug!("Stopped cache processing");
135 }
136}
137
138pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
139 let connect_options = get_postgres_connect_options(None, None, None, None, None);
140 Ok(PostgresCacheDatabase::connect(
141 Some(connect_options.host),
142 Some(connect_options.port),
143 Some(connect_options.username),
144 Some(connect_options.password),
145 Some(connect_options.database),
146 )
147 .await?)
148}
149
150#[allow(dead_code)]
151#[allow(unused)]
152impl CacheDatabaseAdapter for PostgresCacheDatabase {
153 fn close(&mut self) -> anyhow::Result<()> {
154 let pool = self.pool.clone();
155 let (tx, rx) = std::sync::mpsc::channel();
156
157 log::debug!("Closing connection pool");
158 tokio::task::block_in_place(|| {
159 get_runtime().block_on(async {
160 pool.close().await;
161 if let Err(e) = tx.send(()) {
162 log::error!("Error closing pool: {e:?}");
163 }
164 });
165 });
166
167 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
169 log::error!("Error sending close: {e:?}");
170 }
171
172 log::debug!("Awaiting task 'cache-write'"); tokio::task::block_in_place(|| {
174 if let Err(e) = get_runtime().block_on(&mut self.handle) {
175 log::error!("Error awaiting task 'cache-write': {e:?}");
176 }
177 });
178
179 log::debug!("Closed");
180
181 Ok(rx.recv()?)
182 }
183
184 fn flush(&mut self) -> anyhow::Result<()> {
185 let pool = self.pool.clone();
186 let (tx, rx) = std::sync::mpsc::channel();
187
188 tokio::task::block_in_place(|| {
189 get_runtime().block_on(async {
190 if let Err(e) = DatabaseQueries::truncate(&pool).await {
191 log::error!("Error flushing pool: {e:?}");
192 }
193 if let Err(e) = tx.send(()) {
194 log::error!("Error sending flush result: {e:?}");
195 }
196 });
197 });
198
199 Ok(rx.recv()?)
200 }
201
202 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
203 let pool = self.pool.clone();
204 let (tx, rx) = std::sync::mpsc::channel();
205 tokio::spawn(async move {
206 let result = DatabaseQueries::load(&pool).await;
207 match result {
208 Ok(items) => {
209 let mapping = items
210 .into_iter()
211 .map(|(k, v)| (k, Bytes::from(v)))
212 .collect();
213 if let Err(e) = tx.send(mapping) {
214 log::error!("Failed to send general items: {e:?}");
215 }
216 }
217 Err(e) => {
218 log::error!("Failed to load general items: {e:?}");
219 if let Err(e) = tx.send(HashMap::new()) {
220 log::error!("Failed to send empty general items: {e:?}");
221 }
222 }
223 }
224 });
225 Ok(rx.recv()?)
226 }
227
228 fn load_currencies(&mut self) -> anyhow::Result<HashMap<Ustr, Currency>> {
229 let pool = self.pool.clone();
230 let (tx, rx) = std::sync::mpsc::channel();
231 tokio::spawn(async move {
232 let result = DatabaseQueries::load_currencies(&pool).await;
233 match result {
234 Ok(currencies) => {
235 let mapping = currencies
236 .into_iter()
237 .map(|currency| (currency.code, currency))
238 .collect();
239 if let Err(e) = tx.send(mapping) {
240 log::error!("Failed to send currencies: {e:?}");
241 }
242 }
243 Err(e) => {
244 log::error!("Failed to load currencies: {e:?}");
245 if let Err(e) = tx.send(HashMap::new()) {
246 log::error!("Failed to send empty currencies: {e:?}");
247 }
248 }
249 }
250 });
251 Ok(rx.recv()?)
252 }
253
254 fn load_instruments(&mut self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
255 let pool = self.pool.clone();
256 let (tx, rx) = std::sync::mpsc::channel();
257 tokio::spawn(async move {
258 let result = DatabaseQueries::load_instruments(&pool).await;
259 match result {
260 Ok(instruments) => {
261 let mapping = instruments
262 .into_iter()
263 .map(|instrument| (instrument.id(), instrument))
264 .collect();
265 if let Err(e) = tx.send(mapping) {
266 log::error!("Failed to send instruments: {e:?}");
267 }
268 }
269 Err(e) => {
270 log::error!("Failed to load instruments: {e:?}");
271 if let Err(e) = tx.send(HashMap::new()) {
272 log::error!("Failed to send empty instruments: {e:?}");
273 }
274 }
275 }
276 });
277 Ok(rx.recv()?)
278 }
279
280 fn load_synthetics(&mut self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
281 todo!()
282 }
283
284 fn load_accounts(&mut self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
285 let pool = self.pool.clone();
286 let (tx, rx) = std::sync::mpsc::channel();
287 tokio::spawn(async move {
288 let result = DatabaseQueries::load_accounts(&pool).await;
289 match result {
290 Ok(accounts) => {
291 let mapping = accounts
292 .into_iter()
293 .map(|account| (account.id(), account))
294 .collect();
295 if let Err(e) = tx.send(mapping) {
296 log::error!("Failed to send accounts: {e:?}");
297 }
298 }
299 Err(e) => {
300 log::error!("Failed to load accounts: {e:?}");
301 if let Err(e) = tx.send(HashMap::new()) {
302 log::error!("Failed to send empty accounts: {e:?}");
303 }
304 }
305 }
306 });
307 Ok(rx.recv()?)
308 }
309
310 fn load_orders(&mut self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
311 let pool = self.pool.clone();
312 let (tx, rx) = std::sync::mpsc::channel();
313 tokio::spawn(async move {
314 let result = DatabaseQueries::load_orders(&pool).await;
315 match result {
316 Ok(orders) => {
317 let mapping = orders
318 .into_iter()
319 .map(|order| (order.client_order_id(), order))
320 .collect();
321 if let Err(e) = tx.send(mapping) {
322 log::error!("Failed to send orders: {e:?}");
323 }
324 }
325 Err(e) => {
326 log::error!("Failed to load orders: {e:?}");
327 if let Err(e) = tx.send(HashMap::new()) {
328 log::error!("Failed to send empty orders: {e:?}");
329 }
330 }
331 }
332 });
333 Ok(rx.recv()?)
334 }
335
336 fn load_positions(&mut self) -> anyhow::Result<HashMap<PositionId, Position>> {
337 todo!()
338 }
339
340 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
341 todo!()
342 }
343
344 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
345 let pool = self.pool.clone();
346 let (tx, rx) = std::sync::mpsc::channel();
347 tokio::spawn(async move {
348 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
349 match result {
350 Ok(currency) => {
351 if let Err(e) = tx.send(currency) {
352 log::error!("Failed to send load_index_order_client result: {e:?}");
353 }
354 }
355 Err(e) => {
356 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
357 if let Err(e) = tx.send(HashMap::new()) {
358 log::error!("Failed to send empty load_index_order_client result: {e:?}");
359 }
360 }
361 }
362 });
363 Ok(rx.recv()?)
364 }
365
366 fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
367 let pool = self.pool.clone();
368 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
370 tokio::spawn(async move {
371 let result = DatabaseQueries::load_currency(&pool, &code).await;
372 match result {
373 Ok(currency) => {
374 if let Err(e) = tx.send(currency) {
375 log::error!("Failed to send currency {code}: {e:?}");
376 }
377 }
378 Err(e) => {
379 log::error!("Failed to load currency {code}: {e:?}");
380 if let Err(e) = tx.send(None) {
381 log::error!("Failed to send None for currency {code}: {e:?}");
382 }
383 }
384 }
385 });
386 Ok(rx.recv()?)
387 }
388
389 fn load_instrument(
390 &self,
391 instrument_id: &InstrumentId,
392 ) -> anyhow::Result<Option<InstrumentAny>> {
393 let pool = self.pool.clone();
394 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
396 tokio::spawn(async move {
397 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
398 match result {
399 Ok(instrument) => {
400 if let Err(e) = tx.send(instrument) {
401 log::error!("Failed to send instrument {instrument_id}: {e:?}");
402 }
403 }
404 Err(e) => {
405 log::error!("Failed to load instrument {instrument_id}: {e:?}");
406 if let Err(e) = tx.send(None) {
407 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
408 }
409 }
410 }
411 });
412 Ok(rx.recv()?)
413 }
414
415 fn load_synthetic(&self, instrument_id: &InstrumentId) -> anyhow::Result<SyntheticInstrument> {
416 todo!()
417 }
418
419 fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
420 let pool = self.pool.clone();
421 let account_id = account_id.to_owned();
422 let (tx, rx) = std::sync::mpsc::channel();
423 tokio::spawn(async move {
424 let result = DatabaseQueries::load_account(&pool, &account_id).await;
425 match result {
426 Ok(account) => {
427 if let Err(e) = tx.send(account) {
428 log::error!("Failed to send account {account_id}: {e:?}");
429 }
430 }
431 Err(e) => {
432 log::error!("Failed to load account {account_id}: {e:?}");
433 if let Err(e) = tx.send(None) {
434 log::error!("Failed to send None for account {account_id}: {e:?}");
435 }
436 }
437 }
438 });
439 Ok(rx.recv()?)
440 }
441
442 fn load_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<Option<OrderAny>> {
443 let pool = self.pool.clone();
444 let client_order_id = client_order_id.to_owned();
445 let (tx, rx) = std::sync::mpsc::channel();
446 tokio::spawn(async move {
447 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
448 match result {
449 Ok(order) => {
450 if let Err(e) = tx.send(order) {
451 log::error!("Failed to send order {client_order_id}: {e:?}");
452 }
453 }
454 Err(e) => {
455 log::error!("Failed to load order {client_order_id}: {e:?}");
456 let _ = tx.send(None);
457 }
458 }
459 });
460 Ok(rx.recv()?)
461 }
462
463 fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Position> {
464 todo!()
465 }
466
467 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
468 todo!()
469 }
470
471 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
472 todo!()
473 }
474
475 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
476 todo!()
477 }
478
479 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
480 todo!()
481 }
482
483 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
484 let query = DatabaseQuery::Add(key, value.into());
485 self.tx
486 .send(query)
487 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
488 }
489
490 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
491 let query = DatabaseQuery::AddCurrency(*currency);
492 self.tx.send(query).map_err(|e| {
493 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
494 })
495 }
496
497 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
498 let query = DatabaseQuery::AddInstrument(instrument.clone());
499 self.tx.send(query).map_err(|e| {
500 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
501 })
502 }
503
504 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
505 todo!()
506 }
507
508 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
509 let query = DatabaseQuery::AddAccount(account.clone(), false);
510 self.tx.send(query).map_err(|e| {
511 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
512 })
513 }
514
515 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
516 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
517 self.tx.send(query).map_err(|e| {
518 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
519 })
520 }
521
522 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
523 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
524 self.tx.send(query).map_err(|e| {
525 anyhow::anyhow!(
526 "Failed to send query add_order_snapshot to database message handler: {e}"
527 )
528 })
529 }
530
531 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
532 todo!()
533 }
534
535 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
536 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
537 self.tx.send(query).map_err(|e| {
538 anyhow::anyhow!(
539 "Failed to send query add_position_snapshot to database message handler: {e}"
540 )
541 })
542 }
543
544 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
545 todo!()
546 }
547
548 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
549 let query = DatabaseQuery::AddQuote(quote.to_owned());
550 self.tx.send(query).map_err(|e| {
551 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
552 })
553 }
554
555 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
556 let pool = self.pool.clone();
557 let instrument_id = instrument_id.to_owned();
558 let (tx, rx) = std::sync::mpsc::channel();
559 tokio::spawn(async move {
560 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
561 match result {
562 Ok(quotes) => {
563 if let Err(e) = tx.send(quotes) {
564 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
565 }
566 }
567 Err(e) => {
568 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
569 if let Err(e) = tx.send(Vec::new()) {
570 log::error!(
571 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
572 );
573 }
574 }
575 }
576 });
577 Ok(rx.recv()?)
578 }
579
580 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
581 let query = DatabaseQuery::AddTrade(trade.to_owned());
582 self.tx.send(query).map_err(|e| {
583 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
584 })
585 }
586
587 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
588 let pool = self.pool.clone();
589 let instrument_id = instrument_id.to_owned();
590 let (tx, rx) = std::sync::mpsc::channel();
591 tokio::spawn(async move {
592 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
593 match result {
594 Ok(trades) => {
595 if let Err(e) = tx.send(trades) {
596 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
597 }
598 }
599 Err(e) => {
600 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
601 if let Err(e) = tx.send(Vec::new()) {
602 log::error!(
603 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
604 );
605 }
606 }
607 }
608 });
609 Ok(rx.recv()?)
610 }
611
612 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
613 let query = DatabaseQuery::AddBar(bar.to_owned());
614 self.tx.send(query).map_err(|e| {
615 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
616 })
617 }
618
619 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
620 let pool = self.pool.clone();
621 let instrument_id = instrument_id.to_owned();
622 let (tx, rx) = std::sync::mpsc::channel();
623 tokio::spawn(async move {
624 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
625 match result {
626 Ok(bars) => {
627 if let Err(e) = tx.send(bars) {
628 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
629 }
630 }
631 Err(e) => {
632 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
633 if let Err(e) = tx.send(Vec::new()) {
634 log::error!(
635 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
636 );
637 }
638 }
639 }
640 });
641 Ok(rx.recv()?)
642 }
643
644 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
645 let query = DatabaseQuery::AddSignal(signal.to_owned());
646 self.tx.send(query).map_err(|e| {
647 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
648 })
649 }
650
651 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
652 let pool = self.pool.clone();
653 let name = name.to_owned();
654 let (tx, rx) = std::sync::mpsc::channel();
655 tokio::spawn(async move {
656 let result = DatabaseQueries::load_signals(&pool, &name).await;
657 match result {
658 Ok(signals) => {
659 if let Err(e) = tx.send(signals) {
660 log::error!("Failed to send signals for '{name}': {e:?}");
661 }
662 }
663 Err(e) => {
664 log::error!("Failed to load signals for '{name}': {e:?}");
665 if let Err(e) = tx.send(Vec::new()) {
666 log::error!("Failed to send empty signals for '{name}': {e:?}");
667 }
668 }
669 }
670 });
671 Ok(rx.recv()?)
672 }
673
674 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
675 let query = DatabaseQuery::AddCustom(data.to_owned());
676 self.tx.send(query).map_err(|e| {
677 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
678 })
679 }
680
681 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
682 let pool = self.pool.clone();
683 let data_type = data_type.to_owned();
684 let (tx, rx) = std::sync::mpsc::channel();
685 tokio::spawn(async move {
686 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
687 match result {
688 Ok(signals) => {
689 if let Err(e) = tx.send(signals) {
690 log::error!("Failed to send custom data for '{data_type}': {e:?}");
691 }
692 }
693 Err(e) => {
694 log::error!("Failed to load custom data for '{data_type}': {e:?}");
695 if let Err(e) = tx.send(Vec::new()) {
696 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
697 }
698 }
699 }
700 });
701 Ok(rx.recv()?)
702 }
703
704 fn load_order_snapshot(
705 &self,
706 client_order_id: &ClientOrderId,
707 ) -> anyhow::Result<Option<OrderSnapshot>> {
708 let pool = self.pool.clone();
709 let client_order_id = client_order_id.to_owned();
710 let (tx, rx) = std::sync::mpsc::channel();
711 tokio::spawn(async move {
712 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
713 match result {
714 Ok(snapshot) => {
715 if let Err(e) = tx.send(snapshot) {
716 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
717 }
718 }
719 Err(e) => {
720 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
721 if let Err(e) = tx.send(None) {
722 log::error!(
723 "Failed to send None for order snapshot {client_order_id}: {e:?}"
724 );
725 }
726 }
727 }
728 });
729 Ok(rx.recv()?)
730 }
731
732 fn load_position_snapshot(
733 &self,
734 position_id: &PositionId,
735 ) -> anyhow::Result<Option<PositionSnapshot>> {
736 let pool = self.pool.clone();
737 let position_id = position_id.to_owned();
738 let (tx, rx) = std::sync::mpsc::channel();
739 tokio::spawn(async move {
740 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
741 match result {
742 Ok(snapshot) => {
743 if let Err(e) = tx.send(snapshot) {
744 log::error!("Failed to send position snapshot {position_id}: {e:?}");
745 }
746 }
747 Err(e) => {
748 log::error!("Failed to load position snapshot {position_id}: {e:?}");
749 if let Err(e) = tx.send(None) {
750 log::error!(
751 "Failed to send None for position snapshot {position_id}: {e:?}"
752 );
753 }
754 }
755 }
756 });
757 Ok(rx.recv()?)
758 }
759
760 fn index_venue_order_id(
761 &self,
762 client_order_id: ClientOrderId,
763 venue_order_id: VenueOrderId,
764 ) -> anyhow::Result<()> {
765 todo!()
766 }
767
768 fn index_order_position(
769 &self,
770 client_order_id: ClientOrderId,
771 position_id: PositionId,
772 ) -> anyhow::Result<()> {
773 todo!()
774 }
775
776 fn update_actor(&self) -> anyhow::Result<()> {
777 todo!()
778 }
779
780 fn update_strategy(&self) -> anyhow::Result<()> {
781 todo!()
782 }
783
784 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
785 let query = DatabaseQuery::AddAccount(account.clone(), true);
786 self.tx.send(query).map_err(|e| {
787 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
788 })
789 }
790
791 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
792 let query = DatabaseQuery::UpdateOrder(event.clone());
793 self.tx.send(query).map_err(|e| {
794 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
795 })
796 }
797
798 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
799 todo!()
800 }
801
802 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
803 todo!()
804 }
805
806 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
807 todo!()
808 }
809
810 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
811 todo!()
812 }
813}
814
815async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
816 for cmd in buffer.drain(..) {
817 let result: anyhow::Result<()> = match cmd {
818 DatabaseQuery::Close => Ok(()),
819 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value)
820 .await
821 .map_err(anyhow::Error::from),
822 DatabaseQuery::AddCurrency(currency) => DatabaseQueries::add_currency(pool, currency)
823 .await
824 .map_err(anyhow::Error::from),
825 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
826 InstrumentAny::Betting(instrument) => {
827 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
828 }
829 InstrumentAny::BinaryOption(instrument) => {
830 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
831 .await
832 }
833 InstrumentAny::CryptoFuture(instrument) => {
834 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
835 .await
836 }
837 InstrumentAny::CryptoPerpetual(instrument) => {
838 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
839 .await
840 }
841 InstrumentAny::CurrencyPair(instrument) => {
842 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
843 .await
844 }
845 InstrumentAny::Equity(equity) => {
846 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
847 }
848 InstrumentAny::FuturesContract(instrument) => {
849 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
850 .await
851 }
852 InstrumentAny::FuturesSpread(instrument) => {
853 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
854 .await
855 }
856 InstrumentAny::OptionContract(instrument) => {
857 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
858 .await
859 }
860 InstrumentAny::OptionSpread(instrument) => {
861 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
862 .await
863 }
864 },
865 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
866 OrderAny::Limit(order) => {
867 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
868 .await
869 }
870 OrderAny::LimitIfTouched(order) => {
871 DatabaseQueries::add_order(
872 pool,
873 "LIMIT_IF_TOUCHED",
874 updated,
875 Box::new(order),
876 client_id,
877 )
878 .await
879 }
880 OrderAny::Market(order) => {
881 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
882 .await
883 }
884 OrderAny::MarketIfTouched(order) => {
885 DatabaseQueries::add_order(
886 pool,
887 "MARKET_IF_TOUCHED",
888 updated,
889 Box::new(order),
890 client_id,
891 )
892 .await
893 }
894 OrderAny::MarketToLimit(order) => {
895 DatabaseQueries::add_order(
896 pool,
897 "MARKET_TO_LIMIT",
898 updated,
899 Box::new(order),
900 client_id,
901 )
902 .await
903 }
904 OrderAny::StopLimit(order) => {
905 DatabaseQueries::add_order(
906 pool,
907 "STOP_LIMIT",
908 updated,
909 Box::new(order),
910 client_id,
911 )
912 .await
913 }
914 OrderAny::StopMarket(order) => {
915 DatabaseQueries::add_order(
916 pool,
917 "STOP_MARKET",
918 updated,
919 Box::new(order),
920 client_id,
921 )
922 .await
923 }
924 OrderAny::TrailingStopLimit(order) => {
925 DatabaseQueries::add_order(
926 pool,
927 "TRAILING_STOP_LIMIT",
928 updated,
929 Box::new(order),
930 client_id,
931 )
932 .await
933 }
934 OrderAny::TrailingStopMarket(order) => {
935 DatabaseQueries::add_order(
936 pool,
937 "TRAILING_STOP_MARKET",
938 updated,
939 Box::new(order),
940 client_id,
941 )
942 .await
943 }
944 },
945 DatabaseQuery::AddOrderSnapshot(snapshot) => {
946 DatabaseQueries::add_order_snapshot(pool, snapshot).await
947 }
948 DatabaseQuery::AddPositionSnapshot(snapshot) => {
949 DatabaseQueries::add_position_snapshot(pool, snapshot).await
950 }
951 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
952 AccountAny::Cash(account) => {
953 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
954 }
955 AccountAny::Margin(account) => {
956 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
957 }
958 },
959 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
960 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
961 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
962 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
963 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
964 DatabaseQuery::UpdateOrder(event) => {
965 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
966 }
967 };
968
969 if let Err(e) = result {
970 tracing::error!("Error on query: {e:?}");
971 }
972 }
973}