10#include <QtCore/qfuture.h>
11#include <QtCore/qthread.h>
12#include <QtCore/qthreadpool.h>
32 QMutexLocker locker(&m_mutex);
33 if (m_refCount == 0) {
35 m_pool =
new QThreadPool;
42 QMutexLocker locker(&m_mutex);
44 if (--m_refCount == 0) {
52 QMutexLocker locker(&m_mutex);
53 Q_ASSERT(m_refCount > 0);
59 QThreadPool *m_pool =
nullptr;
60 quint64 m_refCount = 0;
68 QObjectPrivate(version)
80 QObject::connect(&m_watcher, &QFutureWatcherBase::finished, q_ptr, [
this]{
83 QObject::connect(&m_watcher, &QFutureWatcherBase::canceled, q_ptr, [
this]{
90 if (op == m_currentOperation) {
91 m_currentOperation =
nullptr;
93 m_watcher.waitForFinished();
95 m_operations.removeAll(op);
104 if (m_fileState == FileState::Closed) {
107 m_fileState = FileState::OpenPending;
113 priv->type = QIOOperation::Type::Open;
115 auto *op =
new QIOOperation(*priv, q_ptr);
117 m_operations.append(op);
118 executeNextOperation();
125 for (
const auto &op : std::as_const(m_operations)) {
127 auto *priv = QIOOperationPrivate::get(op.get());
128 priv->setError(QIOOperation::Error::Aborted);
131 m_operations.clear();
134 if (m_currentOperation) {
135 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
136 priv->setError(QIOOperation::Error::Aborted);
137 cancelAndWait(m_currentOperation.get());
141 QMutexLocker locker(&m_engineMutex);
148 m_fileState = FileState::Closed;
153 QMutexLocker locker(&m_engineMutex);
155 return m_engine->size();
164 priv->type = QIOOperation::Type::Flush;
166 auto *op =
new QIOOperation(*priv, q_ptr);
167 m_operations.append(op);
168 executeNextOperation();
175 array.resizeForOverwrite(maxSize);
179 priv->offset = offset;
180 priv->type = QIOOperation::Type::Read;
182 auto *op =
new QIOReadOperation(*priv, q_ptr);
183 m_operations.append(op);
184 executeNextOperation();
194 priv->offset = offset;
195 priv->type = QIOOperation::Type::Write;
197 auto *op =
new QIOWriteOperation(*priv, q_ptr);
198 m_operations.append(op);
199 executeNextOperation();
209 priv->offset = offset;
210 priv->type = QIOOperation::Type::Write;
212 auto *op =
new QIOWriteOperation(*priv, q_ptr);
213 m_operations.append(op);
214 executeNextOperation();
222 new QtPrivate::QIOOperationDataStorage(QSpan<
const QSpan<std::byte>>{buffer});
225 priv->offset = offset;
226 priv->type = QIOOperation::Type::Read;
228 auto *op =
new QIOVectoredReadOperation(*priv, q_ptr);
229 m_operations.append(op);
230 executeNextOperation();
238 new QtPrivate::QIOOperationDataStorage(QSpan<
const QSpan<
const std::byte>>{buffer});
241 priv->offset = offset;
242 priv->type = QIOOperation::Type::Write;
244 auto *op =
new QIOVectoredWriteOperation(*priv, q_ptr);
245 m_operations.append(op);
246 executeNextOperation();
256 priv->offset = offset;
257 priv->type = QIOOperation::Type::Read;
259 auto *op =
new QIOVectoredReadOperation(*priv, q_ptr);
260 m_operations.append(op);
261 executeNextOperation();
271 priv->offset = offset;
272 priv->type = QIOOperation::Type::Write;
274 auto *op =
new QIOVectoredWriteOperation(*priv, q_ptr);
275 m_operations.append(op);
276 executeNextOperation();
281executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset,
char *buffer, qint64 maxSize)
283 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
285 QMutexLocker locker(mutex);
287 if (engine->seek(offset)) {
288 qint64 bytesRead = engine->read(buffer, maxSize);
290 result.bytesProcessed = bytesRead;
292 result.error = QIOOperation::Error::Read;
294 result.error = QIOOperation::Error::IncorrectOffset;
297 result.error = QIOOperation::Error::FileNotOpen;
304 const char *buffer, qint64 size)
306 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
308 QMutexLocker locker(mutex);
310 if (engine->seek(offset)) {
311 qint64 written = engine->write(buffer, size);
313 result.bytesProcessed = written;
315 result.error = QIOOperation::Error::Write;
317 result.error = QIOOperation::Error::IncorrectOffset;
320 result.error = QIOOperation::Error::FileNotOpen;
327 if (m_currentOperation.isNull()) {
329 if (!m_operations.isEmpty()) {
330 m_currentOperation = m_operations.takeFirst();
331 switch (m_currentOperation->type()) {
332 case QIOOperation::Type::Read:
333 case QIOOperation::Type::Write:
334 numProcessedBuffers = 0;
335 processBufferAt(numProcessedBuffers);
337 case QIOOperation::Type::Flush:
340 case QIOOperation::Type::Open:
343 case QIOOperation::Type::Unknown:
344 Q_ASSERT_X(
false,
"executeNextOperation",
"Operation of type Unknown!");
346 m_watcher.setFuture(QtFuture::makeReadyValueFuture(OperationResult{}));
356 Q_ASSERT(!m_currentOperation.isNull());
357 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
358 auto &dataStorage = priv->dataStorage;
360 Q_ASSERT(dataStorage->containsReadSpans()
361 || dataStorage->containsWriteSpans()
363 if (priv->type == QIOOperation::Type::Read) {
366 if (dataStorage->containsReadSpans()) {
367 auto &readBuffers = dataStorage->getReadSpans();
368 Q_ASSERT(readBuffers.size() > idx);
369 maxSize = readBuffers[idx].size_bytes();
370 buf =
reinterpret_cast<
char *>(readBuffers[idx].data());
372 Q_ASSERT(dataStorage->containsByteArray());
373 auto &array = dataStorage->getByteArray();
374 maxSize = array.size();
377 Q_ASSERT(maxSize >= 0);
379 qint64 offset = priv->offset;
381 offset += priv->processed;
382 QBasicMutex *mutexPtr = &m_engineMutex;
383 auto op = [engine = m_engine.get(), buf, maxSize, offset, mutexPtr] {
384 return executeRead(engine, mutexPtr, offset, buf, maxSize);
387 QFuture<OperationResult> f =
388 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
389 m_watcher.setFuture(f);
390 }
else if (priv->type == QIOOperation::Type::Write) {
392 const char *buf =
nullptr;
393 if (dataStorage->containsWriteSpans()) {
394 const auto &writeBuffers = dataStorage->getWriteSpans();
395 Q_ASSERT(writeBuffers.size() > idx);
396 size = writeBuffers[idx].size_bytes();
397 buf =
reinterpret_cast<
const char *>(writeBuffers[idx].data());
399 Q_ASSERT(dataStorage->containsByteArray());
400 const auto &array = dataStorage->getByteArray();
402 buf = array.constData();
406 qint64 offset = priv->offset;
408 offset += priv->processed;
409 QBasicMutex *mutexPtr = &m_engineMutex;
410 auto op = [engine = m_engine.get(), buf, size, offset, mutexPtr] {
411 return executeWrite(engine, mutexPtr, offset, buf, size);
414 QFuture<OperationResult> f =
415 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
416 m_watcher.setFuture(f);
422 Q_ASSERT(!m_currentOperation.isNull());
423 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
424 auto &dataStorage = priv->dataStorage;
425 Q_ASSERT(dataStorage->isEmpty());
427 QBasicMutex *mutexPtr = &m_engineMutex;
428 auto op = [engine = m_engine.get(), mutexPtr] {
429 QMutexLocker locker(mutexPtr);
430 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
432 if (!engine->flush())
433 result.error = QIOOperation::Error::Flush;
435 result.error = QIOOperation::Error::FileNotOpen;
440 QFuture<OperationResult> f =
441 QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
442 m_watcher.setFuture(f);
447 Q_ASSERT(!m_currentOperation.isNull());
448 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
449 auto &dataStorage = priv->dataStorage;
450 Q_ASSERT(dataStorage->isEmpty());
452 QFuture<OperationResult> f;
453 if (m_fileState == FileState::OpenPending) {
455 m_engineMutex.lock();
456 m_engine = std::make_unique<QFSFileEngine>(m_filePath);
457 m_engineMutex.unlock();
458 QBasicMutex *mutexPtr = &m_engineMutex;
459 auto op = [engine = m_engine.get(), mutexPtr, mode = m_openMode] {
460 QRandomAccessAsyncFilePrivate::OperationResult result{0, QIOOperation::Error::None};
461 QMutexLocker locker(mutexPtr);
463 engine && engine->open(mode | QIODeviceBase::Unbuffered, std::nullopt);
465 result.error = QIOOperation::Error::Open;
468 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), op);
470 f = QtFuture::makeReadyVoidFuture().then(asyncFileThreadPool(), [] {
471 return QRandomAccessAsyncFilePrivate::OperationResult{0, QIOOperation::Error::Open};
474 m_watcher.setFuture(f);
482 auto scheduleNextOperation = qScopeGuard([
this]{
483 m_currentOperation =
nullptr;
484 executeNextOperation();
487 if (m_currentOperation && !m_watcher.isCanceled()) {
488 OperationResult res = m_watcher.future().result();
489 auto *priv = QIOOperationPrivate::get(m_currentOperation.get());
490 auto &dataStorage = priv->dataStorage;
492 switch (priv->type) {
493 case QIOOperation::Type::Read: {
494 qsizetype expectedBuffersCount = 1;
495 if (dataStorage->containsReadSpans()) {
496 auto &readBuffers = dataStorage->getReadSpans();
497 expectedBuffersCount = readBuffers.size();
498 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
499 const qsizetype unusedBytes =
500 readBuffers[numProcessedBuffers].size_bytes() - res.bytesProcessed;
501 readBuffers[numProcessedBuffers].chop(unusedBytes);
503 Q_ASSERT(dataStorage->containsByteArray());
504 Q_ASSERT(numProcessedBuffers == 0);
505 auto &array = dataStorage->getByteArray();
506 array.resize(res.bytesProcessed);
508 priv->appendBytesProcessed(res.bytesProcessed);
509 if (++numProcessedBuffers < expectedBuffersCount) {
511 processBufferAt(numProcessedBuffers);
512 scheduleNextOperation.dismiss();
514 priv->operationComplete(res.error);
518 case QIOOperation::Type::Write: {
519 qsizetype expectedBuffersCount = 1;
520 if (dataStorage->containsWriteSpans())
521 expectedBuffersCount = dataStorage->getWriteSpans().size();
522 Q_ASSERT(numProcessedBuffers < expectedBuffersCount);
523 priv->appendBytesProcessed(res.bytesProcessed);
524 if (++numProcessedBuffers < expectedBuffersCount) {
526 processBufferAt(numProcessedBuffers);
527 scheduleNextOperation.dismiss();
529 priv->operationComplete(res.error);
533 case QIOOperation::Type::Flush:
534 priv->operationComplete(res.error);
536 case QIOOperation::Type::Open:
537 if (m_fileState == FileState::OpenPending) {
538 if (res.error == QIOOperation::Error::None) {
539 m_fileState = FileState::Opened;
541 m_fileState = FileState::Closed;
542 m_engineMutex.lock();
544 m_engineMutex.unlock();
547 priv->operationComplete(res.error);
549 case QIOOperation::Type::Unknown:
550 priv->setError(QIOOperation::Error::Aborted);
QIOOperationPrivate(QtPrivate::QIOOperationDataStorage *storage)
QIOOperation * open(const QString &path, QIODeviceBase::OpenMode mode)
~QRandomAccessAsyncFilePrivate() override
QIOReadOperation * read(qint64 offset, qint64 maxSize)
QIOVectoredWriteOperation * writeFrom(qint64 offset, QSpan< const std::byte > buffer)
QIOWriteOperation * write(qint64 offset, QByteArray &&data)
QIOWriteOperation * write(qint64 offset, const QByteArray &data)
QIOVectoredReadOperation * readInto(qint64 offset, QSpan< std::byte > buffer)
QThreadPool * operator()()
QIOOperationDataStorage()
static SharedThreadPool asyncFileThreadPool
static QRandomAccessAsyncFilePrivate::OperationResult executeRead(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, char *buffer, qint64 maxSize)
static QRandomAccessAsyncFilePrivate::OperationResult executeWrite(QFSFileEngine *engine, QBasicMutex *mutex, qint64 offset, const char *buffer, qint64 size)
QT_REQUIRE_CONFIG(thread)
QFuture< void > future
[5]