6#include <QtMultimedia/private/qmultimedia_ranges_p.h>
7#include <QtMultimedia/private/qpipewire_audiocontextmanager_p.h>
8#include <QtMultimedia/private/qpipewire_audiodevice_p.h>
9#include <QtMultimedia/private/qpipewire_registry_support_p.h>
10#include <QtCore/private/qcoreapplication_p.h>
11#include <QtCore/private/qflatmap_p.h>
12#include <QtCore/private/qthread_p.h>
13#include <QtCore/q20vector.h>
14#include <QtCore/qcoreapplication.h>
15#include <QtCore/qdebug.h>
16#include <QtCore/qloggingcategory.h>
21namespace ranges = QtMultimediaPrivate::ranges;
30 : m_observedSerial(objectSerial)
41 if (!QThread::isMainThread())
43 moveToThread(qApp->thread());
45 constexpr auto compressionTime = std::chrono::milliseconds(50);
47 m_compressionTimer.setTimerType(Qt::TimerType::CoarseTimer);
48 m_compressionTimer.setInterval(compressionTime);
49 m_compressionTimer.setSingleShot(
true);
51 m_compressionTimer.callOnTimeout(
this, [
this] {
52 audioDevicesChanged();
55 m_compressionTimerThread.setObjectName(
"PWDevMon");
56 m_compressionTimerThread.setServiceLevel(QThread::QualityOfService::Eco);
57 m_compressionTimerThread.setStackSize(1024 * 1024);
58 m_compressionTimerThread.start();
59 m_compressionTimerThread.setPriority(QThread::Priority::LowPriority);
60 m_compressionTimer.moveToThread(&m_compressionTimerThread);
64 QMetaObject::invokeMethod(&monitor.m_compressionTimer, [&] {
65 monitor.m_compressionTimer.stop();
66 monitor.m_compressionTimer.moveToThread(qApp->thread());
67 }, Qt::BlockingQueuedConnection);
73 if (m_compressionTimer.thread() != thread()) {
74 QMetaObject::invokeMethod(&m_compressionTimer, [
this] {
75 m_compressionTimer.stop();
76 m_compressionTimer.moveToThread(thread());
77 }, Qt::BlockingQueuedConnection);
80 m_compressionTimerThread.quit();
81 m_compressionTimerThread.wait();
85 PipewireRegistryType objectType, uint32_t ,
86 const spa_dict &propDict)
88 Q_ASSERT(QAudioContextManager::isInPwThreadLoop());
90 Q_ASSERT(objectType == PipewireRegistryType::Device
91 || objectType == PipewireRegistryType::Node);
93 PwPropertyDict props = toPropertyDict(propDict);
94 std::optional<std::string_view> mediaClass = getMediaClass(props);
98 std::optional<ObjectSerial> serial = getObjectSerial(props);
101 QWriteLocker lock{ &m_objectDictMutex };
102 m_objectSerialDict.emplace(id, *serial);
103 m_serialObjectDict.emplace(*serial, id);
106 switch (objectType) {
107 case PipewireRegistryType::Device: {
108 if (mediaClass !=
"Audio/Device")
112 qCDebug(lcPipewireDeviceMonitor)
113 <<
"added device" << *serial << getDeviceDescription(props).value_or(
"");
115 QWriteLocker lock{ &m_mutex };
116 m_devices.emplace(*serial, DeviceRecord{ *serial, std::move(props) });
120 case PipewireRegistryType::Node: {
123 auto addPendingNode = [&](std::list<PendingNodeRecord> &pendingRecords) {
124 std::optional<std::string_view> nodeName = getNodeName(props);
126 qCWarning(lcPipewireDeviceMonitor) <<
"node without name (ignoring):" << props;
130 if (nodeName ==
"auto_null") {
133 qCWarning(lcPipewireDeviceMonitor) <<
"Ignoring dummy output:" << props;
138 std::optional<ObjectId> deviceId = getDeviceId(props);
139 std::optional<ObjectSerial> deviceSerial =
140 deviceId ? findObjectSerial(*deviceId) : std::nullopt;
142 if (deviceId && !deviceSerial) {
143 qCInfo(lcPipewireDeviceMonitor) <<
"Cannot add node: device removed";
147 std::lock_guard guard{ m_pendingRecordsMutex };
149 qCDebug(lcPipewireDeviceMonitor) <<
"added node for device" << serial << deviceSerial;
153 pendingRecords.emplace_back(id, *serial, deviceSerial,
std::move(props));
154 pendingRecords.back().formatFuture.then(
156 [
this, weakResults = std::weak_ptr{ pendingRecords.back().formatResults }](
157 std::vector<SpaObjectAudioFormat> formats) {
160 if (
auto ptr = weakResults.lock()) {
161 *ptr = std::move(formats);
162 startCompressionTimer();
167 if (mediaClass ==
"Audio/Source" || mediaClass ==
"Audio/Source/Virtual") {
168 addPendingNode(m_pendingRecords.m_sources);
171 if (mediaClass ==
"Audio/Sink" || mediaClass ==
"Audio/Sink/Virtual") {
172 addPendingNode(m_pendingRecords.m_sinks);
185 Q_ASSERT(QAudioContextManager::isInPwThreadLoop());
187 std::optional<ObjectSerial> serial = findObjectSerial(id);
192 qCDebug(lcPipewireDeviceMonitor) <<
"removing object" << *serial;
194 std::vector<SharedObjectRemoveObserver> removalObserversForObject;
196 QWriteLocker lock{ &m_objectDictMutex };
198 for (
const auto &observer : m_objectRemoveObserver) {
199 if (observer->serial() == serial)
200 removalObserversForObject.push_back(observer);
202 q20::erase_if(m_objectRemoveObserver, [&](
const SharedObjectRemoveObserver &element) {
203 return element->serial() == serial;
206 m_objectSerialDict.erase(id);
207 m_serialObjectDict.erase(*serial);
210 for (
const SharedObjectRemoveObserver &element : removalObserversForObject)
211 emit element->objectRemoved();
214 std::lock_guard guard{ m_pendingRecordsMutex };
216 m_pendingRecords.removeRecordsForObject(*serial);
217 m_pendingRecords.m_removals.push_back(*serial);
220 startCompressionTimer();
245 Q_ASSERT(
this->thread()->isCurrentThread());
247 PendingRecords pendingRecords = [&] {
248 std::lock_guard guard{ m_pendingRecordsMutex };
249 PendingRecords resolvedRecords;
251 std::swap(m_pendingRecords.m_removals, resolvedRecords.m_removals);
252 std::swap(m_pendingRecords.m_defaultSource, resolvedRecords.m_defaultSource);
253 std::swap(m_pendingRecords.m_defaultSink, resolvedRecords.m_defaultSink);
257 auto takeFullyResolvedRecords = [](std::list<PendingNodeRecord> &toResolve,
258 std::list<PendingNodeRecord> &resolved) {
259 auto it = toResolve.begin();
260 while (it != toResolve.end()) {
263 const bool isFullyResolved = it->formatResults->has_value();
264 if (isFullyResolved) {
265 auto next =
std::next(it);
266 resolved.splice(resolved.end(), toResolve, it);
273 takeFullyResolvedRecords(m_pendingRecords.m_sources, resolvedRecords.m_sources);
274 takeFullyResolvedRecords(m_pendingRecords.m_sinks, resolvedRecords.m_sinks);
276 return resolvedRecords;
280 [](std::variant<QByteArray, NoDefaultDeviceType> arg) -> std::optional<QByteArray> {
281 if (std::holds_alternative<NoDefaultDeviceType>(arg))
284 return std::get<QByteArray>(arg);
287 bool defaultSourceChanged = pendingRecords.m_defaultSource.has_value();
288 if (defaultSourceChanged)
289 m_defaultSourceName = getNodeName(*pendingRecords.m_defaultSource);
291 bool defaultSinkChanged = pendingRecords.m_defaultSink.has_value();
292 if (defaultSinkChanged)
293 m_defaultSinkName = getNodeName(*pendingRecords.m_defaultSink);
295 if (!pendingRecords.m_sources.empty() || !pendingRecords.m_removals.empty()
296 || defaultSourceChanged)
297 updateSources(std::move(pendingRecords.m_sources), pendingRecords.m_removals);
299 if (!pendingRecords.m_sinks.empty() || !pendingRecords.m_removals.empty() || defaultSinkChanged)
300 updateSinks(std::move(pendingRecords.m_sinks), pendingRecords.m_removals);
305 for (std::list<PendingNodeRecord> *recordList : { &m_sources, &m_sinks }) {
306 recordList->remove_if([&](
const PendingNodeRecord &record) {
307 return record.serial == id || record.deviceSerial == id;
313std::optional<ObjectSerial>
317 QReadLocker guard(&m_mutex);
319 QSpan records = Mode == Direction::sink ? QSpan{ m_sinks } : QSpan{ m_sources };
320 auto it = ranges::find_if(records, [&](
const NodeRecord &sink) {
321 return getNodeName(sink.properties) == nodeName;
324 if (it == records.end())
331 return findNodeSerialForNodeName<Direction::sink>(nodeName);
337 return findNodeSerialForNodeName<Direction::source>(nodeName);
342 QSpan<
const ObjectSerial> removedObjects)
344 QWriteLocker guard(&m_mutex);
346 std::vector<NodeRecord> &sinksOrSources = Mode == Direction::sink ? m_sinks : m_sources;
348 if (!removedObjects.empty()) {
349 for (ObjectSerial id : removedObjects) {
350 q20::erase_if(sinksOrSources, [&](
const auto &record) {
351 return record.serial == id || record.deviceSerial == id;
356 for (PendingNodeRecord &record : addedNodes) {
357 Q_ASSERT(record.formatResults && record.formatResults->has_value());
358 std::vector<SpaObjectAudioFormat> &results = **record.formatResults;
360 q20::erase_if(results, [](SpaObjectAudioFormat
const &arg) {
361 const bool isIEC61937EncapsulatedDevice = std::visit([](
const auto &format) {
362 if constexpr (std::is_same_v<std::decay_t<
decltype(format)>,
363 spa_audio_iec958_codec>) {
365 return format != SPA_AUDIO_IEC958_CODEC_PCM;
369 return isIEC61937EncapsulatedDevice;
373 ranges::sort(results, [](SpaObjectAudioFormat
const &lhs, SpaObjectAudioFormat
const &rhs) {
374 auto lhs_has_iec958 = std::holds_alternative<spa_audio_iec958_codec>(lhs.sampleTypes);
375 auto rhs_has_iec958 = std::holds_alternative<spa_audio_iec958_codec>(rhs.sampleTypes);
376 return lhs_has_iec958 < rhs_has_iec958;
379 if (results.size() > 1) {
380 qCDebug(lcPipewireDeviceMonitor)
381 <<
"Multiple formats supported by node, prefer non-iec958: format"
385 if (!results.empty()) {
386 sinksOrSources.push_back(NodeRecord{
389 std::move(record.properties),
390 std::move(results.front()),
393 qCDebug(lcPipewireDeviceMonitor)
394 <<
"Could not resolve audio format for" << record.serial;
398 QList<QAudioDevice> oldDeviceList =
399 Mode == Direction::sink ? m_sinkDeviceList : m_sourceDeviceList;
401 const std::optional<QByteArray> &defaultSinkOrSourceNodeNameBA =
402 Mode == Direction::sink ? m_defaultSinkName : m_defaultSourceName;
405 const auto defaultSinkOrSourceNodeName = [&]() -> std::optional<std::string_view> {
406 if (defaultSinkOrSourceNodeNameBA)
407 return std::string_view{
408 defaultSinkOrSourceNodeNameBA->data(),
409 std::size_t(defaultSinkOrSourceNodeNameBA->size()),
414 QList<QAudioDevice> newDeviceList;
417 for (NodeRecord &sinkOrSource : sinksOrSources) {
418 std::optional<std::string_view> nodeName = getNodeName(sinkOrSource.properties);
419 bool isDefault = (defaultSinkOrSourceNodeName == nodeName);
421 auto devicePrivate = std::make_unique<QPipewireAudioDevicePrivate>(
422 sinkOrSource.properties, sinkOrSource.format, QAudioDevice::Mode::Output,
425 QAudioDevice device = QAudioDevicePrivate::createQAudioDevice(std::move(devicePrivate));
427 newDeviceList.push_back(device);
429 qCDebug(lcPipewireDeviceMonitor) <<
"adding device" << nodeName;
433 ranges::sort(newDeviceList, [](
const QAudioDevice &lhs,
const QAudioDevice &rhs) {
434 return lhs.description() < rhs.description();
439 bool deviceListsEqual = ranges::equal(oldDeviceList, newDeviceList,
440 [](
const QAudioDevice &lhs,
const QAudioDevice &rhs) {
441 return (lhs.id() == rhs.id()) && (lhs.isDefault() == rhs.isDefault());
444 if (!deviceListsEqual) {
445 qCDebug(lcPipewireDeviceMonitor) <<
"updated device list";
447 if constexpr (Mode == Direction::sink) {
448 m_sinkDeviceList = newDeviceList;
449 emit audioSinksChanged(m_sinkDeviceList);
451 m_sourceDeviceList = newDeviceList;
452 emit audioSourcesChanged(m_sourceDeviceList);
458 QSpan<
const ObjectSerial> removedObjects)
460 updateSourcesOrSinks<Direction::sink>(
std::move(addedNodes), removedObjects);
464 QSpan<
const ObjectSerial> removedObjects)
466 updateSourcesOrSinks<Direction::source>(
std::move(addedNodes), removedObjects);
469std::optional<ObjectSerial>
QAudioDeviceMonitor::findDeviceSerial(std::string_view deviceName)
const
471 QReadLocker guard(&m_mutex);
472 auto it = ranges::find_if(m_devices, [&](
auto const &entry) {
473 return getDeviceName(entry.second.properties) == deviceName;
475 if (it == m_devices.end())
482 QReadLocker lock{ &m_objectDictMutex };
484 auto it = m_serialObjectDict.find(serial);
485 if (it != m_serialObjectDict.end())
492 QReadLocker lock{ &m_objectDictMutex };
494 auto it = m_objectSerialDict.find(id);
495 if (it != m_objectSerialDict.end())
502 QWriteLocker lock{ &m_objectDictMutex };
504 if (m_serialObjectDict.find(observer->serial()) == m_serialObjectDict.end())
507 m_objectRemoveObserver.push_back(std::move(observer));
513 QWriteLocker lock{ &m_objectDictMutex };
515 q20::erase(m_objectRemoveObserver, observer);
521 QAudioContextManager::instance()->syncRegistry();
525 QAudioContextManager::instance()->syncRegistry();
527 std::lock_guard pendingRecordLock{
528 m_pendingRecordsMutex,
531 for (ObjectSerial removed : m_pendingRecords.m_removals)
532 m_pendingRecords.removeRecordsForObject(removed);
534 auto allFormatsResolved = [](
const std::list<PendingNodeRecord> &list) {
535 return ranges::all_of(list, [](
const PendingNodeRecord &record) {
536 return record.formatFuture.isFinished();
540 if (allFormatsResolved(m_pendingRecords.m_sources)
541 && allFormatsResolved(m_pendingRecords.m_sinks))
549 QMetaObject::invokeMethod(&m_compressionTimer, [&] {
550 QCoreApplication::sendPostedEvents(&m_compressionTimer, QEvent::MetaCall);
551 audioDevicesChanged(verifyThreading);
552 m_compressionTimer.stop();
553 }, Qt::BlockingQueuedConnection);
555 QReadLocker lock{ &m_mutex };
557 .sources = m_sourceDeviceList,
558 .sinks = m_sinkDeviceList,
564 QMetaObject::invokeMethod(&m_compressionTimer, [
this] {
565 if (m_compressionTimer.isActive())
567 m_compressionTimer.start();
571QAudioDeviceMonitor::PendingNodeRecord::PendingNodeRecord(ObjectId object, ObjectSerial serial,
572 std::optional<ObjectSerial> deviceSerial,
573 PwPropertyDict properties):
581 std::move(properties),
584 std::make_shared<std::optional<std::vector<SpaObjectAudioFormat>>>()
587 Q_ASSERT(QAudioContextManager::isInPwThreadLoop());
589 auto promise = std::make_shared<QPromise<std::vector<SpaObjectAudioFormat>>>();
590 formatFuture = promise->future();
592 auto shared_results = std::make_shared<std::vector<SpaObjectAudioFormat>>();
594 auto onParam = [shared_results](
int , uint32_t , uint32_t ,
595 uint32_t ,
const struct spa_pod *param)
mutable {
596 std::optional<SpaObjectAudioFormat> format = SpaObjectAudioFormat::parse(param);
598 shared_results->emplace_back(*format);
601 QAudioContextManager::withEventLoopLock([&] {
602 QAudioContextManager *context = QAudioContextManager::instance();
603 PwNodeHandle nodeProxy = context->bindNode(object);
605 enumFormatListener = std::make_unique<NodeEventListener>(std::move(nodeProxy),
606 NodeEventListener::NodeHandler{
611 enumFormatListener->enumParams(SPA_PARAM_EnumFormat);
616 enumFormatDoneListener = std::make_unique<CoreEventDoneListener>();
617 enumFormatDoneListener->asyncWait(context->coreConnection().get(),
618 [promise, shared_results] {
620 promise->emplaceResult(std::move(*shared_results));
630#include "moc_qpipewire_audiodevicemonitor_p.cpp"
std::optional< ObjectSerial > findObjectSerial(ObjectId) const
void unregisterObserver(const SharedObjectRemoveObserver &)
std::optional< ObjectSerial > findSourceNodeSerial(std::string_view nodeName) const
DeviceLists getDeviceLists(bool verifyThreading=true)
std::optional< ObjectSerial > findSinkNodeSerial(std::string_view nodeName) const
void objectRemoved(ObjectId)
void objectAdded(ObjectId, uint32_t permissions, PipewireRegistryType, uint32_t version, const spa_dict &props)
std::optional< ObjectId > findObjectId(ObjectSerial) const
bool registerObserver(SharedObjectRemoveObserver)
Q_STATIC_LOGGING_CATEGORY(lcPipewireAudioSink, "qt.multimedia.pipewire.audiosink")
StrongIdType< uint64_t, ObjectSerialTag > ObjectSerial