Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qtconcurrentiteratekernel.h
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#ifndef QTCONCURRENT_ITERATEKERNEL_H
5#define QTCONCURRENT_ITERATEKERNEL_H
6
7#include <QtConcurrent/qtconcurrent_global.h>
8
9#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
10
11#include <QtCore/qatomic.h>
12#include <QtConcurrent/qtconcurrentmedian.h>
13#include <QtConcurrent/qtconcurrentthreadengine.h>
14
15#include <iterator>
16
17QT_BEGIN_NAMESPACE
18
19
20
21namespace QtConcurrent {
22
23/*
24 The BlockSizeManager class manages how many iterations a thread should
25 reserve and process at a time. This is done by measuring the time spent
26 in the user code versus the control part code, and then increasing
27 the block size if the ratio between them is to small. The block size
28 management is done on the basis of the median of several timing measurements,
29 and it is done individually for each thread.
30*/
31class Q_CONCURRENT_EXPORT BlockSizeManager
32{
33public:
35
36 void timeBeforeUser();
37 void timeAfterUser();
38 int blockSize();
39
40private:
41 inline bool blockSizeMaxed()
42 {
43 return (m_blockSize >= maxBlockSize);
44 }
45
46 const int maxBlockSize;
51 int m_blockSize;
52
54};
55
56template <typename T>
58{
59public:
60 ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
61 : threadEngine(_threadEngine), defaultValue(_defaultValue)
62 {
63 }
64
70
72 {
73 const int useVectorThreshold = 4; // Tunable parameter.
77 } else {
78 for (int i = 0; i < currentResultCount; ++i)
80 }
81 }
82
83 inline T * getPointer()
84 {
85 return vector.data();
86 }
87
91
92private:
93 void resizeList(qsizetype size)
94 {
95 if constexpr (std::is_default_constructible_v<T>)
96 vector.resize(size);
97 else
98 vector.resize(size, defaultValue);
99 }
100
102};
103
104template <>
105class ResultReporter<void>
106{
107public:
108 inline ResultReporter(ThreadEngine<void> *) { }
109 inline void reserveSpace(int) { }
110 inline void reportResults(int) { }
111 inline void * getPointer() { return nullptr; }
112};
113
114template<typename T>
116{
117 template<typename U = T>
118 DefaultValueContainer(U &&_value) : value(std::forward<U>(_value))
119 {
120 }
121
123};
124
125template<>
127{
128};
129
130inline bool selectIteration(std::bidirectional_iterator_tag)
131{
132 return false; // while
133}
134
135inline bool selectIteration(std::forward_iterator_tag)
136{
137 return false; // while
138}
139
140inline bool selectIteration(std::random_access_iterator_tag)
141{
142 return true; // for
143}
144
145template <typename Iterator, typename T>
147{
148 using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
149
150public:
151 typedef T ResultType;
152
153 template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
154 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
155 : ThreadEngine<U>(pool),
156 begin(_begin),
157 end(_end),
158 current(_begin),
159 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
160 forIteration(selectIteration(IteratorCategory())),
162 {
163 }
164
165 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
166 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
167 : ThreadEngine<U>(pool),
168 begin(_begin),
169 end(_end),
170 current(_begin),
171 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
172 forIteration(selectIteration(IteratorCategory())),
174 defaultValue(U())
175 {
176 }
177
178 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
179 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
180 : ThreadEngine<U>(pool),
181 begin(_begin),
182 end(_end),
183 current(_begin),
184 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
185 forIteration(selectIteration(IteratorCategory())),
187 defaultValue(std::forward<U>(_defaultValue))
188 {
189 }
190
191 virtual ~IterateKernel() { }
192
193 virtual bool runIteration(Iterator, int , T *) { return false; }
194 virtual bool runIterations(Iterator, int, int, T *) { return false; }
195
197 {
198 progressReportingEnabled = this->isProgressReportingEnabled();
200 this->setProgressRange(0, iterationCount);
201 }
202
204 {
205 if (forIteration)
206 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
207 else // whileIteration
208 return (iteratorThreads.loadRelaxed() == 0);
209 }
210
212 {
213 if (forIteration)
214 return this->forThreadFunction();
215 else // whileIteration
216 return this->whileThreadFunction();
217 }
218
220 {
221 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
222 ResultReporter<T> resultReporter = createResultsReporter();
223
224 for(;;) {
225 if (this->isCanceled())
226 break;
227
228 const int currentBlockSize = blockSizeManager.blockSize();
229
230 if (currentIndex.loadRelaxed() >= iterationCount)
231 break;
232
233 // Atomically reserve a block of iterationCount for this thread.
234 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
235 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
236
237 if (beginIndex >= endIndex) {
238 // No more work
239 break;
240 }
241
242 this->waitForResume(); // (only waits if the qfuture is paused.)
243
245 this->startThread();
246
247 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
248 resultReporter.reserveSpace(finalBlockSize);
249
250 // Call user code with the current iteration range.
251 blockSizeManager.timeBeforeUser();
252 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
253 blockSizeManager.timeAfterUser();
254
255 if (resultsAvailable)
256 resultReporter.reportResults(beginIndex);
257
258 // Report progress if progress reporting enabled.
260 completed.fetchAndAddAcquire(finalBlockSize);
261 this->setProgressValue(this->completed.loadRelaxed());
262 }
263
264 if (this->shouldThrottleThread())
265 return ThrottleThread;
266 }
267 return ThreadFinished;
268 }
269
271 {
272 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
273 return ThreadFinished;
274
275 ResultReporter<T> resultReporter = createResultsReporter();
276 resultReporter.reserveSpace(1);
277
278 while (current != end) {
279 // The following two lines breaks support for input iterators according to
280 // the sgi docs: dereferencing prev after calling ++current is not allowed
281 // on input iterators. (prev is dereferenced inside user.runIteration())
282 Iterator prev = current;
283 ++current;
284 int index = currentIndex.fetchAndAddRelaxed(1);
285 iteratorThreads.testAndSetRelease(1, 0);
286
287 this->waitForResume(); // (only waits if the qfuture is paused.)
288
290 this->startThread();
291
292 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
293 if (resultAavailable)
294 resultReporter.reportResults(index);
295
296 if (this->shouldThrottleThread())
297 return ThrottleThread;
298
299 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
300 return ThreadFinished;
301 }
302
303 return ThreadFinished;
304 }
305
306private:
307 ResultReporter<T> createResultsReporter()
308 {
309 if constexpr (!std::is_same_v<T, void>)
310 return ResultReporter<T>(this, defaultValue.value);
311 else
312 return ResultReporter<T>(this);
313 }
314
315public:
316 const Iterator begin;
317 const Iterator end;
318 Iterator current;
322 const int iterationCount;
323 const bool forIteration;
326};
327
328} // namespace QtConcurrent
329
330
331QT_END_NAMESPACE
332
333#endif // QT_NO_CONCURRENT
334
335#endif
ThreadFunctionResult whileThreadFunction()
virtual bool runIterations(Iterator, int, int, T *)
virtual bool runIteration(Iterator, int, T *)
IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
ThreadFunctionResult threadFunction() override
DefaultValueContainer< ResultType > defaultValue
\inmodule QtConcurrent
bool selectIteration(std::bidirectional_iterator_tag)
bool selectIteration(std::random_access_iterator_tag)
bool selectIteration(std::forward_iterator_tag)
static qint64 getticks()
static double elapsed(qint64 after, qint64 before)