1use std::{
34 ops::{Deref, DerefMut},
35 time::Duration,
36};
37
38use ahash::AHashMap;
39use nautilus_common::{
40 actor::{DataActor, DataActorCore},
41 timer::TimeEvent,
42};
43use nautilus_model::{
44 enums::OrderType,
45 identifiers::ClientOrderId,
46 instruments::Instrument,
47 orders::{Order, OrderAny},
48 types::{Quantity, quantity::QuantityRaw},
49};
50use ustr::Ustr;
51
52use super::{ExecutionAlgorithm, ExecutionAlgorithmConfig, ExecutionAlgorithmCore};
53
/// Configuration for [`TwapAlgorithm`]; an alias of the shared [`ExecutionAlgorithmConfig`].
pub type TwapAlgorithmConfig = ExecutionAlgorithmConfig;
56
/// Execution algorithm that splits a primary market order into a schedule of
/// smaller slices and works through that schedule on timer events.
#[derive(Debug)]
pub struct TwapAlgorithm {
    /// Shared execution-algorithm state; gives access to the actor core, clock and cache.
    pub core: ExecutionAlgorithmCore,
    /// Slice quantities still waiting to be executed, keyed by primary client order ID.
    scheduled_sizes: AHashMap<ClientOrderId, Vec<Quantity>>,
}
69
70impl TwapAlgorithm {
71 #[must_use]
73 pub fn new(config: TwapAlgorithmConfig) -> Self {
74 Self {
75 core: ExecutionAlgorithmCore::new(config),
76 scheduled_sizes: AHashMap::new(),
77 }
78 }
79
80 fn complete_sequence(&mut self, primary_id: &ClientOrderId) {
82 let timer_name = primary_id.as_str();
83 if self.core.clock().timer_names().contains(&timer_name) {
84 self.core.clock().cancel_timer(timer_name);
85 }
86 self.scheduled_sizes.remove(primary_id);
87 log::info!("Completed TWAP execution for {primary_id}");
88 }
89}
90
91impl Deref for TwapAlgorithm {
92 type Target = DataActorCore;
93 fn deref(&self) -> &Self::Target {
94 &self.core.actor
95 }
96}
97
98impl DerefMut for TwapAlgorithm {
99 fn deref_mut(&mut self) -> &mut Self::Target {
100 &mut self.core.actor
101 }
102}
103
// Empty impl: no `DataActor` methods are overridden for `TwapAlgorithm`.
impl DataActor for TwapAlgorithm {}
105
106impl ExecutionAlgorithm for TwapAlgorithm {
/// Returns mutable access to the algorithm's [`ExecutionAlgorithmCore`].
fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
    &mut self.core
}
110
111 fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()> {
112 let primary_id = order.client_order_id();
113
114 if self.scheduled_sizes.contains_key(&primary_id) {
115 anyhow::bail!("Order {primary_id} already being executed");
116 }
117
118 log::info!("Received order for TWAP execution: {order:?}");
119
120 if order.order_type() != OrderType::Market {
122 log::error!(
123 "Cannot execute order: only implemented for market orders, order_type={:?}",
124 order.order_type()
125 );
126 return Ok(());
127 }
128
129 let instrument = {
130 let cache = self.core.cache();
131 cache.instrument(&order.instrument_id()).cloned()
132 };
133
134 let Some(instrument) = instrument else {
135 log::error!(
136 "Cannot execute order: instrument {} not found",
137 order.instrument_id()
138 );
139 return Ok(());
140 };
141
142 let Some(exec_params) = order.exec_algorithm_params() else {
143 log::error!(
144 "Cannot execute order: exec_algorithm_params not found for primary order {primary_id}"
145 );
146 return Ok(());
147 };
148
149 let Some(horizon_secs_str) = exec_params.get(&Ustr::from("horizon_secs")) else {
150 log::error!("Cannot execute order: horizon_secs not found in exec_algorithm_params");
151 return Ok(());
152 };
153
154 let horizon_secs: u64 = horizon_secs_str.parse().map_err(|e| {
155 log::error!("Cannot parse horizon_secs: {e}");
156 anyhow::anyhow!("Invalid horizon_secs")
157 })?;
158
159 let Some(interval_secs_str) = exec_params.get(&Ustr::from("interval_secs")) else {
160 log::error!("Cannot execute order: interval_secs not found in exec_algorithm_params");
161 return Ok(());
162 };
163
164 let interval_secs: u64 = interval_secs_str.parse().map_err(|e| {
165 log::error!("Cannot parse interval_secs: {e}");
166 anyhow::anyhow!("Invalid interval_secs")
167 })?;
168
169 if horizon_secs < interval_secs {
170 log::error!(
171 "Cannot execute order: horizon_secs={horizon_secs} was less than interval_secs={interval_secs}"
172 );
173 return Ok(());
174 }
175
176 let num_intervals = horizon_secs / interval_secs;
177 if num_intervals == 0 {
178 log::error!("Cannot execute order: num_intervals is 0");
179 return Ok(());
180 }
181
182 let total_qty = order.quantity();
183 let total_raw = total_qty.raw;
184 let precision = total_qty.precision;
185
186 let qty_per_interval_raw = total_raw / (num_intervals as QuantityRaw);
187 let qty_per_interval = Quantity::from_raw(qty_per_interval_raw, precision);
188
189 if qty_per_interval == total_qty || qty_per_interval < instrument.size_increment() {
190 log::warn!(
191 "Submitting for entire size: qty_per_interval={qty_per_interval}, order_quantity={total_qty}"
192 );
193 self.submit_order(order, None, None)?;
194 return Ok(());
195 }
196
197 if let Some(min_qty) = instrument.min_quantity()
198 && qty_per_interval < min_qty
199 {
200 log::warn!(
201 "Submitting for entire size: qty_per_interval={qty_per_interval} < min_quantity={min_qty}"
202 );
203 self.submit_order(order, None, None)?;
204 return Ok(());
205 }
206
207 let mut scheduled_sizes: Vec<Quantity> = vec![qty_per_interval; num_intervals as usize];
208
209 let scheduled_total = qty_per_interval_raw * (num_intervals as QuantityRaw);
211 let remainder_raw = total_raw - scheduled_total;
212 if remainder_raw > 0 {
213 let remainder = Quantity::from_raw(remainder_raw, total_qty.precision);
214 scheduled_sizes.push(remainder);
215 }
216
217 log::info!("Order execution size schedule: {scheduled_sizes:?}");
218
219 {
221 let cache_rc = self.core.cache_rc();
222 let mut cache = cache_rc.borrow_mut();
223 cache.add_order(order.clone(), None, None, false)?;
224 }
225
226 self.scheduled_sizes
227 .insert(primary_id, scheduled_sizes.clone());
228
229 let first_qty = self.scheduled_sizes.get_mut(&primary_id).unwrap().remove(0);
230 let is_single_slice = self
231 .scheduled_sizes
232 .get(&primary_id)
233 .is_some_and(|s| s.is_empty());
234
235 if is_single_slice {
237 self.submit_order(order, None, None)?;
238 self.complete_sequence(&primary_id);
239 return Ok(());
240 }
241
242 let tags = order.tags().map(|t| t.to_vec());
244 let time_in_force = order.time_in_force();
245 let reduce_only = order.is_reduce_only();
246 let mut order = order;
247 let spawned = self.spawn_market(
248 &mut order,
249 first_qty,
250 time_in_force,
251 reduce_only,
252 tags,
253 true,
254 );
255 self.submit_order(spawned.into(), None, None)?;
256
257 {
258 let cache_rc = self.core.cache_rc();
259 let mut cache = cache_rc.borrow_mut();
260 cache.update_order(&order)?;
261 }
262
263 self.core.clock().set_timer(
264 primary_id.as_str(),
265 Duration::from_secs(interval_secs),
266 None,
267 None,
268 None,
269 None,
270 None,
271 )?;
272
273 log::info!(
274 "Started TWAP execution for {primary_id}: horizon_secs={horizon_secs}, interval_secs={interval_secs}"
275 );
276
277 Ok(())
278 }
279
280 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
281 log::info!("Received time event: {event:?}");
282
283 let primary_id = ClientOrderId::new(event.name.as_str());
284
285 let primary = {
286 let cache = self.core.cache();
287 cache.order(&primary_id).cloned()
288 };
289
290 let Some(primary) = primary else {
291 log::error!("Cannot find primary order for exec_spawn_id={primary_id}");
292 return Ok(());
293 };
294
295 if primary.is_closed() {
296 self.complete_sequence(&primary_id);
297 return Ok(());
298 }
299
300 let Some(scheduled_sizes) = self.scheduled_sizes.get_mut(&primary_id) else {
301 log::error!("Cannot find scheduled sizes for exec_spawn_id={primary_id}");
302 return Ok(());
303 };
304
305 if scheduled_sizes.is_empty() {
306 log::warn!("No more size to execute for exec_spawn_id={primary_id}");
307 return Ok(());
308 }
309
310 let quantity = scheduled_sizes.remove(0);
311 let is_final_slice = scheduled_sizes.is_empty();
312
313 if is_final_slice {
315 self.submit_order(primary, None, None)?;
316 self.complete_sequence(&primary_id);
317 return Ok(());
318 }
319
320 let tags = primary.tags().map(|t| t.to_vec());
322 let time_in_force = primary.time_in_force();
323 let reduce_only = primary.is_reduce_only();
324 let mut primary = primary;
325 let spawned = self.spawn_market(
326 &mut primary,
327 quantity,
328 time_in_force,
329 reduce_only,
330 tags,
331 true,
332 );
333 self.submit_order(spawned.into(), None, None)?;
334
335 {
336 let cache_rc = self.core.cache_rc();
337 let mut cache = cache_rc.borrow_mut();
338 cache.update_order(&primary)?;
339 }
340
341 Ok(())
342 }
343
/// Stop hook: calls `cancel_timers` on the algorithm's clock.
///
/// `scheduled_sizes` is left untouched here; it is only cleared in `on_reset`.
fn on_stop(&mut self) -> anyhow::Result<()> {
    self.core.clock().cancel_timers();
    Ok(())
}
348
349 fn on_reset(&mut self) -> anyhow::Result<()> {
350 self.core.reset();
351 self.scheduled_sizes.clear();
352 Ok(())
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use std::{cell::RefCell, rc::Rc};
359
360 use indexmap::IndexMap;
361 use nautilus_common::{
362 cache::Cache,
363 clock::{Clock, TestClock},
364 };
365 use nautilus_core::UUID4;
366 use nautilus_model::{
367 enums::{OrderSide, TimeInForce},
368 identifiers::{ExecAlgorithmId, InstrumentId, StrategyId, TraderId},
369 orders::{LimitOrder, MarketOrder},
370 types::Price,
371 };
372 use rstest::rstest;
373 use ustr::Ustr;
374
375 use super::*;
376
377 fn create_twap_algorithm() -> TwapAlgorithm {
378 let unique_id = format!("TWAP-{}", UUID4::new());
380 let config = TwapAlgorithmConfig {
381 exec_algorithm_id: Some(ExecAlgorithmId::new(&unique_id)),
382 ..Default::default()
383 };
384 TwapAlgorithm::new(config)
385 }
386
387 fn register_algorithm(algo: &mut TwapAlgorithm) {
388 use nautilus_common::timer::TimeEventCallback;
389
390 let trader_id = TraderId::from("TRADER-001");
391 let clock = Rc::new(RefCell::new(TestClock::new()));
392 let cache = Rc::new(RefCell::new(Cache::default()));
393
394 clock
396 .borrow_mut()
397 .register_default_handler(TimeEventCallback::Rust(std::sync::Arc::new(|_| {})));
398
399 algo.core.register(trader_id, clock, cache).unwrap();
400 }
401
402 fn add_instrument_to_cache(algo: &mut TwapAlgorithm) {
403 use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};
404
405 let instrument = crypto_perpetual_ethusdt();
406 let cache_rc = algo.core.cache_rc();
407 let mut cache = cache_rc.borrow_mut();
408 cache
409 .add_instrument(InstrumentAny::CryptoPerpetual(instrument))
410 .unwrap();
411 }
412
413 fn create_market_order_with_params(params: IndexMap<Ustr, Ustr>) -> OrderAny {
414 create_market_order_with_params_and_qty(params, Quantity::from("1.0"))
415 }
416
417 fn create_market_order_with_params_and_qty(
418 params: IndexMap<Ustr, Ustr>,
419 quantity: Quantity,
420 ) -> OrderAny {
421 OrderAny::Market(MarketOrder::new(
422 TraderId::from("TRADER-001"),
423 StrategyId::from("STRAT-001"),
424 InstrumentId::from("ETHUSDT-PERP.BINANCE"),
425 ClientOrderId::from("O-001"),
426 OrderSide::Buy,
427 quantity,
428 TimeInForce::Gtc,
429 UUID4::new(),
430 0.into(),
431 false,
432 false,
433 None,
434 None,
435 None,
436 None,
437 Some(ExecAlgorithmId::new("TWAP")),
438 Some(params),
439 None,
440 None,
441 ))
442 }
443
444 #[rstest]
445 fn test_twap_creation() {
446 let algo = create_twap_algorithm();
447 assert!(algo.core.exec_algorithm_id.inner().starts_with("TWAP"));
448 assert!(algo.scheduled_sizes.is_empty());
449 }
450
451 #[rstest]
452 fn test_twap_registration() {
453 let mut algo = create_twap_algorithm();
454 register_algorithm(&mut algo);
455
456 assert!(algo.core.trader_id().is_some());
457 }
458
459 #[rstest]
460 fn test_twap_reset_clears_scheduled_sizes() {
461 let mut algo = create_twap_algorithm();
462 let primary_id = ClientOrderId::new("O-001");
463
464 algo.scheduled_sizes
465 .insert(primary_id, vec![Quantity::from("1.0")]);
466
467 assert!(!algo.scheduled_sizes.is_empty());
468
469 ExecutionAlgorithm::on_reset(&mut algo).unwrap();
470
471 assert!(algo.scheduled_sizes.is_empty());
472 }
473
474 #[rstest]
475 fn test_twap_rejects_non_market_orders() {
476 let mut algo = create_twap_algorithm();
477 register_algorithm(&mut algo);
478
479 let order = OrderAny::Limit(LimitOrder::new(
480 TraderId::from("TRADER-001"),
481 StrategyId::from("STRAT-001"),
482 InstrumentId::from("BTC/USDT.BINANCE"),
483 ClientOrderId::from("O-001"),
484 OrderSide::Buy,
485 Quantity::from("1.0"),
486 Price::from("50000.0"),
487 TimeInForce::Gtc,
488 None, false, false, false, None, None, None, None, None, None, None, None, None, None, None, UUID4::new(),
504 0.into(),
505 ));
506
507 let result = algo.on_order(order);
509 assert!(result.is_ok());
510 }
511
512 #[rstest]
513 fn test_twap_rejects_missing_params() {
514 let mut algo = create_twap_algorithm();
515 register_algorithm(&mut algo);
516
517 let order = OrderAny::Market(MarketOrder::new(
518 TraderId::from("TRADER-001"),
519 StrategyId::from("STRAT-001"),
520 InstrumentId::from("BTC/USDT.BINANCE"),
521 ClientOrderId::from("O-001"),
522 OrderSide::Buy,
523 Quantity::from("1.0"),
524 TimeInForce::Gtc,
525 UUID4::new(),
526 0.into(),
527 false,
528 false,
529 None,
530 None,
531 None,
532 None,
533 None,
534 None, None,
536 None,
537 ));
538
539 let result = algo.on_order(order);
541 assert!(result.is_ok());
542 }
543
544 #[rstest]
545 fn test_twap_rejects_horizon_less_than_interval() {
546 let mut algo = create_twap_algorithm();
547 register_algorithm(&mut algo);
548
549 add_instrument_to_cache(&mut algo);
550
551 let mut params = IndexMap::new();
552 params.insert(Ustr::from("horizon_secs"), Ustr::from("30"));
553 params.insert(Ustr::from("interval_secs"), Ustr::from("60"));
554
555 let order = create_market_order_with_params(params);
556 let result = algo.on_order(order);
557
558 assert!(result.is_ok());
559 assert!(algo.scheduled_sizes.is_empty());
560 }
561
562 #[rstest]
563 fn test_twap_rejects_duplicate_order() {
564 let mut algo = create_twap_algorithm();
565 register_algorithm(&mut algo);
566
567 add_instrument_to_cache(&mut algo);
568
569 let mut params = IndexMap::new();
570 params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
571 params.insert(Ustr::from("interval_secs"), Ustr::from("10"));
572
573 let order1 = create_market_order_with_params(params.clone());
574 let order2 = create_market_order_with_params(params);
575
576 algo.on_order(order1).unwrap();
577 let result = algo.on_order(order2);
578
579 assert!(result.is_err());
580 assert!(
581 result
582 .unwrap_err()
583 .to_string()
584 .contains("already being executed")
585 );
586 }
587
588 #[rstest]
589 fn test_twap_calculates_size_schedule_evenly() {
590 let mut algo = create_twap_algorithm();
591 register_algorithm(&mut algo);
592
593 add_instrument_to_cache(&mut algo);
594
595 let mut params = IndexMap::new();
597 params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
598 params.insert(Ustr::from("interval_secs"), Ustr::from("20"));
599
600 let order = create_market_order_with_params_and_qty(params, Quantity::from("1.2"));
601 let primary_id = order.client_order_id();
602
603 algo.on_order(order).unwrap();
604
605 let remaining = algo.scheduled_sizes.get(&primary_id).unwrap();
607 assert_eq!(remaining.len(), 2);
608
609 for qty in remaining {
610 assert_eq!(*qty, Quantity::from("0.4"));
611 }
612 }
613
614 #[rstest]
615 fn test_twap_calculates_size_schedule_with_remainder() {
616 let mut algo = create_twap_algorithm();
617 register_algorithm(&mut algo);
618
619 add_instrument_to_cache(&mut algo);
620
621 let mut params = IndexMap::new();
624 params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
625 params.insert(Ustr::from("interval_secs"), Ustr::from("20"));
626
627 let order = create_market_order_with_params(params);
628 let primary_id = order.client_order_id();
629
630 algo.on_order(order).unwrap();
631
632 let remaining = algo.scheduled_sizes.get(&primary_id).unwrap();
634 assert_eq!(remaining.len(), 3);
635
636 #[cfg(feature = "high-precision")]
640 {
641 assert_eq!(remaining[0].raw, 3_333_333_333_333_333);
642 assert_eq!(remaining[1].raw, 3_333_333_333_333_333);
643 assert_eq!(remaining[2].raw, 1);
644 }
645 #[cfg(not(feature = "high-precision"))]
646 {
647 assert_eq!(remaining[0].raw, 333_333_333);
648 assert_eq!(remaining[1].raw, 333_333_333);
649 assert_eq!(remaining[2].raw, 1);
650 }
651 }
652
653 #[rstest]
654 fn test_twap_on_time_event_spawns_next_slice() {
655 let mut algo = create_twap_algorithm();
656 register_algorithm(&mut algo);
657
658 add_instrument_to_cache(&mut algo);
659
660 let mut params = IndexMap::new();
662 params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
663 params.insert(Ustr::from("interval_secs"), Ustr::from("20"));
664
665 let order = create_market_order_with_params_and_qty(params, Quantity::from("1.2"));
666 let primary_id = order.client_order_id();
667
668 algo.on_order(order).unwrap();
669
670 assert_eq!(algo.scheduled_sizes.get(&primary_id).unwrap().len(), 2);
672
673 let event = TimeEvent::new(
675 Ustr::from(primary_id.as_str()),
676 UUID4::new(),
677 0.into(),
678 0.into(),
679 );
680 ExecutionAlgorithm::on_time_event(&mut algo, &event).unwrap();
681
682 assert_eq!(algo.scheduled_sizes.get(&primary_id).unwrap().len(), 1);
684 }
685
686 #[rstest]
687 fn test_twap_on_time_event_completes_on_final_slice() {
688 let mut algo = create_twap_algorithm();
689 register_algorithm(&mut algo);
690
691 add_instrument_to_cache(&mut algo);
692
693 let mut params = IndexMap::new();
695 params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
696 params.insert(Ustr::from("interval_secs"), Ustr::from("30"));
697
698 let order = create_market_order_with_params(params);
699 let primary_id = order.client_order_id();
700
701 algo.on_order(order).unwrap();
702 assert_eq!(algo.scheduled_sizes.get(&primary_id).unwrap().len(), 1);
703
704 let event = TimeEvent::new(
706 Ustr::from(primary_id.as_str()),
707 UUID4::new(),
708 0.into(),
709 0.into(),
710 );
711 ExecutionAlgorithm::on_time_event(&mut algo, &event).unwrap();
712
713 assert!(algo.scheduled_sizes.get(&primary_id).is_none());
715 }
716}