9#include <QtCore/qobject.h>
10#include <QtCore/qscopedvaluerollback.h>
11#include <QtCore/private/qcore_unix_p.h>
12#include <QtCore/private/qfiledevice_p.h>
13#include <QtCore/private/qfsfileengine_p.h>
17#include <sys/eventfd.h>
// Compile-time layout checks: the request's buffer storage is reinterpreted
// as struct iovec for readv/writev submission, so size and alignment must
// match exactly. NOTE(review): both static_asserts are truncated in this
// extract — the member access after std::declval<...>() and the closing
// parentheses are elided.
23static_assert(
sizeof(iovec)
24 ==
sizeof(
decltype(std::declval<QIORingRequest<QIORing::Operation::VectoredRead>>()
26static_assert(
alignof(iovec)
27 ==
alignof(
decltype(std::declval<QIORingRequest<QIORing::Operation::VectoredRead>>()
// The test override below is read without locks; require lock-free atomics.
40static_assert(std::atomic<qsizetype>::is_always_lock_free);
// Test hook: lets autotests shrink the per-submission read/write limit.
// constinit guarantees constant (non-dynamic) initialization of the global.
44Q_CONSTINIT std::atomic<qsizetype> QtPrivate::testMaxReadWriteLen{MaxReadWriteLen};
// NOTE(review): the two returns below are alternative bodies of
// maxReadWriteLen() (production vs. test build); the surrounding function
// signature and any preprocessor conditionals are elided in this extract.
49 return MaxReadWriteLen;
51 return QtPrivate::testMaxReadWriteLen.load(std::memory_order_relaxed);
// Fragment of what appears to be the QIORing destructor (signature elided):
// release the eventfd used for completion wakeups; -1 marks "never created".
// NOTE(review): teardown of the ring fd and the mmap'ed queues is not
// visible here — confirm it exists in the elided lines.
57 if (eventDescriptor != -1)
58 close(eventDescriptor);
// Creates and memory-maps an io_uring instance and wires its completions
// into the Qt event loop via an eventfd watched by a QSocketNotifier.
// NOTE(review): this extract elides several original lines (the failure
// checks after io_uring_setup/mmap, early returns, closing braces), so the
// visible control flow is incomplete; comments describe only visible lines.
63bool QIORing::initializeIORing()
68 io_uring_params params{};
// Request an explicitly sized completion queue instead of the default.
69 params.flags = IORING_SETUP_CQSIZE;
70 params.cq_entries = cqEntries;
// NOTE(review): "¶ms" is extraction mojibake for "&params".
// io_uring_setup returns -errno on failure, hence the negation below.
71 const int fd = io_uring_setup(sqEntries, ¶ms);
73 qErrnoWarning(-fd,
"Failed to setup io_uring");
// Ring sizes per the kernel's layout: the SQ ring ends in an index array of
// quint32; the CQ ring ends in the io_uring_cqe array itself.
77 size_t submissionQueueSize = params.sq_off.array + (params.sq_entries *
sizeof(quint32));
78 size_t completionQueueSize = params.cq_off.cqes + (params.cq_entries *
sizeof(io_uring_cqe));
// With IORING_FEAT_SINGLE_MMAP both rings share one mapping, so map the
// larger of the two sizes once.
79 if (params.features & IORING_FEAT_SINGLE_MMAP)
80 submissionQueueSize =
std::max(submissionQueueSize, completionQueueSize);
81 submissionQueue = mmap(
nullptr, submissionQueueSize, PROT_READ | PROT_WRITE,
82 MAP_SHARED | MAP_POPULATE, io_uringFd, IORING_OFF_SQ_RING);
84 qErrnoWarning(errno,
"Failed to mmap io_uring submission queue");
// The SQE array is a separate mapping from the SQ control ring.
89 const size_t submissionQueueEntriesSize = params.sq_entries *
sizeof(io_uring_sqe);
90 submissionQueueEntries =
static_cast<io_uring_sqe *>(
91 mmap(
nullptr, submissionQueueEntriesSize, PROT_READ | PROT_WRITE,
92 MAP_SHARED | MAP_POPULATE, io_uringFd, IORING_OFF_SQES));
// On failure, unwind the mappings made so far before bailing out.
94 qErrnoWarning(errno,
"Failed to mmap io_uring submission queue entries");
95 munmap(submissionQueue, submissionQueueSize);
100 void *completionQueue =
nullptr;
101 if (params.features & IORING_FEAT_SINGLE_MMAP) {
102 completionQueue = submissionQueue;
104 completionQueue = mmap(
nullptr, completionQueueSize, PROT_READ | PROT_WRITE,
105 MAP_SHARED | MAP_POPULATE, io_uringFd, IORING_OFF_CQ_RING);
107 qErrnoWarning(errno,
"Failed to mmap io_uring completion queue");
108 munmap(submissionQueue, submissionQueueSize);
109 munmap(submissionQueueEntries, submissionQueueEntriesSize);
// The kernel may have adjusted the requested entry counts; adopt its values.
115 sqEntries = params.sq_entries;
116 cqEntries = params.cq_entries;
// Resolve the ring control pointers from the kernel-provided byte offsets.
118 char *sq =
static_cast<
char *>(submissionQueue);
119 sqHead =
reinterpret_cast<quint32 *>(sq + params.sq_off.head);
120 sqTail =
reinterpret_cast<quint32 *>(sq + params.sq_off.tail);
121 sqIndexMask =
reinterpret_cast<quint32 *>(sq + params.sq_off.ring_mask);
122 sqIndexArray =
reinterpret_cast<quint32 *>(sq + params.sq_off.array);
124 char *cq =
static_cast<
char *>(completionQueue);
125 cqHead =
reinterpret_cast<quint32 *>(cq + params.cq_off.head);
126 cqTail =
reinterpret_cast<quint32 *>(cq + params.cq_off.tail);
127 cqIndexMask =
reinterpret_cast<quint32 *>(cq + params.cq_off.ring_mask);
128 completionQueueEntries =
reinterpret_cast<io_uring_cqe *>(cq + params.cq_off.cqes);
// Register an eventfd with the ring so CQE arrival wakes the event loop.
// NOTE(review): the return values of eventfd() and io_uring_register() are
// not checked on any visible line — confirm error handling in elided code.
130 eventDescriptor = eventfd(0, 0);
131 io_uring_register(io_uringFd, IORING_REGISTER_EVENTFD, &eventDescriptor, 1);
133 notifier.emplace(eventDescriptor, QSocketNotifier::Read);
// Drain completions on the notifier's thread whenever the eventfd fires.
134 QObject::connect(std::addressof(*notifier), &QSocketNotifier::activated,
135 std::addressof(*notifier), [
this]() { completionReady(); });
// Maps a native result (negated errno, as delivered in cqe->res) to a
// QFileDevice error code. Cancellation maps to AbortError; the fallback
// path returning defaultValue is elided in this extract — TODO confirm.
139QFileDevice::FileError QIORing::mapFileError(NativeResultType error,
140 QFileDevice::FileError defaultValue)
143 if (-error == ECANCELED)
144 return QFileDevice::AbortError;
// Drains the completion queue: clears the eventfd counter, walks CQEs from
// head to tail, dispatches each one by its operation, then publishes the
// consumed head so the kernel can reuse the slots.
// NOTE(review): many original lines are elided in this extract (local
// declarations such as `ignored` and `request`, case labels, braces); the
// comments below describe only what the visible lines establish.
148void QIORing::completionReady()
// Reset the eventfd so the level-triggered QSocketNotifier re-arms.
153 std::ignore = read(eventDescriptor, &ignored,
sizeof(ignored));
// Acquire on tail pairs with the kernel's release-publish of new CQEs;
// head is written only by us, so a relaxed load suffices.
155 quint32 head = __atomic_load_n(cqHead, __ATOMIC_RELAXED);
156 const quint32 tail = __atomic_load_n(cqTail, __ATOMIC_ACQUIRE);
161 "Status of completion queue, total entries: %u, tail: %u, head: %u, to process: %u",
162 cqEntries, tail, head, (tail - head));
163 while (head != tail) {
// Ring indices wrap via the kernel-provided power-of-two mask.
165 const io_uring_cqe *cqe = &completionQueueEntries[head & *cqIndexMask];
169 qCDebug(lcQIORing) <<
"Got completed entry. Operation:" << request->operation()
170 <<
"- user_data pointer:" << request;
171 switch (request->operation()) {
// Open: negative res carries -errno. Distinguish cancellation from plain
// open failure; a non-negative res is the newly opened file descriptor.
174 openRequest = request->
template takeRequestData<Operation::Open>();
177 if (-cqe->res == ECANCELED)
178 openRequest.result.
template emplace<QFileDevice::FileError>(
179 QFileDevice::AbortError);
181 openRequest.result.
template emplace<QFileDevice::FileError>(
182 QFileDevice::OpenError);
184 auto &result = openRequest.result
186 result.fd = cqe->res;
188 invokeCallback(openRequest);
// Close completion; failure is surfaced as OpenError.
193 closeRequest = request->
template takeRequestData<Operation::Close>();
195 closeRequest.result.emplace<QFileDevice::FileError>(QFileDevice::OpenError);
199 invokeCallback(closeRequest);
// Read: may be one slice of a split oversized request; MoreToDo means the
// request stays queued for another round instead of completing now.
203 const ReadWriteStatus status = handleReadCompletion<
Operation::Read>(
204 cqe->res, size_t(cqe->res), request);
205 if (status == ReadWriteStatus::MoreToDo)
207 auto readRequest = request->takeRequestData<Operation::Read>();
208 invokeCallback(readRequest);
// Write: same split-handling shape as Read.
212 const ReadWriteStatus status = handleWriteCompletion<
Operation::Write>(
213 cqe->res, size_t(cqe->res), request);
214 if (status == ReadWriteStatus::MoreToDo)
216 auto writeRequest = request->takeRequestData<Operation::Write>();
217 invokeCallback(writeRequest);
// Vectored read (readv).
221 const ReadWriteStatus status = handleReadCompletion<
Operation::VectoredRead>(
222 cqe->res, size_t(cqe->res), request);
223 if (status == ReadWriteStatus::MoreToDo)
225 auto readvRequest = request->takeRequestData<Operation::VectoredRead>();
226 invokeCallback(readvRequest);
// Vectored write (writev).
230 const ReadWriteStatus status = handleWriteCompletion<
Operation::VectoredWrite>(
231 cqe->res, size_t(cqe->res), request);
232 if (status == ReadWriteStatus::MoreToDo)
234 auto writevRequest = request->takeRequestData<Operation::VectoredWrite>();
235 invokeCallback(writevRequest);
// Flush (fsync): completing it lifts the flush barrier set at submission.
240 flushRequest = request->
template takeRequestData<Operation::Flush>();
242 flushRequest.result.emplace<QFileDevice::FileError>(QFileDevice::WriteError);
247 flushInProgress =
false;
248 invokeCallback(flushRequest);
// Cancel completion: just notify the requester.
253 cancelRequest = request->
template takeRequestData<Operation::Cancel>();
254 invokeCallback(cancelRequest);
// Stat: the kernel filled the statx buffer stashed as the request's extra.
259 statRequest = request->
template takeRequestData<Operation::Stat>();
261 statRequest.result.emplace<QFileDevice::FileError>(QFileDevice::OpenError);
263 struct statx *st = request->getExtra<
struct statx>();
266 res.size = st->stx_size;
268 invokeCallback(statRequest);
272 Q_UNREACHABLE_RETURN();
// The finished request leaves both bookkeeping containers.
275 auto it = addrItMap.take(request);
276 pendingRequests.erase(it);
// Release-store of head lets the kernel reuse the consumed CQ slots.
278 __atomic_store_n(cqHead, head, __ATOMIC_RELEASE);
280 "Done processing available completions, updated pointers, tail: %u, head: %u", tail,
// Requests that piled up while processing get staged immediately.
283 if (!stagePending && unstagedRequests > 0)
// Synchronously waits (until `deadline`) for the eventfd to signal that at
// least one completion arrived. The QSocketNotifier is disabled while we
// poll the descriptor directly, so the event loop does not also consume the
// readiness; the scope guard re-enables it on every return path.
// Returns true if the eventfd became readable before the deadline.
287bool QIORing::waitForCompletions(QDeadlineTimer deadline)
289 notifier->setEnabled(
false);
290 auto reactivateNotifier = qScopeGuard([
this]() {
291 notifier->setEnabled(
true);
294 pollfd pfd = qt_make_pollfd(eventDescriptor, POLLIN);
// qt_safe_poll retries on EINTR and honours the remaining deadline.
295 return qt_safe_poll(&pfd, 1, deadline) > 0;
// Fragment of (presumably) QIORing::submitRequests() — the signature is
// elided here but matches the `Q_CORE_EXPORT void submitRequests()`
// declaration residue later in this extract. Tells the kernel about the
// `unstagedRequests` SQEs already staged in the submission ring.
319 stagePending =
false;
320 if (unstagedRequests == 0)
// io_uring_enter returns the number of SQEs consumed, or -errno.
// NOTE(review): the negative-return check before qErrnoWarning is elided.
323 auto submitToRing = [
this] {
324 int ret = io_uring_enter(io_uringFd, unstagedRequests, 0, 0,
nullptr);
326 qErrnoWarning(
"Error occurred notifying kernel about requests...");
// Partial submission is possible; keep the remainder for the next round.
328 unstagedRequests -= ret;
329 qCDebug(lcQIORing) <<
"io_uring_enter returned" << ret;
332 if (submitToRing()) {
334 if (unstagedRequests)
// Fragment of an fd-validation helper (enclosing function signature elided):
// for request types that statically expose an `fd` member (detected via the
// HasFdMember trait), record whether the descriptor looks valid.
// NOTE(review): `request->fd > 0` treats fd 0 — a valid POSIX descriptor —
// as invalid; confirm this is intentional (>= 0 may be meant).
350 invokeOnOp(req, [&](
auto *request) {
351 if constexpr (
QtPrivate::HasFdMember<
decltype(*request)>) {
352 result = request->fd > 0;
// Stages pending requests into free submission-queue slots, starting from
// the iterator where the previous pass stopped. Stops early when a flush
// barrier is active, the SQ/CQ capacity is exhausted, or a request defers.
// NOTE(review): loop braces, the tail increment, and some bookkeeping lines
// are elided in this extract.
358void QIORing::prepareRequests()
360 if (!lastUnqueuedIterator) {
361 qCDebug(lcQIORing,
"Nothing left to queue");
// Guard against re-entrancy (e.g. a callback invoked during preparation
// calling back into us); rollback restores the flag on scope exit.
364 Q_ASSERT(!preparingRequests);
365 QScopedValueRollback<
bool> prepareGuard(preparingRequests,
true);
// Tail is only written by us (relaxed); acquire on head pairs with the
// kernel's release when it consumes SQEs.
367 quint32 tail = __atomic_load_n(sqTail, __ATOMIC_RELAXED);
368 const quint32 head = __atomic_load_n(sqHead, __ATOMIC_ACQUIRE);
370 "Status of submission queue, total entries: %u, tail: %u, head: %u, free: %u",
371 sqEntries, tail, head, sqEntries - (tail - head));
373 auto it = *lastUnqueuedIterator;
374 lastUnqueuedIterator.reset();
375 const auto end = pendingRequests.end();
376 bool anyQueued =
false;
// Stop when: flush barrier pending, SQ full, or as many requests are in
// flight as the CQ can hold (avoids CQ overflow).
382 while (!flushInProgress && unstagedRequests != sqEntries && inFlightRequests != cqEntries
384 const quint32 index = tail & *sqIndexMask;
385 io_uring_sqe *sqe = &submissionQueueEntries[index];
387 RequestPrepResult result = prepareRequest(sqe, *it);
390 Q_ASSERT(result != RequestPrepResult::QueueFull);
// Defer means this request cannot go in yet (ordering constraint); queue
// nothing further to preserve submission order.
391 if (result == RequestPrepResult::Defer) {
392 qCDebug(lcQIORing) <<
"Request for" << it->operation()
393 <<
"had to be deferred, will not queue any more requests at the moment.";
// Requests that completed synchronously never reach the kernel.
396 if (result == RequestPrepResult::RequestCompleted) {
397 addrItMap.remove(std::addressof(*it));
398 it = pendingRequests.erase(it);
// Publish the SQE slot in the kernel-visible index array.
404 sqIndexArray[index] = index;
411 lastUnqueuedIterator = it;
414 qCDebug(lcQIORing,
"Queued %u operation(s)",
415 tail - __atomic_load_n(sqTail, __ATOMIC_RELAXED));
// Release-store makes the staged SQEs visible to the kernel.
416 __atomic_store_n(sqTail, tail, __ATOMIC_RELEASE);
// Body of toUringOp(QIORing::Operation) per the declaration residue at the
// end of this extract: maps each QIORing operation to its io_uring opcode.
// NOTE(review): the switch statement and its case labels are elided; only
// the return statements are visible. Pairing inferred from opcode names:
// Open->OPENAT, Read->READ, Close->CLOSE, Write->WRITE, VectoredRead->READV,
// VectoredWrite->WRITEV, Flush->FSYNC, Cancel->ASYNC_CANCEL, Stat->STATX.
424 return IORING_OP_OPENAT;
426 return IORING_OP_READ;
428 return IORING_OP_CLOSE;
430 return IORING_OP_WRITE;
432 return IORING_OP_READV;
434 return IORING_OP_WRITEV;
436 return IORING_OP_FSYNC;
438 return IORING_OP_ASYNC_CANCEL;
440 return IORING_OP_STATX;
444 Q_UNREACHABLE_RETURN(IORING_OP_NOP);
// Fragments of two SQE helpers (signatures mostly elided; see declaration
// residue at the end of the extract):
// prepareFileIOCommon — writes the request's fd (and, presumably on an
// elided line, the offset) into the SQE.
450 sqe->fd = qint32(request.fd);
// prepareFileReadWrite — common fields for READ/WRITE/READV/WRITEV: buffer
// length and user-space address (pointer smuggled through the u64 field).
456 const void *address, quint64 offset, qsizetype size)
458 prepareFileIOCommon(sqe, request, offset);
459 sqe->len = quint32(size);
460 sqe->addr = quint64(address);
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
// Computes the {opcode, address, size} triple for a vectored read/write,
// splitting requests whose combined span size exceeds maxReadWriteLen().
// TypeErasedPtr is const void* for write sources, void* for read targets.
// NOTE(review): several lines are elided (the `r` declaration, the
// non-const branch of the conditional_t, lambda return, braces), so parts
// of the flow are inferred; verify against the full source.
491template <
typename SpanOfBytes>
492auto QIORing::getVectoredOpAddressAndSize(QIORing::
GenericRequestType &request, QSpan<SpanOfBytes> spans)
494 using TypeErasedPtr = std::conditional_t<
495 std::is_const_v<std::remove_pointer_t<
typename SpanOfBytes::pointer>>,
const void *,
499 TypeErasedPtr address;
// Resume a partially processed request: skip spans already submitted.
504 if (
auto *extra = request.getExtra<QtPrivate::ReadWriteExtra>())
505 spans.slice(extra->spanIndex);
// Default: submit all remaining spans as one vectored op.
508 r.op = request.operation();
509 r.address = spans.data();
510 r.size = spans.size();
// Find the first span index at which the running total would exceed the
// per-submission limit.
513 const qsizetype exceedAtIndex = [&]() {
516 for (; i < spans.size(); ++i) {
517 total += spans[i].size();
518 if (total > maxReadWriteLen())
// Limit exceeded somewhere: this becomes (or continues) a split operation.
523 if (exceedAtIndex != spans.size()) {
526 if (extra->spanIndex == 0 && extra->spanOffset == 0) {
527 ++ongoingSplitOperations;
528 extra->numSpans = spans.size();
// Even the first span alone is too large: fall back to a plain (non-
// vectored) read/write over a slice of that single span.
530 if (exceedAtIndex == 0) {
532 const bool isWrite = r.op == QIORing::
Operation::VectoredWrite;
534 auto singleSpan = spans.front();
538 const qsizetype remaining = singleSpan.size() - extra->spanOffset;
539 singleSpan.slice(extra->spanOffset,
std::min(remaining, maxReadWriteLen()));
540 r.address = singleSpan.data();
541 r.size = singleSpan.size();
// Otherwise submit only the whole spans that fit under the limit.
549 auto limitedSpans = spans.first(exceedAtIndex);
550 r.address = limitedSpans.data();
551 r.size = limitedSpans.size();
// Fills one submission-queue entry from a pending request. Returns Ok when
// the SQE was prepared, Defer when the request must wait (ordering against
// in-flight split operations), or RequestCompleted when the request was
// finished synchronously and must not be submitted.
// NOTE(review): case labels, braces and some declarations (openRequest,
// closeRequest, etc., and the `mode` assignment) are elided in this extract.
557auto QIORing::prepareRequest(io_uring_sqe *sqe,
GenericRequestType &request) -> RequestPrepResult
// The request's address doubles as the CQE correlation token.
559 sqe->user_data = qint64(&request);
560 sqe->opcode = toUringOp(request.operation());
// Reject requests whose fd is invalid up-front (see verifyFd).
562 if (!verifyFd(request)) {
563 finishRequestWithError(request, QFileDevice::OpenError);
564 return RequestPrepResult::RequestCompleted;
567 switch (request.operation()) {
// Open: path pointer must stay alive until completion (owned by the
// request); flags translated from QIODevice::OpenMode. For OPENAT the
// mode goes in sqe->len.
570 *openRequest = request.
template requestData<Operation::Open>();
572 sqe->addr =
reinterpret_cast<quint64>(openRequest->path.native().c_str());
573 sqe->open_flags = QFSFileEnginePrivate::openModeToOpenFlags(openRequest->flags);
574 auto &mode = sqe->len;
// Close: deferred while split operations are in flight; IO_DRAIN makes the
// close wait for all previously submitted SQEs.
579 if (ongoingSplitOperations)
582 *closeRequest = request.
template requestData<Operation::Close>();
583 sqe->fd = closeRequest->fd;
585 sqe->flags |= IOSQE_IO_DRAIN;
// Read: oversized requests are sliced to maxReadWriteLen(); the extra
// records the resume offset and how much has been processed so far.
590 *readRequest = request.
template requestData<Operation::Read>();
591 auto span = readRequest->destination;
592 auto offset = readRequest->offset;
593 if (span.size() >= maxReadWriteLen()) {
594 qCDebug(lcQIORing) <<
"Requested Read of size" << span.size() <<
"has to be split";
596 if (extra->spanOffset == 0)
597 ++ongoingSplitOperations;
598 qsizetype remaining = span.size() - extra->spanOffset;
599 span.slice(extra->spanOffset,
std::min(remaining, maxReadWriteLen()));
600 offset += extra->totalProcessed;
602 prepareFileReadWrite(sqe, *readRequest, span.data(), offset, span.size());
// Write: mirrors the Read splitting logic, with the source span.
607 *writeRequest = request.
template requestData<Operation::Write>();
608 auto span = writeRequest->source;
609 auto offset = writeRequest->offset;
610 if (span.size() >= maxReadWriteLen()) {
611 qCDebug(lcQIORing) <<
"Requested Write of size" << span.size() <<
"has to be split";
613 if (extra->spanOffset == 0)
614 ++ongoingSplitOperations;
615 qsizetype remaining = span.size() - extra->spanOffset;
616 span.slice(extra->spanOffset,
std::min(remaining, maxReadWriteLen()));
617 offset += extra->totalProcessed;
619 prepareFileReadWrite(sqe, *writeRequest, span.data(), offset, span.size());
// Vectored read: the helper may downgrade the opcode (e.g. to plain READ
// for one oversized span), so re-set sqe->opcode from its result.
624 *readvRequest = request.
template requestData<Operation::VectoredRead>();
625 quint64 offset = readvRequest->offset;
626 if (
auto *extra = request.getExtra<QtPrivate::ReadWriteExtra>())
627 offset += extra->totalProcessed;
628 const auto r = getVectoredOpAddressAndSize(request, readvRequest->destinations);
629 sqe->opcode = toUringOp(r.op);
630 prepareFileReadWrite(sqe, *readvRequest, r.address, offset, r.size);
// Vectored write: same shape as vectored read.
635 *writevRequest = request.
template requestData<Operation::VectoredWrite>();
636 quint64 offset = writevRequest->offset;
637 if (
auto *extra = request.getExtra<QtPrivate::ReadWriteExtra>())
638 offset += extra->totalProcessed;
639 const auto r = getVectoredOpAddressAndSize(request, writevRequest->sources);
640 sqe->opcode = toUringOp(r.op);
641 prepareFileReadWrite(sqe, *writevRequest, r.address, offset, r.size);
// Flush (fsync): deferred while split operations are pending; drains the
// queue and raises the flush barrier consumed in completionReady().
645 if (ongoingSplitOperations)
648 *flushRequest = request.
template requestData<Operation::Flush>();
649 sqe->fd = qint32(flushRequest->fd);
651 sqe->flags |= IOSQE_IO_DRAIN;
652 flushInProgress =
true;
// Cancel: if the target is unknown, report done; if it was never submitted
// to the kernel, abort it locally; otherwise issue ASYNC_CANCEL with the
// target's user_data pointer in sqe->addr.
657 *cancelRequest = request.
template requestData<Operation::Cancel>();
659 auto it = std::as_const(addrItMap).find(otherOperation);
660 if (it == addrItMap.cend()) {
661 invokeCallback(*cancelRequest);
662 return RequestPrepResult::RequestCompleted;
664 if (!otherOperation->wasQueued()) {
667 Q_ASSERT(!lastUnqueuedIterator);
668 finishRequestWithError(*otherOperation, QFileDevice::AbortError);
669 pendingRequests.erase(*it);
671 invokeCallback(*cancelRequest);
672 return RequestPrepResult::RequestCompleted;
674 sqe->addr = quint64(otherOperation);
// Stat: statx on the fd itself via AT_EMPTY_PATH (empty path string in
// addr); the output statx buffer pointer travels in sqe->off, as io_uring's
// STATX op specifies. The buffer lives in the request's extra storage.
679 *statRequest = request.
template requestData<Operation::Stat>();
681 struct statx *st = request.getOrInitializeExtra<
struct statx>();
683 sqe->fd = statRequest->fd;
686 static const char emptystr[] =
"";
687 sqe->addr = qint64(emptystr);
688 sqe->statx_flags = AT_EMPTY_PATH;
689 sqe->len = STATX_ALL;
690 sqe->off = quint64(st);
694 Q_UNREACHABLE_RETURN(RequestPrepResult::RequestCompleted);
697 return RequestPrepResult::Ok;
// Fragment of the cleanup path for a Stat request's extra storage: the
// type-erased extra pointer is the statx buffer allocated in
// getOrInitializeExtra<struct statx>() above.
716 delete static_cast<
struct statx *>(extra);
// NOTE(review): everything below is declaration residue, apparently from
// the companion private header appended by the extraction tool — not
// executable code at this position. It corroborates the signatures of
// submitRequests(), toUringOp(), prepareFileIOCommon(),
// prepareFileReadWrite(), maxReadWriteLen() and the HasFdMember trait
// referenced earlier in this file.
Q_CORE_EXPORT void submitRequests()
QtPrivate::Operation Operation
decltype(std::declval< const T & >().fd) DetectFd
constexpr bool HasFdMember
static qsizetype maxReadWriteLen()
static Q_ALWAYS_INLINE void prepareFileIOCommon(io_uring_sqe *sqe, const QIORingRequestOffsetFdBase &request, quint64 offset)
static io_uring_op toUringOp(QIORing::Operation op)
static constexpr qsizetype MaxReadWriteLen
QT_REQUIRE_CONFIG(liburing)
static Q_ALWAYS_INLINE void prepareFileReadWrite(io_uring_sqe *sqe, const QIORingRequestOffsetFdBase &request, const void *address, quint64 offset, qsizetype size)
QIORing::RequestHandle handle