117 CancelOptions options)
119 QMutexLocker locker(&m_mutex);
121 const auto oldState = state.loadRelaxed();
124 case QFutureInterfaceBase::CancelMode::CancelAndFinish:
125 if ((oldState & QFutureInterfaceBase::Finished)
126 && (oldState & QFutureInterfaceBase::Canceled)) {
129 switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
130 QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
132 case QFutureInterfaceBase::CancelMode::CancelOnly:
133 if (oldState & QFutureInterfaceBase::Canceled)
135 switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
141 QMutexLocker continuationLocker(&continuationMutex);
144 QMutexLocker nextLocker(&next->continuationMutex);
145 if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
154 waitCondition.wakeAll();
155 pausedWaitCondition.wakeAll();
157 if (!(oldState & QFutureInterfaceBase::Canceled))
158 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
159 if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
160 && !(oldState & QFutureInterfaceBase::Finished)) {
161 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
192void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
196 QMutexLocker locker(&d->continuationMutex);
197 QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
200 prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
201 QMutexLocker prevLocker(&prev->continuationMutex);
202 prev = prev->nonConcludedParent;
206 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
209void QFutureInterfaceBase::setSuspended(
bool suspend)
211 QMutexLocker locker(&d->m_mutex);
213 switch_on(d->state, Suspending);
214 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
216 switch_off(d->state, suspendingOrSuspended);
217 d->pausedWaitCondition.wakeAll();
218 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
222void QFutureInterfaceBase::toggleSuspended()
224 QMutexLocker locker(&d->m_mutex);
225 if (d->state.loadRelaxed() & suspendingOrSuspended) {
226 switch_off(d->state, suspendingOrSuspended);
227 d->pausedWaitCondition.wakeAll();
228 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
230 switch_on(d->state, Suspending);
231 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
235void QFutureInterfaceBase::reportSuspended()
const
240 QMutexLocker locker(&d->m_mutex);
241 const int state = d->state.loadRelaxed();
242 if (!(state & Suspending) || (state & Suspended))
245 switch_from_to(d->state, Suspending, Suspended);
246 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
249void QFutureInterfaceBase::setThrottled(
bool enable)
251 QMutexLocker lock(&d->m_mutex);
253 switch_on(d->state, Throttled);
255 switch_off(d->state, Throttled);
256 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
257 d->pausedWaitCondition.wakeAll();
327void QFutureInterfaceBase::waitForResume()
331 const int state = d->state.loadRelaxed();
332 if (!(state & suspendingOrSuspended) || (state & Canceled))
336 QMutexLocker lock(&d->m_mutex);
337 const int state = d->state.loadRelaxed();
338 if (!(state & suspendingOrSuspended) || (state & Canceled))
342 const ThreadPoolThreadReleaser releaser(d->pool());
344 d->pausedWaitCondition.wait(&d->m_mutex);
347void QFutureInterfaceBase::suspendIfRequested()
349 const auto canSuspend = [] (
int state) {
351 return (state & suspendingOrSuspended) && !(state & Canceled);
356 const int state = d->state.loadRelaxed();
357 if (!canSuspend(state))
361 QMutexLocker lock(&d->m_mutex);
362 const int state = d->state.loadRelaxed();
363 if (!canSuspend(state))
367 if (!(state & Suspended)) {
369 switch_from_to(d->state, Suspending, Suspended);
370 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
374 const ThreadPoolThreadReleaser releaser(d->pool());
375 d->pausedWaitCondition.wait(&d->m_mutex);
414void QFutureInterfaceBase::reportStarted()
416 QMutexLocker locker(&d->m_mutex);
417 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
419 d->setState(State(Started | Running));
420 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
442void QFutureInterfaceBase::reportException(
const std::exception_ptr &exception)
445 QMutexLocker locker(&d->m_mutex);
446 if (d->state.loadRelaxed() & (Canceled|Finished))
449 d->hasException =
true;
450 d->data.setException(exception);
451 switch_on(d->state, Canceled);
452 d->waitCondition.wakeAll();
453 d->pausedWaitCondition.wakeAll();
454 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
458void QFutureInterfaceBase::reportFinished()
460 QMutexLocker locker(&d->m_mutex);
462 switch_from_to(d->state, Running, Finished);
463 d->waitCondition.wakeAll();
464 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
493void QFutureInterfaceBase::waitForResult(
int resultIndex)
496 d->data.m_exceptionStore.rethrowException();
498 QMutexLocker lock(&d->m_mutex);
499 if (!isRunningOrPending())
505 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
509 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
510 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
511 d->waitCondition.wait(&d->m_mutex);
514 d->data.m_exceptionStore.rethrowException();
517void QFutureInterfaceBase::waitForFinished()
519 QMutexLocker lock(&d->m_mutex);
520 const bool alreadyFinished = isFinished();
523 if (!alreadyFinished) {
524 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
528 while (!isFinished())
529 d->waitCondition.wait(&d->m_mutex);
533 d->data.m_exceptionStore.rethrowException();
536void QFutureInterfaceBase::reportResultsReady(
int beginIndex,
int endIndex)
538 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
541 d->waitCondition.wakeAll();
543 if (!d->m_progress) {
544 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) ==
false) {
545 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
551 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
554 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
559 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
600void QFutureInterfaceBase::setProgressRange(
int minimum,
int maximum)
602 QMutexLocker locker(&d->m_mutex);
604 d->m_progress.reset(
new QFutureInterfaceBasePrivate::ProgressData());
605 d->m_progress->minimum = minimum;
606 d->m_progress->maximum = qMax(minimum, maximum);
607 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
608 d->m_progressValue = minimum;
622void QFutureInterfaceBase::setProgressValueAndText(
int progressValue,
623 const QString &progressText)
625 QMutexLocker locker(&d->m_mutex);
627 d->m_progress.reset(
new QFutureInterfaceBasePrivate::ProgressData());
629 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
631 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
635 if (d->m_progressValue >= progressValue)
638 if (d->state.loadRelaxed() & (Canceled|Finished))
641 if (d->internal_updateProgress(progressValue, progressText)) {
642 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
644 d->m_progress->text));
743 if (data.m_results.hasNextResult())
746 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
747 && data.m_results.hasNextResult() ==
false)
748 waitCondition.wait(&m_mutex);
750 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
751 && data.m_results.hasNextResult();
792 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
793 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
798 switch_on(state, QFutureInterfaceBase::Throttled);
800 switch_off(state, QFutureInterfaceBase::Throttled);
801 if (!(state.loadRelaxed() & suspendingOrSuspended))
802 pausedWaitCondition.wakeAll();
816 const QFutureCallOutEvent &callOutEvent2)
818 if (outputConnections.isEmpty())
821 for (
int i = 0; i < outputConnections.size(); ++i) {
822 QFutureCallOutInterface *iface = outputConnections.at(i);
823 iface->postCallOutEvent(callOutEvent1);
824 iface->postCallOutEvent(callOutEvent2);
833 QMutexLocker locker(&m_mutex);
835 const auto currentState = state.loadRelaxed();
836 if (currentState & QFutureInterfaceBase::Started) {
837 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
839 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
841 m_progress->maximum));
842 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
846 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
849 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
856 QtPrivate::ResultIteratorBase it = data.m_results.begin();
857 while (it != data.m_results.end()) {
858 const int begin = it.resultIndex();
859 const int end = begin + it.batchSize();
860 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
867 if (currentState & QFutureInterfaceBase::Suspended)
868 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
869 else if (currentState & QFutureInterfaceBase::Suspending)
870 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
872 if (currentState & QFutureInterfaceBase::Canceled)
873 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
875 if (currentState & QFutureInterfaceBase::Finished)
876 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
878 outputConnections.append(iface);
883 QMutexLocker lock(&m_mutex);
884 const qsizetype index = outputConnections.indexOf(iface);
887 outputConnections.removeAt(index);
889 iface->callOutInterfaceDisconnected();
897void QFutureInterfaceBase::setContinuation(std::function<
void (
const QFutureInterfaceBase &)> func,
898 void *continuationFutureData, ContinuationType type)
900 auto *futureData =
static_cast<QFutureInterfaceBasePrivate *>(continuationFutureData);
902 QMutexLocker lock(&d->continuationMutex);
907 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
908 if (d->continuation) {
909 qWarning(
"Adding a continuation to a future which already has a continuation. "
910 "The existing continuation is overwritten.");
911 if (d->continuationData)
912 d->continuationData->nonConcludedParent =
nullptr;
917 Q_ASSERT_X(!futureData->nonConcludedParent,
"setContinuation",
918 "futureData already has a parent");
919 futureData->continuationType = type;
920 futureData->nonConcludedParent = d;
922 d->continuationData = futureData;
923 Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
924 "setContinuation",
"Make sure to provide a correct continuation type!");
932 d->continuationExecuted =
true;
939 if (d->continuationState == QFutureInterfaceBasePrivate::Cleaned) {
943 futureData->nonConcludedParent =
nullptr;
945 d->continuation = std::move(func);
958void QFutureInterfaceBase::setContinuation(
const QObject *context, std::function<
void()> func,
959 const QVariant &continuationFuture,
960 ContinuationType type)
964 using FuncType =
void();
965 using Prototype =
typename QtPrivate::Callable<FuncType>::Function;
966 auto slotObj = QtPrivate::makeCallableObject<Prototype>(std::move(func));
968 auto slot = QtPrivate::SlotObjUniquePtr(slotObj);
970 auto *watcher =
new QObjectContinuationWrapper;
971 watcher->moveToThread(context->thread());
978 auto watcherMutex = std::make_shared<QRecursiveMutex>();
979 const auto destroyWatcher = [watcherMutex, watcher]()
mutable {
980 QMutexLocker lock(watcherMutex.get());
985 QObject::connect(watcher, &QObjectContinuationWrapper::run,
989 context, [slot = std::move(slot)] {
990 void *args[] = {
nullptr };
991 slot->call(
nullptr, args);
993 QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, destroyWatcher);
1000 QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
1004 QFutureInterfaceBasePrivate *continuationFutureData =
nullptr;
1005 if (continuationFuture.isValid()) {
1006 Q_ASSERT(QLatin1StringView(continuationFuture.typeName())
1007 .startsWith(QLatin1StringView(
"QFutureInterface")));
1008 const auto continuationPtr =
1009 static_cast<
const QFutureInterfaceBase *>(continuationFuture.constData());
1010 continuationFutureData = continuationPtr->d;
1015 setContinuation([watcherMutex = std::move(watcherMutex),
1016 watcher = QPointer(watcher), continuationFuture]
1017 (
const QFutureInterfaceBase &parentData)
1019 Q_UNUSED(parentData);
1020 Q_UNUSED(continuationFuture);
1021 QMutexLocker lock(watcherMutex.get());
1023 emit watcher->run();
1024 }, continuationFutureData, type);
1038void QFutureInterfaceBase::runContinuation()
const
1040 QMutexLocker lock(&d->continuationMutex);
1041 if (d->continuation && !d->continuationExecuted) {
1044 if (d->continuationData)
1045 d->continuationData->nonConcludedParent =
nullptr;
1049 d->continuationExecuted =
true;
1050 auto fn = std::move(d->continuation);
1058 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
1059 d->continuation = std::move(fn);