c++ - ¿Hay una tubería local dentro del proceso en Qt?

La siguiente es una implementación usable y rudimentaria. Utiliza un par interno de señal-ranura para enviar los datos al otro punto final. De esa manera, cualquier extremo de la conexión puede vivir en cualquier hilo, y los extremos se pueden mover entre hilos sin perder datos o inducir ninguna carrera.

El QRingBuffer privado se utiliza en lugar de reinventar la rueda. Agregue QT += core-private al archivo .pro para que esté disponible. Qt PIMPL se utiliza para obtener acceso a los búferes internos del dispositivo en Qt versiones 5.7 y superiores.

Si desea crear una instancia de una tubería abierta, como suele ser el caso, puede pasar el modo de E / S al constructor. Uso tipico:

int main(/*…*/) { /*…*/ AppPipe end1 { QIODevice::ReadWrite }; AppPipe end2 { &end1, QIODevice::ReadWrite }; AppPipe end3 { &end1, QIODevice::ReadOnly }; // the pipes are open ready to use /*…*/ }

Cualquier cosa que escriba en una tubería, termina como datos legibles en la otra, tuberías conectadas y viceversa. En el ejemplo anterior, los datos escritos en end1 se pueden leer desde end2 y end3 independiente. Los datos escritos en end2 son legibles desde end1 . end3 es efectivamente una tubería de solo escucha. La conexión de tuberías adicionales es barata: no hay un costo O (N) asociado con el envío de grandes cantidades de datos a varias tuberías, porque leer QRingBuffer s de las tuberías conectadas almacena copias poco profundas de conjuntos de bytes completos enviados desde la tubería de origen.

QIODevice mantiene QIODevice semántica de QIODevice : puede conectarse a la señal readyRead , usar la tubería con un QDataStream o un QTextStream , etc. Al igual que con cualquier QIODevice , solo puede usar la clase desde su thread() , pero el otro punto final puede vivir en cualquier subproceso, y ambos se pueden mover entre subprocesos como se desee, sin pérdida de datos.

Si el otro extremo de la tubería no está abierto y es legible, las escrituras no son operativas, aunque tengan éxito. Al cerrar la tubería, se borran las memorias intermedias de lectura y escritura, de modo que se puede volver a abrir para su reutilización.

Los almacenamientos intermedios de tuberías escriben datos de forma predeterminada, y el almacenamiento intermedio de escritura se puede vaciar por la fuerza usando AppPipe::flush() , a menos que se abra en modo QIODevice::Unbuffered .

Las señales hasIncoming y hasOutgoing son útiles para monitoring los datos que pasan por la tubería.

// https://github.com/KubaO/stackoverflown/tree/master/questions/local-pipe-32317081 // This project is compatible with Qt 4 and Qt 5 #include <QtTest> #include <private/qiodevice_p.h> #include <private/qringbuffer_p.h> #include <algorithm> #include <climits> #ifndef Q_DECL_OVERRIDE #define Q_DECL_OVERRIDE #endif class AppPipePrivate : public QIODevicePrivate { public: #if QT_VERSION < QT_VERSION_CHECK(5,7,0) QRingBuffer buffer; QRingBuffer writeBuffer; int writeBufferChunkSize; #endif const QByteArray *writeData; AppPipePrivate() : writeData(0) { writeBufferChunkSize = 4096; } }; /// A simple point-to-point intra-process pipe. The other endpoint can live in any /// thread. class AppPipe : public QIODevice { Q_OBJECT Q_DECLARE_PRIVATE(AppPipe) static inline int intLen(qint64 len) { return std::min(len, qint64(INT_MAX)); } Q_SLOT void _a_write(const QByteArray &data) { Q_D(AppPipe); if (!(d->openMode & QIODevice::ReadOnly)) return; // We must be readable. d->buffer.append(data); // This is a chunk shipped from the source. emit hasIncoming(data); emit readyRead(); } void hasOutgoingLong(const char *data, qint64 len) { while (len) { int const size = intLen(len); emit hasOutgoing(QByteArray(data, size)); data += size; len -= size; } } public: AppPipe(QIODevice::OpenMode mode, QObject *parent = 0) : QIODevice(*new AppPipePrivate, parent) { open(mode); } AppPipe(AppPipe *other, QIODevice::OpenMode mode, QObject *parent = 0) : QIODevice(*new AppPipePrivate, parent) { open(mode); addOther(other); } AppPipe(AppPipe *other, QObject *parent = 0) : QIODevice(*new AppPipePrivate, parent) { addOther(other); } ~AppPipe() Q_DECL_OVERRIDE {} void addOther(AppPipe *other) { if (other) { connect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray)), Qt::UniqueConnection); connect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray)), Qt::UniqueConnection); } } void removeOther(AppPipe *other) { disconnect(this, SIGNAL(hasOutgoing(QByteArray)), other, SLOT(_a_write(QByteArray))); disconnect(other, SIGNAL(hasOutgoing(QByteArray)), this, SLOT(_a_write(QByteArray))); } void flush() { Q_D(AppPipe); while (!d->writeBuffer.isEmpty()) { QByteArray const data = d->writeBuffer.read(); emit hasOutgoing(data); emit bytesWritten(data.size()); } } void close() Q_DECL_OVERRIDE { Q_D(AppPipe); flush(); QIODevice::close(); d->buffer.clear(); } qint64 write(const QByteArray &data) { // This is an optional optimization. The base method works OK. Q_D(AppPipe); QScopedValueRollback<const QByteArray*> back(d->writeData); if (!(d->openMode & Text)) d->writeData = &data; return QIODevice::write(data); } qint64 writeData(const char *data, qint64 len) Q_DECL_OVERRIDE { Q_D(AppPipe); bool buffered = !(d->openMode & Unbuffered); if (buffered && (d->writeBuffer.size() + len) > d->writeBufferChunkSize) flush(); if (!buffered || len > d->writeBufferChunkSize || (len == d->writeBufferChunkSize && d->writeBuffer.isEmpty())) { if (d->writeData && d->writeData->data() == data && d->writeData->size() == len) emit hasOutgoing(*d->writeData); else hasOutgoingLong(data, len); } else memcpy(d->writeBuffer.reserve(len), data, len); return len; } bool isSequential() const Q_DECL_OVERRIDE { return true; } Q_SIGNAL void hasOutgoing(const QByteArray &); Q_SIGNAL void hasIncoming(const QByteArray &); #if QT_VERSION >= QT_VERSION_CHECK(5,7,0) // all the data is in the read buffer already qint64 readData(char *, qint64) Q_DECL_OVERRIDE { return 0; } #else qint64 readData(char *data, qint64 len) Q_DECL_OVERRIDE { Q_D(AppPipe); qint64 hadRead = 0; while (len && !d->buffer.isEmpty()) { int size = d->buffer.read(data, intLen(len)); hadRead += size; data += size; len -= size; } return hadRead; } bool canReadLine() const Q_DECL_OVERRIDE { Q_D(const AppPipe); return d->buffer.indexOf(''/n'') != -1 || QIODevice::canReadLine(); } qint64 bytesAvailable() const Q_DECL_OVERRIDE { Q_D(const AppPipe); return QIODevice::bytesAvailable() + d->buffer.size(); } qint64 bytesToWrite() const Q_DECL_OVERRIDE { Q_D(const AppPipe); return QIODevice::bytesToWrite() + d->writeBuffer.size(); } #endif }; // ... #include "main.moc"

Un arnés de prueba mínimo:

class TestAppPipe : public QObject { Q_OBJECT QByteArray data1, data2; struct PipePair { AppPipe end1, end2; PipePair(QIODevice::OpenMode mode = QIODevice::NotOpen) : end1(QIODevice::ReadWrite | mode), end2(&end1, QIODevice::ReadWrite | mode) {} }; Q_SLOT void initTestCase() { data1 = randomData(); data2 = randomData(); } Q_SLOT void sizes() { QCOMPARE(sizeof(AppPipe), sizeof(QIODevice)); } Q_SLOT void basic() { PipePair p; QVERIFY(p.end1.isOpen() && p.end1.isWritable() && p.end1.isReadable()); QVERIFY(p.end2.isOpen() && p.end2.isWritable() && p.end2.isReadable()); static const char hello[] = "Hello There!"; p.end1.write(hello); p.end1.flush(); QCOMPARE(p.end2.readAll(), QByteArray(hello)); } static QByteArray randomData(int const size = 1024*1024*32) { QByteArray data; data.resize(size); char *const d = data.data(); for (char *p = d+data.size()-1; p >= d; --p) *p = qrand(); Q_ASSERT(data.size() == size); return data; } static void randomChunkWrite(AppPipe *dev, const QByteArray &payload) { for (int written = 0, left = payload.size(); left; ) { int const chunk = std::min(qrand() % 82931, left); dev->write(payload.mid(written, chunk)); left -= chunk; written += chunk; } dev->flush(); } void runBigData(PipePair &p) { Q_ASSERT(!data1.isEmpty() && !data2.isEmpty()); randomChunkWrite(&p.end1, data1); randomChunkWrite(&p.end2, data2); QCOMPARE(p.end1.bytesAvailable(), qint64(data2.size())); QCOMPARE(p.end2.bytesAvailable(), qint64(data1.size())); QCOMPARE(p.end1.readAll(), data2); QCOMPARE(p.end2.readAll(), data1); } Q_SLOT void bigDataBuffered() { PipePair p; runBigData(p); } Q_SLOT void bigDataUnbuffered() { PipePair p(QIODevice::Unbuffered); runBigData(p); } Q_SLOT void cleanupTestCase() { data1.clear(); data2.clear(); } }; QTEST_MAIN(TestAppPipe)

# local-pipe-32317081.pro QT = core greaterThan(QT_MAJOR_VERSION, 4): QT = core-private testlib else: CONFIG += qtestlib DEFINES += / QT_DEPRECATED_WARNINGS / QT_DISABLE_DEPRECATED_BEFORE=0x060000 / QT_RESTRICTED_CAST_FROM_ASCII CONFIG += console c++14 CONFIG -= app_bundle TEMPLATE = app SOURCES = main.cpp

¿Qt tiene un par QIODevice que funcionaría para las comunicaciones punto a punto dentro del proceso?

Se podría usar el QTCPSocket concreto o QLocalSocket , pero la API de conexión del lado del servidor es un poco engorrosa y parece un desperdicio forzar los datos a través del sistema operativo.