99 CancelOptions options)
101 QMutexLocker locker(&m_mutex);
103 const auto oldState = state.loadRelaxed();
106 case QFutureInterfaceBase::CancelMode::CancelAndFinish:
107 if ((oldState & QFutureInterfaceBase::Finished)
108 && (oldState & QFutureInterfaceBase::Canceled)) {
111 switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
112 QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
114 case QFutureInterfaceBase::CancelMode::CancelOnly:
115 if (oldState & QFutureInterfaceBase::Canceled)
117 switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
123 QMutexLocker continuationLocker(&continuationMutex);
126 QMutexLocker nextLocker(&next->continuationMutex);
127 if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
136 waitCondition.wakeAll();
137 pausedWaitCondition.wakeAll();
139 if (!(oldState & QFutureInterfaceBase::Canceled))
140 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
141 if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
142 && !(oldState & QFutureInterfaceBase::Finished)) {
143 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
// Cancels this future and the futures reachable through nonConcludedParent,
// forwarding `mode` to cancelImpl() for each of them.
// NOTE(review): this excerpt omits several lines of the function (braces and,
// presumably, a loop header around the `prev` statements) — confirm the exact
// control flow against the full file.
164void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
// nonConcludedParent of this future is read while its continuationMutex is held.
168 QMutexLocker locker(&d->continuationMutex);
169 QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
// Cancel the parent with CancelOption::None, then take the parent's own
// continuationMutex before stepping to the next nonConcludedParent.
172 prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
173 QMutexLocker prevLocker(&prev->continuationMutex);
174 prev = prev->nonConcludedParent;
// This future itself is cancelled with CancelOption::CancelContinuations.
178 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
// Switches the suspension flags of the shared state on or off and emits the
// matching call-out event.
// NOTE(review): the test on `suspend` and the braces are not part of this
// excerpt; presumably the first two statements after the lock form the
// "suspend" path and the last three the "resume" path — confirm.
181void QFutureInterfaceBase::setSuspended(
bool suspend)
// All state changes and call-outs below are made with m_mutex held.
183 QMutexLocker locker(&d->m_mutex);
// Set the Suspending flag and send the Suspending call-out event.
185 switch_on(d->state, Suspending);
186 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
// Clear the suspendingOrSuspended flags, wake every thread blocked on
// pausedWaitCondition, and send the Resumed call-out event.
188 switch_off(d->state, suspendingOrSuspended);
189 d->pausedWaitCondition.wakeAll();
190 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
// Flips the suspension state: if any of the suspendingOrSuspended flags is
// set the future is resumed, otherwise suspension is requested.
// NOTE(review): the `else` separating the two paths is not part of this
// excerpt; the last two statements are presumably the else-branch — confirm.
194void QFutureInterfaceBase::toggleSuspended()
// The state is inspected and modified with m_mutex held.
196 QMutexLocker locker(&d->m_mutex);
197 if (d->state.loadRelaxed() & suspendingOrSuspended) {
// Resume: clear the flags, wake threads blocked on pausedWaitCondition and
// send the Resumed call-out event.
198 switch_off(d->state, suspendingOrSuspended);
199 d->pausedWaitCondition.wakeAll();
200 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
// Request suspension: set Suspending and send the Suspending call-out event.
202 switch_on(d->state, Suspending);
203 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
// Moves the state from Suspending to Suspended and sends the Suspended
// call-out event. Declared const; it works on the shared private data `d`.
207void QFutureInterfaceBase::reportSuspended()
const
212 QMutexLocker locker(&d->m_mutex);
213 const int state = d->state.loadRelaxed();
// Guard: applies when Suspending is not set or Suspended is already set.
// NOTE(review): the guarded statement is not part of this excerpt —
// presumably an early return; confirm.
214 if (!(state & Suspending) || (state & Suspended))
217 switch_from_to(d->state, Suspending, Suspended);
218 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
// Sets or clears the Throttled flag of the shared state.
// NOTE(review): the test on `enable` and the braces are not part of this
// excerpt; presumably switch_on() is the enable path and the remaining
// statements are the disable path — confirm.
221void QFutureInterfaceBase::setThrottled(
bool enable)
223 QMutexLocker lock(&d->m_mutex);
225 switch_on(d->state, Throttled);
227 switch_off(d->state, Throttled);
// After clearing Throttled, threads blocked on pausedWaitCondition are woken
// only if none of the suspendingOrSuspended flags is set.
228 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
229 d->pausedWaitCondition.wakeAll();
// Blocks the calling thread on pausedWaitCondition while the future is
// suspending/suspended and not canceled.
// NOTE(review): braces and the statements guarded by the two `if`s are not
// part of this excerpt — presumably early returns inside separate scopes
// (two locals named `state` are declared); confirm.
299void QFutureInterfaceBase::waitForResume()
// First check: the state is loaded before m_mutex is acquired.
303 const int state = d->state.loadRelaxed();
304 if (!(state & suspendingOrSuspended) || (state & Canceled))
// Second check: the same condition is evaluated again with m_mutex held.
308 QMutexLocker lock(&d->m_mutex);
309 const int state = d->state.loadRelaxed();
310 if (!(state & suspendingOrSuspended) || (state & Canceled))
// A ThreadPoolThreadReleaser is kept alive for the duration of the wait.
// NOTE(review): presumably it hands the pool thread back while this thread
// is blocked — confirm against its definition.
314 const ThreadPoolThreadReleaser releaser(d->pool());
// Wait on pausedWaitCondition, passing the held m_mutex to the wait call.
316 d->pausedWaitCondition.wait(&d->m_mutex);
// If suspension has been requested (and the future is not canceled), marks
// the future as Suspended and blocks the calling thread on
// pausedWaitCondition.
// NOTE(review): braces and the statements guarded by the `!canSuspend`
// checks are not part of this excerpt — presumably early returns inside
// separate scopes; confirm.
319void QFutureInterfaceBase::suspendIfRequested()
// canSuspend(state): true when a suspendingOrSuspended flag is set and
// Canceled is not set.
321 const auto canSuspend = [] (
int state) {
323 return (state & suspendingOrSuspended) && !(state & Canceled);
// First check: the state is loaded before m_mutex is acquired.
328 const int state = d->state.loadRelaxed();
329 if (!canSuspend(state))
// Second check: the predicate is evaluated again with m_mutex held.
333 QMutexLocker lock(&d->m_mutex);
334 const int state = d->state.loadRelaxed();
335 if (!canSuspend(state))
// Only when Suspended is not yet set: move Suspending -> Suspended and send
// the Suspended call-out event.
339 if (!(state & Suspended)) {
341 switch_from_to(d->state, Suspending, Suspended);
342 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
// A ThreadPoolThreadReleaser is kept alive for the duration of the wait.
// NOTE(review): presumably it hands the pool thread back while this thread
// is blocked — confirm against its definition.
346 const ThreadPoolThreadReleaser releaser(d->pool());
347 d->pausedWaitCondition.wait(&d->m_mutex);
// Puts the future into the Started | Running state and sends the Started
// call-out event.
386void QFutureInterfaceBase::reportStarted()
388 QMutexLocker locker(&d->m_mutex);
// Guard: applies when any of Started, Canceled or Finished is already set.
// NOTE(review): the guarded statement is not part of this excerpt —
// presumably an early return; confirm.
389 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
391 d->setState(State(Started | Running));
392 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
// Stores `exception` in the future's data, sets the Canceled flag, wakes all
// waiters and sends the Canceled call-out event.
414void QFutureInterfaceBase::reportException(
const std::exception_ptr &exception)
417 QMutexLocker locker(&d->m_mutex);
// Guard: applies when the future is already Canceled or Finished.
// NOTE(review): the guarded statement is not part of this excerpt —
// presumably an early return; confirm.
418 if (d->state.loadRelaxed() & (Canceled|Finished))
// Record that an exception is present and store it.
421 d->hasException =
true;
422 d->data.setException(exception);
// Setting Canceled is followed by waking threads blocked on either wait
// condition (waitCondition and pausedWaitCondition).
423 switch_on(d->state, Canceled);
424 d->waitCondition.wakeAll();
425 d->pausedWaitCondition.wakeAll();
426 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
// Moves the state from Running to Finished, wakes threads blocked on
// waitCondition and sends the Finished call-out event.
// NOTE(review): at least one original line between the lock and the state
// switch is not part of this excerpt — possibly a guard against reporting
// Finished twice; confirm.
430void QFutureInterfaceBase::reportFinished()
432 QMutexLocker locker(&d->m_mutex);
434 switch_from_to(d->state, Running, Finished);
435 d->waitCondition.wakeAll();
436 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
// Waits until the result at `resultIndex` is ready or the future is no
// longer running/pending. A stored exception is rethrown both before and
// after the wait.
// NOTE(review): several original lines are not part of this excerpt (the
// statement guarded by `!isRunningOrPending()` and any unlock/relock around
// the stealAndRunRunnable() call) — confirm the lock handling in the full file.
465void QFutureInterfaceBase::waitForResult(
int resultIndex)
// Rethrow through the exception store before doing any waiting.
468 d->data.m_exceptionStore.rethrowException();
470 QMutexLocker lock(&d->m_mutex);
471 if (!isRunningOrPending())
// Ask the pool to steal and run this future's runnable.
// NOTE(review): presumably this runs a not-yet-started runnable on the
// calling thread — confirm against stealAndRunRunnable().
477 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
// A resultIndex of -1 is mapped to INT_MAX as the index to wait for.
481 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
// Block on waitCondition until the result at waitIndex is ready or the
// future stops being running/pending.
482 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
483 d->waitCondition.wait(&d->m_mutex);
// Rethrow through the exception store once more after the wait.
486 d->data.m_exceptionStore.rethrowException();
// Waits until isFinished() is true, then rethrows through the exception store.
// NOTE(review): lines between the visible statements (braces and any
// unlock/relock around the stealAndRunRunnable() call) are not part of this
// excerpt — confirm the lock handling in the full file.
489void QFutureInterfaceBase::waitForFinished()
491 QMutexLocker lock(&d->m_mutex);
// Snapshot of the finished state taken right after locking.
492 const bool alreadyFinished = isFinished();
// Only when not finished yet: ask the pool to steal and run the runnable,
// then block on waitCondition until isFinished() becomes true.
495 if (!alreadyFinished) {
496 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
500 while (!isFinished())
501 d->waitCondition.wait(&d->m_mutex);
505 d->data.m_exceptionStore.rethrowException();
// Announces that results in the index range [beginIndex, endIndex) are
// available: wakes threads blocked on waitCondition and sends ResultsReady
// (and, on one path, Progress) call-out events.
// NOTE(review): the half-open reading of the range is inferred from the
// `endIndex - beginIndex` count below. Argument continuation lines, braces,
// the statement guarded by the first `if`, and the else/return structure are
// not part of this excerpt — confirm against the full file.
508void QFutureInterfaceBase::reportResultsReady(
int beginIndex,
int endIndex)
// Guard: applies for an empty range or when the future is Canceled/Finished.
510 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
513 d->waitCondition.wakeAll();
// When no m_progress object exists, the progress value is advanced by the
// number of new results (endIndex - beginIndex).
515 if (!d->m_progress) {
// If internal_updateProgressValue() returns false, only ResultsReady is sent.
516 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) ==
false) {
517 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
// Otherwise a Progress event and a ResultsReady event are sent together.
523 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
526 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
// NOTE(review): presumably reached only when m_progress exists — confirm.
531 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
// Stores the progress range [minimum, maximum], sends a ProgressRange
// call-out event and resets the current progress value to `minimum`.
572void QFutureInterfaceBase::setProgressRange(
int minimum,
int maximum)
574 QMutexLocker locker(&d->m_mutex);
// NOTE(review): the line preceding this reset is not part of the excerpt;
// presumably ProgressData is only allocated when m_progress is still null —
// confirm.
576 d->m_progress.reset(
new QFutureInterfaceBasePrivate::ProgressData());
577 d->m_progress->minimum = minimum;
// The stored maximum is never smaller than the minimum.
578 d->m_progress->maximum = qMax(minimum, maximum);
// The call-out carries the caller's `maximum` argument as passed in, not the
// qMax()-adjusted value stored above.
579 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
580 d->m_progressValue = minimum;
// Updates the progress value and text and, if internal_updateProgress()
// reports true, sends a Progress call-out event.
// NOTE(review): several original lines are not part of this excerpt (the
// guard before the reset, the first half of the range condition, the
// statements guarded by the `if`s — presumably early returns — and part of
// the call-out arguments); confirm against the full file.
594void QFutureInterfaceBase::setProgressValueAndText(
int progressValue,
595 const QString &progressText)
597 QMutexLocker locker(&d->m_mutex);
// NOTE(review): presumably ProgressData is only allocated when m_progress is
// still null — confirm.
599 d->m_progress.reset(
new QFutureInterfaceBasePrivate::ProgressData());
// A progress range counts as "in use" when minimum or maximum is non-zero.
601 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
// Second half of a condition that tests whether progressValue lies outside
// [minimum, maximum].
603 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
// Guard: the stored progress value is already >= the new value.
607 if (d->m_progressValue >= progressValue)
// Guard: the future is Canceled or Finished.
610 if (d->state.loadRelaxed() & (Canceled|Finished))
// The Progress call-out is sent only when internal_updateProgress() returns
// true; the event includes m_progress->text.
613 if (d->internal_updateProgress(progressValue, progressText)) {
614 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
616 d->m_progress->text));
715 if (data.m_results.hasNextResult())
718 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
719 && data.m_results.hasNextResult() ==
false)
720 waitCondition.wait(&m_mutex);
722 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
723 && data.m_results.hasNextResult();
764 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
765 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
770 switch_on(state, QFutureInterfaceBase::Throttled);
772 switch_off(state, QFutureInterfaceBase::Throttled);
773 if (!(state.loadRelaxed() & suspendingOrSuspended))
774 pausedWaitCondition.wakeAll();
788 const QFutureCallOutEvent &callOutEvent2)
790 if (outputConnections.isEmpty())
793 for (
int i = 0; i < outputConnections.size(); ++i) {
794 QFutureCallOutInterface *iface = outputConnections.at(i);
795 iface->postCallOutEvent(callOutEvent1);
796 iface->postCallOutEvent(callOutEvent2);
805 QMutexLocker locker(&m_mutex);
807 const auto currentState = state.loadRelaxed();
808 if (currentState & QFutureInterfaceBase::Started) {
809 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
811 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
813 m_progress->maximum));
814 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
818 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
821 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
828 QtPrivate::ResultIteratorBase it = data.m_results.begin();
829 while (it != data.m_results.end()) {
830 const int begin = it.resultIndex();
831 const int end = begin + it.batchSize();
832 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
839 if (currentState & QFutureInterfaceBase::Suspended)
840 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
841 else if (currentState & QFutureInterfaceBase::Suspending)
842 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
844 if (currentState & QFutureInterfaceBase::Canceled)
845 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
847 if (currentState & QFutureInterfaceBase::Finished)
848 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
850 outputConnections.append(iface);
855 QMutexLocker lock(&m_mutex);
856 const qsizetype index = outputConnections.indexOf(iface);
859 outputConnections.removeAt(index);
861 iface->callOutInterfaceDisconnected();
// Installs `func` as this future's continuation and links the continuation
// future's private data (`continuationFutureData`, of the given `type`) to
// this future.
// NOTE(review): several original lines are not part of this excerpt (the
// condition under which continuationExecuted is set, any call of `func` on
// that path, and the null check around the futureData assignments); confirm
// against the full file.
869void QFutureInterfaceBase::setContinuation(std::function<
void (
const QFutureInterfaceBase &)> func,
870 void *continuationFutureData, ContinuationType type)
// The opaque pointer is treated as the private data of the continuation future.
872 auto *futureData =
static_cast<QFutureInterfaceBasePrivate *>(continuationFutureData);
// Continuation bookkeeping below is done with continuationMutex held.
874 QMutexLocker lock(&d->continuationMutex);
879 d->continuationExecuted =
true;
// Nothing is stored once continuationState is Cleaned.
887 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
// An already-present continuation is overwritten with a warning; the
// previous continuation data has its nonConcludedParent cleared.
888 if (d->continuation) {
889 qWarning(
"Adding a continuation to a future which already has a continuation. "
890 "The existing continuation is overwritten.");
891 if (d->continuationData)
892 d->continuationData->nonConcludedParent =
nullptr;
894 d->continuation = std::move(func);
// Record the continuation type on the continuation's data and point its
// nonConcludedParent at this future.
// NOTE(review): the assertion below allows a null futureData, so these two
// writes are presumably guarded by a null check that is not shown — confirm.
896 futureData->continuationType = type;
897 futureData->nonConcludedParent = d;
899 d->continuationData = futureData;
// A non-null futureData must not carry ContinuationType::Unknown.
900 Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
901 "setContinuation",
"Make sure to provide a correct continuation type!");
// Overload that runs `func` in the context of a QObject: a helper
// QObjectContinuationWrapper ("watcher") is created, its run signal is
// connected to a slot object built from `func` with `context` as receiver,
// and the other setContinuation() overload is called with a lambda that
// holds the watcher.
// NOTE(review): the bodies of destroyWatcher and of the continuation lambda
// are mostly not part of this excerpt; what they do beyond taking the
// shared mutex cannot be seen here — confirm against the full file.
914void QFutureInterfaceBase::setContinuation(
const QObject *context, std::function<
void()> func,
915 const QVariant &continuationFuture,
916 ContinuationType type)
// Wrap `func` into a Qt slot object of signature void().
920 using FuncType =
void();
921 using Prototype =
typename QtPrivate::Callable<FuncType>::Function;
922 auto slotObj = QtPrivate::makeCallableObject<Prototype>(std::move(func));
// The raw slot object is handed to a SlotObjUniquePtr.
924 auto slot = QtPrivate::SlotObjUniquePtr(slotObj);
// The watcher is created with plain `new` and moved to the thread of `context`.
926 auto *watcher =
new QObjectContinuationWrapper;
927 watcher->moveToThread(context->thread());
// A recursive mutex held via shared_ptr is captured by both destroyWatcher
// and the continuation lambda further down.
934 auto watcherMutex = std::make_shared<QRecursiveMutex>();
935 const auto destroyWatcher = [watcherMutex, watcher]()
mutable {
936 QMutexLocker lock(watcherMutex.get());
// watcher's run signal -> call the slot object, with `context` as the
// receiver object of the connection.
941 QObject::connect(watcher, &QObjectContinuationWrapper::run,
945 context, [slot = std::move(slot)] {
946 void *args[] = {
nullptr };
947 slot->call(
nullptr, args);
// destroyWatcher is also connected to the watcher's run signal ...
949 QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, destroyWatcher);
// ... and to the destroyed signal of `context`.
956 QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
// If a continuation future was passed in the QVariant, extract its private
// data pointer; the assertion checks that the variant's type name starts
// with "QFutureInterface".
960 QFutureInterfaceBasePrivate *continuationFutureData =
nullptr;
961 if (continuationFuture.isValid()) {
962 Q_ASSERT(QLatin1StringView(continuationFuture.typeName())
963 .startsWith(QLatin1StringView(
"QFutureInterface")));
964 const auto continuationPtr =
965 static_cast<
const QFutureInterfaceBase *>(continuationFuture.constData());
966 continuationFutureData = continuationPtr->d;
// Register the continuation through the other overload. The lambda captures
// the shared mutex, a QPointer to the watcher and a copy of
// continuationFuture; neither parentData nor continuationFuture is used in
// its body (both are marked Q_UNUSED).
971 setContinuation([watcherMutex = std::move(watcherMutex),
972 watcher = QPointer(watcher), continuationFuture]
973 (
const QFutureInterfaceBase &parentData)
975 Q_UNUSED(parentData);
976 Q_UNUSED(continuationFuture);
977 QMutexLocker lock(watcherMutex.get());
980 }, continuationFutureData, type);
// Executes the stored continuation if there is one and it has not been
// executed yet. Declared const; it works on the shared private data `d`.
// NOTE(review): the lines between moving the continuation out and moving it
// back are not part of this excerpt — presumably the continuation is invoked
// there (possibly with the mutex released); confirm against the full file.
994void QFutureInterfaceBase::runContinuation()
const
996 QMutexLocker lock(&d->continuationMutex);
997 if (d->continuation && !d->continuationExecuted) {
// Clear the continuation data's nonConcludedParent link to this future.
1000 if (d->continuationData)
1001 d->continuationData->nonConcludedParent =
nullptr;
// Mark as executed and move the callable out of the private data into a local.
1005 d->continuationExecuted =
true;
1006 auto fn = std::move(d->continuation);
// The callable is moved back into the private data unless continuationState
// is Cleaned.
1014 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
1015 d->continuation = std::move(fn);