1
/*
2
 * Hurl (https://hurl.dev)
3
 * Copyright (C) 2025 Orange
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *          http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18
use std::sync::mpsc::{Receiver, Sender};
19
use std::sync::{mpsc, Arc, Mutex};
20

            
21
use super::error::JobError;
22
use super::job::{Job, JobQueue, JobResult};
23
use super::message::WorkerMessage;
24
use super::progress::{Mode, ParProgress};
25
use super::worker::{Worker, WorkerId};
26
use crate::output;
27
use crate::pretty::PrettyMode;
28
use crate::util::term::{Stderr, Stdout, WriteMode};
29
use hurl_core::error::{DisplaySourceError, OutputFormat};
30
use hurl_core::types::{Count, Index};
31

            
32
/// A parallel runner manages a list of `Worker`. Each worker is either idle or is running a
33
/// [`Job`]. To run jobs, the [`ParallelRunner::run`] method much be executed on the main thread.
34
/// Each worker has its own thread that it uses to run a Hurl file, and communicates with the main
35
/// thread. Standard multi-producer single-producer channels are used between the main runner and
36
/// the workers to send job request and receive job result.
37
///
38
/// The parallel runner is responsible to manage the state of the workers, and to display standard
39
/// output and standard error, in the main thread. Each worker reports its progression to the
40
/// parallel runner, which updates the workers states and displays a progress bar.
41
/// Inside each worker, logs (messages on standard error) and HTTP response (output on
42
/// standard output) are buffered and send to the runner to be eventually displayed.
43
///
44
/// By design, the workers state is read and modified on the main thread.
45
pub struct ParallelRunner {
46
    /// The list of workers, running Hurl file in their inner thread.
47
    workers: Vec<(Worker, WorkerState)>,
48
    /// The transmit end of the channel used to send messages to workers.
49
    tx: Option<Sender<Job>>,
50
    /// The receiving end of the channel used to communicate to workers.
51
    rx: Receiver<WorkerMessage>,
52
    /// Progress reporter to display the advancement of the parallel runs.
53
    progress: ParProgress,
54
    /// Output type for each completed job on standard output.
55
    output_type: OutputType,
56
    /// Repeat mode for the runner: infinite or finite.
57
    repeat: Count,
58
}
59

            
60
/// Represents a worker's state.
61
#[allow(clippy::large_enum_variant)]
62
pub enum WorkerState {
63
    /// Worker has no job to run.
64
    Idle,
65
    /// Worker is currently running a `job`, the entry index being executed is `current_entry`,
66
    /// the last entry index to be run is `last_entry` and `retry_count` is the number of retries.
67
    Running {
68
        job: Job,
69
        current_entry: Index,
70
        last_entry: Index,
71
        retry_count: usize,
72
    },
73
}
74

            
75
#[derive(Clone, Debug, PartialEq, Eq)]
76
pub enum OutputType {
77
    /// The last HTTP response body of a Hurl file is outputted on standard output.
78
    ResponseBody {
79
        include_headers: bool,
80
        color: bool,
81
        pretty: PrettyMode,
82
    },
83
    /// The whole Hurl file run is exported in a structured JSON export on standard output.
84
    Json,
85
    /// Nothing is outputted on standard output when a Hurl file run is completed.
86
    NoOutput,
87
}
88

            
89
const MAX_RUNNING_DISPLAYED: usize = 8;
90

            
91
impl ParallelRunner {
92
    /// Creates a new parallel runner, with `worker_count` worker thread.
93
    ///
94
    /// The runner runs a list of [`Job`] in parallel. It creates two channels to communicate
95
    /// with the workers:
96
    ///
97
    /// - `runner -> worker`: is used to send [`Job`] processing request to a worker,
98
    /// - `worker -> runner`: is used to send [`WorkerMessage`] to update job progression, from a
99
    ///   worker to the runner.
100
    ///
101
    /// When a job is completed, depending on `output_type`, it can be outputted to standard output:
102
    /// whether as a raw response body bytes, or in a structured JSON output.
103
    ///
104
    /// The runner can repeat running a list of jobs. For instance, when repeating two times the job
105
    /// sequence (`a`, `b`, `c`), runner will act as if it runs (`a`, `b`, `c`, `a`, `b`, `c`).
106
    ///
107
    /// If `test` mode is `true` the runner is run in "test" mode, reporting the success or failure
108
    /// of each file on standard error. In addition to the test mode, a `progress_bar` designed for
109
    /// parallel run progression can be used. When the progress bar is displayed, it's wrapped with
110
    /// new lines at width `max_width`.
111
    ///
112
    /// `color` determines if color if used in standard error.
113
84
    pub fn new(
114
84
        workers_count: usize,
115
84
        output_type: OutputType,
116
84
        repeat: Count,
117
84
        test: bool,
118
84
        progress_bar: bool,
119
84
        color: bool,
120
84
        max_width: Option<usize>,
121
84
    ) -> Self {
122
        // Worker are running on theirs own thread, while parallel runner is running in the main
123
        // thread.
124
        // We create the channel to communicate from workers to the parallel runner.
125
84
        let (tx_in, rx_in) = mpsc::channel();
126
        // We create the channel to communicate from the parallel runner to the workers.
127
84
        let (tx_out, rx_out) = mpsc::channel();
128
84
        let rx_out = Arc::new(Mutex::new(rx_out));
129

            
130
        // Create the workers:
131
84
        let workers = (0..workers_count)
132
226
            .map(|i| {
133
198
                let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
134
198
                let state = WorkerState::Idle;
135
198
                (worker, state)
136
198
            })
137
84
            .collect::<Vec<_>>();
138

            
139
84
        let mode = Mode::new(test, progress_bar);
140
84
        let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color, max_width);
141

            
142
84
        ParallelRunner {
143
84
            workers,
144
84
            tx: Some(tx_out),
145
84
            rx: rx_in,
146
84
            progress,
147
84
            output_type,
148
84
            repeat,
149
        }
150
    }
151

            
152
    /// Runs a list of [`Job`] in parallel and returns the results.
153
    ///
154
    /// Results are returned ordered by the sequence number, and not their execution order. So, the
155
    /// order of the `jobs` is the same as the order of the `jobs` results, independently of the
156
    /// worker's count.
157
84
    pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
158
        // The parallel runner runs on the main thread. It's responsible for displaying standard
159
        // output and standard error. Workers are buffering their output and error in memory, and
160
        // delegate the display to the runners.
161
84
        let mut stdout = Stdout::new(WriteMode::Immediate);
162
84
        let mut stderr = Stderr::new(WriteMode::Immediate);
163

            
164
        // Create the jobs queue:
165
84
        let mut queue = JobQueue::new(jobs, self.repeat);
166
84
        let jobs_count = queue.jobs_count();
167

            
168
        // Initiate the runner, fill our workers:
169
226
        self.workers.iter().for_each(|_| {
170
198
            if let Some(job) = queue.next() {
171
198
                _ = self.tx.as_ref().unwrap().send(job);
172
            }
173
198
        });
174

            
175
        // When dumped HTTP responses, we truncate existing output file on first save, then append
176
        // it on subsequent write.
177
84
        let mut append = false;
178

            
179
        // Start the message pump:
180
84
        let mut results = vec![];
181
3267
        for msg in self.rx.iter() {
182
3267
            match msg {
183
                // If we have any error (either a [`WorkerMessage::IOError`] or a [`WorkerMessage::ParsingError`]
184
                // we don't take any more jobs and exit from the methods in error. This is the same
185
                // behaviour as when we run sequentially a list of Hurl files.
186
3
                WorkerMessage::InputReadError(msg) => {
187
3
                    self.progress.clear_progress_bar(&mut stderr);
188

            
189
3
                    let filename = msg.job.filename;
190
3
                    let error = msg.error;
191
3
                    let message = format!("Issue reading from {filename}: {error}");
192
3
                    return Err(JobError::InputRead(message));
193
                }
194
3
                WorkerMessage::ParsingError(msg) => {
195
                    // Like [`hurl::runner::run`] method, the display of parsing error is done here
196
                    // instead of being done in [`hurl::run_par`] method.
197
3
                    self.progress.clear_progress_bar(&mut stderr);
198

            
199
3
                    stderr.eprint(msg.stderr.buffer());
200
3
                    return Err(JobError::Parsing);
201
                }
202
                // Everything is OK, we report the progress. As we can receive a lot of running
203
                // messages, we don't want to update the progress bar too often to avoid flickering.
204
2166
                WorkerMessage::Running(msg) => {
205
2166
                    self.workers[msg.worker_id.0].1 = WorkerState::Running {
206
2166
                        job: msg.job,
207
2166
                        current_entry: msg.current_entry,
208
2166
                        last_entry: msg.last_entry,
209
2166
                        retry_count: msg.retry_count,
210
2166
                    };
211

            
212
2166
                    if self.progress.can_update() {
213
2166
                        self.progress.clear_progress_bar(&mut stderr);
214
2166
                        self.progress.update_progress_bar(
215
2166
                            &self.workers,
216
2166
                            results.len(),
217
2166
                            jobs_count,
218
2166
                            &mut stderr,
219
2166
                        );
220
                    }
221
                }
222
                // A new job has been completed, we take a new job if the queue is not empty.
223
                // Contrary to when we receive a running message, we clear the progress bar no
224
                // matter what the frequency is, to get a "correct" and up-to-date display on any
225
                // test completion.
226
1095
                WorkerMessage::Completed(msg) => {
227
1095
                    self.progress.clear_progress_bar(&mut stderr);
228

            
229
                    // The worker is becoming idle.
230
1095
                    self.workers[msg.worker_id.0].1 = WorkerState::Idle;
231

            
232
                    // First, we display the job standard error, then the job standard output
233
                    // (similar to the sequential runner).
234
1095
                    if !msg.stderr.buffer().is_empty() {
235
297
                        stderr.eprint(msg.stderr.buffer());
236
                    }
237
1095
                    if !msg.stdout.buffer().is_empty() {
238
3
                        let ret = stdout.write_all(msg.stdout.buffer());
239
3
                        if ret.is_err() {
240
                            return Err(JobError::OutputWrite(
241
                                "Issue writing to stdout".to_string(),
242
                            ));
243
                        }
244
                    }
245

            
246
                    // Then, we print job output on standard output (the first response truncates
247
                    // exiting file, subsequent response appends bytes).
248
1095
                    self.print_output(&msg.result, &mut stdout, append)?;
249
1095
                    append = true;
250

            
251
                    // Report the completion of this job and update the progress.
252
1095
                    self.progress.print_completed(&msg.result, &mut stderr);
253

            
254
1095
                    results.push(msg.result);
255

            
256
1095
                    self.progress.update_progress_bar(
257
1095
                        &self.workers,
258
1095
                        results.len(),
259
1095
                        jobs_count,
260
1095
                        &mut stderr,
261
                    );
262
                    // We want to force the next refresh of the progress bar (when we receive a
263
                    // running message) to be sure that the new next jobs will be printed. This
264
                    // is needed because we've a throttle on the progress bar refresh and not every
265
                    // running messages received leads to a progress bar refresh.
266
1095
                    self.progress.force_next_update();
267

            
268
                    // We run the next job to process:
269
1095
                    let job = queue.next();
270
1095
                    match job {
271
918
                        Some(job) => {
272
918
                            _ = self.tx.as_ref().unwrap().send(job);
273
                        }
274
                        None => {
275
                            // If we have received all the job results, we can stop the run.
276
177
                            if let Count::Finite(jobs_count) = jobs_count {
277
177
                                if results.len() == jobs_count {
278
78
                                    break;
279
                                }
280
                            }
281
                        }
282
                    }
283
                }
284
            }
285
        }
286

            
287
        // We gracefully shut down workers, by dropping the sender and wait for each thread workers
288
        // to join.
289
78
        drop(self.tx.take());
290
255
        for worker in &mut self.workers {
291
177
            if let Some(thread) = worker.0.take_thread() {
292
177
                thread.join().unwrap();
293
            }
294
        }
295

            
296
        // All jobs have been executed, we sort results by sequence number to get the same order
297
        // as the input jobs list.
298
78
        results.sort_unstable_by_key(|result| result.job.seq);
299
78
        Ok(results)
300
    }
301

            
302
    /// Prints a job `result` to standard output `stdout`, either as a raw HTTP response (last
303
    /// body of the run), or in a structured JSON way.
304
    /// If `append` is true, any existing file will be appended instead of being truncated.
305
1095
    fn print_output(
306
1095
        &self,
307
1095
        result: &JobResult,
308
1095
        stdout: &mut Stdout,
309
1095
        append: bool,
310
1095
    ) -> Result<(), JobError> {
311
1095
        let job = &result.job;
312
1095
        let content = &result.content;
313
1095
        let hurl_result = &result.hurl_result;
314
1095
        let filename_in = &job.filename;
315
1095
        let filename_out = job.runner_options.output.as_ref();
316

            
317
1095
        match self.output_type {
318
            OutputType::ResponseBody {
319
63
                include_headers,
320
63
                color,
321
63
                pretty,
322
            } => {
323
63
                if hurl_result.success {
324
63
                    let result = output::write_last_body(
325
63
                        hurl_result,
326
63
                        include_headers,
327
63
                        color,
328
63
                        pretty,
329
63
                        filename_out,
330
63
                        stdout,
331
63
                        append,
332
                    );
333
63
                    if let Err(e) = result {
334
                        let message = e.render(
335
                            &filename_in.to_string(),
336
                            content,
337
                            None,
338
                            OutputFormat::Terminal(color),
339
                        );
340
                        return Err(JobError::OutputWrite(message));
341
                    }
342
                }
343
            }
344
            OutputType::Json => {
345
42
                let result = output::write_json(
346
42
                    hurl_result,
347
42
                    content,
348
42
                    filename_in,
349
42
                    filename_out,
350
42
                    stdout,
351
42
                    append,
352
                );
353
42
                if let Err(error) = result {
354
                    return Err(JobError::OutputWrite(error.to_string()));
355
                }
356
            }
357
990
            OutputType::NoOutput => {}
358
        }
359
1095
        Ok(())
360
    }
361
}