Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qfutureinterface.cpp
Go to the documentation of this file.
1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
5// qfutureinterface.h included from qfuture.h
6#include "qfuture.h"
8
9#include <QtCore/qatomic.h>
10#include <QtCore/qcoreapplication.h>
11#include <QtCore/qloggingcategory.h>
12#include <QtCore/qthread.h>
13#include <QtCore/qvarlengtharray.h>
14#include <private/qthreadpool_p.h>
15#include <private/qobject_p.h>
16
17#include <climits> // For INT_MAX
18
19// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
20// reason
21// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
22QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
23
24QT_BEGIN_NAMESPACE
25
26Q_STATIC_LOGGING_CATEGORY(lcQFutureContinuations, "qt.core.qfuture.continuations")
27
// Upper bound for progress notifications per second. The throttling code
// compares the elapsed milliseconds against 1000 / MaxProgressEmitsPerSecond.
enum { MaxProgressEmitsPerSecond = 25 };
31
32namespace {
33class ThreadPoolThreadReleaser {
34 QThreadPool *m_pool;
35public:
36 Q_NODISCARD_CTOR
37 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
38 : m_pool(pool)
39 { if (pool) pool->releaseThread(); }
40 ~ThreadPoolThreadReleaser()
41 { if (m_pool) m_pool->reserveThread(); }
42};
43
44const auto suspendingOrSuspended =
45 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
46
47} // unnamed namespace
48
49namespace QtPrivate {
50
51void qfutureWarnIfUnusedResults(qsizetype numResults)
52{
53 if (numResults > 1) {
54 qCWarning(lcQFutureContinuations,
55 "Parent future has %" PRIdQSIZETYPE " result(s), but only the first result "
56 "will be handled in the continuation.",
57 numResults);
58 }
59}
60
61} // namespace QtPrivate
62
64{
66public:
69 {
70 }
71
73 void run();
74};
75
76QFutureCallOutInterface::~QFutureCallOutInterface()
77 = default;
78
80
81QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
82 : d(new QFutureInterfaceBasePrivate(initialState))
83{ }
84
85QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
86 : d(other.d)
87{
88 d->refCount.ref();
89}
90
91QFutureInterfaceBase::~QFutureInterfaceBase()
92{
93 if (d && !d->refCount.deref())
94 delete d;
95}
96
97static inline int switch_on(QAtomicInt &a, int which)
98{
99 return a.fetchAndOrRelaxed(which) | which;
100}
101
102static inline int switch_off(QAtomicInt &a, int which)
103{
104 return a.fetchAndAndRelaxed(~which) & ~which;
105}
106
107static inline int switch_from_to(QAtomicInt &a, int from, int to)
108{
109 const auto adjusted = [&](int old) { return (old & ~from) | to; };
110 int value = a.loadRelaxed();
111 while (!a.testAndSetRelaxed(value, adjusted(value), value))
112 qYieldCpu();
113 return value;
114}
115
116void QFutureInterfaceBasePrivate::cancelImpl(QFutureInterfaceBase::CancelMode mode,
117 CancelOptions options)
118{
119 QMutexLocker locker(&m_mutex);
120
121 const auto oldState = state.loadRelaxed();
122
123 switch (mode) {
124 case QFutureInterfaceBase::CancelMode::CancelAndFinish:
125 if ((oldState & QFutureInterfaceBase::Finished)
126 && (oldState & QFutureInterfaceBase::Canceled)) {
127 return;
128 }
129 switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
130 QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
131 break;
132 case QFutureInterfaceBase::CancelMode::CancelOnly:
133 if (oldState & QFutureInterfaceBase::Canceled)
134 return;
135 switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
136 break;
137 }
138
139 if (options & CancelOption::CancelContinuations) {
140 // Cancel the continuations chain
141 QMutexLocker continuationLocker(&continuationMutex);
143 while (next) {
144 QMutexLocker nextLocker(&next->continuationMutex);
145 if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
146 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
147 next = next->continuationData;
148 } else {
149 break;
150 }
151 }
152 }
153
154 waitCondition.wakeAll();
155 pausedWaitCondition.wakeAll();
156
157 if (!(oldState & QFutureInterfaceBase::Canceled))
158 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
159 if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
160 && !(oldState & QFutureInterfaceBase::Finished)) {
161 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
162 }
163
164 isValid = false;
165}
166
167void QFutureInterfaceBase::cancel()
168{
169 cancel(CancelMode::CancelOnly);
170}
171
172void QFutureInterfaceBase::cancelChain()
173{
174 cancelChain(CancelMode::CancelOnly);
175}
176
177void QFutureInterfaceBase::setAddResultsIfCanceledEnabled(bool enable)
178{
179 d->addResultsIfCanceled = enable;
180}
181
182bool QFutureInterfaceBase::isAddResultsIfCanceledEnabled() const
183{
184 return d->addResultsIfCanceled;
185}
186
187void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
188{
189 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
190}
191
192void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
193{
194 // go up through the list of continuations, cancelling each of them
195 {
196 QMutexLocker locker(&d->continuationMutex);
197 QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
198 while (prev) {
199 // Do not cancel continuations, because we're going bottom-to-top
200 prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
201 QMutexLocker prevLocker(&prev->continuationMutex);
202 prev = prev->nonConcludedParent;
203 }
204 }
205 // finally, cancel self and all next continuations
206 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
207}
208
209void QFutureInterfaceBase::setSuspended(bool suspend)
210{
211 QMutexLocker locker(&d->m_mutex);
212 if (suspend) {
213 switch_on(d->state, Suspending);
214 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
215 } else {
216 switch_off(d->state, suspendingOrSuspended);
217 d->pausedWaitCondition.wakeAll();
218 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
219 }
220}
221
222void QFutureInterfaceBase::toggleSuspended()
223{
224 QMutexLocker locker(&d->m_mutex);
225 if (d->state.loadRelaxed() & suspendingOrSuspended) {
226 switch_off(d->state, suspendingOrSuspended);
227 d->pausedWaitCondition.wakeAll();
228 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
229 } else {
230 switch_on(d->state, Suspending);
231 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
232 }
233}
234
235void QFutureInterfaceBase::reportSuspended() const
236{
237 // Needs to be called when pause is in effect,
238 // i.e. no more events will be reported.
239
240 QMutexLocker locker(&d->m_mutex);
241 const int state = d->state.loadRelaxed();
242 if (!(state & Suspending) || (state & Suspended))
243 return;
244
245 switch_from_to(d->state, Suspending, Suspended);
246 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
247}
248
249void QFutureInterfaceBase::setThrottled(bool enable)
250{
251 QMutexLocker lock(&d->m_mutex);
252 if (enable) {
253 switch_on(d->state, Throttled);
254 } else {
255 switch_off(d->state, Throttled);
256 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
257 d->pausedWaitCondition.wakeAll();
258 }
259}
260
261
262bool QFutureInterfaceBase::isRunning() const
263{
264 return queryState(Running);
265}
266
267bool QFutureInterfaceBase::isStarted() const
268{
269 return queryState(Started);
270}
271
272bool QFutureInterfaceBase::isCanceled() const
273{
274 return queryState(Canceled);
275}
276
277bool QFutureInterfaceBase::isFinished() const
278{
279 return queryState(Finished);
280}
281
282bool QFutureInterfaceBase::isSuspending() const
283{
284 return queryState(Suspending);
285}
286
287#if QT_DEPRECATED_SINCE(6, 0)
288bool QFutureInterfaceBase::isPaused() const
289{
290 return queryState(static_cast<State>(suspendingOrSuspended));
291}
292#endif
293
294bool QFutureInterfaceBase::isSuspended() const
295{
296 return queryState(Suspended);
297}
298
299bool QFutureInterfaceBase::isThrottled() const
300{
301 return queryState(Throttled);
302}
303
304bool QFutureInterfaceBase::isResultReadyAt(int index) const
305{
306 QMutexLocker lock(&d->m_mutex);
307 return d->internal_isResultReadyAt(index);
308}
309
310bool QFutureInterfaceBase::isValid() const
311{
312 const QMutexLocker lock(&d->m_mutex);
313 return d->isValid;
314}
315
316bool QFutureInterfaceBase::isRunningOrPending() const
317{
318 return queryState(static_cast<State>(Running | Pending));
319}
320
321bool QFutureInterfaceBase::waitForNextResult()
322{
323 QMutexLocker lock(&d->m_mutex);
324 return d->internal_waitForNextResult();
325}
326
327void QFutureInterfaceBase::waitForResume()
328{
329 // return early if possible to avoid taking the mutex lock.
330 {
331 const int state = d->state.loadRelaxed();
332 if (!(state & suspendingOrSuspended) || (state & Canceled))
333 return;
334 }
335
336 QMutexLocker lock(&d->m_mutex);
337 const int state = d->state.loadRelaxed();
338 if (!(state & suspendingOrSuspended) || (state & Canceled))
339 return;
340
341 // decrease active thread count since this thread will wait.
342 const ThreadPoolThreadReleaser releaser(d->pool());
343
344 d->pausedWaitCondition.wait(&d->m_mutex);
345}
346
347void QFutureInterfaceBase::suspendIfRequested()
348{
349 const auto canSuspend = [] (int state) {
350 // can suspend only if 1) in any suspend-related state; 2) not canceled
351 return (state & suspendingOrSuspended) && !(state & Canceled);
352 };
353
354 // return early if possible to avoid taking the mutex lock.
355 {
356 const int state = d->state.loadRelaxed();
357 if (!canSuspend(state))
358 return;
359 }
360
361 QMutexLocker lock(&d->m_mutex);
362 const int state = d->state.loadRelaxed();
363 if (!canSuspend(state))
364 return;
365
366 // Note: expecting that Suspending and Suspended are mutually exclusive
367 if (!(state & Suspended)) {
368 // switch state in case this is the first invocation
369 switch_from_to(d->state, Suspending, Suspended);
370 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
371 }
372
373 // decrease active thread count since this thread will wait.
374 const ThreadPoolThreadReleaser releaser(d->pool());
375 d->pausedWaitCondition.wait(&d->m_mutex);
376}
377
378int QFutureInterfaceBase::progressValue() const
379{
380 const QMutexLocker lock(&d->m_mutex);
381 return d->m_progressValue;
382}
383
384int QFutureInterfaceBase::progressMinimum() const
385{
386 const QMutexLocker lock(&d->m_mutex);
387 return d->m_progress ? d->m_progress->minimum : 0;
388}
389
390int QFutureInterfaceBase::progressMaximum() const
391{
392 const QMutexLocker lock(&d->m_mutex);
393 return d->m_progress ? d->m_progress->maximum : 0;
394}
395
396int QFutureInterfaceBase::resultCount() const
397{
398 QMutexLocker lock(&d->m_mutex);
399 return d->internal_resultCount();
400}
401
402QString QFutureInterfaceBase::progressText() const
403{
404 QMutexLocker locker(&d->m_mutex);
405 return d->m_progress ? d->m_progress->text : QString();
406}
407
408bool QFutureInterfaceBase::isProgressUpdateNeeded() const
409{
410 QMutexLocker locker(&d->m_mutex);
411 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
412}
413
414void QFutureInterfaceBase::reportStarted()
415{
416 QMutexLocker locker(&d->m_mutex);
417 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
418 return;
419 d->setState(State(Started | Running));
420 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
421 d->isValid = true;
422}
423
424void QFutureInterfaceBase::reportCanceled()
425{
426 cancel();
427}
428
429#ifndef QT_NO_EXCEPTIONS
430void QFutureInterfaceBase::reportException(const QException &exception)
431{
432 try {
433 exception.raise();
434 } catch (...) {
435 reportException(std::current_exception());
436 }
437}
438
439#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
440void QFutureInterfaceBase::reportException(std::exception_ptr exception)
441#else
442void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
443#endif
444{
445 QMutexLocker locker(&d->m_mutex);
446 if (d->state.loadRelaxed() & (Canceled|Finished))
447 return;
448
449 d->hasException = true;
450 d->data.setException(exception);
451 switch_on(d->state, Canceled);
452 d->waitCondition.wakeAll();
453 d->pausedWaitCondition.wakeAll();
454 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
455}
456#endif
457
458void QFutureInterfaceBase::reportFinished()
459{
460 QMutexLocker locker(&d->m_mutex);
461 if (!isFinished()) {
462 switch_from_to(d->state, Running, Finished);
463 d->waitCondition.wakeAll();
464 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
465 }
466}
467
468void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
469{
470 if (d->m_progress)
471 setProgressRange(0, resultCount);
472 d->m_expectedResultCount = resultCount;
473}
474
475int QFutureInterfaceBase::expectedResultCount()
476{
477 return d->m_expectedResultCount;
478}
479
480bool QFutureInterfaceBase::queryState(State state) const
481{
482 return d->state.loadRelaxed() & state;
483}
484
485int QFutureInterfaceBase::loadState() const
486{
487 // Used from ~QPromise, so this check is needed
488 if (!d)
489 return QFutureInterfaceBase::State::NoState;
490 return d->state.loadRelaxed();
491}
492
493void QFutureInterfaceBase::waitForResult(int resultIndex)
494{
495 if (d->hasException)
496 d->data.m_exceptionStore.rethrowException();
497
498 QMutexLocker lock(&d->m_mutex);
499 if (!isRunningOrPending())
500 return;
501 lock.unlock();
502
503 // To avoid deadlocks and reduce the number of threads used, try to
504 // run the runnable in the current thread.
505 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
506
507 lock.relock();
508
509 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
510 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
511 d->waitCondition.wait(&d->m_mutex);
512
513 if (d->hasException)
514 d->data.m_exceptionStore.rethrowException();
515}
516
517void QFutureInterfaceBase::waitForFinished()
518{
519 QMutexLocker lock(&d->m_mutex);
520 const bool alreadyFinished = isFinished();
521 lock.unlock();
522
523 if (!alreadyFinished) {
524 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
525
526 lock.relock();
527
528 while (!isFinished())
529 d->waitCondition.wait(&d->m_mutex);
530 }
531
532 if (d->hasException)
533 d->data.m_exceptionStore.rethrowException();
534}
535
536void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
537{
538 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
539 return;
540
541 d->waitCondition.wakeAll();
542
543 if (!d->m_progress) {
544 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) == false) {
545 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
546 beginIndex,
547 endIndex));
548 return;
549 }
550
551 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
552 d->m_progressValue,
553 QString()),
554 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
555 beginIndex,
556 endIndex));
557 return;
558 }
559 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
560}
561
562void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
563{
564 d->runnable = runnable;
565}
566
567void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
568{
569 d->m_pool = pool;
570}
571
572QThreadPool *QFutureInterfaceBase::threadPool() const
573{
574 return d->m_pool;
575}
576
577void QFutureInterfaceBase::setFilterMode(bool enable)
578{
579 QMutexLocker locker(&d->m_mutex);
580 if (!hasException())
581 resultStoreBase().setFilterMode(enable);
582}
583
584/*!
585 \internal
586 Sets the progress range's minimum and maximum values to \a minimum and
587 \a maximum respectively.
588
589 If \a maximum is smaller than \a minimum, \a minimum becomes the only
590 legal value.
591
592 The progress value is reset to be \a minimum.
593
594 The progress range usage can be disabled by using setProgressRange(0, 0).
595 In this case progress value is also reset to 0.
596
597 The behavior of this method is mostly inspired by
598 \l QProgressBar::setRange.
599*/
600void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
601{
602 QMutexLocker locker(&d->m_mutex);
603 if (!d->m_progress)
604 d->m_progress.reset(new QFutureInterfaceBasePrivate::ProgressData());
605 d->m_progress->minimum = minimum;
606 d->m_progress->maximum = qMax(minimum, maximum);
607 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
608 d->m_progressValue = minimum;
609}
610
611void QFutureInterfaceBase::setProgressValue(int progressValue)
612{
613 setProgressValueAndText(progressValue, QString());
614}
615
616/*!
617 \internal
618 In case of the \a progressValue falling out of the progress range,
619 this method has no effect.
620 Such behavior is inspired by \l QProgressBar::setValue.
621*/
622void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
623 const QString &progressText)
624{
625 QMutexLocker locker(&d->m_mutex);
626 if (!d->m_progress)
627 d->m_progress.reset(new QFutureInterfaceBasePrivate::ProgressData());
628
629 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
630 if (useProgressRange
631 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
632 return;
633 }
634
635 if (d->m_progressValue >= progressValue)
636 return;
637
638 if (d->state.loadRelaxed() & (Canceled|Finished))
639 return;
640
641 if (d->internal_updateProgress(progressValue, progressText)) {
642 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
643 d->m_progressValue,
644 d->m_progress->text));
645 }
646}
647
648QMutex &QFutureInterfaceBase::mutex() const
649{
650 return d->m_mutex;
651}
652
653bool QFutureInterfaceBase::hasException() const
654{
655 return d->hasException;
656}
657
658QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
659{
660 Q_ASSERT(d->hasException);
661 return d->data.m_exceptionStore;
662}
663
664QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
665{
666 Q_ASSERT(!d->hasException);
667 return d->data.m_results;
668}
669
670const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
671{
672 Q_ASSERT(!d->hasException);
673 return d->data.m_results;
674}
675
676QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
677{
678 QFutureInterfaceBase copy(other);
679 swap(copy);
680 return *this;
681}
682
683// ### Qt 7: inline
684void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
685{
686 qSwap(d, other.d);
687}
688
689bool QFutureInterfaceBase::refT() const noexcept
690{
691 return d->refCount.refT();
692}
693
694bool QFutureInterfaceBase::derefT() const noexcept
695{
696 // Called from ~QFutureInterface
697 return !d || d->refCount.derefT();
698}
699
700void QFutureInterfaceBase::reset()
701{
702 d->m_progressValue = 0;
703 d->m_progress.reset();
704 d->progressTime.invalidate();
705 d->isValid = false;
706}
707
708void QFutureInterfaceBase::rethrowPossibleException()
709{
710 if (hasException())
711 exceptionStore().rethrowException();
712}
713
714QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
716{
717 progressTime.invalidate();
718}
719
721{
722 if (hasException)
723 data.m_exceptionStore.~ExceptionStore();
724 else
725 data.m_results.~ResultStoreBase();
726}
727
729{
730 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
731}
732
734{
735 return hasException ? false : (data.m_results.contains(index));
736}
737
739{
740 if (hasException)
741 return false;
742
743 if (data.m_results.hasNextResult())
744 return true;
745
746 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
747 && data.m_results.hasNextResult() == false)
748 waitCondition.wait(&m_mutex);
749
750 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
751 && data.m_results.hasNextResult();
752}
753
755{
756 if (m_progressValue >= progress)
757 return false;
758
759 m_progressValue = progress;
760
761 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
762 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
763 return false;
764
765 progressTime.start();
766 return true;
767
768}
769
771 const QString &progressText)
772{
773 if (m_progressValue >= progress)
774 return false;
775
776 Q_ASSERT(m_progress);
777
778 m_progressValue = progress;
779 m_progress->text = progressText;
780
781 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
782 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
783 return false;
784
785 progressTime.start();
786 return true;
787}
788
790{
791 // bail out if we are not changing the state
792 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
793 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
794 return;
795
796 // change the state
797 if (enable) {
798 switch_on(state, QFutureInterfaceBase::Throttled);
799 } else {
800 switch_off(state, QFutureInterfaceBase::Throttled);
801 if (!(state.loadRelaxed() & suspendingOrSuspended))
802 pausedWaitCondition.wakeAll();
803 }
804}
805
806void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
807{
808 if (outputConnections.isEmpty())
809 return;
810
811 for (int i = 0; i < outputConnections.size(); ++i)
812 outputConnections.at(i)->postCallOutEvent(callOutEvent);
813}
814
815void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
816 const QFutureCallOutEvent &callOutEvent2)
817{
818 if (outputConnections.isEmpty())
819 return;
820
821 for (int i = 0; i < outputConnections.size(); ++i) {
822 QFutureCallOutInterface *iface = outputConnections.at(i);
823 iface->postCallOutEvent(callOutEvent1);
824 iface->postCallOutEvent(callOutEvent2);
825 }
826}
827
828// This function connects an output interface (for example a QFutureWatcher)
829// to this future. While holding the lock we check the state and ready results
830// and add the appropriate callouts to the queue.
831void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface)
832{
833 QMutexLocker locker(&m_mutex);
834
835 const auto currentState = state.loadRelaxed();
836 if (currentState & QFutureInterfaceBase::Started) {
837 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
838 if (m_progress) {
839 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
840 m_progress->minimum,
841 m_progress->maximum));
842 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
843 m_progressValue,
844 m_progress->text));
845 } else {
846 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
847 0,
848 0));
849 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
850 m_progressValue,
851 QString()));
852 }
853 }
854
855 if (!hasException) {
856 QtPrivate::ResultIteratorBase it = data.m_results.begin();
857 while (it != data.m_results.end()) {
858 const int begin = it.resultIndex();
859 const int end = begin + it.batchSize();
860 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
861 begin,
862 end));
863 it.batchedAdvance();
864 }
865 }
866
867 if (currentState & QFutureInterfaceBase::Suspended)
868 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
869 else if (currentState & QFutureInterfaceBase::Suspending)
870 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
871
872 if (currentState & QFutureInterfaceBase::Canceled)
873 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
874
875 if (currentState & QFutureInterfaceBase::Finished)
876 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
877
878 outputConnections.append(iface);
879}
880
881void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface)
882{
883 QMutexLocker lock(&m_mutex);
884 const qsizetype index = outputConnections.indexOf(iface);
885 if (index == -1)
886 return;
887 outputConnections.removeAt(index);
888
889 iface->callOutInterfaceDisconnected();
890}
891
892void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
893{
894 state.storeRelaxed(newState);
895}
896
897void QFutureInterfaceBase::setContinuation(std::function<void (const QFutureInterfaceBase &)> func,
898 void *continuationFutureData, ContinuationType type)
899{
900 auto *futureData = static_cast<QFutureInterfaceBasePrivate *>(continuationFutureData);
901
902 QMutexLocker lock(&d->continuationMutex);
903
904 // Unless the continuation has been cleaned earlier, we have to
905 // store the move-only continuation, to guarantee that the associated
906 // future's data stays alive.
907 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
908 if (d->continuation) {
909 qWarning("Adding a continuation to a future which already has a continuation. "
910 "The existing continuation is overwritten.");
911 if (d->continuationData)
912 d->continuationData->nonConcludedParent = nullptr;
913 }
914 if (futureData) {
915 // If we ever hit this, we should properly unlink the future before
916 // relinking to ourselves.
917 Q_ASSERT_X(!futureData->nonConcludedParent, "setContinuation",
918 "futureData already has a parent");
919 futureData->continuationType = type;
920 futureData->nonConcludedParent = d;
921 }
922 d->continuationData = futureData;
923 Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
924 "setContinuation", "Make sure to provide a correct continuation type!");
925 }
926
927 // If the state is ready, run continuation immediately,
928 // otherwise save it for later.
929 // Ensure that we run the continuation after setting up the future chain links so that
930 // the continuation itself can modify the links again.
931 if (isFinished()) {
932 d->continuationExecuted = true;
933
934 lock.unlock();
935 func(*this);
936 lock.relock();
937 }
938
939 if (d->continuationState == QFutureInterfaceBasePrivate::Cleaned) {
940 // if the continuation that we've possibly run above has cleaned this
941 // future, we have to unlink ourselves from it again.
942 if (futureData)
943 futureData->nonConcludedParent = nullptr;
944 } else {
945 d->continuation = std::move(func);
946 }
947}
948
949/*
950 For continuations with context we expect all the needed data to be captured
951 directly by the continuation data, because this simplifies the slot
952 invocation. That's why func has no parameters.
953
954 We pass continuation data as a QVariant, because we need to keep the
955 QFutureInterface<T> for the entire lifetime of the continuation, but we
956 cannot pass a template type T as a parameter.
957*/
958void QFutureInterfaceBase::setContinuation(const QObject *context, std::function<void()> func,
959 const QVariant &continuationFuture,
960 ContinuationType type)
961{
962 Q_ASSERT(context);
963
964 using FuncType = void();
965 using Prototype = typename QtPrivate::Callable<FuncType>::Function;
966 auto slotObj = QtPrivate::makeCallableObject<Prototype>(std::move(func));
967
968 auto slot = QtPrivate::SlotObjUniquePtr(slotObj);
969
970 auto *watcher = new QObjectContinuationWrapper;
971 watcher->moveToThread(context->thread());
972
973 // We need to protect acccess to the watcher. The context object (and in turn, the watcher)
974 // could be destroyed while the continuation that emits the signal is running. We have to
975 // prevent that.
976 // The mutex has to be recursive, because the continuation itself could delete the context
977 // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
978 auto watcherMutex = std::make_shared<QRecursiveMutex>();
979 const auto destroyWatcher = [watcherMutex, watcher]() mutable {
980 QMutexLocker lock(watcherMutex.get());
981 delete watcher;
982 };
983
984 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
985 QObject::connect(watcher, &QObjectContinuationWrapper::run,
986 // for the following, cf. QMetaObject::invokeMethodImpl():
987 // we know `slot` is a lambda returning `void`, so we can just
988 // `call()` with `obj` and `args[0]` set to `nullptr`:
989 context, [slot = std::move(slot)] {
990 void *args[] = { nullptr }; // for `void` return value
991 slot->call(nullptr, args);
992 });
993 QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, destroyWatcher);
994
995 // We need to connect to destroyWatcher here, instead of delete or deleteLater().
996 // If the continuation is called from a separate thread, emit watcher->run() can't detect that
997 // the watcher has been deleted in the separate thread, causing a race condition and potential
998 // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of
999 // the watcher to occur after emit watcher->run() completes and prevents the race condition.
1000 QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
1001
1002 // Extract a QFutureInterfaceBasePrivate pointer from the QVariant. We rely
1003 // on the fact that QVariant contains QFutureInterface<T>.
1004 QFutureInterfaceBasePrivate *continuationFutureData = nullptr;
1005 if (continuationFuture.isValid()) {
1006 Q_ASSERT(QLatin1StringView(continuationFuture.typeName())
1007 .startsWith(QLatin1StringView("QFutureInterface")));
1008 const auto continuationPtr =
1009 static_cast<const QFutureInterfaceBase *>(continuationFuture.constData());
1010 continuationFutureData = continuationPtr->d;
1011 }
1012
1013 // Capture continuationFuture so that it lives as long as the continuation,
1014 // and the continuation data remains valid.
1015 setContinuation([watcherMutex = std::move(watcherMutex),
1016 watcher = QPointer(watcher), continuationFuture]
1017 (const QFutureInterfaceBase &parentData)
1018 {
1019 Q_UNUSED(parentData);
1020 Q_UNUSED(continuationFuture);
1021 QMutexLocker lock(watcherMutex.get());
1022 if (watcher)
1023 emit watcher->run();
1024 }, continuationFutureData, type);
1025}
1026
1027void QFutureInterfaceBase::cleanContinuation()
1028{
1029 if (!d)
1030 return;
1031
1032 QMutexLocker lock(&d->continuationMutex);
1033 d->continuation = nullptr;
1034 d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
1035 d->continuationData = nullptr;
1036}
1037
1038void QFutureInterfaceBase::runContinuation() const
1039{
1040 QMutexLocker lock(&d->continuationMutex);
1041 if (d->continuation && !d->continuationExecuted) {
1042 // If we run the next continuation, then this future is concluded, so
1043 // we wouldn't need to revisit it in the cancelChain()
1044 if (d->continuationData)
1045 d->continuationData->nonConcludedParent = nullptr;
1046 // Save the continuation in a local function, to avoid calling
1047 // a null std::function below, in case cleanContinuation() is
1048 // called from some other thread right after unlock() below.
1049 d->continuationExecuted = true;
1050 auto fn = std::move(d->continuation);
1051 lock.unlock();
1052 fn(*this);
1053
1054 lock.relock();
1055 // Unless the continuation has been cleaned earlier, we have to
1056 // store the move-only continuation, to guarantee that the associated
1057 // future's data stays alive.
1058 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
1059 d->continuation = std::move(fn);
1060 }
1061}
1062
1063bool QFutureInterfaceBase::isChainCanceled() const
1064{
1065 return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
1066}
1067
1068void QFutureInterfaceBase::setLaunchAsync(bool value)
1069{
1070 d->launchAsync = value;
1071}
1072
1073bool QFutureInterfaceBase::launchAsync() const
1074{
1075 return d->launchAsync;
1076}
1077
1078namespace QtFuture {
1079
1081{
1082 QFutureInterface<void> promise;
1083 promise.reportStarted();
1084 promise.reportFinished();
1085
1086 return promise.future();
1087}
1088
1089} // namespace QtFuture
1090
1091QT_END_NAMESPACE
1092
1093#include "qfutureinterface.moc"
bool internal_updateProgress(int progress, const QString &progressText=QString())
void sendCallOuts(const QFutureCallOutEvent &callOut1, const QFutureCallOutEvent &callOut2)
void sendCallOut(const QFutureCallOutEvent &callOut)
void setState(QFutureInterfaceBase::State state)
bool internal_updateProgressValue(int progress)
bool internal_isResultReadyAt(int index) const
void internal_setThrottled(bool enable)
QFutureInterfaceBasePrivate * continuationData
QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
QFuture< void > makeReadyVoidFuture()
void qfutureWarnIfUnusedResults(qsizetype numResults)
static int switch_from_to(QAtomicInt &a, int from, int to)
static int switch_off(QAtomicInt &a, int which)
static int switch_on(QAtomicInt &a, int which)