nautilus_trading/algorithm/
twap.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Time-Weighted Average Price (TWAP) execution algorithm.
17//!
18//! The TWAP algorithm executes orders by evenly spreading them over a specified
19//! time horizon at regular intervals. This helps reduce market impact by avoiding
20//! concentration of trade size at any given time.
21//!
22//! # Parameters
23//!
24//! Orders submitted to this algorithm must include `exec_algorithm_params` with:
25//! - `horizon_secs`: Total execution horizon in seconds.
26//! - `interval_secs`: Interval between child orders in seconds.
27//!
28//! # Example
29//!
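//! An order with `horizon_secs=60` and `interval_secs=10` is split into 6 equal
//! slices: the first is submitted immediately and the rest follow one every
//! 10 seconds. If the quantity does not divide evenly across the intervals, the
//! leftover is executed as one additional final slice.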
32
33use std::{
34    ops::{Deref, DerefMut},
35    time::Duration,
36};
37
38use ahash::AHashMap;
39use nautilus_common::{
40    actor::{DataActor, DataActorCore},
41    timer::TimeEvent,
42};
43use nautilus_model::{
44    enums::OrderType,
45    identifiers::ClientOrderId,
46    instruments::Instrument,
47    orders::{Order, OrderAny},
48    types::{Quantity, quantity::QuantityRaw},
49};
50use ustr::Ustr;
51
52use super::{ExecutionAlgorithm, ExecutionAlgorithmConfig, ExecutionAlgorithmCore};
53
/// Configuration for [`TwapAlgorithm`].
///
/// This is a plain alias of [`ExecutionAlgorithmConfig`]; TWAP adds no configuration
/// fields of its own. Per-order settings (`horizon_secs`, `interval_secs`) are read
/// from the order's `exec_algorithm_params` instead.
pub type TwapAlgorithmConfig = ExecutionAlgorithmConfig;
56
/// Time-Weighted Average Price (TWAP) execution algorithm.
///
/// Executes orders by evenly spreading them over a specified time horizon,
/// at regular intervals. The algorithm receives a primary order and spawns
/// smaller child orders that are executed at regular intervals.
#[derive(Debug)]
pub struct TwapAlgorithm {
    /// The algorithm core.
    pub core: ExecutionAlgorithmCore,
    /// Remaining slice sizes still to be executed, keyed by primary order ID.
    ///
    /// An entry exists only while a sequence is in progress: it is inserted by
    /// `on_order`, consumed front-to-back on each timer event, and removed when
    /// the sequence completes or the algorithm is reset.
    scheduled_sizes: AHashMap<ClientOrderId, Vec<Quantity>>,
}
69
70impl TwapAlgorithm {
71    /// Creates a new [`TwapAlgorithm`] instance.
72    #[must_use]
73    pub fn new(config: TwapAlgorithmConfig) -> Self {
74        Self {
75            core: ExecutionAlgorithmCore::new(config),
76            scheduled_sizes: AHashMap::new(),
77        }
78    }
79
80    /// Completes the execution sequence for a primary order.
81    fn complete_sequence(&mut self, primary_id: &ClientOrderId) {
82        let timer_name = primary_id.as_str();
83        if self.core.clock().timer_names().contains(&timer_name) {
84            self.core.clock().cancel_timer(timer_name);
85        }
86        self.scheduled_sizes.remove(primary_id);
87        log::info!("Completed TWAP execution for {primary_id}");
88    }
89}
90
91impl Deref for TwapAlgorithm {
92    type Target = DataActorCore;
93    fn deref(&self) -> &Self::Target {
94        &self.core.actor
95    }
96}
97
98impl DerefMut for TwapAlgorithm {
99    fn deref_mut(&mut self) -> &mut Self::Target {
100        &mut self.core.actor
101    }
102}
103
// No `DataActor` methods are overridden here; the algorithm-specific behavior
// lives in the `ExecutionAlgorithm` impl.
impl DataActor for TwapAlgorithm {}
105
106impl ExecutionAlgorithm for TwapAlgorithm {
107    fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
108        &mut self.core
109    }
110
111    fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()> {
112        let primary_id = order.client_order_id();
113
114        if self.scheduled_sizes.contains_key(&primary_id) {
115            anyhow::bail!("Order {primary_id} already being executed");
116        }
117
118        log::info!("Received order for TWAP execution: {order:?}");
119
120        // Only market orders supported
121        if order.order_type() != OrderType::Market {
122            log::error!(
123                "Cannot execute order: only implemented for market orders, order_type={:?}",
124                order.order_type()
125            );
126            return Ok(());
127        }
128
129        let instrument = {
130            let cache = self.core.cache();
131            cache.instrument(&order.instrument_id()).cloned()
132        };
133
134        let Some(instrument) = instrument else {
135            log::error!(
136                "Cannot execute order: instrument {} not found",
137                order.instrument_id()
138            );
139            return Ok(());
140        };
141
142        let Some(exec_params) = order.exec_algorithm_params() else {
143            log::error!(
144                "Cannot execute order: exec_algorithm_params not found for primary order {primary_id}"
145            );
146            return Ok(());
147        };
148
149        let Some(horizon_secs_str) = exec_params.get(&Ustr::from("horizon_secs")) else {
150            log::error!("Cannot execute order: horizon_secs not found in exec_algorithm_params");
151            return Ok(());
152        };
153
154        let horizon_secs: u64 = horizon_secs_str.parse().map_err(|e| {
155            log::error!("Cannot parse horizon_secs: {e}");
156            anyhow::anyhow!("Invalid horizon_secs")
157        })?;
158
159        let Some(interval_secs_str) = exec_params.get(&Ustr::from("interval_secs")) else {
160            log::error!("Cannot execute order: interval_secs not found in exec_algorithm_params");
161            return Ok(());
162        };
163
164        let interval_secs: u64 = interval_secs_str.parse().map_err(|e| {
165            log::error!("Cannot parse interval_secs: {e}");
166            anyhow::anyhow!("Invalid interval_secs")
167        })?;
168
169        if horizon_secs < interval_secs {
170            log::error!(
171                "Cannot execute order: horizon_secs={horizon_secs} was less than interval_secs={interval_secs}"
172            );
173            return Ok(());
174        }
175
176        let num_intervals = horizon_secs / interval_secs;
177        if num_intervals == 0 {
178            log::error!("Cannot execute order: num_intervals is 0");
179            return Ok(());
180        }
181
182        let total_qty = order.quantity();
183        let total_raw = total_qty.raw;
184        let precision = total_qty.precision;
185
186        let qty_per_interval_raw = total_raw / (num_intervals as QuantityRaw);
187        let qty_per_interval = Quantity::from_raw(qty_per_interval_raw, precision);
188
189        if qty_per_interval == total_qty || qty_per_interval < instrument.size_increment() {
190            log::warn!(
191                "Submitting for entire size: qty_per_interval={qty_per_interval}, order_quantity={total_qty}"
192            );
193            self.submit_order(order, None, None)?;
194            return Ok(());
195        }
196
197        if let Some(min_qty) = instrument.min_quantity()
198            && qty_per_interval < min_qty
199        {
200            log::warn!(
201                "Submitting for entire size: qty_per_interval={qty_per_interval} < min_quantity={min_qty}"
202            );
203            self.submit_order(order, None, None)?;
204            return Ok(());
205        }
206
207        let mut scheduled_sizes: Vec<Quantity> = vec![qty_per_interval; num_intervals as usize];
208
209        // Remainder goes in the last slice
210        let scheduled_total = qty_per_interval_raw * (num_intervals as QuantityRaw);
211        let remainder_raw = total_raw - scheduled_total;
212        if remainder_raw > 0 {
213            let remainder = Quantity::from_raw(remainder_raw, total_qty.precision);
214            scheduled_sizes.push(remainder);
215        }
216
217        log::info!("Order execution size schedule: {scheduled_sizes:?}");
218
219        // Add primary order to cache so on_time_event can retrieve it
220        {
221            let cache_rc = self.core.cache_rc();
222            let mut cache = cache_rc.borrow_mut();
223            cache.add_order(order.clone(), None, None, false)?;
224        }
225
226        self.scheduled_sizes
227            .insert(primary_id, scheduled_sizes.clone());
228
229        let first_qty = self.scheduled_sizes.get_mut(&primary_id).unwrap().remove(0);
230        let is_single_slice = self
231            .scheduled_sizes
232            .get(&primary_id)
233            .is_some_and(|s| s.is_empty());
234
235        // Single slice: submit the primary order directly
236        if is_single_slice {
237            self.submit_order(order, None, None)?;
238            self.complete_sequence(&primary_id);
239            return Ok(());
240        }
241
242        // Multiple slices: spawn first child order and reduce primary
243        let tags = order.tags().map(|t| t.to_vec());
244        let time_in_force = order.time_in_force();
245        let reduce_only = order.is_reduce_only();
246        let mut order = order;
247        let spawned = self.spawn_market(
248            &mut order,
249            first_qty,
250            time_in_force,
251            reduce_only,
252            tags,
253            true,
254        );
255        self.submit_order(spawned.into(), None, None)?;
256
257        {
258            let cache_rc = self.core.cache_rc();
259            let mut cache = cache_rc.borrow_mut();
260            cache.update_order(&order)?;
261        }
262
263        self.core.clock().set_timer(
264            primary_id.as_str(),
265            Duration::from_secs(interval_secs),
266            None,
267            None,
268            None,
269            None,
270            None,
271        )?;
272
273        log::info!(
274            "Started TWAP execution for {primary_id}: horizon_secs={horizon_secs}, interval_secs={interval_secs}"
275        );
276
277        Ok(())
278    }
279
280    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
281        log::info!("Received time event: {event:?}");
282
283        let primary_id = ClientOrderId::new(event.name.as_str());
284
285        let primary = {
286            let cache = self.core.cache();
287            cache.order(&primary_id).cloned()
288        };
289
290        let Some(primary) = primary else {
291            log::error!("Cannot find primary order for exec_spawn_id={primary_id}");
292            return Ok(());
293        };
294
295        if primary.is_closed() {
296            self.complete_sequence(&primary_id);
297            return Ok(());
298        }
299
300        let Some(scheduled_sizes) = self.scheduled_sizes.get_mut(&primary_id) else {
301            log::error!("Cannot find scheduled sizes for exec_spawn_id={primary_id}");
302            return Ok(());
303        };
304
305        if scheduled_sizes.is_empty() {
306            log::warn!("No more size to execute for exec_spawn_id={primary_id}");
307            return Ok(());
308        }
309
310        let quantity = scheduled_sizes.remove(0);
311        let is_final_slice = scheduled_sizes.is_empty();
312
313        // Final slice: submit the primary order (already reduced to remaining quantity)
314        if is_final_slice {
315            self.submit_order(primary, None, None)?;
316            self.complete_sequence(&primary_id);
317            return Ok(());
318        }
319
320        // Intermediate slice: spawn child order and reduce primary
321        let tags = primary.tags().map(|t| t.to_vec());
322        let time_in_force = primary.time_in_force();
323        let reduce_only = primary.is_reduce_only();
324        let mut primary = primary;
325        let spawned = self.spawn_market(
326            &mut primary,
327            quantity,
328            time_in_force,
329            reduce_only,
330            tags,
331            true,
332        );
333        self.submit_order(spawned.into(), None, None)?;
334
335        {
336            let cache_rc = self.core.cache_rc();
337            let mut cache = cache_rc.borrow_mut();
338            cache.update_order(&primary)?;
339        }
340
341        Ok(())
342    }
343
344    fn on_stop(&mut self) -> anyhow::Result<()> {
345        self.core.clock().cancel_timers();
346        Ok(())
347    }
348
349    fn on_reset(&mut self) -> anyhow::Result<()> {
350        self.core.reset();
351        self.scheduled_sizes.clear();
352        Ok(())
353    }
354}
355
#[cfg(test)]
mod tests {
    //! Unit tests for [`TwapAlgorithm`]: construction, parameter validation,
    //! size-schedule calculation, and timer-driven slice execution.

    use std::{cell::RefCell, rc::Rc};

    use indexmap::IndexMap;
    use nautilus_common::{
        cache::Cache,
        clock::{Clock, TestClock},
    };
    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{OrderSide, TimeInForce},
        identifiers::{ExecAlgorithmId, InstrumentId, StrategyId, TraderId},
        orders::{LimitOrder, MarketOrder},
        types::Price,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;

    /// Creates an unregistered algorithm with a unique `TWAP-<uuid>` ID.
    fn create_twap_algorithm() -> TwapAlgorithm {
        // Use unique ID to avoid thread-local registry/msgbus conflicts in parallel tests
        let unique_id = format!("TWAP-{}", UUID4::new());
        let config = TwapAlgorithmConfig {
            exec_algorithm_id: Some(ExecAlgorithmId::new(&unique_id)),
            ..Default::default()
        };
        TwapAlgorithm::new(config)
    }

    /// Registers `algo` with a fresh test clock and an empty cache.
    fn register_algorithm(algo: &mut TwapAlgorithm) {
        use nautilus_common::timer::TimeEventCallback;

        let trader_id = TraderId::from("TRADER-001");
        let clock = Rc::new(RefCell::new(TestClock::new()));
        let cache = Rc::new(RefCell::new(Cache::default()));

        // Register a no-op default handler for timer callbacks
        clock
            .borrow_mut()
            .register_default_handler(TimeEventCallback::Rust(std::sync::Arc::new(|_| {})));

        algo.core.register(trader_id, clock, cache).unwrap();
    }

    /// Adds the ETHUSDT perpetual stub instrument to the algorithm's cache.
    fn add_instrument_to_cache(algo: &mut TwapAlgorithm) {
        use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};

        let instrument = crypto_perpetual_ethusdt();
        let cache_rc = algo.core.cache_rc();
        let mut cache = cache_rc.borrow_mut();
        cache
            .add_instrument(InstrumentAny::CryptoPerpetual(instrument))
            .unwrap();
    }

    /// Creates a 1.0-quantity market order carrying `params` as its algorithm parameters.
    fn create_market_order_with_params(params: IndexMap<Ustr, Ustr>) -> OrderAny {
        create_market_order_with_params_and_qty(params, Quantity::from("1.0"))
    }

    /// Creates a market buy order `O-001` on `ETHUSDT-PERP.BINANCE` with the given
    /// algorithm parameters and quantity.
    fn create_market_order_with_params_and_qty(
        params: IndexMap<Ustr, Ustr>,
        quantity: Quantity,
    ) -> OrderAny {
        OrderAny::Market(MarketOrder::new(
            TraderId::from("TRADER-001"),
            StrategyId::from("STRAT-001"),
            InstrumentId::from("ETHUSDT-PERP.BINANCE"),
            ClientOrderId::from("O-001"),
            OrderSide::Buy,
            quantity,
            TimeInForce::Gtc,
            UUID4::new(),
            0.into(),
            false,
            false,
            None,
            None,
            None,
            None,
            Some(ExecAlgorithmId::new("TWAP")),
            Some(params),
            None,
            None,
        ))
    }

    #[rstest]
    fn test_twap_creation() {
        let algo = create_twap_algorithm();
        assert!(algo.core.exec_algorithm_id.inner().starts_with("TWAP"));
        assert!(algo.scheduled_sizes.is_empty());
    }

    #[rstest]
    fn test_twap_registration() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        assert!(algo.core.trader_id().is_some());
    }

    #[rstest]
    fn test_twap_reset_clears_scheduled_sizes() {
        let mut algo = create_twap_algorithm();
        let primary_id = ClientOrderId::new("O-001");

        algo.scheduled_sizes
            .insert(primary_id, vec![Quantity::from("1.0")]);

        assert!(!algo.scheduled_sizes.is_empty());

        ExecutionAlgorithm::on_reset(&mut algo).unwrap();

        assert!(algo.scheduled_sizes.is_empty());
    }

    #[rstest]
    fn test_twap_rejects_non_market_orders() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        let order = OrderAny::Limit(LimitOrder::new(
            TraderId::from("TRADER-001"),
            StrategyId::from("STRAT-001"),
            InstrumentId::from("BTC/USDT.BINANCE"),
            ClientOrderId::from("O-001"),
            OrderSide::Buy,
            Quantity::from("1.0"),
            Price::from("50000.0"),
            TimeInForce::Gtc,
            None,  // expire_time
            false, // post_only
            false, // reduce_only
            false, // quote_quantity
            None,  // display_qty
            None,  // emulation_trigger
            None,  // trigger_instrument_id
            None,  // contingency_type
            None,  // order_list_id
            None,  // linked_order_ids
            None,  // parent_order_id
            None,  // exec_algorithm_id
            None,  // exec_algorithm_params
            None,  // exec_spawn_id
            None,  // tags
            UUID4::new(),
            0.into(),
        ));

        // Should not error, just log and return
        let result = algo.on_order(order);
        assert!(result.is_ok());
    }

    #[rstest]
    fn test_twap_rejects_missing_params() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        // The instrument must be cached (and match the order's instrument ID),
        // otherwise `on_order` returns at the instrument lookup and never reaches
        // the `exec_algorithm_params` check this test is meant to exercise
        add_instrument_to_cache(&mut algo);

        let order = OrderAny::Market(MarketOrder::new(
            TraderId::from("TRADER-001"),
            StrategyId::from("STRAT-001"),
            InstrumentId::from("ETHUSDT-PERP.BINANCE"),
            ClientOrderId::from("O-001"),
            OrderSide::Buy,
            Quantity::from("1.0"),
            TimeInForce::Gtc,
            UUID4::new(),
            0.into(),
            false,
            false,
            None,
            None,
            None,
            None,
            None,
            None, // No exec_algorithm_params
            None,
            None,
        ));

        // Should not error, just log and return without scheduling anything
        let result = algo.on_order(order);
        assert!(result.is_ok());
        assert!(algo.scheduled_sizes.is_empty());
    }

    #[rstest]
    fn test_twap_rejects_horizon_less_than_interval() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        add_instrument_to_cache(&mut algo);

        let mut params = IndexMap::new();
        params.insert(Ustr::from("horizon_secs"), Ustr::from("30"));
        params.insert(Ustr::from("interval_secs"), Ustr::from("60"));

        let order = create_market_order_with_params(params);
        let result = algo.on_order(order);

        assert!(result.is_ok());
        assert!(algo.scheduled_sizes.is_empty());
    }

    #[rstest]
    fn test_twap_rejects_duplicate_order() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        add_instrument_to_cache(&mut algo);

        let mut params = IndexMap::new();
        params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
        params.insert(Ustr::from("interval_secs"), Ustr::from("10"));

        // Both orders share the client order ID `O-001`
        let order1 = create_market_order_with_params(params.clone());
        let order2 = create_market_order_with_params(params);

        algo.on_order(order1).unwrap();
        let result = algo.on_order(order2);

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("already being executed")
        );
    }

    #[rstest]
    fn test_twap_calculates_size_schedule_evenly() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        add_instrument_to_cache(&mut algo);

        // 1.2 qty over 60s with 20s intervals = 3 intervals of 0.4 each (divides evenly)
        let mut params = IndexMap::new();
        params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
        params.insert(Ustr::from("interval_secs"), Ustr::from("20"));

        let order = create_market_order_with_params_and_qty(params, Quantity::from("1.2"));
        let primary_id = order.client_order_id();

        algo.on_order(order).unwrap();

        // First slice spawned immediately, remaining 2 slices scheduled (no remainder)
        let remaining = algo.scheduled_sizes.get(&primary_id).unwrap();
        assert_eq!(remaining.len(), 2);

        for qty in remaining {
            assert_eq!(*qty, Quantity::from("0.4"));
        }
    }

    #[rstest]
    fn test_twap_calculates_size_schedule_with_remainder() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        add_instrument_to_cache(&mut algo);

        // 1.0 qty over 60s with 20s intervals = 3 intervals
        // Raw is scaled to FIXED_PRECISION: 9 (standard) or 16 (high-precision)
        let mut params = IndexMap::new();
        params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
        params.insert(Ustr::from("interval_secs"), Ustr::from("20"));

        let order = create_market_order_with_params(params);
        let primary_id = order.client_order_id();

        algo.on_order(order).unwrap();

        // First slice spawned, 3 remaining (2 regular + 1 remainder)
        let remaining = algo.scheduled_sizes.get(&primary_id).unwrap();
        assert_eq!(remaining.len(), 3);

        // Expected raw values depend on FIXED_PRECISION
        // Standard (9):  1_000_000_000 / 3 = 333_333_333, remainder = 1
        // High (16): 10_000_000_000_000_000 / 3 = 3_333_333_333_333_333, remainder = 1
        #[cfg(feature = "high-precision")]
        {
            assert_eq!(remaining[0].raw, 3_333_333_333_333_333);
            assert_eq!(remaining[1].raw, 3_333_333_333_333_333);
            assert_eq!(remaining[2].raw, 1);
        }
        #[cfg(not(feature = "high-precision"))]
        {
            assert_eq!(remaining[0].raw, 333_333_333);
            assert_eq!(remaining[1].raw, 333_333_333);
            assert_eq!(remaining[2].raw, 1);
        }
    }

    #[rstest]
    fn test_twap_on_time_event_spawns_next_slice() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        add_instrument_to_cache(&mut algo);

        // Use qty that divides evenly: 1.2 / 3 = 0.4 each
        let mut params = IndexMap::new();
        params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
        params.insert(Ustr::from("interval_secs"), Ustr::from("20"));

        let order = create_market_order_with_params_and_qty(params, Quantity::from("1.2"));
        let primary_id = order.client_order_id();

        algo.on_order(order).unwrap();

        // Verify 2 slices remain after first spawn (no remainder)
        assert_eq!(algo.scheduled_sizes.get(&primary_id).unwrap().len(), 2);

        // Simulate timer firing
        let event = TimeEvent::new(
            Ustr::from(primary_id.as_str()),
            UUID4::new(),
            0.into(),
            0.into(),
        );
        ExecutionAlgorithm::on_time_event(&mut algo, &event).unwrap();

        // One slice consumed
        assert_eq!(algo.scheduled_sizes.get(&primary_id).unwrap().len(), 1);
    }

    #[rstest]
    fn test_twap_on_time_event_completes_on_final_slice() {
        let mut algo = create_twap_algorithm();
        register_algorithm(&mut algo);

        add_instrument_to_cache(&mut algo);

        // 2 intervals: first spawned immediately, one in scheduled_sizes
        let mut params = IndexMap::new();
        params.insert(Ustr::from("horizon_secs"), Ustr::from("60"));
        params.insert(Ustr::from("interval_secs"), Ustr::from("30"));

        let order = create_market_order_with_params(params);
        let primary_id = order.client_order_id();

        algo.on_order(order).unwrap();
        assert_eq!(algo.scheduled_sizes.get(&primary_id).unwrap().len(), 1);

        // Simulate timer firing for final slice
        let event = TimeEvent::new(
            Ustr::from(primary_id.as_str()),
            UUID4::new(),
            0.into(),
            0.into(),
        );
        ExecutionAlgorithm::on_time_event(&mut algo, &event).unwrap();

        // Sequence completed, scheduled_sizes removed
        assert!(algo.scheduled_sizes.get(&primary_id).is_none());
    }
}
716}