Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qfutureinterface.cpp
Go to the documentation of this file.
1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
5// qfutureinterface.h included from qfuture.h
6#include "qfuture.h"
8
9#include <QtCore/qatomic.h>
10#include <QtCore/qcoreapplication.h>
11#include <QtCore/qloggingcategory.h>
12#include <QtCore/qthread.h>
13#include <QtCore/qvarlengtharray.h>
14#include <private/qthreadpool_p.h>
15#include <private/qobject_p.h>
16
17#include <climits> // For INT_MAX
18
19// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
20// reason
21// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
22QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
23
24QT_BEGIN_NAMESPACE
25
26Q_STATIC_LOGGING_CATEGORY(lcQFutureContinuations, "qt.core.qfuture.continuations")
27
// Upper bound on progress call-outs per second; the progress throttling
// checks in this file derive their minimum interval (1000 / value, in
// milliseconds) from it.
constexpr int MaxProgressEmitsPerSecond = 25;
31
32namespace {
33class ThreadPoolThreadReleaser {
34 QThreadPool *m_pool;
35public:
36 Q_NODISCARD_CTOR
37 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
38 : m_pool(pool)
39 { if (pool) pool->releaseThread(); }
40 ~ThreadPoolThreadReleaser()
41 { if (m_pool) m_pool->reserveThread(); }
42};
43
44const auto suspendingOrSuspended =
45 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
46
47} // unnamed namespace
48
49namespace QtPrivate {
50
51void qfutureWarnIfUnusedResults(qsizetype numResults)
52{
53 if (numResults > 1) {
54 qCWarning(lcQFutureContinuations,
55 "Parent future has %" PRIdQSIZETYPE " result(s), but only the first result "
56 "will be handled in the continuation.",
57 numResults);
58 }
59}
60
61} // namespace QtPrivate
62
64{
66public:
69 {
70 }
71
73 void run();
74};
75
76QFutureCallOutInterface::~QFutureCallOutInterface()
77 = default;
78
80
81QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
82 : d(new QFutureInterfaceBasePrivate(initialState))
83{ }
84
85QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
86 : d(other.d)
87{
88 d->refCount.ref();
89}
90
91QFutureInterfaceBase::~QFutureInterfaceBase()
92{
93 if (d && !d->refCount.deref())
94 delete d;
95}
96
97static inline int switch_on(QAtomicInt &a, int which)
98{
99 return a.fetchAndOrRelaxed(which) | which;
100}
101
102static inline int switch_off(QAtomicInt &a, int which)
103{
104 return a.fetchAndAndRelaxed(~which) & ~which;
105}
106
107static inline int switch_from_to(QAtomicInt &a, int from, int to)
108{
109 const auto adjusted = [&](int old) { return (old & ~from) | to; };
110 int value = a.loadRelaxed();
111 while (!a.testAndSetRelaxed(value, adjusted(value), value))
112 qYieldCpu();
113 return value;
114}
115
116void QFutureInterfaceBasePrivate::cancelImpl(QFutureInterfaceBase::CancelMode mode,
117 CancelOptions options)
118{
119 QMutexLocker locker(&m_mutex);
120
121 const auto oldState = state.loadRelaxed();
122
123 switch (mode) {
124 case QFutureInterfaceBase::CancelMode::CancelAndFinish:
125 if ((oldState & QFutureInterfaceBase::Finished)
126 && (oldState & QFutureInterfaceBase::Canceled)) {
127 return;
128 }
129 switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
130 QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
131 break;
132 case QFutureInterfaceBase::CancelMode::CancelOnly:
133 if (oldState & QFutureInterfaceBase::Canceled)
134 return;
135 switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
136 break;
137 }
138
139 if (options & CancelOption::CancelContinuations) {
140 // Cancel the continuations chain
141 QMutexLocker continuationLocker(&continuationMutex);
143 while (next) {
144 QMutexLocker nextLocker(&next->continuationMutex);
145 if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
146 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
147 next = next->continuationData;
148 } else {
149 break;
150 }
151 }
152 }
153
154 waitCondition.wakeAll();
155 pausedWaitCondition.wakeAll();
156
157 if (!(oldState & QFutureInterfaceBase::Canceled))
158 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
159 if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
160 && !(oldState & QFutureInterfaceBase::Finished)) {
161 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
162 }
163
164 isValid = false;
165}
166
167void QFutureInterfaceBase::cancel()
168{
169 cancel(CancelMode::CancelOnly);
170}
171
172void QFutureInterfaceBase::cancelChain()
173{
174 cancelChain(CancelMode::CancelOnly);
175}
176
177void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
178{
179 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
180}
181
182void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
183{
184 // go up through the list of continuations, cancelling each of them
185 {
186 QMutexLocker locker(&d->continuationMutex);
187 QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
188 while (prev) {
189 // Do not cancel continuations, because we're going bottom-to-top
190 prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
191 QMutexLocker prevLocker(&prev->continuationMutex);
192 prev = prev->nonConcludedParent;
193 }
194 }
195 // finally, cancel self and all next continuations
196 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
197}
198
199void QFutureInterfaceBase::setSuspended(bool suspend)
200{
201 QMutexLocker locker(&d->m_mutex);
202 if (suspend) {
203 switch_on(d->state, Suspending);
204 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
205 } else {
206 switch_off(d->state, suspendingOrSuspended);
207 d->pausedWaitCondition.wakeAll();
208 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
209 }
210}
211
212void QFutureInterfaceBase::toggleSuspended()
213{
214 QMutexLocker locker(&d->m_mutex);
215 if (d->state.loadRelaxed() & suspendingOrSuspended) {
216 switch_off(d->state, suspendingOrSuspended);
217 d->pausedWaitCondition.wakeAll();
218 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
219 } else {
220 switch_on(d->state, Suspending);
221 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
222 }
223}
224
225void QFutureInterfaceBase::reportSuspended() const
226{
227 // Needs to be called when pause is in effect,
228 // i.e. no more events will be reported.
229
230 QMutexLocker locker(&d->m_mutex);
231 const int state = d->state.loadRelaxed();
232 if (!(state & Suspending) || (state & Suspended))
233 return;
234
235 switch_from_to(d->state, Suspending, Suspended);
236 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
237}
238
239void QFutureInterfaceBase::setThrottled(bool enable)
240{
241 QMutexLocker lock(&d->m_mutex);
242 if (enable) {
243 switch_on(d->state, Throttled);
244 } else {
245 switch_off(d->state, Throttled);
246 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
247 d->pausedWaitCondition.wakeAll();
248 }
249}
250
251
252bool QFutureInterfaceBase::isRunning() const
253{
254 return queryState(Running);
255}
256
257bool QFutureInterfaceBase::isStarted() const
258{
259 return queryState(Started);
260}
261
262bool QFutureInterfaceBase::isCanceled() const
263{
264 return queryState(Canceled);
265}
266
267bool QFutureInterfaceBase::isFinished() const
268{
269 return queryState(Finished);
270}
271
272bool QFutureInterfaceBase::isSuspending() const
273{
274 return queryState(Suspending);
275}
276
277#if QT_DEPRECATED_SINCE(6, 0)
278bool QFutureInterfaceBase::isPaused() const
279{
280 return queryState(static_cast<State>(suspendingOrSuspended));
281}
282#endif
283
284bool QFutureInterfaceBase::isSuspended() const
285{
286 return queryState(Suspended);
287}
288
289bool QFutureInterfaceBase::isThrottled() const
290{
291 return queryState(Throttled);
292}
293
294bool QFutureInterfaceBase::isResultReadyAt(int index) const
295{
296 QMutexLocker lock(&d->m_mutex);
297 return d->internal_isResultReadyAt(index);
298}
299
300bool QFutureInterfaceBase::isValid() const
301{
302 const QMutexLocker lock(&d->m_mutex);
303 return d->isValid;
304}
305
306bool QFutureInterfaceBase::isRunningOrPending() const
307{
308 return queryState(static_cast<State>(Running | Pending));
309}
310
311bool QFutureInterfaceBase::waitForNextResult()
312{
313 QMutexLocker lock(&d->m_mutex);
314 return d->internal_waitForNextResult();
315}
316
317void QFutureInterfaceBase::waitForResume()
318{
319 // return early if possible to avoid taking the mutex lock.
320 {
321 const int state = d->state.loadRelaxed();
322 if (!(state & suspendingOrSuspended) || (state & Canceled))
323 return;
324 }
325
326 QMutexLocker lock(&d->m_mutex);
327 const int state = d->state.loadRelaxed();
328 if (!(state & suspendingOrSuspended) || (state & Canceled))
329 return;
330
331 // decrease active thread count since this thread will wait.
332 const ThreadPoolThreadReleaser releaser(d->pool());
333
334 d->pausedWaitCondition.wait(&d->m_mutex);
335}
336
337void QFutureInterfaceBase::suspendIfRequested()
338{
339 const auto canSuspend = [] (int state) {
340 // can suspend only if 1) in any suspend-related state; 2) not canceled
341 return (state & suspendingOrSuspended) && !(state & Canceled);
342 };
343
344 // return early if possible to avoid taking the mutex lock.
345 {
346 const int state = d->state.loadRelaxed();
347 if (!canSuspend(state))
348 return;
349 }
350
351 QMutexLocker lock(&d->m_mutex);
352 const int state = d->state.loadRelaxed();
353 if (!canSuspend(state))
354 return;
355
356 // Note: expecting that Suspending and Suspended are mutually exclusive
357 if (!(state & Suspended)) {
358 // switch state in case this is the first invocation
359 switch_from_to(d->state, Suspending, Suspended);
360 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
361 }
362
363 // decrease active thread count since this thread will wait.
364 const ThreadPoolThreadReleaser releaser(d->pool());
365 d->pausedWaitCondition.wait(&d->m_mutex);
366}
367
368int QFutureInterfaceBase::progressValue() const
369{
370 const QMutexLocker lock(&d->m_mutex);
371 return d->m_progressValue;
372}
373
374int QFutureInterfaceBase::progressMinimum() const
375{
376 const QMutexLocker lock(&d->m_mutex);
377 return d->m_progress ? d->m_progress->minimum : 0;
378}
379
380int QFutureInterfaceBase::progressMaximum() const
381{
382 const QMutexLocker lock(&d->m_mutex);
383 return d->m_progress ? d->m_progress->maximum : 0;
384}
385
386int QFutureInterfaceBase::resultCount() const
387{
388 QMutexLocker lock(&d->m_mutex);
389 return d->internal_resultCount();
390}
391
392QString QFutureInterfaceBase::progressText() const
393{
394 QMutexLocker locker(&d->m_mutex);
395 return d->m_progress ? d->m_progress->text : QString();
396}
397
398bool QFutureInterfaceBase::isProgressUpdateNeeded() const
399{
400 QMutexLocker locker(&d->m_mutex);
401 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
402}
403
404void QFutureInterfaceBase::reportStarted()
405{
406 QMutexLocker locker(&d->m_mutex);
407 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
408 return;
409 d->setState(State(Started | Running));
410 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
411 d->isValid = true;
412}
413
414void QFutureInterfaceBase::reportCanceled()
415{
416 cancel();
417}
418
419#ifndef QT_NO_EXCEPTIONS
420void QFutureInterfaceBase::reportException(const QException &exception)
421{
422 try {
423 exception.raise();
424 } catch (...) {
425 reportException(std::current_exception());
426 }
427}
428
429#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
430void QFutureInterfaceBase::reportException(std::exception_ptr exception)
431#else
432void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
433#endif
434{
435 QMutexLocker locker(&d->m_mutex);
436 if (d->state.loadRelaxed() & (Canceled|Finished))
437 return;
438
439 d->hasException = true;
440 d->data.setException(exception);
441 switch_on(d->state, Canceled);
442 d->waitCondition.wakeAll();
443 d->pausedWaitCondition.wakeAll();
444 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
445}
446#endif
447
448void QFutureInterfaceBase::reportFinished()
449{
450 QMutexLocker locker(&d->m_mutex);
451 if (!isFinished()) {
452 switch_from_to(d->state, Running, Finished);
453 d->waitCondition.wakeAll();
454 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
455 }
456}
457
458void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
459{
460 if (d->m_progress)
461 setProgressRange(0, resultCount);
462 d->m_expectedResultCount = resultCount;
463}
464
465int QFutureInterfaceBase::expectedResultCount()
466{
467 return d->m_expectedResultCount;
468}
469
470bool QFutureInterfaceBase::queryState(State state) const
471{
472 return d->state.loadRelaxed() & state;
473}
474
475int QFutureInterfaceBase::loadState() const
476{
477 // Used from ~QPromise, so this check is needed
478 if (!d)
479 return QFutureInterfaceBase::State::NoState;
480 return d->state.loadRelaxed();
481}
482
483void QFutureInterfaceBase::waitForResult(int resultIndex)
484{
485 if (d->hasException)
486 d->data.m_exceptionStore.rethrowException();
487
488 QMutexLocker lock(&d->m_mutex);
489 if (!isRunningOrPending())
490 return;
491 lock.unlock();
492
493 // To avoid deadlocks and reduce the number of threads used, try to
494 // run the runnable in the current thread.
495 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
496
497 lock.relock();
498
499 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
500 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
501 d->waitCondition.wait(&d->m_mutex);
502
503 if (d->hasException)
504 d->data.m_exceptionStore.rethrowException();
505}
506
507void QFutureInterfaceBase::waitForFinished()
508{
509 QMutexLocker lock(&d->m_mutex);
510 const bool alreadyFinished = isFinished();
511 lock.unlock();
512
513 if (!alreadyFinished) {
514 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
515
516 lock.relock();
517
518 while (!isFinished())
519 d->waitCondition.wait(&d->m_mutex);
520 }
521
522 if (d->hasException)
523 d->data.m_exceptionStore.rethrowException();
524}
525
526void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
527{
528 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
529 return;
530
531 d->waitCondition.wakeAll();
532
533 if (!d->m_progress) {
534 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) == false) {
535 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
536 beginIndex,
537 endIndex));
538 return;
539 }
540
541 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
542 d->m_progressValue,
543 QString()),
544 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
545 beginIndex,
546 endIndex));
547 return;
548 }
549 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
550}
551
552void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
553{
554 d->runnable = runnable;
555}
556
557void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
558{
559 d->m_pool = pool;
560}
561
562QThreadPool *QFutureInterfaceBase::threadPool() const
563{
564 return d->m_pool;
565}
566
567void QFutureInterfaceBase::setFilterMode(bool enable)
568{
569 QMutexLocker locker(&d->m_mutex);
570 if (!hasException())
571 resultStoreBase().setFilterMode(enable);
572}
573
574/*!
575 \internal
576 Sets the progress range's minimum and maximum values to \a minimum and
577 \a maximum respectively.
578
579 If \a maximum is smaller than \a minimum, \a minimum becomes the only
580 legal value.
581
582 The progress value is reset to be \a minimum.
583
584 The progress range usage can be disabled by using setProgressRange(0, 0).
585 In this case progress value is also reset to 0.
586
587 The behavior of this method is mostly inspired by
588 \l QProgressBar::setRange.
589*/
590void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
591{
592 QMutexLocker locker(&d->m_mutex);
593 if (!d->m_progress)
594 d->m_progress.reset(new QFutureInterfaceBasePrivate::ProgressData());
595 d->m_progress->minimum = minimum;
596 d->m_progress->maximum = qMax(minimum, maximum);
597 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
598 d->m_progressValue = minimum;
599}
600
601void QFutureInterfaceBase::setProgressValue(int progressValue)
602{
603 setProgressValueAndText(progressValue, QString());
604}
605
606/*!
607 \internal
608 In case of the \a progressValue falling out of the progress range,
609 this method has no effect.
610 Such behavior is inspired by \l QProgressBar::setValue.
611*/
612void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
613 const QString &progressText)
614{
615 QMutexLocker locker(&d->m_mutex);
616 if (!d->m_progress)
617 d->m_progress.reset(new QFutureInterfaceBasePrivate::ProgressData());
618
619 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
620 if (useProgressRange
621 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
622 return;
623 }
624
625 if (d->m_progressValue >= progressValue)
626 return;
627
628 if (d->state.loadRelaxed() & (Canceled|Finished))
629 return;
630
631 if (d->internal_updateProgress(progressValue, progressText)) {
632 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
633 d->m_progressValue,
634 d->m_progress->text));
635 }
636}
637
638QMutex &QFutureInterfaceBase::mutex() const
639{
640 return d->m_mutex;
641}
642
643bool QFutureInterfaceBase::hasException() const
644{
645 return d->hasException;
646}
647
648QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
649{
650 Q_ASSERT(d->hasException);
651 return d->data.m_exceptionStore;
652}
653
654QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
655{
656 Q_ASSERT(!d->hasException);
657 return d->data.m_results;
658}
659
660const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
661{
662 Q_ASSERT(!d->hasException);
663 return d->data.m_results;
664}
665
666QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
667{
668 QFutureInterfaceBase copy(other);
669 swap(copy);
670 return *this;
671}
672
673// ### Qt 7: inline
674void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
675{
676 qSwap(d, other.d);
677}
678
679bool QFutureInterfaceBase::refT() const noexcept
680{
681 return d->refCount.refT();
682}
683
684bool QFutureInterfaceBase::derefT() const noexcept
685{
686 // Called from ~QFutureInterface
687 return !d || d->refCount.derefT();
688}
689
690void QFutureInterfaceBase::reset()
691{
692 d->m_progressValue = 0;
693 d->m_progress.reset();
694 d->progressTime.invalidate();
695 d->isValid = false;
696}
697
698void QFutureInterfaceBase::rethrowPossibleException()
699{
700 if (hasException())
701 exceptionStore().rethrowException();
702}
703
704QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
706{
707 progressTime.invalidate();
708}
709
711{
712 if (hasException)
713 data.m_exceptionStore.~ExceptionStore();
714 else
715 data.m_results.~ResultStoreBase();
716}
717
719{
720 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
721}
722
724{
725 return hasException ? false : (data.m_results.contains(index));
726}
727
729{
730 if (hasException)
731 return false;
732
733 if (data.m_results.hasNextResult())
734 return true;
735
736 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
737 && data.m_results.hasNextResult() == false)
738 waitCondition.wait(&m_mutex);
739
740 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
741 && data.m_results.hasNextResult();
742}
743
745{
746 if (m_progressValue >= progress)
747 return false;
748
749 m_progressValue = progress;
750
751 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
752 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
753 return false;
754
755 progressTime.start();
756 return true;
757
758}
759
761 const QString &progressText)
762{
763 if (m_progressValue >= progress)
764 return false;
765
766 Q_ASSERT(m_progress);
767
768 m_progressValue = progress;
769 m_progress->text = progressText;
770
771 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
772 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
773 return false;
774
775 progressTime.start();
776 return true;
777}
778
780{
781 // bail out if we are not changing the state
782 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
783 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
784 return;
785
786 // change the state
787 if (enable) {
788 switch_on(state, QFutureInterfaceBase::Throttled);
789 } else {
790 switch_off(state, QFutureInterfaceBase::Throttled);
791 if (!(state.loadRelaxed() & suspendingOrSuspended))
792 pausedWaitCondition.wakeAll();
793 }
794}
795
796void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
797{
798 if (outputConnections.isEmpty())
799 return;
800
801 for (int i = 0; i < outputConnections.size(); ++i)
802 outputConnections.at(i)->postCallOutEvent(callOutEvent);
803}
804
805void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
806 const QFutureCallOutEvent &callOutEvent2)
807{
808 if (outputConnections.isEmpty())
809 return;
810
811 for (int i = 0; i < outputConnections.size(); ++i) {
812 QFutureCallOutInterface *iface = outputConnections.at(i);
813 iface->postCallOutEvent(callOutEvent1);
814 iface->postCallOutEvent(callOutEvent2);
815 }
816}
817
818// This function connects an output interface (for example a QFutureWatcher)
819// to this future. While holding the lock we check the state and ready results
820// and add the appropriate callouts to the queue.
821void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface)
822{
823 QMutexLocker locker(&m_mutex);
824
825 const auto currentState = state.loadRelaxed();
826 if (currentState & QFutureInterfaceBase::Started) {
827 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
828 if (m_progress) {
829 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
830 m_progress->minimum,
831 m_progress->maximum));
832 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
833 m_progressValue,
834 m_progress->text));
835 } else {
836 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
837 0,
838 0));
839 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
840 m_progressValue,
841 QString()));
842 }
843 }
844
845 if (!hasException) {
846 QtPrivate::ResultIteratorBase it = data.m_results.begin();
847 while (it != data.m_results.end()) {
848 const int begin = it.resultIndex();
849 const int end = begin + it.batchSize();
850 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
851 begin,
852 end));
853 it.batchedAdvance();
854 }
855 }
856
857 if (currentState & QFutureInterfaceBase::Suspended)
858 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
859 else if (currentState & QFutureInterfaceBase::Suspending)
860 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
861
862 if (currentState & QFutureInterfaceBase::Canceled)
863 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
864
865 if (currentState & QFutureInterfaceBase::Finished)
866 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
867
868 outputConnections.append(iface);
869}
870
871void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface)
872{
873 QMutexLocker lock(&m_mutex);
874 const qsizetype index = outputConnections.indexOf(iface);
875 if (index == -1)
876 return;
877 outputConnections.removeAt(index);
878
879 iface->callOutInterfaceDisconnected();
880}
881
882void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
883{
884 state.storeRelaxed(newState);
885}
886
887void QFutureInterfaceBase::setContinuation(std::function<void (const QFutureInterfaceBase &)> func,
888 void *continuationFutureData, ContinuationType type)
889{
890 auto *futureData = static_cast<QFutureInterfaceBasePrivate *>(continuationFutureData);
891
892 QMutexLocker lock(&d->continuationMutex);
893
894 // If the state is ready, run continuation immediately,
895 // otherwise save it for later.
896 if (isFinished()) {
897 d->continuationExecuted = true;
898 lock.unlock();
899 func(*this);
900 lock.relock();
901 }
902 // Unless the continuation has been cleaned earlier, we have to
903 // store the move-only continuation, to guarantee that the associated
904 // future's data stays alive.
905 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
906 if (d->continuation) {
907 qWarning("Adding a continuation to a future which already has a continuation. "
908 "The existing continuation is overwritten.");
909 if (d->continuationData)
910 d->continuationData->nonConcludedParent = nullptr;
911 }
912 d->continuation = std::move(func);
913 if (futureData) {
914 futureData->continuationType = type;
915 futureData->nonConcludedParent = d;
916 }
917 d->continuationData = futureData;
918 Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
919 "setContinuation", "Make sure to provide a correct continuation type!");
920 }
921}
922
923/*
924 For continuations with context we expect all the needed data to be captured
925 directly by the continuation data, because this simplifies the slot
926 invocation. That's why func has no parameters.
927
928 We pass continuation data as a QVariant, because we need to keep the
929 QFutureInterface<T> for the entire lifetime of the continuation, but we
930 cannot pass a template type T as a parameter.
931*/
932void QFutureInterfaceBase::setContinuation(const QObject *context, std::function<void()> func,
933 const QVariant &continuationFuture,
934 ContinuationType type)
935{
936 Q_ASSERT(context);
937
938 using FuncType = void();
939 using Prototype = typename QtPrivate::Callable<FuncType>::Function;
940 auto slotObj = QtPrivate::makeCallableObject<Prototype>(std::move(func));
941
942 auto slot = QtPrivate::SlotObjUniquePtr(slotObj);
943
944 auto *watcher = new QObjectContinuationWrapper;
945 watcher->moveToThread(context->thread());
946
947 // We need to protect acccess to the watcher. The context object (and in turn, the watcher)
948 // could be destroyed while the continuation that emits the signal is running. We have to
949 // prevent that.
950 // The mutex has to be recursive, because the continuation itself could delete the context
951 // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
952 auto watcherMutex = std::make_shared<QRecursiveMutex>();
953 const auto destroyWatcher = [watcherMutex, watcher]() mutable {
954 QMutexLocker lock(watcherMutex.get());
955 delete watcher;
956 };
957
958 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
959 QObject::connect(watcher, &QObjectContinuationWrapper::run,
960 // for the following, cf. QMetaObject::invokeMethodImpl():
961 // we know `slot` is a lambda returning `void`, so we can just
962 // `call()` with `obj` and `args[0]` set to `nullptr`:
963 context, [slot = std::move(slot)] {
964 void *args[] = { nullptr }; // for `void` return value
965 slot->call(nullptr, args);
966 });
967 QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, destroyWatcher);
968
969 // We need to connect to destroyWatcher here, instead of delete or deleteLater().
970 // If the continuation is called from a separate thread, emit watcher->run() can't detect that
971 // the watcher has been deleted in the separate thread, causing a race condition and potential
972 // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of
973 // the watcher to occur after emit watcher->run() completes and prevents the race condition.
974 QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
975
976 // Extract a QFutureInterfaceBasePrivate pointer from the QVariant. We rely
977 // on the fact that QVariant contains QFutureInterface<T>.
978 QFutureInterfaceBasePrivate *continuationFutureData = nullptr;
979 if (continuationFuture.isValid()) {
980 Q_ASSERT(QLatin1StringView(continuationFuture.typeName())
981 .startsWith(QLatin1StringView("QFutureInterface")));
982 const auto continuationPtr =
983 static_cast<const QFutureInterfaceBase *>(continuationFuture.constData());
984 continuationFutureData = continuationPtr->d;
985 }
986
987 // Capture continuationFuture so that it lives as long as the continuation,
988 // and the continuation data remains valid.
989 setContinuation([watcherMutex = std::move(watcherMutex),
990 watcher = QPointer(watcher), continuationFuture]
991 (const QFutureInterfaceBase &parentData)
992 {
993 Q_UNUSED(parentData);
994 Q_UNUSED(continuationFuture);
995 QMutexLocker lock(watcherMutex.get());
996 if (watcher)
997 emit watcher->run();
998 }, continuationFutureData, type);
999}
1000
1001void QFutureInterfaceBase::cleanContinuation()
1002{
1003 if (!d)
1004 return;
1005
1006 QMutexLocker lock(&d->continuationMutex);
1007 d->continuation = nullptr;
1008 d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
1009 d->continuationData = nullptr;
1010}
1011
1012void QFutureInterfaceBase::runContinuation() const
1013{
1014 QMutexLocker lock(&d->continuationMutex);
1015 if (d->continuation && !d->continuationExecuted) {
1016 // If we run the next continuation, then this future is concluded, so
1017 // we wouldn't need to revisit it in the cancelChain()
1018 if (d->continuationData)
1019 d->continuationData->nonConcludedParent = nullptr;
1020 // Save the continuation in a local function, to avoid calling
1021 // a null std::function below, in case cleanContinuation() is
1022 // called from some other thread right after unlock() below.
1023 d->continuationExecuted = true;
1024 auto fn = std::move(d->continuation);
1025 lock.unlock();
1026 fn(*this);
1027
1028 lock.relock();
1029 // Unless the continuation has been cleaned earlier, we have to
1030 // store the move-only continuation, to guarantee that the associated
1031 // future's data stays alive.
1032 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
1033 d->continuation = std::move(fn);
1034 }
1035}
1036
1037bool QFutureInterfaceBase::isChainCanceled() const
1038{
1039 return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
1040}
1041
1042void QFutureInterfaceBase::setLaunchAsync(bool value)
1043{
1044 d->launchAsync = value;
1045}
1046
1047bool QFutureInterfaceBase::launchAsync() const
1048{
1049 return d->launchAsync;
1050}
1051
1052namespace QtFuture {
1053
1055{
1056 QFutureInterface<void> promise;
1057 promise.reportStarted();
1058 promise.reportFinished();
1059
1060 return promise.future();
1061}
1062
1063} // namespace QtFuture
1064
1065QT_END_NAMESPACE
1066
1067#include "qfutureinterface.moc"
bool internal_updateProgress(int progress, const QString &progressText=QString())
void sendCallOuts(const QFutureCallOutEvent &callOut1, const QFutureCallOutEvent &callOut2)
void sendCallOut(const QFutureCallOutEvent &callOut)
void setState(QFutureInterfaceBase::State state)
bool internal_updateProgressValue(int progress)
bool internal_isResultReadyAt(int index) const
void internal_setThrottled(bool enable)
QFutureInterfaceBasePrivate * continuationData
QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
typename QtPrivate::ResultTypeHelper< Function, T >::ResultType ResultType
Definition qfuture.h:130
QFuture< void > makeReadyVoidFuture()
void qfutureWarnIfUnusedResults(qsizetype numResults)
static int switch_from_to(QAtomicInt &a, int from, int to)
static int switch_off(QAtomicInt &a, int which)
static int switch_on(QAtomicInt &a, int which)