1use std::collections::HashMap;
17
18use nautilus_common::{custom::CustomData, signal::Signal};
19use nautilus_model::{
20 accounts::{any::AccountAny, base::Account},
21 data::{Bar, DataType, QuoteTick, TradeTick},
22 events::{
23 position::snapshot::PositionSnapshot, AccountState, OrderEvent, OrderEventAny,
24 OrderSnapshot,
25 },
26 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
27 instruments::{Instrument, InstrumentAny},
28 orders::{Order, OrderAny},
29 types::{AccountBalance, Currency, MarginBalance},
30};
31use sqlx::{PgPool, Row};
32
33use super::models::{
34 orders::OrderSnapshotModel,
35 positions::PositionSnapshotModel,
36 types::{CustomDataModel, SignalModel},
37};
38use crate::sql::models::{
39 accounts::AccountEventModel,
40 data::{BarModel, QuoteTickModel, TradeTickModel},
41 enums::{
42 AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
43 CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
44 },
45 general::{GeneralRow, OrderEventOrderClientIdCombination},
46 instruments::InstrumentAnyModel,
47 orders::OrderEventAnyModel,
48 types::CurrencyModel,
49};
50
51#[derive(Debug)]
52pub struct DatabaseQueries;
53
54impl DatabaseQueries {
55 pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
56 sqlx::query("SELECT truncate_all_tables()")
57 .execute(pool)
58 .await
59 .map(|_| ())
60 .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
61 }
62
63 pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
64 sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
65 .bind(key)
66 .bind(value)
67 .execute(pool)
68 .await
69 .map(|_| ())
70 .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
71 }
72
73 pub async fn load(pool: &PgPool) -> anyhow::Result<HashMap<String, Vec<u8>>> {
74 sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
75 .fetch_all(pool)
76 .await
77 .map(|rows| {
78 let mut cache: HashMap<String, Vec<u8>> = HashMap::new();
79 for row in rows {
80 cache.insert(row.id, row.value);
81 }
82 cache
83 })
84 .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
85 }
86
87 pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
88 sqlx::query(
89 "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
90 )
91 .bind(currency.code.as_str())
92 .bind(currency.precision as i32)
93 .bind(currency.iso4217 as i32)
94 .bind(currency.name.as_str())
95 .bind(CurrencyTypeModel(currency.currency_type))
96 .execute(pool)
97 .await
98 .map(|_| ())
99 .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
100 }
101
102 pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
103 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
104 .fetch_all(pool)
105 .await
106 .map(|rows| rows.into_iter().map(|row| row.0).collect())
107 .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
108 }
109
110 pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
111 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
112 .bind(code)
113 .fetch_optional(pool)
114 .await
115 .map(|currency| currency.map(|row| row.0))
116 .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
117 }
118
119 pub async fn add_instrument(
120 pool: &PgPool,
121 kind: &str,
122 instrument: Box<dyn Instrument>,
123 ) -> anyhow::Result<()> {
124 sqlx::query(r#"
125 INSERT INTO "instrument" (
126 id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
127 multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
128 price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
129 min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
130 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
131 ON CONFLICT (id)
132 DO UPDATE
133 SET
134 kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
135 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
136 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
137 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32, ts_event = $33, updated_at = CURRENT_TIMESTAMP
138 "#)
139 .bind(instrument.id().to_string())
140 .bind(kind)
141 .bind(instrument.raw_symbol().to_string())
142 .bind(instrument.base_currency().map(|x| x.code.as_str()))
143 .bind(instrument.underlying().map(|x| x.to_string()))
144 .bind(instrument.quote_currency().code.as_str())
145 .bind(instrument.settlement_currency().code.as_str())
146 .bind(instrument.isin().map(|x| x.to_string()))
147 .bind(AssetClassModel(instrument.asset_class()))
148 .bind(instrument.exchange().map(|x| x.to_string()))
149 .bind(instrument.multiplier().to_string())
150 .bind(instrument.option_kind().map(|x| x.to_string()))
151 .bind(instrument.is_inverse())
152 .bind(instrument.strike_price().map(|x| x.to_string()))
153 .bind(instrument.activation_ns().map(|x| x.to_string()))
154 .bind(instrument.expiration_ns().map(|x| x.to_string()))
155 .bind(instrument.price_precision() as i32)
156 .bind(instrument.size_precision() as i32)
157 .bind(instrument.price_increment().to_string())
158 .bind(instrument.size_increment().to_string())
159 .bind(instrument.maker_fee().to_string())
160 .bind(instrument.taker_fee().to_string())
161 .bind(instrument.margin_init().to_string())
162 .bind(instrument.margin_maint().to_string())
163 .bind(instrument.lot_size().map(|x| x.to_string()))
164 .bind(instrument.max_quantity().map(|x| x.to_string()))
165 .bind(instrument.min_quantity().map(|x| x.to_string()))
166 .bind(instrument.max_notional().map(|x| x.to_string()))
167 .bind(instrument.min_notional().map(|x| x.to_string()))
168 .bind(instrument.max_price().map(|x| x.to_string()))
169 .bind(instrument.min_price().map(|x| x.to_string()))
170 .bind(instrument.ts_init().to_string())
171 .bind(instrument.ts_event().to_string())
172 .execute(pool)
173 .await
174 .map(|_| ())
175 .map_err(|e| anyhow::anyhow!(format!("Failed to insert item {} into instrument table: {:?}", instrument.id().to_string(), e)))
176 }
177
178 pub async fn load_instrument(
179 pool: &PgPool,
180 instrument_id: &InstrumentId,
181 ) -> anyhow::Result<Option<InstrumentAny>> {
182 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
183 .bind(instrument_id.to_string())
184 .fetch_optional(pool)
185 .await
186 .map(|instrument| instrument.map(|row| row.0))
187 .map_err(|e| {
188 anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
189 })
190 }
191
192 pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
193 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
194 .fetch_all(pool)
195 .await
196 .map(|rows| rows.into_iter().map(|row| row.0).collect())
197 .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
198 }
199
200 pub async fn add_order(
201 pool: &PgPool,
202 _kind: &str,
203 updated: bool,
204 order: Box<dyn Order>,
205 client_id: Option<ClientId>,
206 ) -> anyhow::Result<()> {
207 if updated {
208 let exists =
209 DatabaseQueries::check_if_order_initialized_exists(pool, order.client_order_id())
210 .await
211 .unwrap();
212 if !exists {
213 panic!(
214 "OrderInitialized event does not exist for order: {}",
215 order.client_order_id()
216 );
217 }
218 }
219 match order.last_event().clone() {
220 OrderEventAny::Accepted(event) => {
221 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
222 }
223 OrderEventAny::CancelRejected(event) => {
224 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
225 }
226 OrderEventAny::Canceled(event) => {
227 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
228 }
229 OrderEventAny::Denied(event) => {
230 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
231 }
232 OrderEventAny::Emulated(event) => {
233 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
234 }
235 OrderEventAny::Expired(event) => {
236 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
237 }
238 OrderEventAny::Filled(event) => {
239 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
240 }
241 OrderEventAny::Initialized(event) => {
242 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
243 }
244 OrderEventAny::ModifyRejected(event) => {
245 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
246 }
247 OrderEventAny::PendingCancel(event) => {
248 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
249 }
250 OrderEventAny::PendingUpdate(event) => {
251 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
252 }
253 OrderEventAny::Rejected(event) => {
254 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
255 }
256 OrderEventAny::Released(event) => {
257 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
258 }
259 OrderEventAny::Submitted(event) => {
260 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
261 }
262 OrderEventAny::Updated(event) => {
263 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
264 }
265 OrderEventAny::Triggered(event) => {
266 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
267 }
268 OrderEventAny::PartiallyFilled(event) => {
269 DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
270 }
271 }
272 }
273
274 pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
275 let mut transaction = pool.begin().await?;
276
277 sqlx::query(
280 r#"
281 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
282 "#,
283 )
284 .bind(snapshot.trader_id.to_string())
285 .execute(&mut *transaction)
286 .await
287 .map(|_| ())
288 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
289
290 sqlx::query(
291 r#"
292 INSERT INTO "order" (
293 id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
294 account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
295 trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
296 expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
297 is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
298 trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
299 parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
300 created_at, updated_at
301 ) VALUES (
302 $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
303 $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
304 $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
305 CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
306 )
307 ON CONFLICT (id)
308 DO UPDATE SET
309 trader_id = $2,
310 strategy_id = $3,
311 instrument_id = $4,
312 venue_order_id = $5,
313 position_id = $6,
314 account_id = $7,
315 last_trade_id = $8,
316 order_type = $9,
317 order_side = $10,
318 quantity = $11,
319 price = $12,
320 trigger_price = $13,
321 trigger_type = $14,
322 limit_offset = $15,
323 trailing_offset = $16,
324 trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
325 time_in_force = $18,
326 expire_time = $19,
327 filled_qty = $20,
328 liquidity_side = $21,
329 avg_px = $22,
330 slippage = $23,
331 commissions = $24,
332 status = $25,
333 is_post_only = $26,
334 is_reduce_only = $27,
335 is_quote_quantity = $28,
336 display_qty = $29,
337 emulation_trigger = $30,
338 trigger_instrument_id = $31,
339 contingency_type = $32,
340 order_list_id = $33,
341 linked_order_ids = $34,
342 parent_order_id = $35,
343 exec_algorithm_id = $36,
344 exec_algorithm_params = $37,
345 exec_spawn_id = $38,
346 tags = $39,
347 init_id = $40,
348 ts_init = $41,
349 ts_last = $42,
350 updated_at = CURRENT_TIMESTAMP
351 "#)
352 .bind(snapshot.client_order_id.to_string()) .bind(snapshot.trader_id.to_string())
354 .bind(snapshot.strategy_id.to_string())
355 .bind(snapshot.instrument_id.to_string())
356 .bind(snapshot.venue_order_id.map(|x| x.to_string()))
357 .bind(snapshot.position_id.map(|x| x.to_string()))
358 .bind(snapshot.account_id.map(|x| x.to_string()))
359 .bind(snapshot.last_trade_id.map(|x| x.to_string()))
360 .bind(snapshot.order_type.to_string())
361 .bind(snapshot.order_side.to_string())
362 .bind(snapshot.quantity.to_string())
363 .bind(snapshot.price.map(|x| x.to_string()))
364 .bind(snapshot.trigger_price.map(|x| x.to_string()))
365 .bind(snapshot.trigger_type.map(|x| x.to_string()))
366 .bind(snapshot.limit_offset.map(|x| x.to_string()))
367 .bind(snapshot.trailing_offset.map(|x| x.to_string()))
368 .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
369 .bind(snapshot.time_in_force.to_string())
370 .bind(snapshot.expire_time.map(|x| x.to_string()))
371 .bind(snapshot.filled_qty.to_string())
372 .bind(snapshot.liquidity_side.map(|x| x.to_string()))
373 .bind(snapshot.avg_px)
374 .bind(snapshot.slippage)
375 .bind(snapshot.commissions.iter().map(|x| x.to_string()).collect::<Vec<String>>())
376 .bind(snapshot.status.to_string())
377 .bind(snapshot.is_post_only)
378 .bind(snapshot.is_reduce_only)
379 .bind(snapshot.is_quote_quantity)
380 .bind(snapshot.display_qty.map(|x| x.to_string()))
381 .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
382 .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
383 .bind(snapshot.contingency_type.map(|x| x.to_string()))
384 .bind(snapshot.order_list_id.map(|x| x.to_string()))
385 .bind(snapshot.linked_order_ids.map(|x| x.iter().map(|x| x.to_string()).collect::<Vec<String>>()))
386 .bind(snapshot.parent_order_id.map(|x| x.to_string()))
387 .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
388 .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
389 .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
390 .bind(snapshot.tags.map(|x| x.iter().map(|x| x.to_string()).collect::<Vec<String>>()))
391 .bind(snapshot.init_id.to_string())
392 .bind(snapshot.ts_init.to_string())
393 .bind(snapshot.ts_last.to_string())
394 .execute(&mut *transaction)
395 .await
396 .map(|_| ())
397 .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
398
399 transaction
400 .commit()
401 .await
402 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
403 }
404
405 pub async fn load_order_snapshot(
406 pool: &PgPool,
407 client_order_id: &ClientOrderId,
408 ) -> anyhow::Result<Option<OrderSnapshot>> {
409 sqlx::query_as::<_, OrderSnapshotModel>(
410 r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
411 )
412 .bind(client_order_id.to_string())
413 .fetch_optional(pool)
414 .await
415 .map(|model| model.map(|m| m.0))
416 .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
417 }
418
419 pub async fn add_position_snapshot(
420 pool: &PgPool,
421 snapshot: PositionSnapshot,
422 ) -> anyhow::Result<()> {
423 let mut transaction = pool.begin().await?;
424
425 sqlx::query(
428 r#"
429 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
430 "#,
431 )
432 .bind(snapshot.trader_id.to_string())
433 .execute(&mut *transaction)
434 .await
435 .map(|_| ())
436 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
437
438 sqlx::query(r#"
439 INSERT INTO "position" (
440 id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
441 quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
442 duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
443 ) VALUES (
444 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
445 $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
446 )
447 ON CONFLICT (id)
448 DO UPDATE
449 SET
450 trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
451 peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
452 commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
453 "#)
454 .bind(snapshot.position_id.to_string())
455 .bind(snapshot.trader_id.to_string())
456 .bind(snapshot.strategy_id.to_string())
457 .bind(snapshot.instrument_id.to_string())
458 .bind(snapshot.account_id.to_string())
459 .bind(snapshot.opening_order_id.to_string())
460 .bind(snapshot.closing_order_id.map(|x| x.to_string()))
461 .bind(snapshot.entry.to_string())
462 .bind(snapshot.side.to_string())
463 .bind(snapshot.signed_qty)
464 .bind(snapshot.quantity.to_string())
465 .bind(snapshot.peak_qty.to_string())
466 .bind(snapshot.quote_currency.to_string())
467 .bind(snapshot.base_currency.map(|x| x.to_string()))
468 .bind(snapshot.settlement_currency.to_string())
469 .bind(snapshot.avg_px_open)
470 .bind(snapshot.avg_px_close)
471 .bind(snapshot.realized_return)
472 .bind(snapshot.realized_pnl.map(|x| x.to_string()))
473 .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
474 .bind(snapshot.commissions.iter().map(|x| x.to_string()).collect::<Vec<String>>())
475 .bind(snapshot.duration_ns.map(|x| x.to_string()))
476 .bind(snapshot.ts_opened.to_string())
477 .bind(snapshot.ts_closed.map(|x| x.to_string()))
478 .bind(snapshot.ts_init.to_string())
479 .bind(snapshot.ts_last.to_string())
480 .execute(&mut *transaction)
481 .await
482 .map(|_| ())
483 .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
484 transaction
485 .commit()
486 .await
487 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
488 }
489
490 pub async fn load_position_snapshot(
491 pool: &PgPool,
492 position_id: &PositionId,
493 ) -> anyhow::Result<Option<PositionSnapshot>> {
494 sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
495 .bind(position_id.to_string())
496 .fetch_optional(pool)
497 .await
498 .map(|model| model.map(|m| m.0))
499 .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
500 }
501
502 pub async fn check_if_order_initialized_exists(
503 pool: &PgPool,
504 client_order_id: ClientOrderId,
505 ) -> anyhow::Result<bool> {
506 sqlx::query(r#"
507 SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
508 "#)
509 .bind(client_order_id.to_string())
510 .fetch_one(pool)
511 .await
512 .map(|row| row.get(0))
513 .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
514 }
515
516 pub async fn check_if_account_event_exists(
517 pool: &PgPool,
518 account_id: AccountId,
519 ) -> anyhow::Result<bool> {
520 sqlx::query(
521 r#"
522 SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
523 "#,
524 )
525 .bind(account_id.to_string())
526 .fetch_one(pool)
527 .await
528 .map(|row| row.get(0))
529 .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
530 }
531
532 pub async fn add_order_event(
533 pool: &PgPool,
534 order_event: Box<dyn OrderEvent>,
535 client_id: Option<ClientId>,
536 ) -> anyhow::Result<()> {
537 let mut transaction = pool.begin().await?;
538
539 sqlx::query(
542 r#"
543 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
544 "#,
545 )
546 .bind(order_event.trader_id().to_string())
547 .execute(&mut *transaction)
548 .await
549 .map(|_| ())
550 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
551
552 if let Some(client_id) = client_id {
555 sqlx::query(
556 r#"
557 INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
558 "#,
559 )
560 .bind(client_id.to_string())
561 .execute(&mut *transaction)
562 .await
563 .map(|_| ())
564 .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
565 }
566
567 sqlx::query(r#"
568 INSERT INTO "order_event" (
569 id, kind, client_order_id, order_type, order_side, trader_id, client_id, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
570 post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
571 trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
572 order_list_id, linked_order_ids, parent_order_id,
573 exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
574 ) VALUES (
575 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
576 $21, $22, $23, $24, $25::trailing_offset_type, $26, $27, $28, $29, $30, $31, $32, $33, $34,
577 $35, $36, $37, $38, $39, $40, $41, $42, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
578 )
579 ON CONFLICT (id)
580 DO UPDATE
581 SET
582 kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, strategy_id = $8, instrument_id = $9, trade_id = $10, currency = $11,
583 quantity = $12, time_in_force = $13, liquidity_side = $14, post_only = $15, reduce_only = $16, quote_quantity = $17, reconciliation = $18, price = $19, last_px = $20,
584 last_qty = $21, trigger_price = $22, trigger_type = $23, limit_offset = $24, trailing_offset = $25, trailing_offset_type = $26, expire_time = $27, display_qty = $28,
585 emulation_trigger = $29, trigger_instrument_id = $30, contingency_type = $31, order_list_id = $32, linked_order_ids = $33, parent_order_id = $34, exec_algorithm_id = $35,
586 exec_spawn_id = $36, venue_order_id = $37, account_id = $38, position_id = $39, commission = $40, ts_event = $41, ts_init = $42, updated_at = CURRENT_TIMESTAMP
587
588 "#)
589 .bind(order_event.id().to_string())
590 .bind(order_event.kind())
591 .bind(order_event.client_order_id().to_string())
592 .bind(order_event.order_type().map(|x| x.to_string()))
593 .bind(order_event.order_side().map(|x| x.to_string()))
594 .bind(order_event.trader_id().to_string())
595 .bind(client_id.map(|x| x.to_string()))
596 .bind(order_event.strategy_id().to_string())
597 .bind(order_event.instrument_id().to_string())
598 .bind(order_event.trade_id().map(|x| x.to_string()))
599 .bind(order_event.currency().map(|x| x.code.as_str()))
600 .bind(order_event.quantity().map(|x| x.to_string()))
601 .bind(order_event.time_in_force().map(|x| x.to_string()))
602 .bind(order_event.liquidity_side().map(|x| x.to_string()))
603 .bind(order_event.post_only())
604 .bind(order_event.reduce_only())
605 .bind(order_event.quote_quantity())
606 .bind(order_event.reconciliation())
607 .bind(order_event.price().map(|x| x.to_string()))
608 .bind(order_event.last_px().map(|x| x.to_string()))
609 .bind(order_event.last_qty().map(|x| x.to_string()))
610 .bind(order_event.trigger_price().map(|x| x.to_string()))
611 .bind(order_event.trigger_type().map(|x| x.to_string()))
612 .bind(order_event.limit_offset().map(|x| x.to_string()))
613 .bind(order_event.trailing_offset().map(|x| x.to_string()))
614 .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
615 .bind(order_event.expire_time().map(|x| x.to_string()))
616 .bind(order_event.display_qty().map(|x| x.to_string()))
617 .bind(order_event.emulation_trigger().map(|x| x.to_string()))
618 .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
619 .bind(order_event.contingency_type().map(|x| x.to_string()))
620 .bind(order_event.order_list_id().map(|x| x.to_string()))
621 .bind(order_event.linked_order_ids().map(|x| x.iter().map(|x| x.to_string()).collect::<Vec<String>>()))
622 .bind(order_event.parent_order_id().map(|x| x.to_string()))
623 .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
624 .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
625 .bind(order_event.venue_order_id().map(|x| x.to_string()))
626 .bind(order_event.account_id().map(|x| x.to_string()))
627 .bind(order_event.position_id().map(|x| x.to_string()))
628 .bind(order_event.commission().map(|x| x.to_string()))
629 .bind(order_event.ts_event().to_string())
630 .bind(order_event.ts_init().to_string())
631 .execute(&mut *transaction)
632 .await
633 .map(|_| ())
634 .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
635 transaction
636 .commit()
637 .await
638 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
639 }
640
641 pub async fn load_order_events(
642 pool: &PgPool,
643 client_order_id: &ClientOrderId,
644 ) -> anyhow::Result<Vec<OrderEventAny>> {
645 sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
646 .bind(client_order_id.to_string())
647 .fetch_all(pool)
648 .await
649 .map(|rows| rows.into_iter().map(|row| row.0).collect())
650 .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
651 }
652
653 pub async fn load_order(
654 pool: &PgPool,
655 client_order_id: &ClientOrderId,
656 ) -> anyhow::Result<Option<OrderAny>> {
657 let order_events = DatabaseQueries::load_order_events(pool, client_order_id).await;
658
659 match order_events {
660 Ok(order_events) => {
661 if order_events.is_empty() {
662 return Ok(None);
663 }
664 let order = OrderAny::from_events(order_events).unwrap();
665 Ok(Some(order))
666 }
667 Err(e) => anyhow::bail!("Failed to load order events: {e}"),
668 }
669 }
670
671 pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
672 let mut orders: Vec<OrderAny> = Vec::new();
673 let client_order_ids: Vec<ClientOrderId> = sqlx::query(
674 r#"
675 SELECT DISTINCT client_order_id FROM "order_event"
676 "#,
677 )
678 .fetch_all(pool)
679 .await
680 .map(|rows| {
681 rows.into_iter()
682 .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
683 .collect()
684 })
685 .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
686 for id in client_order_ids {
687 let order = DatabaseQueries::load_order(pool, &id).await.unwrap();
688 match order {
689 Some(order) => {
690 orders.push(order);
691 }
692 None => {
693 continue;
694 }
695 }
696 }
697 Ok(orders)
698 }
699
700 pub async fn add_account(
701 pool: &PgPool,
702 kind: &str,
703 updated: bool,
704 account: Box<dyn Account>,
705 ) -> anyhow::Result<()> {
706 if updated {
707 let exists = DatabaseQueries::check_if_account_event_exists(pool, account.id())
708 .await
709 .unwrap();
710 if !exists {
711 panic!("Account event does not exist for account: {}", account.id());
712 }
713 }
714
715 let mut transaction = pool.begin().await?;
716
717 sqlx::query(
718 r#"
719 INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
720 "#,
721 )
722 .bind(account.id().to_string())
723 .execute(&mut *transaction)
724 .await
725 .map(|_| ())
726 .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
727
728 let account_event = account.last_event().unwrap();
729 sqlx::query(r#"
730 INSERT INTO "account_event" (
731 id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
732 ) VALUES (
733 $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
734 )
735 ON CONFLICT (id)
736 DO UPDATE
737 SET
738 kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
739 ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
740 "#)
741 .bind(account_event.event_id.to_string())
742 .bind(kind.to_string())
743 .bind(account_event.account_id.to_string())
744 .bind(account_event.base_currency.map(|x| x.code.as_str()))
745 .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
746 .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
747 .bind(account_event.is_reported)
748 .bind(account_event.ts_event.to_string())
749 .bind(account_event.ts_init.to_string())
750 .execute(&mut *transaction)
751 .await
752 .map(|_| ())
753 .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
754 transaction
755 .commit()
756 .await
757 .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
758 }
759
760 pub async fn load_account_events(
761 pool: &PgPool,
762 account_id: &AccountId,
763 ) -> anyhow::Result<Vec<AccountState>> {
764 sqlx::query_as::<_, AccountEventModel>(
765 r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
766 )
767 .bind(account_id.to_string())
768 .fetch_all(pool)
769 .await
770 .map(|rows| rows.into_iter().map(|row| row.0).collect())
771 .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
772 }
773
774 pub async fn load_account(
775 pool: &PgPool,
776 account_id: &AccountId,
777 ) -> anyhow::Result<Option<AccountAny>> {
778 let account_events = DatabaseQueries::load_account_events(pool, account_id).await;
779 match account_events {
780 Ok(account_events) => {
781 if account_events.is_empty() {
782 return Ok(None);
783 }
784 let account = AccountAny::from_events(account_events).unwrap();
785 Ok(Some(account))
786 }
787 Err(e) => anyhow::bail!("Failed to load account events: {e}"),
788 }
789 }
790
791 pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
792 let mut accounts: Vec<AccountAny> = Vec::new();
793 let account_ids: Vec<AccountId> = sqlx::query(
794 r#"
795 SELECT DISTINCT account_id FROM "account_event"
796 "#,
797 )
798 .fetch_all(pool)
799 .await
800 .map(|rows| {
801 rows.into_iter()
802 .map(|row| AccountId::from(row.get::<&str, _>(0)))
803 .collect()
804 })
805 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
806 for id in account_ids {
807 let account = DatabaseQueries::load_account(pool, &id).await.unwrap();
808 match account {
809 Some(account) => {
810 accounts.push(account);
811 }
812 None => {
813 continue;
814 }
815 }
816 }
817 Ok(accounts)
818 }
819
820 pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
821 sqlx::query(r#"
822 INSERT INTO "trade" (
823 instrument_id, price, quantity, aggressor_side, venue_trade_id,
824 ts_event, ts_init, created_at, updated_at
825 ) VALUES (
826 $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
827 )
828 ON CONFLICT (id)
829 DO UPDATE
830 SET
831 instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
832 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
833 "#)
834 .bind(trade.instrument_id.to_string())
835 .bind(trade.price.to_string())
836 .bind(trade.size.to_string())
837 .bind(AggressorSideModel(trade.aggressor_side))
838 .bind(trade.trade_id.to_string())
839 .bind(trade.ts_event.to_string())
840 .bind(trade.ts_init.to_string())
841 .execute(pool)
842 .await
843 .map(|_| ())
844 .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
845 }
846
847 pub async fn load_trades(
848 pool: &PgPool,
849 instrument_id: &InstrumentId,
850 ) -> anyhow::Result<Vec<TradeTick>> {
851 sqlx::query_as::<_, TradeTickModel>(
852 r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
853 )
854 .bind(instrument_id.to_string())
855 .fetch_all(pool)
856 .await
857 .map(|rows| rows.into_iter().map(|row| row.0).collect())
858 .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
859 }
860
861 pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
862 sqlx::query(r#"
863 INSERT INTO "quote" (
864 instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
865 ) VALUES (
866 $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
867 )
868 ON CONFLICT (id)
869 DO UPDATE
870 SET
871 instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
872 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
873 "#)
874 .bind(quote.instrument_id.to_string())
875 .bind(quote.bid_price.to_string())
876 .bind(quote.ask_price.to_string())
877 .bind(quote.bid_size.to_string())
878 .bind(quote.ask_size.to_string())
879 .bind(quote.ts_event.to_string())
880 .bind(quote.ts_init.to_string())
881 .execute(pool)
882 .await
883 .map(|_| ())
884 .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
885 }
886
887 pub async fn load_quotes(
888 pool: &PgPool,
889 instrument_id: &InstrumentId,
890 ) -> anyhow::Result<Vec<QuoteTick>> {
891 sqlx::query_as::<_, QuoteTickModel>(
892 r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
893 )
894 .bind(instrument_id.to_string())
895 .fetch_all(pool)
896 .await
897 .map(|rows| rows.into_iter().map(|row| row.0).collect())
898 .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
899 }
900
901 pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
902 println!("Adding bar: {:?}", bar);
903 sqlx::query(r#"
904 INSERT INTO "bar" (
905 instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
906 ) VALUES (
907 $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
908 )
909 ON CONFLICT (id)
910 DO UPDATE
911 SET
912 instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
913 open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
914 "#)
915 .bind(bar.bar_type.instrument_id().to_string())
916 .bind(bar.bar_type.spec().step.get() as i32)
917 .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
918 .bind(PriceTypeModel(bar.bar_type.spec().price_type))
919 .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
920 .bind(bar.open.to_string())
921 .bind(bar.high.to_string())
922 .bind(bar.low.to_string())
923 .bind(bar.close.to_string())
924 .bind(bar.volume.to_string())
925 .bind(bar.ts_event.to_string())
926 .bind(bar.ts_init.to_string())
927 .execute(pool)
928 .await
929 .map(|_| ())
930 .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
931 }
932
933 pub async fn load_bars(
934 pool: &PgPool,
935 instrument_id: &InstrumentId,
936 ) -> anyhow::Result<Vec<Bar>> {
937 sqlx::query_as::<_, BarModel>(
938 r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
939 )
940 .bind(instrument_id.to_string())
941 .fetch_all(pool)
942 .await
943 .map(|rows| rows.into_iter().map(|row| row.0).collect())
944 .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
945 }
946
947 pub async fn load_distinct_order_event_client_ids(
948 pool: &PgPool,
949 ) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
950 let mut map: HashMap<ClientOrderId, ClientId> = HashMap::new();
951 let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
952 r#"
953 SELECT DISTINCT
954 client_order_id AS "client_order_id",
955 client_id AS "client_id"
956 FROM "order_event"
957 "#,
958 )
959 .fetch_all(pool)
960 .await
961 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
962 for id in result {
963 map.insert(id.client_order_id, id.client_id);
964 }
965 Ok(map)
966 }
967
968 pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
969 sqlx::query(
970 r#"
971 INSERT INTO "signal" (
972 name, value, ts_event, ts_init, created_at, updated_at
973 ) VALUES (
974 $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
975 )
976 ON CONFLICT (id)
977 DO UPDATE
978 SET
979 name = $1, value = $2, ts_event = $3, ts_init = $4,
980 updated_at = CURRENT_TIMESTAMP
981 "#,
982 )
983 .bind(signal.name.to_string())
984 .bind(signal.value.to_string())
985 .bind(signal.ts_event.to_string())
986 .bind(signal.ts_init.to_string())
987 .execute(pool)
988 .await
989 .map(|_| ())
990 .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
991 }
992
993 pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
994 sqlx::query_as::<_, SignalModel>(
995 r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
996 )
997 .bind(name)
998 .fetch_all(pool)
999 .await
1000 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1001 .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1002 }
1003
1004 pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1005 sqlx::query(
1006 r#"
1007 INSERT INTO "custom" (
1008 data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1009 ) VALUES (
1010 $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1011 )
1012 ON CONFLICT (id)
1013 DO UPDATE
1014 SET
1015 data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1016 updated_at = CURRENT_TIMESTAMP
1017 "#,
1018 )
1019 .bind(data.data_type.type_name().to_string())
1020 .bind(
1021 data.data_type
1022 .metadata()
1023 .as_ref()
1024 .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1025 )
1026 .bind(data.value.to_vec())
1027 .bind(data.ts_event.to_string())
1028 .bind(data.ts_init.to_string())
1029 .execute(pool)
1030 .await
1031 .map(|_| ())
1032 .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1033 }
1034
1035 pub async fn load_custom_data(
1036 pool: &PgPool,
1037 data_type: &DataType,
1038 ) -> anyhow::Result<Vec<CustomData>> {
1039 let metadata_json = data_type
1041 .metadata()
1042 .as_ref()
1043 .map_or(Ok(serde_json::Value::Null), |metadata| {
1044 serde_json::to_value(metadata)
1045 })?;
1046
1047 sqlx::query_as::<_, CustomDataModel>(
1048 r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1049 )
1050 .bind(data_type.type_name())
1051 .bind(metadata_json)
1052 .fetch_all(pool)
1053 .await
1054 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1055 .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1056 }
1057}