Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qwindowspipereader.cpp
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
4// Qt-Security score:critical reason:data-parser
5
7#include <qcoreapplication.h>
8#include <QMutexLocker>
9#include <QPointer>
10
12
13using namespace Qt::StringLiterals;
14
15static const DWORD minReadBufferSize = 4096;
16
17/*!
18 \class QWindowsPipeReader
19 \inmodule QtCore
20 \internal
21*/
22QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
23 : QObject(parent),
24 handle(INVALID_HANDLE_VALUE),
25 eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
26 syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
27 waitObject(NULL),
28 readBufferMaxSize(0),
29 actualReadBufferSize(0),
30 pendingReadBytes(0),
31 lastError(ERROR_SUCCESS),
32 state(Stopped),
33 readSequenceStarted(false),
34 pipeBroken(true),
35 readyReadPending(false),
36 winEventActPosted(false)
37{
38 ZeroMemory(&overlapped, sizeof(OVERLAPPED));
39 overlapped.hEvent = eventHandle;
40 waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
41 if (waitObject == NULL)
42 qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
43}
44
45QWindowsPipeReader::~QWindowsPipeReader()
46{
47 stop();
48
49 // Wait for thread pool callback to complete, as it can be still
50 // executing some completion code.
51 WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
52 CloseThreadpoolWait(waitObject);
53 CloseHandle(eventHandle);
54 CloseHandle(syncHandle);
55}
56
57/*!
58 Sets the handle to read from. The handle must be valid.
59 Do not call this function while the pipe is running.
60 */
61void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
62{
63 readBuffer.clear();
64 actualReadBufferSize = 0;
65 readyReadPending = false;
66 pendingReadBytes = 0;
67 handle = hPipeReadEnd;
68 pipeBroken = false;
69 lastError = ERROR_SUCCESS;
70}
71
72/*!
73 Stops the asynchronous read sequence.
74 If the read sequence is running then the I/O operation is canceled.
75 */
76void QWindowsPipeReader::stop()
77{
78 cancelAsyncRead(Stopped);
79 pipeBroken = true;
80}
81
82/*!
83 Stops the asynchronous read sequence.
84 Reads all pending bytes into the internal buffer.
85 */
86void QWindowsPipeReader::drainAndStop()
87{
88 cancelAsyncRead(Draining);
89 pipeBroken = true;
90}
91
92/*!
93 Stops the asynchronous read sequence.
94 Clears the internal buffer and discards any pending data.
95 */
96void QWindowsPipeReader::stopAndClear()
97{
98 cancelAsyncRead(Stopped);
99 readBuffer.clear();
100 actualReadBufferSize = 0;
101 // QLocalSocket is supposed to write data in the 'Closing'
102 // state, so we don't set 'pipeBroken' flag here. Also, avoid
103 // setting this flag in checkForReadyRead().
104 lastError = ERROR_SUCCESS;
105}
106
107/*!
108 Stops the asynchronous read sequence.
109 */
110void QWindowsPipeReader::cancelAsyncRead(State newState)
111{
112 if (state != Running)
113 return;
114
115 mutex.lock();
116 state = newState;
117 if (readSequenceStarted) {
118 // This can legitimately fail due to the GetOverlappedResult()
119 // in the callback not being locked. We ignore ERROR_NOT_FOUND
120 // in this case.
121 if (!CancelIoEx(handle, &overlapped)) {
122 const DWORD dwError = GetLastError();
123 if (dwError != ERROR_NOT_FOUND) {
124 qErrnoWarning(dwError, "QWindowsPipeReader: CancelIoEx on handle %p failed.",
125 handle);
126 }
127 }
128
129 // Wait for callback to complete.
130 do {
131 mutex.unlock();
132 waitForNotification();
133 mutex.lock();
134 } while (readSequenceStarted);
135 }
136 mutex.unlock();
137
138 // Finish reading to keep the class state consistent. Note that
139 // signals are not emitted in the call below, as the caller is
140 // expected to do that synchronously.
141 consumePending();
142}
143
144/*!
145 Sets the size of internal read buffer.
146 */
147void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
148{
149 QMutexLocker locker(&mutex);
150 readBufferMaxSize = size;
151}
152
153/*!
154 Returns \c true if async operation is in progress, there is
155 pending data to read, or a read error is pending.
156*/
157bool QWindowsPipeReader::isReadOperationActive() const
158{
159 QMutexLocker locker(&mutex);
160 return readSequenceStarted || readyReadPending
161 || (lastError != ERROR_SUCCESS && !pipeBroken);
162}
163
164/*!
165 Returns the number of bytes we've read so far.
166 */
167qint64 QWindowsPipeReader::bytesAvailable() const
168{
169 return actualReadBufferSize;
170}
171
172/*!
173 Copies at most \c{maxlen} bytes from the internal read buffer to \c{data}.
174 */
175qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
176{
177 QMutexLocker locker(&mutex);
178 qint64 readSoFar;
179
180 // If startAsyncRead() has read data, copy it to its destination.
181 if (maxlen == 1 && actualReadBufferSize > 0) {
182 *data = readBuffer.getChar();
183 actualReadBufferSize--;
184 readSoFar = 1;
185 } else {
186 readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
187 actualReadBufferSize -= readSoFar;
188 }
189
190 if (!pipeBroken) {
191 startAsyncReadHelper(&locker);
192 if (readSoFar == 0)
193 return -2; // signal EWOULDBLOCK
194 }
195
196 return readSoFar;
197}
198
199/*!
200 Reads a line from the internal buffer, but no more than \c{maxlen}
201 characters. A terminating '\0' byte is always appended to \c{data},
202 so \c{maxlen} must be larger than 1.
203 */
204qint64 QWindowsPipeReader::readLine(char *data, qint64 maxlen)
205{
206 QMutexLocker locker(&mutex);
207 qint64 readSoFar = 0;
208
209 if (actualReadBufferSize > 0) {
210 readSoFar = readBuffer.readLine(data, qMin(actualReadBufferSize + 1, maxlen));
211 actualReadBufferSize -= readSoFar;
212 }
213
214 if (!pipeBroken) {
215 startAsyncReadHelper(&locker);
216 if (readSoFar == 0)
217 return -2; // signal EWOULDBLOCK
218 }
219
220 return readSoFar;
221}
222
223/*!
224 Skips up to \c{maxlen} bytes from the internal read buffer.
225 */
226qint64 QWindowsPipeReader::skip(qint64 maxlen)
227{
228 QMutexLocker locker(&mutex);
229
230 const qint64 skippedSoFar = readBuffer.skip(qMin(actualReadBufferSize, maxlen));
231 actualReadBufferSize -= skippedSoFar;
232
233 if (!pipeBroken) {
234 startAsyncReadHelper(&locker);
235 if (skippedSoFar == 0)
236 return -2; // signal EWOULDBLOCK
237 }
238
239 return skippedSoFar;
240}
241
242/*!
243 Returns \c true if a complete line of data can be read from the buffer.
244 */
245bool QWindowsPipeReader::canReadLine() const
246{
247 QMutexLocker locker(&mutex);
248 return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
249}
250
251/*!
252 Starts an asynchronous read sequence on the pipe.
253 */
254void QWindowsPipeReader::startAsyncRead()
255{
256 QMutexLocker locker(&mutex);
257 startAsyncReadHelper(&locker);
258}
259
260void QWindowsPipeReader::startAsyncReadHelper(QMutexLocker<QMutex> *locker)
261{
262 if (readSequenceStarted || lastError != ERROR_SUCCESS)
263 return;
264
265 state = Running;
266 startAsyncReadLocked();
267
268 // Do not post the event, if the read operation will be completed asynchronously.
269 if (!readyReadPending && lastError == ERROR_SUCCESS)
270 return;
271
272 if (!winEventActPosted) {
273 winEventActPosted = true;
274 locker->unlock();
275 QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
276 } else {
277 locker->unlock();
278 }
279
280 SetEvent(syncHandle);
281}
282
283/*!
284 Starts a new read sequence. Thread-safety should be ensured
285 by the caller.
286 */
287void QWindowsPipeReader::startAsyncReadLocked()
288{
289 // Determine the number of bytes to read.
290 qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
291
292 // This can happen only while draining; just do nothing in this case.
293 if (bytesToRead == 0)
294 return;
295
296 while (lastError == ERROR_SUCCESS) {
297 if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
298 bytesToRead = readBufferMaxSize - readBuffer.size();
299 if (bytesToRead <= 0) {
300 // Buffer is full. User must read data from the buffer
301 // before we can read more from the pipe.
302 return;
303 }
304 }
305
306 char *ptr = readBuffer.reserve(bytesToRead);
307
308 // ReadFile() returns true, if the read operation completes synchronously.
309 // We don't need to call GetOverlappedResult() additionally, because
310 // 'numberOfBytesRead' is valid in this case.
311 DWORD numberOfBytesRead;
312 DWORD errorCode = ERROR_SUCCESS;
313 if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
314 errorCode = GetLastError();
315 if (errorCode == ERROR_IO_PENDING) {
316 Q_ASSERT(state == Running);
317 // Operation has been queued and will complete in the future.
318 readSequenceStarted = true;
319 SetThreadpoolWait(waitObject, eventHandle, NULL);
320 return;
321 }
322 }
323
324 if (!readCompleted(errorCode, numberOfBytesRead))
325 return;
326
327 // In the 'Draining' state, we have to get all the data with one call
328 // to ReadFile(). Note that message mode pipes are not supported here.
329 if (state == Draining) {
330 Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
331 return;
332 }
333
334 // We need to loop until all pending data has been read and an
335 // operation is queued for asynchronous completion.
336 // If the pipe is configured to work in message mode, we read
337 // the data in chunks.
338 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
339 }
340}
341
342/*!
343 \internal
344
345 Thread pool callback procedure.
346 */
347void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
348 PTP_WAIT wait, TP_WAIT_RESULT waitResult)
349{
350 Q_UNUSED(instance);
351 Q_UNUSED(wait);
352 Q_UNUSED(waitResult);
353 QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
354
355 // Get the result of the asynchronous operation.
356 DWORD numberOfBytesTransfered = 0;
357 DWORD errorCode = ERROR_SUCCESS;
358 if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
359 &numberOfBytesTransfered, FALSE))
360 errorCode = GetLastError();
361
362 pipeReader->mutex.lock();
363
364 pipeReader->readSequenceStarted = false;
365
366 // Do not overwrite error code, if error has been detected by
367 // checkPipeState() in waitForPipeClosed(). Also, if the reader was
368 // stopped, the only reason why this function can be called is the
369 // completion of a cancellation. No signals should be emitted, and
370 // no new read sequence should be started in this case.
371 if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
372 // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
373 // specifically for flushing the pipe.
374 if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
375 errorCode = ERROR_SUCCESS;
376
377 if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
378 pipeReader->startAsyncReadLocked();
379
380 if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
381 pipeReader->winEventActPosted = true;
382 pipeReader->mutex.unlock();
383 QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
384 } else {
385 pipeReader->mutex.unlock();
386 }
387 } else {
388 pipeReader->mutex.unlock();
389 }
390
391 // We set the event only after unlocking to avoid additional context
392 // switches due to the released thread immediately running into the lock.
393 SetEvent(pipeReader->syncHandle);
394}
395
396/*!
397 Will be called whenever the read operation completes. Returns \c true if
398 no error occurred; otherwise returns \c false.
399 */
400bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
401{
402 // ERROR_MORE_DATA is not an error. We're connected to a message mode
403 // pipe and the message didn't fit into the pipe's system
404 // buffer. We will read the remaining data in the next call.
405 if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
406 readyReadPending = true;
407 pendingReadBytes += numberOfBytesRead;
408 readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
409 return true;
410 }
411
412 lastError = errorCode;
413 return false;
414}
415
416/*!
417 Receives notification that the read operation has completed.
418 */
419bool QWindowsPipeReader::event(QEvent *e)
420{
421 if (e->type() == QEvent::WinEventAct) {
422 consumePendingAndEmit(true);
423 return true;
424 }
425 return QObject::event(e);
426}
427
428/*!
429 Updates the read buffer size and emits pending signals in the main thread.
430 Returns \c true, if readyRead() was emitted.
431 */
432bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
433{
434 ResetEvent(syncHandle);
435 mutex.lock();
436
437 // Enable QEvent::WinEventAct posting.
438 if (allowWinActPosting)
439 winEventActPosted = false;
440
441 const bool emitReadyRead = consumePending();
442 const DWORD dwError = lastError;
443
444 mutex.unlock();
445
446 // Trigger 'pipeBroken' only once. This flag must be updated before
447 // emitting the readyRead() signal. Otherwise, the read sequence will
448 // be considered not finished, and we may hang if a slot connected
449 // to readyRead() calls waitForReadyRead().
450 const bool emitPipeClosed = (dwError != ERROR_SUCCESS && !pipeBroken);
451 if (emitPipeClosed)
452 pipeBroken = true;
453
454 // Disable any further processing, if the pipe was stopped.
455 // We are not allowed to emit signals in either 'Stopped'
456 // or 'Draining' state.
457 if (state != Running)
458 return false;
459
460 if (!emitPipeClosed) {
461 if (emitReadyRead)
462 emit readyRead();
463 } else {
464 QPointer<QWindowsPipeReader> alive(this);
465 if (emitReadyRead)
466 emit readyRead();
467 if (alive && dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
468 emit winError(dwError, "QWindowsPipeReader::consumePendingAndEmit"_L1);
469 if (alive)
470 emit pipeClosed();
471 }
472
473 return emitReadyRead;
474}
475
476/*!
477 Updates the read buffer size. Returns \c true, if readyRead()
478 should be emitted. Thread-safety should be ensured by the caller.
479 */
480bool QWindowsPipeReader::consumePending()
481{
482 if (readyReadPending) {
483 readyReadPending = false;
484 actualReadBufferSize += pendingReadBytes;
485 pendingReadBytes = 0;
486 return true;
487 }
488
489 return false;
490}
491
492/*!
493 Returns the number of available bytes in the pipe.
494 */
495DWORD QWindowsPipeReader::checkPipeState()
496{
497 DWORD bytes;
498 if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
499 return bytes;
500
501 lastError = GetLastError();
502 return 0;
503}
504
505bool QWindowsPipeReader::waitForNotification()
506{
507 DWORD waitRet;
508 do {
509 waitRet = WaitForSingleObjectEx(syncHandle, INFINITE, TRUE);
510 if (waitRet == WAIT_OBJECT_0)
511 return true;
512
513 // Some I/O completion routine was called. Wait some more.
514 } while (waitRet == WAIT_IO_COMPLETION);
515
516 return false;
517}
518
519QT_END_NAMESPACE
520
521#include "moc_qwindowspipereader_p.cpp"
Combined button and popup list for selecting options.
static const DWORD minReadBufferSize