1
/*
2
 * Hurl (https://hurl.dev)
3
 * Copyright (C) 2025 Orange
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *          http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18
use std::sync::mpsc::{Receiver, Sender};
19
use std::sync::{mpsc, Arc, Mutex};
20

            
21
use super::error::JobError;
22
use super::job::{Job, JobQueue, JobResult};
23
use super::message::WorkerMessage;
24
use super::progress::{Mode, ParProgress};
25
use super::worker::{Worker, WorkerId};
26
use crate::output;
27
use crate::pretty::PrettyMode;
28
use crate::util::term::{Stderr, Stdout, WriteMode};
29
use hurl_core::error::{DisplaySourceError, OutputFormat};
30
use hurl_core::typing::Count;
31

            
32
/// A parallel runner manages a list of `Worker`. Each worker is either idle or is running a
33
/// [`Job`]. To run jobs, the [`ParallelRunner::run`] method much be executed on the main thread.
34
/// Each worker has its own thread that it uses to run a Hurl file, and communicates with the main
35
/// thread. Standard multi-producer single-producer channels are used between the main runner and
36
/// the workers to send job request and receive job result.
37
///
38
/// The parallel runner is responsible to manage the state of the workers, and to display standard
39
/// output and standard error, in the main thread. Each worker reports its progression to the
40
/// parallel runner, which updates the workers states and displays a progress bar.
41
/// Inside each worker, logs (messages on standard error) and HTTP response (output on
42
/// standard output) are buffered and send to the runner to be eventually displayed.
43
///
44
/// By design, the workers state is read and modified on the main thread.
45
pub struct ParallelRunner {
46
    /// The list of workers, running Hurl file in their inner thread.
47
    workers: Vec<(Worker, WorkerState)>,
48
    /// The transmit end of the channel used to send messages to workers.
49
    tx: Option<Sender<Job>>,
50
    /// The receiving end of the channel used to communicate to workers.
51
    rx: Receiver<WorkerMessage>,
52
    /// Progress reporter to display the advancement of the parallel runs.
53
    progress: ParProgress,
54
    /// Output type for each completed job on standard output.
55
    output_type: OutputType,
56
    /// Repeat mode for the runner: infinite or finite.
57
    repeat: Count,
58
}
59

            
60
/// Represents a worker's state.
61
#[allow(clippy::large_enum_variant)]
62
pub enum WorkerState {
63
    /// Worker has no job to run.
64
    Idle,
65
    /// Worker is currently running a `job`, the entry being executed is at 0-based index
66
    /// `entry_index`, the total number of entries being `entry_count`.
67
    Running {
68
        job: Job,
69
        entry_index: usize,
70
        entry_count: usize,
71
    },
72
}
73

            
74
#[derive(Clone, Debug, PartialEq, Eq)]
75
pub enum OutputType {
76
    /// The last HTTP response body of a Hurl file is outputted on standard output.
77
    ResponseBody {
78
        include_headers: bool,
79
        color: bool,
80
        pretty: PrettyMode,
81
    },
82
    /// The whole Hurl file run is exported in a structured JSON export on standard output.
83
    Json,
84
    /// Nothing is outputted on standard output when a Hurl file run is completed.
85
    NoOutput,
86
}
87

            
88
/// Value given to the progress reporter when it is created; presumably the maximum number of
/// running jobs displayed at once in the progress bar (to be confirmed against `ParProgress`).
const MAX_RUNNING_DISPLAYED: usize = 8;
89

            
90
impl ParallelRunner {
91
    /// Creates a new parallel runner, with `worker_count` worker thread.
92
    ///
93
    /// The runner runs a list of [`Job`] in parallel. It creates two channels to communicate
94
    /// with the workers:
95
    ///
96
    /// - `runner -> worker`: is used to send [`Job`] processing request to a worker,
97
    /// - `worker -> runner`: is used to send [`WorkerMessage`] to update job progression, from a
98
    ///   worker to the runner.
99
    ///
100
    /// When a job is completed, depending on `output_type`, it can be outputted to standard output:
101
    /// whether as a raw response body bytes, or in a structured JSON output.
102
    ///
103
    /// The runner can repeat running a list of jobs. For instance, when repeating two times the job
104
    /// sequence (`a`, `b`, `c`), runner will act as if it runs (`a`, `b`, `c`, `a`, `b`, `c`).
105
    ///
106
    /// If `test` mode is `true` the runner is run in "test" mode, reporting the success or failure
107
    /// of each file on standard error. In addition to the test mode, a `progress_bar` designed for
108
    /// parallel run progression can be used. When the progress bar is displayed, it's wrapped with
109
    /// new lines at width `max_width`.
110
    ///
111
    /// `color` determines if color if used in standard error.
112
84
    pub fn new(
113
84
        workers_count: usize,
114
84
        output_type: OutputType,
115
84
        repeat: Count,
116
84
        test: bool,
117
84
        progress_bar: bool,
118
84
        color: bool,
119
84
        max_width: Option<usize>,
120
84
    ) -> Self {
121
        // Worker are running on theirs own thread, while parallel runner is running in the main
122
        // thread.
123
        // We create the channel to communicate from workers to the parallel runner.
124
84
        let (tx_in, rx_in) = mpsc::channel();
125
        // We create the channel to communicate from the parallel runner to the workers.
126
84
        let (tx_out, rx_out) = mpsc::channel();
127
84
        let rx_out = Arc::new(Mutex::new(rx_out));
128

            
129
        // Create the workers:
130
84
        let workers = (0..workers_count)
131
226
            .map(|i| {
132
198
                let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
133
198
                let state = WorkerState::Idle;
134
198
                (worker, state)
135
198
            })
136
84
            .collect::<Vec<_>>();
137

            
138
84
        let mode = Mode::new(test, progress_bar);
139
84
        let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color, max_width);
140

            
141
84
        ParallelRunner {
142
84
            workers,
143
84
            tx: Some(tx_out),
144
84
            rx: rx_in,
145
84
            progress,
146
84
            output_type,
147
84
            repeat,
148
        }
149
    }
150

            
151
    /// Runs a list of [`Job`] in parallel and returns the results.
152
    ///
153
    /// Results are returned ordered by the sequence number, and not their execution order. So, the
154
    /// order of the `jobs` is the same as the order of the `jobs` results, independently of the
155
    /// worker's count.
156
84
    pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
157
        // The parallel runner runs on the main thread. It's responsible for displaying standard
158
        // output and standard error. Workers are buffering their output and error in memory, and
159
        // delegate the display to the runners.
160
84
        let mut stdout = Stdout::new(WriteMode::Immediate);
161
84
        let mut stderr = Stderr::new(WriteMode::Immediate);
162

            
163
        // Create the jobs queue:
164
84
        let mut queue = JobQueue::new(jobs, self.repeat);
165
84
        let jobs_count = queue.jobs_count();
166

            
167
        // Initiate the runner, fill our workers:
168
226
        self.workers.iter().for_each(|_| {
169
198
            if let Some(job) = queue.next() {
170
198
                _ = self.tx.as_ref().unwrap().send(job);
171
            }
172
198
        });
173

            
174
        // When dumped HTTP responses, we truncate existing output file on first save, then append
175
        // it on subsequent write.
176
84
        let mut append = false;
177

            
178
        // Start the message pump:
179
84
        let mut results = vec![];
180
3225
        for msg in self.rx.iter() {
181
3225
            match msg {
182
                // If we have any error (either a [`WorkerMessage::IOError`] or a [`WorkerMessage::ParsingError`]
183
                // we don't take any more jobs and exit from the methods in error. This is the same
184
                // behaviour as when we run sequentially a list of Hurl files.
185
3
                WorkerMessage::InputReadError(msg) => {
186
3
                    self.progress.clear_progress_bar(&mut stderr);
187

            
188
3
                    let filename = msg.job.filename;
189
3
                    let error = msg.error;
190
3
                    let message = format!("Issue reading from {filename}: {error}");
191
3
                    return Err(JobError::InputRead(message));
192
                }
193
3
                WorkerMessage::ParsingError(msg) => {
194
                    // Like [`hurl::runner::run`] method, the display of parsing error is done here
195
                    // instead of being done in [`hurl::run_par`] method.
196
3
                    self.progress.clear_progress_bar(&mut stderr);
197

            
198
3
                    stderr.eprint(msg.stderr.buffer());
199
3
                    return Err(JobError::Parsing);
200
                }
201
                // Everything is OK, we report the progress. As we can receive a lot of running
202
                // messages, we don't want to update the progress bar too often to avoid flickering.
203
2124
                WorkerMessage::Running(msg) => {
204
2124
                    self.workers[msg.worker_id.0].1 = WorkerState::Running {
205
2124
                        job: msg.job,
206
2124
                        entry_index: msg.entry_index,
207
2124
                        entry_count: msg.entry_count,
208
2124
                    };
209

            
210
2124
                    if self.progress.can_update() {
211
2124
                        self.progress.clear_progress_bar(&mut stderr);
212
2124
                        self.progress.update_progress_bar(
213
2124
                            &self.workers,
214
2124
                            results.len(),
215
2124
                            jobs_count,
216
2124
                            &mut stderr,
217
2124
                        );
218
                    }
219
                }
220
                // A new job has been completed, we take a new job if the queue is not empty.
221
                // Contrary to when we receive a running message, we clear the progress bar no
222
                // matter what the frequency is, to get a "correct" and up-to-date display on any
223
                // test completion.
224
1095
                WorkerMessage::Completed(msg) => {
225
1095
                    self.progress.clear_progress_bar(&mut stderr);
226

            
227
                    // The worker is becoming idle.
228
1095
                    self.workers[msg.worker_id.0].1 = WorkerState::Idle;
229

            
230
                    // First, we display the job standard error, then the job standard output
231
                    // (similar to the sequential runner).
232
1095
                    if !msg.stderr.buffer().is_empty() {
233
297
                        stderr.eprint(msg.stderr.buffer());
234
                    }
235
1095
                    if !msg.stdout.buffer().is_empty() {
236
3
                        let ret = stdout.write_all(msg.stdout.buffer());
237
3
                        if ret.is_err() {
238
                            return Err(JobError::OutputWrite(
239
                                "Issue writing to stdout".to_string(),
240
                            ));
241
                        }
242
                    }
243

            
244
                    // Then, we print job output on standard output (the first response truncates
245
                    // exiting file, subsequent response appends bytes).
246
1095
                    self.print_output(&msg.result, &mut stdout, append)?;
247
1095
                    append = true;
248

            
249
                    // Report the completion of this job and update the progress.
250
1095
                    self.progress.print_completed(&msg.result, &mut stderr);
251

            
252
1095
                    results.push(msg.result);
253

            
254
1095
                    self.progress.update_progress_bar(
255
1095
                        &self.workers,
256
1095
                        results.len(),
257
1095
                        jobs_count,
258
1095
                        &mut stderr,
259
                    );
260
                    // We want to force the next refresh of the progress bar (when we receive a
261
                    // running message) to be sure that the new next jobs will be printed. This
262
                    // is needed because we've a throttle on the progress bar refresh and not every
263
                    // running messages received leads to a progress bar refresh.
264
1095
                    self.progress.force_next_update();
265

            
266
                    // We run the next job to process:
267
1095
                    let job = queue.next();
268
1095
                    match job {
269
918
                        Some(job) => {
270
918
                            _ = self.tx.as_ref().unwrap().send(job);
271
                        }
272
                        None => {
273
                            // If we have received all the job results, we can stop the run.
274
177
                            if let Some(jobs_count) = jobs_count {
275
177
                                if results.len() == jobs_count {
276
78
                                    break;
277
                                }
278
                            }
279
                        }
280
                    }
281
                }
282
            }
283
        }
284

            
285
        // We gracefully shut down workers, by dropping the sender and wait for each thread workers
286
        // to join.
287
78
        drop(self.tx.take());
288
255
        for worker in &mut self.workers {
289
177
            if let Some(thread) = worker.0.take_thread() {
290
177
                thread.join().unwrap();
291
            }
292
        }
293

            
294
        // All jobs have been executed, we sort results by sequence number to get the same order
295
        // as the input jobs list.
296
78
        results.sort_unstable_by_key(|result| result.job.seq);
297
78
        Ok(results)
298
    }
299

            
300
    /// Prints a job `result` to standard output `stdout`, either as a raw HTTP response (last
301
    /// body of the run), or in a structured JSON way.
302
    /// If `append` is true, any existing file will be appended instead of being truncated.
303
1095
    fn print_output(
304
1095
        &self,
305
1095
        result: &JobResult,
306
1095
        stdout: &mut Stdout,
307
1095
        append: bool,
308
1095
    ) -> Result<(), JobError> {
309
1095
        let job = &result.job;
310
1095
        let content = &result.content;
311
1095
        let hurl_result = &result.hurl_result;
312
1095
        let filename_in = &job.filename;
313
1095
        let filename_out = job.runner_options.output.as_ref();
314

            
315
1095
        match self.output_type {
316
            OutputType::ResponseBody {
317
63
                include_headers,
318
63
                color,
319
63
                pretty,
320
            } => {
321
63
                if hurl_result.success {
322
63
                    let result = output::write_last_body(
323
63
                        hurl_result,
324
63
                        include_headers,
325
63
                        color,
326
63
                        pretty,
327
63
                        filename_out,
328
63
                        stdout,
329
63
                        append,
330
                    );
331
63
                    if let Err(e) = result {
332
                        let message = e.render(
333
                            &filename_in.to_string(),
334
                            content,
335
                            None,
336
                            OutputFormat::Terminal(color),
337
                        );
338
                        return Err(JobError::OutputWrite(message));
339
                    }
340
                }
341
            }
342
            OutputType::Json => {
343
42
                let result = output::write_json(
344
42
                    hurl_result,
345
42
                    content,
346
42
                    filename_in,
347
42
                    filename_out,
348
42
                    stdout,
349
42
                    append,
350
                );
351
42
                if let Err(error) = result {
352
                    return Err(JobError::OutputWrite(error.to_string()));
353
                }
354
            }
355
990
            OutputType::NoOutput => {}
356
        }
357
1095
        Ok(())
358
    }
359
}