5#include <qloggingcategory.h>
12using namespace Qt::StringLiterals;
// Compresses `data` with zstd and places the result in a newly created
// QByteArray.
//
// `context` is a reference to the caller's ZSTD_CCtx pointer: when it is null
// a context is created here and stored through the reference, so the caller
// ends up holding it. `compression` is the requested level.
//
// NOTE(review): this listing omits several lines of the function (the opening
// brace, the final argument of the ZSTD_compressCCtx() call, the body of the
// error branch and the return statement). The comments below describe only the
// statements that are visible.
17static QByteArray zstdCompress(ZSTD_CCtx *&context,
const QByteArray &data,
int compression)
// Create the compression context on first use.
19 if (context ==
nullptr)
20 context = ZSTD_createCCtx();
// Size the destination using ZSTD_COMPRESSBOUND of the input length; the
// buffer is left uninitialized because the compressor writes into it.
21 qsizetype size = data.size();
22 size = ZSTD_COMPRESSBOUND(size);
23 QByteArray compressed(size, Qt::Uninitialized);
24 char *dst = compressed.data();
25 size_t n = ZSTD_compressCCtx(context, dst, size,
26 data.constData(), data.size(),
// n doubles as the error indicator: ZSTD_isError() tells a failure code from
// a byte count. What is returned on failure is not visible in this listing.
28 if (ZSTD_isError(n)) {
29 qCWarning(lcCtfInfoTrace) <<
"Compression with zstd failed: " << QString::fromUtf8(ZSTD_getErrorName(n));
// On success, shrink the buffer to the n bytes reported by the compressor.
32 compressed.truncate(n);
// Populates m_keySet with the key names of a trace request. Elsewhere in this
// file the index of the pending key in this list is used as the selector of a
// switch over the Request* case labels, so the order of the strings matters -
// presumably it mirrors the declaration order of those constants; verify
// against the header.
//
// NOTE(review): "cliendId" looks like a misspelling of "clientId". It is
// matched against keys read from the incoming request, so correcting it would
// change which requests are recognized - confirm with the client side first.
// NOTE(review): the enclosing function header and some entries of the list are
// not visible in this listing.
40 m_keySet <<
"cliendId"_L1
43 <<
"sessionTracepoints"_L1
46 <<
"compressionScheme"_L1;
52 ZSTD_freeCCtx(m_zstdCCtx);
73 return m_req.sessionName;
78 return m_req.sessionTracepoints;
83 return m_bufferOnIdle;
// Reports `status` to the registered callback object by calling
// m_cb->handleStatusChange(status).
// NOTE(review): the lines between the signature and the call are missing from
// this listing, so whether the status is also stored, or m_cb is checked for
// null first, cannot be confirmed here.
91void QCtfServer::setStatusAndNotify(ServerStatus status)
94 m_cb->handleStatusChange(status);
99 m_writtenSize += size;
100 if (m_writtenSize >= m_waitWriteSize && m_eventLoop)
114 return m_socket->state() == QTcpSocket::ConnectedState;
// Handles a string item from the CBOR request stream.
//
// When no key is pending (m_currentKey is empty) the string is stored as the
// pending key. For the keys RequestSessionName, RequestSessionTracepoints and
// RequestCompressionScheme the string is stored in the corresponding member
// and the pending key is cleared afterwards.
//
// NOTE(review): this listing omits lines of the function (the lambda's chunk
// handling and return, the else between the key and value paths, the break
// statements, the opening of the do-loop and the body of the EndOfFile
// branch). Confirm the control flow against the complete file.
117void QCtfServer::handleString(QCborStreamReader &cbor)
// Local helper: keeps calling readString() on the reader while the returned
// status is Ok, and has a separate branch for the Error status.
119 const auto readString = [](QCborStreamReader &cbor) -> QString {
121 auto r = cbor.readString();
122 while (r.status == QCborStreamReader::Ok) {
124 r = cbor.readString();
127 if (r.status == QCborStreamReader::Error) {
// No pending key: this string is the key itself.
134 if (m_currentKey.isEmpty()) {
135 m_currentKey = readString(cbor);
// A key is pending (presumably the else branch; that line is elided): the
// position of the key in m_keySet selects the field that receives the value.
137 switch (m_keySet.indexOf(m_currentKey)) {
138 case RequestSessionName:
139 m_req.sessionName = readString(cbor);
141 case RequestSessionTracepoints:
142 m_req.sessionTracepoints = readString(cbor);
144 case RequestCompressionScheme:
145 m_requestedCompressionScheme = readString(cbor);
// The value has been consumed, so the next string is treated as a key again.
151 m_currentKey.clear();
// The surrounding loop repeats for as long as the reader's last error is
// EndOfFile. What is done inside this branch is not visible here -
// presumably it waits for more input; confirm.
153 if (cbor.lastError() == QCborError::EndOfFile) {
158 }
while (cbor.lastError() == QCborError::EndOfFile);
// Handles a fixed-width CBOR item as the value for the pending key.
//
// The position of m_currentKey in m_keySet selects the request field. Each
// visible case first tests cbor.isUnsignedInteger() and then stores
// cbor.toUnsignedInteger() in m_req (clientId, clientVersion, flags or
// bufferSize). The pending key is cleared at the end.
//
// NOTE(review): the statements executed when the isUnsignedInteger() test
// fails, the break statements and one case label are missing from this
// listing; confirm them against the complete file.
161void QCtfServer::handleFixedWidth(QCborStreamReader &cbor)
163 switch (m_keySet.indexOf(m_currentKey)) {
164 case RequestClientId:
165 if (!cbor.isUnsignedInteger())
167 m_req.clientId = cbor.toUnsignedInteger();
169 case RequestClientVersion:
170 if (!cbor.isUnsignedInteger())
172 m_req.clientVersion = cbor.toUnsignedInteger();
// The case label for this branch is elided; it stores the request flags.
175 if (!cbor.isUnsignedInteger())
177 m_req.flags = cbor.toUnsignedInteger();
179 case RequestBufferSize:
180 if (!cbor.isUnsignedInteger())
182 m_req.bufferSize = cbor.toUnsignedInteger();
// Value consumed: forget the key so the next string starts a new pair.
188 m_currentKey.clear();
// Dispatches on the type of the current item of the CBOR reader.
//
// - Integer, simple and floating-point items are passed to handleFixedWidth().
// - ByteArray and String items: the handler call is elided in this listing
//   (presumably handleString(); confirm).
// - Array and Map items: the container is entered, items are processed while
//   the reader reports no error and has more items, and the container is left
//   only if no error occurred.
//
// NOTE(review): break statements, the body of the container loop (presumably a
// recursive call) and any default case are not visible here.
191void QCtfServer::readCbor(QCborStreamReader &cbor)
193 switch (cbor.type()) {
194 case QCborStreamReader::UnsignedInteger:
195 case QCborStreamReader::NegativeInteger:
196 case QCborStreamReader::SimpleType:
197 case QCborStreamReader::Float16:
198 case QCborStreamReader::Float:
199 case QCborStreamReader::Double:
200 handleFixedWidth(cbor);
203 case QCborStreamReader::ByteArray:
204 case QCborStreamReader::String:
207 case QCborStreamReader::Array:
208 case QCborStreamReader::Map:
209 cbor.enterContainer();
210 while (cbor.lastError() == QCborError::NoError && cbor.hasNext())
// Leave the container only when reading its contents did not fail.
212 if (cbor.lastError() == QCborError::NoError)
213 cbor.leaveContainer();
// Serialises one packet into the CBOR writer as key/value pairs, in the order
// "magic", "name", "flags", "data".
//
// When m_compression is greater than zero the stream data is compressed before
// being appended: with zstdCompress() if the requested scheme equals "zstd",
// and with qCompress() otherwise (presumably the else branch - the line
// between the two assignments is elided). With m_compression at zero the raw
// stream data is appended.
//
// NOTE(review): the function header, the start/end of the enclosing map and
// the declaration of `compressed` are not visible in this listing; the name
// writePacket is inferred from the call made in the sending loop of this file.
222 cbor.append(
"magic"_L1);
223 cbor.append(packet.PacketMagicNumber);
224 cbor.append(
"name"_L1);
// stream_name is converted with QString::fromUtf8 before being appended.
225 cbor.append(QString::fromUtf8(packet.stream_name));
226 cbor.append(
"flags"_L1);
227 cbor.append(packet.flags);
229 cbor.append(
"data"_L1);
230 if (m_compression > 0) {
233 if (m_requestedCompressionScheme == QStringLiteral(
"zstd"))
234 compressed = zstdCompress(m_zstdCCtx, packet.stream_data, m_compression);
237 compressed = qCompress(packet.stream_data, m_compression);
239 cbor.append(compressed);
241 cbor.append(packet.stream_data);
// Tests m_requestedCompressionScheme against three cases in turn: empty,
// "zstd" and "zlib".
//
// NOTE(review): every return statement is missing from this listing, so the
// result for each case cannot be confirmed here. The caller visible in this
// file logs a warning and clears the requested scheme when this function
// returns false.
247bool QCtfServer::recognizedCompressionScheme()
const
249 if (m_requestedCompressionScheme.isEmpty())
252 if (m_requestedCompressionScheme == QStringLiteral(
"zstd"))
255 if (m_requestedCompressionScheme == QStringLiteral(
"zlib"))
// Body of the server's main loop. The enclosing function header is not part of
// this listing and a number of lines (braces, else branches, local
// declarations, returns) are elided, so the notes below describe only the
// statements that are visible.
//
// Visible sequence: create a QTcpServer; until m_stopping becomes non-zero,
// listen, wait for a client, read the CBOR trace request, validate it, derive
// compression and buffering settings from it, send a response map, then write
// queued packets while the socket stays connected.
262 m_server =
new QTcpServer();
// An empty m_address selects QHostAddress::Any; the assignment from the
// configured address is presumably the else branch (that line is elided).
264 if (m_address.isEmpty())
265 addr = QHostAddress(QHostAddress::Any);
267 addr = QHostAddress(m_address);
269 qCInfo(lcCtfInfoTrace) <<
"Starting CTF server: " << m_address <<
", port: " << m_port;
// Keep serving until m_stopping becomes non-zero.
271 while (m_stopping == 0) {
// Start listening if the server is not already doing so; a failed listen()
// is logged and reported as the Error status.
272 if (!m_server->isListening()) {
273 if (!m_server->listen(addr, m_port)) {
274 qCInfo(lcCtfInfoTrace) <<
"Unable to start server";
276 setStatusAndNotify(Error);
279 setStatusAndNotify(Idle);
// A timeout of -1 makes waitForNewConnection() block without timing out.
280 if (m_server->waitForNewConnection(-1)) {
281 qCInfo(lcCtfInfoTrace) <<
"client connection";
282 m_eventLoop =
new QEventLoop();
283 m_socket = m_server->nextPendingConnection();
// Incoming data and disconnection both just exit the event loop, if one
// exists; bytesWritten is routed to the member function of the same name.
285 QObject::connect(m_socket, &QTcpSocket::readyRead, [&](){
286 if (m_eventLoop) m_eventLoop->exit();
288 QObject::connect(m_socket, &QTcpSocket::bytesWritten,
this, &QCtfServer::bytesWritten);
289 QObject::connect(m_socket, &QTcpSocket::disconnected, [&](){
290 if (m_eventLoop) m_eventLoop->exit();
294 setStatusAndNotify(Connected);
// Consume the request while the reader has items and reports no error (the
// loop body is elided in this listing).
301 while (cbor.hasNext() && cbor.lastError() == QCborError::NoError)
304 if (!m_req.isValid()) {
305 qCInfo(lcCtfInfoTrace) <<
"Invalid trace request.";
// The compression level is taken from the request flags via CompressionMask
// and then capped. NOTE(review): the conditions choosing between the
// ZSTD_maxCLevel() cap and the cap of 9 are elided here - presumably they
// depend on the requested scheme; confirm.
308 m_compression = m_req.flags & CompressionMask;
310 m_compression = qMin(m_compression, ZSTD_maxCLevel());
312 m_compression = qMin(m_compression, 9);
// Buffering while idle stays enabled unless the DontBufferOnIdle flag is set.
314 m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle);
// The packet limit is the requested buffer size divided by the packet size,
// but never less than 16.
316 m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u);
// An unrecognized scheme is logged and then cleared.
318 if (!recognizedCompressionScheme()) {
319 qCWarning(lcCtfInfoTrace) <<
"Client requested unrecognized compression scheme: " << m_requestedCompressionScheme;
320 m_requestedCompressionScheme.clear();
324 qCInfo(lcCtfInfoTrace) <<
"request received: " << m_req.sessionName <<
", " << m_req.sessionTracepoints;
329 resp.serverId = ServerId;
330 resp.serverVersion = 1;
331 resp.serverName = QStringLiteral(
"Ctf Server");
// The response map is declared with 4 entries when compression is enabled and
// 3 otherwise; the "compressionScheme" entry below is presumably guarded by
// the same check (the guarding line is elided).
334 cbor.startMap(m_compression ? 4 : 3);
335 cbor.append(
"serverId"_L1);
336 cbor.append(resp.serverId);
337 cbor.append(
"serverVersion"_L1);
338 cbor.append(resp.serverVersion);
339 cbor.append(
"serverName"_L1);
340 cbor.append(resp.serverName);
342 cbor.append(
"compressionScheme"_L1);
343 cbor.append(m_requestedCompressionScheme);
348 qCInfo(lcCtfInfoTrace) <<
"response sent, sending data";
// Send packets for as long as the client stays connected.
350 while (m_socket->state() == QTcpSocket::ConnectedState) {
// Wait until packets have been queued, then take the whole queue at once,
// leaving m_packets empty. NOTE(review): the locking of m_mutex around this
// wait is not visible in this listing, and the wait condition does not look
// at m_stopping - confirm how this loop is released on shutdown.
354 while (m_packets.size() == 0)
355 m_bufferHasData.wait(&m_mutex);
356 packets = std::exchange(m_packets, {});
361 for (TracePacket &packet : packets) {
362 writePacket(packet, cbor);
367 qCInfo(lcCtfInfoTrace) << packets.size() <<
" packets written";
371 qCInfo(lcCtfInfoTrace) <<
"client connection closed";
// NOTE(review): m_eventLoop was allocated with new when the client connected;
// the matching delete is not visible in this listing - confirm it happens
// before the pointer is reset here.
375 m_eventLoop =
nullptr;
// Logs the server's error string and reports the Error status; presumably
// this is the branch taken when waiting for a connection did not succeed.
377 qCInfo(lcCtfInfoTrace) <<
"error: " << m_server->errorString();
379 setStatusAndNotify(Error);
390 this->m_stopping = 1;
// Builds a packet from `stream` (stored as UTF-8), `data` and `flags`, appends
// it to m_packets and wakes one waiter on m_bufferHasData. If the queue then
// holds more than m_maxPackets entries, the entry at the front is dropped.
//
// NOTE(review): the function header, the declaration of `packet` and any
// locking are not visible in this listing; the name bufferData is inferred
// from the parameter names used here.
398 packet.stream_name = stream.toUtf8();
399 packet.stream_data = data;
400 packet.flags = flags;
401 m_packets.append(packet);
// Enforce the queue limit by discarding from the front.
402 if (m_packets.size() > m_maxPackets)
403 m_packets.pop_front();
404 m_bufferHasData.wakeOne();
\inmodule QtCore\reentrant
\inmodule QtCore\reentrant
void setHost(const QString &address)
QString sessionTracepoints() const
QCtfServer(QObject *parent=nullptr)
bool bufferOnIdle() const
void setCallback(ServerCallback *cb)
void bufferData(const QString &stream, const QByteArray &data, quint32 flags)
ServerStatus status() const
QString sessionName() const
#define Q_LOGGING_CATEGORY(name,...)
#define qCInfo(category,...)
#define qCWarning(category,...)
virtual void handleSessionChange()=0