Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qwindowspipewriter.cpp
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
4// Qt-Security score:significant reason:default
5
7#include <qcoreapplication.h>
8#include <QMutexLocker>
9#include <QPointer>
10
12
13QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
14 : QObject(parent),
15 handle(pipeWriteEnd),
16 eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
17 syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
18 waitObject(NULL),
19 pendingBytesWrittenValue(0),
20 lastError(ERROR_SUCCESS),
21 completionState(NoError),
22 stopped(true),
23 writeSequenceStarted(false),
24 bytesWrittenPending(false),
25 winEventActPosted(false)
26{
27 ZeroMemory(&overlapped, sizeof(OVERLAPPED));
28 overlapped.hEvent = eventHandle;
29 waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
30 if (waitObject == NULL)
31 qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
32}
33
34QWindowsPipeWriter::~QWindowsPipeWriter()
35{
36 stop();
37 CloseThreadpoolWait(waitObject);
38 CloseHandle(eventHandle);
39 CloseHandle(syncHandle);
40}
41
42/*!
43 Assigns the handle to this writer. The handle must be valid.
44 Call this function if data was buffered before getting the handle.
45 */
46void QWindowsPipeWriter::setHandle(HANDLE hPipeWriteEnd)
47{
48 Q_ASSERT(!stopped);
49
50 handle = hPipeWriteEnd;
51 QMutexLocker locker(&mutex);
52 startAsyncWriteHelper(&locker);
53}
54
55/*!
56 Stops the asynchronous write sequence.
57 If the write sequence is running then the I/O operation is canceled.
58 */
59void QWindowsPipeWriter::stop()
60{
61 if (stopped)
62 return;
63
64 mutex.lock();
65 stopped = true;
66 if (writeSequenceStarted) {
67 // Trying to disable callback before canceling the operation.
68 // Callback invocation is unnecessary here.
69 SetThreadpoolWait(waitObject, NULL, NULL);
70 if (!CancelIoEx(handle, &overlapped)) {
71 const DWORD dwError = GetLastError();
72 if (dwError != ERROR_NOT_FOUND) {
73 qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
74 handle);
75 }
76 }
77 writeSequenceStarted = false;
78 }
79 mutex.unlock();
80
81 WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
82}
83
84/*!
85 Returns the number of bytes that are waiting to be written.
86 */
87qint64 QWindowsPipeWriter::bytesToWrite() const
88{
89 QMutexLocker locker(&mutex);
90 return writeBuffer.size() + pendingBytesWrittenValue;
91}
92
93/*!
94 Returns \c true if async operation is in progress.
95*/
96bool QWindowsPipeWriter::isWriteOperationActive() const
97{
98 return completionState == NoError && bytesToWrite() != 0;
99}
100
101/*!
102 Writes a shallow copy of \a ba to the internal buffer.
103 */
104void QWindowsPipeWriter::write(const QByteArray &ba)
105{
106 if (completionState != WriteDisabled)
107 writeImpl(ba);
108}
109
110/*!
111 Writes data to the internal buffer.
112 */
113void QWindowsPipeWriter::write(const char *data, qint64 size)
114{
115 if (completionState != WriteDisabled)
116 writeImpl(data, size);
117}
118
119template <typename... Args>
120inline void QWindowsPipeWriter::writeImpl(Args... args)
121{
122 QMutexLocker locker(&mutex);
123
124 writeBuffer.append(args...);
125
126 if (writeSequenceStarted || (lastError != ERROR_SUCCESS))
127 return;
128
129 stopped = false;
130
131 // If we don't have an assigned handle yet, defer writing until
132 // setHandle() is called.
133 if (handle != INVALID_HANDLE_VALUE)
134 startAsyncWriteHelper(&locker);
135}
136
137void QWindowsPipeWriter::startAsyncWriteHelper(QMutexLocker<QMutex> *locker)
138{
139 startAsyncWriteLocked();
140
141 // Do not post the event, if the write operation will be completed asynchronously.
142 if (!bytesWrittenPending && lastError == ERROR_SUCCESS)
143 return;
144
145 notifyCompleted(locker);
146}
147
148/*!
149 Starts a new write sequence.
150 */
151void QWindowsPipeWriter::startAsyncWriteLocked()
152{
153 while (!writeBuffer.isEmpty()) {
154 // WriteFile() returns true, if the write operation completes synchronously.
155 // We don't need to call GetOverlappedResult() additionally, because
156 // 'numberOfBytesWritten' is valid in this case.
157 DWORD numberOfBytesWritten;
158 DWORD errorCode = ERROR_SUCCESS;
159 if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
160 &numberOfBytesWritten, &overlapped)) {
161 errorCode = GetLastError();
162 if (errorCode == ERROR_IO_PENDING) {
163 // Operation has been queued and will complete in the future.
164 writeSequenceStarted = true;
165 SetThreadpoolWait(waitObject, eventHandle, NULL);
166 break;
167 }
168 }
169
170 if (!writeCompleted(errorCode, numberOfBytesWritten))
171 break;
172 }
173}
174
175/*!
176 \internal
177 Thread pool callback procedure.
178 */
179void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
180 PTP_WAIT wait, TP_WAIT_RESULT waitResult)
181{
182 Q_UNUSED(instance);
183 Q_UNUSED(wait);
184 Q_UNUSED(waitResult);
185 QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
186
187 // Get the result of the asynchronous operation.
188 DWORD numberOfBytesTransfered = 0;
189 DWORD errorCode = ERROR_SUCCESS;
190 if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
191 &numberOfBytesTransfered, FALSE))
192 errorCode = GetLastError();
193
194 QMutexLocker locker(&pipeWriter->mutex);
195
196 // After the writer was stopped, the only reason why this function can be called is the
197 // completion of a cancellation. No signals should be emitted, and no new write sequence
198 // should be started in this case.
199 if (pipeWriter->stopped)
200 return;
201
202 pipeWriter->writeSequenceStarted = false;
203
204 if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
205 pipeWriter->startAsyncWriteLocked();
206
207 // We post the notification even if the write operation failed,
208 // to unblock the main thread, in case it is waiting for the event.
209 pipeWriter->notifyCompleted(&locker);
210}
211
212/*!
213 Will be called whenever the write operation completes. Returns \c true if
214 no error occurred; otherwise returns \c false.
215 */
216bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
217{
218 switch (errorCode) {
219 case ERROR_SUCCESS:
220 bytesWrittenPending = true;
221 pendingBytesWrittenValue += numberOfBytesWritten;
222 writeBuffer.free(numberOfBytesWritten);
223 return true;
224 case ERROR_PIPE_NOT_CONNECTED: // the other end has closed the pipe
225 case ERROR_OPERATION_ABORTED: // the operation was canceled
226 case ERROR_NO_DATA: // the pipe is being closed
227 break;
228 default:
229 qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
230 break;
231 }
232
233 // The buffer is not cleared here, because the write progress
234 // should appear on the main thread synchronously.
235 lastError = errorCode;
236 return false;
237}
238
239/*!
240 Posts a notification event to the main thread.
241 */
242void QWindowsPipeWriter::notifyCompleted(QMutexLocker<QMutex> *locker)
243{
244 if (!winEventActPosted) {
245 winEventActPosted = true;
246 locker->unlock();
247 QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
248 } else {
249 locker->unlock();
250 }
251
252 // We set the event only after unlocking to avoid additional context
253 // switches due to the released thread immediately running into the lock.
254 SetEvent(syncHandle);
255}
256
257/*!
258 Receives notification that the write operation has completed.
259 */
260bool QWindowsPipeWriter::event(QEvent *e)
261{
262 if (e->type() == QEvent::WinEventAct) {
263 consumePendingAndEmit(true);
264 return true;
265 }
266 return QObject::event(e);
267}
268
269/*!
270 Updates the state and emits pending signals in the main thread.
271 Returns \c true, if bytesWritten() was emitted.
272 */
273bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
274{
275 ResetEvent(syncHandle);
276 QMutexLocker locker(&mutex);
277
278 // Enable QEvent::WinEventAct posting.
279 if (allowWinActPosting)
280 winEventActPosted = false;
281
282 const qint64 numberOfBytesWritten = pendingBytesWrittenValue;
283 const bool emitBytesWritten = bytesWrittenPending;
284 if (emitBytesWritten) {
285 bytesWrittenPending = false;
286 pendingBytesWrittenValue = 0;
287 }
288 const DWORD dwError = lastError;
289
290 locker.unlock();
291
292 // Disable any further processing, if the pipe was stopped.
293 if (stopped)
294 return false;
295
296 // Trigger 'ErrorDetected' state only once. This state must be set before
297 // emitting the bytesWritten() signal. Otherwise, the write sequence will
298 // be considered not finished, and we may hang if a slot connected
299 // to bytesWritten() calls waitForBytesWritten().
300 if (dwError != ERROR_SUCCESS && completionState == NoError) {
301 QPointer<QWindowsPipeWriter> alive(this);
302 completionState = ErrorDetected;
303 if (emitBytesWritten)
304 emit bytesWritten(numberOfBytesWritten);
305 if (alive) {
306 writeBuffer.clear();
307 completionState = WriteDisabled;
308 emit writeFailed();
309 }
310 } else if (emitBytesWritten) {
311 emit bytesWritten(numberOfBytesWritten);
312 }
313
314 return emitBytesWritten;
315}
316
317QT_END_NAMESPACE
318
319#include "moc_qwindowspipewriter_p.cpp"