jwalk/core/
read_dir_iter.rs
1use std::sync::Arc;
2
3use super::*;
4use crate::Result;
5
6pub(crate) type ReadDirCallback<C> =
8 dyn Fn(ReadDirSpec<C>) -> Result<ReadDir<C>> + Send + Sync + 'static;
9
10pub enum ReadDirIter<C: ClientState> {
16 Walk {
17 read_dir_spec_stack: Vec<ReadDirSpec<C>>,
18 core_read_dir_callback: Arc<ReadDirCallback<C>>,
19 },
20 ParWalk {
21 read_dir_result_iter: OrderedQueueIter<Result<ReadDir<C>>>,
22 },
23}
24
25impl<C: ClientState> ReadDirIter<C> {
26 pub(crate) fn try_new(
27 read_dir_specs: Vec<ReadDirSpec<C>>,
28 parallelism: Parallelism,
29 core_read_dir_callback: Arc<ReadDirCallback<C>>,
30 ) -> Option<Self> {
31 if let Parallelism::Serial = parallelism {
32 ReadDirIter::Walk {
33 read_dir_spec_stack: read_dir_specs,
34 core_read_dir_callback,
35 }
36 } else {
37 let stop = Arc::new(AtomicBool::new(false));
38 let read_dir_result_queue = new_ordered_queue(stop.clone(), Ordering::Strict);
39 let (read_dir_result_queue, read_dir_result_iter) = read_dir_result_queue;
40 let read_dir_spec_queue = new_ordered_queue(stop.clone(), Ordering::Relaxed);
41 let (read_dir_spec_queue, read_dir_spec_iter) = read_dir_spec_queue;
42
43 for (i, read_dir_spec) in read_dir_specs.into_iter().enumerate() {
44 read_dir_spec_queue
45 .push(Ordered::new(read_dir_spec, IndexPath::new(vec![0]), i))
46 .unwrap();
47 }
48
49 let run_context = RunContext {
50 stop,
51 read_dir_spec_queue,
52 read_dir_result_queue,
53 core_read_dir_callback,
54 };
55
56 let (startup_tx, startup_rx) = parallelism
57 .timeout()
58 .map(|duration| {
59 let (tx, rx) = crossbeam::channel::unbounded();
60 (Some(tx), Some((rx, duration)))
61 })
62 .unwrap_or((None, None));
63 parallelism.spawn(move || {
64 if let Some(tx) = startup_tx {
65 if tx.send(()).is_err() {
66 return;
68 }
69 }
70 read_dir_spec_iter.par_bridge().for_each_with(
71 run_context,
72 |run_context, ordered_read_dir_spec| {
73 multi_threaded_walk_dir(ordered_read_dir_spec, run_context);
74 },
75 );
76 });
77 if startup_rx.map_or(false, |(rx, duration)| rx.recv_timeout(duration).is_err()) {
78 return None;
79 }
80 ReadDirIter::ParWalk {
81 read_dir_result_iter,
82 }
83 }
84 .into()
85 }
86}
87
88impl<C: ClientState> Iterator for ReadDirIter<C> {
89 type Item = Result<ReadDir<C>>;
90 fn next(&mut self) -> Option<Self::Item> {
91 match self {
92 ReadDirIter::Walk {
93 read_dir_spec_stack,
94 core_read_dir_callback,
95 } => {
96 let read_dir_spec = read_dir_spec_stack.pop()?;
97 let read_dir_result = core_read_dir_callback(read_dir_spec);
98
99 if let Ok(read_dir) = read_dir_result.as_ref() {
100 for each_spec in read_dir
101 .read_children_specs()
102 .collect::<Vec<_>>()
103 .into_iter()
104 .rev()
105 {
106 read_dir_spec_stack.push(each_spec);
107 }
108 }
109
110 Some(read_dir_result)
111 }
112
113 ReadDirIter::ParWalk {
114 read_dir_result_iter,
115 } => read_dir_result_iter
116 .next()
117 .map(|read_dir_result| read_dir_result.value),
118 }
119 }
120}
121
122fn multi_threaded_walk_dir<C: ClientState>(
123 ordered_read_dir_spec: Ordered<ReadDirSpec<C>>,
124 run_context: &mut RunContext<C>,
125) {
126 let Ordered {
127 value: read_dir_spec,
128 index_path,
129 ..
130 } = ordered_read_dir_spec;
131
132 let read_dir_result = (run_context.core_read_dir_callback)(read_dir_spec);
133 let ordered_read_children_specs = read_dir_result
134 .as_ref()
135 .ok()
136 .map(|read_dir| read_dir.ordered_read_children_specs(&index_path));
137
138 let ordered_read_dir_result = Ordered::new(
139 read_dir_result,
140 index_path,
141 ordered_read_children_specs.as_ref().map_or(0, Vec::len),
142 );
143
144 if !run_context.send_read_dir_result(ordered_read_dir_result) {
145 run_context.stop();
146 return;
147 }
148
149 if let Some(ordered_read_children_specs) = ordered_read_children_specs {
150 for each in ordered_read_children_specs {
151 if !run_context.schedule_read_dir_spec(each) {
152 run_context.stop();
153 return;
154 }
155 }
156 }
157
158 run_context.complete_item();
159}