Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qwindowspipereader.cpp
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
4// Qt-Security score:critical reason:data-parser
5
7#include <qcoreapplication.h>
8#include <QMutexLocker>
9#include <QPointer>
10
12
13using namespace Qt::StringLiterals;
14
15static const DWORD minReadBufferSize = 4096;
16
17QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
18 : QObject(parent),
19 handle(INVALID_HANDLE_VALUE),
20 eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
21 syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
22 waitObject(NULL),
23 readBufferMaxSize(0),
24 actualReadBufferSize(0),
25 pendingReadBytes(0),
26 lastError(ERROR_SUCCESS),
27 state(Stopped),
28 readSequenceStarted(false),
29 pipeBroken(true),
30 readyReadPending(false),
31 winEventActPosted(false)
32{
33 ZeroMemory(&overlapped, sizeof(OVERLAPPED));
34 overlapped.hEvent = eventHandle;
35 waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
36 if (waitObject == NULL)
37 qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
38}
39
40QWindowsPipeReader::~QWindowsPipeReader()
41{
42 stop();
43
44 // Wait for thread pool callback to complete, as it can be still
45 // executing some completion code.
46 WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
47 CloseThreadpoolWait(waitObject);
48 CloseHandle(eventHandle);
49 CloseHandle(syncHandle);
50}
51
52/*!
53 Sets the handle to read from. The handle must be valid.
54 Do not call this function while the pipe is running.
55 */
56void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
57{
58 readBuffer.clear();
59 actualReadBufferSize = 0;
60 readyReadPending = false;
61 pendingReadBytes = 0;
62 handle = hPipeReadEnd;
63 pipeBroken = false;
64 lastError = ERROR_SUCCESS;
65}
66
67/*!
68 Stops the asynchronous read sequence.
69 If the read sequence is running then the I/O operation is canceled.
70 */
71void QWindowsPipeReader::stop()
72{
73 cancelAsyncRead(Stopped);
74 pipeBroken = true;
75}
76
77/*!
78 Stops the asynchronous read sequence.
79 Reads all pending bytes into the internal buffer.
80 */
81void QWindowsPipeReader::drainAndStop()
82{
83 cancelAsyncRead(Draining);
84 pipeBroken = true;
85}
86
87/*!
88 Stops the asynchronous read sequence.
89 Clears the internal buffer and discards any pending data.
90 */
91void QWindowsPipeReader::stopAndClear()
92{
93 cancelAsyncRead(Stopped);
94 readBuffer.clear();
95 actualReadBufferSize = 0;
96 // QLocalSocket is supposed to write data in the 'Closing'
97 // state, so we don't set 'pipeBroken' flag here. Also, avoid
98 // setting this flag in checkForReadyRead().
99 lastError = ERROR_SUCCESS;
100}
101
102/*!
103 Stops the asynchronous read sequence.
104 */
105void QWindowsPipeReader::cancelAsyncRead(State newState)
106{
107 if (state != Running)
108 return;
109
110 mutex.lock();
111 state = newState;
112 if (readSequenceStarted) {
113 // This can legitimately fail due to the GetOverlappedResult()
114 // in the callback not being locked. We ignore ERROR_NOT_FOUND
115 // in this case.
116 if (!CancelIoEx(handle, &overlapped)) {
117 const DWORD dwError = GetLastError();
118 if (dwError != ERROR_NOT_FOUND) {
119 qErrnoWarning(dwError, "QWindowsPipeReader: CancelIoEx on handle %p failed.",
120 handle);
121 }
122 }
123
124 // Wait for callback to complete.
125 do {
126 mutex.unlock();
127 waitForNotification();
128 mutex.lock();
129 } while (readSequenceStarted);
130 }
131 mutex.unlock();
132
133 // Finish reading to keep the class state consistent. Note that
134 // signals are not emitted in the call below, as the caller is
135 // expected to do that synchronously.
136 consumePending();
137}
138
139/*!
140 Sets the size of internal read buffer.
141 */
142void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
143{
144 QMutexLocker locker(&mutex);
145 readBufferMaxSize = size;
146}
147
148/*!
149 Returns \c true if async operation is in progress, there is
150 pending data to read, or a read error is pending.
151*/
152bool QWindowsPipeReader::isReadOperationActive() const
153{
154 QMutexLocker locker(&mutex);
155 return readSequenceStarted || readyReadPending
156 || (lastError != ERROR_SUCCESS && !pipeBroken);
157}
158
159/*!
160 Returns the number of bytes we've read so far.
161 */
162qint64 QWindowsPipeReader::bytesAvailable() const
163{
164 return actualReadBufferSize;
165}
166
167/*!
168 Copies at most \c{maxlen} bytes from the internal read buffer to \c{data}.
169 */
170qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
171{
172 QMutexLocker locker(&mutex);
173 qint64 readSoFar;
174
175 // If startAsyncRead() has read data, copy it to its destination.
176 if (maxlen == 1 && actualReadBufferSize > 0) {
177 *data = readBuffer.getChar();
178 actualReadBufferSize--;
179 readSoFar = 1;
180 } else {
181 readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
182 actualReadBufferSize -= readSoFar;
183 }
184
185 if (!pipeBroken) {
186 startAsyncReadHelper(&locker);
187 if (readSoFar == 0)
188 return -2; // signal EWOULDBLOCK
189 }
190
191 return readSoFar;
192}
193
194/*!
195 Reads a line from the internal buffer, but no more than \c{maxlen}
196 characters. A terminating '\0' byte is always appended to \c{data},
197 so \c{maxlen} must be larger than 1.
198 */
199qint64 QWindowsPipeReader::readLine(char *data, qint64 maxlen)
200{
201 QMutexLocker locker(&mutex);
202 qint64 readSoFar = 0;
203
204 if (actualReadBufferSize > 0) {
205 readSoFar = readBuffer.readLine(data, qMin(actualReadBufferSize + 1, maxlen));
206 actualReadBufferSize -= readSoFar;
207 }
208
209 if (!pipeBroken) {
210 startAsyncReadHelper(&locker);
211 if (readSoFar == 0)
212 return -2; // signal EWOULDBLOCK
213 }
214
215 return readSoFar;
216}
217
218/*!
219 Skips up to \c{maxlen} bytes from the internal read buffer.
220 */
221qint64 QWindowsPipeReader::skip(qint64 maxlen)
222{
223 QMutexLocker locker(&mutex);
224
225 const qint64 skippedSoFar = readBuffer.skip(qMin(actualReadBufferSize, maxlen));
226 actualReadBufferSize -= skippedSoFar;
227
228 if (!pipeBroken) {
229 startAsyncReadHelper(&locker);
230 if (skippedSoFar == 0)
231 return -2; // signal EWOULDBLOCK
232 }
233
234 return skippedSoFar;
235}
236
237/*!
238 Returns \c true if a complete line of data can be read from the buffer.
239 */
240bool QWindowsPipeReader::canReadLine() const
241{
242 QMutexLocker locker(&mutex);
243 return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
244}
245
246/*!
247 Starts an asynchronous read sequence on the pipe.
248 */
249void QWindowsPipeReader::startAsyncRead()
250{
251 QMutexLocker locker(&mutex);
252 startAsyncReadHelper(&locker);
253}
254
255void QWindowsPipeReader::startAsyncReadHelper(QMutexLocker<QMutex> *locker)
256{
257 if (readSequenceStarted || lastError != ERROR_SUCCESS)
258 return;
259
260 state = Running;
261 startAsyncReadLocked();
262
263 // Do not post the event, if the read operation will be completed asynchronously.
264 if (!readyReadPending && lastError == ERROR_SUCCESS)
265 return;
266
267 if (!winEventActPosted) {
268 winEventActPosted = true;
269 locker->unlock();
270 QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
271 } else {
272 locker->unlock();
273 }
274
275 SetEvent(syncHandle);
276}
277
278/*!
279 Starts a new read sequence. Thread-safety should be ensured
280 by the caller.
281 */
282void QWindowsPipeReader::startAsyncReadLocked()
283{
284 // Determine the number of bytes to read.
285 qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
286
287 // This can happen only while draining; just do nothing in this case.
288 if (bytesToRead == 0)
289 return;
290
291 while (lastError == ERROR_SUCCESS) {
292 if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
293 bytesToRead = readBufferMaxSize - readBuffer.size();
294 if (bytesToRead <= 0) {
295 // Buffer is full. User must read data from the buffer
296 // before we can read more from the pipe.
297 return;
298 }
299 }
300
301 char *ptr = readBuffer.reserve(bytesToRead);
302
303 // ReadFile() returns true, if the read operation completes synchronously.
304 // We don't need to call GetOverlappedResult() additionally, because
305 // 'numberOfBytesRead' is valid in this case.
306 DWORD numberOfBytesRead;
307 DWORD errorCode = ERROR_SUCCESS;
308 if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
309 errorCode = GetLastError();
310 if (errorCode == ERROR_IO_PENDING) {
311 Q_ASSERT(state == Running);
312 // Operation has been queued and will complete in the future.
313 readSequenceStarted = true;
314 SetThreadpoolWait(waitObject, eventHandle, NULL);
315 return;
316 }
317 }
318
319 if (!readCompleted(errorCode, numberOfBytesRead))
320 return;
321
322 // In the 'Draining' state, we have to get all the data with one call
323 // to ReadFile(). Note that message mode pipes are not supported here.
324 if (state == Draining) {
325 Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
326 return;
327 }
328
329 // We need to loop until all pending data has been read and an
330 // operation is queued for asynchronous completion.
331 // If the pipe is configured to work in message mode, we read
332 // the data in chunks.
333 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
334 }
335}
336
337/*!
338 \internal
339
340 Thread pool callback procedure.
341 */
342void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
343 PTP_WAIT wait, TP_WAIT_RESULT waitResult)
344{
345 Q_UNUSED(instance);
346 Q_UNUSED(wait);
347 Q_UNUSED(waitResult);
348 QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
349
350 // Get the result of the asynchronous operation.
351 DWORD numberOfBytesTransfered = 0;
352 DWORD errorCode = ERROR_SUCCESS;
353 if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
354 &numberOfBytesTransfered, FALSE))
355 errorCode = GetLastError();
356
357 pipeReader->mutex.lock();
358
359 pipeReader->readSequenceStarted = false;
360
361 // Do not overwrite error code, if error has been detected by
362 // checkPipeState() in waitForPipeClosed(). Also, if the reader was
363 // stopped, the only reason why this function can be called is the
364 // completion of a cancellation. No signals should be emitted, and
365 // no new read sequence should be started in this case.
366 if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
367 // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
368 // specifically for flushing the pipe.
369 if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
370 errorCode = ERROR_SUCCESS;
371
372 if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
373 pipeReader->startAsyncReadLocked();
374
375 if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
376 pipeReader->winEventActPosted = true;
377 pipeReader->mutex.unlock();
378 QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
379 } else {
380 pipeReader->mutex.unlock();
381 }
382 } else {
383 pipeReader->mutex.unlock();
384 }
385
386 // We set the event only after unlocking to avoid additional context
387 // switches due to the released thread immediately running into the lock.
388 SetEvent(pipeReader->syncHandle);
389}
390
391/*!
392 Will be called whenever the read operation completes. Returns \c true if
393 no error occurred; otherwise returns \c false.
394 */
395bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
396{
397 // ERROR_MORE_DATA is not an error. We're connected to a message mode
398 // pipe and the message didn't fit into the pipe's system
399 // buffer. We will read the remaining data in the next call.
400 if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
401 readyReadPending = true;
402 pendingReadBytes += numberOfBytesRead;
403 readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
404 return true;
405 }
406
407 lastError = errorCode;
408 return false;
409}
410
411/*!
412 Receives notification that the read operation has completed.
413 */
414bool QWindowsPipeReader::event(QEvent *e)
415{
416 if (e->type() == QEvent::WinEventAct) {
417 consumePendingAndEmit(true);
418 return true;
419 }
420 return QObject::event(e);
421}
422
423/*!
424 Updates the read buffer size and emits pending signals in the main thread.
425 Returns \c true, if readyRead() was emitted.
426 */
427bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
428{
429 ResetEvent(syncHandle);
430 mutex.lock();
431
432 // Enable QEvent::WinEventAct posting.
433 if (allowWinActPosting)
434 winEventActPosted = false;
435
436 const bool emitReadyRead = consumePending();
437 const DWORD dwError = lastError;
438
439 mutex.unlock();
440
441 // Trigger 'pipeBroken' only once. This flag must be updated before
442 // emitting the readyRead() signal. Otherwise, the read sequence will
443 // be considered not finished, and we may hang if a slot connected
444 // to readyRead() calls waitForReadyRead().
445 const bool emitPipeClosed = (dwError != ERROR_SUCCESS && !pipeBroken);
446 if (emitPipeClosed)
447 pipeBroken = true;
448
449 // Disable any further processing, if the pipe was stopped.
450 // We are not allowed to emit signals in either 'Stopped'
451 // or 'Draining' state.
452 if (state != Running)
453 return false;
454
455 if (!emitPipeClosed) {
456 if (emitReadyRead)
457 emit readyRead();
458 } else {
459 QPointer<QWindowsPipeReader> alive(this);
460 if (emitReadyRead)
461 emit readyRead();
462 if (alive && dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
463 emit winError(dwError, "QWindowsPipeReader::consumePendingAndEmit"_L1);
464 if (alive)
465 emit pipeClosed();
466 }
467
468 return emitReadyRead;
469}
470
471/*!
472 Updates the read buffer size. Returns \c true, if readyRead()
473 should be emitted. Thread-safety should be ensured by the caller.
474 */
475bool QWindowsPipeReader::consumePending()
476{
477 if (readyReadPending) {
478 readyReadPending = false;
479 actualReadBufferSize += pendingReadBytes;
480 pendingReadBytes = 0;
481 return true;
482 }
483
484 return false;
485}
486
487/*!
488 Returns the number of available bytes in the pipe.
489 */
490DWORD QWindowsPipeReader::checkPipeState()
491{
492 DWORD bytes;
493 if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
494 return bytes;
495
496 lastError = GetLastError();
497 return 0;
498}
499
500bool QWindowsPipeReader::waitForNotification()
501{
502 DWORD waitRet;
503 do {
504 waitRet = WaitForSingleObjectEx(syncHandle, INFINITE, TRUE);
505 if (waitRet == WAIT_OBJECT_0)
506 return true;
507
508 // Some I/O completion routine was called. Wait some more.
509 } while (waitRet == WAIT_IO_COMPLETION);
510
511 return false;
512}
513
514QT_END_NAMESPACE
515
516#include "moc_qwindowspipereader_p.cpp"
static const DWORD minReadBufferSize