jwalk/core/
read_dir_iter.rs

1use std::sync::Arc;
2
3use super::*;
4use crate::Result;
5
6/// Client's read dir function.
7pub(crate) type ReadDirCallback<C> =
8    dyn Fn(ReadDirSpec<C>) -> Result<ReadDir<C>> + Send + Sync + 'static;
9
10/// Result<ReadDir> Iterator.
11///
12/// Yields ReadDirs (results of fs::read_dir) in order required for recursive
13/// directory traversal. Depending on Walk/ParWalk state these reads might be
14/// computed in parallel.
15pub enum ReadDirIter<C: ClientState> {
16    Walk {
17        read_dir_spec_stack: Vec<ReadDirSpec<C>>,
18        core_read_dir_callback: Arc<ReadDirCallback<C>>,
19    },
20    ParWalk {
21        read_dir_result_iter: OrderedQueueIter<Result<ReadDir<C>>>,
22    },
23}
24
25impl<C: ClientState> ReadDirIter<C> {
26    pub(crate) fn try_new(
27        read_dir_specs: Vec<ReadDirSpec<C>>,
28        parallelism: Parallelism,
29        core_read_dir_callback: Arc<ReadDirCallback<C>>,
30    ) -> Option<Self> {
31        if let Parallelism::Serial = parallelism {
32            ReadDirIter::Walk {
33                read_dir_spec_stack: read_dir_specs,
34                core_read_dir_callback,
35            }
36        } else {
37            let stop = Arc::new(AtomicBool::new(false));
38            let read_dir_result_queue = new_ordered_queue(stop.clone(), Ordering::Strict);
39            let (read_dir_result_queue, read_dir_result_iter) = read_dir_result_queue;
40            let read_dir_spec_queue = new_ordered_queue(stop.clone(), Ordering::Relaxed);
41            let (read_dir_spec_queue, read_dir_spec_iter) = read_dir_spec_queue;
42
43            for (i, read_dir_spec) in read_dir_specs.into_iter().enumerate() {
44                read_dir_spec_queue
45                    .push(Ordered::new(read_dir_spec, IndexPath::new(vec![0]), i))
46                    .unwrap();
47            }
48
49            let run_context = RunContext {
50                stop,
51                read_dir_spec_queue,
52                read_dir_result_queue,
53                core_read_dir_callback,
54            };
55
56            let (startup_tx, startup_rx) = parallelism
57                .timeout()
58                .map(|duration| {
59                    let (tx, rx) = crossbeam::channel::unbounded();
60                    (Some(tx), Some((rx, duration)))
61                })
62                .unwrap_or((None, None));
63            parallelism.spawn(move || {
64                if let Some(tx) = startup_tx {
65                    if tx.send(()).is_err() {
66                        // rayon didn't install this function in time so the listener exited. Do the same.
67                        return;
68                    }
69                }
70                read_dir_spec_iter.par_bridge().for_each_with(
71                    run_context,
72                    |run_context, ordered_read_dir_spec| {
73                        multi_threaded_walk_dir(ordered_read_dir_spec, run_context);
74                    },
75                );
76            });
77            if startup_rx.map_or(false, |(rx, duration)| rx.recv_timeout(duration).is_err()) {
78                return None;
79            }
80            ReadDirIter::ParWalk {
81                read_dir_result_iter,
82            }
83        }
84        .into()
85    }
86}
87
88impl<C: ClientState> Iterator for ReadDirIter<C> {
89    type Item = Result<ReadDir<C>>;
90    fn next(&mut self) -> Option<Self::Item> {
91        match self {
92            ReadDirIter::Walk {
93                read_dir_spec_stack,
94                core_read_dir_callback,
95            } => {
96                let read_dir_spec = read_dir_spec_stack.pop()?;
97                let read_dir_result = core_read_dir_callback(read_dir_spec);
98
99                if let Ok(read_dir) = read_dir_result.as_ref() {
100                    for each_spec in read_dir
101                        .read_children_specs()
102                        .collect::<Vec<_>>()
103                        .into_iter()
104                        .rev()
105                    {
106                        read_dir_spec_stack.push(each_spec);
107                    }
108                }
109
110                Some(read_dir_result)
111            }
112
113            ReadDirIter::ParWalk {
114                read_dir_result_iter,
115            } => read_dir_result_iter
116                .next()
117                .map(|read_dir_result| read_dir_result.value),
118        }
119    }
120}
121
122fn multi_threaded_walk_dir<C: ClientState>(
123    ordered_read_dir_spec: Ordered<ReadDirSpec<C>>,
124    run_context: &mut RunContext<C>,
125) {
126    let Ordered {
127        value: read_dir_spec,
128        index_path,
129        ..
130    } = ordered_read_dir_spec;
131
132    let read_dir_result = (run_context.core_read_dir_callback)(read_dir_spec);
133    let ordered_read_children_specs = read_dir_result
134        .as_ref()
135        .ok()
136        .map(|read_dir| read_dir.ordered_read_children_specs(&index_path));
137
138    let ordered_read_dir_result = Ordered::new(
139        read_dir_result,
140        index_path,
141        ordered_read_children_specs.as_ref().map_or(0, Vec::len),
142    );
143
144    if !run_context.send_read_dir_result(ordered_read_dir_result) {
145        run_context.stop();
146        return;
147    }
148
149    if let Some(ordered_read_children_specs) = ordered_read_children_specs {
150        for each in ordered_read_children_specs {
151            if !run_context.schedule_read_dir_spec(each) {
152                run_context.stop();
153                return;
154            }
155        }
156    }
157
158    run_context.complete_item();
159}