1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::sync::{Arc, Mutex};
use pulse::Signal;
use rayon::ThreadPool;
use dispatch::dispatcher::ThreadLocal;
use dispatch::stage::Stage;
use res::Resources;
/// Panic message for the case where no signal from a prior `dispatch` is
/// stored, i.e. there is nothing to wait on.
const ERR_NO_DISPATCH: &str = "wait() called before dispatch or called twice";
/// Dispatcher that executes its stages on a thread pool in the background
/// and lets the caller wait for completion later.
///
/// Instances are created with `new_async`.
pub struct AsyncDispatcher<'a> {
    /// Resources handed to every stage and thread-local entry; shared with
    /// the background job through the `Arc`.
    res: Arc<Resources>,
    /// Signal of the most recent `dispatch` call; `None` when no dispatch
    /// is waiting to be joined.
    signal: Option<Signal>,
    /// Stages executed in order by the background job; the mutex is locked
    /// for the whole duration of a run.
    stages: Arc<Mutex<Vec<Stage<'static>>>>,
    /// Entries run on the calling thread after the background job finished.
    thread_local: ThreadLocal<'a>,
    /// Pool on which the background job is spawned.
    thread_pool: Arc<ThreadPool>,
}
/// Builds an `AsyncDispatcher` from its parts.
///
/// `res` and `stages` are wrapped in `Arc` (and `stages` additionally in a
/// `Mutex`) so they can be shared with the job spawned by `dispatch`.
/// The returned dispatcher has no pending signal.
pub fn new_async<'a>(
    res: Resources,
    stages: Vec<Stage<'static>>,
    thread_local: ThreadLocal<'a>,
    thread_pool: Arc<ThreadPool>,
) -> AsyncDispatcher<'a> {
    let shared_res = Arc::new(res);
    let shared_stages = Arc::new(Mutex::new(stages));

    AsyncDispatcher {
        res: shared_res,
        signal: None,
        stages: shared_stages,
        thread_local,
        thread_pool,
    }
}
impl<'a> AsyncDispatcher<'a> {
pub fn dispatch(&mut self) {
let (signal, pulse) = Signal::new();
self.signal = Some(signal);
let stages = self.stages.clone();
let res = self.res.clone();
self.thread_pool.spawn(move || {
{
let stages = stages;
let mut stages = stages.lock().expect("Mutex poisoned");
let res = &*res;
for stage in &mut *stages {
stage.execute(res);
}
}
pulse.pulse();
})
}
pub fn wait(&mut self) {
self.wait_without_tl();
let res = &*self.res;
for sys in &mut self.thread_local {
sys.run_now(res);
}
}
pub fn wait_without_tl(&mut self) {
self.signal
.take()
.expect(ERR_NO_DISPATCH)
.wait()
.expect("The worker thread may have panicked");
}
pub fn is_running(&self) -> bool {
if let Some(ref signal) = self.signal {
signal.is_pending()
} else {
false
}
}
pub fn dispatch_thread_local(&mut self) {
if self.signal.is_some() {
self.wait_without_tl();
}
let res = &*self.res;
for sys in &mut self.thread_local {
sys.run_now(res);
}
}
pub fn mut_res(&mut self) -> &mut Resources {
if self.signal.is_some() {
self.wait();
}
Arc::get_mut(&mut self.res).expect(ERR_NO_DISPATCH)
}
}