nautilus_common/
component.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Component system for managing stateful system entities.
17//!
18//! This module provides the component framework for managing the lifecycle and state
19//! of system entities. Components have defined states (pre-initialized, ready, running,
20//! stopped, etc.) and provide a consistent interface for state management and transitions.
21
22#![allow(unsafe_code)]
23
24use std::{
25    cell::{RefCell, UnsafeCell},
26    fmt::Debug,
27    rc::Rc,
28};
29
30use ahash::{AHashMap, AHashSet};
31use nautilus_model::identifiers::{ComponentId, TraderId};
32use ustr::Ustr;
33
34use crate::{
35    actor::{Actor, registry::get_actor_registry},
36    cache::Cache,
37    clock::Clock,
38    enums::{ComponentState, ComponentTrigger},
39};
40
41/// Components have state and lifecycle management capabilities.
42pub trait Component {
43    /// Returns the unique identifier for this component.
44    fn component_id(&self) -> ComponentId;
45
46    /// Returns the current state of the component.
47    fn state(&self) -> ComponentState;
48
49    /// Transition the component with the state trigger.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the `trigger` is an invalid transition from the current state.
54    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()>;
55
56    /// Returns whether the component is ready.
57    fn is_ready(&self) -> bool {
58        self.state() == ComponentState::Ready
59    }
60
61    /// Returns whether the component is *not* running.
62    fn not_running(&self) -> bool {
63        !self.is_running()
64    }
65
66    /// Returns whether the component is running.
67    fn is_running(&self) -> bool {
68        self.state() == ComponentState::Running
69    }
70
71    /// Returns whether the component is stopped.
72    fn is_stopped(&self) -> bool {
73        self.state() == ComponentState::Stopped
74    }
75
76    /// Returns whether the component has been degraded.
77    fn is_degraded(&self) -> bool {
78        self.state() == ComponentState::Degraded
79    }
80
81    /// Returns whether the component has been faulted.
82    fn is_faulted(&self) -> bool {
83        self.state() == ComponentState::Faulted
84    }
85
86    /// Returns whether the component has been disposed.
87    fn is_disposed(&self) -> bool {
88        self.state() == ComponentState::Disposed
89    }
90
91    /// Registers the component with a system.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the component fails to register.
96    fn register(
97        &mut self,
98        trader_id: TraderId,
99        clock: Rc<RefCell<dyn Clock>>,
100        cache: Rc<RefCell<Cache>>,
101    ) -> anyhow::Result<()>;
102
103    /// Initializes the component.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the initialization state transition fails.
108    fn initialize(&mut self) -> anyhow::Result<()> {
109        self.transition_state(ComponentTrigger::Initialize)
110    }
111
112    /// Starts the component.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the component fails to start.
117    fn start(&mut self) -> anyhow::Result<()> {
118        self.transition_state(ComponentTrigger::Start)?; // -> Starting
119
120        if let Err(e) = self.on_start() {
121            log_error(&e);
122            return Err(e); // Halt state transition
123        }
124
125        self.transition_state(ComponentTrigger::StartCompleted)?;
126
127        Ok(())
128    }
129
130    /// Stops the component.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the component fails to stop.
135    fn stop(&mut self) -> anyhow::Result<()> {
136        self.transition_state(ComponentTrigger::Stop)?; // -> Stopping
137
138        if let Err(e) = self.on_stop() {
139            log_error(&e);
140            return Err(e); // Halt state transition
141        }
142
143        self.transition_state(ComponentTrigger::StopCompleted)?;
144
145        Ok(())
146    }
147
148    /// Resumes the component.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the component fails to resume.
153    fn resume(&mut self) -> anyhow::Result<()> {
154        self.transition_state(ComponentTrigger::Resume)?; // -> Resuming
155
156        if let Err(e) = self.on_resume() {
157            log_error(&e);
158            return Err(e); // Halt state transition
159        }
160
161        self.transition_state(ComponentTrigger::ResumeCompleted)?;
162
163        Ok(())
164    }
165
166    /// Degrades the component.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the component fails to degrade.
171    fn degrade(&mut self) -> anyhow::Result<()> {
172        self.transition_state(ComponentTrigger::Degrade)?; // -> Degrading
173
174        if let Err(e) = self.on_degrade() {
175            log_error(&e);
176            return Err(e); // Halt state transition
177        }
178
179        self.transition_state(ComponentTrigger::DegradeCompleted)?;
180
181        Ok(())
182    }
183
184    /// Faults the component.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the component fails to fault.
189    fn fault(&mut self) -> anyhow::Result<()> {
190        self.transition_state(ComponentTrigger::Fault)?; // -> Faulting
191
192        if let Err(e) = self.on_fault() {
193            log_error(&e);
194            return Err(e); // Halt state transition
195        }
196
197        self.transition_state(ComponentTrigger::FaultCompleted)?;
198
199        Ok(())
200    }
201
202    /// Resets the component to its initial state.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the component fails to reset.
207    fn reset(&mut self) -> anyhow::Result<()> {
208        self.transition_state(ComponentTrigger::Reset)?; // -> Resetting
209
210        if let Err(e) = self.on_reset() {
211            log_error(&e);
212            return Err(e); // Halt state transition
213        }
214
215        self.transition_state(ComponentTrigger::ResetCompleted)?;
216
217        Ok(())
218    }
219
220    /// Disposes of the component, releasing any resources.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the component fails to dispose.
225    fn dispose(&mut self) -> anyhow::Result<()> {
226        self.transition_state(ComponentTrigger::Dispose)?; // -> Disposing
227
228        if let Err(e) = self.on_dispose() {
229            log_error(&e);
230            return Err(e); // Halt state transition
231        }
232
233        self.transition_state(ComponentTrigger::DisposeCompleted)?;
234
235        Ok(())
236    }
237
238    /// Actions to be performed on start.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if starting the actor fails.
243    fn on_start(&mut self) -> anyhow::Result<()> {
244        log::warn!(
245            "The `on_start` handler was called when not overridden, \
246            it's expected that any actions required when stopping the component \
247            occur here, such as unsubscribing from data",
248        );
249        Ok(())
250    }
251
252    /// Actions to be performed on stop.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if stopping the actor fails.
257    fn on_stop(&mut self) -> anyhow::Result<()> {
258        log::warn!(
259            "The `on_stop` handler was called when not overridden, \
260            it's expected that any actions required when stopping the component \
261            occur here, such as unsubscribing from data",
262        );
263        Ok(())
264    }
265
266    /// Actions to be performed on resume.
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if resuming the actor fails.
271    fn on_resume(&mut self) -> anyhow::Result<()> {
272        log::warn!(
273            "The `on_resume` handler was called when not overridden, \
274            it's expected that any actions required when resuming the component \
275            following a stop occur here"
276        );
277        Ok(())
278    }
279
280    /// Actions to be performed on reset.
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if resetting the actor fails.
285    fn on_reset(&mut self) -> anyhow::Result<()> {
286        log::warn!(
287            "The `on_reset` handler was called when not overridden, \
288            it's expected that any actions required when resetting the component \
289            occur here, such as resetting indicators and other state"
290        );
291        Ok(())
292    }
293
294    /// Actions to be performed on dispose.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if disposing the actor fails.
299    fn on_dispose(&mut self) -> anyhow::Result<()> {
300        Ok(())
301    }
302
303    /// Actions to be performed on degrade.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if degrading the actor fails.
308    fn on_degrade(&mut self) -> anyhow::Result<()> {
309        Ok(())
310    }
311
312    /// Actions to be performed on fault.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if faulting the actor fails.
317    fn on_fault(&mut self) -> anyhow::Result<()> {
318        Ok(())
319    }
320}
321
322fn log_error(e: &anyhow::Error) {
323    log::error!("{e}");
324}
325
326#[rustfmt::skip]
327impl ComponentState {
328    /// Transition the state machine with the component `trigger`.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if `trigger` is invalid for the current state.
333    pub fn transition(&mut self, trigger: &ComponentTrigger) -> anyhow::Result<Self> {
334        let new_state = match (&self, trigger) {
335            (Self::PreInitialized, ComponentTrigger::Initialize) => Self::Ready,
336            (Self::Ready, ComponentTrigger::Reset) => Self::Resetting,
337            (Self::Ready, ComponentTrigger::Start) => Self::Starting,
338            (Self::Ready, ComponentTrigger::Dispose) => Self::Disposing,
339            (Self::Resetting, ComponentTrigger::ResetCompleted) => Self::Ready,
340            (Self::Starting, ComponentTrigger::StartCompleted) => Self::Running,
341            (Self::Starting, ComponentTrigger::Stop) => Self::Stopping,
342            (Self::Starting, ComponentTrigger::Fault) => Self::Faulting,
343            (Self::Running, ComponentTrigger::Stop) => Self::Stopping,
344            (Self::Running, ComponentTrigger::Degrade) => Self::Degrading,
345            (Self::Running, ComponentTrigger::Fault) => Self::Faulting,
346            (Self::Resuming, ComponentTrigger::Stop) => Self::Stopping,
347            (Self::Resuming, ComponentTrigger::ResumeCompleted) => Self::Running,
348            (Self::Resuming, ComponentTrigger::Fault) => Self::Faulting,
349            (Self::Stopping, ComponentTrigger::StopCompleted) => Self::Stopped,
350            (Self::Stopping, ComponentTrigger::Fault) => Self::Faulting,
351            (Self::Stopped, ComponentTrigger::Reset) => Self::Resetting,
352            (Self::Stopped, ComponentTrigger::Resume) => Self::Resuming,
353            (Self::Stopped, ComponentTrigger::Dispose) => Self::Disposing,
354            (Self::Stopped, ComponentTrigger::Fault) => Self::Faulting,
355            (Self::Degrading, ComponentTrigger::DegradeCompleted) => Self::Degraded,
356            (Self::Degraded, ComponentTrigger::Resume) => Self::Resuming,
357            (Self::Degraded, ComponentTrigger::Stop) => Self::Stopping,
358            (Self::Degraded, ComponentTrigger::Fault) => Self::Faulting,
359            (Self::Disposing, ComponentTrigger::DisposeCompleted) => Self::Disposed,
360            (Self::Faulting, ComponentTrigger::FaultCompleted) => Self::Faulted,
361            _ => anyhow::bail!("Invalid state trigger {self} -> {trigger}"),
362        };
363        Ok(new_state)
364    }
365}
366
thread_local! {
    // One registry instance per thread, created through `ComponentRegistry::new()`.
    // Accessed via `get_component_registry()`.
    static COMPONENT_REGISTRY: ComponentRegistry = ComponentRegistry::new();
}
370
/// Registry for storing components with runtime borrow tracking.
///
/// The registry tracks which components are currently mutably borrowed to prevent
/// multiple simultaneous mutable borrows (which would be undefined behavior).
pub struct ComponentRegistry {
    // Registered components keyed by ID; handles are shared `Rc<UnsafeCell<..>>` pointers.
    components: RefCell<AHashMap<Ustr, Rc<UnsafeCell<dyn Component>>>>,
    // IDs of components currently marked as borrowed (see `try_borrow` / `release_borrow`).
    borrows: RefCell<AHashSet<Ustr>>,
}
379
380impl Debug for ComponentRegistry {
381    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382        let components_ref = self.components.borrow();
383        let keys: Vec<&Ustr> = components_ref.keys().collect();
384        f.debug_struct(stringify!(ComponentRegistry))
385            .field("components", &keys)
386            .field("active_borrows", &self.borrows.borrow().len())
387            .finish()
388    }
389}
390
391impl Default for ComponentRegistry {
392    fn default() -> Self {
393        Self::new()
394    }
395}
396
397impl ComponentRegistry {
398    pub fn new() -> Self {
399        Self {
400            components: RefCell::new(AHashMap::new()),
401            borrows: RefCell::new(AHashSet::new()),
402        }
403    }
404
405    pub fn insert(&self, id: Ustr, component: Rc<UnsafeCell<dyn Component>>) {
406        self.components.borrow_mut().insert(id, component);
407    }
408
409    pub fn get(&self, id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
410        self.components.borrow().get(id).cloned()
411    }
412
413    /// Checks if a component is currently borrowed.
414    pub fn is_borrowed(&self, id: &Ustr) -> bool {
415        self.borrows.borrow().contains(id)
416    }
417
418    /// Marks a component as borrowed. Returns false if already borrowed.
419    fn try_borrow(&self, id: Ustr) -> bool {
420        let mut borrows = self.borrows.borrow_mut();
421        if borrows.contains(&id) {
422            false
423        } else {
424            borrows.insert(id);
425            true
426        }
427    }
428
429    /// Releases a borrow on a component.
430    fn release_borrow(&self, id: &Ustr) {
431        self.borrows.borrow_mut().remove(id);
432    }
433}
434
/// Guard that releases a component borrow when dropped.
///
/// This ensures borrows are released even if the code panics during
/// a lifecycle method call.
///
/// Constructing the guard does not mark the component as borrowed; it only
/// records which ID to release on drop.
struct BorrowGuard {
    // ID of the component whose borrow is released on drop.
    id: Ustr,
}
442
443impl BorrowGuard {
444    fn new(id: Ustr) -> Self {
445        Self { id }
446    }
447}
448
449impl Drop for BorrowGuard {
450    fn drop(&mut self) {
451        get_component_registry().release_borrow(&self.id);
452    }
453}
454
455/// Returns a reference to the global component registry.
456pub fn get_component_registry() -> &'static ComponentRegistry {
457    COMPONENT_REGISTRY.with(|registry| unsafe {
458        // SAFETY: We return a static reference that lives for the lifetime of the thread.
459        // Since this is thread_local storage, each thread has its own instance.
460        std::mem::transmute::<&ComponentRegistry, &'static ComponentRegistry>(registry)
461    })
462}
463
464/// Registers a component.
465pub fn register_component<T>(component: T) -> Rc<UnsafeCell<T>>
466where
467    T: Component + 'static,
468{
469    let component_id = component.component_id().inner();
470    let component_ref = Rc::new(UnsafeCell::new(component));
471
472    // Register in component registry
473    let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
474    get_component_registry().insert(component_id, component_trait_ref);
475
476    component_ref
477}
478
479/// Registers a component that also implements Actor.
480pub fn register_component_actor<T>(component: T) -> Rc<UnsafeCell<T>>
481where
482    T: Component + Actor + 'static,
483{
484    let component_id = component.component_id().inner();
485    let actor_id = component.id();
486    let component_ref = Rc::new(UnsafeCell::new(component));
487
488    // Register in component registry
489    let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
490    get_component_registry().insert(component_id, component_trait_ref);
491
492    // Register in actor registry
493    let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = component_ref.clone();
494    get_actor_registry().insert(actor_id, actor_trait_ref);
495
496    component_ref
497}
498
499/// Safely calls start() on a component in the global registry.
500///
501/// # Errors
502///
503/// - Returns an error if the component is not found.
504/// - Returns an error if the component is already borrowed.
505/// - Returns an error if start() fails.
506pub fn start_component(id: &Ustr) -> anyhow::Result<()> {
507    let registry = get_component_registry();
508    let component_ref = registry
509        .get(id)
510        .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
511
512    if !registry.try_borrow(*id) {
513        anyhow::bail!(
514            "Component '{id}' is already mutably borrowed. \
515             This would create aliasing mutable references (undefined behavior)."
516        );
517    }
518
519    let _guard = BorrowGuard::new(*id);
520
521    // SAFETY: Borrow tracking ensures exclusive access
522    unsafe {
523        let component = &mut *component_ref.get();
524        component.start()
525    }
526}
527
528/// Safely calls stop() on a component in the global registry.
529///
530/// # Errors
531///
532/// - Returns an error if the component is not found.
533/// - Returns an error if the component is already borrowed.
534/// - Returns an error if stop() fails.
535pub fn stop_component(id: &Ustr) -> anyhow::Result<()> {
536    let registry = get_component_registry();
537    let component_ref = registry
538        .get(id)
539        .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
540
541    if !registry.try_borrow(*id) {
542        anyhow::bail!(
543            "Component '{id}' is already mutably borrowed. \
544             This would create aliasing mutable references (undefined behavior)."
545        );
546    }
547
548    let _guard = BorrowGuard::new(*id);
549
550    // SAFETY: Borrow tracking ensures exclusive access
551    unsafe {
552        let component = &mut *component_ref.get();
553        component.stop()
554    }
555}
556
557/// Safely calls reset() on a component in the global registry.
558///
559/// # Errors
560///
561/// - Returns an error if the component is not found.
562/// - Returns an error if the component is already borrowed.
563/// - Returns an error if reset() fails.
564pub fn reset_component(id: &Ustr) -> anyhow::Result<()> {
565    let registry = get_component_registry();
566    let component_ref = registry
567        .get(id)
568        .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
569
570    if !registry.try_borrow(*id) {
571        anyhow::bail!(
572            "Component '{id}' is already mutably borrowed. \
573             This would create aliasing mutable references (undefined behavior)."
574        );
575    }
576
577    let _guard = BorrowGuard::new(*id);
578
579    // SAFETY: Borrow tracking ensures exclusive access
580    unsafe {
581        let component = &mut *component_ref.get();
582        component.reset()
583    }
584}
585
586/// Safely calls dispose() on a component in the global registry.
587///
588/// # Errors
589///
590/// - Returns an error if the component is not found.
591/// - Returns an error if the component is already borrowed.
592/// - Returns an error if dispose() fails.
593pub fn dispose_component(id: &Ustr) -> anyhow::Result<()> {
594    let registry = get_component_registry();
595    let component_ref = registry
596        .get(id)
597        .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
598
599    if !registry.try_borrow(*id) {
600        anyhow::bail!(
601            "Component '{id}' is already mutably borrowed. \
602             This would create aliasing mutable references (undefined behavior)."
603        );
604    }
605
606    let _guard = BorrowGuard::new(*id);
607
608    // SAFETY: Borrow tracking ensures exclusive access
609    unsafe {
610        let component = &mut *component_ref.get();
611        component.dispose()
612    }
613}
614
615/// Returns a component from the global registry by ID.
616pub fn get_component(id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
617    get_component_registry().get(id)
618}
619
/// Removes every registered component and every recorded borrow from the current
/// thread's registry (for test isolation).
#[cfg(test)]
pub fn clear_component_registry() {
    let ComponentRegistry {
        components,
        borrows,
    } = get_component_registry();

    components.borrow_mut().clear();
    borrows.borrow_mut().clear();
}
627
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use rstest::rstest;

    use super::*;

    /// Minimal component that starts in `Ready` and whose `on_start` hook panics
    /// when the referenced flag is set.
    struct TestComponent {
        id: ComponentId,
        state: ComponentState,
        should_panic: &'static AtomicBool,
    }

    impl TestComponent {
        fn new(name: &str, should_panic: &'static AtomicBool) -> Self {
            Self {
                id: ComponentId::new(name),
                state: ComponentState::Ready,
                should_panic,
            }
        }
    }

    impl Component for TestComponent {
        fn component_id(&self) -> ComponentId {
            self.id
        }

        fn state(&self) -> ComponentState {
            self.state
        }

        fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
            self.state = self.state.transition(&trigger)?;
            Ok(())
        }

        fn register(
            &mut self,
            _trader_id: TraderId,
            _clock: Rc<RefCell<dyn Clock>>,
            _cache: Rc<RefCell<Cache>>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        #[allow(clippy::panic_in_result_fn)] // Intentional panic for testing
        fn on_start(&mut self) -> anyhow::Result<()> {
            if self.should_panic.load(Ordering::SeqCst) {
                panic!("Intentional panic for testing");
            }
            Ok(())
        }
    }

    static NO_PANIC: AtomicBool = AtomicBool::new(false);
    static DO_PANIC: AtomicBool = AtomicBool::new(true);

    /// Clears the registry, then registers a fresh `TestComponent` named `name`.
    ///
    /// Returns the key under which the component was stored.
    fn register_test_component(name: &str, should_panic: &'static AtomicBool) -> Ustr {
        clear_component_registry();

        let component = TestComponent::new(name, should_panic);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        component_id
    }

    #[rstest]
    fn test_component_borrow_tracking_prevents_double_borrow() {
        let id = register_test_component("test-component-1", &NO_PANIC);
        let registry = get_component_registry();

        // Simulate a lifecycle call that is still in progress
        assert!(registry.try_borrow(id), "First borrow should be granted");

        // A second access while the borrow is outstanding must be rejected
        let blocked = start_component(&id);
        assert!(blocked.is_err(), "Expected double borrow to be rejected");

        // The rejected call must not release the borrow it did not acquire
        assert!(
            registry.is_borrowed(&id),
            "Rejected call released a borrow it did not own"
        );

        registry.release_borrow(&id);

        // With the borrow released, lifecycle calls succeed again
        let result1 = start_component(&id);
        assert!(result1.is_ok());

        // Component should now be borrowable again (guard released)
        let result2 = stop_component(&id);
        assert!(result2.is_ok());
    }

    #[rstest]
    fn test_component_borrow_released_after_lifecycle_call() {
        let id = register_test_component("test-component-2", &NO_PANIC);

        // Call start - borrow should be released after
        let result = start_component(&id);
        assert!(result.is_ok());

        // Verify not marked as borrowed
        assert!(!get_component_registry().is_borrowed(&id));
    }

    #[rstest]
    fn test_component_borrow_released_on_panic() {
        let id = register_test_component("test-component-panic", &DO_PANIC);

        // Call start which will panic - catch the panic
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = start_component(&id);
        }));
        assert!(result.is_err(), "Expected panic from on_start");

        // Borrow should still be released due to BorrowGuard drop
        assert!(
            !get_component_registry().is_borrowed(&id),
            "Borrow was not released after panic"
        );
    }
}