nautilus_infrastructure/sql/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use ahash::AHashMap;
19use nautilus_common::{custom::CustomData, signal::Signal};
20use nautilus_model::{
21    accounts::{Account, AccountAny},
22    data::{Bar, DataType, QuoteTick, TradeTick},
23    events::{
24        AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
25        position::snapshot::PositionSnapshot,
26    },
27    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
28    instruments::{Instrument, InstrumentAny},
29    orders::{Order, OrderAny},
30    types::{AccountBalance, Currency, MarginBalance},
31};
32use sqlx::{PgPool, Row};
33
34use super::models::{
35    orders::OrderSnapshotModel,
36    positions::PositionSnapshotModel,
37    types::{CustomDataModel, SignalModel},
38};
39use crate::sql::models::{
40    accounts::AccountEventModel,
41    data::{BarModel, QuoteTickModel, TradeTickModel},
42    enums::{
43        AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
44        CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
45    },
46    general::{GeneralRow, OrderEventOrderClientIdCombination},
47    instruments::InstrumentAnyModel,
48    orders::OrderEventAnyModel,
49    types::CurrencyModel,
50};
51
52#[derive(Debug)]
53pub struct DatabaseQueries;
54
55impl DatabaseQueries {
56    /// Truncates all tables in the cache database via the provided Postgres `pool`.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the TRUNCATE operation fails.
61    pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
62        sqlx::query("SELECT truncate_all_tables()")
63            .execute(pool)
64            .await
65            .map(|_| ())
66            .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
67    }
68
69    /// Inserts a raw key-value entry into the `general` table via the provided `pool`.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the INSERT operation fails.
74    pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
75        sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
76            .bind(key)
77            .bind(value)
78            .execute(pool)
79            .await
80            .map(|_| ())
81            .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
82    }
83
84    /// Loads all entries from the `general` table via the provided `pool`.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the SELECT operation fails.
89    pub async fn load(pool: &PgPool) -> anyhow::Result<HashMap<String, Vec<u8>>> {
90        sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
91            .fetch_all(pool)
92            .await
93            .map(|rows| {
94                let mut cache: HashMap<String, Vec<u8>> = HashMap::new();
95                for row in rows {
96                    cache.insert(row.id, row.value);
97                }
98                cache
99            })
100            .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
101    }
102
103    /// Inserts or ignores a `Currency` row via the provided `pool`.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the INSERT operation fails.
108    pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
109        sqlx::query(
110            "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
111        )
112            .bind(currency.code.as_str())
113            .bind(i32::from(currency.precision))
114            .bind(i32::from(currency.iso4217))
115            .bind(currency.name.as_str())
116            .bind(CurrencyTypeModel(currency.currency_type))
117            .execute(pool)
118            .await
119            .map(|_| ())
120            .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
121    }
122
123    /// Loads all `Currency` entries via the provided `pool`.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the SELECT operation fails.
128    pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
129        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
130            .fetch_all(pool)
131            .await
132            .map(|rows| rows.into_iter().map(|row| row.0).collect())
133            .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
134    }
135
136    /// Loads a single `Currency` entry by `code` via the provided `pool`.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the SELECT operation fails.
141    pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
142        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
143            .bind(code)
144            .fetch_optional(pool)
145            .await
146            .map(|currency| currency.map(|row| row.0))
147            .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
148    }
149
150    /// Inserts or updates an `InstrumentAny` entry via the provided `pool`.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the INSERT or UPDATE operation fails.
155    pub async fn add_instrument(
156        pool: &PgPool,
157        kind: &str,
158        instrument: Box<dyn Instrument>,
159    ) -> anyhow::Result<()> {
160        sqlx::query(r#"
161            INSERT INTO "instrument" (
162                id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
163                multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
164                price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
165                min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
166            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
167            ON CONFLICT (id)
168            DO UPDATE
169            SET
170                kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
171                 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
172                 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
173                 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32,  ts_event = $33, updated_at = CURRENT_TIMESTAMP
174            "#)
175            .bind(instrument.id().to_string())
176            .bind(kind)
177            .bind(instrument.raw_symbol().to_string())
178            .bind(instrument.base_currency().map(|x| x.code.as_str()))
179            .bind(instrument.underlying().map(|x| x.to_string()))
180            .bind(instrument.quote_currency().code.as_str())
181            .bind(instrument.settlement_currency().code.as_str())
182            .bind(instrument.isin().map(|x| x.to_string()))
183            .bind(AssetClassModel(instrument.asset_class()))
184            .bind(instrument.exchange().map(|x| x.to_string()))
185            .bind(instrument.multiplier().to_string())
186            .bind(instrument.option_kind().map(|x| x.to_string()))
187            .bind(instrument.is_inverse())
188            .bind(instrument.strike_price().map(|x| x.to_string()))
189            .bind(instrument.activation_ns().map(|x| x.to_string()))
190            .bind(instrument.expiration_ns().map(|x| x.to_string()))
191            .bind(i32::from(instrument.price_precision()))
192            .bind(i32::from(instrument.size_precision()))
193            .bind(instrument.price_increment().to_string())
194            .bind(instrument.size_increment().to_string())
195            .bind(instrument.maker_fee().to_string())
196            .bind(instrument.taker_fee().to_string())
197            .bind(instrument.margin_init().to_string())
198            .bind(instrument.margin_maint().to_string())
199            .bind(instrument.lot_size().map(|x| x.to_string()))
200            .bind(instrument.max_quantity().map(|x| x.to_string()))
201            .bind(instrument.min_quantity().map(|x| x.to_string()))
202            .bind(instrument.max_notional().map(|x| x.to_string()))
203            .bind(instrument.min_notional().map(|x| x.to_string()))
204            .bind(instrument.max_price().map(|x| x.to_string()))
205            .bind(instrument.min_price().map(|x| x.to_string()))
206            .bind(instrument.ts_init().to_string())
207            .bind(instrument.ts_event().to_string())
208            .execute(pool)
209            .await
210            .map(|_| ())
211            .map_err(|e| anyhow::anyhow!(format!("Failed to insert item {} into instrument table: {:?}", instrument.id().to_string(), e)))
212    }
213
214    /// Loads a single `InstrumentAny` entry by `instrument_id` via the provided `pool`.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the SELECT operation fails.
219    pub async fn load_instrument(
220        pool: &PgPool,
221        instrument_id: &InstrumentId,
222    ) -> anyhow::Result<Option<InstrumentAny>> {
223        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
224            .bind(instrument_id.to_string())
225            .fetch_optional(pool)
226            .await
227            .map(|instrument| instrument.map(|row| row.0))
228            .map_err(|e| {
229                anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
230            })
231    }
232
233    /// Loads all `InstrumentAny` entries via the provided `pool`.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the SELECT operation fails.
238    pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
239        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
240            .fetch_all(pool)
241            .await
242            .map(|rows| rows.into_iter().map(|row| row.0).collect())
243            .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
244    }
245
246    /// Inserts or updates an `OrderAny` entry via the provided `pool`.
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if the SQL INSERT or UPDATE operation fails.
251    ///
252    /// # Panics
253    ///
254    /// Panics if the order initialization existence check unwraps `None` after awaiting.
255    pub async fn add_order(
256        pool: &PgPool,
257        _kind: &str,
258        updated: bool,
259        order: Box<dyn Order>,
260        client_id: Option<ClientId>,
261    ) -> anyhow::Result<()> {
262        if updated {
263            let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
264                .await
265                .unwrap();
266            assert!(
267                exists,
268                "OrderInitialized event does not exist for order: {}",
269                order.client_order_id()
270            );
271        }
272        match order.last_event().clone() {
273            OrderEventAny::Accepted(event) => {
274                Self::add_order_event(pool, Box::new(event), client_id).await
275            }
276            OrderEventAny::CancelRejected(event) => {
277                Self::add_order_event(pool, Box::new(event), client_id).await
278            }
279            OrderEventAny::Canceled(event) => {
280                Self::add_order_event(pool, Box::new(event), client_id).await
281            }
282            OrderEventAny::Denied(event) => {
283                Self::add_order_event(pool, Box::new(event), client_id).await
284            }
285            OrderEventAny::Emulated(event) => {
286                Self::add_order_event(pool, Box::new(event), client_id).await
287            }
288            OrderEventAny::Expired(event) => {
289                Self::add_order_event(pool, Box::new(event), client_id).await
290            }
291            OrderEventAny::Filled(event) => {
292                Self::add_order_event(pool, Box::new(event), client_id).await
293            }
294            OrderEventAny::Initialized(event) => {
295                Self::add_order_event(pool, Box::new(event), client_id).await
296            }
297            OrderEventAny::ModifyRejected(event) => {
298                Self::add_order_event(pool, Box::new(event), client_id).await
299            }
300            OrderEventAny::PendingCancel(event) => {
301                Self::add_order_event(pool, Box::new(event), client_id).await
302            }
303            OrderEventAny::PendingUpdate(event) => {
304                Self::add_order_event(pool, Box::new(event), client_id).await
305            }
306            OrderEventAny::Rejected(event) => {
307                Self::add_order_event(pool, Box::new(event), client_id).await
308            }
309            OrderEventAny::Released(event) => {
310                Self::add_order_event(pool, Box::new(event), client_id).await
311            }
312            OrderEventAny::Submitted(event) => {
313                Self::add_order_event(pool, Box::new(event), client_id).await
314            }
315            OrderEventAny::Updated(event) => {
316                Self::add_order_event(pool, Box::new(event), client_id).await
317            }
318            OrderEventAny::Triggered(event) => {
319                Self::add_order_event(pool, Box::new(event), client_id).await
320            }
321        }
322    }
323
324    /// Inserts an `OrderSnapshot` entry via the provided `pool`.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the SQL INSERT operation fails.
329    ///
330    /// # Panics
331    ///
332    /// Panics if serialization of `snapshot.exec_algorithm_params` fails.
333    pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
334        let mut transaction = pool.begin().await?;
335
336        // Insert trader if it does not exist
337        // TODO remove this when node and trader initialization is implemented
338        sqlx::query(
339            r#"
340            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
341            "#,
342        )
343        .bind(snapshot.trader_id.to_string())
344        .execute(&mut *transaction)
345        .await
346        .map(|_| ())
347        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
348
349        sqlx::query(
350            r#"
351            INSERT INTO "order" (
352                id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
353                account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
354                trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
355                expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
356                is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
357                trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
358                parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
359                created_at, updated_at
360            ) VALUES (
361                $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
362                $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
363                $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
364                CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
365            )
366            ON CONFLICT (id)
367            DO UPDATE SET
368                trader_id = $2,
369                strategy_id = $3,
370                instrument_id = $4,
371                venue_order_id = $5,
372                position_id = $6,
373                account_id = $7,
374                last_trade_id = $8,
375                order_type = $9,
376                order_side = $10,
377                quantity = $11,
378                price = $12,
379                trigger_price = $13,
380                trigger_type = $14,
381                limit_offset = $15,
382                trailing_offset = $16,
383                trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
384                time_in_force = $18,
385                expire_time = $19,
386                filled_qty = $20,
387                liquidity_side = $21,
388                avg_px = $22,
389                slippage = $23,
390                commissions = $24,
391                status = $25,
392                is_post_only = $26,
393                is_reduce_only = $27,
394                is_quote_quantity = $28,
395                display_qty = $29,
396                emulation_trigger = $30,
397                trigger_instrument_id = $31,
398                contingency_type = $32,
399                order_list_id = $33,
400                linked_order_ids = $34,
401                parent_order_id = $35,
402                exec_algorithm_id = $36,
403                exec_algorithm_params = $37,
404                exec_spawn_id = $38,
405                tags = $39,
406                init_id = $40,
407                ts_init = $41,
408                ts_last = $42,
409                updated_at = CURRENT_TIMESTAMP
410        "#)
411            .bind(snapshot.client_order_id.to_string())  // Used for both id and client_order_id
412            .bind(snapshot.trader_id.to_string())
413            .bind(snapshot.strategy_id.to_string())
414            .bind(snapshot.instrument_id.to_string())
415            .bind(snapshot.venue_order_id.map(|x| x.to_string()))
416            .bind(snapshot.position_id.map(|x| x.to_string()))
417            .bind(snapshot.account_id.map(|x| x.to_string()))
418            .bind(snapshot.last_trade_id.map(|x| x.to_string()))
419            .bind(snapshot.order_type.to_string())
420            .bind(snapshot.order_side.to_string())
421            .bind(snapshot.quantity.to_string())
422            .bind(snapshot.price.map(|x| x.to_string()))
423            .bind(snapshot.trigger_price.map(|x| x.to_string()))
424            .bind(snapshot.trigger_type.map(|x| x.to_string()))
425            .bind(snapshot.limit_offset.map(|x| x.to_string()))
426            .bind(snapshot.trailing_offset.map(|x| x.to_string()))
427            .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
428            .bind(snapshot.time_in_force.to_string())
429            .bind(snapshot.expire_time.map(|x| x.to_string()))
430            .bind(snapshot.filled_qty.to_string())
431            .bind(snapshot.liquidity_side.map(|x| x.to_string()))
432            .bind(snapshot.avg_px)
433            .bind(snapshot.slippage)
434            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
435            .bind(snapshot.status.to_string())
436            .bind(snapshot.is_post_only)
437            .bind(snapshot.is_reduce_only)
438            .bind(snapshot.is_quote_quantity)
439            .bind(snapshot.display_qty.map(|x| x.to_string()))
440            .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
441            .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
442            .bind(snapshot.contingency_type.map(|x| x.to_string()))
443            .bind(snapshot.order_list_id.map(|x| x.to_string()))
444            .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
445            .bind(snapshot.parent_order_id.map(|x| x.to_string()))
446            .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
447            .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
448            .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
449            .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
450            .bind(snapshot.init_id.to_string())
451            .bind(snapshot.ts_init.to_string())
452            .bind(snapshot.ts_last.to_string())
453            .execute(&mut *transaction)
454            .await
455            .map(|_| ())
456            .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
457
458        transaction
459            .commit()
460            .await
461            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
462    }
463
464    /// Loads an `OrderSnapshot` entry by client order ID via the provided `pool`.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if the SQL SELECT or deserialization fails.
469    pub async fn load_order_snapshot(
470        pool: &PgPool,
471        client_order_id: &ClientOrderId,
472    ) -> anyhow::Result<Option<OrderSnapshot>> {
473        sqlx::query_as::<_, OrderSnapshotModel>(
474            r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
475        )
476        .bind(client_order_id.to_string())
477        .fetch_optional(pool)
478        .await
479        .map(|model| model.map(|m| m.0))
480        .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
481    }
482
483    /// Inserts or updates a `PositionSnapshot` entry via the provided `pool`.
484    ///
485    /// # Errors
486    ///
487    /// Returns an error if the SQL INSERT or UPDATE operation fails, or if beginning the transaction fails.
488    pub async fn add_position_snapshot(
489        pool: &PgPool,
490        snapshot: PositionSnapshot,
491    ) -> anyhow::Result<()> {
492        let mut transaction = pool.begin().await?;
493
494        // Insert trader if it does not exist
495        // TODO remove this when node and trader initialization is implemented
496        sqlx::query(
497            r#"
498            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
499        "#,
500        )
501        .bind(snapshot.trader_id.to_string())
502        .execute(&mut *transaction)
503        .await
504        .map(|_| ())
505        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
506
507        sqlx::query(r#"
508            INSERT INTO "position" (
509                id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
510                quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
511                duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
512            ) VALUES (
513                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
514                $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
515            )
516            ON CONFLICT (id)
517            DO UPDATE
518            SET
519                trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
520                peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
521                commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
522        "#)
523            .bind(snapshot.position_id.to_string())
524            .bind(snapshot.trader_id.to_string())
525            .bind(snapshot.strategy_id.to_string())
526            .bind(snapshot.instrument_id.to_string())
527            .bind(snapshot.account_id.to_string())
528            .bind(snapshot.opening_order_id.to_string())
529            .bind(snapshot.closing_order_id.map(|x| x.to_string()))
530            .bind(snapshot.entry.to_string())
531            .bind(snapshot.side.to_string())
532            .bind(snapshot.signed_qty)
533            .bind(snapshot.quantity.to_string())
534            .bind(snapshot.peak_qty.to_string())
535            .bind(snapshot.quote_currency.to_string())
536            .bind(snapshot.base_currency.map(|x| x.to_string()))
537            .bind(snapshot.settlement_currency.to_string())
538            .bind(snapshot.avg_px_open)
539            .bind(snapshot.avg_px_close)
540            .bind(snapshot.realized_return)
541            .bind(snapshot.realized_pnl.map(|x| x.to_string()))
542            .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
543            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
544            .bind(snapshot.duration_ns.map(|x| x.to_string()))
545            .bind(snapshot.ts_opened.to_string())
546            .bind(snapshot.ts_closed.map(|x| x.to_string()))
547            .bind(snapshot.ts_init.to_string())
548            .bind(snapshot.ts_last.to_string())
549            .execute(&mut *transaction)
550            .await
551            .map(|_| ())
552            .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
553        transaction
554            .commit()
555            .await
556            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
557    }
558
559    /// Loads a `PositionSnapshot` entry by `position_id` via the provided `pool`.
560    ///
561    /// # Errors
562    ///
563    /// Returns an error if the SQL SELECT or deserialization fails.
564    pub async fn load_position_snapshot(
565        pool: &PgPool,
566        position_id: &PositionId,
567    ) -> anyhow::Result<Option<PositionSnapshot>> {
568        sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
569            .bind(position_id.to_string())
570            .fetch_optional(pool)
571            .await
572            .map(|model| model.map(|m| m.0))
573            .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
574    }
575
576    /// Checks if an `OrderInitialized` event exists for the given `client_order_id` via the provided `pool`.
577    ///
578    /// # Errors
579    ///
580    /// Returns an error if the SQL SELECT operation fails.
581    pub async fn check_if_order_initialized_exists(
582        pool: &PgPool,
583        client_order_id: ClientOrderId,
584    ) -> anyhow::Result<bool> {
585        sqlx::query(r#"
586            SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
587        "#)
588            .bind(client_order_id.to_string())
589            .fetch_one(pool)
590            .await
591            .map(|row| row.get(0))
592            .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
593    }
594
595    /// Checks if any account event exists for the given `account_id` via the provided `pool`.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if the SQL SELECT operation fails.
600    pub async fn check_if_account_event_exists(
601        pool: &PgPool,
602        account_id: AccountId,
603    ) -> anyhow::Result<bool> {
604        sqlx::query(
605            r#"
606            SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
607        "#,
608        )
609        .bind(account_id.to_string())
610        .fetch_one(pool)
611        .await
612        .map(|row| row.get(0))
613        .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
614    }
615
616    /// Inserts or updates an order event entry via the provided `pool`.
617    ///
618    /// # Errors
619    ///
620    /// Returns an error if the SQL INSERT or UPDATE operation fails.
621    pub async fn add_order_event(
622        pool: &PgPool,
623        order_event: Box<dyn OrderEvent>,
624        client_id: Option<ClientId>,
625    ) -> anyhow::Result<()> {
626        let mut transaction = pool.begin().await?;
627
628        // Insert trader if it does not exist
629        // TODO remove this when node and trader initialization is implemented
630        sqlx::query(
631            r#"
632            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
633        "#,
634        )
635        .bind(order_event.trader_id().to_string())
636        .execute(&mut *transaction)
637        .await
638        .map(|_| ())
639        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
640
641        // Insert client if it does not exist
642        // TODO remove this when client initialization is implemented
643        if let Some(client_id) = client_id {
644            sqlx::query(
645                r#"
646                INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
647            "#,
648            )
649            .bind(client_id.to_string())
650            .execute(&mut *transaction)
651            .await
652            .map(|_| ())
653            .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
654        }
655
656        sqlx::query(r#"
657            INSERT INTO "order_event" (
658                id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
659                post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
660                trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
661                order_list_id, linked_order_ids, parent_order_id,
662                exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
663            ) VALUES (
664                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
665                $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
666                $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
667            )
668            ON CONFLICT (id)
669            DO UPDATE
670            SET
671                kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
672                quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
673                last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
674                emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
675                exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
676
677        "#)
678            .bind(order_event.id().to_string())
679            .bind(order_event.kind())
680            .bind(order_event.client_order_id().to_string())
681            .bind(order_event.order_type().map(|x| x.to_string()))
682            .bind(order_event.order_side().map(|x| x.to_string()))
683            .bind(order_event.trader_id().to_string())
684            .bind(client_id.map(|x| x.to_string()))
685            .bind(order_event.reason().map(|x| x.to_string()))
686            .bind(order_event.strategy_id().to_string())
687            .bind(order_event.instrument_id().to_string())
688            .bind(order_event.trade_id().map(|x| x.to_string()))
689            .bind(order_event.currency().map(|x| x.code.as_str()))
690            .bind(order_event.quantity().map(|x| x.to_string()))
691            .bind(order_event.time_in_force().map(|x| x.to_string()))
692            .bind(order_event.liquidity_side().map(|x| x.to_string()))
693            .bind(order_event.post_only())
694            .bind(order_event.reduce_only())
695            .bind(order_event.quote_quantity())
696            .bind(order_event.reconciliation())
697            .bind(order_event.price().map(|x| x.to_string()))
698            .bind(order_event.last_px().map(|x| x.to_string()))
699            .bind(order_event.last_qty().map(|x| x.to_string()))
700            .bind(order_event.trigger_price().map(|x| x.to_string()))
701            .bind(order_event.trigger_type().map(|x| x.to_string()))
702            .bind(order_event.limit_offset().map(|x| x.to_string()))
703            .bind(order_event.trailing_offset().map(|x| x.to_string()))
704            .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
705            .bind(order_event.expire_time().map(|x| x.to_string()))
706            .bind(order_event.display_qty().map(|x| x.to_string()))
707            .bind(order_event.emulation_trigger().map(|x| x.to_string()))
708            .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
709            .bind(order_event.contingency_type().map(|x| x.to_string()))
710            .bind(order_event.order_list_id().map(|x| x.to_string()))
711            .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
712            .bind(order_event.parent_order_id().map(|x| x.to_string()))
713            .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
714            .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
715            .bind(order_event.venue_order_id().map(|x| x.to_string()))
716            .bind(order_event.account_id().map(|x| x.to_string()))
717            .bind(order_event.position_id().map(|x| x.to_string()))
718            .bind(order_event.commission().map(|x| x.to_string()))
719            .bind(order_event.ts_event().to_string())
720            .bind(order_event.ts_init().to_string())
721            .execute(&mut *transaction)
722            .await
723            .map(|_| ())
724            .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
725        transaction
726            .commit()
727            .await
728            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
729    }
730
731    /// Loads all order events for a `client_order_id` via the provided `pool`.
732    ///
733    /// # Errors
734    ///
735    /// Returns an error if the SQL SELECT or deserialization fails.
736    pub async fn load_order_events(
737        pool: &PgPool,
738        client_order_id: &ClientOrderId,
739    ) -> anyhow::Result<Vec<OrderEventAny>> {
740        sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
741        .bind(client_order_id.to_string())
742        .fetch_all(pool)
743        .await
744        .map(|rows| rows.into_iter().map(|row| row.0).collect())
745        .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
746    }
747
748    /// Loads and assembles a complete `OrderAny` for a `client_order_id` via the provided `pool`.
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if assembling events or SQL operations fail.
753    ///
754    /// # Panics
755    ///
756    /// Panics if assembling the order from events fails.
757    pub async fn load_order(
758        pool: &PgPool,
759        client_order_id: &ClientOrderId,
760    ) -> anyhow::Result<Option<OrderAny>> {
761        let order_events = Self::load_order_events(pool, client_order_id).await;
762
763        match order_events {
764            Ok(order_events) => {
765                if order_events.is_empty() {
766                    return Ok(None);
767                }
768                let order = OrderAny::from_events(order_events).unwrap();
769                Ok(Some(order))
770            }
771            Err(e) => anyhow::bail!("Failed to load order events: {e}"),
772        }
773    }
774
775    /// Loads and assembles all `OrderAny` entries via the provided `pool`.
776    ///
777    /// # Errors
778    ///
779    /// Returns an error if loading events or SQL operations fail.
780    ///
781    /// # Panics
782    ///
783    /// Panics if loading or assembling any individual order fails.
784    pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
785        let mut orders: Vec<OrderAny> = Vec::new();
786        let client_order_ids: Vec<ClientOrderId> = sqlx::query(
787            r#"
788            SELECT DISTINCT client_order_id FROM "order_event"
789        "#,
790        )
791        .fetch_all(pool)
792        .await
793        .map(|rows| {
794            rows.into_iter()
795                .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
796                .collect()
797        })
798        .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
799        for id in client_order_ids {
800            let order = Self::load_order(pool, &id).await.unwrap();
801            match order {
802                Some(order) => {
803                    orders.push(order);
804                }
805                None => {
806                    continue;
807                }
808            }
809        }
810        Ok(orders)
811    }
812
813    /// Inserts or updates an `AccountAny` entry via the provided `pool`.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the SQL INSERT or UPDATE operation fails.
818    ///
819    /// # Panics
820    ///
821    /// Panics if checking for existing account event unwrap fails.
822    pub async fn add_account(
823        pool: &PgPool,
824        kind: &str,
825        updated: bool,
826        account: Box<dyn Account>,
827    ) -> anyhow::Result<()> {
828        if updated {
829            let exists = Self::check_if_account_event_exists(pool, account.id())
830                .await
831                .unwrap();
832            assert!(
833                exists,
834                "Account event does not exist for account: {}",
835                account.id()
836            );
837        }
838
839        let mut transaction = pool.begin().await?;
840
841        sqlx::query(
842            r#"
843            INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
844        "#,
845        )
846        .bind(account.id().to_string())
847        .execute(&mut *transaction)
848        .await
849        .map(|_| ())
850        .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
851
852        let account_event = account.last_event().unwrap();
853        sqlx::query(r#"
854            INSERT INTO "account_event" (
855                id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
856            ) VALUES (
857                $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
858            )
859            ON CONFLICT (id)
860            DO UPDATE
861            SET
862                kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
863                ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
864        "#)
865            .bind(account_event.event_id.to_string())
866            .bind(kind.to_string())
867            .bind(account_event.account_id.to_string())
868            .bind(account_event.base_currency.map(|x| x.code.as_str()))
869            .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
870            .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
871            .bind(account_event.is_reported)
872            .bind(account_event.ts_event.to_string())
873            .bind(account_event.ts_init.to_string())
874            .execute(&mut *transaction)
875            .await
876            .map(|_| ())
877            .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
878        transaction
879            .commit()
880            .await
881            .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
882    }
883
884    /// Loads all account events for `account_id` via the provided `pool`.
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if the SQL SELECT or deserialization fails.
889    pub async fn load_account_events(
890        pool: &PgPool,
891        account_id: &AccountId,
892    ) -> anyhow::Result<Vec<AccountState>> {
893        sqlx::query_as::<_, AccountEventModel>(
894            r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
895        )
896        .bind(account_id.to_string())
897        .fetch_all(pool)
898        .await
899        .map(|rows| rows.into_iter().map(|row| row.0).collect())
900        .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
901    }
902
903    /// Loads and assembles a complete `AccountAny` for `account_id` via the provided `pool`.
904    ///
905    /// # Errors
906    ///
907    /// Returns an error if assembling events or SQL operations fail.
908    ///
909    /// # Panics
910    ///
911    /// Panics if assembling the account from events fails.
912    pub async fn load_account(
913        pool: &PgPool,
914        account_id: &AccountId,
915    ) -> anyhow::Result<Option<AccountAny>> {
916        let account_events = Self::load_account_events(pool, account_id).await;
917        match account_events {
918            Ok(account_events) => {
919                if account_events.is_empty() {
920                    return Ok(None);
921                }
922                let account = AccountAny::from_events(account_events).unwrap();
923                Ok(Some(account))
924            }
925            Err(e) => anyhow::bail!("Failed to load account events: {e}"),
926        }
927    }
928
929    /// Loads and assembles all `AccountAny` entries via the provided `pool`.
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if loading events or SQL operations fail.
934    ///
935    /// # Panics
936    ///
937    /// Panics if loading or assembling any individual account fails.
938    pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
939        let mut accounts: Vec<AccountAny> = Vec::new();
940        let account_ids: Vec<AccountId> = sqlx::query(
941            r#"
942            SELECT DISTINCT account_id FROM "account_event"
943        "#,
944        )
945        .fetch_all(pool)
946        .await
947        .map(|rows| {
948            rows.into_iter()
949                .map(|row| AccountId::from(row.get::<&str, _>(0)))
950                .collect()
951        })
952        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
953        for id in account_ids {
954            let account = Self::load_account(pool, &id).await.unwrap();
955            match account {
956                Some(account) => {
957                    accounts.push(account);
958                }
959                None => {
960                    continue;
961                }
962            }
963        }
964        Ok(accounts)
965    }
966
967    /// Inserts a `TradeTick` entry via the provided `pool`.
968    ///
969    /// # Errors
970    ///
971    /// Returns an error if the SQL INSERT operation fails.
972    pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
973        sqlx::query(r#"
974            INSERT INTO "trade" (
975                instrument_id, price, quantity, aggressor_side, venue_trade_id,
976                ts_event, ts_init, created_at, updated_at
977            ) VALUES (
978                $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
979            )
980            ON CONFLICT (id)
981            DO UPDATE
982            SET
983                instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
984                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
985        "#)
986            .bind(trade.instrument_id.to_string())
987            .bind(trade.price.to_string())
988            .bind(trade.size.to_string())
989            .bind(AggressorSideModel(trade.aggressor_side))
990            .bind(trade.trade_id.to_string())
991            .bind(trade.ts_event.to_string())
992            .bind(trade.ts_init.to_string())
993            .execute(pool)
994            .await
995            .map(|_| ())
996            .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
997    }
998
999    /// Loads all `TradeTick` entries for `instrument_id` via the provided `pool`.
1000    ///
1001    /// # Errors
1002    ///
1003    /// Returns an error if the SQL SELECT or deserialization fails.
1004    pub async fn load_trades(
1005        pool: &PgPool,
1006        instrument_id: &InstrumentId,
1007    ) -> anyhow::Result<Vec<TradeTick>> {
1008        sqlx::query_as::<_, TradeTickModel>(
1009            r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1010        )
1011        .bind(instrument_id.to_string())
1012        .fetch_all(pool)
1013        .await
1014        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1015        .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1016    }
1017
1018    /// Inserts a `QuoteTick` entry via the provided `pool`.
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns an error if the SQL INSERT operation fails.
1023    pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1024        sqlx::query(r#"
1025            INSERT INTO "quote" (
1026                instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1027            ) VALUES (
1028                $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1029            )
1030            ON CONFLICT (id)
1031            DO UPDATE
1032            SET
1033                instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1034                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1035        "#)
1036            .bind(quote.instrument_id.to_string())
1037            .bind(quote.bid_price.to_string())
1038            .bind(quote.ask_price.to_string())
1039            .bind(quote.bid_size.to_string())
1040            .bind(quote.ask_size.to_string())
1041            .bind(quote.ts_event.to_string())
1042            .bind(quote.ts_init.to_string())
1043            .execute(pool)
1044            .await
1045            .map(|_| ())
1046            .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1047    }
1048
1049    /// Loads all `QuoteTick` entries for `instrument_id` via the provided `pool`.
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns an error if the SQL SELECT or deserialization fails.
1054    pub async fn load_quotes(
1055        pool: &PgPool,
1056        instrument_id: &InstrumentId,
1057    ) -> anyhow::Result<Vec<QuoteTick>> {
1058        sqlx::query_as::<_, QuoteTickModel>(
1059            r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1060        )
1061        .bind(instrument_id.to_string())
1062        .fetch_all(pool)
1063        .await
1064        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1065        .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1066    }
1067
1068    /// Inserts a `Bar` entry via the provided `pool`.
1069    ///
1070    /// # Errors
1071    ///
1072    /// Returns an error if the SQL INSERT operation fails.
1073    pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1074        println!("Adding bar: {bar:?}");
1075        sqlx::query(r#"
1076            INSERT INTO "bar" (
1077                instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1078            ) VALUES (
1079                $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1080            )
1081            ON CONFLICT (id)
1082            DO UPDATE
1083            SET
1084                instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1085                open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1086        "#)
1087            .bind(bar.bar_type.instrument_id().to_string())
1088            .bind(bar.bar_type.spec().step.get() as i32)
1089            .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1090            .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1091            .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1092            .bind(bar.open.to_string())
1093            .bind(bar.high.to_string())
1094            .bind(bar.low.to_string())
1095            .bind(bar.close.to_string())
1096            .bind(bar.volume.to_string())
1097            .bind(bar.ts_event.to_string())
1098            .bind(bar.ts_init.to_string())
1099            .execute(pool)
1100            .await
1101            .map(|_| ())
1102            .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1103    }
1104
1105    /// Loads all `Bar` entries for `instrument_id` via the provided `pool`.
1106    ///
1107    /// # Errors
1108    ///
1109    /// Returns an error if the SQL SELECT or deserialization fails.
1110    pub async fn load_bars(
1111        pool: &PgPool,
1112        instrument_id: &InstrumentId,
1113    ) -> anyhow::Result<Vec<Bar>> {
1114        sqlx::query_as::<_, BarModel>(
1115            r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1116        )
1117        .bind(instrument_id.to_string())
1118        .fetch_all(pool)
1119        .await
1120        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1121        .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1122    }
1123
1124    /// Loads all distinct client order IDs from order events via the provided `pool`.
1125    ///
1126    /// # Errors
1127    ///
1128    /// Returns an error if the SQL SELECT or iteration fails.
1129    pub async fn load_distinct_order_event_client_ids(
1130        pool: &PgPool,
1131    ) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1132        let mut map: AHashMap<ClientOrderId, ClientId> = AHashMap::new();
1133        let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1134            r#"
1135            SELECT DISTINCT
1136                client_order_id AS "client_order_id",
1137                client_id AS "client_id"
1138            FROM "order_event"
1139        "#,
1140        )
1141        .fetch_all(pool)
1142        .await
1143        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1144        for id in result {
1145            map.insert(id.client_order_id, id.client_id);
1146        }
1147        Ok(map)
1148    }
1149
1150    /// Inserts a `Signal` entry via the provided `pool`.
1151    ///
1152    /// # Errors
1153    ///
1154    /// Returns an error if the SQL INSERT operation fails.
1155    pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1156        sqlx::query(
1157            r#"
1158            INSERT INTO "signal" (
1159                name, value, ts_event, ts_init, created_at, updated_at
1160            ) VALUES (
1161                $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1162            )
1163            ON CONFLICT (id)
1164            DO UPDATE
1165            SET
1166                name = $1, value = $2, ts_event = $3, ts_init = $4,
1167                updated_at = CURRENT_TIMESTAMP
1168        "#,
1169        )
1170        .bind(signal.name.to_string())
1171        .bind(signal.value.to_string())
1172        .bind(signal.ts_event.to_string())
1173        .bind(signal.ts_init.to_string())
1174        .execute(pool)
1175        .await
1176        .map(|_| ())
1177        .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1178    }
1179
1180    /// Loads all `Signal` entries by `name` via the provided `pool`.
1181    ///
1182    /// # Errors
1183    ///
1184    /// Returns an error if the SQL SELECT or deserialization fails.
1185    pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1186        sqlx::query_as::<_, SignalModel>(
1187            r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1188        )
1189        .bind(name)
1190        .fetch_all(pool)
1191        .await
1192        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1193        .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1194    }
1195
1196    /// Inserts a `CustomData` entry via the provided `pool`.
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if the SQL INSERT operation fails.
1201    pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1202        sqlx::query(
1203            r#"
1204            INSERT INTO "custom" (
1205                data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1206            ) VALUES (
1207                $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1208            )
1209            ON CONFLICT (id)
1210            DO UPDATE
1211            SET
1212                data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1213                updated_at = CURRENT_TIMESTAMP
1214        "#,
1215        )
1216        .bind(data.data_type.type_name().to_string())
1217        .bind(
1218            data.data_type
1219                .metadata()
1220                .as_ref()
1221                .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1222        )
1223        .bind(data.value.to_vec())
1224        .bind(data.ts_event.to_string())
1225        .bind(data.ts_init.to_string())
1226        .execute(pool)
1227        .await
1228        .map(|_| ())
1229        .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1230    }
1231
1232    /// Loads all `CustomData` entries of `data_type` via the provided `pool`.
1233    ///
1234    /// # Errors
1235    ///
1236    /// Returns an error if the SQL SELECT or deserialization fails.
1237    pub async fn load_custom_data(
1238        pool: &PgPool,
1239        data_type: &DataType,
1240    ) -> anyhow::Result<Vec<CustomData>> {
1241        // TODO: This metadata JSON could be more efficient at some point
1242        let metadata_json = data_type
1243            .metadata()
1244            .as_ref()
1245            .map_or(Ok(serde_json::Value::Null), |metadata| {
1246                serde_json::to_value(metadata)
1247            })?;
1248
1249        sqlx::query_as::<_, CustomDataModel>(
1250            r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1251        )
1252        .bind(data_type.type_name())
1253        .bind(metadata_json)
1254        .fetch_all(pool)
1255        .await
1256        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1257        .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1258    }
1259}