7#include <qcoreapplication.h>
13QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
16 eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
17 syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
19 pendingBytesWrittenValue(0),
20 lastError(ERROR_SUCCESS),
21 completionState(NoError),
23 writeSequenceStarted(
false),
24 bytesWrittenPending(
false),
25 winEventActPosted(
false)
27 ZeroMemory(&overlapped,
sizeof(OVERLAPPED));
28 overlapped.hEvent = eventHandle;
29 waitObject = CreateThreadpoolWait(waitCallback,
this, NULL);
30 if (waitObject == NULL)
31 qErrnoWarning(
"QWindowsPipeWriter: CreateThreadpollWait failed.");
34QWindowsPipeWriter::~QWindowsPipeWriter()
37 CloseThreadpoolWait(waitObject);
38 CloseHandle(eventHandle);
39 CloseHandle(syncHandle);
43
44
45
46void QWindowsPipeWriter::setHandle(HANDLE hPipeWriteEnd)
50 handle = hPipeWriteEnd;
51 QMutexLocker locker(&mutex);
52 startAsyncWriteHelper(&locker);
56
57
58
59void QWindowsPipeWriter::stop()
66 if (writeSequenceStarted) {
69 SetThreadpoolWait(waitObject, NULL, NULL);
70 if (!CancelIoEx(handle, &overlapped)) {
71 const DWORD dwError = GetLastError();
72 if (dwError != ERROR_NOT_FOUND) {
73 qErrnoWarning(dwError,
"QWindowsPipeWriter: CancelIoEx on handle %p failed.",
77 writeSequenceStarted =
false;
81 WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
85
86
87qint64 QWindowsPipeWriter::bytesToWrite()
const
89 QMutexLocker locker(&mutex);
90 return writeBuffer.size() + pendingBytesWrittenValue;
94
95
96bool QWindowsPipeWriter::isWriteOperationActive()
const
98 return completionState == NoError && bytesToWrite() != 0;
102
103
104void QWindowsPipeWriter::write(
const QByteArray &ba)
106 if (completionState != WriteDisabled)
111
112
113void QWindowsPipeWriter::write(
const char *data, qint64 size)
115 if (completionState != WriteDisabled)
116 writeImpl(data, size);
119template <
typename... Args>
120inline void QWindowsPipeWriter::writeImpl(Args... args)
122 QMutexLocker locker(&mutex);
124 writeBuffer.append(args...);
126 if (writeSequenceStarted || (lastError != ERROR_SUCCESS))
133 if (handle != INVALID_HANDLE_VALUE)
134 startAsyncWriteHelper(&locker);
137void QWindowsPipeWriter::startAsyncWriteHelper(QMutexLocker<QMutex> *locker)
139 startAsyncWriteLocked();
142 if (!bytesWrittenPending && lastError == ERROR_SUCCESS)
145 notifyCompleted(locker);
149
150
151void QWindowsPipeWriter::startAsyncWriteLocked()
153 while (!writeBuffer.isEmpty()) {
157 DWORD numberOfBytesWritten;
158 DWORD errorCode = ERROR_SUCCESS;
159 if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
160 &numberOfBytesWritten, &overlapped)) {
161 errorCode = GetLastError();
162 if (errorCode == ERROR_IO_PENDING) {
164 writeSequenceStarted =
true;
165 SetThreadpoolWait(waitObject, eventHandle, NULL);
170 if (!writeCompleted(errorCode, numberOfBytesWritten))
176
177
178
179void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
180 PTP_WAIT wait, TP_WAIT_RESULT waitResult)
184 Q_UNUSED(waitResult);
185 QWindowsPipeWriter *pipeWriter =
reinterpret_cast<QWindowsPipeWriter *>(context);
188 DWORD numberOfBytesTransfered = 0;
189 DWORD errorCode = ERROR_SUCCESS;
190 if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
191 &numberOfBytesTransfered, FALSE))
192 errorCode = GetLastError();
194 QMutexLocker locker(&pipeWriter->mutex);
199 if (pipeWriter->stopped)
202 pipeWriter->writeSequenceStarted =
false;
204 if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
205 pipeWriter->startAsyncWriteLocked();
209 pipeWriter->notifyCompleted(&locker);
213
214
215
216bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
220 bytesWrittenPending =
true;
221 pendingBytesWrittenValue += numberOfBytesWritten;
222 writeBuffer.free(numberOfBytesWritten);
224 case ERROR_PIPE_NOT_CONNECTED:
225 case ERROR_OPERATION_ABORTED:
229 qErrnoWarning(errorCode,
"QWindowsPipeWriter: write failed.");
235 lastError = errorCode;
240
241
242void QWindowsPipeWriter::notifyCompleted(QMutexLocker<QMutex> *locker)
244 if (!winEventActPosted) {
245 winEventActPosted =
true;
247 QCoreApplication::postEvent(
this,
new QEvent(QEvent::WinEventAct));
254 SetEvent(syncHandle);
258
259
260bool QWindowsPipeWriter::event(QEvent *e)
262 if (e->type() == QEvent::WinEventAct) {
263 consumePendingAndEmit(
true);
266 return QObject::event(e);
270
271
272
273bool QWindowsPipeWriter::consumePendingAndEmit(
bool allowWinActPosting)
275 ResetEvent(syncHandle);
276 QMutexLocker locker(&mutex);
279 if (allowWinActPosting)
280 winEventActPosted =
false;
282 const qint64 numberOfBytesWritten = pendingBytesWrittenValue;
283 const bool emitBytesWritten = bytesWrittenPending;
284 if (emitBytesWritten) {
285 bytesWrittenPending =
false;
286 pendingBytesWrittenValue = 0;
288 const DWORD dwError = lastError;
300 if (dwError != ERROR_SUCCESS && completionState == NoError) {
301 QPointer<QWindowsPipeWriter> alive(
this);
302 completionState = ErrorDetected;
303 if (emitBytesWritten)
304 emit bytesWritten(numberOfBytesWritten);
307 completionState = WriteDisabled;
310 }
else if (emitBytesWritten) {
311 emit bytesWritten(numberOfBytesWritten);
314 return emitBytesWritten;
319#include "moc_qwindowspipewriter_p.cpp"