Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
Loading...
Searching...
No Matches
qctfserver.cpp
Go to the documentation of this file.
1// Copyright (C) 2023 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:critical reason:network-protocol
4
5#include <qloggingcategory.h>
6#include "qctfserver_p.h"
7
8#if QT_CONFIG(zstd)
9#include <zstd.h>
10#endif
11
12using namespace Qt::StringLiterals;
13
14Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg)
15
16#if QT_CONFIG(zstd)
17static QByteArray zstdCompress(ZSTD_CCtx *&context, const QByteArray &data, int compression)
18{
19 if (context == nullptr)
20 context = ZSTD_createCCtx();
21 qsizetype size = data.size();
22 size = ZSTD_COMPRESSBOUND(size);
23 QByteArray compressed(size, Qt::Uninitialized);
24 char *dst = compressed.data();
25 size_t n = ZSTD_compressCCtx(context, dst, size,
26 data.constData(), data.size(),
27 compression);
28 if (ZSTD_isError(n)) {
29 qCWarning(lcCtfInfoTrace) << "Compression with zstd failed: " << QString::fromUtf8(ZSTD_getErrorName(n));
30 return {};
31 }
32 compressed.truncate(n);
33 return compressed;
34}
35#endif
36
37QCtfServer::QCtfServer(QObject *parent)
39{
40 m_keySet << "cliendId"_L1
41 << "clientVersion"_L1
42 << "sessionName"_L1
43 << "sessionTracepoints"_L1
44 << "flags"_L1
45 << "bufferSize"_L1
46 << "compressionScheme"_L1;
47}
48
50{
51#if QT_CONFIG(zstd)
52 ZSTD_freeCCtx(m_zstdCCtx);
53#endif
54}
55
56void QCtfServer::setHost(const QString &address)
57{
58 m_address = address;
59}
60
61void QCtfServer::setPort(int port)
62{
63 m_port = port;
64}
65
67{
68 m_cb = cb;
69}
70
71QString QCtfServer::sessionName() const
72{
73 return m_req.sessionName;
74}
75
77{
78 return m_req.sessionTracepoints;
79}
80
82{
83 return m_bufferOnIdle;
84}
85
87{
88 return m_status;
89}
90
91void QCtfServer::setStatusAndNotify(ServerStatus status)
92{
93 m_status = status;
94 m_cb->handleStatusChange(status);
95}
96
97void QCtfServer::bytesWritten(qint64 size)
98{
99 m_writtenSize += size;
100 if (m_writtenSize >= m_waitWriteSize && m_eventLoop)
101 m_eventLoop->exit();
102}
103
104void QCtfServer::initWrite()
105{
106 m_waitWriteSize = 0;
107 m_writtenSize = 0;
108}
109
110bool QCtfServer::waitSocket()
111{
112 if (m_eventLoop)
113 m_eventLoop->exec();
114 return m_socket->state() == QTcpSocket::ConnectedState;
115}
116
117void QCtfServer::handleString(QCborStreamReader &cbor)
118{
119 const auto readString = [](QCborStreamReader &cbor) -> QString {
120 QString result;
121 auto r = cbor.readString();
122 while (r.status == QCborStreamReader::Ok) {
123 result += r.data;
124 r = cbor.readString();
125 }
126
127 if (r.status == QCborStreamReader::Error) {
128 // handle error condition
129 result.clear();
130 }
131 return result;
132 };
133 do {
134 if (m_currentKey.isEmpty()) {
135 m_currentKey = readString(cbor);
136 } else {
137 switch (m_keySet.indexOf(m_currentKey)) {
138 case RequestSessionName:
139 m_req.sessionName = readString(cbor);
140 break;
141 case RequestSessionTracepoints:
142 m_req.sessionTracepoints = readString(cbor);
143 break;
144 case RequestCompressionScheme:
145 m_requestedCompressionScheme = readString(cbor);
146 break;
147 default:
148 // handle error
149 break;
150 }
151 m_currentKey.clear();
152 }
153 if (cbor.lastError() == QCborError::EndOfFile) {
154 if (!waitSocket())
155 return;
156 cbor.reparse();
157 }
158 } while (cbor.lastError() == QCborError::EndOfFile);
159}
160
161void QCtfServer::handleFixedWidth(QCborStreamReader &cbor)
162{
163 switch (m_keySet.indexOf(m_currentKey)) {
164 case RequestClientId:
165 if (!cbor.isUnsignedInteger())
166 return;
167 m_req.clientId = cbor.toUnsignedInteger();
168 break;
169 case RequestClientVersion:
170 if (!cbor.isUnsignedInteger())
171 return;
172 m_req.clientVersion = cbor.toUnsignedInteger();
173 break;
174 case RequestFlags:
175 if (!cbor.isUnsignedInteger())
176 return;
177 m_req.flags = cbor.toUnsignedInteger();
178 break;
179 case RequestBufferSize:
180 if (!cbor.isUnsignedInteger())
181 return;
182 m_req.bufferSize = cbor.toUnsignedInteger();
183 break;
184 default:
185 // handle error
186 break;
187 }
188 m_currentKey.clear();
189}
190
191void QCtfServer::readCbor(QCborStreamReader &cbor)
192{
193 switch (cbor.type()) {
194 case QCborStreamReader::UnsignedInteger:
195 case QCborStreamReader::NegativeInteger:
196 case QCborStreamReader::SimpleType:
197 case QCborStreamReader::Float16:
198 case QCborStreamReader::Float:
199 case QCborStreamReader::Double:
200 handleFixedWidth(cbor);
201 cbor.next();
202 break;
203 case QCborStreamReader::ByteArray:
204 case QCborStreamReader::String:
205 handleString(cbor);
206 break;
207 case QCborStreamReader::Array:
208 case QCborStreamReader::Map:
209 cbor.enterContainer();
210 while (cbor.lastError() == QCborError::NoError && cbor.hasNext())
211 readCbor(cbor);
212 if (cbor.lastError() == QCborError::NoError)
213 cbor.leaveContainer();
214 default:
215 break;
216 }
217}
218
219void QCtfServer::writePacket(TracePacket &packet, QCborStreamWriter &cbor)
220{
221 cbor.startMap(4);
222 cbor.append("magic"_L1);
223 cbor.append(packet.PacketMagicNumber);
224 cbor.append("name"_L1);
225 cbor.append(QString::fromUtf8(packet.stream_name));
226 cbor.append("flags"_L1);
227 cbor.append(packet.flags);
228
229 cbor.append("data"_L1);
230 if (m_compression > 0) {
231 QByteArray compressed;
232#if QT_CONFIG(zstd)
233 if (m_requestedCompressionScheme == QStringLiteral("zstd"))
234 compressed = zstdCompress(m_zstdCCtx, packet.stream_data, m_compression);
235 else
236#endif
237 compressed = qCompress(packet.stream_data, m_compression);
238
239 cbor.append(compressed);
240 } else {
241 cbor.append(packet.stream_data);
242 }
243
244 cbor.endMap();
245}
246
247bool QCtfServer::recognizedCompressionScheme() const
248{
249 if (m_requestedCompressionScheme.isEmpty())
250 return true;
251#if QT_CONFIG(zstd)
252 if (m_requestedCompressionScheme == QStringLiteral("zstd"))
253 return true;
254#endif
255 if (m_requestedCompressionScheme == QStringLiteral("zlib"))
256 return true;
257 return false;
258}
259
261{
262 m_server = new QTcpServer();
263 QHostAddress addr;
264 if (m_address.isEmpty())
265 addr = QHostAddress(QHostAddress::Any);
266 else
267 addr = QHostAddress(m_address);
268
269 qCInfo(lcCtfInfoTrace) << "Starting CTF server: " << m_address << ", port: " << m_port;
270
271 while (m_stopping == 0) {
272 if (!m_server->isListening()) {
273 if (!m_server->listen(addr, m_port)) {
274 qCInfo(lcCtfInfoTrace) << "Unable to start server";
275 m_stopping = 1;
276 setStatusAndNotify(Error);
277 }
278 }
279 setStatusAndNotify(Idle);
280 if (m_server->waitForNewConnection(-1)) {
281 qCInfo(lcCtfInfoTrace) << "client connection";
282 m_eventLoop = new QEventLoop();
283 m_socket = m_server->nextPendingConnection();
284
285 QObject::connect(m_socket, &QTcpSocket::readyRead, [&](){
286 if (m_eventLoop) m_eventLoop->exit();
287 });
288 QObject::connect(m_socket, &QTcpSocket::bytesWritten, this, &QCtfServer::bytesWritten);
289 QObject::connect(m_socket, &QTcpSocket::disconnected, [&](){
290 if (m_eventLoop) m_eventLoop->exit();
291 });
292
293 m_server->close(); // Do not wait for more connections
294 setStatusAndNotify(Connected);
295
296 if (waitSocket())
297 {
298 QCborStreamReader cbor(m_socket);
299
300 m_req = {};
301 while (cbor.hasNext() && cbor.lastError() == QCborError::NoError)
302 readCbor(cbor);
303
304 if (!m_req.isValid()) {
305 qCInfo(lcCtfInfoTrace) << "Invalid trace request.";
306 m_socket->close();
307 } else {
308 m_compression = m_req.flags & CompressionMask;
309#if QT_CONFIG(zstd)
310 m_compression = qMin(m_compression, ZSTD_maxCLevel());
311#else
312 m_compression = qMin(m_compression, 9);
313#endif
314 m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle);
315
316 m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u);
317
318 if (!recognizedCompressionScheme()) {
319 qCWarning(lcCtfInfoTrace) << "Client requested unrecognized compression scheme: " << m_requestedCompressionScheme;
320 m_requestedCompressionScheme.clear();
321 m_compression = 0;
322 }
323
324 qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints;
325
327 {
328 TraceResponse resp;
329 resp.serverId = ServerId;
330 resp.serverVersion = 1;
331 resp.serverName = QStringLiteral("Ctf Server");
332
333 QCborStreamWriter cbor(m_socket);
334 cbor.startMap(m_compression ? 4 : 3);
335 cbor.append("serverId"_L1);
336 cbor.append(resp.serverId);
337 cbor.append("serverVersion"_L1);
338 cbor.append(resp.serverVersion);
339 cbor.append("serverName"_L1);
340 cbor.append(resp.serverName);
341 if (m_compression) {
342 cbor.append("compressionScheme"_L1);
343 cbor.append(m_requestedCompressionScheme);
344 }
345 cbor.endMap();
346 }
347
348 qCInfo(lcCtfInfoTrace) << "response sent, sending data";
349 if (waitSocket()) {
350 while (m_socket->state() == QTcpSocket::ConnectedState) {
351 QList<TracePacket> packets;
352 {
353 QMutexLocker lock(&m_mutex);
354 while (m_packets.size() == 0)
355 m_bufferHasData.wait(&m_mutex);
356 packets = std::exchange(m_packets, {});
357 }
358
359 {
360 QCborStreamWriter cbor(m_socket);
361 for (TracePacket &packet : packets) {
362 writePacket(packet, cbor);
363 if (!waitSocket())
364 break;
365 }
366 }
367 qCInfo(lcCtfInfoTrace) << packets.size() << " packets written";
368 }
369 }
370
371 qCInfo(lcCtfInfoTrace) << "client connection closed";
372 }
373 }
374 delete m_eventLoop;
375 m_eventLoop = nullptr;
376 } else {
377 qCInfo(lcCtfInfoTrace) << "error: " << m_server->errorString();
378 m_stopping = 1;
379 setStatusAndNotify(Error);
380 }
381 }
382}
383
385{
386 start();
387}
389{
390 this->m_stopping = 1;
391 wait();
392}
393
394void QCtfServer::bufferData(const QString &stream, const QByteArray &data, quint32 flags)
395{
396 QMutexLocker lock(&m_mutex);
397 TracePacket packet;
398 packet.stream_name = stream.toUtf8();
399 packet.stream_data = data;
400 packet.flags = flags;
401 m_packets.append(packet);
402 if (m_packets.size() > m_maxPackets)
403 m_packets.pop_front();
404 m_bufferHasData.wakeOne();
405}
\inmodule QtCore\reentrant
\inmodule QtCore\reentrant
void setHost(const QString &address)
QString sessionTracepoints() const
QCtfServer(QObject *parent=nullptr)
bool bufferOnIdle() const
void setCallback(ServerCallback *cb)
void startServer()
void stopServer()
void bufferData(const QString &stream, const QByteArray &data, quint32 flags)
void setPort(int port)
void run() override
ServerStatus status() const
QString sessionName() const
Definition qlist.h:80
\inmodule QtCore
Definition qmutex.h:346
#define Q_LOGGING_CATEGORY(name,...)
#define qCInfo(category,...)
#define qCWarning(category,...)
virtual void handleSessionChange()=0