10#include <QtCore/qfuture.h>
11#include <QtCore/qthread.h>
12#include <QtCore/qthreadpool.h>
// NOTE(review): fragments of the shared thread-pool helper. The enclosing class header and
// the member function signatures are not visible in this view, so the grouping of the
// statements below into separate member functions is inferred from the repeated lockers.
// With m_mutex held: if the counter is still zero, a new QThreadPool is allocated and
// stored in m_pool.
32 QMutexLocker locker(&m_mutex);
33 if (m_refCount == 0) {
35 m_pool =
new QThreadPool;
// With m_mutex held: the counter is decremented, and a branch is taken when it reaches
// zero (the body of that branch is not visible here).
42 QMutexLocker locker(&m_mutex);
44 if (--m_refCount == 0) {
// With m_mutex held: asserts that the counter is greater than zero.
52 QMutexLocker locker(&m_mutex);
53 Q_ASSERT(m_refCount > 0);
// Pointer to the pool; null by default and assigned by the allocation above.
59 QThreadPool *m_pool =
nullptr;
// Counter tested and decremented by the code above; starts at zero.
60 quint64 m_refCount = 0;
// Constructor: forwards owner to the QRandomAccessAsyncFileBackend base class and calls
// ref() on asyncFileThreadPool. The destructor makes the matching deref() call.
67QRandomAccessAsyncFileThreadPoolBackend::QRandomAccessAsyncFileThreadPoolBackend(QRandomAccessAsyncFile *owner) :
68 QRandomAccessAsyncFileBackend(owner)
70 asyncFileThreadPool.ref();
// Destructor: calls deref() on asyncFileThreadPool, pairing with the ref() call made in
// the constructor.
73QRandomAccessAsyncFileThreadPoolBackend::~QRandomAccessAsyncFileThreadPoolBackend()
75 asyncFileThreadPool.deref();
// Wires up m_watcher: both its finished() and canceled() signals are connected to lambdas
// that capture this, with m_owner as the connection's context object.
// NOTE(review): the lambda bodies and the returned value are not visible in this view.
78bool QRandomAccessAsyncFileThreadPoolBackend::init()
80 QObject::connect(&m_watcher, &QFutureWatcherBase::finished, m_owner, [
this]{
83 QObject::connect(&m_watcher, &QFutureWatcherBase::canceled, m_owner, [
this]{
// Stops tracking op. If op is the operation currently in flight, m_currentOperation is
// reset and the call blocks until m_watcher reports that its future has finished.
// op is also removed from the pending queue m_operations.
// NOTE(review): lines are missing between these statements, so it cannot be told from
// here whether the removeAll() call is unconditional or sits in an else branch - confirm.
89void QRandomAccessAsyncFileThreadPoolBackend::cancelAndWait(QIOOperation *op)
91 if (op == m_currentOperation) {
92 m_currentOperation =
nullptr;
94 m_watcher.waitForFinished();
96 m_operations.removeAll(op);
// Queues an Open operation for path with the given mode.
// NOTE(review): the return type line, the statements that record path/mode and the
// return statement are not visible in this view.
101QRandomAccessAsyncFileThreadPoolBackend::open(
const QString &path, QIODeviceBase::OpenMode mode)
// Only a file that is currently Closed is moved to the OpenPending state.
105 if (m_fileState == FileState::Closed) {
108 m_fileState = FileState::OpenPending;
// Build the operation: a default-constructed data storage, a private object tagged as
// Type::Open, and the QIOOperation itself, constructed with m_owner.
111 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage();
113 auto *priv =
new QIOOperationPrivate(dataStorage);
114 priv->type = QIOOperation::Type::Open;
116 auto *op =
new QIOOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
118 m_operations.append(op);
119 executeNextOperation();
// Closes the file: pending and in-flight operations are flagged as Aborted and the
// backend state becomes Closed.
123void QRandomAccessAsyncFileThreadPoolBackend::close()
// Flag every queued operation with Error::Aborted; the queue is then cleared.
126 for (
const auto &op : std::as_const(m_operations)) {
128 auto *priv = QIOOperationPrivate::get(op.get());
129 priv->setError(QIOOperation::Error::Aborted);
132 m_operations.clear();
// The operation currently in flight, if any, is flagged the same way and then handed
// to cancelAndWait().
135 if (m_currentOperation) {
136 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
137 priv->setError(QIOOperation::Error::Aborted);
138 cancelAndWait(m_currentOperation.get());
// m_engineMutex is taken here; the statements it guards are not visible in this view.
142 QMutexLocker locker(&m_engineMutex);
149 m_fileState = FileState::Closed;
// Returns the size reported by the file engine, queried while holding m_engineMutex.
// NOTE(review): lines are missing around the return; any guard for a missing engine and
// its fallback value are not visible in this view.
152qint64 QRandomAccessAsyncFileThreadPoolBackend::size()
const
154 QMutexLocker locker(&m_engineMutex);
156 return m_engine->size();
// Queues a Flush operation: builds it around a default-constructed data storage, tags it
// as Type::Flush, appends it to m_operations and tries to start it.
// NOTE(review): the return statement is not visible in this view.
160QIOOperation *QRandomAccessAsyncFileThreadPoolBackend::flush()
162 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage();
164 auto *priv =
new QIOOperationPrivate(dataStorage);
165 priv->type = QIOOperation::Type::Flush;
167 auto *op =
new QIOOperation(*priv, m_owner);
168 m_operations.append(op);
169 executeNextOperation();
// Queues a Read operation starting at offset. The destination is a local object named
// array (its declaration is not visible in this view), resized to maxSize without
// initializing its contents and then moved into the operation's data storage.
// NOTE(review): the return statement is not visible in this view.
173QIOReadOperation *QRandomAccessAsyncFileThreadPoolBackend::read(qint64 offset, qint64 maxSize)
176 array.resizeForOverwrite(maxSize);
177 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage(std::move(array));
179 auto *priv =
new QIOOperationPrivate(dataStorage);
180 priv->offset = offset;
181 priv->type = QIOOperation::Type::Read;
183 auto *op =
new QIOReadOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
184 m_operations.append(op);
185 executeNextOperation();
// Queues a Write operation at offset; this overload takes data by const reference and
// passes it to the data storage constructor as-is (no std::move).
// NOTE(review): the return type line and the return statement are not visible here.
190QRandomAccessAsyncFileThreadPoolBackend::write(qint64 offset,
const QByteArray &data)
192 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage(data);
194 auto *priv =
new QIOOperationPrivate(dataStorage);
195 priv->offset = offset;
196 priv->type = QIOOperation::Type::Write;
198 auto *op =
new QIOWriteOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
199 m_operations.append(op);
200 executeNextOperation();
// Queues a Write operation at offset; this overload takes data by rvalue reference and
// moves it into the operation's data storage.
// NOTE(review): the return type line and the return statement are not visible here.
205QRandomAccessAsyncFileThreadPoolBackend::write(qint64 offset, QByteArray &&data)
207 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage(std::move(data));
209 auto *priv =
new QIOOperationPrivate(dataStorage);
210 priv->offset = offset;
211 priv->type = QIOOperation::Type::Write;
213 auto *op =
new QIOWriteOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
214 m_operations.append(op);
215 executeNextOperation();
// Queues a Read operation at offset that fills the caller-provided buffer. The single
// span is wrapped into a span-of-spans before being handed to the data storage, so it is
// represented the same way as the multi-buffer overload's argument.
// NOTE(review): the line declaring dataStorage and the return statement are not visible.
219QIOVectoredReadOperation *
220QRandomAccessAsyncFileThreadPoolBackend::readInto(qint64 offset, QSpan<std::byte> buffer)
223 new QtPrivate::QIOOperationDataStorage(QSpan<
const QSpan<std::byte>>{buffer});
225 auto *priv =
new QIOOperationPrivate(dataStorage);
226 priv->offset = offset;
227 priv->type = QIOOperation::Type::Read;
229 auto *op =
new QIOVectoredReadOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
230 m_operations.append(op);
231 executeNextOperation();
// Queues a Write operation at offset that takes its bytes from the caller-provided
// buffer. The single span is wrapped into a span-of-spans before being handed to the
// data storage, so it is represented the same way as the multi-buffer overload's argument.
// NOTE(review): the line declaring dataStorage and the return statement are not visible.
235QIOVectoredWriteOperation *
236QRandomAccessAsyncFileThreadPoolBackend::writeFrom(qint64 offset, QSpan<
const std::byte> buffer)
239 new QtPrivate::QIOOperationDataStorage(QSpan<
const QSpan<
const std::byte>>{buffer});
241 auto *priv =
new QIOOperationPrivate(dataStorage);
242 priv->offset = offset;
243 priv->type = QIOOperation::Type::Write;
245 auto *op =
new QIOVectoredWriteOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
246 m_operations.append(op);
247 executeNextOperation();
// Queues a Read operation at offset that fills the caller-provided list of buffers; the
// span-of-spans is passed to the data storage unchanged.
// NOTE(review): the return statement is not visible in this view.
251QIOVectoredReadOperation *
252QRandomAccessAsyncFileThreadPoolBackend::readInto(qint64 offset, QSpan<
const QSpan<std::byte>> buffers)
254 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage(buffers);
256 auto *priv =
new QIOOperationPrivate(dataStorage);
257 priv->offset = offset;
258 priv->type = QIOOperation::Type::Read;
260 auto *op =
new QIOVectoredReadOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
261 m_operations.append(op);
262 executeNextOperation();
// Queues a Write operation at offset that takes its bytes from the caller-provided list
// of buffers; the span-of-spans is passed to the data storage unchanged.
// NOTE(review): the return statement is not visible in this view.
266QIOVectoredWriteOperation *
267QRandomAccessAsyncFileThreadPoolBackend::writeFrom(qint64 offset, QSpan<
const QSpan<
const std::byte>> buffers)
269 auto *dataStorage =
new QtPrivate::QIOOperationDataStorage(buffers);
271 auto *priv =
new QIOOperationPrivate(dataStorage);
272 priv->offset = offset;
273 priv->type = QIOOperation::Type::Write;
275 auto *op =
new QIOVectoredWriteOperation(*priv, m_owner);
// Enqueue the operation and try to start it right away.
276 m_operations.append(op);
277 executeNextOperation();
// Read helper: with mutex held, seeks engine to offset and reads up to maxSize bytes
// into buffer. The result starts out as {0, Error::None}.
// NOTE(review): several conditions and else lines are missing from this view; which
// check selects each error code below is inferred from their order - confirm.
282executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset,
char *buffer, qint64 maxSize)
284 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
286 QMutexLocker locker(mutex);
288 if (engine->seek(offset)) {
289 qint64 bytesRead = engine->read(buffer, maxSize);
// The byte count returned by read() is reported through bytesProcessed.
291 result.bytesProcessed = bytesRead;
// Error codes: Read, IncorrectOffset (presumably when seek() fails) and FileNotOpen.
293 result.error = QIOOperation::Error::Read;
295 result.error = QIOOperation::Error::IncorrectOffset;
298 result.error = QIOOperation::Error::FileNotOpen;
// Write helper (only the tail of its parameter list is visible here): with mutex held,
// seeks engine to offset and writes size bytes from buffer. The result starts out as
// {0, Error::None}.
// NOTE(review): several conditions and else lines are missing from this view; which
// check selects each error code below is inferred from their order - confirm.
305 const char *buffer, qint64 size)
307 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
309 QMutexLocker locker(mutex);
311 if (engine->seek(offset)) {
312 qint64 written = engine->write(buffer, size);
// The byte count returned by write() is reported through bytesProcessed.
314 result.bytesProcessed = written;
// Error codes: Write, IncorrectOffset (presumably when seek() fails) and FileNotOpen.
316 result.error = QIOOperation::Error::Write;
318 result.error = QIOOperation::Error::IncorrectOffset;
321 result.error = QIOOperation::Error::FileNotOpen;
// Starts the next queued operation, provided that none is currently in flight.
326void QRandomAccessAsyncFileThreadPoolBackend::executeNextOperation()
// Nothing happens while an operation is running or when the queue is empty.
328 if (m_currentOperation.isNull()) {
330 if (!m_operations.isEmpty()) {
// The head of the queue becomes the current operation and is dispatched by type.
331 m_currentOperation = m_operations.takeFirst();
332 switch (m_currentOperation->type()) {
// Reads and writes share one path: reset the buffer counter and process buffer 0.
333 case QIOOperation::Type::Read:
334 case QIOOperation::Type::Write:
335 numProcessedBuffers = 0;
336 processBufferAt(numProcessedBuffers);
// NOTE(review): the bodies of the Flush and Open cases are not visible in this view;
// presumably they call processFlush() and processOpen() - confirm.
338 case QIOOperation::Type::Flush:
341 case QIOOperation::Type::Open:
// An Unknown type trips the assertion; the watcher is still given an already-finished
// future holding a default-constructed OperationResult.
344 case QIOOperation::Type::Unknown:
345 Q_ASSERT_X(
false,
"executeNextOperation",
"Operation of type Unknown!");
347 m_watcher.setFuture(QtFuture::makeReadyValueFuture(OperationResult{}));
// Schedules the read or write of buffer number idx of the current operation on the
// shared thread pool and lets m_watcher observe the resulting future.
355void QRandomAccessAsyncFileThreadPoolBackend::processBufferAt(qsizetype idx)
357 Q_ASSERT(!m_currentOperation.isNull());
358 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
359 auto &dataStorage = priv->dataStorage;
// The storage is expected to hold read spans or write spans (the remaining alternative
// of this assertion is on a line that is not visible here).
361 Q_ASSERT(dataStorage->containsReadSpans()
362 || dataStorage->containsWriteSpans()
364 if (priv->type == QIOOperation::Type::Read) {
// Destination: either the idx-th read span, or the byte array when the storage
// holds no spans.
367 if (dataStorage->containsReadSpans()) {
368 auto &readBuffers = dataStorage->getReadSpans();
369 Q_ASSERT(readBuffers.size() > idx);
370 maxSize = readBuffers[idx].size_bytes();
371 buf =
reinterpret_cast<
char *>(readBuffers[idx].data());
373 Q_ASSERT(dataStorage->containsByteArray());
374 auto &array = dataStorage->getByteArray();
375 maxSize = array.size();
378 Q_ASSERT(maxSize >= 0);
// The file position for this buffer is the operation's offset advanced by the
// bytes already processed.
380 qint64 offset = priv->offset;
382 offset += priv->processed;
// The task captures the raw engine pointer, the buffer pointer and size, the offset and
// a pointer to m_engineMutex by value.
383 QBasicMutex *mutexPtr = &m_engineMutex;
384 auto op = [engine = m_engine.get(), buf, maxSize, offset, mutexPtr] {
385 return executeRead(engine, mutexPtr, offset, buf, maxSize);
// Run the task on the pool returned by asyncFileThreadPool() as a continuation of an
// already-finished void future.
388 QFuture<OperationResult> f =
389 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
390 m_watcher.setFuture(f);
391 }
else if (priv->type == QIOOperation::Type::Write) {
// Source: either the idx-th write span, or the byte array when the storage holds
// no spans.
393 const char *buf =
nullptr;
394 if (dataStorage->containsWriteSpans()) {
395 const auto &writeBuffers = dataStorage->getWriteSpans();
396 Q_ASSERT(writeBuffers.size() > idx);
397 size = writeBuffers[idx].size_bytes();
398 buf =
reinterpret_cast<
const char *>(writeBuffers[idx].data());
400 Q_ASSERT(dataStorage->containsByteArray());
401 const auto &array = dataStorage->getByteArray();
403 buf = array.constData();
// Same offset computation and scheduling as on the read path, using executeWrite().
407 qint64 offset = priv->offset;
409 offset += priv->processed;
410 QBasicMutex *mutexPtr = &m_engineMutex;
411 auto op = [engine = m_engine.get(), buf, size, offset, mutexPtr] {
412 return executeWrite(engine, mutexPtr, offset, buf, size);
415 QFuture<OperationResult> f =
416 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
417 m_watcher.setFuture(f);
// Schedules the flush of the current operation on the shared thread pool and lets
// m_watcher observe the resulting future.
421void QRandomAccessAsyncFileThreadPoolBackend::processFlush()
// A flush operation must exist and is expected to carry no data.
423 Q_ASSERT(!m_currentOperation.isNull());
424 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
425 auto &dataStorage = priv->dataStorage;
426 Q_ASSERT(dataStorage->isEmpty());
// The task captures the raw engine pointer and a pointer to m_engineMutex, and takes
// the mutex before touching the engine.
428 QBasicMutex *mutexPtr = &m_engineMutex;
429 auto op = [engine = m_engine.get(), mutexPtr] {
430 QMutexLocker locker(mutexPtr);
431 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
// A failed engine->flush() is reported as Error::Flush. FileNotOpen is the other error
// set here; the condition selecting it is not visible in this view.
433 if (!engine->flush())
434 result.error = QIOOperation::Error::Flush;
436 result.error = QIOOperation::Error::FileNotOpen;
// Run the task on the pool returned by asyncFileThreadPool() as a continuation of an
// already-finished void future.
441 QFuture<OperationResult> f =
442 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
443 m_watcher.setFuture(f);
// Schedules the open of the current operation on the shared thread pool and lets
// m_watcher observe the resulting future.
446void QRandomAccessAsyncFileThreadPoolBackend::processOpen()
// An open operation must exist and is expected to carry no data.
448 Q_ASSERT(!m_currentOperation.isNull());
449 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
450 auto &dataStorage = priv->dataStorage;
451 Q_ASSERT(dataStorage->isEmpty());
453 QFuture<OperationResult> f;
454 if (m_fileState == FileState::OpenPending) {
// The engine for m_filePath is created while m_engineMutex is held (explicit
// lock()/unlock() pair around the assignment).
456 m_engineMutex.lock();
457 m_engine = std::make_unique<QFSFileEngine>(m_filePath);
458 m_engineMutex.unlock();
// The task captures the raw engine pointer, a pointer to m_engineMutex and a copy of
// m_openMode, and opens the engine under the mutex with Unbuffered added to the mode.
459 QBasicMutex *mutexPtr = &m_engineMutex;
460 auto op = [engine = m_engine.get(), mutexPtr, mode = m_openMode] {
461 QRandomAccessAsyncFileThreadPoolBackend::OperationResult result{0, QIOOperation::Error::None};
462 QMutexLocker locker(mutexPtr);
464 engine && engine->open(mode | QIODeviceBase::Unbuffered, std::nullopt);
// Error::Open is the error code used by this task (presumably when open() fails).
466 result.error = QIOOperation::Error::Open;
469 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
// Alternative path (presumably the else branch, taken when the state is not
// OpenPending - confirm): schedule a task that only reports Error::Open.
471 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), [] {
472 return QRandomAccessAsyncFileThreadPoolBackend::OperationResult{0, QIOOperation::Error::Open};
475 m_watcher.setFuture(f);
// Consumes the result of the future watched by m_watcher and either continues the
// current multi-buffer operation or finishes it and moves on to the next queued one.
478void QRandomAccessAsyncFileThreadPoolBackend::operationComplete()
// Scope guard: unless dismissed below, on exit it resets m_currentOperation and calls
// executeNextOperation().
483 auto scheduleNextOperation = qScopeGuard([
this]{
484 m_currentOperation =
nullptr;
485 executeNextOperation();
// The result is only read when there is a current operation and the watcher was not
// canceled.
488 if (m_currentOperation && !m_watcher.isCanceled()) {
489 OperationResult res = m_watcher.future().result();
490 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
491 auto &dataStorage = priv->dataStorage;
493 switch (priv->type) {
494 case QIOOperation::Type::Read: {
// A byte-array read counts as a single buffer; a span read has one per span.
495 qsizetype expectedBuffersCount = 1;
496 if (dataStorage->containsReadSpans()) {
497 auto &readBuffers = dataStorage->getReadSpans();
498 expectedBuffersCount = readBuffers.size();
499 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
// Shrink the span that was just filled by the number of bytes that were not read.
500 const qsizetype unusedBytes =
501 readBuffers[numProcessedBuffers].size_bytes() - res.bytesProcessed;
502 readBuffers[numProcessedBuffers].chop(unusedBytes);
// Byte-array storage: resize the array to the number of bytes processed.
504 Q_ASSERT(dataStorage->containsByteArray());
505 Q_ASSERT(numProcessedBuffers == 0);
506 auto &array = dataStorage->getByteArray();
507 array.resize(res.bytesProcessed);
509 priv->appendBytesProcessed(res.bytesProcessed);
// More buffers left: schedule the next one and dismiss the guard so that the current
// operation stays active.
510 if (++numProcessedBuffers < expectedBuffersCount) {
512 processBufferAt(numProcessedBuffers);
513 scheduleNextOperation.dismiss();
// NOTE(review): presumably the else branch (last buffer done) - the else line is not
// visible in this view.
515 priv->operationComplete(res.error);
519 case QIOOperation::Type::Write: {
// Same buffer accounting as for reads, without any resizing of the source buffers.
520 qsizetype expectedBuffersCount = 1;
521 if (dataStorage->containsWriteSpans())
522 expectedBuffersCount = dataStorage->getWriteSpans().size();
523 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
524 priv->appendBytesProcessed(res.bytesProcessed);
525 if (++numProcessedBuffers < expectedBuffersCount) {
527 processBufferAt(numProcessedBuffers);
528 scheduleNextOperation.dismiss();
530 priv->operationComplete(res.error);
// A flush has a single step: forward the reported error.
534 case QIOOperation::Type::Flush:
535 priv->operationComplete(res.error);
537 case QIOOperation::Type::Open:
// A pending open moves to Opened when no error was reported. The state is otherwise
// set back to Closed (presumably the else branch), with m_engineMutex held around a
// statement that is not visible in this view.
538 if (m_fileState == FileState::OpenPending) {
539 if (res.error == QIOOperation::Error::None) {
540 m_fileState = FileState::Opened;
542 m_fileState = FileState::Closed;
543 m_engineMutex.lock();
545 m_engineMutex.unlock();
548 priv->operationComplete(res.error);
// An operation of Unknown type is flagged as Aborted.
550 case QIOOperation::Type::Unknown:
551 priv->setError(QIOOperation::Error::Aborted);
QThreadPool * operator()()
static SharedThreadPool asyncFileThreadPool
QT_REQUIRE_CONFIG(liburing)
static QRandomAccessAsyncFileThreadPoolBackend::OperationResult executeWrite(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, const char *buffer, qint64 size)
static QRandomAccessAsyncFileThreadPoolBackend::OperationResult executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, char *buffer, qint64 maxSize)
QFuture< void > future
[5]