Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qfutureinterface.cpp
Go to the documentation of this file.
1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4// qfutureinterface.h included from qfuture.h
5#include "qfuture.h"
7
8#include <QtCore/qatomic.h>
9#include <QtCore/qcoreapplication.h>
10#include <QtCore/qthread.h>
11#include <QtCore/qvarlengtharray.h>
12#include <private/qthreadpool_p.h>
13#include <private/qobject_p.h>
14
15#include <climits> // For INT_MAX
16
17// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
18// reason
19// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
20QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
21
22QT_BEGIN_NAMESPACE
23
24enum {
25 MaxProgressEmitsPerSecond = 25
26};
27
28namespace {
29class ThreadPoolThreadReleaser {
30 QThreadPool *m_pool;
31public:
32 Q_NODISCARD_CTOR
33 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
34 : m_pool(pool)
35 { if (pool) pool->releaseThread(); }
36 ~ThreadPoolThreadReleaser()
37 { if (m_pool) m_pool->reserveThread(); }
38};
39
40const auto suspendingOrSuspended =
41 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
42
43} // unnamed namespace
44
46{
48public:
51 {
52 }
53
55 void run();
56};
57
58QFutureCallOutInterface::~QFutureCallOutInterface()
59 = default;
60
62
63QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
64 : d(new QFutureInterfaceBasePrivate(initialState))
65{ }
66
67QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
68 : d(other.d)
69{
70 d->refCount.ref();
71}
72
73QFutureInterfaceBase::~QFutureInterfaceBase()
74{
75 if (d && !d->refCount.deref())
76 delete d;
77}
78
79static inline int switch_on(QAtomicInt &a, int which)
80{
81 return a.fetchAndOrRelaxed(which) | which;
82}
83
84static inline int switch_off(QAtomicInt &a, int which)
85{
86 return a.fetchAndAndRelaxed(~which) & ~which;
87}
88
89static inline int switch_from_to(QAtomicInt &a, int from, int to)
90{
91 const auto adjusted = [&](int old) { return (old & ~from) | to; };
92 int value = a.loadRelaxed();
93 while (!a.testAndSetRelaxed(value, adjusted(value), value))
94 qYieldCpu();
95 return value;
96}
97
98void QFutureInterfaceBasePrivate::cancelImpl(QFutureInterfaceBase::CancelMode mode,
99 CancelOptions options)
100{
101 QMutexLocker locker(&m_mutex);
102
103 const auto oldState = state.loadRelaxed();
104
105 switch (mode) {
106 case QFutureInterfaceBase::CancelMode::CancelAndFinish:
107 if ((oldState & QFutureInterfaceBase::Finished)
108 && (oldState & QFutureInterfaceBase::Canceled)) {
109 return;
110 }
111 switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
112 QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
113 break;
114 case QFutureInterfaceBase::CancelMode::CancelOnly:
115 if (oldState & QFutureInterfaceBase::Canceled)
116 return;
117 switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
118 break;
119 }
120
121 if (options & CancelOption::CancelContinuations) {
122 // Cancel the continuations chain
123 QMutexLocker continuationLocker(&continuationMutex);
125 while (next) {
126 QMutexLocker nextLocker(&next->continuationMutex);
127 if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
128 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
129 next = next->continuationData;
130 } else {
131 break;
132 }
133 }
134 }
135
136 waitCondition.wakeAll();
137 pausedWaitCondition.wakeAll();
138
139 if (!(oldState & QFutureInterfaceBase::Canceled))
140 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
141 if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
142 && !(oldState & QFutureInterfaceBase::Finished)) {
143 sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
144 }
145
146 isValid = false;
147}
148
149void QFutureInterfaceBase::cancel()
150{
151 cancel(CancelMode::CancelOnly);
152}
153
154void QFutureInterfaceBase::cancelChain()
155{
156 cancelChain(CancelMode::CancelOnly);
157}
158
159void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
160{
161 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
162}
163
164void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
165{
166 // go up through the list of continuations, cancelling each of them
167 {
168 QMutexLocker locker(&d->continuationMutex);
169 QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
170 while (prev) {
171 // Do not cancel continuations, because we're going bottom-to-top
172 prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
173 QMutexLocker prevLocker(&prev->continuationMutex);
174 prev = prev->nonConcludedParent;
175 }
176 }
177 // finally, cancel self and all next continuations
178 d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
179}
180
181void QFutureInterfaceBase::setSuspended(bool suspend)
182{
183 QMutexLocker locker(&d->m_mutex);
184 if (suspend) {
185 switch_on(d->state, Suspending);
186 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
187 } else {
188 switch_off(d->state, suspendingOrSuspended);
189 d->pausedWaitCondition.wakeAll();
190 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
191 }
192}
193
194void QFutureInterfaceBase::toggleSuspended()
195{
196 QMutexLocker locker(&d->m_mutex);
197 if (d->state.loadRelaxed() & suspendingOrSuspended) {
198 switch_off(d->state, suspendingOrSuspended);
199 d->pausedWaitCondition.wakeAll();
200 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
201 } else {
202 switch_on(d->state, Suspending);
203 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
204 }
205}
206
207void QFutureInterfaceBase::reportSuspended() const
208{
209 // Needs to be called when pause is in effect,
210 // i.e. no more events will be reported.
211
212 QMutexLocker locker(&d->m_mutex);
213 const int state = d->state.loadRelaxed();
214 if (!(state & Suspending) || (state & Suspended))
215 return;
216
217 switch_from_to(d->state, Suspending, Suspended);
218 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
219}
220
221void QFutureInterfaceBase::setThrottled(bool enable)
222{
223 QMutexLocker lock(&d->m_mutex);
224 if (enable) {
225 switch_on(d->state, Throttled);
226 } else {
227 switch_off(d->state, Throttled);
228 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
229 d->pausedWaitCondition.wakeAll();
230 }
231}
232
233
234bool QFutureInterfaceBase::isRunning() const
235{
236 return queryState(Running);
237}
238
239bool QFutureInterfaceBase::isStarted() const
240{
241 return queryState(Started);
242}
243
244bool QFutureInterfaceBase::isCanceled() const
245{
246 return queryState(Canceled);
247}
248
249bool QFutureInterfaceBase::isFinished() const
250{
251 return queryState(Finished);
252}
253
254bool QFutureInterfaceBase::isSuspending() const
255{
256 return queryState(Suspending);
257}
258
259#if QT_DEPRECATED_SINCE(6, 0)
260bool QFutureInterfaceBase::isPaused() const
261{
262 return queryState(static_cast<State>(suspendingOrSuspended));
263}
264#endif
265
266bool QFutureInterfaceBase::isSuspended() const
267{
268 return queryState(Suspended);
269}
270
271bool QFutureInterfaceBase::isThrottled() const
272{
273 return queryState(Throttled);
274}
275
276bool QFutureInterfaceBase::isResultReadyAt(int index) const
277{
278 QMutexLocker lock(&d->m_mutex);
279 return d->internal_isResultReadyAt(index);
280}
281
282bool QFutureInterfaceBase::isValid() const
283{
284 const QMutexLocker lock(&d->m_mutex);
285 return d->isValid;
286}
287
288bool QFutureInterfaceBase::isRunningOrPending() const
289{
290 return queryState(static_cast<State>(Running | Pending));
291}
292
293bool QFutureInterfaceBase::waitForNextResult()
294{
295 QMutexLocker lock(&d->m_mutex);
296 return d->internal_waitForNextResult();
297}
298
299void QFutureInterfaceBase::waitForResume()
300{
301 // return early if possible to avoid taking the mutex lock.
302 {
303 const int state = d->state.loadRelaxed();
304 if (!(state & suspendingOrSuspended) || (state & Canceled))
305 return;
306 }
307
308 QMutexLocker lock(&d->m_mutex);
309 const int state = d->state.loadRelaxed();
310 if (!(state & suspendingOrSuspended) || (state & Canceled))
311 return;
312
313 // decrease active thread count since this thread will wait.
314 const ThreadPoolThreadReleaser releaser(d->pool());
315
316 d->pausedWaitCondition.wait(&d->m_mutex);
317}
318
319void QFutureInterfaceBase::suspendIfRequested()
320{
321 const auto canSuspend = [] (int state) {
322 // can suspend only if 1) in any suspend-related state; 2) not canceled
323 return (state & suspendingOrSuspended) && !(state & Canceled);
324 };
325
326 // return early if possible to avoid taking the mutex lock.
327 {
328 const int state = d->state.loadRelaxed();
329 if (!canSuspend(state))
330 return;
331 }
332
333 QMutexLocker lock(&d->m_mutex);
334 const int state = d->state.loadRelaxed();
335 if (!canSuspend(state))
336 return;
337
338 // Note: expecting that Suspending and Suspended are mutually exclusive
339 if (!(state & Suspended)) {
340 // switch state in case this is the first invocation
341 switch_from_to(d->state, Suspending, Suspended);
342 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
343 }
344
345 // decrease active thread count since this thread will wait.
346 const ThreadPoolThreadReleaser releaser(d->pool());
347 d->pausedWaitCondition.wait(&d->m_mutex);
348}
349
350int QFutureInterfaceBase::progressValue() const
351{
352 const QMutexLocker lock(&d->m_mutex);
353 return d->m_progressValue;
354}
355
356int QFutureInterfaceBase::progressMinimum() const
357{
358 const QMutexLocker lock(&d->m_mutex);
359 return d->m_progress ? d->m_progress->minimum : 0;
360}
361
362int QFutureInterfaceBase::progressMaximum() const
363{
364 const QMutexLocker lock(&d->m_mutex);
365 return d->m_progress ? d->m_progress->maximum : 0;
366}
367
368int QFutureInterfaceBase::resultCount() const
369{
370 QMutexLocker lock(&d->m_mutex);
371 return d->internal_resultCount();
372}
373
374QString QFutureInterfaceBase::progressText() const
375{
376 QMutexLocker locker(&d->m_mutex);
377 return d->m_progress ? d->m_progress->text : QString();
378}
379
380bool QFutureInterfaceBase::isProgressUpdateNeeded() const
381{
382 QMutexLocker locker(&d->m_mutex);
383 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
384}
385
386void QFutureInterfaceBase::reportStarted()
387{
388 QMutexLocker locker(&d->m_mutex);
389 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
390 return;
391 d->setState(State(Started | Running));
392 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
393 d->isValid = true;
394}
395
396void QFutureInterfaceBase::reportCanceled()
397{
398 cancel();
399}
400
401#ifndef QT_NO_EXCEPTIONS
402void QFutureInterfaceBase::reportException(const QException &exception)
403{
404 try {
405 exception.raise();
406 } catch (...) {
407 reportException(std::current_exception());
408 }
409}
410
411#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
412void QFutureInterfaceBase::reportException(std::exception_ptr exception)
413#else
414void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
415#endif
416{
417 QMutexLocker locker(&d->m_mutex);
418 if (d->state.loadRelaxed() & (Canceled|Finished))
419 return;
420
421 d->hasException = true;
422 d->data.setException(exception);
423 switch_on(d->state, Canceled);
424 d->waitCondition.wakeAll();
425 d->pausedWaitCondition.wakeAll();
426 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
427}
428#endif
429
430void QFutureInterfaceBase::reportFinished()
431{
432 QMutexLocker locker(&d->m_mutex);
433 if (!isFinished()) {
434 switch_from_to(d->state, Running, Finished);
435 d->waitCondition.wakeAll();
436 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
437 }
438}
439
440void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
441{
442 if (d->m_progress)
443 setProgressRange(0, resultCount);
444 d->m_expectedResultCount = resultCount;
445}
446
447int QFutureInterfaceBase::expectedResultCount()
448{
449 return d->m_expectedResultCount;
450}
451
452bool QFutureInterfaceBase::queryState(State state) const
453{
454 return d->state.loadRelaxed() & state;
455}
456
457int QFutureInterfaceBase::loadState() const
458{
459 // Used from ~QPromise, so this check is needed
460 if (!d)
461 return QFutureInterfaceBase::State::NoState;
462 return d->state.loadRelaxed();
463}
464
465void QFutureInterfaceBase::waitForResult(int resultIndex)
466{
467 if (d->hasException)
468 d->data.m_exceptionStore.rethrowException();
469
470 QMutexLocker lock(&d->m_mutex);
471 if (!isRunningOrPending())
472 return;
473 lock.unlock();
474
475 // To avoid deadlocks and reduce the number of threads used, try to
476 // run the runnable in the current thread.
477 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
478
479 lock.relock();
480
481 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
482 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
483 d->waitCondition.wait(&d->m_mutex);
484
485 if (d->hasException)
486 d->data.m_exceptionStore.rethrowException();
487}
488
489void QFutureInterfaceBase::waitForFinished()
490{
491 QMutexLocker lock(&d->m_mutex);
492 const bool alreadyFinished = isFinished();
493 lock.unlock();
494
495 if (!alreadyFinished) {
496 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
497
498 lock.relock();
499
500 while (!isFinished())
501 d->waitCondition.wait(&d->m_mutex);
502 }
503
504 if (d->hasException)
505 d->data.m_exceptionStore.rethrowException();
506}
507
508void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
509{
510 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
511 return;
512
513 d->waitCondition.wakeAll();
514
515 if (!d->m_progress) {
516 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) == false) {
517 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
518 beginIndex,
519 endIndex));
520 return;
521 }
522
523 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
524 d->m_progressValue,
525 QString()),
526 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
527 beginIndex,
528 endIndex));
529 return;
530 }
531 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
532}
533
534void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
535{
536 d->runnable = runnable;
537}
538
539void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
540{
541 d->m_pool = pool;
542}
543
544QThreadPool *QFutureInterfaceBase::threadPool() const
545{
546 return d->m_pool;
547}
548
549void QFutureInterfaceBase::setFilterMode(bool enable)
550{
551 QMutexLocker locker(&d->m_mutex);
552 if (!hasException())
553 resultStoreBase().setFilterMode(enable);
554}
555
556/*!
557 \internal
558 Sets the progress range's minimum and maximum values to \a minimum and
559 \a maximum respectively.
560
561 If \a maximum is smaller than \a minimum, \a minimum becomes the only
562 legal value.
563
564 The progress value is reset to be \a minimum.
565
566 The progress range usage can be disabled by using setProgressRange(0, 0).
567 In this case progress value is also reset to 0.
568
569 The behavior of this method is mostly inspired by
570 \l QProgressBar::setRange.
571*/
572void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
573{
574 QMutexLocker locker(&d->m_mutex);
575 if (!d->m_progress)
576 d->m_progress.reset(new QFutureInterfaceBasePrivate::ProgressData());
577 d->m_progress->minimum = minimum;
578 d->m_progress->maximum = qMax(minimum, maximum);
579 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
580 d->m_progressValue = minimum;
581}
582
583void QFutureInterfaceBase::setProgressValue(int progressValue)
584{
585 setProgressValueAndText(progressValue, QString());
586}
587
588/*!
589 \internal
590 In case of the \a progressValue falling out of the progress range,
591 this method has no effect.
592 Such behavior is inspired by \l QProgressBar::setValue.
593*/
594void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
595 const QString &progressText)
596{
597 QMutexLocker locker(&d->m_mutex);
598 if (!d->m_progress)
599 d->m_progress.reset(new QFutureInterfaceBasePrivate::ProgressData());
600
601 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
602 if (useProgressRange
603 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
604 return;
605 }
606
607 if (d->m_progressValue >= progressValue)
608 return;
609
610 if (d->state.loadRelaxed() & (Canceled|Finished))
611 return;
612
613 if (d->internal_updateProgress(progressValue, progressText)) {
614 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
615 d->m_progressValue,
616 d->m_progress->text));
617 }
618}
619
620QMutex &QFutureInterfaceBase::mutex() const
621{
622 return d->m_mutex;
623}
624
625bool QFutureInterfaceBase::hasException() const
626{
627 return d->hasException;
628}
629
630QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
631{
632 Q_ASSERT(d->hasException);
633 return d->data.m_exceptionStore;
634}
635
636QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
637{
638 Q_ASSERT(!d->hasException);
639 return d->data.m_results;
640}
641
642const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
643{
644 Q_ASSERT(!d->hasException);
645 return d->data.m_results;
646}
647
648QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
649{
650 QFutureInterfaceBase copy(other);
651 swap(copy);
652 return *this;
653}
654
655// ### Qt 7: inline
656void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
657{
658 qSwap(d, other.d);
659}
660
661bool QFutureInterfaceBase::refT() const noexcept
662{
663 return d->refCount.refT();
664}
665
666bool QFutureInterfaceBase::derefT() const noexcept
667{
668 // Called from ~QFutureInterface
669 return !d || d->refCount.derefT();
670}
671
672void QFutureInterfaceBase::reset()
673{
674 d->m_progressValue = 0;
675 d->m_progress.reset();
676 d->progressTime.invalidate();
677 d->isValid = false;
678}
679
680void QFutureInterfaceBase::rethrowPossibleException()
681{
682 if (hasException())
683 exceptionStore().rethrowException();
684}
685
686QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
688{
689 progressTime.invalidate();
690}
691
693{
694 if (hasException)
695 data.m_exceptionStore.~ExceptionStore();
696 else
697 data.m_results.~ResultStoreBase();
698}
699
701{
702 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
703}
704
706{
707 return hasException ? false : (data.m_results.contains(index));
708}
709
711{
712 if (hasException)
713 return false;
714
715 if (data.m_results.hasNextResult())
716 return true;
717
718 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
719 && data.m_results.hasNextResult() == false)
720 waitCondition.wait(&m_mutex);
721
722 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
723 && data.m_results.hasNextResult();
724}
725
727{
728 if (m_progressValue >= progress)
729 return false;
730
731 m_progressValue = progress;
732
733 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
734 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
735 return false;
736
737 progressTime.start();
738 return true;
739
740}
741
743 const QString &progressText)
744{
745 if (m_progressValue >= progress)
746 return false;
747
748 Q_ASSERT(m_progress);
749
750 m_progressValue = progress;
751 m_progress->text = progressText;
752
753 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
754 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
755 return false;
756
757 progressTime.start();
758 return true;
759}
760
762{
763 // bail out if we are not changing the state
764 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
765 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
766 return;
767
768 // change the state
769 if (enable) {
770 switch_on(state, QFutureInterfaceBase::Throttled);
771 } else {
772 switch_off(state, QFutureInterfaceBase::Throttled);
773 if (!(state.loadRelaxed() & suspendingOrSuspended))
774 pausedWaitCondition.wakeAll();
775 }
776}
777
778void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
779{
780 if (outputConnections.isEmpty())
781 return;
782
783 for (int i = 0; i < outputConnections.size(); ++i)
784 outputConnections.at(i)->postCallOutEvent(callOutEvent);
785}
786
787void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
788 const QFutureCallOutEvent &callOutEvent2)
789{
790 if (outputConnections.isEmpty())
791 return;
792
793 for (int i = 0; i < outputConnections.size(); ++i) {
794 QFutureCallOutInterface *iface = outputConnections.at(i);
795 iface->postCallOutEvent(callOutEvent1);
796 iface->postCallOutEvent(callOutEvent2);
797 }
798}
799
800// This function connects an output interface (for example a QFutureWatcher)
801// to this future. While holding the lock we check the state and ready results
802// and add the appropriate callouts to the queue.
803void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface)
804{
805 QMutexLocker locker(&m_mutex);
806
807 const auto currentState = state.loadRelaxed();
808 if (currentState & QFutureInterfaceBase::Started) {
809 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
810 if (m_progress) {
811 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
812 m_progress->minimum,
813 m_progress->maximum));
814 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
815 m_progressValue,
816 m_progress->text));
817 } else {
818 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
819 0,
820 0));
821 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
822 m_progressValue,
823 QString()));
824 }
825 }
826
827 if (!hasException) {
828 QtPrivate::ResultIteratorBase it = data.m_results.begin();
829 while (it != data.m_results.end()) {
830 const int begin = it.resultIndex();
831 const int end = begin + it.batchSize();
832 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
833 begin,
834 end));
835 it.batchedAdvance();
836 }
837 }
838
839 if (currentState & QFutureInterfaceBase::Suspended)
840 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
841 else if (currentState & QFutureInterfaceBase::Suspending)
842 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
843
844 if (currentState & QFutureInterfaceBase::Canceled)
845 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
846
847 if (currentState & QFutureInterfaceBase::Finished)
848 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
849
850 outputConnections.append(iface);
851}
852
853void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface)
854{
855 QMutexLocker lock(&m_mutex);
856 const qsizetype index = outputConnections.indexOf(iface);
857 if (index == -1)
858 return;
859 outputConnections.removeAt(index);
860
861 iface->callOutInterfaceDisconnected();
862}
863
864void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
865{
866 state.storeRelaxed(newState);
867}
868
869void QFutureInterfaceBase::setContinuation(std::function<void (const QFutureInterfaceBase &)> func,
870 void *continuationFutureData, ContinuationType type)
871{
872 auto *futureData = static_cast<QFutureInterfaceBasePrivate *>(continuationFutureData);
873
874 QMutexLocker lock(&d->continuationMutex);
875
876 // If the state is ready, run continuation immediately,
877 // otherwise save it for later.
878 if (isFinished()) {
879 d->continuationExecuted = true;
880 lock.unlock();
881 func(*this);
882 lock.relock();
883 }
884 // Unless the continuation has been cleaned earlier, we have to
885 // store the move-only continuation, to guarantee that the associated
886 // future's data stays alive.
887 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
888 if (d->continuation) {
889 qWarning("Adding a continuation to a future which already has a continuation. "
890 "The existing continuation is overwritten.");
891 if (d->continuationData)
892 d->continuationData->nonConcludedParent = nullptr;
893 }
894 d->continuation = std::move(func);
895 if (futureData) {
896 futureData->continuationType = type;
897 futureData->nonConcludedParent = d;
898 }
899 d->continuationData = futureData;
900 Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
901 "setContinuation", "Make sure to provide a correct continuation type!");
902 }
903}
904
905/*
906 For continuations with context we expect all the needed data to be captured
907 directly by the continuation data, because this simplifies the slot
908 invocation. That's why func has no parameters.
909
910 We pass continuation data as a QVariant, because we need to keep the
911 QFutureInterface<T> for the entire lifetime of the continuation, but we
912 cannot pass a template type T as a parameter.
913*/
914void QFutureInterfaceBase::setContinuation(const QObject *context, std::function<void()> func,
915 const QVariant &continuationFuture,
916 ContinuationType type)
917{
918 Q_ASSERT(context);
919
920 using FuncType = void();
921 using Prototype = typename QtPrivate::Callable<FuncType>::Function;
922 auto slotObj = QtPrivate::makeCallableObject<Prototype>(std::move(func));
923
924 auto slot = QtPrivate::SlotObjUniquePtr(slotObj);
925
926 auto *watcher = new QObjectContinuationWrapper;
927 watcher->moveToThread(context->thread());
928
929 // We need to protect acccess to the watcher. The context object (and in turn, the watcher)
930 // could be destroyed while the continuation that emits the signal is running. We have to
931 // prevent that.
932 // The mutex has to be recursive, because the continuation itself could delete the context
933 // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
934 auto watcherMutex = std::make_shared<QRecursiveMutex>();
935 const auto destroyWatcher = [watcherMutex, watcher]() mutable {
936 QMutexLocker lock(watcherMutex.get());
937 delete watcher;
938 };
939
940 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
941 QObject::connect(watcher, &QObjectContinuationWrapper::run,
942 // for the following, cf. QMetaObject::invokeMethodImpl():
943 // we know `slot` is a lambda returning `void`, so we can just
944 // `call()` with `obj` and `args[0]` set to `nullptr`:
945 context, [slot = std::move(slot)] {
946 void *args[] = { nullptr }; // for `void` return value
947 slot->call(nullptr, args);
948 });
949 QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, destroyWatcher);
950
951 // We need to connect to destroyWatcher here, instead of delete or deleteLater().
952 // If the continuation is called from a separate thread, emit watcher->run() can't detect that
953 // the watcher has been deleted in the separate thread, causing a race condition and potential
954 // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of
955 // the watcher to occur after emit watcher->run() completes and prevents the race condition.
956 QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
957
958 // Extract a QFutureInterfaceBasePrivate pointer from the QVariant. We rely
959 // on the fact that QVariant contains QFutureInterface<T>.
960 QFutureInterfaceBasePrivate *continuationFutureData = nullptr;
961 if (continuationFuture.isValid()) {
962 Q_ASSERT(QLatin1StringView(continuationFuture.typeName())
963 .startsWith(QLatin1StringView("QFutureInterface")));
964 const auto continuationPtr =
965 static_cast<const QFutureInterfaceBase *>(continuationFuture.constData());
966 continuationFutureData = continuationPtr->d;
967 }
968
969 // Capture continuationFuture so that it lives as long as the continuation,
970 // and the continuation data remains valid.
971 setContinuation([watcherMutex = std::move(watcherMutex),
972 watcher = QPointer(watcher), continuationFuture]
973 (const QFutureInterfaceBase &parentData)
974 {
975 Q_UNUSED(parentData);
976 Q_UNUSED(continuationFuture);
977 QMutexLocker lock(watcherMutex.get());
978 if (watcher)
979 emit watcher->run();
980 }, continuationFutureData, type);
981}
982
983void QFutureInterfaceBase::cleanContinuation()
984{
985 if (!d)
986 return;
987
988 QMutexLocker lock(&d->continuationMutex);
989 d->continuation = nullptr;
990 d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
991 d->continuationData = nullptr;
992}
993
994void QFutureInterfaceBase::runContinuation() const
995{
996 QMutexLocker lock(&d->continuationMutex);
997 if (d->continuation && !d->continuationExecuted) {
998 // If we run the next continuation, then this future is concluded, so
999 // we wouldn't need to revisit it in the cancelChain()
1000 if (d->continuationData)
1001 d->continuationData->nonConcludedParent = nullptr;
1002 // Save the continuation in a local function, to avoid calling
1003 // a null std::function below, in case cleanContinuation() is
1004 // called from some other thread right after unlock() below.
1005 d->continuationExecuted = true;
1006 auto fn = std::move(d->continuation);
1007 lock.unlock();
1008 fn(*this);
1009
1010 lock.relock();
1011 // Unless the continuation has been cleaned earlier, we have to
1012 // store the move-only continuation, to guarantee that the associated
1013 // future's data stays alive.
1014 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
1015 d->continuation = std::move(fn);
1016 }
1017}
1018
1019bool QFutureInterfaceBase::isChainCanceled() const
1020{
1021 return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
1022}
1023
1024void QFutureInterfaceBase::setLaunchAsync(bool value)
1025{
1026 d->launchAsync = value;
1027}
1028
1029bool QFutureInterfaceBase::launchAsync() const
1030{
1031 return d->launchAsync;
1032}
1033
1034namespace QtFuture {
1035
1037{
1038 QFutureInterface<void> promise;
1039 promise.reportStarted();
1040 promise.reportFinished();
1041
1042 return promise.future();
1043}
1044
1045} // namespace QtFuture
1046
1047QT_END_NAMESPACE
1048
1049#include "qfutureinterface.moc"
bool internal_updateProgress(int progress, const QString &progressText=QString())
void sendCallOuts(const QFutureCallOutEvent &callOut1, const QFutureCallOutEvent &callOut2)
void sendCallOut(const QFutureCallOutEvent &callOut)
void setState(QFutureInterfaceBase::State state)
bool internal_updateProgressValue(int progress)
bool internal_isResultReadyAt(int index) const
void internal_setThrottled(bool enable)
QFutureInterfaceBasePrivate * continuationData
QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
friend class const_iterator
Definition qfuture.h:260
QFuture< void > makeReadyVoidFuture()
static int switch_from_to(QAtomicInt &a, int from, int to)
static int switch_off(QAtomicInt &a, int which)
static int switch_on(QAtomicInt &a, int which)