1use std::sync::Arc;
28
29use nautilus_common::live::get_runtime;
30use nautilus_model::{
31 enums::{OrderSide, TimeInForce},
32 identifiers::InstrumentId,
33 types::{Price, Quantity},
34};
35
36use crate::{
37 error::DydxError,
38 execution::{
39 block_time::BlockTimeMonitor,
40 broadcaster::TxBroadcaster,
41 order_builder::OrderMessageBuilder,
42 tx_manager::TransactionManager,
43 types::{ConditionalOrderType, LimitOrderParams, OrderLifetime},
44 },
45 grpc::{DydxGrpcClient, types::ChainId},
46 http::client::DydxHttpClient,
47};
48
/// Submits and cancels dYdX orders.
///
/// Each operation asks the [`OrderMessageBuilder`] for a message and hands it,
/// together with the [`TransactionManager`], to the [`TxBroadcaster`].
/// All collaborators are held behind [`Arc`] so they can be shared with
/// spawned tasks and with other components.
#[derive(Debug)]
pub struct OrderSubmitter {
    /// Transaction manager passed to every broadcast call; also supplies the wallet address.
    tx_manager: Arc<TransactionManager>,
    /// Broadcaster that sends built messages (short-term or retrying path).
    broadcaster: Arc<TxBroadcaster>,
    /// Builder that turns order parameters into broadcastable messages.
    order_builder: Arc<OrderMessageBuilder>,
    /// Source of the current block height used when building messages.
    block_time_monitor: Arc<BlockTimeMonitor>,
}
80
81impl OrderSubmitter {
82 pub fn new(
98 grpc_client: DydxGrpcClient,
99 http_client: DydxHttpClient,
100 private_key: &str,
101 wallet_address: String,
102 subaccount_number: u32,
103 chain_id: ChainId,
104 block_time_monitor: Arc<BlockTimeMonitor>,
105 ) -> Result<Self, DydxError> {
106 let tx_manager = Arc::new(TransactionManager::new(
108 grpc_client.clone(),
109 private_key,
110 wallet_address.clone(),
111 chain_id,
112 )?);
113
114 let broadcaster = Arc::new(TxBroadcaster::new(grpc_client));
115
116 let order_builder = Arc::new(OrderMessageBuilder::new(
117 http_client,
118 wallet_address,
119 subaccount_number,
120 block_time_monitor.clone(),
121 ));
122
123 Ok(Self {
124 tx_manager,
125 broadcaster,
126 order_builder,
127 block_time_monitor,
128 })
129 }
130
    /// Assembles a submitter from already-constructed, shared components.
    ///
    /// No validation or additional construction is performed; the given
    /// handles are stored as-is.
    pub fn from_components(
        tx_manager: Arc<TransactionManager>,
        broadcaster: Arc<TxBroadcaster>,
        order_builder: Arc<OrderMessageBuilder>,
        block_time_monitor: Arc<BlockTimeMonitor>,
    ) -> Self {
        Self {
            tx_manager,
            broadcaster,
            order_builder,
            block_time_monitor,
        }
    }
147
148 #[must_use]
150 pub fn current_block_height(&self) -> u32 {
151 self.block_time_monitor.current_block_height() as u32
152 }
153
154 #[must_use]
156 pub fn block_time_monitor(&self) -> &BlockTimeMonitor {
157 &self.block_time_monitor
158 }
159
160 #[must_use]
162 pub fn wallet_address(&self) -> &str {
163 self.tx_manager.wallet_address()
164 }
165
166 #[must_use]
168 pub fn order_builder(&self) -> &OrderMessageBuilder {
169 &self.order_builder
170 }
171
172 #[must_use]
174 pub fn tx_manager(&self) -> &TransactionManager {
175 &self.tx_manager
176 }
177
178 pub async fn submit_market_order(
191 &self,
192 instrument_id: InstrumentId,
193 client_order_id: u32,
194 client_metadata: u32,
195 side: OrderSide,
196 quantity: Quantity,
197 ) -> Result<String, DydxError> {
198 log::info!(
199 "Submitting market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, quantity={quantity}"
200 );
201
202 let block_height = self.current_block_height();
203
204 let msg = self.order_builder.build_market_order(
206 instrument_id,
207 client_order_id,
208 client_metadata,
209 side,
210 quantity,
211 block_height,
212 )?;
213
214 let operation = format!("Submit market order {client_order_id}");
216 let tx_hash = self
217 .broadcaster
218 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
219 .await?;
220
221 Ok(tx_hash)
222 }
223
224 #[allow(clippy::too_many_arguments)]
237 pub async fn submit_limit_order(
238 &self,
239 instrument_id: InstrumentId,
240 client_order_id: u32,
241 client_metadata: u32,
242 side: OrderSide,
243 price: Price,
244 quantity: Quantity,
245 time_in_force: TimeInForce,
246 post_only: bool,
247 reduce_only: bool,
248 expire_time: Option<i64>,
249 ) -> Result<String, DydxError> {
250 log::info!(
251 "Submitting limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, price={price}, \
252 quantity={quantity}, tif={time_in_force:?}, post_only={post_only}, reduce_only={reduce_only}"
253 );
254
255 let block_height = self.current_block_height();
256
257 let msg = self.order_builder.build_limit_order(
259 instrument_id,
260 client_order_id,
261 client_metadata,
262 side,
263 price,
264 quantity,
265 time_in_force,
266 post_only,
267 reduce_only,
268 block_height,
269 expire_time,
270 )?;
271
272 let is_short_term = OrderLifetime::from_time_in_force(
274 time_in_force,
275 expire_time,
276 false,
277 self.order_builder.max_short_term_secs(),
278 )
279 .is_short_term();
280
281 let operation = format!("Submit limit order {client_order_id}");
283 let tx_hash = if is_short_term {
284 self.broadcaster
285 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
286 .await?
287 } else {
288 self.broadcaster
289 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
290 .await?
291 };
292
293 Ok(tx_hash)
294 }
295
296 pub async fn submit_limit_orders_batch(
312 &self,
313 orders: Vec<LimitOrderParams>,
314 ) -> Result<Vec<String>, DydxError> {
315 if orders.is_empty() {
316 return Ok(Vec::new());
317 }
318
319 let block_height = self.current_block_height();
320
321 let has_short_term = orders
323 .iter()
324 .any(|params| self.order_builder.is_short_term_order(params));
325
326 if has_short_term {
327 log::debug!(
331 "Submitting {} short-term limit orders concurrently (sequence not consumed)",
332 orders.len()
333 );
334
335 let mut tx_hashes = Vec::with_capacity(orders.len());
336 let mut handles = Vec::with_capacity(orders.len());
337
338 for params in orders {
339 let tx_manager = Arc::clone(&self.tx_manager);
340 let broadcaster = Arc::clone(&self.broadcaster);
341 let order_builder = Arc::clone(&self.order_builder);
342
343 let handle = get_runtime().spawn(async move {
344 let msg = order_builder.build_limit_order_from_params(¶ms, block_height)?;
345 let operation = format!("Submit short-term order {}", params.client_order_id);
346 broadcaster
347 .broadcast_short_term(&tx_manager, vec![msg], &operation)
348 .await
349 });
350
351 handles.push(handle);
352 }
353
354 for handle in handles {
356 match handle.await {
357 Ok(Ok(tx_hash)) => tx_hashes.push(tx_hash),
358 Ok(Err(e)) => return Err(e),
359 Err(e) => {
360 return Err(DydxError::Nautilus(anyhow::anyhow!("Task join error: {e}")));
361 }
362 }
363 }
364
365 Ok(tx_hashes)
366 } else {
367 log::info!(
369 "Batch submitting {} long-term limit orders in single transaction",
370 orders.len()
371 );
372
373 let msgs = self
374 .order_builder
375 .build_limit_orders_batch(&orders, block_height)?;
376
377 let operation = format!("Submit batch of {} limit orders", msgs.len());
378 let tx_hash = self
379 .broadcaster
380 .broadcast_with_retry(&self.tx_manager, msgs, &operation)
381 .await?;
382
383 Ok(vec![tx_hash])
384 }
385 }
386
387 pub async fn cancel_order(
399 &self,
400 instrument_id: InstrumentId,
401 client_order_id: u32,
402 time_in_force: TimeInForce,
403 expire_time_ns: Option<nautilus_core::UnixNanos>,
404 ) -> Result<String, DydxError> {
405 log::info!("Cancelling order: client_id={client_order_id}, instrument={instrument_id}");
406
407 let block_height = self.current_block_height();
408
409 let msg = self.order_builder.build_cancel_order(
411 instrument_id,
412 client_order_id,
413 time_in_force,
414 expire_time_ns,
415 block_height,
416 )?;
417
418 let is_short_term = self
420 .order_builder
421 .is_short_term_cancel(time_in_force, expire_time_ns);
422
423 let operation = format!("Cancel order {client_order_id}");
425 let tx_hash = if is_short_term {
426 self.broadcaster
427 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
428 .await?
429 } else {
430 self.broadcaster
431 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
432 .await?
433 };
434
435 Ok(tx_hash)
436 }
437
438 pub async fn cancel_orders_batch(
454 &self,
455 orders: &[(
456 InstrumentId,
457 u32,
458 TimeInForce,
459 Option<nautilus_core::UnixNanos>,
460 )],
461 ) -> Result<String, DydxError> {
462 if orders.is_empty() {
463 return Err(DydxError::Order("No orders to cancel".to_string()));
464 }
465
466 log::info!(
467 "Batch cancelling {} orders in single transaction",
468 orders.len()
469 );
470
471 let block_height = self.current_block_height();
472
473 let msgs = self
475 .order_builder
476 .build_cancel_orders_batch(orders, block_height)?;
477
478 let operation = format!("Cancel batch of {} orders", msgs.len());
480 let tx_hash = self
481 .broadcaster
482 .broadcast_with_retry(&self.tx_manager, msgs, &operation)
483 .await?;
484
485 Ok(tx_hash)
486 }
487
488 #[allow(clippy::too_many_arguments)]
500 pub async fn submit_stop_market_order(
501 &self,
502 instrument_id: InstrumentId,
503 client_order_id: u32,
504 client_metadata: u32,
505 side: OrderSide,
506 trigger_price: Price,
507 quantity: Quantity,
508 reduce_only: bool,
509 expire_time: Option<i64>,
510 ) -> Result<String, DydxError> {
511 log::info!(
512 "Submitting stop market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
513 trigger={trigger_price}, qty={quantity}"
514 );
515
516 let msg = self.order_builder.build_stop_market_order(
518 instrument_id,
519 client_order_id,
520 client_metadata,
521 side,
522 trigger_price,
523 quantity,
524 reduce_only,
525 expire_time,
526 )?;
527
528 let operation = format!("Submit stop market order {client_order_id}");
530 let tx_hash = self
531 .broadcaster
532 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
533 .await?;
534
535 Ok(tx_hash)
536 }
537
538 #[allow(clippy::too_many_arguments)]
551 pub async fn submit_stop_limit_order(
552 &self,
553 instrument_id: InstrumentId,
554 client_order_id: u32,
555 client_metadata: u32,
556 side: OrderSide,
557 trigger_price: Price,
558 limit_price: Price,
559 quantity: Quantity,
560 time_in_force: TimeInForce,
561 post_only: bool,
562 reduce_only: bool,
563 expire_time: Option<i64>,
564 ) -> Result<String, DydxError> {
565 log::info!(
566 "Submitting stop limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
567 trigger={trigger_price}, limit={limit_price}, qty={quantity}"
568 );
569
570 let msg = self.order_builder.build_stop_limit_order(
572 instrument_id,
573 client_order_id,
574 client_metadata,
575 side,
576 trigger_price,
577 limit_price,
578 quantity,
579 time_in_force,
580 post_only,
581 reduce_only,
582 expire_time,
583 )?;
584
585 let operation = format!("Submit stop limit order {client_order_id}");
587 let tx_hash = self
588 .broadcaster
589 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
590 .await?;
591
592 Ok(tx_hash)
593 }
594
595 #[allow(clippy::too_many_arguments)]
608 pub async fn submit_take_profit_market_order(
609 &self,
610 instrument_id: InstrumentId,
611 client_order_id: u32,
612 client_metadata: u32,
613 side: OrderSide,
614 trigger_price: Price,
615 quantity: Quantity,
616 reduce_only: bool,
617 expire_time: Option<i64>,
618 ) -> Result<String, DydxError> {
619 log::info!(
620 "Submitting take profit market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
621 trigger={trigger_price}, qty={quantity}"
622 );
623
624 let msg = self.order_builder.build_take_profit_market_order(
626 instrument_id,
627 client_order_id,
628 client_metadata,
629 side,
630 trigger_price,
631 quantity,
632 reduce_only,
633 expire_time,
634 )?;
635
636 let operation = format!("Submit take profit market order {client_order_id}");
638 let tx_hash = self
639 .broadcaster
640 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
641 .await?;
642
643 Ok(tx_hash)
644 }
645
646 #[allow(clippy::too_many_arguments)]
659 pub async fn submit_take_profit_limit_order(
660 &self,
661 instrument_id: InstrumentId,
662 client_order_id: u32,
663 client_metadata: u32,
664 side: OrderSide,
665 trigger_price: Price,
666 limit_price: Price,
667 quantity: Quantity,
668 time_in_force: TimeInForce,
669 post_only: bool,
670 reduce_only: bool,
671 expire_time: Option<i64>,
672 ) -> Result<String, DydxError> {
673 log::info!(
674 "Submitting take profit limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
675 trigger={trigger_price}, limit={limit_price}, qty={quantity}"
676 );
677
678 let msg = self.order_builder.build_take_profit_limit_order(
680 instrument_id,
681 client_order_id,
682 client_metadata,
683 side,
684 trigger_price,
685 limit_price,
686 quantity,
687 time_in_force,
688 post_only,
689 reduce_only,
690 expire_time,
691 )?;
692
693 let operation = format!("Submit take profit limit order {client_order_id}");
695 let tx_hash = self
696 .broadcaster
697 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
698 .await?;
699
700 Ok(tx_hash)
701 }
702
703 #[allow(clippy::too_many_arguments)]
716 pub async fn submit_conditional_order(
717 &self,
718 instrument_id: InstrumentId,
719 client_order_id: u32,
720 client_metadata: u32,
721 order_type: ConditionalOrderType,
722 side: OrderSide,
723 trigger_price: Price,
724 limit_price: Option<Price>,
725 quantity: Quantity,
726 time_in_force: Option<TimeInForce>,
727 post_only: bool,
728 reduce_only: bool,
729 expire_time: Option<i64>,
730 ) -> Result<String, DydxError> {
731 let msg = self.order_builder.build_conditional_order(
733 instrument_id,
734 client_order_id,
735 client_metadata,
736 order_type,
737 side,
738 trigger_price,
739 limit_price,
740 quantity,
741 time_in_force,
742 post_only,
743 reduce_only,
744 expire_time,
745 )?;
746
747 let operation = format!("Submit {order_type:?} order {client_order_id}");
749 let tx_hash = self
750 .broadcaster
751 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
752 .await?;
753
754 Ok(tx_hash)
755 }
756}