Echo/Queue/StealingQueue.rs
1//! # StealingQueue
2//!
3//! A generic, priority-aware, work-stealing queue implementation. This module
4//! is self-contained and can be used by any scheduler or application to manage
5//! and distribute tasks of any type that can be prioritized.
6
7#![allow(non_snake_case, non_camel_case_types)]
8
9use std::sync::Arc;
10
11use crossbeam_deque::{Injector, Steal, Stealer, Worker};
12use rand::Rng;
13
14/// Defines a contract for types that can be prioritized by the queue.
15pub trait Prioritized {
16 /// The type of the priority value used by the implementor.
17 type Kind: PartialEq + Eq + Copy;
18
19 /// A method to retrieve the priority of the item.
20 fn Rank(&self) -> Self::Kind;
21}
22
23/// Defines the internal priority levels used by the generic queue. These map
24/// directly to the different deques managed by the system.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum Priority {
27 High,
28
29 Normal,
30
31 Low,
32}
33
34/// Holds the queue components that are safe to share across all threads.
35///
36/// This includes global injectors for submitting new tasks from any context and
37/// stealers for taking tasks from other workers' deques, organized by priority
38/// level.
39pub struct Share<TTask> {
40 /// Global, multi-producer queues for each priority level.
41 pub Injector:(Injector<TTask>, Injector<TTask>, Injector<TTask>),
42
43 /// Shared handles for stealing tasks from each worker's local queue.
44 pub Stealer:(Vec<Stealer<TTask>>, Vec<Stealer<TTask>>, Vec<Stealer<TTask>>),
45}
46
47/// A generic, priority-aware, work-stealing queue.
48///
49/// This is the public-facing entry point for submitting tasks. It is generic
50/// over any task type `TTask` that implements the `Prioritized` trait.
51pub struct StealingQueue<TTask:Prioritized<Kind = Priority>> {
52 /// A shared, thread-safe pointer to the queue's shared components.
53 Share:Arc<Share<TTask>>,
54}
55
56/// Contains all necessary components for a single worker thread to operate.
57///
58/// This includes the thread-local `Worker` deques, which are not safe to share,
59
60/// making this `Context` object the sole owner of a worker's private queues.
61pub struct Context<TTask> {
62 /// A unique identifier for the worker, used to avoid self-stealing.
63 pub Identifier:usize,
64
65 /// Thread-local work queues for each priority level.
66 pub Local:(Worker<TTask>, Worker<TTask>, Worker<TTask>),
67
68 /// A reference to the shared components of the entire queue system.
69 pub Share:Arc<Share<TTask>>,
70}
71
72impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
73 /// Creates a complete work-stealing queue system.
74 ///
75 /// This function initializes all the necessary queues, both shared and
76 /// thread-local, for a given number of workers.
77 ///
78 /// # Returns
79 ///
80 /// A tuple containing:
81 /// 1. The public-facing `StealingQueue` for submitting new tasks.
82 /// 2. A `Vec` of `Context` objects, one for each worker thread to own.
83 pub fn Create(Count:usize) -> (Self, Vec<Context<TTask>>) {
84 let mut High:Vec<Worker<TTask>> = Vec::with_capacity(Count);
85
86 let mut Normal:Vec<Worker<TTask>> = Vec::with_capacity(Count);
87
88 let mut Low:Vec<Worker<TTask>> = Vec::with_capacity(Count);
89
90 // For each priority level, create a thread-local worker queue and its
91 // corresponding shared stealer.
92 let StealerHigh:Vec<Stealer<TTask>> = (0..Count)
93 .map(|_| {
94 let Worker = Worker::new_fifo();
95
96 let Stealer = Worker.stealer();
97
98 High.push(Worker);
99
100 Stealer
101 })
102 .collect();
103
104 let StealerNormal:Vec<Stealer<TTask>> = (0..Count)
105 .map(|_| {
106 let Worker = Worker::new_fifo();
107
108 let Stealer = Worker.stealer();
109
110 Normal.push(Worker);
111
112 Stealer
113 })
114 .collect();
115
116 let StealerLow:Vec<Stealer<TTask>> = (0..Count)
117 .map(|_| {
118 let Worker = Worker::new_fifo();
119
120 let Stealer = Worker.stealer();
121
122 Low.push(Worker);
123
124 Stealer
125 })
126 .collect();
127
128 // Bundle all shared components into an Arc for safe sharing.
129 let Share = Arc::new(Share {
130 Injector:(Injector::new(), Injector::new(), Injector::new()),
131
132 Stealer:(StealerHigh, StealerNormal, StealerLow),
133 });
134
135 // Create a unique context for each worker, giving it ownership of its
136 // local queues and a reference to the shared components.
137 let mut Contexts = Vec::with_capacity(Count);
138
139 for Identifier in 0..Count {
140 Contexts.push(Context {
141 Identifier,
142
143 Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
144
145 Share:Share.clone(),
146 });
147 }
148
149 let Queue = Self { Share };
150
151 (Queue, Contexts)
152 }
153
154 /// Submits a new task to the appropriate global queue based on its
155 /// priority. This method is thread-safe and can be called from any
156 /// context.
157 pub fn Submit(&self, Task:TTask) {
158 match Task.Rank() {
159 Priority::High => self.Share.Injector.0.push(Task),
160
161 Priority::Normal => self.Share.Injector.1.push(Task),
162
163 Priority::Low => self.Share.Injector.2.push(Task),
164 }
165 }
166}
167
168impl<TTask> Context<TTask> {
169 /// Finds the next available task for the worker to execute.
170 ///
171 /// This method implements the complete work-finding logic:
172 /// 1. Check local deques (from high to low priority).
173 /// 2. If local deques are empty, attempt to steal from the system (from
174 /// high to low priority).
175 pub fn Next(&self) -> Option<TTask> {
176 self.Local
177 .0
178 .pop()
179 .or_else(|| self.Local.1.pop())
180 .or_else(|| self.Local.2.pop())
181 .or_else(|| self.Steal(&self.Share.Injector.0, &self.Share.Stealer.0, &self.Local.0))
182 .or_else(|| self.Steal(&self.Share.Injector.1, &self.Share.Stealer.1, &self.Local.1))
183 .or_else(|| self.Steal(&self.Share.Injector.2, &self.Share.Stealer.2, &self.Local.2))
184 }
185
186 /// Attempts to steal a task from a specific priority set.
187 ///
188 /// It first tries to steal a batch from the global injector queue for that
189 /// priority. If that fails, it attempts to steal from a randomly chosen
190 /// peer worker to ensure fair distribution and avoid contention hotspots.
191 pub fn Steal<'a>(
192 &self,
193
194 Injector:&'a Injector<TTask>,
195
196 Stealers:&'a [Stealer<TTask>],
197
198 Local:&'a Worker<TTask>,
199 ) -> Option<TTask> {
200 // First, try to steal a batch from the global injector.
201 // `steal_batch_and_pop` is efficient: it moves a batch into our local
202 // queue and returns one task immediately if successful.
203 if let Steal::Success(Task) = Injector.steal_batch_and_pop(Local) {
204 return Some(Task);
205 }
206
207 // If the global queue is empty, try stealing from peers.
208 let Count = Stealers.len();
209
210 if Count <= 1 {
211 // Cannot steal if there are no other workers.
212 return None;
213 }
214
215 // Allocation-free random iteration: pick a random starting point.
216 let mut Rng = rand::rng();
217
218 let Start = Rng.random_range(0..Count);
219
220 // Iterate through all peers starting from the random offset.
221 for i in 0..Count {
222 let Index = (Start + i) % Count;
223
224 // Don't steal from ourselves.
225 if Index == self.Identifier {
226 continue;
227 }
228
229 if let Steal::Success(Task) = Stealers[Index].steal_batch_and_pop(Local) {
230 return Some(Task);
231 }
232 }
233
234 None
235 }
236}