Qt
Internal/Contributor docs for the Qt SDK. <b>Note:</b> These are NOT official API docs; those are found <a href='https://doc.qt.io/'>here</a>.
Loading...
Searching...
No Matches
qtconcurrentreducekernel.h
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#ifndef QTCONCURRENT_REDUCEKERNEL_H
5#define QTCONCURRENT_REDUCEKERNEL_H
6
7#include <QtConcurrent/qtconcurrent_global.h>
8
9#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
10
11#include <QtCore/qatomic.h>
12#include <QtCore/qlist.h>
13#include <QtCore/qmap.h>
14#include <QtCore/qmutex.h>
15#include <QtCore/qthread.h>
16#include <QtCore/qthreadpool.h>
17
18#include <mutex>
19
21
22namespace QtPrivate {
23
// SequenceHolder owns a Sequence by value. It can be built either by copying
// an lvalue sequence or by moving from an rvalue one; the stored object is
// exposed through the public `sequence` member.
template<typename Sequence>
struct SequenceHolder
{
    // Stores a copy of `s`.
    SequenceHolder(const Sequence &s) : sequence(s) { }
    // Takes over the contents of `s` by moving from it.
    SequenceHolder(Sequence &&s) : sequence(std::move(s)) { }

    // The held sequence.
    Sequence sequence;
};
31
32}
33
34namespace QtConcurrent {
35
36/*
37 The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
38 limit the reduce queue size for MapReduce. When the number of
39 reduce blocks in the queue exceeds ReduceQueueStartLimit,
40 MapReduce won't start any new threads, and when it exceeds
41 ReduceQueueThrottleLimit, running threads will be stopped.
42*/
43#ifdef Q_QDOC
44enum ReduceQueueLimits {
47};
48#else
49enum {
52};
53#endif
54
55// IntermediateResults holds a block of intermediate results from a
56// map or filter functor. The begin/end offsets indicates the origin
57// and range of the block.
58template <typename T>
60{
61public:
62 int begin, end;
63 QList<T> vector;
64};
65
70 // ParallelReduce = 0x8
71};
72Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
73#ifndef Q_QDOC
75#endif
76// supports both ordered and out-of-order reduction
77template <typename ReduceFunctor, typename ReduceResultType, typename T>
79{
80 typedef QMap<int, IntermediateResults<T> > ResultsMap;
81
82 const ReduceOptions reduceOptions;
83
84 QMutex mutex;
85 int progress, resultsMapSize;
86 const int threadCount;
87 ResultsMap resultsMap;
88
89 bool canReduce(int begin) const
90 {
91 return (((reduceOptions & UnorderedReduce)
92 && progress == 0)
93 || ((reduceOptions & OrderedReduce)
94 && progress == begin));
95 }
96
97 void reduceResult(ReduceFunctor &reduce,
98 ReduceResultType &r,
99 const IntermediateResults<T> &result)
100 {
101 for (int i = 0; i < result.vector.size(); ++i) {
102 std::invoke(reduce, r, result.vector.at(i));
103 }
104 }
105
106 void reduceResults(ReduceFunctor &reduce,
107 ReduceResultType &r,
109 {
110 typename ResultsMap::iterator it = map.begin();
111 while (it != map.end()) {
112 reduceResult(reduce, r, it.value());
113 ++it;
114 }
115 }
116
117public:
118 ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
119 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
120 threadCount(std::max(pool->maxThreadCount(), 1))
121 { }
122
123 void runReduce(ReduceFunctor &reduce,
124 ReduceResultType &r,
125 const IntermediateResults<T> &result)
126 {
127 std::unique_lock<QMutex> locker(mutex);
128 if (!canReduce(result.begin)) {
129 ++resultsMapSize;
130 resultsMap.insert(result.begin, result);
131 return;
132 }
133
134 if (reduceOptions & UnorderedReduce) {
135 // UnorderedReduce
136 progress = -1;
137
138 // reduce this result
139 locker.unlock();
140 reduceResult(reduce, r, result);
141 locker.lock();
142
143 // reduce all stored results as well
144 while (!resultsMap.isEmpty()) {
145 ResultsMap resultsMapCopy = resultsMap;
146 resultsMap.clear();
147
148 locker.unlock();
149 reduceResults(reduce, r, resultsMapCopy);
150 locker.lock();
151
152 resultsMapSize -= resultsMapCopy.size();
153 }
154
155 progress = 0;
156 } else {
157 // reduce this result
158 locker.unlock();
159 reduceResult(reduce, r, result);
160 locker.lock();
161
162 // OrderedReduce
163 progress += result.end - result.begin;
164
165 // reduce as many other results as possible
166 typename ResultsMap::iterator it = resultsMap.begin();
167 while (it != resultsMap.end()) {
168 if (it.value().begin != progress)
169 break;
170
171 locker.unlock();
172 reduceResult(reduce, r, it.value());
173 locker.lock();
174
175 --resultsMapSize;
176 progress += it.value().end - it.value().begin;
177 it = resultsMap.erase(it);
178 }
179 }
180 }
181
182 // final reduction
183 void finish(ReduceFunctor &reduce, ReduceResultType &r)
184 {
185 reduceResults(reduce, r, resultsMap);
186 }
187
188 inline bool shouldThrottle()
189 {
190 std::lock_guard<QMutex> locker(mutex);
191 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
192 }
193
194 inline bool shouldStartThread()
195 {
196 std::lock_guard<QMutex> locker(mutex);
197 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
198 }
199};
200
201template <typename Sequence, typename Base, typename Functor1, typename Functor2>
202struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
203{
204 template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2>
205 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
206 ReduceOptions reduceOptions)
207 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
208 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
209 std::forward<F1>(functor1), std::forward<F2>(functor2), reduceOptions)
210 { }
211
212 template<typename InitialValueType, typename S = Sequence,
213 typename F1 = Functor1, typename F2 = Functor2>
214 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
215 InitialValueType &&initialValue, ReduceOptions reduceOptions)
216 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
217 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
218 std::forward<F1>(functor1), std::forward<F2>(functor2),
219 std::forward<InitialValueType>(initialValue), reduceOptions)
220 { }
221
222 void finish() override
223 {
224 Base::finish();
225 // Clear the sequence to make sure all temporaries are destroyed
226 // before finished is signaled.
227 this->sequence = Sequence();
228 }
229};
230
231} // namespace QtConcurrent
232
234
235#endif // QT_NO_CONCURRENT
236
237#endif
iterator insert(const Key &key, const T &value)
Definition qmap.h:688
iterator erase(const_iterator it)
Definition qmap.h:619
void clear()
Definition qmap.h:289
bool isEmpty() const
Definition qmap.h:269
iterator begin()
Definition qmap.h:598
iterator end()
Definition qmap.h:602
\inmodule QtCore
Definition qmutex.h:281
iterator begin()
Definition qset.h:136
iterator end()
Definition qset.h:140
\inmodule QtCore
Definition qthreadpool.h:22
ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
void finish(ReduceFunctor &reduce, ReduceResultType &r)
void runReduce(ReduceFunctor &reduce, ReduceResultType &r, const IntermediateResults< T > &result)
#define this
Definition dialogs.cpp:9
QMap< QString, QString > map
[6]
QSet< QString >::iterator it
Combined button and popup list for selecting options.
\inmodule QtConcurrent
ReduceOption
This enum specifies the order of which results from the map or filter function are passed to the redu...
\macro QT_NO_KEYWORDS >
#define Q_DECLARE_FLAGS(Flags, Enum)
Definition qflags.h:174
#define Q_DECLARE_OPERATORS_FOR_FLAGS(Flags)
Definition qflags.h:194
GLboolean r
[2]
GLdouble s
[6]
Definition qopenglext.h:235
GLuint64EXT * result
[6]
QtPrivate::QRegularExpressionMatchIteratorRangeBasedForIterator begin(const QRegularExpressionMatchIterator &iterator)
QMutex mutex
[2]
SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2, ReduceOptions reduceOptions)
SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2, InitialValueType &&initialValue, ReduceOptions reduceOptions)