117 CancelOptions options)
119 QMutexLocker locker(&m_mutex);
121 const auto oldState = state.loadRelaxed();
124 case QFutureInterfaceBase::CancelMode::CancelAndFinish:
125 if ((oldState & QFutureInterfaceBase::Finished)
126 && (oldState & QFutureInterfaceBase::Canceled)) {
129 switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
130 QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
132 case QFutureInterfaceBase::CancelMode::CancelOnly:
133 if (oldState & QFutureInterfaceBase::Canceled)
135 switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
141 QMutexLocker continuationLocker(&continuationMutex);
144 QMutexLocker nextLocker(&next->continuationMutex);
145 if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
154 waitCondition.wakeAll();
155 pausedWaitCondition.wakeAll();
157 if (!(oldState & QFutureInterfaceBase::Canceled))
158 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
159 if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
160 && !(oldState & QFutureInterfaceBase::Finished)) {
161 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
// Cancels this future and the parents it is chained to through
// nonConcludedParent, using the given cancel mode.
// NOTE(review): this excerpt omits lines (braces, the loop header); the
// comments below describe only the statements that are visible.
182void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
// Read this future's parent link while holding its continuation mutex.
186 QMutexLocker locker(&d->continuationMutex);
187 QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
// Cancel the parent itself only (CancelOption::None), then move on to the
// parent's own parent while holding that parent's continuation mutex.
// Presumably these three statements form the body of a loop over `prev`.
190 prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
191 QMutexLocker prevLocker(&prev->continuationMutex);
192 prev = prev->nonConcludedParent;
// Finally cancel this future, asking cancelImpl to handle continuations too.
196 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
// Requests suspension or resumption of the computation, under m_mutex.
// NOTE(review): the branch on `suspend` is not visible in this excerpt;
// presumably the first pair of statements is the suspend path and the
// remaining three are the resume path.
199void QFutureInterfaceBase::setSuspended(
bool suspend)
201 QMutexLocker locker(&d->m_mutex);
// Suspend path: set the Suspending flag and notify connected interfaces.
203 switch_on(d->state, Suspending);
204 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
// Resume path: clear both Suspending and Suspended, wake every thread blocked
// on pausedWaitCondition, and notify connected interfaces.
206 switch_off(d->state, suspendingOrSuspended);
207 d->pausedWaitCondition.wakeAll();
208 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
// Flips the suspension state under m_mutex: resumes if the future is
// suspending or suspended, otherwise requests suspension.
// NOTE(review): the `else` separating the two halves is not visible here.
212void QFutureInterfaceBase::toggleSuspended()
214 QMutexLocker locker(&d->m_mutex);
// Currently suspending/suspended: clear the flags, wake waiters, emit Resumed.
215 if (d->state.loadRelaxed() & suspendingOrSuspended) {
216 switch_off(d->state, suspendingOrSuspended);
217 d->pausedWaitCondition.wakeAll();
218 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
// Otherwise: set the Suspending flag and emit Suspending.
220 switch_on(d->state, Suspending);
221 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
// Moves the state from Suspending to Suspended and emits the Suspended
// call-out. Declared const; the state is changed through `d`.
225void QFutureInterfaceBase::reportSuspended()
const
230 QMutexLocker locker(&d->m_mutex);
231 const int state = d->state.loadRelaxed();
// Only act when suspension was requested and has not been reported yet.
// The statement guarded by this condition is not visible in the excerpt
// (presumably an early return).
232 if (!(state & Suspending) || (state & Suspended))
235 switch_from_to(d->state, Suspending, Suspended);
236 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
// Sets or clears the Throttled flag under m_mutex.
// NOTE(review): the branch on `enable` is not visible in this excerpt;
// presumably switch_on is the enable path and the rest is the disable path.
239void QFutureInterfaceBase::setThrottled(
bool enable)
241 QMutexLocker lock(&d->m_mutex);
243 switch_on(d->state, Throttled);
245 switch_off(d->state, Throttled);
// After un-throttling, wake threads blocked on pausedWaitCondition only if
// the future is not also suspending/suspended.
246 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
247 d->pausedWaitCondition.wakeAll();
// Blocks the calling thread on pausedWaitCondition while the future is
// suspending/suspended and not canceled.
// NOTE(review): the statements guarded by the two `if`s are not visible
// (presumably early returns).
317void QFutureInterfaceBase::waitForResume()
// First check without taking the mutex.
321 const int state = d->state.loadRelaxed();
322 if (!(state & suspendingOrSuspended) || (state & Canceled))
// Re-check under m_mutex, since the state may have changed in between.
326 QMutexLocker lock(&d->m_mutex);
327 const int state = d->state.loadRelaxed();
328 if (!(state & suspendingOrSuspended) || (state & Canceled))
// Scoped helper bound to the future's pool for the duration of the wait;
// presumably it releases the pool thread while blocked — confirm against
// ThreadPoolThreadReleaser.
332 const ThreadPoolThreadReleaser releaser(d->pool());
334 d->pausedWaitCondition.wait(&d->m_mutex);
// If suspension is requested (and the future is not canceled), reports the
// Suspended state when needed and blocks on pausedWaitCondition.
// NOTE(review): the statements guarded by the `!canSuspend(state)` checks are
// not visible (presumably early returns).
337void QFutureInterfaceBase::suspendIfRequested()
// Predicate: suspending or suspended, and not canceled.
339 const auto canSuspend = [] (
int state) {
341 return (state & suspendingOrSuspended) && !(state & Canceled);
// First check without taking the mutex.
346 const int state = d->state.loadRelaxed();
347 if (!canSuspend(state))
// Re-check under m_mutex.
351 QMutexLocker lock(&d->m_mutex);
352 const int state = d->state.loadRelaxed();
353 if (!canSuspend(state))
// Not yet marked Suspended: move Suspending -> Suspended and notify.
357 if (!(state & Suspended)) {
359 switch_from_to(d->state, Suspending, Suspended);
360 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
// Block on pausedWaitCondition; the releaser is scoped around the wait
// (presumably it frees the pool thread meanwhile — confirm).
364 const ThreadPoolThreadReleaser releaser(d->pool());
365 d->pausedWaitCondition.wait(&d->m_mutex);
// Marks the future as Started | Running and emits the Started call-out.
404void QFutureInterfaceBase::reportStarted()
406 QMutexLocker locker(&d->m_mutex);
// Skip when already started, canceled or finished. The guarded statement is
// not visible in the excerpt (presumably an early return).
407 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
409 d->setState(State(Started | Running));
410 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
// Stores `exception` in the future and puts the future into the Canceled
// state, waking waiters and emitting the Canceled call-out.
432void QFutureInterfaceBase::reportException(
const std::exception_ptr &exception)
435 QMutexLocker locker(&d->m_mutex);
// Skip when already canceled or finished. The guarded statement is not
// visible in the excerpt (presumably an early return).
436 if (d->state.loadRelaxed() & (Canceled|Finished))
439 d->hasException =
true;
440 d->data.setException(exception);
441 switch_on(d->state, Canceled);
// Wake both result waiters and threads paused by suspension.
442 d->waitCondition.wakeAll();
443 d->pausedWaitCondition.wakeAll();
444 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
// Switches the state from Running to Finished under m_mutex, wakes threads
// blocked on waitCondition, and emits the Finished call-out.
// NOTE(review): original line 451 is not visible; there may be a guard that
// makes these statements conditional.
448void QFutureInterfaceBase::reportFinished()
450 QMutexLocker locker(&d->m_mutex);
452 switch_from_to(d->state, Running, Finished);
453 d->waitCondition.wakeAll();
454 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
// Blocks until the result at `resultIndex` is ready or the future is no
// longer running/pending. A resultIndex of -1 is mapped to INT_MAX below.
// NOTE(review): several lines (conditions, returns, unlock/relock) are not
// visible in this excerpt.
483void QFutureInterfaceBase::waitForResult(
int resultIndex)
// Rethrow a stored exception, if any, before waiting.
486 d->data.m_exceptionStore.rethrowException();
488 QMutexLocker lock(&d->m_mutex);
// The statement guarded by this check is not visible (presumably a return).
489 if (!isRunningOrPending())
// Ask the pool to run the runnable in the calling thread if it can be stolen
// (per the helper's name; presumably done with the mutex released — confirm).
495 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
499 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
// Sleep on waitCondition until the result is ready or the future stops
// running/pending.
500 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
501 d->waitCondition.wait(&d->m_mutex);
// Rethrow an exception that was stored while waiting.
504 d->data.m_exceptionStore.rethrowException();
// Blocks until the future is finished, then rethrows any stored exception.
// NOTE(review): lines between the visible statements (e.g. unlock/relock
// around the stolen runnable) are not shown in this excerpt.
507void QFutureInterfaceBase::waitForFinished()
509 QMutexLocker lock(&d->m_mutex);
510 const bool alreadyFinished = isFinished();
// Not finished yet: try to run the runnable in the calling thread, then wait
// on waitCondition until isFinished() becomes true.
513 if (!alreadyFinished) {
514 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
518 while (!isFinished())
519 d->waitCondition.wait(&d->m_mutex);
523 d->data.m_exceptionStore.rethrowException();
// Announces that results in [beginIndex, endIndex) are available: wakes
// waiters and emits ResultsReady (plus Progress where applicable).
// NOTE(review): call arguments and branch terminators are partly cut off in
// this excerpt; no mutex acquisition is visible here (presumably the caller
// holds it — confirm).
526void QFutureInterfaceBase::reportResultsReady(
int beginIndex,
int endIndex)
// Nothing to do for an empty range or a canceled/finished future; the guarded
// statement is not visible (presumably a return).
528 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
531 d->waitCondition.wakeAll();
// No explicit progress data: progress is tracked as a running result count.
533 if (!d->m_progress) {
// When the progress update returns false, only ResultsReady is sent.
534 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) ==
false) {
535 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
// Otherwise a Progress event and a ResultsReady event are sent together.
541 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
544 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
// Presumably reached when m_progress is set: only ResultsReady is sent.
549 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
// Sets the progress range under m_mutex, emits ProgressRange, and resets the
// current progress value to `minimum`.
590void QFutureInterfaceBase::setProgressRange(
int minimum,
int maximum)
592 QMutexLocker locker(&d->m_mutex);
// Create the progress data (original line 593 is not visible; presumably this
// happens only when m_progress is still null).
594 d->m_progress.reset(
new QFutureInterfaceBasePrivate::ProgressData());
595 d->m_progress->minimum = minimum;
// The stored maximum is never below the minimum.
596 d->m_progress->maximum = qMax(minimum, maximum);
// NOTE(review): the event carries the caller's `maximum`, not the clamped
// value stored above — confirm this is intended.
597 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
598 d->m_progressValue = minimum;
// Updates the progress value and text under m_mutex and emits a Progress
// call-out when internal_updateProgress() reports that one should be sent.
// NOTE(review): the statements guarded by the checks below are not visible
// (presumably early returns), and some conditions are cut off.
612void QFutureInterfaceBase::setProgressValueAndText(
int progressValue,
613 const QString &progressText)
615 QMutexLocker locker(&d->m_mutex);
// Create the progress data (original line 616 is not visible; presumably this
// happens only when m_progress is still null).
617 d->m_progress.reset(
new QFutureInterfaceBasePrivate::ProgressData());
// A range counts as "in use" when either bound is non-zero.
619 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
// Second half of a condition testing whether progressValue is outside
// [minimum, maximum]; its first half is not visible.
621 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
// Guard: the new value is not greater than the current one.
625 if (d->m_progressValue >= progressValue)
// Guard: the future is canceled or finished.
628 if (d->state.loadRelaxed() & (Canceled|Finished))
631 if (d->internal_updateProgress(progressValue, progressText)) {
632 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
634 d->m_progress->text));
733 if (data.m_results.hasNextResult())
736 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
737 && data.m_results.hasNextResult() ==
false)
738 waitCondition.wait(&m_mutex);
740 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
741 && data.m_results.hasNextResult();
782 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
783 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
788 switch_on(state, QFutureInterfaceBase::Throttled);
790 switch_off(state, QFutureInterfaceBase::Throttled);
791 if (!(state.loadRelaxed() & suspendingOrSuspended))
792 pausedWaitCondition.wakeAll();
806 const QFutureCallOutEvent &callOutEvent2)
808 if (outputConnections.isEmpty())
811 for (
int i = 0; i < outputConnections.size(); ++i) {
812 QFutureCallOutInterface *iface = outputConnections.at(i);
813 iface->postCallOutEvent(callOutEvent1);
814 iface->postCallOutEvent(callOutEvent2);
823 QMutexLocker locker(&m_mutex);
825 const auto currentState = state.loadRelaxed();
826 if (currentState & QFutureInterfaceBase::Started) {
827 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
829 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
831 m_progress->maximum));
832 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
836 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
839 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
846 QtPrivate::ResultIteratorBase it = data.m_results.begin();
847 while (it != data.m_results.end()) {
848 const int begin = it.resultIndex();
849 const int end = begin + it.batchSize();
850 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
857 if (currentState & QFutureInterfaceBase::Suspended)
858 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
859 else if (currentState & QFutureInterfaceBase::Suspending)
860 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
862 if (currentState & QFutureInterfaceBase::Canceled)
863 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
865 if (currentState & QFutureInterfaceBase::Finished)
866 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
868 outputConnections.append(iface);
873 QMutexLocker lock(&m_mutex);
874 const qsizetype index = outputConnections.indexOf(iface);
877 outputConnections.removeAt(index);
879 iface->callOutInterfaceDisconnected();
// Attaches `func` as this future's continuation and links the continuation
// future's private data (`continuationFutureData`) to this future.
// NOTE(review): several lines (conditions, braces, the immediate-execution
// path) are not visible in this excerpt.
887void QFutureInterfaceBase::setContinuation(std::function<
void (
const QFutureInterfaceBase &)> func,
888 void *continuationFutureData, ContinuationType type)
// The opaque pointer is the private data of the continuation future.
890 auto *futureData =
static_cast<QFutureInterfaceBasePrivate *>(continuationFutureData);
892 QMutexLocker lock(&d->continuationMutex);
// Marks the continuation as executed; the enclosing condition is not visible
// (presumably the path where the continuation is run right away — confirm).
897 d->continuationExecuted =
true;
// Only store the continuation if the continuation state is not Cleaned.
905 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
// An existing continuation is replaced: warn, and unlink the previous
// continuation future from this parent.
906 if (d->continuation) {
907 qWarning(
"Adding a continuation to a future which already has a continuation. "
908 "The existing continuation is overwritten.");
909 if (d->continuationData)
910 d->continuationData->nonConcludedParent =
nullptr;
912 d->continuation = std::move(func);
// Record the continuation type on the child and point it back at this
// future; presumably guarded by a null check on futureData (not visible).
914 futureData->continuationType = type;
915 futureData->nonConcludedParent = d;
917 d->continuationData = futureData;
// A non-null continuation future must carry a known continuation type.
918 Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
919 "setContinuation",
"Make sure to provide a correct continuation type!");
// Overload that runs `func` in the thread of `context`: a helper watcher
// object is moved to that thread, its `run` signal is connected to a slot
// wrapping `func`, and a continuation is registered on this future.
// NOTE(review): parts of the lambda bodies are not visible in this excerpt.
932void QFutureInterfaceBase::setContinuation(
const QObject *context, std::function<
void()> func,
933 const QVariant &continuationFuture,
934 ContinuationType type)
// Wrap `func` in a callable slot object owned by a SlotObjUniquePtr.
938 using FuncType =
void();
939 using Prototype =
typename QtPrivate::Callable<FuncType>::Function;
940 auto slotObj = QtPrivate::makeCallableObject<Prototype>(std::move(func));
942 auto slot = QtPrivate::SlotObjUniquePtr(slotObj);
// The watcher lives in the context object's thread.
944 auto *watcher =
new QObjectContinuationWrapper;
945 watcher->moveToThread(context->thread());
// A shared recursive mutex is locked by destroyWatcher and by the
// continuation lambda below; the rest of destroyWatcher is not visible.
952 auto watcherMutex = std::make_shared<QRecursiveMutex>();
953 const auto destroyWatcher = [watcherMutex, watcher]()
mutable {
954 QMutexLocker lock(watcherMutex.get());
// When the watcher emits run, call the wrapped slot with `context` as the
// receiver; a null receiver and a single null argument slot are passed.
959 QObject::connect(watcher, &QObjectContinuationWrapper::run,
963 context, [slot = std::move(slot)] {
964 void *args[] = {
nullptr };
965 slot->call(
nullptr, args);
// destroyWatcher is also connected to the watcher's run signal...
967 QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, destroyWatcher);
// ...and to destruction of the context object.
974 QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
// Extract the private data of the continuation future stored in the QVariant;
// the assert checks that its type name starts with "QFutureInterface".
978 QFutureInterfaceBasePrivate *continuationFutureData =
nullptr;
979 if (continuationFuture.isValid()) {
980 Q_ASSERT(QLatin1StringView(continuationFuture.typeName())
981 .startsWith(QLatin1StringView(
"QFutureInterface")));
982 const auto continuationPtr =
983 static_cast<
const QFutureInterfaceBase *>(continuationFuture.constData());
984 continuationFutureData = continuationPtr->d;
// Register the continuation. The lambda holds the watcher through a QPointer
// and captures continuationFuture without using it (presumably to keep it
// alive — confirm); the part of its body after the lock is not visible.
989 setContinuation([watcherMutex = std::move(watcherMutex),
990 watcher = QPointer(watcher), continuationFuture]
991 (
const QFutureInterfaceBase &parentData)
993 Q_UNUSED(parentData);
994 Q_UNUSED(continuationFuture);
995 QMutexLocker lock(watcherMutex.get());
998 }, continuationFutureData, type);
// Runs the stored continuation at most once, guarded by continuationExecuted.
// NOTE(review): the invocation of `fn` and the closing lines are not visible
// in this excerpt; comments cover the visible statements only.
1012void QFutureInterfaceBase::runContinuation()
const
1014 QMutexLocker lock(&d->continuationMutex);
1015 if (d->continuation && !d->continuationExecuted) {
// Unlink the continuation future from this parent.
1018 if (d->continuationData)
1019 d->continuationData->nonConcludedParent =
nullptr;
// Mark as executed and move the callable out of the private data.
1023 d->continuationExecuted =
true;
1024 auto fn = std::move(d->continuation);
// Put the callable back unless the continuation state is Cleaned.
1032 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
1033 d->continuation = std::move(fn);