Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qrandomaccessasyncfile_threadpool.cpp
Go to the documentation of this file.
1// Copyright (C) 2025 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
6
9
10#include <QtCore/qfuture.h>
11#include <QtCore/qthread.h>
12#include <QtCore/qthreadpool.h>
13
16
17QT_BEGIN_NAMESPACE
18
19namespace {
20
21// We cannot use Q_GLOBAL_STATIC(QThreadPool, foo) because the Windows
22// implementation raises a qWarning in its destructor when used as a global
23// static, and this warning leads to a crash on Windows CI. Cannot reproduce
24// the crash locally, so cannot really fix the issue :(
25// This class should act like a global thread pool, but it'll have a sort of
26// ref counting, and will be created/destroyed by QRAAFP instances.
28{
29public:
30 void ref()
31 {
32 QMutexLocker locker(&m_mutex);
33 if (m_refCount == 0) {
34 Q_ASSERT(!m_pool);
35 m_pool = new QThreadPool;
36 }
37 ++m_refCount;
38 }
39
40 void deref()
41 {
42 QMutexLocker locker(&m_mutex);
43 Q_ASSERT(m_refCount);
44 if (--m_refCount == 0) {
45 delete m_pool;
46 m_pool = nullptr;
47 }
48 }
49
51 {
52 QMutexLocker locker(&m_mutex);
53 Q_ASSERT(m_refCount > 0);
54 return m_pool;
55 }
56
57private:
58 QBasicMutex m_mutex;
59 QThreadPool *m_pool = nullptr;
60 quint64 m_refCount = 0;
61};
62
64
65} // anonymous namespace
66
67QRandomAccessAsyncFileThreadPoolBackend::QRandomAccessAsyncFileThreadPoolBackend(QRandomAccessAsyncFile *owner) :
68 QRandomAccessAsyncFileBackend(owner)
69{
70 asyncFileThreadPool.ref();
71}
72
73QRandomAccessAsyncFileThreadPoolBackend::~QRandomAccessAsyncFileThreadPoolBackend()
74{
75 asyncFileThreadPool.deref();
76}
77
78bool QRandomAccessAsyncFileThreadPoolBackend::init()
79{
80 QObject::connect(&m_watcher, &QFutureWatcherBase::finished, m_owner, [this]{
81 operationComplete();
82 });
83 QObject::connect(&m_watcher, &QFutureWatcherBase::canceled, m_owner, [this]{
84 operationComplete();
85 });
86 return true;
87}
88
89void QRandomAccessAsyncFileThreadPoolBackend::cancelAndWait(QIOOperation *op)
90{
91 if (op == m_currentOperation) {
92 m_currentOperation = nullptr; // to discard the result
93 m_watcher.cancel(); // might have no effect
94 m_watcher.waitForFinished();
95 } else {
96 m_operations.removeAll(op);
97 }
98}
99
100QIOOperation *
101QRandomAccessAsyncFileThreadPoolBackend::open(const QString &path, QIODeviceBase::OpenMode mode)
102{
103 // We generate the command in any case. But if the file is already opened,
104 // it will finish with an error
105 if (m_fileState == FileState::Closed) {
106 m_filePath = path;
107 m_openMode = mode;
108 m_fileState = FileState::OpenPending;
109 }
110
111 auto *dataStorage = new QtPrivate::QIOOperationDataStorage();
112
113 auto *priv = new QIOOperationPrivate(dataStorage);
114 priv->type = QIOOperation::Type::Open;
115
116 auto *op = new QIOOperation(*priv, m_owner);
117
118 m_operations.append(op);
119 executeNextOperation();
120 return op;
121}
122
123void QRandomAccessAsyncFileThreadPoolBackend::close()
124{
125 // all the operations should be aborted
126 for (const auto &op : std::as_const(m_operations)) {
127 if (op) {
128 auto *priv = QIOOperationPrivate::get(op.get());
129 priv->setError(QIOOperation::Error::Aborted);
130 }
131 }
132 m_operations.clear();
133
134 // Wait until the current operation is complete
135 if (m_currentOperation) {
136 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
137 priv->setError(QIOOperation::Error::Aborted);
138 cancelAndWait(m_currentOperation.get());
139 }
140
141 {
142 QMutexLocker locker(&m_engineMutex);
143 if (m_engine) {
144 m_engine->close();
145 m_engine.reset();
146 }
147 }
148
149 m_fileState = FileState::Closed;
150}
151
152qint64 QRandomAccessAsyncFileThreadPoolBackend::size() const
153{
154 QMutexLocker locker(&m_engineMutex);
155 if (m_engine)
156 return m_engine->size();
157 return -1;
158}
159
160QIOOperation *QRandomAccessAsyncFileThreadPoolBackend::flush()
161{
162 auto *dataStorage = new QtPrivate::QIOOperationDataStorage();
163
164 auto *priv = new QIOOperationPrivate(dataStorage);
165 priv->type = QIOOperation::Type::Flush;
166
167 auto *op = new QIOOperation(*priv, m_owner);
168 m_operations.append(op);
169 executeNextOperation();
170 return op;
171}
172
173QIOReadOperation *QRandomAccessAsyncFileThreadPoolBackend::read(qint64 offset, qint64 maxSize)
174{
175 QByteArray array;
176 array.resizeForOverwrite(maxSize);
177 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(std::move(array));
178
179 auto *priv = new QIOOperationPrivate(dataStorage);
180 priv->offset = offset;
181 priv->type = QIOOperation::Type::Read;
182
183 auto *op = new QIOReadOperation(*priv, m_owner);
184 m_operations.append(op);
185 executeNextOperation();
186 return op;
187}
188
189QIOWriteOperation *
190QRandomAccessAsyncFileThreadPoolBackend::write(qint64 offset, const QByteArray &data)
191{
192 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(data);
193
194 auto *priv = new QIOOperationPrivate(dataStorage);
195 priv->offset = offset;
196 priv->type = QIOOperation::Type::Write;
197
198 auto *op = new QIOWriteOperation(*priv, m_owner);
199 m_operations.append(op);
200 executeNextOperation();
201 return op;
202}
203
204QIOWriteOperation *
205QRandomAccessAsyncFileThreadPoolBackend::write(qint64 offset, QByteArray &&data)
206{
207 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(std::move(data));
208
209 auto *priv = new QIOOperationPrivate(dataStorage);
210 priv->offset = offset;
211 priv->type = QIOOperation::Type::Write;
212
213 auto *op = new QIOWriteOperation(*priv, m_owner);
214 m_operations.append(op);
215 executeNextOperation();
216 return op;
217}
218
219QIOVectoredReadOperation *
220QRandomAccessAsyncFileThreadPoolBackend::readInto(qint64 offset, QSpan<std::byte> buffer)
221{
222 auto *dataStorage =
223 new QtPrivate::QIOOperationDataStorage(QSpan<const QSpan<std::byte>>{buffer});
224
225 auto *priv = new QIOOperationPrivate(dataStorage);
226 priv->offset = offset;
227 priv->type = QIOOperation::Type::Read;
228
229 auto *op = new QIOVectoredReadOperation(*priv, m_owner);
230 m_operations.append(op);
231 executeNextOperation();
232 return op;
233}
234
235QIOVectoredWriteOperation *
236QRandomAccessAsyncFileThreadPoolBackend::writeFrom(qint64 offset, QSpan<const std::byte> buffer)
237{
238 auto *dataStorage =
239 new QtPrivate::QIOOperationDataStorage(QSpan<const QSpan<const std::byte>>{buffer});
240
241 auto *priv = new QIOOperationPrivate(dataStorage);
242 priv->offset = offset;
243 priv->type = QIOOperation::Type::Write;
244
245 auto *op = new QIOVectoredWriteOperation(*priv, m_owner);
246 m_operations.append(op);
247 executeNextOperation();
248 return op;
249}
250
251QIOVectoredReadOperation *
252QRandomAccessAsyncFileThreadPoolBackend::readInto(qint64 offset, QSpan<const QSpan<std::byte>> buffers)
253{
254 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(buffers);
255
256 auto *priv = new QIOOperationPrivate(dataStorage);
257 priv->offset = offset;
258 priv->type = QIOOperation::Type::Read;
259
260 auto *op = new QIOVectoredReadOperation(*priv, m_owner);
261 m_operations.append(op);
262 executeNextOperation();
263 return op;
264}
265
266QIOVectoredWriteOperation *
267QRandomAccessAsyncFileThreadPoolBackend::writeFrom(qint64 offset, QSpan<const QSpan<const std::byte>> buffers)
268{
269 auto *dataStorage = new QtPrivate::QIOOperationDataStorage(buffers);
270
271 auto *priv = new QIOOperationPrivate(dataStorage);
272 priv->offset = offset;
273 priv->type = QIOOperation::Type::Write;
274
275 auto *op = new QIOVectoredWriteOperation(*priv, m_owner);
276 m_operations.append(op);
277 executeNextOperation();
278 return op;
279}
280
282executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, char *buffer, qint64 maxSize)
283{
284 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
285
286 QMutexLocker locker(mutex);
287 if (engine) {
288 if (engine->seek(offset)) {
289 qint64 bytesRead = engine->read(buffer, maxSize);
290 if (bytesRead >= 0)
291 result.bytesProcessed = bytesRead;
292 else
293 result.error = QIOOperation::Error::Read;
294 } else {
295 result.error = QIOOperation::Error::IncorrectOffset;
296 }
297 } else {
298 result.error = QIOOperation::Error::FileNotOpen;
299 }
300 return result;
301}
302
304executeWrite(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset,
305 const char *buffer, qint64 size)
306{
307 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
308
309 QMutexLocker locker(mutex);
310 if (engine) {
311 if (engine->seek(offset)) {
312 qint64 written = engine->write(buffer, size);
313 if (written >= 0)
314 result.bytesProcessed = written;
315 else
316 result.error = QIOOperation::Error::Write;
317 } else {
318 result.error = QIOOperation::Error::IncorrectOffset;
319 }
320 } else {
321 result.error = QIOOperation::Error::FileNotOpen;
322 }
323 return result;
324}
325
326void QRandomAccessAsyncFileThreadPoolBackend::executeNextOperation()
327{
328 if (m_currentOperation.isNull()) {
329 // start next
330 if (!m_operations.isEmpty()) {
331 m_currentOperation = m_operations.takeFirst();
332 switch (m_currentOperation->type()) {
333 case QIOOperation::Type::Read:
334 case QIOOperation::Type::Write:
335 numProcessedBuffers = 0;
336 processBufferAt(numProcessedBuffers);
337 break;
338 case QIOOperation::Type::Flush:
339 processFlush();
340 break;
341 case QIOOperation::Type::Open:
342 processOpen();
343 break;
344 case QIOOperation::Type::Unknown:
345 Q_ASSERT_X(false, "executeNextOperation", "Operation of type Unknown!");
346 // For release builds - directly complete the operation
347 m_watcher.setFuture(QtFuture::makeReadyValueFuture(OperationResult{}));
348 operationComplete();
349 break;
350 }
351 }
352 }
353}
354
355void QRandomAccessAsyncFileThreadPoolBackend::processBufferAt(qsizetype idx)
356{
357 Q_ASSERT(!m_currentOperation.isNull());
358 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
359 auto &dataStorage = priv->dataStorage;
360 // if we do not use span buffers, we have only one buffer
361 Q_ASSERT(dataStorage->containsReadSpans()
362 || dataStorage->containsWriteSpans()
363 || idx == 0);
364 if (priv->type == QIOOperation::Type::Read) {
365 qint64 maxSize = -1;
366 char *buf = nullptr;
367 if (dataStorage->containsReadSpans()) {
368 auto &readBuffers = dataStorage->getReadSpans();
369 Q_ASSERT(readBuffers.size() > idx);
370 maxSize = readBuffers[idx].size_bytes();
371 buf = reinterpret_cast<char *>(readBuffers[idx].data());
372 } else {
373 Q_ASSERT(dataStorage->containsByteArray());
374 auto &array = dataStorage->getByteArray();
375 maxSize = array.size();
376 buf = array.data();
377 }
378 Q_ASSERT(maxSize >= 0);
379
380 qint64 offset = priv->offset;
381 if (idx != 0)
382 offset += priv->processed;
383 QBasicMutex *mutexPtr = &m_engineMutex;
384 auto op = [engine = m_engine.get(), buf, maxSize, offset, mutexPtr] {
385 return executeRead(engine, mutexPtr, offset, buf, maxSize);
386 };
387
388 QFuture<OperationResult> f =
389 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
390 m_watcher.setFuture(f);
391 } else if (priv->type == QIOOperation::Type::Write) {
392 qint64 size = -1;
393 const char *buf = nullptr;
394 if (dataStorage->containsWriteSpans()) {
395 const auto &writeBuffers = dataStorage->getWriteSpans();
396 Q_ASSERT(writeBuffers.size() > idx);
397 size = writeBuffers[idx].size_bytes();
398 buf = reinterpret_cast<const char *>(writeBuffers[idx].data());
399 } else {
400 Q_ASSERT(dataStorage->containsByteArray());
401 const auto &array = dataStorage->getByteArray();
402 size = array.size();
403 buf = array.constData();
404 }
405 Q_ASSERT(size >= 0);
406
407 qint64 offset = priv->offset;
408 if (idx != 0)
409 offset += priv->processed;
410 QBasicMutex *mutexPtr = &m_engineMutex;
411 auto op = [engine = m_engine.get(), buf, size, offset, mutexPtr] {
412 return executeWrite(engine, mutexPtr, offset, buf, size);
413 };
414
415 QFuture<OperationResult> f =
416 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
417 m_watcher.setFuture(f);
418 }
419}
420
421void QRandomAccessAsyncFileThreadPoolBackend::processFlush()
422{
423 Q_ASSERT(!m_currentOperation.isNull());
424 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
425 auto &dataStorage = priv->dataStorage;
426 Q_ASSERT(dataStorage->isEmpty());
427
428 QBasicMutex *mutexPtr = &m_engineMutex;
429 auto op = [engine = m_engine.get(), mutexPtr] {
430 QMutexLocker locker(mutexPtr);
431 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
432 if (engine) {
433 if (!engine->flush())
434 result.error = QIOOperation::Error::Flush;
435 } else {
436 result.error = QIOOperation::Error::FileNotOpen;
437 }
438 return result;
439 };
440
441 QFuture<OperationResult> f =
442 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
443 m_watcher.setFuture(f);
444}
445
446void QRandomAccessAsyncFileThreadPoolBackend::processOpen()
447{
448 Q_ASSERT(!m_currentOperation.isNull());
449 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
450 auto &dataStorage = priv->dataStorage;
451 Q_ASSERT(dataStorage->isEmpty());
452
453 QFuture<OperationResult> f;
454 if (m_fileState == FileState::OpenPending) {
455 // create the engine
456 m_engineMutex.lock();
457 m_engine = std::make_unique<QFSFileEngine>(m_filePath);
458 m_engineMutex.unlock();
459 QBasicMutex *mutexPtr = &m_engineMutex;
460 auto op = [engine = m_engine.get(), mutexPtr, mode = m_openMode] {
461 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
462 QMutexLocker locker(mutexPtr);
463 const bool res =
464 engine && engine->open(mode | QIODeviceBase::Unbuffered, std::nullopt);
465 if (!res)
466 result.error = QIOOperation::Error::Open;
467 return result;
468 };
469 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
470 } else {
471 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), [] {
472 return QRandomAccessAsyncFileThreadPoolBackend::OperationResult{0, QIOOperation::Error::Open};
473 });
474 }
475 m_watcher.setFuture(f);
476}
477
478void QRandomAccessAsyncFileThreadPoolBackend::operationComplete()
479{
480 // TODO: if one of the buffers was read/written with an error,
481 // stop processing immediately
482
483 auto scheduleNextOperation = qScopeGuard([this]{
484 m_currentOperation = nullptr;
485 executeNextOperation();
486 });
487
488 if (m_currentOperation && !m_watcher.isCanceled()) {
489 OperationResult res = m_watcher.future().result();
490 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
491 auto &dataStorage = priv->dataStorage;
492
493 switch (priv->type) {
494 case QIOOperation::Type::Read: {
495 qsizetype expectedBuffersCount = 1;
496 if (dataStorage->containsReadSpans()) {
497 auto &readBuffers = dataStorage->getReadSpans();
498 expectedBuffersCount = readBuffers.size();
499 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
500 const qsizetype unusedBytes =
501 readBuffers[numProcessedBuffers].size_bytes() - res.bytesProcessed;
502 readBuffers[numProcessedBuffers].chop(unusedBytes);
503 } else {
504 Q_ASSERT(dataStorage->containsByteArray());
505 Q_ASSERT(numProcessedBuffers == 0);
506 auto &array = dataStorage->getByteArray();
507 array.resize(res.bytesProcessed);
508 }
509 priv->appendBytesProcessed(res.bytesProcessed);
510 if (++numProcessedBuffers < expectedBuffersCount) {
511 // keep executing this command
512 processBufferAt(numProcessedBuffers);
513 scheduleNextOperation.dismiss();
514 } else {
515 priv->operationComplete(res.error);
516 }
517 break;
518 }
519 case QIOOperation::Type::Write: {
520 qsizetype expectedBuffersCount = 1;
521 if (dataStorage->containsWriteSpans())
522 expectedBuffersCount = dataStorage->getWriteSpans().size();
523 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
524 priv->appendBytesProcessed(res.bytesProcessed);
525 if (++numProcessedBuffers < expectedBuffersCount) {
526 // keep executing this command
527 processBufferAt(numProcessedBuffers);
528 scheduleNextOperation.dismiss();
529 } else {
530 priv->operationComplete(res.error);
531 }
532 break;
533 }
534 case QIOOperation::Type::Flush:
535 priv->operationComplete(res.error);
536 break;
537 case QIOOperation::Type::Open:
538 if (m_fileState == FileState::OpenPending) {
539 if (res.error == QIOOperation::Error::None) {
540 m_fileState = FileState::Opened;
541 } else {
542 m_fileState = FileState::Closed;
543 m_engineMutex.lock();
544 m_engine.reset();
545 m_engineMutex.unlock();
546 }
547 }
548 priv->operationComplete(res.error);
549 break;
550 case QIOOperation::Type::Unknown:
551 priv->setError(QIOOperation::Error::Aborted);
552 break;
553 }
554 }
555}
556
557QT_END_NAMESPACE
static SharedThreadPool asyncFileThreadPool
QT_REQUIRE_CONFIG(liburing)
static QRandomAccessAsyncFileThreadPoolBackend::OperationResult executeWrite(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, const char *buffer, qint64 size)
static QRandomAccessAsyncFileThreadPoolBackend::OperationResult executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, char *buffer, qint64 maxSize)
QFuture< void > future
[5]