/*
 * Hurl (https://hurl.dev)
 * Copyright (C) 2024 Orange
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
use std::sync::mpsc::{Receiver, Sender};
19
use std::sync::{mpsc, Arc, Mutex};
20

            
21
use hurl_core::error::{DisplaySourceError, OutputFormat};
22
use hurl_core::typing::Count;
23

            
24
use crate::output;
25
use crate::parallel::error::JobError;
26
use crate::parallel::job::{Job, JobQueue, JobResult};
27
use crate::parallel::message::WorkerMessage;
28
use crate::parallel::progress::{Mode, ParProgress};
29
use crate::parallel::worker::{Worker, WorkerId};
30
use crate::util::term::{Stderr, Stdout, WriteMode};
31

            
32
/// A parallel runner manages a list of `Worker`. Each worker is either idle or is running a
33
/// [`Job`]. To run jobs, the [`ParallelRunner::run`] method much be executed on the main thread.
34
/// Each worker has its own thread that it uses to run a Hurl file, and communicates with the main
35
/// thread. Standard multi-producer single-producer channels are used between the main runner and
36
/// the workers to send job request and receive job result.
37
///
38
/// The parallel runner is responsible to manage the state of the workers, and to display standard
39
/// output and standard error, in the main thread. Each worker reports its progression to the
40
/// parallel runner, which updates the workers states and displays a progress bar.
41
/// Inside each worker, logs (messages on standard error) and HTTP response (output on
42
/// standard output) are buffered and send to the runner to be eventually displayed.
43
///
44
/// By design, the workers state is read and modified on the main thread.
45
pub struct ParallelRunner {
46
    /// The list of workers, running Hurl file in their inner thread.
47
    workers: Vec<(Worker, WorkerState)>,
48
    /// The transmit end of the channel used to send messages to workers.
49
    tx: Option<Sender<Job>>,
50
    /// The receiving end of the channel used to communicate to workers.
51
    rx: Receiver<WorkerMessage>,
52
    /// Progress reporter to display the advancement of the parallel runs.
53
    progress: ParProgress,
54
    /// Output type for each completed job on standard output.
55
    output_type: OutputType,
56
    /// Repeat mode for the runner: infinite or finite.
57
    repeat: Count,
58
}
59

            
60
/// Represents a worker's state.
61
#[allow(clippy::large_enum_variant)]
62
pub enum WorkerState {
63
    /// Worker has no job to run.
64
    Idle,
65
    /// Worker is currently running a `job`, the entry being executed is at 0-based index
66
    /// `entry_index`, the total number of entries being `entry_count`.
67
    Running {
68
        job: Job,
69
        entry_index: usize,
70
        entry_count: usize,
71
    },
72
}
73

            
74
/// What the parallel runner writes on standard output each time a job is completed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OutputType {
    /// The last HTTP response body of a Hurl file is outputted on standard output.
    /// `include_headers` and `color` are forwarded as-is to the response writer.
    ResponseBody { include_headers: bool, color: bool },
    /// The whole Hurl file run is exported in a structured JSON export on standard output.
    Json,
    /// Nothing is outputted on standard output when a Hurl file run is completed.
    NoOutput,
}
83

            
84
/// Value passed as first argument to `ParProgress::new` when the runner is created.
/// NOTE(review): presumably the maximum number of running jobs listed by the progress bar;
/// confirm against `ParProgress`.
const MAX_RUNNING_DISPLAYED: usize = 8;
85

            
86
impl ParallelRunner {
87
    /// Creates a new parallel runner, with `worker_count` worker thread.
88
    ///
89
    /// The runner runs a list of [`Job`] in parallel. It creates two channels to communicate
90
    /// with the workers:
91
    ///
92
    /// - `runner -> worker`: is used to send [`Job`] processing request to a worker,
93
    /// - `worker -> runner`: is used to send [`WorkerMessage`] to update job progression, from a
94
    ///    worker to the runner.
95
    ///
96
    /// When a job is completed, depending on `output_type`, it can be outputted to standard output:
97
    /// whether as a raw response body bytes, or in a structured JSON output.
98
    ///
99
    /// The runner can repeat running a list of jobs. For instance, when repeating two times the job
100
    /// sequence (`a`, `b`, `c`), runner will act as if it runs (`a`, `b`, `c`, `a`, `b`, `c`).
101
    ///
102
    /// If `test` mode is `true` the runner is run in "test" mode, reporting the success or failure
103
    /// of each file on standard error. In addition to the test mode, a `progress_bar` designed for
104
    /// parallel run progression can be used. When the progress bar is displayed, it's wrapped with
105
    /// new lines at width `max_width`.
106
    ///
107
    /// `color` determines if color if used in standard error.
108
75
    pub fn new(
109
75
        workers_count: usize,
110
75
        output_type: OutputType,
111
75
        repeat: Count,
112
75
        test: bool,
113
75
        progress_bar: bool,
114
75
        color: bool,
115
75
        max_width: Option<usize>,
116
75
    ) -> Self {
117
75
        // Worker are running on theirs own thread, while parallel runner is running in the main
118
75
        // thread.
119
75
        // We create the channel to communicate from workers to the parallel runner.
120
75
        let (tx_in, rx_in) = mpsc::channel();
121
75
        // We create the channel to communicate from the parallel runner to the workers.
122
75
        let (tx_out, rx_out) = mpsc::channel();
123
75
        let rx_out = Arc::new(Mutex::new(rx_out));
124
75

            
125
75
        // Create the workers:
126
75
        let workers = (0..workers_count)
127
214
            .map(|i| {
128
189
                let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
129
189
                let state = WorkerState::Idle;
130
189
                (worker, state)
131
214
            })
132
75
            .collect::<Vec<_>>();
133
75

            
134
75
        let mode = Mode::new(test, progress_bar);
135
75
        let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color, max_width);
136
75

            
137
75
        ParallelRunner {
138
75
            workers,
139
75
            tx: Some(tx_out),
140
75
            rx: rx_in,
141
75
            progress,
142
75
            output_type,
143
75
            repeat,
144
        }
145
    }
146

            
147
    /// Runs a list of [`Job`] in parallel and returns the results.
148
    ///
149
    /// Results are returned ordered by the sequence number, and not their execution order. So, the
150
    /// order of the `jobs` is the same as the order of the `jobs` results, independently of the
151
    /// worker's count.
152
75
    pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
153
75
        // The parallel runner runs on the main thread. It's responsible for displaying standard
154
75
        // output and standard error. Workers are buffering their output and error in memory, and
155
75
        // delegate the display to the runners.
156
75
        let mut stdout = Stdout::new(WriteMode::Immediate);
157
75
        let mut stderr = Stderr::new(WriteMode::Immediate);
158
75

            
159
75
        // Create the jobs queue:
160
75
        let mut queue = JobQueue::new(jobs, self.repeat);
161
75
        let jobs_count = queue.jobs_count();
162
75

            
163
75
        // Initiate the runner, fill our workers:
164
214
        self.workers.iter().for_each(|_| {
165
189
            if let Some(job) = queue.next() {
166
189
                _ = self.tx.as_ref().unwrap().send(job);
167
            }
168
214
        });
169
75

            
170
75
        // When dumped HTTP responses, we truncate existing output file on first save, then append
171
75
        // it on subsequent write.
172
75
        let mut append = false;
173
75

            
174
75
        // Start the message pump:
175
75
        let mut results = vec![];
176
3039
        for msg in self.rx.iter() {
177
3039
            match msg {
178
                // If we have any error (either a [`WorkerMessage::IOError`] or a [`WorkerMessage::ParsingError`]
179
                // we don't take any more jobs and exit from the methods in error. This is the same
180
                // behaviour as when we run sequentially a list of Hurl files.
181
3
                WorkerMessage::IOError(msg) => {
182
3
                    self.progress.clear_progress_bar(&mut stderr);
183
3

            
184
3
                    let filename = msg.job.filename;
185
3
                    let error = msg.error;
186
3
                    let message = format!("Issue reading from {filename}: {error}");
187
3
                    return Err(JobError::IO(message));
188
                }
189
3
                WorkerMessage::ParsingError(msg) => {
190
3
                    // Like [`hurl::runner::run`] method, the display of parsing error is done here
191
3
                    // instead of being done in [`hurl::run_par`] method.
192
3
                    self.progress.clear_progress_bar(&mut stderr);
193
3

            
194
3
                    stderr.eprint(msg.stderr.buffer());
195
3
                    return Err(JobError::Parsing);
196
                }
197
                // Everything is OK, we report the progress. As we can receive a lot of running
198
                // messages, we don't want to update the progress bar too often to avoid flickering.
199
1953
                WorkerMessage::Running(msg) => {
200
1953
                    self.workers[msg.worker_id.0].1 = WorkerState::Running {
201
1953
                        job: msg.job,
202
1953
                        entry_index: msg.entry_index,
203
1953
                        entry_count: msg.entry_count,
204
1953
                    };
205
1953

            
206
1953
                    if self.progress.can_update() {
207
1953
                        self.progress.clear_progress_bar(&mut stderr);
208
1953
                        self.progress.update_progress_bar(
209
1953
                            &self.workers,
210
1953
                            results.len(),
211
1953
                            jobs_count,
212
1953
                            &mut stderr,
213
1953
                        );
214
                    }
215
                }
216
                // A new job has been completed, we take a new job if the queue is not empty.
217
                // Contrary to when we receive a running message, we clear the progress bar no
218
                // matter what the frequency is, to get a "correct" and up-to-date display on any
219
                // test completion.
220
1080
                WorkerMessage::Completed(msg) => {
221
1080
                    self.progress.clear_progress_bar(&mut stderr);
222
1080

            
223
1080
                    // The worker is becoming idle.
224
1080
                    self.workers[msg.worker_id.0].1 = WorkerState::Idle;
225
1080

            
226
1080
                    // First, we display the job standard error, then the job standard output
227
1080
                    // (similar to the sequential runner).
228
1080
                    if !msg.stderr.buffer().is_empty() {
229
288
                        stderr.eprint(msg.stderr.buffer());
230
                    }
231
1080
                    if !msg.stdout.buffer().is_empty() {
232
3
                        let ret = stdout.write_all(msg.stdout.buffer());
233
3
                        if ret.is_err() {
234
                            return Err(JobError::IO("Issue writing to stdout".to_string()));
235
                        }
236
                    }
237

            
238
                    // Then, we print job output on standard output (the first response truncates
239
                    // exiting file, subsequent response appends bytes).
240
1080
                    self.print_output(&msg.result, &mut stdout, append)?;
241
1080
                    append = true;
242
1080

            
243
1080
                    // Report the completion of this job and update the progress.
244
1080
                    self.progress.print_completed(&msg.result, &mut stderr);
245
1080

            
246
1080
                    results.push(msg.result);
247
1080

            
248
1080
                    self.progress.update_progress_bar(
249
1080
                        &self.workers,
250
1080
                        results.len(),
251
1080
                        jobs_count,
252
1080
                        &mut stderr,
253
1080
                    );
254
1080
                    // We want to force the next refresh of the progress bar (when we receive a
255
1080
                    // running message) to be sure that the new next jobs will be printed. This
256
1080
                    // is needed because we've a throttle on the progress bar refresh and not every
257
1080
                    // running messages received leads to a progress bar refresh.
258
1080
                    self.progress.force_next_update();
259
1080

            
260
1080
                    // We run the next job to process:
261
1080
                    let job = queue.next();
262
1080
                    match job {
263
912
                        Some(job) => {
264
912
                            _ = self.tx.as_ref().unwrap().send(job);
265
                        }
266
                        None => {
267
                            // If we have received all the job results, we can stop the run.
268
168
                            if let Some(jobs_count) = jobs_count {
269
168
                                if results.len() == jobs_count {
270
69
                                    break;
271
                                }
272
                            }
273
                        }
274
                    }
275
                }
276
            }
277
        }
278

            
279
        // We gracefully shut down workers, by dropping the sender and wait for each thread workers
280
        // to join.
281
69
        drop(self.tx.take());
282
237
        for worker in &mut self.workers {
283
168
            if let Some(thread) = worker.0.take_thread() {
284
168
                thread.join().unwrap();
285
            }
286
        }
287

            
288
        // All jobs have been executed, we sort results by sequence number to get the same order
289
        // as the input jobs list.
290
9581
        results.sort_unstable_by_key(|result| result.job.seq);
291
69
        Ok(results)
292
    }
293

            
294
    /// Prints a job `result` to standard output `stdout`, either as a raw HTTP response (last
295
    /// body of the run), or in a structured JSON way.
296
    /// If `append` is true, any existing file will be appended instead of being truncated.
297
1080
    fn print_output(
298
1080
        &self,
299
1080
        result: &JobResult,
300
1080
        stdout: &mut Stdout,
301
1080
        append: bool,
302
1080
    ) -> Result<(), JobError> {
303
1080
        let job = &result.job;
304
1080
        let content = &result.content;
305
1080
        let hurl_result = &result.hurl_result;
306
1080
        let filename_in = &job.filename;
307
1080
        let filename_out = job.runner_options.output.as_ref();
308
1080

            
309
1080
        match self.output_type {
310
            OutputType::ResponseBody {
311
63
                include_headers,
312
63
                color,
313
63
            } => {
314
63
                if hurl_result.success {
315
63
                    let result = output::write_last_body(
316
63
                        hurl_result,
317
63
                        include_headers,
318
63
                        color,
319
63
                        filename_out,
320
63
                        stdout,
321
63
                        append,
322
63
                    );
323
63
                    if let Err(e) = result {
324
                        return Err(JobError::Runtime(e.to_string(
325
                            &filename_in.to_string(),
326
                            content,
327
                            None,
328
                            OutputFormat::Terminal(color),
329
                        )));
330
                    }
331
                }
332
            }
333
            OutputType::Json => {
334
42
                let result = output::write_json(
335
42
                    hurl_result,
336
42
                    content,
337
42
                    filename_in,
338
42
                    filename_out,
339
42
                    stdout,
340
42
                    append,
341
42
                );
342
42
                if let Err(e) = result {
343
                    return Err(JobError::Runtime(e.to_string()));
344
                }
345
            }
346
975
            OutputType::NoOutput => {}
347
        }
348
1080
        Ok(())
349
    }
350
}