1use ahash::AHashMap;
17use nautilus_common::{custom::CustomData, signal::Signal};
18use nautilus_model::{
19 accounts::{Account, AccountAny},
20 data::{Bar, DataType, QuoteTick, TradeTick},
21 events::{
22 AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
23 position::snapshot::PositionSnapshot,
24 },
25 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
26 instruments::{Instrument, InstrumentAny},
27 orders::{Order, OrderAny},
28 types::{AccountBalance, Currency, MarginBalance},
29};
30use sqlx::{PgPool, Row};
31
32use super::models::{
33 orders::OrderSnapshotModel,
34 positions::PositionSnapshotModel,
35 types::{CustomDataModel, SignalModel},
36};
37use crate::sql::models::{
38 accounts::AccountEventModel,
39 data::{BarModel, QuoteTickModel, TradeTickModel},
40 enums::{
41 AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
42 CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
43 },
44 general::{GeneralRow, OrderEventOrderClientIdCombination},
45 instruments::InstrumentAnyModel,
46 orders::OrderEventAnyModel,
47 types::CurrencyModel,
48};
49
50#[derive(Debug)]
51pub struct DatabaseQueries;
52
53impl DatabaseQueries {
54 pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
60 sqlx::query("SELECT truncate_all_tables()")
61 .execute(pool)
62 .await
63 .map(|_| ())
64 .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
65 }
66
67 pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
73 sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
74 .bind(key)
75 .bind(value)
76 .execute(pool)
77 .await
78 .map(|_| ())
79 .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
80 }
81
82 pub async fn load(pool: &PgPool) -> anyhow::Result<AHashMap<String, Vec<u8>>> {
88 sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
89 .fetch_all(pool)
90 .await
91 .map(|rows| {
92 let mut cache: AHashMap<String, Vec<u8>> = AHashMap::new();
93 for row in rows {
94 cache.insert(row.id, row.value);
95 }
96 cache
97 })
98 .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
99 }
100
101 pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
107 sqlx::query(
108 "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
109 )
110 .bind(currency.code.as_str())
111 .bind(i32::from(currency.precision))
112 .bind(i32::from(currency.iso4217))
113 .bind(currency.name.as_str())
114 .bind(CurrencyTypeModel(currency.currency_type))
115 .execute(pool)
116 .await
117 .map(|_| ())
118 .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
119 }
120
121 pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
127 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
128 .fetch_all(pool)
129 .await
130 .map(|rows| rows.into_iter().map(|row| row.0).collect())
131 .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
132 }
133
134 pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
140 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
141 .bind(code)
142 .fetch_optional(pool)
143 .await
144 .map(|currency| currency.map(|row| row.0))
145 .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
146 }
147
148 pub async fn add_instrument(
154 pool: &PgPool,
155 kind: &str,
156 instrument: Box<dyn Instrument>,
157 ) -> anyhow::Result<()> {
158 sqlx::query(r#"
159 INSERT INTO "instrument" (
160 id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
161 multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
162 price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
163 min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
164 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
165 ON CONFLICT (id)
166 DO UPDATE
167 SET
168 kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
169 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
170 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
171 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32, ts_event = $33, updated_at = CURRENT_TIMESTAMP
172 "#)
173 .bind(instrument.id().to_string())
174 .bind(kind)
175 .bind(instrument.raw_symbol().to_string())
176 .bind(instrument.base_currency().map(|x| x.code.as_str()))
177 .bind(instrument.underlying().map(|x| x.to_string()))
178 .bind(instrument.quote_currency().code.as_str())
179 .bind(instrument.settlement_currency().code.as_str())
180 .bind(instrument.isin().map(|x| x.to_string()))
181 .bind(AssetClassModel(instrument.asset_class()))
182 .bind(instrument.exchange().map(|x| x.to_string()))
183 .bind(instrument.multiplier().to_string())
184 .bind(instrument.option_kind().map(|x| x.to_string()))
185 .bind(instrument.is_inverse())
186 .bind(instrument.strike_price().map(|x| x.to_string()))
187 .bind(instrument.activation_ns().map(|x| x.to_string()))
188 .bind(instrument.expiration_ns().map(|x| x.to_string()))
189 .bind(i32::from(instrument.price_precision()))
190 .bind(i32::from(instrument.size_precision()))
191 .bind(instrument.price_increment().to_string())
192 .bind(instrument.size_increment().to_string())
193 .bind(instrument.maker_fee().to_string())
194 .bind(instrument.taker_fee().to_string())
195 .bind(instrument.margin_init().to_string())
196 .bind(instrument.margin_maint().to_string())
197 .bind(instrument.lot_size().map(|x| x.to_string()))
198 .bind(instrument.max_quantity().map(|x| x.to_string()))
199 .bind(instrument.min_quantity().map(|x| x.to_string()))
200 .bind(instrument.max_notional().map(|x| x.to_string()))
201 .bind(instrument.min_notional().map(|x| x.to_string()))
202 .bind(instrument.max_price().map(|x| x.to_string()))
203 .bind(instrument.min_price().map(|x| x.to_string()))
204 .bind(instrument.ts_init().to_string())
205 .bind(instrument.ts_event().to_string())
206 .execute(pool)
207 .await
208 .map(|_| ())
209 .map_err(|e| anyhow::anyhow!("Failed to insert item {} into instrument table: {:?}", instrument.id(), e))
210 }
211
212 pub async fn load_instrument(
218 pool: &PgPool,
219 instrument_id: &InstrumentId,
220 ) -> anyhow::Result<Option<InstrumentAny>> {
221 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
222 .bind(instrument_id.to_string())
223 .fetch_optional(pool)
224 .await
225 .map(|instrument| instrument.map(|row| row.0))
226 .map_err(|e| {
227 anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
228 })
229 }
230
231 pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
237 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
238 .fetch_all(pool)
239 .await
240 .map(|rows| rows.into_iter().map(|row| row.0).collect())
241 .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
242 }
243
244 pub async fn add_order(
254 pool: &PgPool,
255 _kind: &str,
256 updated: bool,
257 order: Box<dyn Order>,
258 client_id: Option<ClientId>,
259 ) -> anyhow::Result<()> {
260 if updated {
261 let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
262 .await
263 .unwrap();
264 assert!(
265 exists,
266 "OrderInitialized event does not exist for order: {}",
267 order.client_order_id()
268 );
269 }
270 match order.last_event().clone() {
271 OrderEventAny::Accepted(event) => {
272 Self::add_order_event(pool, Box::new(event), client_id).await
273 }
274 OrderEventAny::CancelRejected(event) => {
275 Self::add_order_event(pool, Box::new(event), client_id).await
276 }
277 OrderEventAny::Canceled(event) => {
278 Self::add_order_event(pool, Box::new(event), client_id).await
279 }
280 OrderEventAny::Denied(event) => {
281 Self::add_order_event(pool, Box::new(event), client_id).await
282 }
283 OrderEventAny::Emulated(event) => {
284 Self::add_order_event(pool, Box::new(event), client_id).await
285 }
286 OrderEventAny::Expired(event) => {
287 Self::add_order_event(pool, Box::new(event), client_id).await
288 }
289 OrderEventAny::Filled(event) => {
290 Self::add_order_event(pool, Box::new(event), client_id).await
291 }
292 OrderEventAny::Initialized(event) => {
293 Self::add_order_event(pool, Box::new(event), client_id).await
294 }
295 OrderEventAny::ModifyRejected(event) => {
296 Self::add_order_event(pool, Box::new(event), client_id).await
297 }
298 OrderEventAny::PendingCancel(event) => {
299 Self::add_order_event(pool, Box::new(event), client_id).await
300 }
301 OrderEventAny::PendingUpdate(event) => {
302 Self::add_order_event(pool, Box::new(event), client_id).await
303 }
304 OrderEventAny::Rejected(event) => {
305 Self::add_order_event(pool, Box::new(event), client_id).await
306 }
307 OrderEventAny::Released(event) => {
308 Self::add_order_event(pool, Box::new(event), client_id).await
309 }
310 OrderEventAny::Submitted(event) => {
311 Self::add_order_event(pool, Box::new(event), client_id).await
312 }
313 OrderEventAny::Updated(event) => {
314 Self::add_order_event(pool, Box::new(event), client_id).await
315 }
316 OrderEventAny::Triggered(event) => {
317 Self::add_order_event(pool, Box::new(event), client_id).await
318 }
319 }
320 }
321
322 pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
332 let mut transaction = pool.begin().await?;
333
334 sqlx::query(
337 r#"
338 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
339 "#,
340 )
341 .bind(snapshot.trader_id.to_string())
342 .execute(&mut *transaction)
343 .await
344 .map(|_| ())
345 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
346
347 sqlx::query(
348 r#"
349 INSERT INTO "order" (
350 id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
351 account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
352 trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
353 expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
354 is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
355 trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
356 parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
357 created_at, updated_at
358 ) VALUES (
359 $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
360 $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
361 $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
362 CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
363 )
364 ON CONFLICT (id)
365 DO UPDATE SET
366 trader_id = $2,
367 strategy_id = $3,
368 instrument_id = $4,
369 venue_order_id = $5,
370 position_id = $6,
371 account_id = $7,
372 last_trade_id = $8,
373 order_type = $9,
374 order_side = $10,
375 quantity = $11,
376 price = $12,
377 trigger_price = $13,
378 trigger_type = $14,
379 limit_offset = $15,
380 trailing_offset = $16,
381 trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
382 time_in_force = $18,
383 expire_time = $19,
384 filled_qty = $20,
385 liquidity_side = $21,
386 avg_px = $22,
387 slippage = $23,
388 commissions = $24,
389 status = $25,
390 is_post_only = $26,
391 is_reduce_only = $27,
392 is_quote_quantity = $28,
393 display_qty = $29,
394 emulation_trigger = $30,
395 trigger_instrument_id = $31,
396 contingency_type = $32,
397 order_list_id = $33,
398 linked_order_ids = $34,
399 parent_order_id = $35,
400 exec_algorithm_id = $36,
401 exec_algorithm_params = $37,
402 exec_spawn_id = $38,
403 tags = $39,
404 init_id = $40,
405 ts_init = $41,
406 ts_last = $42,
407 updated_at = CURRENT_TIMESTAMP
408 "#)
409 .bind(snapshot.client_order_id.to_string()) .bind(snapshot.trader_id.to_string())
411 .bind(snapshot.strategy_id.to_string())
412 .bind(snapshot.instrument_id.to_string())
413 .bind(snapshot.venue_order_id.map(|x| x.to_string()))
414 .bind(snapshot.position_id.map(|x| x.to_string()))
415 .bind(snapshot.account_id.map(|x| x.to_string()))
416 .bind(snapshot.last_trade_id.map(|x| x.to_string()))
417 .bind(snapshot.order_type.to_string())
418 .bind(snapshot.order_side.to_string())
419 .bind(snapshot.quantity.to_string())
420 .bind(snapshot.price.map(|x| x.to_string()))
421 .bind(snapshot.trigger_price.map(|x| x.to_string()))
422 .bind(snapshot.trigger_type.map(|x| x.to_string()))
423 .bind(snapshot.limit_offset.map(|x| x.to_string()))
424 .bind(snapshot.trailing_offset.map(|x| x.to_string()))
425 .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
426 .bind(snapshot.time_in_force.to_string())
427 .bind(snapshot.expire_time.map(|x| x.to_string()))
428 .bind(snapshot.filled_qty.to_string())
429 .bind(snapshot.liquidity_side.map(|x| x.to_string()))
430 .bind(snapshot.avg_px)
431 .bind(snapshot.slippage)
432 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
433 .bind(snapshot.status.to_string())
434 .bind(snapshot.is_post_only)
435 .bind(snapshot.is_reduce_only)
436 .bind(snapshot.is_quote_quantity)
437 .bind(snapshot.display_qty.map(|x| x.to_string()))
438 .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
439 .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
440 .bind(snapshot.contingency_type.map(|x| x.to_string()))
441 .bind(snapshot.order_list_id.map(|x| x.to_string()))
442 .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
443 .bind(snapshot.parent_order_id.map(|x| x.to_string()))
444 .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
445 .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
446 .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
447 .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
448 .bind(snapshot.init_id.to_string())
449 .bind(snapshot.ts_init.to_string())
450 .bind(snapshot.ts_last.to_string())
451 .execute(&mut *transaction)
452 .await
453 .map(|_| ())
454 .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
455
456 transaction
457 .commit()
458 .await
459 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
460 }
461
462 pub async fn load_order_snapshot(
468 pool: &PgPool,
469 client_order_id: &ClientOrderId,
470 ) -> anyhow::Result<Option<OrderSnapshot>> {
471 sqlx::query_as::<_, OrderSnapshotModel>(
472 r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
473 )
474 .bind(client_order_id.to_string())
475 .fetch_optional(pool)
476 .await
477 .map(|model| model.map(|m| m.0))
478 .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
479 }
480
481 pub async fn add_position_snapshot(
487 pool: &PgPool,
488 snapshot: PositionSnapshot,
489 ) -> anyhow::Result<()> {
490 let mut transaction = pool.begin().await?;
491
492 sqlx::query(
495 r#"
496 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
497 "#,
498 )
499 .bind(snapshot.trader_id.to_string())
500 .execute(&mut *transaction)
501 .await
502 .map(|_| ())
503 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
504
505 sqlx::query(r#"
506 INSERT INTO "position" (
507 id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
508 quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
509 duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
510 ) VALUES (
511 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
512 $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
513 )
514 ON CONFLICT (id)
515 DO UPDATE
516 SET
517 trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
518 peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
519 commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
520 "#)
521 .bind(snapshot.position_id.to_string())
522 .bind(snapshot.trader_id.to_string())
523 .bind(snapshot.strategy_id.to_string())
524 .bind(snapshot.instrument_id.to_string())
525 .bind(snapshot.account_id.to_string())
526 .bind(snapshot.opening_order_id.to_string())
527 .bind(snapshot.closing_order_id.map(|x| x.to_string()))
528 .bind(snapshot.entry.to_string())
529 .bind(snapshot.side.to_string())
530 .bind(snapshot.signed_qty)
531 .bind(snapshot.quantity.to_string())
532 .bind(snapshot.peak_qty.to_string())
533 .bind(snapshot.quote_currency.to_string())
534 .bind(snapshot.base_currency.map(|x| x.to_string()))
535 .bind(snapshot.settlement_currency.to_string())
536 .bind(snapshot.avg_px_open)
537 .bind(snapshot.avg_px_close)
538 .bind(snapshot.realized_return)
539 .bind(snapshot.realized_pnl.map(|x| x.to_string()))
540 .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
541 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
542 .bind(snapshot.duration_ns.map(|x| x.to_string()))
543 .bind(snapshot.ts_opened.to_string())
544 .bind(snapshot.ts_closed.map(|x| x.to_string()))
545 .bind(snapshot.ts_init.to_string())
546 .bind(snapshot.ts_last.to_string())
547 .execute(&mut *transaction)
548 .await
549 .map(|_| ())
550 .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
551 transaction
552 .commit()
553 .await
554 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
555 }
556
557 pub async fn load_position_snapshot(
563 pool: &PgPool,
564 position_id: &PositionId,
565 ) -> anyhow::Result<Option<PositionSnapshot>> {
566 sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
567 .bind(position_id.to_string())
568 .fetch_optional(pool)
569 .await
570 .map(|model| model.map(|m| m.0))
571 .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
572 }
573
574 pub async fn check_if_order_initialized_exists(
580 pool: &PgPool,
581 client_order_id: ClientOrderId,
582 ) -> anyhow::Result<bool> {
583 sqlx::query(r#"
584 SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
585 "#)
586 .bind(client_order_id.to_string())
587 .fetch_one(pool)
588 .await
589 .map(|row| row.get(0))
590 .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
591 }
592
593 pub async fn check_if_account_event_exists(
599 pool: &PgPool,
600 account_id: AccountId,
601 ) -> anyhow::Result<bool> {
602 sqlx::query(
603 r#"
604 SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
605 "#,
606 )
607 .bind(account_id.to_string())
608 .fetch_one(pool)
609 .await
610 .map(|row| row.get(0))
611 .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
612 }
613
614 pub async fn add_order_event(
620 pool: &PgPool,
621 order_event: Box<dyn OrderEvent>,
622 client_id: Option<ClientId>,
623 ) -> anyhow::Result<()> {
624 let mut transaction = pool.begin().await?;
625
626 sqlx::query(
629 r#"
630 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
631 "#,
632 )
633 .bind(order_event.trader_id().to_string())
634 .execute(&mut *transaction)
635 .await
636 .map(|_| ())
637 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
638
639 if let Some(client_id) = client_id {
642 sqlx::query(
643 r#"
644 INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
645 "#,
646 )
647 .bind(client_id.to_string())
648 .execute(&mut *transaction)
649 .await
650 .map(|_| ())
651 .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
652 }
653
654 sqlx::query(r#"
655 INSERT INTO "order_event" (
656 id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
657 post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
658 trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
659 order_list_id, linked_order_ids, parent_order_id,
660 exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
661 ) VALUES (
662 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
663 $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
664 $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
665 )
666 ON CONFLICT (id)
667 DO UPDATE
668 SET
669 kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
670 quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
671 last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
672 emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
673 exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
674
675 "#)
676 .bind(order_event.id().to_string())
677 .bind(order_event.kind())
678 .bind(order_event.client_order_id().to_string())
679 .bind(order_event.order_type().map(|x| x.to_string()))
680 .bind(order_event.order_side().map(|x| x.to_string()))
681 .bind(order_event.trader_id().to_string())
682 .bind(client_id.map(|x| x.to_string()))
683 .bind(order_event.reason().map(|x| x.to_string()))
684 .bind(order_event.strategy_id().to_string())
685 .bind(order_event.instrument_id().to_string())
686 .bind(order_event.trade_id().map(|x| x.to_string()))
687 .bind(order_event.currency().map(|x| x.code.as_str()))
688 .bind(order_event.quantity().map(|x| x.to_string()))
689 .bind(order_event.time_in_force().map(|x| x.to_string()))
690 .bind(order_event.liquidity_side().map(|x| x.to_string()))
691 .bind(order_event.post_only())
692 .bind(order_event.reduce_only())
693 .bind(order_event.quote_quantity())
694 .bind(order_event.reconciliation())
695 .bind(order_event.price().map(|x| x.to_string()))
696 .bind(order_event.last_px().map(|x| x.to_string()))
697 .bind(order_event.last_qty().map(|x| x.to_string()))
698 .bind(order_event.trigger_price().map(|x| x.to_string()))
699 .bind(order_event.trigger_type().map(|x| x.to_string()))
700 .bind(order_event.limit_offset().map(|x| x.to_string()))
701 .bind(order_event.trailing_offset().map(|x| x.to_string()))
702 .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
703 .bind(order_event.expire_time().map(|x| x.to_string()))
704 .bind(order_event.display_qty().map(|x| x.to_string()))
705 .bind(order_event.emulation_trigger().map(|x| x.to_string()))
706 .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
707 .bind(order_event.contingency_type().map(|x| x.to_string()))
708 .bind(order_event.order_list_id().map(|x| x.to_string()))
709 .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
710 .bind(order_event.parent_order_id().map(|x| x.to_string()))
711 .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
712 .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
713 .bind(order_event.venue_order_id().map(|x| x.to_string()))
714 .bind(order_event.account_id().map(|x| x.to_string()))
715 .bind(order_event.position_id().map(|x| x.to_string()))
716 .bind(order_event.commission().map(|x| x.to_string()))
717 .bind(order_event.ts_event().to_string())
718 .bind(order_event.ts_init().to_string())
719 .execute(&mut *transaction)
720 .await
721 .map(|_| ())
722 .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
723 transaction
724 .commit()
725 .await
726 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
727 }
728
729 pub async fn load_order_events(
735 pool: &PgPool,
736 client_order_id: &ClientOrderId,
737 ) -> anyhow::Result<Vec<OrderEventAny>> {
738 sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
739 .bind(client_order_id.to_string())
740 .fetch_all(pool)
741 .await
742 .map(|rows| rows.into_iter().map(|row| row.0).collect())
743 .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
744 }
745
746 pub async fn load_order(
756 pool: &PgPool,
757 client_order_id: &ClientOrderId,
758 ) -> anyhow::Result<Option<OrderAny>> {
759 let order_events = Self::load_order_events(pool, client_order_id).await;
760
761 match order_events {
762 Ok(order_events) => {
763 if order_events.is_empty() {
764 return Ok(None);
765 }
766 let order = OrderAny::from_events(order_events).unwrap();
767 Ok(Some(order))
768 }
769 Err(e) => anyhow::bail!("Failed to load order events: {e}"),
770 }
771 }
772
773 pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
783 let mut orders: Vec<OrderAny> = Vec::new();
784 let client_order_ids: Vec<ClientOrderId> = sqlx::query(
785 r#"
786 SELECT DISTINCT client_order_id FROM "order_event"
787 "#,
788 )
789 .fetch_all(pool)
790 .await
791 .map(|rows| {
792 rows.into_iter()
793 .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
794 .collect()
795 })
796 .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
797 for id in client_order_ids {
798 let order = Self::load_order(pool, &id).await.unwrap();
799 match order {
800 Some(order) => {
801 orders.push(order);
802 }
803 None => {
804 continue;
805 }
806 }
807 }
808 Ok(orders)
809 }
810
811 pub async fn add_account(
821 pool: &PgPool,
822 kind: &str,
823 updated: bool,
824 account: Box<dyn Account>,
825 ) -> anyhow::Result<()> {
826 if updated {
827 let exists = Self::check_if_account_event_exists(pool, account.id())
828 .await
829 .unwrap();
830 assert!(
831 exists,
832 "Account event does not exist for account: {}",
833 account.id()
834 );
835 }
836
837 let mut transaction = pool.begin().await?;
838
839 sqlx::query(
840 r#"
841 INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
842 "#,
843 )
844 .bind(account.id().to_string())
845 .execute(&mut *transaction)
846 .await
847 .map(|_| ())
848 .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
849
850 let account_event = account.last_event().unwrap();
851 sqlx::query(r#"
852 INSERT INTO "account_event" (
853 id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
854 ) VALUES (
855 $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
856 )
857 ON CONFLICT (id)
858 DO UPDATE
859 SET
860 kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
861 ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
862 "#)
863 .bind(account_event.event_id.to_string())
864 .bind(kind.to_string())
865 .bind(account_event.account_id.to_string())
866 .bind(account_event.base_currency.map(|x| x.code.as_str()))
867 .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
868 .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
869 .bind(account_event.is_reported)
870 .bind(account_event.ts_event.to_string())
871 .bind(account_event.ts_init.to_string())
872 .execute(&mut *transaction)
873 .await
874 .map(|_| ())
875 .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
876 transaction
877 .commit()
878 .await
879 .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
880 }
881
882 pub async fn load_account_events(
888 pool: &PgPool,
889 account_id: &AccountId,
890 ) -> anyhow::Result<Vec<AccountState>> {
891 sqlx::query_as::<_, AccountEventModel>(
892 r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
893 )
894 .bind(account_id.to_string())
895 .fetch_all(pool)
896 .await
897 .map(|rows| rows.into_iter().map(|row| row.0).collect())
898 .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
899 }
900
901 pub async fn load_account(
911 pool: &PgPool,
912 account_id: &AccountId,
913 ) -> anyhow::Result<Option<AccountAny>> {
914 let account_events = Self::load_account_events(pool, account_id).await;
915 match account_events {
916 Ok(account_events) => {
917 if account_events.is_empty() {
918 return Ok(None);
919 }
920 let account = AccountAny::from_events(account_events).unwrap();
921 Ok(Some(account))
922 }
923 Err(e) => anyhow::bail!("Failed to load account events: {e}"),
924 }
925 }
926
927 pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
937 let mut accounts: Vec<AccountAny> = Vec::new();
938 let account_ids: Vec<AccountId> = sqlx::query(
939 r#"
940 SELECT DISTINCT account_id FROM "account_event"
941 "#,
942 )
943 .fetch_all(pool)
944 .await
945 .map(|rows| {
946 rows.into_iter()
947 .map(|row| AccountId::from(row.get::<&str, _>(0)))
948 .collect()
949 })
950 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
951 for id in account_ids {
952 let account = Self::load_account(pool, &id).await.unwrap();
953 match account {
954 Some(account) => {
955 accounts.push(account);
956 }
957 None => {
958 continue;
959 }
960 }
961 }
962 Ok(accounts)
963 }
964
965 pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
971 sqlx::query(r#"
972 INSERT INTO "trade" (
973 instrument_id, price, quantity, aggressor_side, venue_trade_id,
974 ts_event, ts_init, created_at, updated_at
975 ) VALUES (
976 $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
977 )
978 ON CONFLICT (id)
979 DO UPDATE
980 SET
981 instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
982 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
983 "#)
984 .bind(trade.instrument_id.to_string())
985 .bind(trade.price.to_string())
986 .bind(trade.size.to_string())
987 .bind(AggressorSideModel(trade.aggressor_side))
988 .bind(trade.trade_id.to_string())
989 .bind(trade.ts_event.to_string())
990 .bind(trade.ts_init.to_string())
991 .execute(pool)
992 .await
993 .map(|_| ())
994 .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
995 }
996
997 pub async fn load_trades(
1003 pool: &PgPool,
1004 instrument_id: &InstrumentId,
1005 ) -> anyhow::Result<Vec<TradeTick>> {
1006 sqlx::query_as::<_, TradeTickModel>(
1007 r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1008 )
1009 .bind(instrument_id.to_string())
1010 .fetch_all(pool)
1011 .await
1012 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1013 .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1014 }
1015
1016 pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1022 sqlx::query(r#"
1023 INSERT INTO "quote" (
1024 instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1025 ) VALUES (
1026 $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1027 )
1028 ON CONFLICT (id)
1029 DO UPDATE
1030 SET
1031 instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1032 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1033 "#)
1034 .bind(quote.instrument_id.to_string())
1035 .bind(quote.bid_price.to_string())
1036 .bind(quote.ask_price.to_string())
1037 .bind(quote.bid_size.to_string())
1038 .bind(quote.ask_size.to_string())
1039 .bind(quote.ts_event.to_string())
1040 .bind(quote.ts_init.to_string())
1041 .execute(pool)
1042 .await
1043 .map(|_| ())
1044 .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1045 }
1046
1047 pub async fn load_quotes(
1053 pool: &PgPool,
1054 instrument_id: &InstrumentId,
1055 ) -> anyhow::Result<Vec<QuoteTick>> {
1056 sqlx::query_as::<_, QuoteTickModel>(
1057 r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1058 )
1059 .bind(instrument_id.to_string())
1060 .fetch_all(pool)
1061 .await
1062 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1063 .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1064 }
1065
1066 pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1072 println!("Adding bar: {bar:?}");
1073 sqlx::query(r#"
1074 INSERT INTO "bar" (
1075 instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1076 ) VALUES (
1077 $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1078 )
1079 ON CONFLICT (id)
1080 DO UPDATE
1081 SET
1082 instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1083 open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1084 "#)
1085 .bind(bar.bar_type.instrument_id().to_string())
1086 .bind(bar.bar_type.spec().step.get() as i32)
1087 .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1088 .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1089 .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1090 .bind(bar.open.to_string())
1091 .bind(bar.high.to_string())
1092 .bind(bar.low.to_string())
1093 .bind(bar.close.to_string())
1094 .bind(bar.volume.to_string())
1095 .bind(bar.ts_event.to_string())
1096 .bind(bar.ts_init.to_string())
1097 .execute(pool)
1098 .await
1099 .map(|_| ())
1100 .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1101 }
1102
1103 pub async fn load_bars(
1109 pool: &PgPool,
1110 instrument_id: &InstrumentId,
1111 ) -> anyhow::Result<Vec<Bar>> {
1112 sqlx::query_as::<_, BarModel>(
1113 r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1114 )
1115 .bind(instrument_id.to_string())
1116 .fetch_all(pool)
1117 .await
1118 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1119 .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1120 }
1121
1122 pub async fn load_distinct_order_event_client_ids(
1128 pool: &PgPool,
1129 ) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1130 let mut map: AHashMap<ClientOrderId, ClientId> = AHashMap::new();
1131 let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1132 r#"
1133 SELECT DISTINCT
1134 client_order_id AS "client_order_id",
1135 client_id AS "client_id"
1136 FROM "order_event"
1137 "#,
1138 )
1139 .fetch_all(pool)
1140 .await
1141 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1142 for id in result {
1143 map.insert(id.client_order_id, id.client_id);
1144 }
1145 Ok(map)
1146 }
1147
1148 pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1154 sqlx::query(
1155 r#"
1156 INSERT INTO "signal" (
1157 name, value, ts_event, ts_init, created_at, updated_at
1158 ) VALUES (
1159 $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1160 )
1161 ON CONFLICT (id)
1162 DO UPDATE
1163 SET
1164 name = $1, value = $2, ts_event = $3, ts_init = $4,
1165 updated_at = CURRENT_TIMESTAMP
1166 "#,
1167 )
1168 .bind(signal.name.to_string())
1169 .bind(signal.value.clone())
1170 .bind(signal.ts_event.to_string())
1171 .bind(signal.ts_init.to_string())
1172 .execute(pool)
1173 .await
1174 .map(|_| ())
1175 .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1176 }
1177
1178 pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1184 sqlx::query_as::<_, SignalModel>(
1185 r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1186 )
1187 .bind(name)
1188 .fetch_all(pool)
1189 .await
1190 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1191 .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1192 }
1193
1194 pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1200 sqlx::query(
1201 r#"
1202 INSERT INTO "custom" (
1203 data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1204 ) VALUES (
1205 $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1206 )
1207 ON CONFLICT (id)
1208 DO UPDATE
1209 SET
1210 data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1211 updated_at = CURRENT_TIMESTAMP
1212 "#,
1213 )
1214 .bind(data.data_type.type_name().to_string())
1215 .bind(
1216 data.data_type
1217 .metadata()
1218 .as_ref()
1219 .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1220 )
1221 .bind(data.value.to_vec())
1222 .bind(data.ts_event.to_string())
1223 .bind(data.ts_init.to_string())
1224 .execute(pool)
1225 .await
1226 .map(|_| ())
1227 .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1228 }
1229
1230 pub async fn load_custom_data(
1236 pool: &PgPool,
1237 data_type: &DataType,
1238 ) -> anyhow::Result<Vec<CustomData>> {
1239 let metadata_json = data_type
1241 .metadata()
1242 .as_ref()
1243 .map_or(Ok(serde_json::Value::Null), |metadata| {
1244 serde_json::to_value(metadata)
1245 })?;
1246
1247 sqlx::query_as::<_, CustomDataModel>(
1248 r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1249 )
1250 .bind(data_type.type_name())
1251 .bind(metadata_json)
1252 .fetch_all(pool)
1253 .await
1254 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1255 .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1256 }
1257}