scuffle_bootstrap/global.rs
1//! Global state for the application.
2//!
3//! # [`Global`] vs. [`GlobalWithoutConfig`]
4//!
5//! [`Global`] has a set of functions that are called at different stages of the
6//! application lifecycle. To use [`Global`], your application is expected to
7//! have a config type implementing [`ConfigParser`]. If your application does
8//! not have a config, consider using the [`GlobalWithoutConfig`] trait which is
9//! a simplified version of [`Global`].
10
11use std::sync::Arc;
12
13use crate::config::{ConfigParser, EmptyConfig};
14
15fn default_runtime_builder() -> tokio::runtime::Builder {
16 let worker_threads = std::env::var("TOKIO_WORKER_THREADS")
17 .unwrap_or_default()
18 .parse::<usize>()
19 .ok()
20 .or_else(|| std::thread::available_parallelism().ok().map(|p| p.get()));
21
22 let mut builder = if let Some(1) = worker_threads {
23 tokio::runtime::Builder::new_current_thread()
24 } else {
25 tokio::runtime::Builder::new_multi_thread()
26 };
27
28 if let Some(worker_threads) = worker_threads {
29 builder.worker_threads(worker_threads);
30 }
31
32 if let Ok(max_blocking_threads) = std::env::var("TOKIO_MAX_BLOCKING_THREADS")
33 .unwrap_or_default()
34 .parse::<usize>()
35 {
36 builder.max_blocking_threads(max_blocking_threads);
37 }
38
39 if !std::env::var("TOKIO_DISABLE_TIME")
40 .unwrap_or_default()
41 .parse::<bool>()
42 .ok()
43 .unwrap_or(false)
44 {
45 builder.enable_time();
46 }
47
48 if !std::env::var("TOKIO_DISABLE_IO")
49 .unwrap_or_default()
50 .parse::<bool>()
51 .ok()
52 .unwrap_or(false)
53 {
54 builder.enable_io();
55 }
56
57 if let Ok(thread_stack_size) = std::env::var("TOKIO_THREAD_STACK_SIZE").unwrap_or_default().parse::<usize>() {
58 builder.thread_stack_size(thread_stack_size);
59 }
60
61 if let Ok(global_queue_interval) = std::env::var("TOKIO_GLOBAL_QUEUE_INTERVAL")
62 .unwrap_or_default()
63 .parse::<u32>()
64 {
65 builder.global_queue_interval(global_queue_interval);
66 }
67
68 if let Ok(event_interval) = std::env::var("TOKIO_EVENT_INTERVAL").unwrap_or_default().parse::<u32>() {
69 builder.event_interval(event_interval);
70 }
71
72 if let Ok(max_io_events_per_tick) = std::env::var("TOKIO_MAX_IO_EVENTS_PER_TICK")
73 .unwrap_or_default()
74 .parse::<usize>()
75 {
76 builder.max_io_events_per_tick(max_io_events_per_tick);
77 }
78
79 builder
80}
81
82/// This trait is implemented for the global type of your application.
83/// It is intended to be used to store any global state of your application.
84/// When using the [`main!`](crate::main) macro, one instance of this type will
85/// be made available to all services.
86///
87/// Using this trait requires a config type implementing [`ConfigParser`].
88/// If your application does not have a config, consider using the
89/// [`GlobalWithoutConfig`] trait.
90///
91/// # See Also
92///
93/// - [`GlobalWithoutConfig`]
94/// - [`Service`](crate::Service)
95/// - [`main`](crate::main)
96pub trait Global: Send + Sync + 'static {
97 type Config: ConfigParser + Send + 'static;
98
99 /// Pre-initialization.
100 ///
101 /// Called before initializing the tokio runtime and loading the config.
102 /// Returning an error from this function will cause the process to
103 /// immediately exit without calling [`on_exit`](Global::on_exit) first.
104 #[inline(always)]
105 fn pre_init() -> anyhow::Result<()> {
106 Ok(())
107 }
108
109 /// Builds the tokio runtime for the process.
110 ///
111 /// If not overridden, a default runtime builder is used to build the
112 /// runtime. It uses the following environment variables:
113 /// - `TOKIO_WORKER_THREADS`: Number of worker threads to use. If 1, a
114 /// current thread runtime is used.
115 ///
116 /// See [`tokio::runtime::Builder::worker_threads`] for details.
117 /// - `TOKIO_MAX_BLOCKING_THREADS`: Maximum number of blocking threads.
118 ///
119 /// See [`tokio::runtime::Builder::max_blocking_threads`] for details.
120 /// - `TOKIO_DISABLE_TIME`: If `true` disables time.
121 ///
122 /// See [`tokio::runtime::Builder::enable_time`] for details.
123 /// - `TOKIO_DISABLE_IO`: If `true` disables IO.
124 ///
125 /// See [`tokio::runtime::Builder::enable_io`] for details.
126 /// - `TOKIO_THREAD_STACK_SIZE`: Thread stack size.
127 ///
128 /// See [`tokio::runtime::Builder::thread_stack_size`] for details.
129 /// - `TOKIO_GLOBAL_QUEUE_INTERVAL`: Global queue interval.
130 ///
131 /// See [`tokio::runtime::Builder::global_queue_interval`] for details.
132 /// - `TOKIO_EVENT_INTERVAL`: Event interval.
133 ///
134 /// See [`tokio::runtime::Builder::event_interval`] for details.
135 /// - `TOKIO_MAX_IO_EVENTS_PER_TICK`: Maximum IO events per tick.
136 ///
137 /// See [`tokio::runtime::Builder::max_io_events_per_tick`] for details.
138 #[inline(always)]
139 fn tokio_runtime() -> tokio::runtime::Runtime {
140 default_runtime_builder().build().expect("runtime build")
141 }
142
143 /// Initialize the global.
144 ///
145 /// Called to initialize the global.
146 /// Returning an error from this function will cause the process to
147 /// immediately exit without calling [`on_exit`](Global::on_exit) first.
148 fn init(config: Self::Config) -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send;
149
150 /// Called right before all services start.
151 ///
152 /// Returning an error from this function will prevent any service from
153 /// starting and [`on_exit`](Global::on_exit) will be called with the result
154 /// of this function.
155 #[inline(always)]
156 fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
157 std::future::ready(Ok(()))
158 }
159
160 /// Called after a service exits.
161 ///
162 /// `name` is the name of the service that exited and `result` is the result
163 /// the service exited with. Returning an error from this function will
164 /// stop all currently running services and [`on_exit`](Global::on_exit)
165 /// will be called with the result of this function.
166 #[inline(always)]
167 fn on_service_exit(
168 self: &Arc<Self>,
169 name: &'static str,
170 result: anyhow::Result<()>,
171 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
172 let _ = name;
173 std::future::ready(result)
174 }
175
176 /// Called after the shutdown is complete, right before exiting the
177 /// process.
178 ///
179 /// `result` is [`Err`](anyhow::Result) when the process exits due to an
180 /// error in one of the services or handler functions, otherwise `Ok(())`.
181 #[inline(always)]
182 fn on_exit(
183 self: &Arc<Self>,
184 result: anyhow::Result<()>,
185 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
186 std::future::ready(result)
187 }
188}
189
190/// Simplified version of [`Global`].
191///
192/// Implementing this trait will automatically implement [`Global`] for your
193/// type. This trait is intended to be used when you don't need a config for
194/// your global.
195///
196/// Refer to [`Global`] for details.
197pub trait GlobalWithoutConfig: Send + Sync + 'static {
198 /// Builds the tokio runtime for the process.
199 ///
200 /// If not overridden, a default runtime builder is used to build the
201 /// runtime. It uses the following environment variables:
202 /// - `TOKIO_WORKER_THREADS`: Number of worker threads to use. If 1, a
203 /// current thread runtime is used.
204 ///
205 /// See [`tokio::runtime::Builder::worker_threads`] for details.
206 /// - `TOKIO_MAX_BLOCKING_THREADS`: Maximum number of blocking threads.
207 ///
208 /// See [`tokio::runtime::Builder::max_blocking_threads`] for details.
209 /// - `TOKIO_DISABLE_TIME`: If `true` disables time.
210 ///
211 /// See [`tokio::runtime::Builder::enable_time`] for details.
212 /// - `TOKIO_DISABLE_IO`: If `true` disables IO.
213 ///
214 /// See [`tokio::runtime::Builder::enable_io`] for details.
215 /// - `TOKIO_THREAD_STACK_SIZE`: Thread stack size.
216 ///
217 /// See [`tokio::runtime::Builder::thread_stack_size`] for details.
218 /// - `TOKIO_GLOBAL_QUEUE_INTERVAL`: Global queue interval.
219 ///
220 /// See [`tokio::runtime::Builder::global_queue_interval`] for details.
221 /// - `TOKIO_EVENT_INTERVAL`: Event interval.
222 ///
223 /// See [`tokio::runtime::Builder::event_interval`] for details.
224 /// - `TOKIO_MAX_IO_EVENTS_PER_TICK`: Maximum IO events per tick.
225 ///
226 /// See [`tokio::runtime::Builder::max_io_events_per_tick`] for details.
227 #[inline(always)]
228 fn tokio_runtime() -> tokio::runtime::Runtime {
229 default_runtime_builder().build().expect("runtime build")
230 }
231
232 /// Initialize the global.
233 ///
234 /// Called to initialize the global.
235 /// Returning an error from this function will cause the process to
236 /// immediately exit without calling [`on_exit`](Global::on_exit) first.
237 fn init() -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send;
238
239 /// Called right before all services start.
240 ///
241 /// Returning an error from this function will prevent any service from
242 /// starting and [`on_exit`](Global::on_exit) will be called with the result
243 /// of this function.
244 #[inline(always)]
245 fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
246 std::future::ready(Ok(()))
247 }
248
249 /// Called after a service exits.
250 ///
251 /// `name` is the name of the service that exited and `result` is the result
252 /// the service exited with. Returning an error from this function will
253 /// stop all currently running services and [`on_exit`](Global::on_exit)
254 /// will be called with the result of this function.
255 #[inline(always)]
256 fn on_service_exit(
257 self: &Arc<Self>,
258 name: &'static str,
259 result: anyhow::Result<()>,
260 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
261 let _ = name;
262 std::future::ready(result)
263 }
264
265 /// Called after the shutdown is complete, right before exiting the
266 /// process.
267 ///
268 /// `result` is [`Err`](anyhow::Result) when the process exits due to an
269 /// error in one of the services or handler functions, otherwise `Ok(())`.
270 #[inline(always)]
271 fn on_exit(
272 self: &Arc<Self>,
273 result: anyhow::Result<()>,
274 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
275 std::future::ready(result)
276 }
277}
278
279impl<T: GlobalWithoutConfig> Global for T {
280 type Config = EmptyConfig;
281
282 #[inline(always)]
283 fn tokio_runtime() -> tokio::runtime::Runtime {
284 <T as GlobalWithoutConfig>::tokio_runtime()
285 }
286
287 #[inline(always)]
288 fn init(_: Self::Config) -> impl std::future::Future<Output = anyhow::Result<Arc<Self>>> + Send {
289 <T as GlobalWithoutConfig>::init()
290 }
291
292 #[inline(always)]
293 fn on_services_start(self: &Arc<Self>) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
294 <T as GlobalWithoutConfig>::on_services_start(self)
295 }
296
297 #[inline(always)]
298 fn on_service_exit(
299 self: &Arc<Self>,
300 name: &'static str,
301 result: anyhow::Result<()>,
302 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
303 <T as GlobalWithoutConfig>::on_service_exit(self, name, result)
304 }
305
306 #[inline(always)]
307 fn on_exit(
308 self: &Arc<Self>,
309 result: anyhow::Result<()>,
310 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
311 <T as GlobalWithoutConfig>::on_exit(self, result)
312 }
313}
314
315#[cfg_attr(all(test, coverage_nightly), coverage(off))]
316#[cfg(test)]
317mod tests {
318 use std::sync::Arc;
319 use std::thread;
320
321 use super::{Global, GlobalWithoutConfig};
322 use crate::EmptyConfig;
323
324 struct TestGlobal;
325
326 impl Global for TestGlobal {
327 type Config = ();
328
329 async fn init(_config: Self::Config) -> anyhow::Result<std::sync::Arc<Self>> {
330 Ok(Arc::new(Self))
331 }
332 }
333
334 #[tokio::test]
335 async fn default_global() {
336 thread::spawn(|| {
337 // To get the coverage
338 TestGlobal::tokio_runtime();
339 });
340
341 assert!(matches!(TestGlobal::pre_init(), Ok(())));
342 let global = TestGlobal::init(()).await.unwrap();
343 assert!(matches!(global.on_services_start().await, Ok(())));
344
345 assert!(matches!(global.on_exit(Ok(())).await, Ok(())));
346 assert!(global.on_exit(Err(anyhow::anyhow!("error"))).await.is_err());
347
348 assert!(matches!(global.on_service_exit("test", Ok(())).await, Ok(())));
349 assert!(global.on_service_exit("test", Err(anyhow::anyhow!("error"))).await.is_err());
350 }
351
352 struct TestGlobalWithoutConfig;
353
354 impl GlobalWithoutConfig for TestGlobalWithoutConfig {
355 async fn init() -> anyhow::Result<std::sync::Arc<Self>> {
356 Ok(Arc::new(Self))
357 }
358 }
359
360 #[tokio::test]
361 async fn default_global_no_config() {
362 thread::spawn(|| {
363 // To get the coverage
364 <TestGlobalWithoutConfig as Global>::tokio_runtime();
365 });
366
367 assert!(matches!(TestGlobalWithoutConfig::pre_init(), Ok(())));
368 <TestGlobalWithoutConfig as Global>::init(EmptyConfig).await.unwrap();
369 let global = <TestGlobalWithoutConfig as GlobalWithoutConfig>::init().await.unwrap();
370 assert!(matches!(Global::on_services_start(&global).await, Ok(())));
371
372 assert!(matches!(Global::on_exit(&global, Ok(())).await, Ok(())));
373 assert!(Global::on_exit(&global, Err(anyhow::anyhow!("error"))).await.is_err());
374
375 assert!(matches!(Global::on_service_exit(&global, "test", Ok(())).await, Ok(())));
376 assert!(Global::on_service_exit(&global, "test", Err(anyhow::anyhow!("error")))
377 .await
378 .is_err());
379 }
380}