Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qrandomaccessasyncfile_threadpool.cpp
Go to the documentation of this file.
1// Copyright (C) 2025 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
6
9
10#include <QtCore/qfuture.h>
11#include <QtCore/qthread.h>
12#include <QtCore/qthreadpool.h>
13
16
17QT_BEGIN_NAMESPACE
18
19namespace {
20
21// We cannot use Q_GLOBAL_STATIC(QThreadPool, foo) because the Windows
22// implementation raises a qWarning in its destructor when used as a global
23// static, and this warning leads to a crash on Windows CI. Cannot reproduce
24// the crash locally, so cannot really fix the issue :(
25// This class should act like a global thread pool, but it'll have a sort of
26// ref counting, and will be created/destroyed by QRAAFP instances.
28{
29public:
30 void ref()
31 {
32 QMutexLocker locker(&m_mutex);
33 if (m_refCount == 0) {
34 Q_ASSERT(!m_pool);
35 m_pool = new QThreadPool;
36 }
37 ++m_refCount;
38 }
39
40 void deref()
41 {
42 QMutexLocker locker(&m_mutex);
43 Q_ASSERT(m_refCount);
44 if (--m_refCount == 0) {
45 delete m_pool;
46 m_pool = nullptr;
47 }
48 }
49
51 {
52 QMutexLocker locker(&m_mutex);
53 Q_ASSERT(m_refCount > 0);
54 return m_pool;
55 }
56
57private:
58 QBasicMutex m_mutex;
59 QThreadPool *m_pool = nullptr;
60 quint64 m_refCount = 0;
61};
62
64
65} // anonymous namespace
66
67QRandomAccessAsyncFilePrivate::QRandomAccessAsyncFilePrivate(decltype(QObjectPrivateVersion) version) :
68 QObjectPrivate(version)
69{
71}
72
77
79{
80 QObject::connect(&m_watcher, &QFutureWatcherBase::finished, q_ptr, [this]{
81 operationComplete();
82 });
83 QObject::connect(&m_watcher, &QFutureWatcherBase::canceled, q_ptr, [this]{
84 operationComplete();
85 });
86}
87
88void QRandomAccessAsyncFilePrivate::cancelAndWait(QIOOperation *op)
89{
90 if (op == m_currentOperation) {
91 m_currentOperation = nullptr; // to discard the result
92 m_watcher.cancel(); // might have no effect
93 m_watcher.waitForFinished();
94 } else {
95 m_operations.removeAll(op);
96 }
97}
98
100QRandomAccessAsyncFilePrivate::open(const QString &path, QIODeviceBase::OpenMode mode)
101{
102 // We generate the command in any case. But if the file is already opened,
103 // it will finish with an error
104 if (m_fileState == FileState::Closed) {
105 m_filePath = path;
106 m_openMode = mode;
107 m_fileState = FileState::OpenPending;
108 }
109
110 auto *dataStorage = new QtPrivate::QIOOperationDataStorage();
111
112 auto *priv = new QIOOperationPrivate(dataStorage);
113 priv->type = QIOOperation::Type::Open;
114
115 auto *op = new QIOOperation(*priv, q_ptr);
116
117 m_operations.append(op);
118 executeNextOperation();
119 return op;
120}
121
123{
124 // all the operations should be aborted
125 for (const auto &op : std::as_const(m_operations)) {
126 if (op) {
127 auto *priv = QIOOperationPrivate::get(op.get());
128 priv->setError(QIOOperation::Error::Aborted);
129 }
130 }
131 m_operations.clear();
132
133 // Wait until the current operation is complete
134 if (m_currentOperation) {
135 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
136 priv->setError(QIOOperation::Error::Aborted);
137 cancelAndWait(m_currentOperation.get());
138 }
139
140 {
141 QMutexLocker locker(&m_engineMutex);
142 if (m_engine) {
143 m_engine->close();
144 m_engine.reset();
145 }
146 }
147
148 m_fileState = FileState::Closed;
149}
150
152{
153 QMutexLocker locker(&m_engineMutex);
154 if (m_engine)
155 return m_engine->size();
156 return -1;
157}
158
160{
161 auto *dataStorage = new QtPrivate::QIOOperationDataStorage();
162
163 auto *priv = new QIOOperationPrivate(dataStorage);
164 priv->type = QIOOperation::Type::Flush;
165
166 auto *op = new QIOOperation(*priv, q_ptr);
167 m_operations.append(op);
168 executeNextOperation();
169 return op;
170}
171
173{
174 QByteArray array;
175 array.resizeForOverwrite(maxSize);
176 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(std::move(array));
177
178 auto *priv = new QIOOperationPrivate(dataStorage);
179 priv->offset = offset;
180 priv->type = QIOOperation::Type::Read;
181
182 auto *op = new QIOReadOperation(*priv, q_ptr);
183 m_operations.append(op);
184 executeNextOperation();
185 return op;
186}
187
189QRandomAccessAsyncFilePrivate::write(qint64 offset, const QByteArray &data)
190{
191 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(data);
192
193 auto *priv = new QIOOperationPrivate(dataStorage);
194 priv->offset = offset;
195 priv->type = QIOOperation::Type::Write;
196
197 auto *op = new QIOWriteOperation(*priv, q_ptr);
198 m_operations.append(op);
199 executeNextOperation();
200 return op;
201}
202
204QRandomAccessAsyncFilePrivate::write(qint64 offset, QByteArray &&data)
205{
206 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(std::move(data));
207
208 auto *priv = new QIOOperationPrivate(dataStorage);
209 priv->offset = offset;
210 priv->type = QIOOperation::Type::Write;
211
212 auto *op = new QIOWriteOperation(*priv, q_ptr);
213 m_operations.append(op);
214 executeNextOperation();
215 return op;
216}
217
219QRandomAccessAsyncFilePrivate::readInto(qint64 offset, QSpan<std::byte> buffer)
220{
221 auto *dataStorage =
222 new QtPrivate::QIOOperationDataStorage(QSpan<const QSpan<std::byte>>{buffer});
223
224 auto *priv = new QIOOperationPrivate(dataStorage);
225 priv->offset = offset;
226 priv->type = QIOOperation::Type::Read;
227
228 auto *op = new QIOVectoredReadOperation(*priv, q_ptr);
229 m_operations.append(op);
230 executeNextOperation();
231 return op;
232}
233
235QRandomAccessAsyncFilePrivate::writeFrom(qint64 offset, QSpan<const std::byte> buffer)
236{
237 auto *dataStorage =
238 new QtPrivate::QIOOperationDataStorage(QSpan<const QSpan<const std::byte>>{buffer});
239
240 auto *priv = new QIOOperationPrivate(dataStorage);
241 priv->offset = offset;
242 priv->type = QIOOperation::Type::Write;
243
244 auto *op = new QIOVectoredWriteOperation(*priv, q_ptr);
245 m_operations.append(op);
246 executeNextOperation();
247 return op;
248}
249
251QRandomAccessAsyncFilePrivate::readInto(qint64 offset, QSpan<const QSpan<std::byte>> buffers)
252{
253 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(buffers);
254
255 auto *priv = new QIOOperationPrivate(dataStorage);
256 priv->offset = offset;
257 priv->type = QIOOperation::Type::Read;
258
259 auto *op = new QIOVectoredReadOperation(*priv, q_ptr);
260 m_operations.append(op);
261 executeNextOperation();
262 return op;
263}
264
266QRandomAccessAsyncFilePrivate::writeFrom(qint64 offset, QSpan<const QSpan<const std::byte>> buffers)
267{
268 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(buffers);
269
270 auto *priv = new QIOOperationPrivate(dataStorage);
271 priv->offset = offset;
272 priv->type = QIOOperation::Type::Write;
273
274 auto *op = new QIOVectoredWriteOperation(*priv, q_ptr);
275 m_operations.append(op);
276 executeNextOperation();
277 return op;
278}
279
281executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, char *buffer, qint64 maxSize)
282{
283 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
284
285 QMutexLocker locker(mutex);
286 if (engine) {
287 if (engine->seek(offset)) {
288 qint64 bytesRead = engine->read(buffer, maxSize);
289 if (bytesRead >= 0)
290 result.bytesProcessed = bytesRead;
291 else
292 result.error = QIOOperation::Error::Read;
293 } else {
294 result.error = QIOOperation::Error::IncorrectOffset;
295 }
296 } else {
297 result.error = QIOOperation::Error::FileNotOpen;
298 }
299 return result;
300}
301
303executeWrite(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset,
304 const char *buffer, qint64 size)
305{
306 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
307
308 QMutexLocker locker(mutex);
309 if (engine) {
310 if (engine->seek(offset)) {
311 qint64 written = engine->write(buffer, size);
312 if (written >= 0)
313 result.bytesProcessed = written;
314 else
315 result.error = QIOOperation::Error::Write;
316 } else {
317 result.error = QIOOperation::Error::IncorrectOffset;
318 }
319 } else {
320 result.error = QIOOperation::Error::FileNotOpen;
321 }
322 return result;
323}
324
325void QRandomAccessAsyncFilePrivate::executeNextOperation()
326{
327 if (m_currentOperation.isNull()) {
328 // start next
329 if (!m_operations.isEmpty()) {
330 m_currentOperation = m_operations.takeFirst();
331 switch (m_currentOperation->type()) {
332 case QIOOperation::Type::Read:
333 case QIOOperation::Type::Write:
334 numProcessedBuffers = 0;
335 processBufferAt(numProcessedBuffers);
336 break;
337 case QIOOperation::Type::Flush:
338 processFlush();
339 break;
340 case QIOOperation::Type::Open:
341 processOpen();
342 break;
343 case QIOOperation::Type::Unknown:
344 Q_ASSERT_X(false, "executeNextOperation", "Operation of type Unknown!");
345 // For release builds - directly complete the operation
346 m_watcher.setFuture(QtFuture::makeReadyValueFuture(OperationResult{}));
347 operationComplete();
348 break;
349 }
350 }
351 }
352}
353
354void QRandomAccessAsyncFilePrivate::processBufferAt(qsizetype idx)
355{
356 Q_ASSERT(!m_currentOperation.isNull());
357 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
358 auto &dataStorage = priv->dataStorage;
359 // if we do not use span buffers, we have only one buffer
360 Q_ASSERT(dataStorage->containsReadSpans()
361 || dataStorage->containsWriteSpans()
362 || idx == 0);
363 if (priv->type == QIOOperation::Type::Read) {
364 qint64 maxSize = -1;
365 char *buf = nullptr;
366 if (dataStorage->containsReadSpans()) {
367 auto &readBuffers = dataStorage->getReadSpans();
368 Q_ASSERT(readBuffers.size() > idx);
369 maxSize = readBuffers[idx].size_bytes();
370 buf = reinterpret_cast<char *>(readBuffers[idx].data());
371 } else {
372 Q_ASSERT(dataStorage->containsByteArray());
373 auto &array = dataStorage->getByteArray();
374 maxSize = array.size();
375 buf = array.data();
376 }
377 Q_ASSERT(maxSize >= 0);
378
379 qint64 offset = priv->offset;
380 if (idx != 0)
381 offset += priv->processed;
382 QBasicMutex *mutexPtr = &m_engineMutex;
383 auto op = [engine = m_engine.get(), buf, maxSize, offset, mutexPtr] {
384 return executeRead(engine, mutexPtr, offset, buf, maxSize);
385 };
386
387 QFuture<OperationResult> f =
388 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
389 m_watcher.setFuture(f);
390 } else if (priv->type == QIOOperation::Type::Write) {
391 qint64 size = -1;
392 const char *buf = nullptr;
393 if (dataStorage->containsWriteSpans()) {
394 const auto &writeBuffers = dataStorage->getWriteSpans();
395 Q_ASSERT(writeBuffers.size() > idx);
396 size = writeBuffers[idx].size_bytes();
397 buf = reinterpret_cast<const char *>(writeBuffers[idx].data());
398 } else {
399 Q_ASSERT(dataStorage->containsByteArray());
400 const auto &array = dataStorage->getByteArray();
401 size = array.size();
402 buf = array.constData();
403 }
404 Q_ASSERT(size >= 0);
405
406 qint64 offset = priv->offset;
407 if (idx != 0)
408 offset += priv->processed;
409 QBasicMutex *mutexPtr = &m_engineMutex;
410 auto op = [engine = m_engine.get(), buf, size, offset, mutexPtr] {
411 return executeWrite(engine, mutexPtr, offset, buf, size);
412 };
413
414 QFuture<OperationResult> f =
415 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
416 m_watcher.setFuture(f);
417 }
418}
419
420void QRandomAccessAsyncFilePrivate::processFlush()
421{
422 Q_ASSERT(!m_currentOperation.isNull());
423 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
424 auto &dataStorage = priv->dataStorage;
425 Q_ASSERT(dataStorage->isEmpty());
426
427 QBasicMutex *mutexPtr = &m_engineMutex;
428 auto op = [engine = m_engine.get(), mutexPtr] {
429 QMutexLocker locker(mutexPtr);
430 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
431 if (engine) {
432 if (!engine->flush())
433 result.error = QIOOperation::Error::Flush;
434 } else {
435 result.error = QIOOperation::Error::FileNotOpen;
436 }
437 return result;
438 };
439
440 QFuture<OperationResult> f =
441 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
442 m_watcher.setFuture(f);
443}
444
445void QRandomAccessAsyncFilePrivate::processOpen()
446{
447 Q_ASSERT(!m_currentOperation.isNull());
448 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
449 auto &dataStorage = priv->dataStorage;
450 Q_ASSERT(dataStorage->isEmpty());
451
452 QFuture<OperationResult> f;
453 if (m_fileState == FileState::OpenPending) {
454 // create the engine
455 m_engineMutex.lock();
456 m_engine = std::make_unique<QFSFileEngine>(m_filePath);
457 m_engineMutex.unlock();
458 QBasicMutex *mutexPtr = &m_engineMutex;
459 auto op = [engine = m_engine.get(), mutexPtr, mode = m_openMode] {
460 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
461 QMutexLocker locker(mutexPtr);
462 const bool res =
463 engine && engine->open(mode | QIODeviceBase::Unbuffered, std::nullopt);
464 if (!res)
465 result.error = QIOOperation::Error::Open;
466 return result;
467 };
468 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
469 } else {
470 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), [] {
471 return QRandomAccessAsyncFilePrivate::OperationResult{0, QIOOperation::Error::Open};
472 });
473 }
474 m_watcher.setFuture(f);
475}
476
477void QRandomAccessAsyncFilePrivate::operationComplete()
478{
479 // TODO: if one of the buffers was read/written with an error,
480 // stop processing immediately
481
482 auto scheduleNextOperation = qScopeGuard([this]{
483 m_currentOperation = nullptr;
484 executeNextOperation();
485 });
486
487 if (m_currentOperation && !m_watcher.isCanceled()) {
488 OperationResult res = m_watcher.future().result();
489 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
490 auto &dataStorage = priv->dataStorage;
491
492 switch (priv->type) {
493 case QIOOperation::Type::Read: {
494 qsizetype expectedBuffersCount = 1;
495 if (dataStorage->containsReadSpans()) {
496 auto &readBuffers = dataStorage->getReadSpans();
497 expectedBuffersCount = readBuffers.size();
498 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
499 const qsizetype unusedBytes =
500 readBuffers[numProcessedBuffers].size_bytes() - res.bytesProcessed;
501 readBuffers[numProcessedBuffers].chop(unusedBytes);
502 } else {
503 Q_ASSERT(dataStorage->containsByteArray());
504 Q_ASSERT(numProcessedBuffers == 0);
505 auto &array = dataStorage->getByteArray();
506 array.resize(res.bytesProcessed);
507 }
508 priv->appendBytesProcessed(res.bytesProcessed);
509 if (++numProcessedBuffers < expectedBuffersCount) {
510 // keep executing this command
511 processBufferAt(numProcessedBuffers);
512 scheduleNextOperation.dismiss();
513 } else {
514 priv->operationComplete(res.error);
515 }
516 break;
517 }
518 case QIOOperation::Type::Write: {
519 qsizetype expectedBuffersCount = 1;
520 if (dataStorage->containsWriteSpans())
521 expectedBuffersCount = dataStorage->getWriteSpans().size();
522 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
523 priv->appendBytesProcessed(res.bytesProcessed);
524 if (++numProcessedBuffers < expectedBuffersCount) {
525 // keep executing this command
526 processBufferAt(numProcessedBuffers);
527 scheduleNextOperation.dismiss();
528 } else {
529 priv->operationComplete(res.error);
530 }
531 break;
532 }
533 case QIOOperation::Type::Flush:
534 priv->operationComplete(res.error);
535 break;
536 case QIOOperation::Type::Open:
537 if (m_fileState == FileState::OpenPending) {
538 if (res.error == QIOOperation::Error::None) {
539 m_fileState = FileState::Opened;
540 } else {
541 m_fileState = FileState::Closed;
542 m_engineMutex.lock();
543 m_engine.reset();
544 m_engineMutex.unlock();
545 }
546 }
547 priv->operationComplete(res.error);
548 break;
549 case QIOOperation::Type::Unknown:
550 priv->setError(QIOOperation::Error::Aborted);
551 break;
552 }
553 }
554}
555
556QT_END_NAMESPACE
QIOOperationPrivate(QtPrivate::QIOOperationDataStorage *storage)
QIOOperation * open(const QString &path, QIODeviceBase::OpenMode mode)
QIOReadOperation * read(qint64 offset, qint64 maxSize)
QIOVectoredWriteOperation * writeFrom(qint64 offset, QSpan< const std::byte > buffer)
QIOWriteOperation * write(qint64 offset, QByteArray &&data)
QIOWriteOperation * write(qint64 offset, const QByteArray &data)
QIOVectoredReadOperation * readInto(qint64 offset, QSpan< std::byte > buffer)
static SharedThreadPool asyncFileThreadPool
static QRandomAccessAsyncFilePrivate::OperationResult executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, char *buffer, qint64 maxSize)
static QRandomAccessAsyncFilePrivate::OperationResult executeWrite(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, const char *buffer, qint64 size)
QT_REQUIRE_CONFIG(thread)
QFuture< void > future
[5]