Echo/Scheduler/
Scheduler.rs

1//! # Scheduler
2//!
3//! Manages the pool of workers and the task queue system, serving as the main
4//! public interface of the `Echo` library.
5
6#![allow(non_snake_case, non_camel_case_types)]
7
8use std::{
9	collections::HashMap,
10	future::Future,
11	sync::{
12		Arc,
13		atomic::{AtomicBool, Ordering},
14	},
15};
16
17use log::{error, info, warn};
18use tokio::task::JoinHandle;
19
20use super::{SchedulerBuilder::Concurrency, Worker::Worker};
21use crate::{
22	Queue::StealingQueue::StealingQueue,
23	Task::{Priority::Priority, Task::Task},
24};
25
26/// Manages a pool of worker threads and a work-stealing queue to execute
27/// tasks efficiently. This is the primary public-facing struct of the library.
28pub struct Scheduler {
29	/// The underlying work-stealing queue system used for task submission.
30	Queue:StealingQueue<Task>,
31
32	/// Handles to the spawned worker threads, used for graceful shutdown.
33	WorkerHandles:Vec<JoinHandle<()>>,
34
35	/// An atomic flag to signal all workers to shut down.
36	IsRunning:Arc<AtomicBool>,
37}
38
39impl Scheduler {
40	/// Creates and starts a new scheduler with a given configuration.
41	///
42	/// This is a crate-private function, intended to be called only by the
43	/// `SchedulerBuilder`'s `Build` method.
44	pub(crate) fn Create(WorkerCount:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
45		info!("[Scheduler] Creating scheduler with {} workers.", WorkerCount);
46
47		let IsRunning = Arc::new(AtomicBool::new(true));
48
49		// Create the entire queue system and retrieve the contexts for each worker.
50		let (Queue, Contexts) = StealingQueue::<Task>::Create(WorkerCount);
51
52		let mut WorkerHandles = Vec::with_capacity(WorkerCount);
53
54		// Spawn an asynchronous task for each worker.
55		for Context in Contexts.into_iter() {
56			let IsRunning = IsRunning.clone();
57
58			let WorkerHandle = tokio::spawn(async move {
59				// Each task creates and runs a worker, consuming its context.
60				Worker::Create(Context, IsRunning).Run().await;
61			});
62
63			WorkerHandles.push(WorkerHandle);
64		}
65
66		Self { Queue, WorkerHandles, IsRunning }
67	}
68
69	/// Submits a new task to the scheduler's global queue.
70	///
71	/// The task will be picked up by the next available worker according to its
72	/// priority and the work-stealing logic.
73	pub fn Submit<F>(&self, Operation:F, Priority:Priority)
74	where
75		F: Future<Output = ()> + Send + 'static, {
76		self.Queue.Submit(Task::Create(Operation, Priority));
77	}
78
79	/// Asynchronously shuts down the scheduler.
80	///
81	/// This method signals all worker threads to stop their loops and then
82	/// waits for each one to complete its current task and exit gracefully.
83	pub async fn Stop(&mut self) {
84		if !self.IsRunning.swap(false, Ordering::Relaxed) {
85			info!("[Scheduler] Stop already initiated.");
86
87			return;
88		}
89
90		info!("[Scheduler] Stopping worker threads...");
91
92		for Handle in self.WorkerHandles.drain(..) {
93			if let Err(Error) = Handle.await {
94				error!("[Scheduler] Error joining worker handle: {}", Error);
95			}
96		}
97
98		info!("[Scheduler] All workers stopped successfully.");
99	}
100}
101
102impl Drop for Scheduler {
103	/// Ensures workers are signaled to stop if the `Scheduler` is dropped
104	/// without an explicit call to `Stop`.
105	///
106	/// This prevents orphaned worker threads if the user forgets to manage the
107	/// scheduler's lifecycle properly.
108	fn drop(&mut self) {
109		if self.IsRunning.load(Ordering::Relaxed) {
110			warn!("[Scheduler] Dropped without explicit stop. Signaling workers to terminate.");
111
112			self.IsRunning.store(false, Ordering::Relaxed);
113		}
114	}
115}