Qt
Internal/Contributor docs for the Qt SDK. <b>Note:</b> These are NOT official API docs; those are found <a href='https://doc.qt.io/'>here</a>.
Loading...
Searching...
No Matches
qthreadpool.cpp
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qthreadpool.h"
5#include "qthreadpool_p.h"
6#include "qdeadlinetimer.h"
7#include "qcoreapplication.h"
8
9#include <QtCore/qpointer.h>
10
11#include <algorithm>
12#include <memory>
13
15
16using namespace Qt::StringLiterals;
17
18/*
19 QThread wrapper, provides synchronization against a ThreadPool
20*/
33
34/*
35 QThreadPool private class.
36*/
37
38
47
48/*
49 \internal
50*/
52{
53 QMutexLocker locker(&manager->mutex);
54 for(;;) {
56 runnable = nullptr;
57
58 do {
59 if (r) {
60 // If autoDelete() is false, r might already be deleted after run(), so check status now.
61 const bool del = r->autoDelete();
62
63 // run the task
64 locker.unlock();
65#ifndef QT_NO_EXCEPTIONS
66 try {
67#endif
68 r->run();
69#ifndef QT_NO_EXCEPTIONS
70 } catch (...) {
71 qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n"
72 "This is not supported, exceptions thrown in worker threads must be\n"
73 "caught before control returns to Qt Concurrent.");
75 throw;
76 }
77#endif
78
79 if (del)
80 delete r;
81 locker.relock();
82 }
83
84 // if too many threads are active, stop working in this one
86 break;
87
88 // all work is done, time to wait for more
89 if (manager->queue.isEmpty())
90 break;
91
93 r = page->pop();
94
95 if (page->isFinished()) {
97 delete page;
98 }
99 } while (true);
100
101 // this thread is about to be deleted, do not wait or expire
102 if (!manager->allThreads.contains(this)) {
104 return;
105 }
106
107 // if too many threads are active, expire this thread
111 return;
112 }
115 // wait for work, exiting after the expiry timeout is reached
117 // this thread is about to be deleted, do not work or expire
118 if (!manager->allThreads.contains(this)) {
120 return;
121 }
122 if (manager->waitingThreads.removeOne(this)) {
124 return;
125 }
127 }
128}
129
135
136
137/*
138 \internal
139*/
142
144{
145 Q_ASSERT(task != nullptr);
146 if (allThreads.isEmpty()) {
147 // always create at least one thread
149 return true;
150 }
151
152 // can't do anything if we're over the limit
154 return false;
155
156 if (!waitingThreads.isEmpty()) {
157 // recycle an available thread
159 waitingThreads.takeFirst()->runnableReady.wakeOne();
160 return true;
161 }
162
163 if (!expiredThreads.isEmpty()) {
164 // restart an expired thread
166 Q_ASSERT(thread->runnable == nullptr);
167
169
170 thread->runnable = task;
171
172 // Ensure that the thread has actually finished, otherwise the following
173 // start() has no effect.
174 thread->wait();
175 Q_ASSERT(thread->isFinished());
176 thread->start(threadPriority);
177 return true;
178 }
179
180 // start a new thread
182 return true;
183}
184
185inline bool comparePriority(int priority, const QueuePage *p)
186{
187 return p->priority() < priority;
188}
189
190void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
191{
192 Q_ASSERT(runnable != nullptr);
193 for (QueuePage *page : std::as_const(queue)) {
194 if (page->priority() == priority && !page->isFull()) {
195 page->push(runnable);
196 return;
197 }
198 }
199 auto it = std::upper_bound(queue.constBegin(), queue.constEnd(), priority, comparePriority);
200 queue.insert(std::distance(queue.constBegin(), it), new QueuePage(runnable, priority));
201}
202
210
212{
213 // try to push tasks on the queue to any available threads
214 while (!queue.isEmpty()) {
216 if (!tryStart(page->first()))
217 break;
218
219 page->pop();
220
221 if (page->isFinished()) {
223 delete page;
224 }
225 }
226}
227
229{
230 const int activeThreadCount = this->activeThreadCount();
231 return activeThreadCount >= maxThreadCount() && (activeThreadCount - reservedThreads) >= 1;
232}
233
235{
236 const int activeThreadCount = this->activeThreadCount();
237 return activeThreadCount > maxThreadCount() && (activeThreadCount - reservedThreads) > 1;
238}
239
244{
245 Q_ASSERT(runnable != nullptr);
246 auto thread = std::make_unique<QThreadPoolThread>(this);
247 if (objectName.isEmpty())
248 objectName = u"Thread (pooled)"_s;
249 thread->setObjectName(objectName);
250 Q_ASSERT(!allThreads.contains(thread.get())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here)
251 allThreads.insert(thread.get());
253
254 thread->runnable = runnable;
255 thread.release()->start(threadPriority);
256}
257
266{
267 // move the contents of the set out so that we can iterate without the lock
268 auto allThreadsCopy = std::exchange(allThreads, {});
271
272 mutex.unlock();
273
274 for (QThreadPoolThread *thread : std::as_const(allThreadsCopy)) {
275 if (thread->isRunning()) {
276 thread->runnableReady.wakeAll();
277 thread->wait();
278 }
279 delete thread;
280 }
281
282 mutex.lock();
283}
284
291{
292 QMutexLocker locker(&mutex);
293 while (!(queue.isEmpty() && activeThreads == 0) && !timer.hasExpired())
295
296 if (!queue.isEmpty() || activeThreads)
297 return false;
298
299 reset();
300 // New jobs might have started during reset, but return anyway
301 // as the active thread and task count did reach 0 once, and
302 // race conditions are outside our scope.
303 return true;
304}
305
307{
308 QMutexLocker locker(&mutex);
309 while (!queue.isEmpty()) {
310 auto *page = queue.takeLast();
311 while (!page->isFinished()) {
312 QRunnable *r = page->pop();
313 if (r && r->autoDelete()) {
314 locker.unlock();
315 delete r;
316 locker.relock();
317 }
318 }
319 delete page;
320 }
321}
322
341{
342 Q_D(QThreadPool);
343
344 if (runnable == nullptr)
345 return false;
346
347 QMutexLocker locker(&d->mutex);
348 for (QueuePage *page : std::as_const(d->queue)) {
349 if (page->tryTake(runnable)) {
350 if (page->isFinished()) {
351 d->queue.removeOne(page);
352 delete page;
353 }
354 return true;
355 }
356 }
357
358 return false;
359}
360
368{
369 Q_Q(QThreadPool);
370 if (!q->tryTake(runnable))
371 return;
372 // If autoDelete() is false, runnable might already be deleted after run(), so check status now.
373 const bool del = runnable->autoDelete();
374
375 runnable->run();
376
377 if (del)
378 delete runnable;
379}
380
438 : QObject(*new QThreadPoolPrivate, parent)
439{
440 Q_D(QThreadPool);
441 connect(this, &QObject::objectNameChanged, this, [d](const QString &newName) {
442 // We keep a copy of the name under our own lock, so we can access it thread-safely.
443 QMutexLocker locker(&d->mutex);
444 d->objectName = newName;
445 });
446}
447
453{
454 Q_D(QThreadPool);
455 waitForDone();
456 Q_ASSERT(d->queue.isEmpty());
457 Q_ASSERT(d->allThreads.isEmpty());
458}
459
464{
465 Q_CONSTINIT static QPointer<QThreadPool> theInstance;
466 Q_CONSTINIT static QBasicMutex theMutex;
467
468 const QMutexLocker locker(&theMutex);
469 if (theInstance.isNull() && !QCoreApplication::closingDown())
470 theInstance = new QThreadPool();
471 return theInstance;
472}
473
479{
480 Q_CONSTINIT static QPointer<QThreadPool> guiInstance;
481 Q_CONSTINIT static QBasicMutex theMutex;
482
483 const QMutexLocker locker(&theMutex);
484 if (guiInstance.isNull() && !QCoreApplication::closingDown())
485 guiInstance = new QThreadPool();
486 return guiInstance;
487}
488
504void QThreadPool::start(QRunnable *runnable, int priority)
505{
506 if (!runnable)
507 return;
508
509 Q_D(QThreadPool);
510 QMutexLocker locker(&d->mutex);
511
512 if (!d->tryStart(runnable))
513 d->enqueueTask(runnable, priority);
514}
515
550{
551 if (!runnable)
552 return false;
553
554 Q_D(QThreadPool);
555 QMutexLocker locker(&d->mutex);
556 if (d->tryStart(runnable))
557 return true;
558
559 return false;
560}
561
595{
596 using namespace std::chrono;
597 Q_D(const QThreadPool);
598 QMutexLocker locker(&d->mutex);
599 return duration_cast<milliseconds>(d->expiryTimeout).count();
600}
601
602void QThreadPool::setExpiryTimeout(int expiryTimeout)
603{
604 Q_D(QThreadPool);
605 QMutexLocker locker(&d->mutex);
606 d->expiryTimeout = std::chrono::milliseconds(expiryTimeout);
607}
608
622{
623 Q_D(const QThreadPool);
624 QMutexLocker locker(&d->mutex);
625 return d->requestedMaxThreadCount;
626}
627
628void QThreadPool::setMaxThreadCount(int maxThreadCount)
629{
630 Q_D(QThreadPool);
631 QMutexLocker locker(&d->mutex);
632
633 if (maxThreadCount == d->requestedMaxThreadCount)
634 return;
635
636 d->requestedMaxThreadCount = maxThreadCount;
637 d->tryToStartMoreThreads();
638}
639
651{
652 Q_D(const QThreadPool);
653 QMutexLocker locker(&d->mutex);
654 return d->activeThreadCount();
655}
656
673{
674 Q_D(QThreadPool);
675 QMutexLocker locker(&d->mutex);
676 ++d->reservedThreads;
677}
678
692{
693 Q_D(QThreadPool);
694 QMutexLocker locker(&d->mutex);
695 d->stackSize = stackSize;
696}
697
699{
700 Q_D(const QThreadPool);
701 QMutexLocker locker(&d->mutex);
702 return d->stackSize;
703}
704
720{
721 Q_D(QThreadPool);
722 QMutexLocker locker(&d->mutex);
723 d->threadPriority = priority;
724}
725
727{
728 Q_D(const QThreadPool);
729 QMutexLocker locker(&d->mutex);
730 return d->threadPriority;
731}
732
746{
747 Q_D(QThreadPool);
748 QMutexLocker locker(&d->mutex);
749 --d->reservedThreads;
750 d->tryToStartMoreThreads();
751}
752
773{
774 if (!runnable)
775 return releaseThread();
776
777 Q_D(QThreadPool);
778 QMutexLocker locker(&d->mutex);
779 Q_ASSERT(d->reservedThreads > 0);
780 --d->reservedThreads;
781
782 if (!d->tryStart(runnable)) {
783 // This can only happen if we reserved max threads,
784 // and something took the one minimum thread.
785 d->enqueueTask(runnable, INT_MAX);
786 }
787}
788
820{
821 Q_D(QThreadPool);
822 return d->waitForDone(deadline);
823}
824
835{
836 Q_D(QThreadPool);
837 d->clear();
838}
839
845bool QThreadPool::contains(const QThread *thread) const
846{
847 Q_D(const QThreadPool);
848 const QThreadPoolThread *poolThread = qobject_cast<const QThreadPoolThread *>(thread);
849 if (!poolThread)
850 return false;
851 QMutexLocker locker(&d->mutex);
852 return d->allThreads.contains(const_cast<QThreadPoolThread *>(poolThread));
853}
854
856
857#include "moc_qthreadpool.cpp"
858#include "qthreadpool.moc"
QByteArray first(qsizetype n) const &
Definition qbytearray.h:196
static bool closingDown()
Returns true if the application objects are being destroyed; otherwise returns false.
\inmodule QtCore
qsizetype size() const noexcept
Definition qlist.h:397
void removeFirst() noexcept
Definition qlist.h:807
bool isEmpty() const noexcept
Definition qlist.h:401
iterator insert(qsizetype i, parameter_type t)
Definition qlist.h:488
bool removeOne(const AT &t)
Definition qlist.h:598
value_type takeFirst()
Definition qlist.h:566
const_iterator constBegin() const noexcept
Definition qlist.h:632
value_type takeLast()
Definition qlist.h:567
const T & constFirst() const noexcept
Definition qlist.h:647
const_iterator constEnd() const noexcept
Definition qlist.h:633
void clear()
Definition qlist.h:434
\inmodule QtCore
Definition qmutex.h:313
void unlock() noexcept
Unlocks this mutex locker.
Definition qmutex.h:319
void relock() noexcept
Relocks an unlocked mutex locker.
Definition qmutex.h:320
Mutex * mutex() const noexcept
Returns the mutex on which the QMutexLocker is operating.
Definition qmutex.h:321
\inmodule QtCore
Definition qmutex.h:281
void unlock() noexcept
Unlocks the mutex.
Definition qmutex.h:289
void lock() noexcept
Locks the mutex.
Definition qmutex.h:286
\inmodule QtCore
Definition qobject.h:103
static QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *member, Qt::ConnectionType=Qt::AutoConnection)
\threadsafe
Definition qobject.cpp:2960
QThread * thread() const
Returns the thread in which the object lives.
Definition qobject.cpp:1598
void objectNameChanged(const QString &objectName, QPrivateSignal)
This signal is emitted after the object's name has been changed.
void enqueue(const T &t)
Adds value t to the tail of the queue.
Definition qqueue.h:18
T dequeue()
Removes the head item in the queue and returns it.
Definition qqueue.h:19
\inmodule QtCore
Definition qrunnable.h:18
bool autoDelete() const
Returns true is auto-deletion is enabled; false otherwise.
Definition qrunnable.h:37
virtual void run()=0
Implement this pure virtual function in your subclass.
qsizetype size() const
Definition qset.h:50
bool isEmpty() const
Definition qset.h:52
bool contains(const T &value) const
Definition qset.h:71
iterator insert(const T &value)
Definition qset.h:155
\macro QT_RESTRICTED_CAST_FROM_ASCII
Definition qstring.h:129
bool isEmpty() const noexcept
Returns true if the string has no characters; otherwise returns false.
Definition qstring.h:192
bool areAllThreadsActive() const
bool tryStart(QRunnable *task)
void enqueueTask(QRunnable *task, int priority=0)
QList< QueuePage * > queue
std::chrono::duration< int, std::milli > expiryTimeout
QThread::Priority threadPriority
QWaitCondition noActiveThreads
QQueue< QThreadPoolThread * > expiredThreads
int activeThreadCount() const
int maxThreadCount() const
QSet< QThreadPoolThread * > allThreads
void startThread(QRunnable *runnable=nullptr)
void stealAndRunRunnable(QRunnable *runnable)
QQueue< QThreadPoolThread * > waitingThreads
static QThreadPool * qtGuiInstance()
Returns the QThreadPool instance for Qt Gui.
bool waitForDone(const QDeadlineTimer &timer)
bool tooManyThreadsActive() const
QThreadPoolThread(QThreadPoolPrivate *manager)
void run() override
QRunnable * runnable
QThreadPoolPrivate * manager
void registerThreadInactive()
QWaitCondition runnableReady
\inmodule QtCore
Definition qthreadpool.h:22
void reserveThread()
Reserves one thread, disregarding activeThreadCount() and maxThreadCount().
void start(QRunnable *runnable, int priority=0)
Reserves a thread and uses it to run runnable, unless this thread will make the current thread count ...
static QThreadPool * globalInstance()
Returns the global QThreadPool instance.
void setExpiryTimeout(int expiryTimeout)
int activeThreadCount
the number of active threads in the thread pool.
Definition qthreadpool.h:27
void setMaxThreadCount(int maxThreadCount)
void startOnReservedThread(QRunnable *runnable)
Releases a thread previously reserved with reserveThread() and uses it to run runnable.
bool tryTake(QRunnable *runnable)
void setThreadPriority(QThread::Priority priority)
QThreadPool(QObject *parent=nullptr)
Constructs a thread pool with the given parent.
int maxThreadCount
the maximum number of threads used by the thread pool.
Definition qthreadpool.h:26
uint stackSize
the stack size for the thread pool worker threads.
Definition qthreadpool.h:28
void setStackSize(uint stackSize)
bool waitForDone(int msecs)
Waits up to msecs milliseconds for all threads to exit and removes all threads from the thread pool.
~QThreadPool()
Destroys the QThreadPool.
bool contains(const QThread *thread) const
bool tryStart(QRunnable *runnable)
Attempts to reserve a thread to run runnable.
void releaseThread()
Releases a thread previously reserved by a call to reserveThread().
int expiryTimeout
the thread expiry timeout value in milliseconds.
Definition qthreadpool.h:25
QThread::Priority threadPriority
the thread priority for new worker threads.
Definition qthreadpool.h:29
void start(Priority=InheritPriority)
Definition qthread.cpp:996
bool isFinished() const
Definition qthread.cpp:1059
bool wait(QDeadlineTimer deadline=QDeadlineTimer(QDeadlineTimer::Forever))
Definition qthread.cpp:1023
void setStackSize(uint stackSize)
Definition qthread.cpp:1130
bool wait(QMutex *, QDeadlineTimer=QDeadlineTimer(QDeadlineTimer::Forever))
QSet< QString >::iterator it
Combined button and popup list for selecting options.
#define qWarning
Definition qlogging.h:166
GLboolean r
[2]
GLdouble GLdouble GLdouble GLdouble q
Definition qopenglext.h:259
GLfloat GLfloat p
[1]
#define Q_ASSERT(cond)
Definition qrandom.cpp:47
bool comparePriority(int priority, const QueuePage *p)
#define Q_OBJECT
unsigned int uint
Definition qtypes.h:34
QDeadlineTimer deadline(30s)
QObject::connect nullptr
QByteArray page
[45]
QTimer * timer
[3]
QNetworkAccessManager manager