6#include <QtMultimedia/private/qmultimedia_ranges_p.h>
7#include <QtMultimedia/private/qpipewire_audiocontextmanager_p.h>
8#include <QtMultimedia/private/qpipewire_audiodevice_p.h>
9#include <QtMultimedia/private/qpipewire_registry_support_p.h>
10#include <QtCore/private/qcoreapplication_p.h>
11#include <QtCore/private/qflatmap_p.h>
12#include <QtCore/private/qthread_p.h>
13#include <QtCore/q20vector.h>
14#include <QtCore/qcoreapplication.h>
15#include <QtCore/qdebug.h>
16#include <QtCore/qloggingcategory.h>
// Constructor member-initializer fragment: stores the objectSerial argument in m_observedSerial.
// NOTE(review): the enclosing constructor signature is not part of this excerpt.
29 : m_observedSerial(objectSerial)
// Monitor setup: thread affinity plus the "compression" timer and its worker thread.
// NOTE(review): the enclosing signature is not visible in this excerpt (presumably the
// QAudioDeviceMonitor constructor) — confirm.
// When constructed off the main thread, re-home this object to the application thread.
40 if (!QThread::isMainThread())
42 moveToThread(qApp->thread());
// The timer is single-shot with a 50 ms coarse interval; its timeout runs audioDevicesChanged()
// with `this` as context object.
44 constexpr auto compressionTime = std::chrono::milliseconds(50);
46 m_compressionTimer.setTimerType(Qt::TimerType::CoarseTimer);
47 m_compressionTimer.setInterval(compressionTime);
48 m_compressionTimer.setSingleShot(
true);
50 m_compressionTimer.callOnTimeout(
this, [
this] {
51 audioDevicesChanged();
// Dedicated timer thread: named "PWDevMon", Eco service level, 1 MiB stack.
54 m_compressionTimerThread.setObjectName(
"PWDevMon");
55 m_compressionTimerThread.setServiceLevel(QThread::QualityOfService::Eco);
56 m_compressionTimerThread.setStackSize(1024 * 1024);
57 m_compressionTimerThread.start();
// setPriority() is issued after start(): per the Qt documentation it only affects a running thread.
58 m_compressionTimerThread.setPriority(QThread::Priority::LowPriority);
// From here on the timer lives on (and must be started/stopped from) the timer thread.
59 m_compressionTimer.moveToThread(&m_compressionTimerThread);
// Stops monitor's compression timer and moves it to the application thread. The lambda is invoked
// with the timer as context object, and Qt::BlockingQueuedConnection makes the caller wait until
// it has run. `monitor` is captured by reference.
// NOTE(review): the enclosing function (and where `monitor` comes from) is not visible in this excerpt.
63 QMetaObject::invokeMethod(&monitor.m_compressionTimer, [&] {
64 monitor.m_compressionTimer.stop();
65 monitor.m_compressionTimer.moveToThread(qApp->thread());
66 }, Qt::BlockingQueuedConnection);
// Teardown of the compression timer and its thread.
// NOTE(review): the enclosing signature is not visible in this excerpt (presumably the destructor).
// If the timer still lives on a different thread than this object, stop it there and move it to
// this object's thread. Per the Qt documentation, moveToThread() can only push an object away from
// its current thread, hence the blocking invocation with the timer as context object.
72 if (m_compressionTimer.thread() != thread()) {
73 QMetaObject::invokeMethod(&m_compressionTimer, [
this] {
74 m_compressionTimer.stop();
75 m_compressionTimer.moveToThread(thread());
76 }, Qt::BlockingQueuedConnection);
// Ask the timer thread's event loop to exit, then block until the thread has finished.
79 m_compressionTimerThread.quit();
80 m_compressionTimerThread.wait();
84 PipewireRegistryType objectType, uint32_t ,
85 const spa_dict &propDict)
// Handles a newly announced registry object (tail of the parameter list above; the first
// parameters are not visible in this excerpt). Asserted to run inside the PipeWire thread loop
// and to be called only for Device and Node objects.
87 Q_ASSERT(QAudioContextManager::isInPwThreadLoop());
89 Q_ASSERT(objectType == PipewireRegistryType::Device
90 || objectType == PipewireRegistryType::Node);
// Convert the property dictionary and extract media class and object serial.
// NOTE(review): statements between these lines are not visible here; `serial` is dereferenced
// unconditionally further down, so presumably a missing value is rejected first — confirm.
92 PwPropertyDict props = toPropertyDict(propDict);
93 std::optional<std::string_view> mediaClass = getMediaClass(props);
97 std::optional<ObjectSerial> serial = getObjectSerial(props);
// Record the id <-> serial mapping in both directions under the dictionary write lock.
100 QWriteLocker lock{ &m_objectDictMutex };
101 m_objectSerialDict.emplace(id, *serial);
102 m_serialObjectDict.emplace(*serial, id);
// Devices are stored in m_devices directly; nodes are queued as pending records.
105 switch (objectType) {
106 case PipewireRegistryType::Device: {
// Media-class filter for devices; the guarded statement is not visible in this excerpt
// (presumably an early exit).
107 if (mediaClass !=
"Audio/Device")
111 qCDebug(lcPipewireDeviceMonitor)
112 <<
"added device" << *serial << getDeviceDescription(props).value_or(
"");
// m_devices is modified under the m_mutex write lock.
114 QWriteLocker lock{ &m_mutex };
115 m_devices.emplace(*serial, DeviceRecord{ *serial, std::move(props) });
119 case PipewireRegistryType::Node: {
// Helper: appends a pending record for this node to the given list (sources or sinks).
122 auto addPendingNode = [&](std::list<PendingNodeRecord> &pendingRecords) {
123 std::optional<std::string_view> nodeName = getNodeName(props);
// Warning path for nodes without a name (the condition line is not visible in this excerpt).
125 qCWarning(lcPipewireDeviceMonitor) <<
"node without name (ignoring):" << props;
// A node named "auto_null" is reported as a dummy output and ignored (see the warning text).
129 if (nodeName ==
"auto_null") {
132 qCWarning(lcPipewireDeviceMonitor) <<
"Ignoring dummy output:" << props;
// If the node carries a device id, translate it to the device's serial.
137 std::optional<ObjectId> deviceId = getDeviceId(props);
138 std::optional<ObjectSerial> deviceSerial =
139 deviceId ? findObjectSerial(*deviceId) : std::nullopt;
// A device id without a known serial is reported as "device removed".
141 if (deviceId && !deviceSerial) {
142 qCInfo(lcPipewireDeviceMonitor) <<
"Cannot add node: device removed";
// The pending lists are modified under m_pendingRecordsMutex.
146 std::lock_guard guard{ m_pendingRecordsMutex };
148 qCDebug(lcPipewireDeviceMonitor) <<
"added node for device" << serial << deviceSerial;
// Create the pending record and attach a continuation to its format future. The continuation
// holds only a weak_ptr to the record's result slot: if lock() fails the formats are not stored;
// otherwise they are written to the slot and the compression timer is started.
152 pendingRecords.emplace_back(id, *serial, deviceSerial,
std::move(props));
153 pendingRecords.back().formatFuture.then(
155 [
this, weakResults = std::weak_ptr{ pendingRecords.back().formatResults }](
156 std::vector<SpaObjectAudioFormat> formats) {
159 if (
auto ptr = weakResults.lock()) {
160 *ptr = std::move(formats);
161 startCompressionTimer();
// Dispatch on media class: (virtual) sources and (virtual) sinks use separate pending lists.
166 if (mediaClass ==
"Audio/Source" || mediaClass ==
"Audio/Source/Virtual") {
167 addPendingNode(m_pendingRecords.m_sources);
170 if (mediaClass ==
"Audio/Sink" || mediaClass ==
"Audio/Sink/Virtual") {
171 addPendingNode(m_pendingRecords.m_sinks);
// Handles removal of a registry object identified by `id`; asserted to run inside the PipeWire
// thread loop. NOTE(review): the signature is not visible in this excerpt.
184 Q_ASSERT(QAudioContextManager::isInPwThreadLoop());
// Translate the object id to its serial.
// NOTE(review): `serial` is dereferenced below; the lines that presumably reject an unknown id
// are not visible here — confirm.
186 std::optional<ObjectSerial> serial = findObjectSerial(id);
191 qCDebug(lcPipewireDeviceMonitor) <<
"removing object" << *serial;
// Observers watching this serial are copied into a local vector before being dropped from
// m_objectRemoveObserver, so they can still be notified afterwards.
193 std::vector<SharedObjectRemoveObserver> removalObserversForObject;
195 QWriteLocker lock{ &m_objectDictMutex };
197 for (
const auto &observer : m_objectRemoveObserver) {
198 if (observer->serial() == serial)
199 removalObserversForObject.push_back(observer);
201 q20::erase_if(m_objectRemoveObserver, [&](
const SharedObjectRemoveObserver &element) {
202 return element->serial() == serial;
// Forget the id <-> serial mapping in both dictionaries.
205 m_objectSerialDict.erase(id);
206 m_serialObjectDict.erase(*serial);
// Notify the collected observers.
// NOTE(review): whether m_objectDictMutex is still held at this point cannot be told from this
// excerpt (scope braces are missing).
209 for (
const SharedObjectRemoveObserver &element : removalObserversForObject)
210 emit element->objectRemoved();
// Under the pending-records mutex: drop pending records that refer to the serial and queue the
// serial in m_removals.
213 std::lock_guard guard{ m_pendingRecordsMutex };
215 m_pendingRecords.removeRecordsForObject(*serial);
216 m_pendingRecords.m_removals.push_back(*serial);
// Schedule processing of the queued removal.
219 startCompressionTimer();
// Consumes the queued changes (removals, default source/sink, fully resolved nodes) and applies
// them through updateSources()/updateSinks().
// NOTE(review): the signature and anything guarding this assertion are not visible in this excerpt.
244 Q_ASSERT(
this->thread()->isCurrentThread());
// Take the work out of m_pendingRecords while holding m_pendingRecordsMutex; the rest of the
// function operates on the local copy.
246 PendingRecords pendingRecords = [&] {
247 std::lock_guard guard{ m_pendingRecordsMutex };
248 PendingRecords resolvedRecords;
// Swap with the freshly constructed members of resolvedRecords.
250 std::swap(m_pendingRecords.m_removals, resolvedRecords.m_removals);
251 std::swap(m_pendingRecords.m_defaultSource, resolvedRecords.m_defaultSource);
252 std::swap(m_pendingRecords.m_defaultSink, resolvedRecords.m_defaultSink);
// Moves every record whose formatResults already holds a value from `toResolve` to the end of
// `resolved`. `next` is computed before splice() because `it` refers to an element of `resolved`
// afterwards. The iterator-advance lines are not visible in this excerpt.
256 auto takeFullyResolvedRecords = [](std::list<PendingNodeRecord> &toResolve,
257 std::list<PendingNodeRecord> &resolved) {
258 auto it = toResolve.begin();
259 while (it != toResolve.end()) {
262 const bool isFullyResolved = it->formatResults->has_value();
263 if (isFullyResolved) {
264 auto next =
std::next(it);
265 resolved.splice(resolved.end(), toResolve, it);
272 takeFullyResolvedRecords(m_pendingRecords.m_sources, resolvedRecords.m_sources);
273 takeFullyResolvedRecords(m_pendingRecords.m_sinks, resolvedRecords.m_sinks);
275 return resolvedRecords;
// Helper lambda converting the default-device variant to an optional node name: the QByteArray
// alternative is returned as is. NOTE(review): the lambda's name and the value returned for
// NoDefaultDeviceType are not visible here (presumably `getNodeName` and std::nullopt).
279 [](std::variant<QByteArray, NoDefaultDeviceType> arg) -> std::optional<QByteArray> {
280 if (std::holds_alternative<NoDefaultDeviceType>(arg))
283 return std::get<QByteArray>(arg);
// A set m_defaultSource / m_defaultSink means the default changed since the last run.
286 bool defaultSourceChanged = pendingRecords.m_defaultSource.has_value();
287 if (defaultSourceChanged)
288 m_defaultSourceName = getNodeName(*pendingRecords.m_defaultSource);
290 bool defaultSinkChanged = pendingRecords.m_defaultSink.has_value();
291 if (defaultSinkChanged)
292 m_defaultSinkName = getNodeName(*pendingRecords.m_defaultSink);
// Each direction is refreshed when it has new nodes, when anything was removed, or when its
// default changed. m_removals is passed to both calls and is not moved from.
294 if (!pendingRecords.m_sources.empty() || !pendingRecords.m_removals.empty()
295 || defaultSourceChanged)
296 updateSources(std::move(pendingRecords.m_sources), pendingRecords.m_removals);
298 if (!pendingRecords.m_sinks.empty() || !pendingRecords.m_removals.empty() || defaultSinkChanged)
299 updateSinks(std::move(pendingRecords.m_sinks), pendingRecords.m_removals);
// Removes, from both the pending source and pending sink lists, every record whose own serial or
// whose device serial equals `id`. NOTE(review): the signature is not visible in this excerpt.
304 for (std::list<PendingNodeRecord> *recordList : { &m_sources, &m_sinks }) {
305 recordList->remove_if([&](
const PendingNodeRecord &record) {
306 return record.serial == id || record.deviceSerial == id;
// Looks up a node by name among the sink or source records, selected by the `Mode` template
// argument, while holding a read lock on m_mutex.
// NOTE(review): the function name/parameters and both return statements are not visible in this
// excerpt; the not-found case presumably yields std::nullopt.
312std::optional<ObjectSerial>
316 QReadLocker guard(&m_mutex);
318 QSpan records = Mode == Direction::sink ? QSpan{ m_sinks } : QSpan{ m_sources };
// Linear search; the lambda parameter is called `sink` but is used for either direction.
319 auto it = std::find_if(records.begin(), records.end(), [&](
const NodeRecord &sink) {
320 return getNodeName(sink.properties) == nodeName;
323 if (it == records.end())
// Sink lookup: forwards to the shared template with Direction::sink.
// NOTE(review): signature not visible here (presumably findSinkNodeSerial).
330 return findNodeSerialForNodeName<Direction::sink>(nodeName);
// Source lookup: forwards to the shared template with Direction::source.
// NOTE(review): signature not visible here (presumably findSourceNodeSerial).
336 return findNodeSerialForNodeName<Direction::source>(nodeName);
341 QSpan<
const ObjectSerial> removedObjects)
// Applies removals and newly resolved nodes to the sink or source records (selected by `Mode`),
// rebuilds the corresponding QAudioDevice list and emits the matching change signal if the list
// differs. NOTE(review): the start of the signature is not visible in this excerpt.
343 QWriteLocker guard(&m_mutex);
345 std::vector<NodeRecord> &sinksOrSources = Mode == Direction::sink ? m_sinks : m_sources;
// Drop every record that is a removed object itself or belongs to a removed device.
347 if (!removedObjects.empty()) {
348 for (ObjectSerial id : removedObjects) {
349 q20::erase_if(sinksOrSources, [&](
const auto &record) {
350 return record.serial == id || record.deviceSerial == id;
// Turn each added node into a NodeRecord; its formats must already be resolved (asserted).
355 for (PendingNodeRecord &record : addedNodes) {
356 Q_ASSERT(record.formatResults && record.formatResults->has_value());
357 std::vector<SpaObjectAudioFormat> &results = **record.formatResults;
// Discard formats whose visited value is a spa_audio_iec958_codec other than
// SPA_AUDIO_IEC958_CODEC_PCM. The branch for other alternatives and the visited expression are
// not visible in this excerpt.
359 q20::erase_if(results, [](SpaObjectAudioFormat
const &arg) {
360 const bool isIEC61937EncapsulatedDevice = std::visit([](
const auto &format) {
361 if constexpr (std::is_same_v<std::decay_t<
decltype(format)>,
362 spa_audio_iec958_codec>) {
364 return format != SPA_AUDIO_IEC958_CODEC_PCM;
368 return isIEC61937EncapsulatedDevice;
// Order the remaining formats so that those without an iec958 sample type come first
// (false < true). std::sort is not stable, so the order within each group is unspecified.
372 std::sort(results.begin(), results.end(),
373 [](SpaObjectAudioFormat
const &lhs, SpaObjectAudioFormat
const &rhs) {
374 auto lhs_has_iec958 = std::holds_alternative<spa_audio_iec958_codec>(lhs.sampleTypes);
375 auto rhs_has_iec958 = std::holds_alternative<spa_audio_iec958_codec>(rhs.sampleTypes);
376 return lhs_has_iec958 < rhs_has_iec958;
379 if (results.size() > 1) {
380 qCDebug(lcPipewireDeviceMonitor)
381 <<
"Multiple formats supported by node, prefer non-iec958: format"
// The first format after sorting is the one stored with the record; a node without any usable
// format only produces the debug message below.
385 if (!results.empty()) {
386 sinksOrSources.push_back(NodeRecord{
389 std::move(record.properties),
390 std::move(results.front()),
393 qCDebug(lcPipewireDeviceMonitor)
394 <<
"Could not resolve audio format for" << record.serial;
// Copy of the currently stored device list, used for change detection further down.
398 QList<QAudioDevice> oldDeviceList =
399 Mode == Direction::sink ? m_sinkDeviceList : m_sourceDeviceList;
// Default node name for this direction, viewed as a string_view so it can be compared with the
// result of getNodeName().
401 const std::optional<QByteArray> &defaultSinkOrSourceNodeNameBA =
402 Mode == Direction::sink ? m_defaultSinkName : m_defaultSourceName;
405 const auto defaultSinkOrSourceNodeName = [&]() -> std::optional<std::string_view> {
406 if (defaultSinkOrSourceNodeNameBA)
407 return std::string_view{
408 defaultSinkOrSourceNodeNameBA->data(),
409 std::size_t(defaultSinkOrSourceNodeNameBA->size()),
// Rebuild the device list from all current node records.
414 QList<QAudioDevice> newDeviceList;
417 for (NodeRecord &sinkOrSource : sinksOrSources) {
418 std::optional<std::string_view> nodeName = getNodeName(sinkOrSource.properties);
419 bool isDefault = (defaultSinkOrSourceNodeName == nodeName);
// NOTE(review): QAudioDevice::Mode::Output is passed here regardless of `Mode`; the remaining
// constructor arguments are not visible in this excerpt — confirm that sources end up with the
// intended mode.
421 auto devicePrivate = std::make_unique<QPipewireAudioDevicePrivate>(
422 sinkOrSource.properties, sinkOrSource.format, QAudioDevice::Mode::Output,
425 QAudioDevice device = QAudioDevicePrivate::createQAudioDevice(std::move(devicePrivate));
427 newDeviceList.push_back(device);
429 qCDebug(lcPipewireDeviceMonitor) <<
"adding device" << nodeName;
// The list is ordered by description.
433 std::sort(newDeviceList.begin(), newDeviceList.end(),
434 [](
const QAudioDevice &lhs,
const QAudioDevice &rhs) {
435 return lhs.description() < rhs.description();
// Change detection compares only id and default flag, element by element.
440 bool deviceListsEqual = ranges::equal(oldDeviceList, newDeviceList,
441 [](
const QAudioDevice &lhs,
const QAudioDevice &rhs) {
442 return (lhs.id() == rhs.id()) && (lhs.isDefault() == rhs.isDefault());
// Store the new list and emit the signal for this direction only when something changed.
445 if (!deviceListsEqual) {
446 qCDebug(lcPipewireDeviceMonitor) <<
"updated device list";
448 if constexpr (Mode == Direction::sink) {
449 m_sinkDeviceList = newDeviceList;
450 emit audioSinksChanged(m_sinkDeviceList);
452 m_sourceDeviceList = newDeviceList;
453 emit audioSourcesChanged(m_sourceDeviceList);
459 QSpan<
const ObjectSerial> removedObjects)
// Sink flavour: forwards both arguments to the shared template implementation.
// NOTE(review): the start of the signature is not visible here (presumably updateSinks).
461 updateSourcesOrSinks<Direction::sink>(
std::move(addedNodes), removedObjects);
465 QSpan<
const ObjectSerial> removedObjects)
// Source flavour: forwards both arguments to the shared template implementation.
// NOTE(review): the start of the signature is not visible here (presumably updateSources).
467 updateSourcesOrSinks<Direction::source>(
std::move(addedNodes), removedObjects);
// Searches m_devices (under a read lock on m_mutex) for an entry whose device name, as reported
// by getDeviceName() on its properties, equals `deviceName`.
// NOTE(review): both return statements are not visible in this excerpt; the not-found case
// presumably yields std::nullopt.
470std::optional<ObjectSerial>
QAudioDeviceMonitor::findDeviceSerial(std::string_view deviceName)
const
472 QReadLocker guard(&m_mutex);
473 auto it = std::find_if(m_devices.begin(), m_devices.end(), [&](
auto const &entry) {
474 return getDeviceName(entry.second.properties) == deviceName;
476 if (it == m_devices.end())
// Serial -> object id lookup in m_serialObjectDict under a read lock on m_objectDictMutex.
// NOTE(review): signature and return statements are not visible in this excerpt.
483 QReadLocker lock{ &m_objectDictMutex };
485 auto it = m_serialObjectDict.find(serial);
486 if (it != m_serialObjectDict.end())
// Object id -> serial lookup in m_objectSerialDict under a read lock on m_objectDictMutex.
// NOTE(review): signature and return statements are not visible in this excerpt.
493 QReadLocker lock{ &m_objectDictMutex };
495 auto it = m_objectSerialDict.find(id);
496 if (it != m_objectSerialDict.end())
// Adds `observer` to m_objectRemoveObserver under the dictionary write lock.
// The check below detects an observer whose serial is not (or no longer) present in
// m_serialObjectDict. NOTE(review): the statement it guards and the return values are not visible
// in this excerpt (presumably registration is refused in that case).
503 QWriteLocker lock{ &m_objectDictMutex };
505 if (m_serialObjectDict.find(observer->serial()) == m_serialObjectDict.end())
508 m_objectRemoveObserver.push_back(std::move(observer));
// Removes every occurrence of `observer` from m_objectRemoveObserver under the dictionary write lock.
// NOTE(review): the signature is not visible in this excerpt.
514 QWriteLocker lock{ &m_objectDictMutex };
516 q20::erase(m_objectRemoveObserver, observer);
// Synchronous retrieval of the current source and sink device lists.
// NOTE(review): signatures and several control-flow lines are missing from this excerpt; the first
// syncRegistry() call below may belong to a separate, preceding definition — confirm.
522 QAudioContextManager::instance()->syncRegistry();
526 QAudioContextManager::instance()->syncRegistry();
// With m_pendingRecordsMutex held (the rest of the lock's initializer is not visible here), apply
// the queued removals to the pending records as well.
528 std::lock_guard pendingRecordLock{
529 m_pendingRecordsMutex,
532 for (ObjectSerial removed : m_pendingRecords.m_removals)
533 m_pendingRecords.removeRecordsForObject(removed);
// True when the format future of every record in the list has finished.
535 auto allFormatsResolved = [](
const std::list<PendingNodeRecord> &list) {
536 return std::all_of(list.begin(), list.end(), [](
const PendingNodeRecord &record) {
537 return record.formatFuture.isFinished();
// NOTE(review): the statement guarded by this check is not visible (presumably it ends a retry
// loop once all pending sources and sinks are resolved).
541 if (allFormatsResolved(m_pendingRecords.m_sources)
542 && allFormatsResolved(m_pendingRecords.m_sinks))
// Blocking call with the timer as context object: deliver its already posted MetaCall events,
// process the pending changes immediately, then stop the timer.
550 QMetaObject::invokeMethod(&m_compressionTimer, [&] {
551 QCoreApplication::sendPostedEvents(&m_compressionTimer, QEvent::MetaCall);
552 audioDevicesChanged(verifyThreading);
553 m_compressionTimer.stop();
554 }, Qt::BlockingQueuedConnection);
// The stored lists are read under a read lock on m_mutex and used as the `sources` / `sinks`
// members of the result.
556 QReadLocker lock{ &m_mutex };
558 .sources = m_sourceDeviceList,
559 .sinks = m_sinkDeviceList,
// Starts the compression timer through invokeMethod() with the timer as context object.
// NOTE(review): signature, the statement guarded by isActive() and the connection type are not
// visible in this excerpt; presumably an already running timer is left untouched, so that bursts
// of requests result in a single timeout — confirm.
565 QMetaObject::invokeMethod(&m_compressionTimer, [
this] {
566 if (m_compressionTimer.isActive())
568 m_compressionTimer.start();
// Creates a pending record for a node and starts enumerating its audio formats. The result is
// delivered through `formatFuture`. `properties` is moved into the record and the shared result
// slot starts out as an empty optional (most of the member-initializer list is not visible in
// this excerpt).
572QAudioDeviceMonitor::PendingNodeRecord::PendingNodeRecord(ObjectId object, ObjectSerial serial,
573 std::optional<ObjectSerial> deviceSerial,
574 PwPropertyDict properties):
582 std::move(properties),
585 std::make_shared<std::optional<std::vector<SpaObjectAudioFormat>>>()
// Must be constructed inside the PipeWire thread loop.
588 Q_ASSERT(QAudioContextManager::isInPwThreadLoop());
// The promise is shared so that the completion callback below can keep it alive.
590 auto promise = std::make_shared<QPromise<std::vector<SpaObjectAudioFormat>>>();
591 formatFuture = promise->future();
// Accumulates the parsed formats; shared between the per-parameter callback and the completion
// callback.
593 auto shared_results = std::make_shared<std::vector<SpaObjectAudioFormat>>();
// Per-parameter callback: parse the pod and append the format.
// NOTE(review): `format` is dereferenced; the guarding line is not visible in this excerpt —
// presumably unparsable pods are skipped.
595 auto onParam = [shared_results](
int , uint32_t , uint32_t ,
596 uint32_t ,
const struct spa_pod *param)
mutable {
597 std::optional<SpaObjectAudioFormat> format = SpaObjectAudioFormat::parse(param);
599 shared_results->emplace_back(*format);
// Under the event-loop lock: bind the node, install a node event listener (its handler contents
// are not visible in this excerpt) and request the EnumFormat parameters.
602 QAudioContextManager::withEventLoopLock([&] {
603 QAudioContextManager *context = QAudioContextManager::instance();
604 PwNodeHandle nodeProxy = context->bindNode(object);
606 enumFormatListener = std::make_unique<NodeEventListener>(std::move(nodeProxy),
607 NodeEventListener::NodeHandler{
612 enumFormatListener->enumParams(SPA_PARAM_EnumFormat);
// Wait asynchronously on the core connection; the callback hands the accumulated formats to the
// promise as its result.
617 enumFormatDoneListener = std::make_unique<CoreEventDoneListener>();
618 enumFormatDoneListener->asyncWait(context->coreConnection().get(),
619 [promise, shared_results] {
621 promise->emplaceResult(std::move(*shared_results));
631#include "moc_qpipewire_audiodevicemonitor_p.cpp"
std::optional< ObjectSerial > findObjectSerial(ObjectId) const
void unregisterObserver(const SharedObjectRemoveObserver &)
std::optional< ObjectSerial > findSourceNodeSerial(std::string_view nodeName) const
DeviceLists getDeviceLists(bool verifyThreading=true)
std::optional< ObjectSerial > findSinkNodeSerial(std::string_view nodeName) const
void objectRemoved(ObjectId)
void objectAdded(ObjectId, uint32_t permissions, PipewireRegistryType, uint32_t version, const spa_dict &props)
std::optional< ObjectId > findObjectId(ObjectSerial) const
bool registerObserver(SharedObjectRemoveObserver)
Q_STATIC_LOGGING_CATEGORY(lcPipewireAudioSink, "qt.multimedia.pipewire.audiosink")
StrongIdType< uint64_t, ObjectSerialTag > ObjectSerial