Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qgstreamer_qiodevice_handler.cpp
Go to the documentation of this file.
1// Copyright (C) 2024 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
5
6#include <QtCore/qdebug.h>
7#include <QtCore/qglobal.h>
8#include <QtCore/qiodevice.h>
9#include <QtCore/qmutex.h>
10#include <QtCore/qobject.h>
11#include <QtCore/qspan.h>
12#include <QtCore/qurl.h>
13#include <QtCore/quuid.h>
14
15#include <gst/base/gstbasesrc.h>
16#include <map>
17#include <memory>
18#include <utility>
19
20QT_BEGIN_NAMESPACE
21
22namespace {
23
24using namespace Qt::Literals;
25
26// QIODeviceRegistry
27
29{
30public:
31 struct Record
32 {
33 explicit Record(QByteArray, QIODevice *);
34
36 bool isValid() const;
37
39
40 template <typename Functor>
41 auto runWhileLocked(Functor &&f)
42 {
43 QMutexLocker guard(&mutex);
44 return f(device);
45 }
46
47 private:
48 QIODevice *device;
49 mutable QMutex mutex;
50 };
51 using SharedRecord = std::shared_ptr<Record>;
52
55
56private:
57 void unregisterDevice(QIODevice *);
58 void deviceDestroyed(QIODevice *);
59
60 QMutex m_registryMutex;
62 std::map<QIODevice *, QByteArray> m_reverseLookupTable;
63};
64
66{
67 Q_ASSERT(device);
68
69 if (device->isSequential())
70 qWarning() << "GStreamer: sequential QIODevices are not fully supported";
71
72 QMutexLocker lock(&m_registryMutex);
73
74 auto it = m_reverseLookupTable.find(device);
75 if (it != m_reverseLookupTable.end())
76 return it->second;
77
78 QByteArray identifier =
79 "qiodevice:/"_ba + QUuid::createUuid().toByteArray(QUuid::StringFormat::Id128);
80
81 m_registry.emplace(identifier, std::make_shared<Record>(identifier, device));
82
83 QMetaObject::Connection destroyedConnection = QObject::connect(
84 device, &QObject::destroyed, this,
85 [this, device] {
86 // Caveat: if the QIODevice has not been closed, we unregister the device, however gstreamer
87 // worker threads have a chance to briefly read from a partially destroyed QIODevice.
88 // There's nothing we can do about it
89 unregisterDevice(device);
90 },
91 Qt::DirectConnection);
92
93 QObject::connect(
94 device, &QIODevice::aboutToClose, this,
95 [this, device, destroyedConnection = std::move(destroyedConnection)] {
96 unregisterDevice(device);
97 disconnect(destroyedConnection);
98 },
99 Qt::DirectConnection);
100
101 m_reverseLookupTable.emplace(device, identifier);
102 return identifier;
103}
104
106{
107 QMutexLocker registryLock(&m_registryMutex);
108
109 auto it = m_registry.find(id);
110 if (it != m_registry.end())
111 return it->second;
112 return {};
113}
114
115void QIODeviceRegistry::unregisterDevice(QIODevice *device)
116{
117 QMutexLocker registryLock(&m_registryMutex);
118 auto reverseLookupIt = m_reverseLookupTable.find(device);
119 if (reverseLookupIt == m_reverseLookupTable.end())
120 return;
121
122 auto it = m_registry.find(reverseLookupIt->second);
123 Q_ASSERT(it != m_registry.end());
124
125 it->second->unsetDevice();
126 m_reverseLookupTable.erase(reverseLookupIt);
127 m_registry.erase(it);
128}
129
130QIODeviceRegistry::Record::Record(QByteArray id, QIODevice *device)
131 : id {
132 std::move(id),
133 },
134 device {
135 device,
136 }
137{
138 if (!device->isOpen())
139 device->open(QIODevice::ReadOnly);
140}
141
143{
144 QMutexLocker lock(&mutex);
145 device = nullptr;
146}
147
149{
150 QMutexLocker lock(&mutex);
151 return device;
152}
153
155
156// qt helpers
157
158// glib / gstreamer object
163
165{
166 void getProperty(guint propId, GValue *value, const GParamSpec *pspec) const;
167 void setProperty(guint propId, const GValue *value, const GParamSpec *pspec);
168 auto lockObject() const;
169
170 bool start();
171 bool stop();
172
175 GstFlowReturn fill(guint64 offset, guint length, GstBuffer *buf);
176 void getURI(GValue *value) const;
177 bool setURI(const char *location, GError **err = nullptr);
178
181};
182
183void QGstQIODeviceSrc::getProperty(guint propId, GValue *value, const GParamSpec *pspec) const
184{
185 switch (propId) {
186 case PROP_URI:
187 return getURI(value);
188
189 default:
190 G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec);
191 break;
192 }
193}
194
195void QGstQIODeviceSrc::setProperty(guint propId, const GValue *value, const GParamSpec *pspec)
196{
197 switch (propId) {
198 case PROP_URI:
199 setURI(g_value_get_string(value));
200 break;
201 default:
202 G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec);
203 break;
204 }
205}
206
208{
209 GST_OBJECT_LOCK(this);
210 return qScopeGuard([this] {
211 GST_OBJECT_UNLOCK(this);
212 });
213}
214
216{
217 auto lock = lockObject();
218
219 return record && record->isValid();
220}
221
223{
224 return true;
225}
226
228{
229 auto lock = lockObject();
230 return record->runWhileLocked([&](QIODevice *device) {
231 return !device->isSequential();
232 });
233}
234
236{
237 auto lock = lockObject();
238 if (!record)
239 return std::nullopt;
240
241 qint64 size = record->runWhileLocked([&](QIODevice *device) {
242 return device->size();
243 });
244
245 if (size == -1)
246 return std::nullopt;
247 return size;
248}
249
250GstFlowReturn QGstQIODeviceSrc::fill(guint64 offset, guint length, GstBuffer *buf)
251{
252 auto lock = lockObject();
253
254 if (!record)
255 return GST_FLOW_ERROR;
256
257 GstMapInfo info;
258 if (!gst_buffer_map(buf, &info, GST_MAP_WRITE)) {
259 GST_ELEMENT_ERROR(this, RESOURCE, WRITE, (nullptr), ("Can't map buffer for writing"));
260 return GST_FLOW_ERROR;
261 }
262
263 int64_t totalRead = 0;
264 GstFlowReturn ret = record->runWhileLocked([&](QIODevice *device) -> GstFlowReturn {
265 if (device->pos() != qint64(offset)) {
266 bool success = device->seek(offset);
267 if (!success) {
268 qWarning() << "seek on iodevice failed";
269 return GST_FLOW_ERROR;
270 }
271 }
272
273 int64_t remain = length;
274 while (remain) {
275 int64_t bytesRead =
276 device->read(reinterpret_cast<char *>(info.data + totalRead), remain);
277 if (bytesRead == -1) {
278 if (device->atEnd()) {
279 return GST_FLOW_EOS;
280 }
281 GST_ELEMENT_ERROR(this, RESOURCE, READ, (nullptr), GST_ERROR_SYSTEM);
282 return GST_FLOW_ERROR;
283 }
284
285 remain -= bytesRead;
286 totalRead += bytesRead;
287 }
288
289 return GST_FLOW_OK;
290 });
291
292 if (ret != GST_FLOW_OK) {
293 gst_buffer_unmap(buf, &info);
294 gst_buffer_resize(buf, 0, 0);
295 return ret;
296 }
297
298 gst_buffer_unmap(buf, &info);
299 if (totalRead != length)
300 gst_buffer_resize(buf, 0, totalRead);
301
302 GST_BUFFER_OFFSET(buf) = offset;
303 GST_BUFFER_OFFSET_END(buf) = offset + totalRead;
304
305 return GST_FLOW_OK;
306}
307
308void QGstQIODeviceSrc::getURI(GValue *value) const
309{
310 auto lock = lockObject();
311 if (record)
312 g_value_set_string(value, record->id);
313 else
314 g_value_set_string(value, nullptr);
315}
316
317bool QGstQIODeviceSrc::setURI(const char *location, GError **err)
318{
319 Q_ASSERT(QLatin1StringView(location).startsWith("qiodevice:/"_L1));
320
321 {
322 auto lock = lockObject();
323 GstState state = GST_STATE(this);
324 if (state != GST_STATE_READY && state != GST_STATE_NULL) {
325 g_warning(
326 "Changing the `uri' property on qiodevicesrc when the resource is open is not "
327 "supported.");
328 if (err)
329 g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
330 "Changing the `uri' property on qiodevicesrc when the resource is open "
331 "is not supported.");
332 return false;
333 }
334 }
335
336 auto newRecord = gQIODeviceRegistry->findRecord(QByteArrayView{ location });
337
338 {
339 auto lock = lockObject();
340 record = std::move(newRecord);
341 }
342
343 g_object_notify(G_OBJECT(this), "uri");
344
345 return true;
346}
347
352
353// GObject
356
358
359template <typename T>
361{
362 return (G_TYPE_CHECK_INSTANCE_CAST((obj), gst_qiodevice_src_get_type(), QGstQIODeviceSrc));
363}
364
365// URI handler
367
368#define gst_qiodevice_src_parent_class parent_class
370 G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, qGstInitQIODeviceURIHandler));
371
372// implementations
373
375{
376 using SharedRecord = QIODeviceRegistry::SharedRecord;
377
378 new (reinterpret_cast<void *>(&self->record)) SharedRecord;
379
380 static constexpr guint defaultBlockSize = 16384;
381 gst_base_src_set_blocksize(&self->baseSrc, defaultBlockSize);
382}
383
385{
386 // GObject
387 GObjectClass *gobjectClass = G_OBJECT_CLASS(klass);
388 gobjectClass->set_property = [](GObject *instance, guint propId, const GValue *value,
389 GParamSpec *pspec) {
390 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(instance);
391 return src->setProperty(propId, value, pspec);
392 };
393
394 gobjectClass->get_property = [](GObject *instance, guint propId, GValue *value,
395 GParamSpec *pspec) {
396 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(instance);
397 return src->getProperty(propId, value, pspec);
398 };
399
400 g_object_class_install_property(
401 gobjectClass, PROP_URI,
402 g_param_spec_string("uri", "QRC Location", "Path of the qrc to read", nullptr,
403 GParamFlags(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
404 | GST_PARAM_MUTABLE_READY)));
405
406 gobjectClass->finalize = [](GObject *instance) {
407 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(instance);
408 using SharedRecord = QIODeviceRegistry::SharedRecord;
409 src->record.~SharedRecord();
410 G_OBJECT_CLASS(parent_class)->finalize(instance);
411 };
412
413 // GstElement
414 GstElementClass *gstelementClass = GST_ELEMENT_CLASS(klass);
415 gst_element_class_set_static_metadata(gstelementClass, "QRC Source", "Source/QRC",
416 "Read from arbitrary point in QRC resource",
417 "Tim Blechmann <tim.blechmann@qt.io>");
418
419 static GstStaticPadTemplate srctemplate =
420 GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
421
422 gst_element_class_add_static_pad_template(gstelementClass, &srctemplate);
423
424 // GstBaseSrc
425 GstBaseSrcClass *gstbasesrcClass = GST_BASE_SRC_CLASS(klass);
426 gstbasesrcClass->start = [](GstBaseSrc *basesrc) -> gboolean {
427 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
428 return src->start();
429 };
430 gstbasesrcClass->stop = [](GstBaseSrc *basesrc) -> gboolean {
431 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
432 return src->stop();
433 };
434
435 gstbasesrcClass->is_seekable = [](GstBaseSrc *basesrc) -> gboolean {
436 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
437 return src->isSeekable();
438 };
439 gstbasesrcClass->get_size = [](GstBaseSrc *basesrc, guint64 *size) -> gboolean {
440 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
441 auto optionalSize = src->size();
442 if (!optionalSize)
443 return false;
444 *size = optionalSize.value();
445 return true;
446 };
447 gstbasesrcClass->fill = [](GstBaseSrc *basesrc, guint64 offset, guint length,
448 GstBuffer *buf) -> GstFlowReturn {
449 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(basesrc);
450 return src->fill(offset, length, buf);
451 };
452}
453
454void qGstInitQIODeviceURIHandler(gpointer g_handlerInterface, gpointer)
455{
456 GstURIHandlerInterface *iface = (GstURIHandlerInterface *)g_handlerInterface;
457
458 iface->get_type = [](GType) {
459 return GST_URI_SRC;
460 };
461 iface->get_protocols = [](GType) {
462 static constexpr const gchar *protocols[] = {
463 "qiodevice",
464 nullptr,
465 };
466 return protocols;
467 };
468 iface->get_uri = [](GstURIHandler *handler) -> gchar * {
469 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(handler);
470 auto lock = src->lockObject();
471 if (src->record)
472 return g_strdup(src->record->id.constData());
473 return nullptr;
474 };
475 iface->set_uri = [](GstURIHandler *handler, const gchar *uri, GError **err) -> gboolean {
476 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(handler);
477 return src->setURI(uri, err);
478 };
479}
480
481} // namespace
482
483// plugin registration
484
485void qGstRegisterQIODeviceHandler(GstPlugin *plugin)
486{
487 gst_element_register(plugin, "qiodevicesrc", GST_RANK_PRIMARY, gst_qiodevice_src_get_type());
488}
489
490QUrl qGstRegisterQIODevice(QIODevice *device)
491{
492 return QUrl{
493 QString::fromLatin1(gQIODeviceRegistry->registerQIODevice(device)),
494 };
495}
496
497QT_END_NAMESPACE
Q_GLOBAL_STATIC(QIODeviceRegistry, gQIODeviceRegistry)
static void gst_qiodevice_src_init(QGstQIODeviceSrc *self)
QGstQIODeviceSrc * asQGstQIODeviceSrc(T *obj)
void qGstInitQIODeviceURIHandler(gpointer, gpointer)
static void gst_qiodevice_src_class_init(QGstQIODeviceSrcClass *klass)
GType gst_qiodevice_src_get_type()
G_DEFINE_TYPE_WITH_CODE(QGstQIODeviceSrc, gst_qiodevice_src, GST_TYPE_BASE_SRC, G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, qGstInitQIODeviceURIHandler))
QUrl qGstRegisterQIODevice(QIODevice *)
void qGstRegisterQIODeviceHandler(GstPlugin *plugin)
GstFlowReturn fill(guint64 offset, guint length, GstBuffer *buf)
bool setURI(const char *location, GError **err=nullptr)
void getProperty(guint propId, GValue *value, const GParamSpec *pspec) const
void setProperty(guint propId, const GValue *value, const GParamSpec *pspec)