/*
 * Hurl (https://hurl.dev)
 * Copyright (C) 2024 Orange
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};

use hurl_core::error::{DisplaySourceError, OutputFormat};
use hurl_core::typing::Count;

use crate::output;
use crate::parallel::error::JobError;
use crate::parallel::job::{Job, JobQueue, JobResult};
use crate::parallel::message::WorkerMessage;
use crate::parallel::progress::{Mode, ParProgress};
use crate::parallel::worker::{Worker, WorkerId};
use crate::util::term::{Stderr, Stdout, WriteMode};

/// A parallel runner manages a list of `Worker`. Each worker is either idle or is running a
/// [`Job`]. To run jobs, the [`ParallelRunner::run`] method must be executed on the main thread.
/// Each worker has its own thread that it uses to run a Hurl file, and communicates with the main
/// thread. Standard multi-producer, single-consumer channels are used between the main runner and
/// the workers to send job requests and receive job results.
///
/// The parallel runner is responsible for managing the state of the workers and for displaying
/// standard output and standard error, in the main thread. Each worker reports its progression to
/// the parallel runner, which updates the workers' states and displays a progress bar.
/// Inside each worker, logs (messages on standard error) and HTTP responses (output on
/// standard output) are buffered and sent to the runner to be eventually displayed.
///
/// By design, the workers' state is read and modified on the main thread.
pub struct ParallelRunner {
    /// The list of workers, each running a Hurl file in its own thread.
    workers: Vec<(Worker, WorkerState)>,
    /// The transmit end of the channel used to send jobs to the workers.
    tx: Sender<Job>,
    /// The receiving end of the channel used to receive messages from the workers.
    rx: Receiver<WorkerMessage>,
    /// Progress reporter to display the advancement of the parallel runs.
    progress: ParProgress,
    /// Output type for each completed job on standard output.
    output_type: OutputType,
    /// Repeat mode for the runner: infinite or finite.
    repeat: Count,
}

/// Represents a worker's state.
pub enum WorkerState {
    /// Worker has no job to run.
    Idle,
    /// Worker is currently running a `job`, the entry being executed is at 0-based index
    /// `entry_index`, the total number of entries being `entry_count`.
    Running {
        job: Job,
        entry_index: usize,
        entry_count: usize,
    },
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OutputType {
    /// The last HTTP response body of a Hurl file is output on standard output.
    ResponseBody { include_headers: bool, color: bool },
    /// The whole Hurl file run is exported as structured JSON on standard output.
    Json,
    /// Nothing is output on standard output when a Hurl file run is completed.
    NoOutput,
}
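
/// Maximum number of running jobs displayed in the progress bar.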
const MAX_RUNNING_DISPLAYED: usize = 8;

impl ParallelRunner {
    /// Creates a new parallel runner, with `workers_count` worker threads.
    ///
    /// The runner runs a list of [`Job`] in parallel. It creates two channels to communicate
    /// with the workers:
    ///
    /// - `runner -> worker`: is used to send [`Job`] processing requests to a worker,
    /// - `worker -> runner`: is used to send [`WorkerMessage`] to update job progression, from a
    ///    worker to the runner.
    ///
    /// When a job is completed, depending on `output_type`, it can be output to standard output:
    /// either as raw response body bytes, or as a structured JSON export.
    ///
    /// The runner can repeat running a list of jobs. For instance, when repeating the job
    /// sequence (`a`, `b`, `c`) two times, the runner acts as if it runs (`a`, `b`, `c`, `a`, `b`, `c`).
    ///
    /// If `test` is `true`, the runner runs in "test" mode, reporting the success or failure
    /// of each file on standard error. In addition to the test mode, a `progress_bar` designed for
    /// parallel run progression can be used. When the progress bar is displayed, it's wrapped with
    /// new lines at width `max_width`.
    ///
    /// `color` determines if color is used in standard error.
    pub fn new(
        workers_count: usize,
        output_type: OutputType,
        repeat: Count,
        test: bool,
        progress_bar: bool,
        color: bool,
        max_width: Option<usize>,
    ) -> Self {
        // Workers run on their own thread, while the parallel runner runs on the main
        // thread.
        // We create the channel to communicate from the workers to the parallel runner.
        let (tx_in, rx_in) = mpsc::channel();
        // We create the channel to communicate from the parallel runner to the workers.
        let (tx_out, rx_out) = mpsc::channel();
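        // `mpsc::Receiver` is neither `Clone` nor `Sync`, so the job receiver is wrapped in an
        // `Arc<Mutex<...>>` to be shared across the worker threads.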
        let rx_out = Arc::new(Mutex::new(rx_out));

        // Create the workers:
        let workers = (0..workers_count)
            .map(|i| {
                let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
                let state = WorkerState::Idle;
                (worker, state)
            })
            .collect::<Vec<_>>();

        let mode = Mode::new(test, progress_bar);
        let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color, max_width);

        ParallelRunner {
            workers,
            tx: tx_out,
            rx: rx_in,
            progress,
            output_type,
            repeat,
        }
    }

    /// Runs a list of [`Job`] in parallel and returns the results.
    ///
    /// Results are returned ordered by sequence number, not by execution order. So, the order of
    /// the job results is the same as the order of the input `jobs`, independently of the number
    /// of workers.
    pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
        // The parallel runner runs on the main thread. It's responsible for displaying standard
        // output and standard error. Workers are buffering their output and error in memory, and
        // delegate the display to the runner.
        let mut stdout = Stdout::new(WriteMode::Immediate);
        let mut stderr = Stderr::new(WriteMode::Immediate);

        // Create the jobs queue:
        let mut queue = JobQueue::new(jobs, self.repeat);
        let jobs_count = queue.jobs_count();
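        // Note: `jobs_count` is `None` when the runner repeats the jobs indefinitely, so in that
        // case completion can't be detected by simply counting results.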

        // Initiate the runner, fill our workers:
        self.workers.iter().for_each(|_| {
            if let Some(job) = queue.next() {
                _ = self.tx.send(job);
            }
        });

        // When dumping HTTP responses, we truncate the existing output file on the first save,
        // then append to it on subsequent writes.
        let mut append = false;

        // Start the message pump:
        let mut results = vec![];
        for msg in self.rx.iter() {
            match msg {
                // If we have any error (either a [`WorkerMessage::IOError`] or a
                // [`WorkerMessage::ParsingError`]), we don't take any more jobs and exit from the
                // method with an error. This is the same behaviour as when we run a list of Hurl
                // files sequentially.
                WorkerMessage::IOError(msg) => {
                    self.progress.clear_progress_bar(&mut stderr);

                    let filename = msg.job.filename;
                    let error = msg.error;
                    let message = format!("Issue reading from {filename}: {error}");
                    return Err(JobError::IO(message));
                }
                WorkerMessage::ParsingError(msg) => {
                    // Like the [`hurl::runner::run`] method, the display of the parsing error is
                    // done here instead of being done in the [`hurl::run_par`] method.
                    self.progress.clear_progress_bar(&mut stderr);

                    stderr.eprint(msg.stderr.buffer());
                    return Err(JobError::Parsing);
                }
                // Everything is OK, we report the progress. As we can receive a lot of running
                // messages, we don't want to update the progress bar too often to avoid flickering.
                WorkerMessage::Running(msg) => {
                    self.workers[msg.worker_id.0].1 = WorkerState::Running {
                        job: msg.job,
                        entry_index: msg.entry_index,
                        entry_count: msg.entry_count,
                    };

                    if self.progress.can_update() {
                        self.progress.clear_progress_bar(&mut stderr);
                        self.progress.update_progress_bar(
                            &self.workers,
                            results.len(),
                            jobs_count,
                            &mut stderr,
                        );
                    }
                }
                // A new job has been completed, we take a new job if the queue is not empty.
                // Contrary to when we receive a running message, we clear the progress bar no
                // matter what the frequency is, to get a "correct" and up-to-date display on any
                // test completion.
                WorkerMessage::Completed(msg) => {
                    self.progress.clear_progress_bar(&mut stderr);

                    // The worker is becoming idle.
                    self.workers[msg.worker_id.0].1 = WorkerState::Idle;

                    // First, we display the job standard error, then the job standard output
                    // (similar to the sequential runner).
                    if !msg.stderr.buffer().is_empty() {
                        stderr.eprint(msg.stderr.buffer());
                    }
                    if !msg.stdout.buffer().is_empty() {
                        let ret = stdout.write_all(msg.stdout.buffer());
                        if ret.is_err() {
                            return Err(JobError::IO("Issue writing to stdout".to_string()));
                        }
                    }

                    // Then, we print the job output on standard output (the first response
                    // truncates any existing file, subsequent responses append bytes).
                    self.print_output(&msg.result, &mut stdout, append)?;
                    append = true;

                    // Report the completion of this job and update the progress.
                    self.progress.print_completed(&msg.result, &mut stderr);

                    results.push(msg.result);

                    self.progress.update_progress_bar(
                        &self.workers,
                        results.len(),
                        jobs_count,
                        &mut stderr,
                    );
                    // We want to force the next refresh of the progress bar (when we receive a
                    // running message) to be sure that the next new jobs will be printed. This
                    // is needed because we have a throttle on the progress bar refresh and not
                    // every running message received leads to a progress bar refresh.
                    self.progress.force_next_update();

                    // We run the next job to process:
                    let job = queue.next();
                    match job {
                        Some(job) => {
                            _ = self.tx.send(job);
                        }
                        None => {
                            // If we have received all the job results, we can stop the run.
                            if let Some(jobs_count) = jobs_count {
                                if results.len() == jobs_count {
                                    break;
                                }
                            }
                        }
                    }
                }
                // Graceful shutdown for the worker.
                WorkerMessage::ShutDown => {}
            }
        }

        // All jobs have been executed, we sort results by sequence number to get the same order
        // as the input jobs list.
        results.sort_unstable_by_key(|result| result.job.seq);
        Ok(results)
    }

    /// Prints a job `result` to standard output `stdout`, either as a raw HTTP response (the
    /// last body of the run), or as a structured JSON export.
    /// If `append` is true, any existing output file is appended to instead of being truncated.
    fn print_output(
        &self,
        result: &JobResult,
        stdout: &mut Stdout,
        append: bool,
    ) -> Result<(), JobError> {
        let job = &result.job;
        let content = &result.content;
        let hurl_result = &result.hurl_result;
        let filename_in = &job.filename;
        let filename_out = job.runner_options.output.as_ref();
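        // When the job defines an output file (`filename_out`), results are written to that file
        // instead of standard output.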

        match self.output_type {
            OutputType::ResponseBody {
                include_headers,
                color,
            } => {
                if hurl_result.success {
                    let result = output::write_last_body(
                        hurl_result,
                        include_headers,
                        color,
                        filename_out,
                        stdout,
                        append,
                    );
                    if let Err(e) = result {
                        return Err(JobError::Runtime(e.to_string(
                            &filename_in.to_string(),
                            content,
                            None,
                            OutputFormat::Terminal(color),
                        )));
                    }
                }
            }
            OutputType::Json => {
                let result = output::write_json(
                    hurl_result,
                    content,
                    filename_in,
                    filename_out,
                    stdout,
                    append,
                );
                if let Err(e) = result {
                    return Err(JobError::Runtime(e.to_string()));
                }
            }
            OutputType::NoOutput => {}
        }
        Ok(())
    }
}