/*
 * Hurl (https://hurl.dev)
 * Copyright (C) 2025 Orange
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
use std::sync::mpsc::{Receiver, Sender};
19
use std::sync::{mpsc, Arc, Mutex};
20

            
21
use hurl_core::error::{DisplaySourceError, OutputFormat};
22
use hurl_core::typing::Count;
23

            
24
use crate::output;
25
use crate::parallel::error::JobError;
26
use crate::parallel::job::{Job, JobQueue, JobResult};
27
use crate::parallel::message::WorkerMessage;
28
use crate::parallel::progress::{Mode, ParProgress};
29
use crate::parallel::worker::{Worker, WorkerId};
30
use crate::util::term::{Stderr, Stdout, WriteMode};
31

            
32
/// A parallel runner manages a list of `Worker`. Each worker is either idle or is running a
33
/// [`Job`]. To run jobs, the [`ParallelRunner::run`] method much be executed on the main thread.
34
/// Each worker has its own thread that it uses to run a Hurl file, and communicates with the main
35
/// thread. Standard multi-producer single-producer channels are used between the main runner and
36
/// the workers to send job request and receive job result.
37
///
38
/// The parallel runner is responsible to manage the state of the workers, and to display standard
39
/// output and standard error, in the main thread. Each worker reports its progression to the
40
/// parallel runner, which updates the workers states and displays a progress bar.
41
/// Inside each worker, logs (messages on standard error) and HTTP response (output on
42
/// standard output) are buffered and send to the runner to be eventually displayed.
43
///
44
/// By design, the workers state is read and modified on the main thread.
45
pub struct ParallelRunner {
46
    /// The list of workers, running Hurl file in their inner thread.
47
    workers: Vec<(Worker, WorkerState)>,
48
    /// The transmit end of the channel used to send messages to workers.
49
    tx: Option<Sender<Job>>,
50
    /// The receiving end of the channel used to communicate to workers.
51
    rx: Receiver<WorkerMessage>,
52
    /// Progress reporter to display the advancement of the parallel runs.
53
    progress: ParProgress,
54
    /// Output type for each completed job on standard output.
55
    output_type: OutputType,
56
    /// Repeat mode for the runner: infinite or finite.
57
    repeat: Count,
58
}
59

            
60
/// Represents a worker's state.
61
#[allow(clippy::large_enum_variant)]
62
pub enum WorkerState {
63
    /// Worker has no job to run.
64
    Idle,
65
    /// Worker is currently running a `job`, the entry being executed is at 0-based index
66
    /// `entry_index`, the total number of entries being `entry_count`.
67
    Running {
68
        job: Job,
69
        entry_index: usize,
70
        entry_count: usize,
71
    },
72
}
73

            
74
/// What the parallel runner writes on standard output when a job is completed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OutputType {
    /// The last HTTP response body of a Hurl file is outputted on standard output.
    ResponseBody { include_headers: bool, color: bool },
    /// The whole Hurl file run is exported in a structured JSON export on standard output.
    Json,
    /// Nothing is outputted on standard output when a Hurl file run is completed.
    NoOutput,
}
83

            
84
/// Value handed to `ParProgress::new` as its first argument when the runner is created.
// NOTE(review): presumably the maximum number of running jobs listed in the progress bar;
// confirm against `ParProgress`.
const MAX_RUNNING_DISPLAYED: usize = 8;
85

            
86
impl ParallelRunner {
87
    /// Creates a new parallel runner, with `worker_count` worker thread.
88
    ///
89
    /// The runner runs a list of [`Job`] in parallel. It creates two channels to communicate
90
    /// with the workers:
91
    ///
92
    /// - `runner -> worker`: is used to send [`Job`] processing request to a worker,
93
    /// - `worker -> runner`: is used to send [`WorkerMessage`] to update job progression, from a
94
    ///   worker to the runner.
95
    ///
96
    /// When a job is completed, depending on `output_type`, it can be outputted to standard output:
97
    /// whether as a raw response body bytes, or in a structured JSON output.
98
    ///
99
    /// The runner can repeat running a list of jobs. For instance, when repeating two times the job
100
    /// sequence (`a`, `b`, `c`), runner will act as if it runs (`a`, `b`, `c`, `a`, `b`, `c`).
101
    ///
102
    /// If `test` mode is `true` the runner is run in "test" mode, reporting the success or failure
103
    /// of each file on standard error. In addition to the test mode, a `progress_bar` designed for
104
    /// parallel run progression can be used. When the progress bar is displayed, it's wrapped with
105
    /// new lines at width `max_width`.
106
    ///
107
    /// `color` determines if color if used in standard error.
108
81
    pub fn new(
109
81
        workers_count: usize,
110
81
        output_type: OutputType,
111
81
        repeat: Count,
112
81
        test: bool,
113
81
        progress_bar: bool,
114
81
        color: bool,
115
81
        max_width: Option<usize>,
116
81
    ) -> Self {
117
81
        // Worker are running on theirs own thread, while parallel runner is running in the main
118
81
        // thread.
119
81
        // We create the channel to communicate from workers to the parallel runner.
120
81
        let (tx_in, rx_in) = mpsc::channel();
121
81
        // We create the channel to communicate from the parallel runner to the workers.
122
81
        let (tx_out, rx_out) = mpsc::channel();
123
81
        let rx_out = Arc::new(Mutex::new(rx_out));
124
81

            
125
81
        // Create the workers:
126
81
        let workers = (0..workers_count)
127
222
            .map(|i| {
128
195
                let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
129
195
                let state = WorkerState::Idle;
130
195
                (worker, state)
131
222
            })
132
81
            .collect::<Vec<_>>();
133
81

            
134
81
        let mode = Mode::new(test, progress_bar);
135
81
        let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color, max_width);
136
81

            
137
81
        ParallelRunner {
138
81
            workers,
139
81
            tx: Some(tx_out),
140
81
            rx: rx_in,
141
81
            progress,
142
81
            output_type,
143
81
            repeat,
144
        }
145
    }
146

            
147
    /// Runs a list of [`Job`] in parallel and returns the results.
148
    ///
149
    /// Results are returned ordered by the sequence number, and not their execution order. So, the
150
    /// order of the `jobs` is the same as the order of the `jobs` results, independently of the
151
    /// worker's count.
152
81
    pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
153
81
        // The parallel runner runs on the main thread. It's responsible for displaying standard
154
81
        // output and standard error. Workers are buffering their output and error in memory, and
155
81
        // delegate the display to the runners.
156
81
        let mut stdout = Stdout::new(WriteMode::Immediate);
157
81
        let mut stderr = Stderr::new(WriteMode::Immediate);
158
81

            
159
81
        // Create the jobs queue:
160
81
        let mut queue = JobQueue::new(jobs, self.repeat);
161
81
        let jobs_count = queue.jobs_count();
162
81

            
163
81
        // Initiate the runner, fill our workers:
164
222
        self.workers.iter().for_each(|_| {
165
195
            if let Some(job) = queue.next() {
166
195
                _ = self.tx.as_ref().unwrap().send(job);
167
            }
168
222
        });
169
81

            
170
81
        // When dumped HTTP responses, we truncate existing output file on first save, then append
171
81
        // it on subsequent write.
172
81
        let mut append = false;
173
81

            
174
81
        // Start the message pump:
175
81
        let mut results = vec![];
176
3138
        for msg in self.rx.iter() {
177
3138
            match msg {
178
                // If we have any error (either a [`WorkerMessage::IOError`] or a [`WorkerMessage::ParsingError`]
179
                // we don't take any more jobs and exit from the methods in error. This is the same
180
                // behaviour as when we run sequentially a list of Hurl files.
181
3
                WorkerMessage::InputReadError(msg) => {
182
3
                    self.progress.clear_progress_bar(&mut stderr);
183
3

            
184
3
                    let filename = msg.job.filename;
185
3
                    let error = msg.error;
186
3
                    let message = format!("Issue reading from {filename}: {error}");
187
3
                    return Err(JobError::InputRead(message));
188
                }
189
3
                WorkerMessage::ParsingError(msg) => {
190
3
                    // Like [`hurl::runner::run`] method, the display of parsing error is done here
191
3
                    // instead of being done in [`hurl::run_par`] method.
192
3
                    self.progress.clear_progress_bar(&mut stderr);
193
3

            
194
3
                    stderr.eprint(msg.stderr.buffer());
195
3
                    return Err(JobError::Parsing);
196
                }
197
                // Everything is OK, we report the progress. As we can receive a lot of running
198
                // messages, we don't want to update the progress bar too often to avoid flickering.
199
2046
                WorkerMessage::Running(msg) => {
200
2046
                    self.workers[msg.worker_id.0].1 = WorkerState::Running {
201
2046
                        job: msg.job,
202
2046
                        entry_index: msg.entry_index,
203
2046
                        entry_count: msg.entry_count,
204
2046
                    };
205
2046

            
206
2046
                    if self.progress.can_update() {
207
2046
                        self.progress.clear_progress_bar(&mut stderr);
208
2046
                        self.progress.update_progress_bar(
209
2046
                            &self.workers,
210
2046
                            results.len(),
211
2046
                            jobs_count,
212
2046
                            &mut stderr,
213
2046
                        );
214
                    }
215
                }
216
                // A new job has been completed, we take a new job if the queue is not empty.
217
                // Contrary to when we receive a running message, we clear the progress bar no
218
                // matter what the frequency is, to get a "correct" and up-to-date display on any
219
                // test completion.
220
1086
                WorkerMessage::Completed(msg) => {
221
1086
                    self.progress.clear_progress_bar(&mut stderr);
222
1086

            
223
1086
                    // The worker is becoming idle.
224
1086
                    self.workers[msg.worker_id.0].1 = WorkerState::Idle;
225
1086

            
226
1086
                    // First, we display the job standard error, then the job standard output
227
1086
                    // (similar to the sequential runner).
228
1086
                    if !msg.stderr.buffer().is_empty() {
229
291
                        stderr.eprint(msg.stderr.buffer());
230
                    }
231
1086
                    if !msg.stdout.buffer().is_empty() {
232
3
                        let ret = stdout.write_all(msg.stdout.buffer());
233
3
                        if ret.is_err() {
234
                            return Err(JobError::OutputWrite(
235
                                "Issue writing to stdout".to_string(),
236
                            ));
237
                        }
238
                    }
239

            
240
                    // Then, we print job output on standard output (the first response truncates
241
                    // exiting file, subsequent response appends bytes).
242
1086
                    self.print_output(&msg.result, &mut stdout, append)?;
243
1086
                    append = true;
244
1086

            
245
1086
                    // Report the completion of this job and update the progress.
246
1086
                    self.progress.print_completed(&msg.result, &mut stderr);
247
1086

            
248
1086
                    results.push(msg.result);
249
1086

            
250
1086
                    self.progress.update_progress_bar(
251
1086
                        &self.workers,
252
1086
                        results.len(),
253
1086
                        jobs_count,
254
1086
                        &mut stderr,
255
1086
                    );
256
1086
                    // We want to force the next refresh of the progress bar (when we receive a
257
1086
                    // running message) to be sure that the new next jobs will be printed. This
258
1086
                    // is needed because we've a throttle on the progress bar refresh and not every
259
1086
                    // running messages received leads to a progress bar refresh.
260
1086
                    self.progress.force_next_update();
261
1086

            
262
1086
                    // We run the next job to process:
263
1086
                    let job = queue.next();
264
1086
                    match job {
265
912
                        Some(job) => {
266
912
                            _ = self.tx.as_ref().unwrap().send(job);
267
                        }
268
                        None => {
269
                            // If we have received all the job results, we can stop the run.
270
174
                            if let Some(jobs_count) = jobs_count {
271
174
                                if results.len() == jobs_count {
272
75
                                    break;
273
                                }
274
                            }
275
                        }
276
                    }
277
                }
278
            }
279
        }
280

            
281
        // We gracefully shut down workers, by dropping the sender and wait for each thread workers
282
        // to join.
283
75
        drop(self.tx.take());
284
249
        for worker in &mut self.workers {
285
174
            if let Some(thread) = worker.0.take_thread() {
286
174
                thread.join().unwrap();
287
            }
288
        }
289

            
290
        // All jobs have been executed, we sort results by sequence number to get the same order
291
        // as the input jobs list.
292
9529
        results.sort_unstable_by_key(|result| result.job.seq);
293
75
        Ok(results)
294
    }
295

            
296
    /// Prints a job `result` to standard output `stdout`, either as a raw HTTP response (last
297
    /// body of the run), or in a structured JSON way.
298
    /// If `append` is true, any existing file will be appended instead of being truncated.
299
1086
    fn print_output(
300
1086
        &self,
301
1086
        result: &JobResult,
302
1086
        stdout: &mut Stdout,
303
1086
        append: bool,
304
1086
    ) -> Result<(), JobError> {
305
1086
        let job = &result.job;
306
1086
        let content = &result.content;
307
1086
        let hurl_result = &result.hurl_result;
308
1086
        let filename_in = &job.filename;
309
1086
        let filename_out = job.runner_options.output.as_ref();
310
1086

            
311
1086
        match self.output_type {
312
            OutputType::ResponseBody {
313
63
                include_headers,
314
63
                color,
315
63
            } => {
316
63
                if hurl_result.success {
317
63
                    let result = output::write_last_body(
318
63
                        hurl_result,
319
63
                        include_headers,
320
63
                        color,
321
63
                        filename_out,
322
63
                        stdout,
323
63
                        append,
324
63
                    );
325
63
                    if let Err(e) = result {
326
                        let message = e.to_string(
327
                            &filename_in.to_string(),
328
                            content,
329
                            None,
330
                            OutputFormat::Terminal(color),
331
                        );
332
                        return Err(JobError::OutputWrite(message));
333
                    }
334
                }
335
            }
336
            OutputType::Json => {
337
42
                let result = output::write_json(
338
42
                    hurl_result,
339
42
                    content,
340
42
                    filename_in,
341
42
                    filename_out,
342
42
                    stdout,
343
42
                    append,
344
42
                );
345
42
                if let Err(eroor) = result {
346
                    return Err(JobError::OutputWrite(eroor.to_string()));
347
                }
348
            }
349
981
            OutputType::NoOutput => {}
350
        }
351
1086
        Ok(())
352
    }
353
}