Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qtconcurrentiteratekernel.h
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
5#ifndef QTCONCURRENT_ITERATEKERNEL_H
6#define QTCONCURRENT_ITERATEKERNEL_H
7
8#include <QtConcurrent/qtconcurrent_global.h>
9
10#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
11
12#include <QtCore/qatomic.h>
13#include <QtConcurrent/qtconcurrentmedian.h>
14#include <QtConcurrent/qtconcurrentthreadengine.h>
15
16#include <iterator>
17
18QT_BEGIN_NAMESPACE
19
20
21
22namespace QtConcurrent {
23
24/*
25 The BlockSizeManager class manages how many iterations a thread should
26 reserve and process at a time. This is done by measuring the time spent
27 in the user code versus the control part code, and then increasing
28 the block size if the ratio between them is to small. The block size
29 management is done on the basis of the median of several timing measurements,
30 and it is done individually for each thread.
31*/
32class Q_CONCURRENT_EXPORT BlockSizeManager
33{
34public:
36
37 void timeBeforeUser();
38 void timeAfterUser();
39 int blockSize();
40
41private:
42 inline bool blockSizeMaxed()
43 {
44 return (m_blockSize >= maxBlockSize);
45 }
46
47 const int maxBlockSize;
52 int m_blockSize;
53
55};
56
57template <typename T>
59{
60public:
61 ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
62 : threadEngine(_threadEngine), defaultValue(_defaultValue)
63 {
64 }
65
71
73 {
74 const int useVectorThreshold = 4; // Tunable parameter.
78 } else {
79 for (int i = 0; i < currentResultCount; ++i)
81 }
82 }
83
84 inline T * getPointer()
85 {
86 return vector.data();
87 }
88
92
93private:
94 void resizeList(qsizetype size)
95 {
96 if constexpr (std::is_default_constructible_v<T>)
97 vector.resize(size);
98 else
99 vector.resize(size, defaultValue);
100 }
101
103};
104
105template <>
106class ResultReporter<void>
107{
108public:
109 inline ResultReporter(ThreadEngine<void> *) { }
110 inline void reserveSpace(int) { }
111 inline void reportResults(int) { }
112 inline void * getPointer() { return nullptr; }
113};
114
115template<typename T>
117{
118 template<typename U = T>
119 DefaultValueContainer(U &&_value) : value(std::forward<U>(_value))
120 {
121 }
122
124};
125
126template<>
128{
129};
130
131inline bool selectIteration(std::bidirectional_iterator_tag)
132{
133 return false; // while
134}
135
136inline bool selectIteration(std::forward_iterator_tag)
137{
138 return false; // while
139}
140
141inline bool selectIteration(std::random_access_iterator_tag)
142{
143 return true; // for
144}
145
146template <typename Iterator, typename T>
148{
149 using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
150
151public:
152 typedef T ResultType;
153
154 template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
155 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
156 : ThreadEngine<U>(pool),
157 begin(_begin),
158 end(_end),
159 current(_begin),
160 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
161 forIteration(selectIteration(IteratorCategory())),
163 {
164 }
165
166 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
167 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
168 : ThreadEngine<U>(pool),
169 begin(_begin),
170 end(_end),
171 current(_begin),
172 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
173 forIteration(selectIteration(IteratorCategory())),
175 defaultValue(U())
176 {
177 }
178
179 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
180 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
181 : ThreadEngine<U>(pool),
182 begin(_begin),
183 end(_end),
184 current(_begin),
185 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
186 forIteration(selectIteration(IteratorCategory())),
188 defaultValue(std::forward<U>(_defaultValue))
189 {
190 }
191
192 virtual ~IterateKernel() { }
193
194 virtual bool runIteration(Iterator, int , T *) { return false; }
195 virtual bool runIterations(Iterator, int, int, T *) { return false; }
196
198 {
199 progressReportingEnabled = this->isProgressReportingEnabled();
201 this->setProgressRange(0, iterationCount);
202 }
203
205 {
206 if (forIteration)
207 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
208 else // whileIteration
209 return (iteratorThreads.loadRelaxed() == 0);
210 }
211
213 {
214 if (forIteration)
215 return this->forThreadFunction();
216 else // whileIteration
217 return this->whileThreadFunction();
218 }
219
221 {
222 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
223 ResultReporter<T> resultReporter = createResultsReporter();
224
225 for(;;) {
226 if (this->isCanceled())
227 break;
228
229 const int currentBlockSize = blockSizeManager.blockSize();
230
231 if (currentIndex.loadRelaxed() >= iterationCount)
232 break;
233
234 // Atomically reserve a block of iterationCount for this thread.
235 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
236 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
237
238 if (beginIndex >= endIndex) {
239 // No more work
240 break;
241 }
242
243 this->waitForResume(); // (only waits if the qfuture is paused.)
244
246 this->startThread();
247
248 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
249 resultReporter.reserveSpace(finalBlockSize);
250
251 // Call user code with the current iteration range.
252 blockSizeManager.timeBeforeUser();
253 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
254 blockSizeManager.timeAfterUser();
255
256 if (resultsAvailable)
257 resultReporter.reportResults(beginIndex);
258
259 // Report progress if progress reporting enabled.
261 completed.fetchAndAddAcquire(finalBlockSize);
262 this->setProgressValue(this->completed.loadRelaxed());
263 }
264
265 if (this->shouldThrottleThread())
266 return ThrottleThread;
267 }
268 return ThreadFinished;
269 }
270
272 {
273 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
274 return ThreadFinished;
275
276 ResultReporter<T> resultReporter = createResultsReporter();
277 resultReporter.reserveSpace(1);
278
279 while (current != end) {
280 // The following two lines breaks support for input iterators according to
281 // the sgi docs: dereferencing prev after calling ++current is not allowed
282 // on input iterators. (prev is dereferenced inside user.runIteration())
283 Iterator prev = current;
284 ++current;
285 int index = currentIndex.fetchAndAddRelaxed(1);
286 iteratorThreads.testAndSetRelease(1, 0);
287
288 this->waitForResume(); // (only waits if the qfuture is paused.)
289
291 this->startThread();
292
293 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
294 if (resultAavailable)
295 resultReporter.reportResults(index);
296
297 if (this->shouldThrottleThread())
298 return ThrottleThread;
299
300 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
301 return ThreadFinished;
302 }
303
304 return ThreadFinished;
305 }
306
307private:
308 ResultReporter<T> createResultsReporter()
309 {
310 if constexpr (!std::is_same_v<T, void>)
311 return ResultReporter<T>(this, defaultValue.value);
312 else
313 return ResultReporter<T>(this);
314 }
315
316public:
317 const Iterator begin;
318 const Iterator end;
319 Iterator current;
323 const int iterationCount;
324 const bool forIteration;
327};
328
329} // namespace QtConcurrent
330
331
332QT_END_NAMESPACE
333
334#endif // QT_NO_CONCURRENT
335
336#endif
ThreadFunctionResult whileThreadFunction()
virtual bool runIterations(Iterator, int, int, T *)
virtual bool runIteration(Iterator, int, T *)
IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
ThreadFunctionResult threadFunction() override
DefaultValueContainer< ResultType > defaultValue
\inmodule QtConcurrent
bool selectIteration(std::bidirectional_iterator_tag)
bool selectIteration(std::random_access_iterator_tag)
bool selectIteration(std::forward_iterator_tag)
static qint64 getticks()
static double elapsed(qint64 after, qint64 before)