/*
 * Hurl (https://hurl.dev)
 * Copyright (C) 2025 Orange
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
use std::sync::mpsc::{Receiver, Sender};
19
use std::sync::{mpsc, Arc, Mutex};
20

            
21
use hurl_core::error::{DisplaySourceError, OutputFormat};
22
use hurl_core::typing::Count;
23

            
24
use crate::output;
25
use crate::parallel::error::JobError;
26
use crate::parallel::job::{Job, JobQueue, JobResult};
27
use crate::parallel::message::WorkerMessage;
28
use crate::parallel::progress::{Mode, ParProgress};
29
use crate::parallel::worker::{Worker, WorkerId};
30
use crate::util::term::{Stderr, Stdout, WriteMode};
31

            
32
/// A parallel runner manages a list of `Worker`. Each worker is either idle or is running a
33
/// [`Job`]. To run jobs, the [`ParallelRunner::run`] method much be executed on the main thread.
34
/// Each worker has its own thread that it uses to run a Hurl file, and communicates with the main
35
/// thread. Standard multi-producer single-producer channels are used between the main runner and
36
/// the workers to send job request and receive job result.
37
///
38
/// The parallel runner is responsible to manage the state of the workers, and to display standard
39
/// output and standard error, in the main thread. Each worker reports its progression to the
40
/// parallel runner, which updates the workers states and displays a progress bar.
41
/// Inside each worker, logs (messages on standard error) and HTTP response (output on
42
/// standard output) are buffered and send to the runner to be eventually displayed.
43
///
44
/// By design, the workers state is read and modified on the main thread.
45
pub struct ParallelRunner {
46
    /// The list of workers, running Hurl file in their inner thread.
47
    workers: Vec<(Worker, WorkerState)>,
48
    /// The transmit end of the channel used to send messages to workers.
49
    tx: Option<Sender<Job>>,
50
    /// The receiving end of the channel used to communicate to workers.
51
    rx: Receiver<WorkerMessage>,
52
    /// Progress reporter to display the advancement of the parallel runs.
53
    progress: ParProgress,
54
    /// Output type for each completed job on standard output.
55
    output_type: OutputType,
56
    /// Repeat mode for the runner: infinite or finite.
57
    repeat: Count,
58
}
59

            
60
/// Represents a worker's state.
61
#[allow(clippy::large_enum_variant)]
62
pub enum WorkerState {
63
    /// Worker has no job to run.
64
    Idle,
65
    /// Worker is currently running a `job`, the entry being executed is at 0-based index
66
    /// `entry_index`, the total number of entries being `entry_count`.
67
    Running {
68
        job: Job,
69
        entry_index: usize,
70
        entry_count: usize,
71
    },
72
}
73

            
74
/// What is written on standard output each time a job is completed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OutputType {
    /// The last HTTP response body of a Hurl file is written on standard output.
    ResponseBody { include_headers: bool, color: bool },
    /// The whole Hurl file run is exported as structured JSON on standard output.
    Json,
    /// Nothing is written on standard output when a Hurl file run is completed.
    NoOutput,
}
83

            
84
/// Limit handed to the progress reporter when the runner is created.
// NOTE(review): presumably the maximum number of running jobs listed in the progress bar;
// confirm against `ParProgress::new`.
const MAX_RUNNING_DISPLAYED: usize = 8;
85

            
86
impl ParallelRunner {
87
    /// Creates a new parallel runner, with `worker_count` worker thread.
88
    ///
89
    /// The runner runs a list of [`Job`] in parallel. It creates two channels to communicate
90
    /// with the workers:
91
    ///
92
    /// - `runner -> worker`: is used to send [`Job`] processing request to a worker,
93
    /// - `worker -> runner`: is used to send [`WorkerMessage`] to update job progression, from a
94
    ///   worker to the runner.
95
    ///
96
    /// When a job is completed, depending on `output_type`, it can be outputted to standard output:
97
    /// whether as a raw response body bytes, or in a structured JSON output.
98
    ///
99
    /// The runner can repeat running a list of jobs. For instance, when repeating two times the job
100
    /// sequence (`a`, `b`, `c`), runner will act as if it runs (`a`, `b`, `c`, `a`, `b`, `c`).
101
    ///
102
    /// If `test` mode is `true` the runner is run in "test" mode, reporting the success or failure
103
    /// of each file on standard error. In addition to the test mode, a `progress_bar` designed for
104
    /// parallel run progression can be used. When the progress bar is displayed, it's wrapped with
105
    /// new lines at width `max_width`.
106
    ///
107
    /// `color` determines if color if used in standard error.
108
78
    pub fn new(
109
78
        workers_count: usize,
110
78
        output_type: OutputType,
111
78
        repeat: Count,
112
78
        test: bool,
113
78
        progress_bar: bool,
114
78
        color: bool,
115
78
        max_width: Option<usize>,
116
78
    ) -> Self {
117
78
        // Worker are running on theirs own thread, while parallel runner is running in the main
118
78
        // thread.
119
78
        // We create the channel to communicate from workers to the parallel runner.
120
78
        let (tx_in, rx_in) = mpsc::channel();
121
78
        // We create the channel to communicate from the parallel runner to the workers.
122
78
        let (tx_out, rx_out) = mpsc::channel();
123
78
        let rx_out = Arc::new(Mutex::new(rx_out));
124
78

            
125
78
        // Create the workers:
126
78
        let workers = (0..workers_count)
127
209
            .map(|i| {
128
183
                let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
129
183
                let state = WorkerState::Idle;
130
183
                (worker, state)
131
209
            })
132
78
            .collect::<Vec<_>>();
133
78

            
134
78
        let mode = Mode::new(test, progress_bar);
135
78
        let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color, max_width);
136
78

            
137
78
        ParallelRunner {
138
78
            workers,
139
78
            tx: Some(tx_out),
140
78
            rx: rx_in,
141
78
            progress,
142
78
            output_type,
143
78
            repeat,
144
        }
145
    }
146

            
147
    /// Runs a list of [`Job`] in parallel and returns the results.
148
    ///
149
    /// Results are returned ordered by the sequence number, and not their execution order. So, the
150
    /// order of the `jobs` is the same as the order of the `jobs` results, independently of the
151
    /// worker's count.
152
78
    pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
153
78
        // The parallel runner runs on the main thread. It's responsible for displaying standard
154
78
        // output and standard error. Workers are buffering their output and error in memory, and
155
78
        // delegate the display to the runners.
156
78
        let mut stdout = Stdout::new(WriteMode::Immediate);
157
78
        let mut stderr = Stderr::new(WriteMode::Immediate);
158
78

            
159
78
        // Create the jobs queue:
160
78
        let mut queue = JobQueue::new(jobs, self.repeat);
161
78
        let jobs_count = queue.jobs_count();
162
78

            
163
78
        // Initiate the runner, fill our workers:
164
209
        self.workers.iter().for_each(|_| {
165
183
            if let Some(job) = queue.next() {
166
183
                _ = self.tx.as_ref().unwrap().send(job);
167
            }
168
209
        });
169
78

            
170
78
        // When dumped HTTP responses, we truncate existing output file on first save, then append
171
78
        // it on subsequent write.
172
78
        let mut append = false;
173
78

            
174
78
        // Start the message pump:
175
78
        let mut results = vec![];
176
3084
        for msg in self.rx.iter() {
177
3084
            match msg {
178
                // If we have any error (either a [`WorkerMessage::IOError`] or a [`WorkerMessage::ParsingError`]
179
                // we don't take any more jobs and exit from the methods in error. This is the same
180
                // behaviour as when we run sequentially a list of Hurl files.
181
3
                WorkerMessage::InputReadError(msg) => {
182
3
                    self.progress.clear_progress_bar(&mut stderr);
183
3

            
184
3
                    let filename = msg.job.filename;
185
3
                    let error = msg.error;
186
3
                    let message = format!("Issue reading from {filename}: {error}");
187
3
                    return Err(JobError::InputRead(message));
188
                }
189
3
                WorkerMessage::ParsingError(msg) => {
190
3
                    // Like [`hurl::runner::run`] method, the display of parsing error is done here
191
3
                    // instead of being done in [`hurl::run_par`] method.
192
3
                    self.progress.clear_progress_bar(&mut stderr);
193
3

            
194
3
                    stderr.eprint(msg.stderr.buffer());
195
3
                    return Err(JobError::Parsing);
196
                }
197
                // Everything is OK, we report the progress. As we can receive a lot of running
198
                // messages, we don't want to update the progress bar too often to avoid flickering.
199
2007
                WorkerMessage::Running(msg) => {
200
2007
                    self.workers[msg.worker_id.0].1 = WorkerState::Running {
201
2007
                        job: msg.job,
202
2007
                        entry_index: msg.entry_index,
203
2007
                        entry_count: msg.entry_count,
204
2007
                    };
205
2007

            
206
2007
                    if self.progress.can_update() {
207
2007
                        self.progress.clear_progress_bar(&mut stderr);
208
2007
                        self.progress.update_progress_bar(
209
2007
                            &self.workers,
210
2007
                            results.len(),
211
2007
                            jobs_count,
212
2007
                            &mut stderr,
213
2007
                        );
214
                    }
215
                }
216
                // A new job has been completed, we take a new job if the queue is not empty.
217
                // Contrary to when we receive a running message, we clear the progress bar no
218
                // matter what the frequency is, to get a "correct" and up-to-date display on any
219
                // test completion.
220
1071
                WorkerMessage::Completed(msg) => {
221
1071
                    self.progress.clear_progress_bar(&mut stderr);
222
1071

            
223
1071
                    // The worker is becoming idle.
224
1071
                    self.workers[msg.worker_id.0].1 = WorkerState::Idle;
225
1071

            
226
1071
                    // First, we display the job standard error, then the job standard output
227
1071
                    // (similar to the sequential runner).
228
1071
                    if !msg.stderr.buffer().is_empty() {
229
285
                        stderr.eprint(msg.stderr.buffer());
230
                    }
231
1071
                    if !msg.stdout.buffer().is_empty() {
232
3
                        let ret = stdout.write_all(msg.stdout.buffer());
233
3
                        if ret.is_err() {
234
                            return Err(JobError::OutputWrite(
235
                                "Issue writing to stdout".to_string(),
236
                            ));
237
                        }
238
                    }
239

            
240
                    // Then, we print job output on standard output (the first response truncates
241
                    // exiting file, subsequent response appends bytes).
242
1071
                    self.print_output(&msg.result, &mut stdout, append)?;
243
1071
                    append = true;
244
1071

            
245
1071
                    // Report the completion of this job and update the progress.
246
1071
                    self.progress.print_completed(&msg.result, &mut stderr);
247
1071

            
248
1071
                    results.push(msg.result);
249
1071

            
250
1071
                    self.progress.update_progress_bar(
251
1071
                        &self.workers,
252
1071
                        results.len(),
253
1071
                        jobs_count,
254
1071
                        &mut stderr,
255
1071
                    );
256
1071
                    // We want to force the next refresh of the progress bar (when we receive a
257
1071
                    // running message) to be sure that the new next jobs will be printed. This
258
1071
                    // is needed because we've a throttle on the progress bar refresh and not every
259
1071
                    // running messages received leads to a progress bar refresh.
260
1071
                    self.progress.force_next_update();
261
1071

            
262
1071
                    // We run the next job to process:
263
1071
                    let job = queue.next();
264
1071
                    match job {
265
909
                        Some(job) => {
266
909
                            _ = self.tx.as_ref().unwrap().send(job);
267
                        }
268
                        None => {
269
                            // If we have received all the job results, we can stop the run.
270
162
                            if let Some(jobs_count) = jobs_count {
271
162
                                if results.len() == jobs_count {
272
72
                                    break;
273
                                }
274
                            }
275
                        }
276
                    }
277
                }
278
            }
279
        }
280

            
281
        // We gracefully shut down workers, by dropping the sender and wait for each thread workers
282
        // to join.
283
72
        drop(self.tx.take());
284
234
        for worker in &mut self.workers {
285
162
            if let Some(thread) = worker.0.take_thread() {
286
162
                thread.join().unwrap();
287
            }
288
        }
289

            
290
        // All jobs have been executed, we sort results by sequence number to get the same order
291
        // as the input jobs list.
292
9564
        results.sort_unstable_by_key(|result| result.job.seq);
293
72
        Ok(results)
294
    }
295

            
296
    /// Prints a job `result` to standard output `stdout`, either as a raw HTTP response (last
297
    /// body of the run), or in a structured JSON way.
298
    /// If `append` is true, any existing file will be appended instead of being truncated.
299
1071
    fn print_output(
300
1071
        &self,
301
1071
        result: &JobResult,
302
1071
        stdout: &mut Stdout,
303
1071
        append: bool,
304
1071
    ) -> Result<(), JobError> {
305
1071
        let job = &result.job;
306
1071
        let content = &result.content;
307
1071
        let hurl_result = &result.hurl_result;
308
1071
        let filename_in = &job.filename;
309
1071
        let filename_out = job.runner_options.output.as_ref();
310
1071

            
311
1071
        match self.output_type {
312
            OutputType::ResponseBody {
313
63
                include_headers,
314
63
                color,
315
63
            } => {
316
63
                if hurl_result.success {
317
63
                    let result = output::write_last_body(
318
63
                        hurl_result,
319
63
                        include_headers,
320
63
                        color,
321
63
                        filename_out,
322
63
                        stdout,
323
63
                        append,
324
63
                    );
325
63
                    if let Err(e) = result {
326
                        let message = e.to_string(
327
                            &filename_in.to_string(),
328
                            content,
329
                            None,
330
                            OutputFormat::Terminal(color),
331
                        );
332
                        return Err(JobError::OutputWrite(message));
333
                    }
334
                }
335
            }
336
            OutputType::Json => {
337
42
                let result = output::write_json(
338
42
                    hurl_result,
339
42
                    content,
340
42
                    filename_in,
341
42
                    filename_out,
342
42
                    stdout,
343
42
                    append,
344
42
                );
345
42
                if let Err(eroor) = result {
346
                    return Err(JobError::OutputWrite(eroor.to_string()));
347
                }
348
            }
349
966
            OutputType::NoOutput => {}
350
        }
351
1071
        Ok(())
352
    }
353
}