Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qgstreamer_qiodevice_handler.cpp
Go to the documentation of this file.
1// Copyright (C) 2024 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
5
6#include <QtCore/qdebug.h>
7#include <QtCore/qglobal.h>
8#include <QtCore/qiodevice.h>
9#include <QtCore/qmutex.h>
10#include <QtCore/qobject.h>
11#include <QtCore/qspan.h>
12#include <QtCore/qurl.h>
13#include <QtCore/quuid.h>
14
15#include <gst/base/gstbasesrc.h>
16#include <map>
17#include <memory>
18#include <mutex>
19#include <utility>
20
21QT_BEGIN_NAMESPACE
22
23namespace {
24
25using namespace Qt::Literals;
26
27// QIODeviceRegistry
28
30{
31public:
32 struct Record
33 {
34 explicit Record(QByteArray, QIODevice *);
35
37 bool isValid() const;
38
40
41 template <typename Functor>
42 auto runWhileLocked(Functor &&f)
43 {
44 QMutexLocker guard(&mutex);
45 return f(device);
46 }
47
48 private:
49 QIODevice *device;
50 mutable QMutex mutex;
51 };
52 using SharedRecord = std::shared_ptr<Record>;
53
56
57private:
58 void unregisterDevice(QIODevice *);
59 void deviceDestroyed(QIODevice *);
60
61 QMutex m_registryMutex;
63 std::map<QIODevice *, QByteArray> m_reverseLookupTable;
64};
65
67{
68 Q_ASSERT(device);
69
70 if (device->isSequential())
71 qWarning() << "GStreamer: sequential QIODevices are not fully supported";
72
73 QMutexLocker lock(&m_registryMutex);
74
75 auto it = m_reverseLookupTable.find(device);
76 if (it != m_reverseLookupTable.end())
77 return it->second;
78
79 QByteArray identifier =
80 "qiodevice:/"_ba + QUuid::createUuid().toByteArray(QUuid::StringFormat::Id128);
81
82 m_registry.emplace(identifier, std::make_shared<Record>(identifier, device));
83
84 QMetaObject::Connection destroyedConnection = QObject::connect(
85 device, &QObject::destroyed, this,
86 [this, device] {
87 // Caveat: if the QIODevice has not been closed, we unregister the device, however gstreamer
88 // worker threads have a chance to briefly read from a partially destroyed QIODevice.
89 // There's nothing we can do about it
90 unregisterDevice(device);
91 },
92 Qt::DirectConnection);
93
94 QObject::connect(
95 device, &QIODevice::aboutToClose, this,
96 [this, device, destroyedConnection = std::move(destroyedConnection)] {
97 unregisterDevice(device);
98 disconnect(destroyedConnection);
99 },
100 Qt::DirectConnection);
101
102 m_reverseLookupTable.emplace(device, identifier);
103 return identifier;
104}
105
107{
108 QMutexLocker registryLock(&m_registryMutex);
109
110 auto it = m_registry.find(id);
111 if (it != m_registry.end())
112 return it->second;
113 return {};
114}
115
116void QIODeviceRegistry::unregisterDevice(QIODevice *device)
117{
118 QMutexLocker registryLock(&m_registryMutex);
119 auto reverseLookupIt = m_reverseLookupTable.find(device);
120 if (reverseLookupIt == m_reverseLookupTable.end())
121 return;
122
123 auto it = m_registry.find(reverseLookupIt->second);
124 Q_ASSERT(it != m_registry.end());
125
126 it->second->unsetDevice();
127 m_reverseLookupTable.erase(reverseLookupIt);
128 m_registry.erase(it);
129}
130
131QIODeviceRegistry::Record::Record(QByteArray id, QIODevice *device)
132 : id {
133 std::move(id),
134 },
135 device {
136 device,
137 }
138{
139 if (!device->isOpen())
140 device->open(QIODevice::ReadOnly);
141}
142
144{
145 QMutexLocker lock(&mutex);
146 device = nullptr;
147}
148
150{
151 QMutexLocker lock(&mutex);
152 return device;
153}
154
156
157// qt helpers
158
159// glib / gstreamer object
164
166{
167 void getProperty(guint propId, GValue *value, const GParamSpec *pspec) const;
168 void setProperty(guint propId, const GValue *value, const GParamSpec *pspec);
169
170 bool start();
171 bool stop();
172
175 GstFlowReturn fill(guint64 offset, guint length, GstBuffer *buf);
176 void getURI(GValue *value) const;
177 bool setURI(const char *location, GError **err = nullptr);
178
179 // lockable
180 void lock() const { GST_OBJECT_LOCK(&baseSrc); }
181 void unlock() const { GST_OBJECT_UNLOCK(&baseSrc); }
182
185};
186
187void QGstQIODeviceSrc::getProperty(guint propId, GValue *value, const GParamSpec *pspec) const
188{
189 switch (propId) {
190 case PROP_URI:
191 return getURI(value);
192
193 default:
194 G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec);
195 break;
196 }
197}
198
199void QGstQIODeviceSrc::setProperty(guint propId, const GValue *value, const GParamSpec *pspec)
200{
201 switch (propId) {
202 case PROP_URI:
203 setURI(g_value_get_string(value));
204 break;
205 default:
206 G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec);
207 break;
208 }
209}
210
212{
213 std::lock_guard guard{ *this };
214 return record && record->isValid();
215}
216
218{
219 return true;
220}
221
223{
224 std::lock_guard guard{ *this };
225 return record->runWhileLocked([&](QIODevice *device) {
226 return !device->isSequential();
227 });
228}
229
231{
232 std::lock_guard guard{ *this };
233 if (!record)
234 return std::nullopt;
235
236 qint64 size = record->runWhileLocked([&](QIODevice *device) {
237 return device->size();
238 });
239
240 if (size == -1)
241 return std::nullopt;
242 return size;
243}
244
245GstFlowReturn QGstQIODeviceSrc::fill(guint64 offset, guint length, GstBuffer *buf)
246{
247 std::unique_lock guard{ *this };
248 if (!record)
249 return GST_FLOW_ERROR;
250
251 GstMapInfo info;
252 if (!gst_buffer_map(buf, &info, GST_MAP_WRITE)) {
253 guard.unlock();
254 GST_ELEMENT_ERROR(&baseSrc, RESOURCE, WRITE, (nullptr), ("Can't map buffer for writing"));
255 return GST_FLOW_ERROR;
256 }
257
258 int64_t totalRead = 0;
259 GstFlowReturn ret = record->runWhileLocked([&](QIODevice *device) -> GstFlowReturn {
260 if (device->pos() != qint64(offset)) {
261 bool success = device->seek(offset);
262 if (!success) {
263 qWarning() << "seek on iodevice failed";
264 return GST_FLOW_ERROR;
265 }
266 }
267
268 int64_t remain = length;
269 while (remain) {
270 int64_t bytesRead =
271 device->read(reinterpret_cast<char *>(info.data + totalRead), remain);
272 if (bytesRead == -1) {
273 if (device->atEnd()) {
274 return GST_FLOW_EOS;
275 }
276 guard.unlock();
277 GST_ELEMENT_ERROR(&baseSrc, RESOURCE, READ, (nullptr), GST_ERROR_SYSTEM);
278 return GST_FLOW_ERROR;
279 }
280
281 remain -= bytesRead;
282 totalRead += bytesRead;
283 }
284
285 return GST_FLOW_OK;
286 });
287
288 if (ret != GST_FLOW_OK) {
289 gst_buffer_unmap(buf, &info);
290 gst_buffer_resize(buf, 0, 0);
291 return ret;
292 }
293
294 gst_buffer_unmap(buf, &info);
295 if (totalRead != length)
296 gst_buffer_resize(buf, 0, totalRead);
297
298 GST_BUFFER_OFFSET(buf) = offset;
299 GST_BUFFER_OFFSET_END(buf) = offset + totalRead;
300
301 return GST_FLOW_OK;
302}
303
304void QGstQIODeviceSrc::getURI(GValue *value) const
305{
306 std::lock_guard guard{ *this };
307 if (record)
308 g_value_set_string(value, record->id);
309 else
310 g_value_set_string(value, nullptr);
311}
312
313bool QGstQIODeviceSrc::setURI(const char *location, GError **err)
314{
315 Q_ASSERT(QLatin1StringView(location).startsWith("qiodevice:/"_L1));
316
317 {
318 std::lock_guard guard{ *this };
319 GstState state = GST_STATE(this);
320
321 if (state != GST_STATE_READY && state != GST_STATE_NULL) {
322 g_warning(
323 "Changing the `uri' property on qiodevicesrc when the resource is open is not "
324 "supported.");
325 if (err)
326 g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
327 "Changing the `uri' property on qiodevicesrc when the resource is open "
328 "is not supported.");
329 return false;
330 }
331 }
332
333 auto newRecord = gQIODeviceRegistry->findRecord(QByteArrayView{ location });
334
335 {
336 std::lock_guard guard{ *this };
337 record = std::move(newRecord);
338 }
339
340 g_object_notify(G_OBJECT(this), "uri");
341
342 return true;
343}
344
349
350// GObject
353
355
356template <typename T>
358{
359 return (G_TYPE_CHECK_INSTANCE_CAST((obj), gst_qiodevice_src_get_type(), QGstQIODeviceSrc));
360}
361
362// URI handler
364
365#define gst_qiodevice_src_parent_class parent_class
367 G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, qGstInitQIODeviceURIHandler));
368
369// implementations
370
372{
373 using SharedRecord = QIODeviceRegistry::SharedRecord;
374
375 new (reinterpret_cast<void *>(&self->record)) SharedRecord;
376
377 static constexpr guint defaultBlockSize = 16384;
378 gst_base_src_set_blocksize(&self->baseSrc, defaultBlockSize);
379}
380
382{
383 // GObject
384 GObjectClass *gobjectClass = G_OBJECT_CLASS(klass);
385 gobjectClass->set_property = [](GObject *instance, guint propId, const GValue *value,
386 GParamSpec *pspec) {
387 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(instance);
388 return src->setProperty(propId, value, pspec);
389 };
390
391 gobjectClass->get_property = [](GObject *instance, guint propId, GValue *value,
392 GParamSpec *pspec) {
393 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(instance);
394 return src->getProperty(propId, value, pspec);
395 };
396
397 g_object_class_install_property(
398 gobjectClass, PROP_URI,
399 g_param_spec_string("uri", "QRC Location", "Path of the qrc to read", nullptr,
400 GParamFlags(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
401 | GST_PARAM_MUTABLE_READY)));
402
403 gobjectClass->finalize = [](GObject *instance) {
404 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(instance);
405 using SharedRecord = QIODeviceRegistry::SharedRecord;
406 src->record.~SharedRecord();
407 G_OBJECT_CLASS(parent_class)->finalize(instance);
408 };
409
410 // GstElement
411 GstElementClass *gstelementClass = GST_ELEMENT_CLASS(klass);
412 gst_element_class_set_static_metadata(gstelementClass, "QRC Source", "Source/QRC",
413 "Read from arbitrary point in QRC resource",
414 "Tim Blechmann <tim.blechmann@qt.io>");
415
416 static GstStaticPadTemplate srctemplate =
417 GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
418
419 gst_element_class_add_static_pad_template(gstelementClass, &srctemplate);
420
421 // GstBaseSrc
422 GstBaseSrcClass *gstbasesrcClass = GST_BASE_SRC_CLASS(klass);
423 gstbasesrcClass->start = [](GstBaseSrc *basesrc) -> gboolean {
424 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
425 return src->start();
426 };
427 gstbasesrcClass->stop = [](GstBaseSrc *basesrc) -> gboolean {
428 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
429 return src->stop();
430 };
431
432 gstbasesrcClass->is_seekable = [](GstBaseSrc *basesrc) -> gboolean {
433 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
434 return src->isSeekable();
435 };
436 gstbasesrcClass->get_size = [](GstBaseSrc *basesrc, guint64 *size) -> gboolean {
437 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
438 auto optionalSize = src->size();
439 if (!optionalSize)
440 return false;
441 *size = optionalSize.value();
442 return true;
443 };
444 gstbasesrcClass->fill = [](GstBaseSrc *basesrc, guint64 offset, guint length,
445 GstBuffer *buf) -> GstFlowReturn {
446 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
447 return src->fill(offset, length, buf);
448 };
449}
450
451void qGstInitQIODeviceURIHandler(gpointer g_handlerInterface, gpointer)
452{
453 GstURIHandlerInterface *iface = (GstURIHandlerInterface *)g_handlerInterface;
454
455 iface->get_type = [](GType) {
456 return GST_URI_SRC;
457 };
458 iface->get_protocols = [](GType) {
459 static constexpr const gchar *protocols[] = {
460 "qiodevice",
461 nullptr,
462 };
463 return protocols;
464 };
465 iface->get_uri = [](GstURIHandler *handler) -> gchar * {
466 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(handler);
467 std::lock_guard guard{ *src };
468 if (src->record)
469 return g_strdup(src->record->id.constData());
470 return nullptr;
471 };
472 iface->set_uri = [](GstURIHandler *handler, const gchar *uri, GError **err) -> gboolean {
473 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(handler);
474 return src->setURI(uri, err);
475 };
476}
477
478} // namespace
479
480// plugin registration
481
482void qGstRegisterQIODeviceHandler(GstPlugin *plugin)
483{
484 gst_element_register(plugin, "qiodevicesrc", GST_RANK_PRIMARY, gst_qiodevice_src_get_type());
485}
486
487QUrl qGstRegisterQIODevice(QIODevice *device)
488{
489 return QUrl{
490 QString::fromLatin1(gQIODeviceRegistry->registerQIODevice(device)),
491 };
492}
493
494QT_END_NAMESPACE
Q_GLOBAL_STATIC(QIODeviceRegistry, gQIODeviceRegistry)
static void gst_qiodevice_src_init(QGstQIODeviceSrc *self)
QGstQIODeviceSrc * asQGstQIODeviceSrc(T *obj)
void qGstInitQIODeviceURIHandler(gpointer, gpointer)
static void gst_qiodevice_src_class_init(QGstQIODeviceSrcClass *klass)
GType gst_qiodevice_src_get_type()
G_DEFINE_TYPE_WITH_CODE(QGstQIODeviceSrc, gst_qiodevice_src, GST_TYPE_BASE_SRC, G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, qGstInitQIODeviceURIHandler))
QUrl qGstRegisterQIODevice(QIODevice *)
void qGstRegisterQIODeviceHandler(GstPlugin *plugin)
GstFlowReturn fill(guint64 offset, guint length, GstBuffer *buf)
bool setURI(const char *location, GError **err=nullptr)
void getProperty(guint propId, GValue *value, const GParamSpec *pspec) const
void setProperty(guint propId, const GValue *value, const GParamSpec *pspec)