7#include <qcoreapplication.h>
13using namespace Qt::StringLiterals;
// Constructs the pipe reader: starts with an invalid pipe handle, creates two
// unnamed event objects, clears the visible counters/flags, and registers a
// thread-pool wait object. Several lines of this definition (braces and some
// initializers) are not present in this excerpt.
17QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
19 handle(INVALID_HANDLE_VALUE),
// Auto-reset event (bManualReset is FALSE), initially non-signaled; it is
// installed as overlapped.hEvent further down.
20 eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
// Manual-reset event (bManualReset is TRUE), initially non-signaled.
21 syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
24 actualReadBufferSize(0),
26 lastError(ERROR_SUCCESS),
28 readSequenceStarted(
false),
30 readyReadPending(
false),
31 winEventActPosted(
false)
// Zero the OVERLAPPED structure before attaching the event handle to it.
33 ZeroMemory(&overlapped,
sizeof(OVERLAPPED));
34 overlapped.hEvent = eventHandle;
// Create the thread-pool wait object; waitCallback will receive `this` as its
// context argument.
35 waitObject = CreateThreadpoolWait(waitCallback,
this, NULL);
// A failure is only reported as a warning here; construction continues.
// NOTE(review): the message spells "CreateThreadpollWait" — looks like a typo
// for "CreateThreadpoolWait"; confirm before changing the runtime string.
36 if (waitObject == NULL)
37 qErrnoWarning(
"QWindowsPipeReader: CreateThreadpollWait failed.");
// Destructor: waits for thread-pool callbacks, then releases the wait object
// and both event handles. Lines preceding the wait are not present in this
// excerpt.
40QWindowsPipeReader::~QWindowsPipeReader()
// Block until outstanding wait callbacks have finished; FALSE means pending
// callbacks are not cancelled.
46 WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
47 CloseThreadpoolWait(waitObject);
48 CloseHandle(eventHandle);
49 CloseHandle(syncHandle);
53
54
55
// Sets the pipe handle to read from and resets the visible per-handle state.
// hPipeReadEnd: the handle stored in `handle`.
// Some lines of this definition are not present in this excerpt.
56void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
// Forget the buffered byte count and any pending readyRead notification.
59 actualReadBufferSize = 0;
60 readyReadPending =
false;
62 handle = hPipeReadEnd;
// Clear any previously recorded error.
64 lastError = ERROR_SUCCESS;
68
69
70
// Stops reading by cancelling the asynchronous read with the Stopped state.
71void QWindowsPipeReader::stop()
73 cancelAsyncRead(Stopped);
78
79
80
// Cancels the asynchronous read with the Draining state instead of Stopped.
// NOTE(review): the name suggests remaining pipe data is drained before
// stopping; that behavior lives in code not fully visible here — confirm.
81void QWindowsPipeReader::drainAndStop()
83 cancelAsyncRead(Draining);
88
89
90
// Stops reading and then resets the buffered byte count and the recorded
// error. Some lines of this definition are not present in this excerpt.
91void QWindowsPipeReader::stopAndClear()
93 cancelAsyncRead(Stopped);
95 actualReadBufferSize = 0;
99 lastError = ERROR_SUCCESS;
103
104
// Cancels an in-flight overlapped read and waits until it is no longer marked
// as started.
// newState: the state requested by the caller (Stopped or Draining in the
// visible call sites); where it is assigned is not visible in this excerpt.
105void QWindowsPipeReader::cancelAsyncRead(State newState)
// Only acts when currently Running; the body of this guard is not visible
// here (presumably an early return — confirm).
107 if (state != Running)
112 if (readSequenceStarted) {
// Ask the OS to cancel the outstanding I/O issued with `overlapped`.
116 if (!CancelIoEx(handle, &overlapped)) {
117 const DWORD dwError = GetLastError();
// ERROR_NOT_FOUND is not warned about; any other failure is logged.
118 if (dwError != ERROR_NOT_FOUND) {
119 qErrnoWarning(dwError,
"QWindowsPipeReader: CancelIoEx on handle %p failed.",
// Keep waiting for notifications until the read sequence is reported finished
// (the opening `do` of this loop is not present in the excerpt).
127 waitForNotification();
129 }
while (readSequenceStarted);
140
141
// Sets the maximum size of the read buffer.
// size: new value for readBufferMaxSize; the assignment is made while holding
// the mutex.
142void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
144 QMutexLocker locker(&mutex);
145 readBufferMaxSize = size;
149
150
151
// Reports, under the mutex, whether a read is still considered active:
// a read sequence has been started, a readyRead notification is pending, or
// an error has been recorded while pipeBroken is still false.
152bool QWindowsPipeReader::isReadOperationActive()
const
154 QMutexLocker locker(&mutex);
155 return readSequenceStarted || readyReadPending
156 || (lastError != ERROR_SUCCESS && !pipeBroken);
160
161
// Returns the current value of actualReadBufferSize.
// NOTE(review): unlike the neighbouring accessors, no mutex is taken in the
// visible lines — confirm this is intentional.
162qint64 QWindowsPipeReader::bytesAvailable()
const
164 return actualReadBufferSize;
168
169
// Copies buffered data into `data` while holding the mutex, then asks for a
// new asynchronous read.
// data:   destination buffer.
// maxlen: maximum number of bytes to copy.
// The declaration of readSoFar, the else branch and the return statement are
// not present in this excerpt.
170qint64 QWindowsPipeReader::read(
char *data, qint64 maxlen)
172 QMutexLocker locker(&mutex);
// Single-byte request with data available: take one character directly.
176 if (maxlen == 1 && actualReadBufferSize > 0) {
177 *data = readBuffer.getChar();
178 actualReadBufferSize--;
// General case: never read more than actualReadBufferSize bytes.
181 readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
182 actualReadBufferSize -= readSoFar;
// Hand the locker to the helper so it can continue the read sequence.
186 startAsyncReadHelper(&locker);
195
196
197
198
// Reads a line from the buffered data into `data` while holding the mutex,
// then asks for a new asynchronous read.
// data:   destination buffer.
// maxlen: maximum length passed on to readBuffer.readLine().
// The return statement is not present in this excerpt.
199qint64 QWindowsPipeReader::readLine(
char *data, qint64 maxlen)
201 QMutexLocker locker(&mutex);
202 qint64 readSoFar = 0;
204 if (actualReadBufferSize > 0) {
// NOTE(review): the "+ 1" presumably leaves room for a terminator written by
// readLine() — confirm against the ring buffer's contract.
205 readSoFar = readBuffer.readLine(data, qMin(actualReadBufferSize + 1, maxlen));
206 actualReadBufferSize -= readSoFar;
210 startAsyncReadHelper(&locker);
219
220
// Discards up to `maxlen` buffered bytes while holding the mutex, then asks
// for a new asynchronous read.
// maxlen: maximum number of bytes to skip; capped at actualReadBufferSize.
221qint64 QWindowsPipeReader::skip(qint64 maxlen)
223 QMutexLocker locker(&mutex);
225 const qint64 skippedSoFar = readBuffer.skip(qMin(actualReadBufferSize, maxlen));
226 actualReadBufferSize -= skippedSoFar;
229 startAsyncReadHelper(&locker);
// Special handling when nothing was skipped; the body and the return
// statement are not present in this excerpt.
230 if (skippedSoFar == 0)
238
239
// Returns true when readBuffer.indexOf() finds a '\n', evaluated under the
// mutex.
// NOTE(review): the second argument presumably limits the search to the
// first actualReadBufferSize bytes — confirm against indexOf()'s signature.
240bool QWindowsPipeReader::canReadLine()
const
242 QMutexLocker locker(&mutex);
243 return readBuffer.indexOf(
'\n', actualReadBufferSize) >= 0;
247
248
// Entry point for starting an asynchronous read: takes the mutex and
// delegates to startAsyncReadHelper() with the locker.
249void QWindowsPipeReader::startAsyncRead()
251 QMutexLocker locker(&mutex);
252 startAsyncReadHelper(&locker);
// Starts a read sequence if appropriate and schedules a WinEventAct event.
// locker: the caller's mutex locker; its use is not visible in this excerpt
// (presumably released before posting the event — confirm).
255void QWindowsPipeReader::startAsyncReadHelper(QMutexLocker<QMutex> *locker)
// Guard: a read is already in flight or an error has been recorded. The body
// of this guard is not visible here.
257 if (readSequenceStarted || lastError != ERROR_SUCCESS)
261 startAsyncReadLocked();
// Guard: nothing pending and no error after the start attempt. The body of
// this guard is not visible here.
264 if (!readyReadPending && lastError == ERROR_SUCCESS)
// Post at most one WinEventAct at a time; winEventActPosted records that one
// is already queued.
267 if (!winEventActPosted) {
268 winEventActPosted =
true;
270 QCoreApplication::postEvent(
this,
new QEvent(QEvent::WinEventAct));
// Signal syncHandle, the event waitForNotification() waits on.
275 SetEvent(syncHandle);
279
280
281
// Issues overlapped ReadFile() calls in a loop for as long as no error is
// recorded. The name suggests the caller holds the mutex; no locking appears
// in the visible lines. Several lines (returns, breaks, closing braces) are
// not present in this excerpt.
282void QWindowsPipeReader::startAsyncReadLocked()
// Request at least minReadBufferSize bytes while Running; in other states only
// what checkPipeState() reports.
285 qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
// Nothing to read; the body of this guard is not visible here.
288 if (bytesToRead == 0)
291 while (lastError == ERROR_SUCCESS) {
// A non-zero readBufferMaxSize caps the request to the space left in the
// buffer.
292 if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
293 bytesToRead = readBufferMaxSize - readBuffer.size();
// Buffer already full; the body of this branch is not visible here.
294 if (bytesToRead <= 0) {
// Reserve space in the ring buffer so ReadFile() writes straight into it.
301 char *ptr = readBuffer.reserve(bytesToRead);
306 DWORD numberOfBytesRead;
307 DWORD errorCode = ERROR_SUCCESS;
308 if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
309 errorCode = GetLastError();
// The read did not complete immediately: mark the sequence as started and arm
// the thread-pool wait on eventHandle (NULL timeout).
310 if (errorCode == ERROR_IO_PENDING) {
311 Q_ASSERT(state == Running);
313 readSequenceStarted =
true;
314 SetThreadpoolWait(waitObject, eventHandle, NULL);
// Completed (or failed) synchronously: process the result. The body taken
// when readCompleted() returns false is not visible here.
319 if (!readCompleted(errorCode, numberOfBytesRead))
324 if (state == Draining) {
// While draining, the read is expected to return exactly what was requested.
325 Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
// Size the next iteration's request.
333 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
338
339
340
341
// Thread-pool wait callback: collects the result of the overlapped read,
// updates the reader's state under its mutex, and signals syncHandle.
// context:    cast back to the QWindowsPipeReader the callback belongs to.
// waitResult: explicitly unused.
// Several lines (else branches, closing braces) are not present in this
// excerpt, which is why multiple mutex.unlock() calls appear back to back.
342void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
343 PTP_WAIT wait, TP_WAIT_RESULT waitResult)
347 Q_UNUSED(waitResult);
348 QWindowsPipeReader *pipeReader =
reinterpret_cast<QWindowsPipeReader *>(context);
// Fetch the outcome of the overlapped operation without waiting (bWait is
// FALSE).
351 DWORD numberOfBytesTransfered = 0;
352 DWORD errorCode = ERROR_SUCCESS;
353 if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
354 &numberOfBytesTransfered, FALSE))
355 errorCode = GetLastError();
357 pipeReader->mutex.lock();
// The in-flight read has finished, whatever its outcome.
359 pipeReader->readSequenceStarted =
false;
// Only process the result if no error is recorded and the reader was not
// stopped.
366 if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
// While draining, an aborted operation is treated as success.
369 if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
370 errorCode = ERROR_SUCCESS;
// Continue the read sequence when readCompleted() returns true.
372 if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
373 pipeReader->startAsyncReadLocked();
// Post at most one WinEventAct; the mutex is released before posting.
375 if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
376 pipeReader->winEventActPosted =
true;
377 pipeReader->mutex.unlock();
378 QCoreApplication::postEvent(pipeReader,
new QEvent(QEvent::WinEventAct));
380 pipeReader->mutex.unlock();
383 pipeReader->mutex.unlock();
// Signal syncHandle, the event waitForNotification() waits on.
388 SetEvent(pipeReader->syncHandle);
392
393
394
// Processes the outcome of one read operation.
// errorCode:         result code of the read.
// numberOfBytesRead: number of bytes the read produced.
// The return statements and the branch structure around the lastError
// assignment are not present in this excerpt.
395bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
// ERROR_MORE_DATA is handled like success: the bytes read are accepted.
400 if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
401 readyReadPending =
true;
402 pendingReadBytes += numberOfBytesRead;
// Trim the buffer to the bytes already announced plus those still pending.
403 readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
// Record the result code as the reader's last error.
407 lastError = errorCode;
412
413
// Event handler: a WinEventAct event triggers consumePendingAndEmit(true);
// the visible fall-through forwards the event to QObject::event(). The return
// taken after handling WinEventAct is not present in this excerpt.
414bool QWindowsPipeReader::event(QEvent *e)
416 if (e->type() == QEvent::WinEventAct) {
417 consumePendingAndEmit(
true);
420 return QObject::event(e);
424
425
426
// Consumes pending read results and emits the resulting notifications.
// allowWinActPosting: when true, winEventActPosted is cleared so a new
// WinEventAct may be posted.
// Returns the value consumePending() produced (emitReadyRead).
// Locking, the readyRead/pipeClosed emissions and several branches are not
// present in this excerpt.
427bool QWindowsPipeReader::consumePendingAndEmit(
bool allowWinActPosting)
// Put syncHandle back into the non-signaled state.
429 ResetEvent(syncHandle);
433 if (allowWinActPosting)
434 winEventActPosted =
false;
436 const bool emitReadyRead = consumePending();
// Snapshot the error code for the checks below.
437 const DWORD dwError = lastError;
// A close notification is due when an error is recorded and pipeBroken is
// still false.
445 const bool emitPipeClosed = (dwError != ERROR_SUCCESS && !pipeBroken);
// Guard on the Running state; its body is not visible here.
452 if (state != Running)
455 if (!emitPipeClosed) {
// QPointer is checked before emitting winError so the signal is skipped if
// this object no longer exists by then.
459 QPointer<QWindowsPipeReader> alive(
this);
// ERROR_BROKEN_PIPE and ERROR_PIPE_NOT_CONNECTED are not reported through
// winError.
462 if (alive && dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
463 emit winError(dwError,
"QWindowsPipeReader::consumePendingAndEmit"_L1);
468 return emitReadyRead;
472
473
474
// Moves pending bytes into the readable byte count if a readyRead
// notification is pending. The return statements are not present in this
// excerpt.
475bool QWindowsPipeReader::consumePending()
477 if (readyReadPending) {
478 readyReadPending =
false;
// Make the pending bytes available and reset the pending counter.
479 actualReadBufferSize += pendingReadBytes;
480 pendingReadBytes = 0;
488
489
// Queries the pipe with PeekNamedPipe() and records an error code in
// lastError. The declaration of `bytes` and the return statements are not
// present in this excerpt.
490DWORD QWindowsPipeReader::checkPipeState()
// No data buffer is supplied; only the total-bytes-available output (&bytes)
// is requested.
493 if (PeekNamedPipe(handle,
nullptr, 0,
nullptr, &bytes,
nullptr))
// Record the OS error code; the lines between the check above and this
// assignment are missing (presumably this is the failure path — confirm).
496 lastError = GetLastError();
// Waits on syncHandle with no timeout, in an alertable wait (bAlertable is
// TRUE). The declaration of waitRet, the opening `do`, the body taken on
// WAIT_OBJECT_0 and the return statement are not present in this excerpt.
500bool QWindowsPipeReader::waitForNotification()
504 waitRet = WaitForSingleObjectEx(syncHandle, INFINITE, TRUE);
505 if (waitRet == WAIT_OBJECT_0)
// Wait again whenever the wait ended with WAIT_IO_COMPLETION rather than
// because syncHandle was signaled.
509 }
while (waitRet == WAIT_IO_COMPLETION);
516#include "moc_qwindowspipereader_p.cpp"
static const DWORD minReadBufferSize