9#include <QtCore/qobject.h>
10#include <QtCore/qscopedvaluerollback.h>
11#include <QtCore/private/qcore_unix_p.h>
12#include <QtCore/private/qfiledevice_p.h>
16#include <sys/eventfd.h>
// NOTE(review): this extraction is garbled — the file's original line numbers
// are fused into the code text (e.g. "22static_assert") and interior lines
// are missing. Code bytes are left untouched below; only comments are added.

// Layout-compatibility checks (both truncated mid-expression here): the
// vectored-read request's buffer storage is asserted to match struct iovec in
// size and alignment — presumably so the spans can be handed to the kernel
// without conversion. TODO confirm against the complete source.
22static_assert(
sizeof(iovec)
23 ==
sizeof(
decltype(std::declval<QIORingRequest<QIORing::Operation::VectoredRead>>()
25static_assert(
alignof(iovec)
26 ==
alignof(
decltype(std::declval<QIORingRequest<QIORing::Operation::VectoredRead>>()
// The length cap below is read with a relaxed atomic load; require the atomic
// to be lock-free so that read is a plain load.
39static_assert(std::atomic<qsizetype>::is_always_lock_free);
// Test hook allowing (presumably internal-build) tests to shrink the maximum
// single read/write length; Q_CONSTINIT guarantees constant initialization.
43Q_CONSTINIT std::atomic<qsizetype> QtPrivate::testMaxReadWriteLen{MaxReadWriteLen};
// Two alternative return statements of maxReadWriteLen() — the enclosing
// function header and the branch (likely an #ifdef) selecting between the
// compile-time constant and the test override are missing from this view.
48 return MaxReadWriteLen;
50 return QtPrivate::testMaxReadWriteLen.load(std::memory_order_relaxed);
// Fragment of (presumably) QIORing's destructor — the enclosing function
// header is not visible in this extraction. Closes the eventfd used for
// completion wakeups, guarding against the never-initialized (-1) case.
56 if (eventDescriptor != -1)
57 close(eventDescriptor);
// Sets up the io_uring instance: creates the ring fd, mmaps the submission
// and completion queues (sharing one mapping when the kernel advertises
// IORING_FEAT_SINGLE_MMAP), caches the ring head/tail/mask pointers, and
// registers an eventfd wired to a QSocketNotifier so completions wake the
// Qt event loop.
// NOTE(review): this extraction is garbled — the file's original line numbers
// are fused into the code and interior lines (error checks, braces, returns)
// are missing. The only code change made here is fixing the mojibake "¶ms",
// which is an encoding-mangled "&params" ("&para" collapsed to U+00B6 '¶').
62bool QIORing::initializeIORing()
67 io_uring_params params{};
// Request an explicitly sized completion queue rather than the kernel default.
68 params.flags = IORING_SETUP_CQSIZE;
69 params.cq_entries = cqEntries;
// FIX: restored "&params" (was mojibake "¶ms").
70 const int fd = io_uring_setup(sqEntries, &params);
// On failure io_uring_setup returns a negative errno (missing "if (fd < 0)"
// guard is part of the lost lines).
72 qErrnoWarning(-fd,
"Failed to setup io_uring");
// Ring sizes come back from the kernel in params; the SQ ring's size is its
// index array offset plus one quint32 slot per entry.
76 size_t submissionQueueSize = params.sq_off.array + (params.sq_entries *
sizeof(quint32));
77 size_t completionQueueSize = params.cq_off.cqes + (params.cq_entries *
sizeof(io_uring_cqe));
// With single-mmap support, one mapping sized to the larger ring serves both.
78 if (params.features & IORING_FEAT_SINGLE_MMAP)
79 submissionQueueSize =
std::max(submissionQueueSize, completionQueueSize);
80 submissionQueue = mmap(
nullptr, submissionQueueSize, PROT_READ | PROT_WRITE,
81 MAP_SHARED | MAP_POPULATE, io_uringFd, IORING_OFF_SQ_RING);
83 qErrnoWarning(errno,
"Failed to mmap io_uring submission queue");
// The SQE array is a separate mapping from the SQ index ring.
88 const size_t submissionQueueEntriesSize = params.sq_entries *
sizeof(io_uring_sqe);
89 submissionQueueEntries =
static_cast<io_uring_sqe *>(
90 mmap(
nullptr, submissionQueueEntriesSize, PROT_READ | PROT_WRITE,
91 MAP_SHARED | MAP_POPULATE, io_uringFd, IORING_OFF_SQES));
// On failure, unwind the earlier SQ ring mapping.
93 qErrnoWarning(errno,
"Failed to mmap io_uring submission queue entries");
94 munmap(submissionQueue, submissionQueueSize);
99 void *completionQueue =
nullptr;
100 if (params.features & IORING_FEAT_SINGLE_MMAP) {
// Single-mmap kernels expose the CQ ring inside the SQ mapping.
101 completionQueue = submissionQueue;
103 completionQueue = mmap(
nullptr, completionQueueSize, PROT_READ | PROT_WRITE,
104 MAP_SHARED | MAP_POPULATE, io_uringFd, IORING_OFF_CQ_RING);
106 qErrnoWarning(errno,
"Failed to mmap io_uring completion queue");
// Unwind both prior mappings on CQ mmap failure.
107 munmap(submissionQueue, submissionQueueSize);
108 munmap(submissionQueueEntries, submissionQueueEntriesSize);
// Adopt the entry counts the kernel actually granted.
114 sqEntries = params.sq_entries;
115 cqEntries = params.cq_entries;
// Cache pointers into the SQ ring at the kernel-reported offsets.
117 char *sq =
static_cast<
char *>(submissionQueue);
118 sqHead =
reinterpret_cast<quint32 *>(sq + params.sq_off.head);
119 sqTail =
reinterpret_cast<quint32 *>(sq + params.sq_off.tail);
120 sqIndexMask =
reinterpret_cast<quint32 *>(sq + params.sq_off.ring_mask);
121 sqIndexArray =
reinterpret_cast<quint32 *>(sq + params.sq_off.array);
// Cache pointers into the CQ ring at the kernel-reported offsets.
123 char *cq =
static_cast<
char *>(completionQueue);
124 cqHead =
reinterpret_cast<quint32 *>(cq + params.cq_off.head);
125 cqTail =
reinterpret_cast<quint32 *>(cq + params.cq_off.tail);
126 cqIndexMask =
reinterpret_cast<quint32 *>(cq + params.cq_off.ring_mask);
127 completionQueueEntries =
reinterpret_cast<io_uring_cqe *>(cq + params.cq_off.cqes);
// Register an eventfd with the ring so the kernel signals completions, and
// surface those signals to the event loop through a read-notifier.
129 eventDescriptor = eventfd(0, 0);
130 io_uring_register(io_uringFd, IORING_REGISTER_EVENTFD, &eventDescriptor, 1);
132 notifier.emplace(eventDescriptor, QSocketNotifier::Read);
133 QObject::connect(std::addressof(*notifier), &QSocketNotifier::activated,
134 std::addressof(*notifier), [
this]() { completionReady(); });
// Maps a negative native (errno-style) io_uring result to a QFileDevice
// error code. Only the ECANCELED -> AbortError branch is visible here; the
// fallback (presumably returning defaultValue) is missing from this view.
138QFileDevice::FileError QIORing::mapFileError(NativeResultType error,
139 QFileDevice::FileError defaultValue)
142 if (-error == ECANCELED)
143 return QFileDevice::AbortError;
// Drains the completion queue: invoked when the registered eventfd becomes
// readable. Reads (and discards) the eventfd counter, walks CQEs from head to
// tail, dispatches each by operation, and releases the bookkeeping entry.
// NOTE(review): heavily garbled extraction — case labels, braces and several
// statements are missing; comments below describe apparent intent only.
147void QIORing::completionReady()
// Clear the eventfd counter so the notifier re-arms.
152 std::ignore = read(eventDescriptor, &ignored,
sizeof(ignored));
// Acquire-load the tail to synchronize with the kernel's CQE publishes.
154 quint32 head = __atomic_load_n(cqHead, __ATOMIC_RELAXED);
155 const quint32 tail = __atomic_load_n(cqTail, __ATOMIC_ACQUIRE);
160 "Status of completion queue, total entries: %u, tail: %u, head: %u, to process: %u",
161 cqEntries, tail, head, (tail - head));
162 while (head != tail) {
// Index into the CQE array via the ring mask; user_data carries the request
// pointer (the line recovering "request" from cqe is missing here).
164 const io_uring_cqe *cqe = &completionQueueEntries[head & *cqIndexMask];
168 qCDebug(lcQIORing) <<
"Got completed entry. Operation:" << request->operation()
169 <<
"- user_data pointer:" << request;
// Dispatch on operation; the case labels were lost in extraction.
170 switch (request->operation()) {
// Open: negative res is an errno; ECANCELED -> AbortError, other errors ->
// OpenError, success stores the new fd in the result.
173 openRequest = request->
template takeRequestData<Operation::Open>();
176 if (-cqe->res == ECANCELED)
177 openRequest.result.
template emplace<QFileDevice::FileError>(
178 QFileDevice::AbortError);
180 openRequest.result.
template emplace<QFileDevice::FileError>(
181 QFileDevice::OpenError);
183 auto &result = openRequest.result
185 result.fd = cqe->res;
187 invokeCallback(openRequest);
// Close: failures are reported as OpenError — looks intentional in the
// original, but worth confirming it is not a copy/paste slip.
192 closeRequest = request->
template takeRequestData<Operation::Close>();
194 closeRequest.result.emplace<QFileDevice::FileError>(QFileDevice::OpenError);
198 invokeCallback(closeRequest);
// Read: large transfers are split; MoreToDo means the request is requeued
// (the continue/skip that follows is among the missing lines).
202 const ReadWriteStatus status = handleReadCompletion<
Operation::Read>(
203 cqe->res, size_t(cqe->res), request);
204 if (status == ReadWriteStatus::MoreToDo)
206 auto readRequest = request->takeRequestData<Operation::Read>();
207 invokeCallback(readRequest);
// Write: same split-continuation handling as Read.
211 const ReadWriteStatus status = handleWriteCompletion<
Operation::Write>(
212 cqe->res, size_t(cqe->res), request);
213 if (status == ReadWriteStatus::MoreToDo)
215 auto writeRequest = request->takeRequestData<Operation::Write>();
216 invokeCallback(writeRequest);
// VectoredRead: split-continuation handling for readv-style requests.
220 const ReadWriteStatus status = handleReadCompletion<
Operation::VectoredRead>(
221 cqe->res, size_t(cqe->res), request);
222 if (status == ReadWriteStatus::MoreToDo)
224 auto readvRequest = request->takeRequestData<Operation::VectoredRead>();
225 invokeCallback(readvRequest);
// VectoredWrite: split-continuation handling for writev-style requests.
229 const ReadWriteStatus status = handleWriteCompletion<
Operation::VectoredWrite>(
230 cqe->res, size_t(cqe->res), request);
231 if (status == ReadWriteStatus::MoreToDo)
233 auto writevRequest = request->takeRequestData<Operation::VectoredWrite>();
234 invokeCallback(writevRequest);
// Flush: failure -> WriteError; clears the flag that blocks new submissions
// while an fsync is in flight.
239 flushRequest = request->
template takeRequestData<Operation::Flush>();
241 flushRequest.result.emplace<QFileDevice::FileError>(QFileDevice::WriteError);
246 flushInProgress =
false;
247 invokeCallback(flushRequest);
// Cancel: simply reports completion to the caller.
252 cancelRequest = request->
template takeRequestData<Operation::Cancel>();
253 invokeCallback(cancelRequest);
// Stat: failure -> OpenError; success copies fields from the statx buffer
// stashed in the request's extra data.
258 statRequest = request->
template takeRequestData<Operation::Stat>();
260 statRequest.result.emplace<QFileDevice::FileError>(QFileDevice::OpenError);
262 struct statx *st = request->getExtra<
struct statx>();
265 res.size = st->stx_size;
267 invokeCallback(statRequest);
271 Q_UNREACHABLE_RETURN();
// Drop the completed request from the pending map/list.
274 auto it = addrItMap.take(request);
275 pendingRequests.erase(it);
// Release-store the new head so the kernel may reuse the consumed CQEs.
277 __atomic_store_n(cqHead, head, __ATOMIC_RELEASE);
279 "Done processing available completions, updated pointers, tail: %u, head: %u", tail,
// Kick submission if work accumulated while we were draining completions.
282 if (!stagePending && unstagedRequests > 0)
// Blocks (up to the deadline) until the completion eventfd is readable.
// The socket notifier is disabled for the duration so the event loop does not
// also react to the same readiness; a scope guard re-enables it on all exit
// paths. Returns true when poll reported readiness, false on timeout/error.
// NOTE(review): the lambda's closing "});" and the function braces are lost
// in this extraction; code bytes left untouched.
286bool QIORing::waitForCompletions(QDeadlineTimer deadline)
288 notifier->setEnabled(
false);
289 auto reactivateNotifier = qScopeGuard([
this]() {
290 notifier->setEnabled(
true);
293 pollfd pfd = qt_make_pollfd(eventDescriptor, POLLIN);
294 return qt_safe_poll(&pfd, 1, deadline) > 0;
// Fragment of (presumably) QIORing::submitRequests and of a helper that
// validates a request's fd — both function headers are missing from this
// extraction, as are several interior lines.
// Clear the "submission staged" flag and bail out early if nothing is queued.
318 stagePending =
false;
319 if (unstagedRequests == 0)
// Local helper: tell the kernel about the staged SQEs via io_uring_enter;
// a negative return is an error, otherwise ret entries were consumed.
322 auto submitToRing = [
this] {
323 int ret = io_uring_enter(io_uringFd, unstagedRequests, 0, 0,
nullptr);
325 qErrnoWarning(
"Error occurred notifying kernel about requests...");
327 unstagedRequests -= ret;
328 qCDebug(lcQIORing) <<
"io_uring_enter returned" << ret;
331 if (submitToRing()) {
// Leftover unstaged requests after a submit attempt — follow-up handling is
// among the missing lines.
333 if (unstagedRequests)
// fd-validation fragment: for request types that carry an fd member, the
// request is considered valid only when fd > 0. NOTE(review): fd 0 (stdin)
// is treated as invalid here — worth confirming that is intentional.
349 invokeOnOp(req, [&](
auto *request) {
350 if constexpr (
QtPrivate::HasFdMember<
decltype(*request)>) {
351 result = request->fd > 0;
// Stages pending requests into free submission-queue entries, starting from
// the iterator left over by the previous pass. Stops when the SQ/CQ rings are
// full, a flush is in flight, or a request must be deferred. Publishes the
// new SQ tail with a release store at the end.
// NOTE(review): loop body lines (tail increment, wasQueued marking, `break`s,
// closing braces) are missing from this extraction.
357void QIORing::prepareRequests()
359 if (!lastUnqueuedIterator) {
360 qCDebug(lcQIORing,
"Nothing left to queue");
// Guard against re-entrancy (callbacks invoked below could re-enter).
363 Q_ASSERT(!preparingRequests);
364 QScopedValueRollback<
bool> prepareGuard(preparingRequests,
true);
// Acquire-load head to learn how many SQ slots the kernel has freed.
366 quint32 tail = __atomic_load_n(sqTail, __ATOMIC_RELAXED);
367 const quint32 head = __atomic_load_n(sqHead, __ATOMIC_ACQUIRE);
369 "Status of submission queue, total entries: %u, tail: %u, head: %u, free: %u",
370 sqEntries, tail, head, sqEntries - (tail - head));
// Resume from the first not-yet-queued request; reset the marker so a fresh
// one is recorded below if we stop early.
372 auto it = *lastUnqueuedIterator;
373 lastUnqueuedIterator.reset();
374 const auto end = pendingRequests.end();
375 bool anyQueued =
false;
// Stop while a flush is draining, or when either ring would overflow.
381 while (!flushInProgress && unstagedRequests != sqEntries && inFlightRequests != cqEntries
383 const quint32 index = tail & *sqIndexMask;
384 io_uring_sqe *sqe = &submissionQueueEntries[index];
386 RequestPrepResult result = prepareRequest(sqe, *it);
// QueueFull cannot happen: the loop condition already checked capacity.
389 Q_ASSERT(result != RequestPrepResult::QueueFull);
390 if (result == RequestPrepResult::Defer) {
391 qCDebug(lcQIORing) <<
"Request for" << it->operation()
392 <<
"had to be deferred, will not queue any more requests at the moment.";
// A request finished during preparation (e.g. failed validation): drop it.
395 if (result == RequestPrepResult::RequestCompleted) {
396 addrItMap.remove(std::addressof(*it));
397 it = pendingRequests.erase(it);
// Publish the SQE's slot in the index ring.
403 sqIndexArray[index] = index;
// Remember where to resume next time.
410 lastUnqueuedIterator = it;
413 qCDebug(lcQIORing,
"Queued %u operation(s)",
414 tail - __atomic_load_n(sqTail, __ATOMIC_RELAXED));
// Release-store makes the staged SQEs visible to the kernel.
415 __atomic_store_n(sqTail, tail, __ATOMIC_RELEASE);
// Body fragment of toUringOp(QIORing::Operation): maps each operation enum to
// its io_uring opcode. The function header, switch statement and case labels
// were lost in extraction — only the return statements remain (order suggests
// Open, Read, Close, Write, VectoredRead, VectoredWrite, Flush, Cancel, Stat).
423 return IORING_OP_OPENAT;
425 return IORING_OP_READ;
427 return IORING_OP_CLOSE;
429 return IORING_OP_WRITE;
431 return IORING_OP_READV;
433 return IORING_OP_WRITEV;
435 return IORING_OP_FSYNC;
437 return IORING_OP_ASYNC_CANCEL;
439 return IORING_OP_STATX;
// Unreachable default for out-of-range enum values.
443 Q_UNREACHABLE_RETURN(IORING_OP_NOP);
// Fragments of prepareFileIOCommon / prepareFileReadWrite — the headers and
// parts of the bodies (e.g. the offset assignment) are missing.
// prepareFileIOCommon: copy the request's fd into the SQE.
449 sqe->fd = qint32(request.fd);
// prepareFileReadWrite: delegate the common fields, then fill in buffer
// address and length for a read/write-style SQE.
455 const void *address, quint64 offset, qsizetype size)
457 prepareFileIOCommon(sqe, request, offset);
458 sqe->len = quint32(size);
459 sqe->addr = quint64(address);
// Body fragment of openModeToOpenFlags(QIODevice::OpenMode): translates Qt
// open-mode flags into POSIX O_* flags (via the QT_OPEN_* aliases). The
// function header, the matching #endif for the largefile block, and the
// final return are missing from this extraction.
465 int oflags = QT_OPEN_RDONLY;
466#ifdef QT_LARGEFILE_SUPPORT
467 oflags |= QT_OPEN_LARGEFILE;
// Base access mode: read-write wins over write-only; default is read-only.
470 if ((mode & QIODevice::ReadWrite) == QIODevice::ReadWrite)
471 oflags = QT_OPEN_RDWR;
472 else if (mode & QIODevice::WriteOnly)
473 oflags = QT_OPEN_WRONLY;
// Writers create the file unless the caller demanded an existing file.
475 if ((mode & QIODevice::WriteOnly)
476 && !(mode & QIODevice::ExistingOnly))
477 oflags |= QT_OPEN_CREAT;
479 if (mode & QIODevice::Truncate)
480 oflags |= QT_OPEN_TRUNC;
482 if (mode & QIODevice::Append)
483 oflags |= QT_OPEN_APPEND;
// NewOnly maps to O_EXCL: fail if the file already exists.
485 if (mode & QIODevice::NewOnly)
486 oflags |= QT_OPEN_EXCL;
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
// Computes the (possibly trimmed) iovec array pointer and count for a
// vectored read/write, enforcing maxReadWriteLen() per submission. When the
// total exceeds the cap, the operation is marked as a split operation and
// only a prefix of the spans (or a sliced single span) is submitted; the
// continuation state lives in the request's ReadWriteExtra.
// NOTE(review): garbled extraction — the result-struct definition, loop
// braces and several statements are missing; comments describe apparent
// intent only.
519template <
typename SpanOfBytes>
520auto QIORing::getVectoredOpAddressAndSize(QIORing::
GenericRequestType &request, QSpan<SpanOfBytes> spans)
// Pick const/non-const void* depending on whether the spans are writable
// (reads need mutable buffers, writes take const ones).
522 using TypeErasedPtr = std::conditional_t<
523 std::is_const_v<std::remove_pointer_t<
typename SpanOfBytes::pointer>>,
const void *,
527 TypeErasedPtr address;
// Resume a split operation at the span where the previous chunk stopped.
// NOTE(review): slice() is called without using its return value here —
// looks like an assignment back to `spans` was lost in extraction.
532 if (
auto *extra = request.getExtra<QtPrivate::ReadWriteExtra>())
533 spans.slice(extra->spanIndex);
536 r.op = request.operation();
537 r.address = spans.data();
538 r.size = spans.size();
// Find the first span index at which the running total would exceed the cap.
541 const qsizetype exceedAtIndex = [&]() {
544 for (; i < spans.size(); ++i) {
545 total += spans[i].size();
546 if (total > maxReadWriteLen())
// Whole request fits -> submit everything; otherwise split.
551 if (exceedAtIndex != spans.size()) {
// First chunk of a new split operation: record span count, bump counter.
554 if (extra->spanIndex == 0 && extra->spanOffset == 0) {
555 ++ongoingSplitOperations;
556 extra->numSpans = spans.size();
// Even the first span alone exceeds the cap: demote the vectored op to a
// plain read/write on a slice of that single span.
558 if (exceedAtIndex == 0) {
560 const bool isWrite = r.op == QIORing::
Operation::VectoredWrite;
562 auto singleSpan = spans.front();
566 const qsizetype remaining = singleSpan.size() - extra->spanOffset;
567 singleSpan.slice(extra->spanOffset,
std::min(remaining, maxReadWriteLen()));
568 r.address = singleSpan.data();
569 r.size = singleSpan.size();
// Otherwise submit the whole spans that fit under the cap.
577 auto limitedSpans = spans.first(exceedAtIndex);
578 r.address = limitedSpans.data();
579 r.size = limitedSpans.size();
// Fills one submission-queue entry from a generic request: stashes the
// request pointer in user_data, sets the opcode, validates the fd, then
// per-operation fills the SQE fields. Returns Ok, Defer (try again later),
// or RequestCompleted (finished without reaching the kernel).
// NOTE(review): garbled extraction — case labels, braces, `return`s and some
// statements are missing; comments describe apparent intent only.
585auto QIORing::prepareRequest(io_uring_sqe *sqe,
GenericRequestType &request) -> RequestPrepResult
// user_data round-trips the request pointer back via the CQE.
587 sqe->user_data = qint64(&request);
588 sqe->opcode = toUringOp(request.operation());
// Requests with an invalid fd fail immediately with OpenError.
590 if (!verifyFd(request)) {
591 finishRequestWithError(request, QFileDevice::OpenError);
592 return RequestPrepResult::RequestCompleted;
595 switch (request.operation()) {
// Open: path pointer, translated O_* flags; `len` doubles as the mode field.
598 *openRequest = request.
template requestData<Operation::Open>();
600 sqe->addr =
reinterpret_cast<quint64>(openRequest->path.native().c_str());
601 sqe->open_flags = openModeToOpenFlags(openRequest->flags);
602 auto &mode = sqe->len;
// Close: deferred while split operations are mid-flight; IO_DRAIN makes the
// close wait for all previously submitted requests.
607 if (ongoingSplitOperations)
610 *closeRequest = request.
template requestData<Operation::Close>();
611 sqe->fd = closeRequest->fd;
613 sqe->flags |= IOSQE_IO_DRAIN;
// Read: oversized requests are sliced to maxReadWriteLen() and continued on
// completion via the ReadWriteExtra bookkeeping.
618 *readRequest = request.
template requestData<Operation::Read>();
619 auto span = readRequest->destination;
620 auto offset = readRequest->offset;
621 if (span.size() >= maxReadWriteLen()) {
622 qCDebug(lcQIORing) <<
"Requested Read of size" << span.size() <<
"has to be split";
624 if (extra->spanOffset == 0)
625 ++ongoingSplitOperations;
626 qsizetype remaining = span.size() - extra->spanOffset;
627 span.slice(extra->spanOffset,
std::min(remaining, maxReadWriteLen()));
628 offset += extra->totalProcessed;
630 prepareFileReadWrite(sqe, *readRequest, span.data(), offset, span.size());
// Write: mirror of the Read splitting logic.
635 *writeRequest = request.
template requestData<Operation::Write>();
636 auto span = writeRequest->source;
637 auto offset = writeRequest->offset;
638 if (span.size() >= maxReadWriteLen()) {
639 qCDebug(lcQIORing) <<
"Requested Write of size" << span.size() <<
"has to be split";
641 if (extra->spanOffset == 0)
642 ++ongoingSplitOperations;
643 qsizetype remaining = span.size() - extra->spanOffset;
644 span.slice(extra->spanOffset,
std::min(remaining, maxReadWriteLen()));
645 offset += extra->totalProcessed;
647 prepareFileReadWrite(sqe, *writeRequest, span.data(), offset, span.size());
// VectoredRead: address/size (and possibly a demoted opcode) come from
// getVectoredOpAddressAndSize; offset advances by bytes already processed.
652 *readvRequest = request.
template requestData<Operation::VectoredRead>();
653 quint64 offset = readvRequest->offset;
654 if (
auto *extra = request.getExtra<QtPrivate::ReadWriteExtra>())
655 offset += extra->totalProcessed;
656 const auto r = getVectoredOpAddressAndSize(request, readvRequest->destinations);
657 sqe->opcode = toUringOp(r.op);
658 prepareFileReadWrite(sqe, *readvRequest, r.address, offset, r.size);
// VectoredWrite: mirror of VectoredRead.
663 *writevRequest = request.
template requestData<Operation::VectoredWrite>();
664 quint64 offset = writevRequest->offset;
665 if (
auto *extra = request.getExtra<QtPrivate::ReadWriteExtra>())
666 offset += extra->totalProcessed;
667 const auto r = getVectoredOpAddressAndSize(request, writevRequest->sources);
668 sqe->opcode = toUringOp(r.op);
669 prepareFileReadWrite(sqe, *writevRequest, r.address, offset, r.size);
// Flush: deferred during split operations; IO_DRAIN orders the fsync after
// all earlier submissions, and flushInProgress blocks further staging.
673 if (ongoingSplitOperations)
676 *flushRequest = request.
template requestData<Operation::Flush>();
677 sqe->fd = qint32(flushRequest->fd);
679 sqe->flags |= IOSQE_IO_DRAIN;
680 flushInProgress =
true;
// Cancel: if the target request is unknown (already completed) just invoke
// the callback; if it was never submitted, finish it locally with AbortError;
// otherwise point the SQE at the target's user_data for the kernel to cancel.
685 *cancelRequest = request.
template requestData<Operation::Cancel>();
687 auto it = std::as_const(addrItMap).find(otherOperation);
688 if (it == addrItMap.cend()) {
689 invokeCallback(*cancelRequest);
690 return RequestPrepResult::RequestCompleted;
692 if (!otherOperation->wasQueued()) {
695 Q_ASSERT(!lastUnqueuedIterator);
696 finishRequestWithError(*otherOperation, QFileDevice::AbortError);
697 pendingRequests.erase(*it);
699 invokeCallback(*cancelRequest);
700 return RequestPrepResult::RequestCompleted;
702 sqe->addr = quint64(otherOperation);
// Stat: statx on the fd itself (AT_EMPTY_PATH with an empty path string);
// the kernel writes into a statx buffer stored as the request's extra data,
// whose address travels in sqe->off.
707 *statRequest = request.
template requestData<Operation::Stat>();
709 struct statx *st = request.getOrInitializeExtra<
struct statx>();
711 sqe->fd = statRequest->fd;
714 static const char emptystr[] =
"";
715 sqe->addr = qint64(emptystr);
716 sqe->statx_flags = AT_EMPTY_PATH;
717 sqe->len = STATX_ALL;
718 sqe->off = quint64(st);
722 Q_UNREACHABLE_RETURN(RequestPrepResult::RequestCompleted);
725 return RequestPrepResult::Ok;
// Fragment of the request extra-data cleanup: frees a statx buffer allocated
// by getOrInitializeExtra. The enclosing function (and any dispatch on the
// extra's type) is not visible in this extraction.
744 delete static_cast<
struct statx *>(extra);
// NOTE(review): the lines below are residue of header declarations (likely
// from the corresponding private header) stripped of their context —
// semicolons, template heads and class scope are missing. Left untouched.
Q_CORE_EXPORT void submitRequests()
QtPrivate::Operation Operation
decltype(std::declval< const T & >().fd) DetectFd
constexpr bool HasFdMember
static qsizetype maxReadWriteLen()
static Q_ALWAYS_INLINE void prepareFileIOCommon(io_uring_sqe *sqe, const QIORingRequestOffsetFdBase &request, quint64 offset)
static io_uring_op toUringOp(QIORing::Operation op)
static constexpr qsizetype MaxReadWriteLen
static int openModeToOpenFlags(QIODevice::OpenMode mode)
QT_REQUIRE_CONFIG(liburing)
static Q_ALWAYS_INLINE void prepareFileReadWrite(io_uring_sqe *sqe, const QIORingRequestOffsetFdBase &request, const void *address, quint64 offset, qsizetype size)
QIORing::RequestHandle handle