java - google - protobuf vs json
¿Hay equivalentes en C++ para las funciones de E/S delimitadas por los búferes de protocolo en Java? (10)
Aqui tienes:
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
using namespace google::protobuf::io;
class FASWriter
{
std::ofstream mFs;
OstreamOutputStream *_OstreamOutputStream;
CodedOutputStream *_CodedOutputStream;
public:
FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
{
assert(mFs.good());
_OstreamOutputStream = new OstreamOutputStream(&mFs);
_CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
}
inline void operator()(const ::google::protobuf::Message &msg)
{
_CodedOutputStream->WriteVarint32(msg.ByteSize());
if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
std::cout << "SerializeToCodedStream error " << std::endl;
}
~FASWriter()
{
delete _CodedOutputStream;
delete _OstreamOutputStream;
mFs.close();
}
};
class FASReader
{
std::ifstream mFs;
IstreamInputStream *_IstreamInputStream;
CodedInputStream *_CodedInputStream;
public:
FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
{
assert(mFs.good());
_IstreamInputStream = new IstreamInputStream(&mFs);
_CodedInputStream = new CodedInputStream(_IstreamInputStream);
}
template<class T>
bool ReadNext()
{
T msg;
unsigned __int32 size;
bool ret;
if ( ret = _CodedInputStream->ReadVarint32(&size) )
{
CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
{
_CodedInputStream->PopLimit(msgLimit);
std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
}
}
return ret;
}
~FASReader()
{
delete _CodedInputStream;
delete _IstreamInputStream;
mFs.close();
}
};
Estoy tratando de leer / escribir múltiples mensajes de Buffers de Protocolo desde archivos, tanto en C ++ como en Java. Google sugiere escribir prefijos de longitud antes de los mensajes, pero no hay manera de hacerlo por defecto (que yo pudiera ver).
Sin embargo, la API de Java en la versión 2.1.0 recibió un conjunto de funciones de E / S "Delimitadas" que aparentemente hacen ese trabajo:
parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo
¿Hay equivalentes en C ++? Y si no, ¿cuál es el formato de los prefijos de tamaño que la API de Java adjunta, por lo que puedo analizar esos mensajes en C ++?
De acuerdo, entonces no he podido encontrar funciones de C ++ de alto nivel que implementen lo que necesito, pero algunas especulaciones a través de la referencia de la API de Java arrojaron lo siguiente, dentro de la interfaz de MessageLite :
void writeDelimitedTo(OutputStream output)
/* Like writeTo(OutputStream), but writes the size of
the message as a varint before writing the data. */
¡Entonces el prefijo de tamaño de Java es un varist (Buffers de Protocolo)!
Armado con esa información, CodedStream la API de C ++ y encontré el encabezado de CodedStream , que tiene estos:
bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)
Utilicándolos, debería ser capaz de rodar mis propias funciones de C ++ que hacen el trabajo.
Sin embargo, realmente deberían agregar esto a la API principal de mensajes; falta la funcionalidad teniendo en cuenta que Java lo tiene, y también lo hace el excelente puerto C # de protobuf-net de Marc Gravell (a través de SerializeWithLengthPrefix y DeserializeWithLengthPrefix).
IsteamInputStream es muy frágil para eofs y otros errores que se producen fácilmente cuando se usa junto con std :: istream. Después de esto, las secuencias de protobuf se dañan permanentemente y se destruye cualquier dato de buffer utilizado. Hay soporte adecuado para la lectura de flujos tradicionales en protobuf.
Implemente google::protobuf::io::CopyingInputStream
y utilícelo junto con CopyingInputStreamAdapter . Haz lo mismo para las variantes de salida.
En la práctica, una llamada de análisis termina en google::protobuf::io::CopyingInputStream::Read(void* buffer, int size)
donde se proporciona un buffer. Lo único que queda por hacer es leer de alguna manera.
Aquí hay un ejemplo para usar con las secuencias sincronizadas de Asio ( SyncReadStream / SyncWriteStream ):
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
public:
AsioInputStream(SyncReadStream& sock);
int Read(void* buffer, int size);
private:
SyncReadStream& m_Socket;
};
template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
m_Socket(sock) {}
template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
std::size_t bytes_read;
boost::system::error_code ec;
bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);
if(!ec) {
return bytes_read;
} else if (ec == boost::asio::error::eof) {
return 0;
} else {
return -1;
}
}
template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
public:
AsioOutputStream(SyncWriteStream& sock);
bool Write(const void* buffer, int size);
private:
SyncWriteStream& m_Socket;
};
template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
m_Socket(sock) {}
template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
boost::system::error_code ec;
m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
return !ec;
}
Uso:
AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);
Message protoMessage;
uint32_t msg_size;
/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
// Handle error
}
/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
// Handle error
}
/* Remove limit */
cis.PopLimit(msg_limit);
Llego un poco tarde a la fiesta aquí, pero las implementaciones siguientes incluyen algunas optimizaciones que faltan en las otras respuestas y no fallarán después de 64MB de entrada (aunque todavía impone el límite de 64MB en cada mensaje individual, pero no en toda la secuencia )
(Soy el autor de las bibliotecas de protobuf de C ++ y Java, pero ya no trabajo para Google. Siento que este código nunca haya llegado a la versión oficial. Así es como sería si tuviera).
bool writeDelimitedTo(
const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
// We create a new coded stream for each message. Don''t worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL) {
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
} else {
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError()) return false;
}
return true;
}
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message) {
// We create a new coded stream for each message. Don''t worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
Me encontré con el mismo problema tanto en C ++ como en Python.
Para la versión de C ++, utilicé una mezcla del código Kenton Varda publicado en este hilo y el código de la solicitud de extracción que envió al equipo de protobuf (porque la versión publicada aquí no maneja EOF mientras que la que envió a github hace )
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>
bool writeDelimitedTo(const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
// We create a new coded stream for each message. Don''t worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL)
{
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
}
else
{
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError())
return false;
}
return true;
}
bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
// We create a new coded stream for each message. Don''t worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
const int start = input.CurrentPosition();
if (clean_eof)
*clean_eof = false;
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size))
{
if (clean_eof)
*clean_eof = input.CurrentPosition() == start;
return false;
}
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
Y aquí está mi implementación de python2:
from google.protobuf.internal import encoder
from google.protobuf.internal import decoder
#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
mask = 0x80 # (1 << 7)
raw_varint32 = []
while 1:
b = stream.read(1)
#eof
if b == "":
break
raw_varint32.append(b)
if not (ord(b) & mask):
#we found a byte starting with a 0, which means it''s the last byte of this varint
break
return raw_varint32
def writeDelimitedTo(message, stream):
message_str = message.SerializeToString()
delimiter = encoder._VarintBytes(len(message_str))
stream.write(delimiter + message_str)
def readDelimitedFrom(MessageType, stream):
raw_varint32 = readRawVarint32(stream)
message = None
if raw_varint32:
size, _ = decoder._DecodeVarint32(raw_varint32, 0)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message = MessageType()
message.ParseFromString(data)
return message
#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
raw_varint32 = readRawVarint32(stream)
if raw_varint32:
size, _ = decoder._DecodeVarint32(raw_varint32, 0)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message.ParseFromString(data)
return message
else:
return None
Puede que no sea el código más atractivo y estoy seguro de que se puede refactorizar un poco, pero al menos eso debería mostrarte una forma de hacerlo.
Ahora el gran problema: es LENTO .
Incluso cuando se utiliza la implementación C ++ de python-protobuf, es un orden de magnitud más lento que en C ++ puro. Tengo un punto de referencia donde leo 10 millones de mensajes protobuf de ~ 30 bytes cada uno desde un archivo. Toma ~ 0.9s en C ++, y 35s en python.
Una forma de hacerlo un poco más rápido sería volver a implementar el decodificador varint para hacerlo leer desde un archivo y decodificarlo de una vez, en lugar de leer desde un archivo y luego decodificar como lo hace actualmente este código. (el perfil muestra que se gasta una cantidad significativa de tiempo en el codificador / decodificador de varints). Pero no hace falta decir que solo no es suficiente para cerrar la brecha entre la versión de Python y la versión de C ++.
Cualquier idea para hacerlo más rápido es muy bienvenido :)
Puede usar getline para leer una cadena de una secuencia, usando el delimitador especificado:
istream& getline ( istream& is, string& str, char delim );
(definido en el encabezado)
Resolví el mismo problema usando CodedOutputStream / ArrayOutputStream para escribir el mensaje (con el tamaño) y CodedInputStream / ArrayInputStream para leer el mensaje (con el tamaño).
Por ejemplo, el siguiente pseudocódigo escribe el tamaño del mensaje siguiente por el mensaje:
const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;
google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);
codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);
Al escribir, también debe verificar que su memoria intermedia sea lo suficientemente grande como para ajustarse al mensaje (incluido el tamaño). Y al leer, debe verificar que su buffer contenga un mensaje completo (incluido el tamaño).
Definitivamente sería útil si añadieran métodos de conveniencia a la API de C ++ similares a los provistos por la API de Java.
También estaba buscando una solución para esto. Aquí está el núcleo de nuestra solución, suponiendo que algunos códigos Java escriben muchos mensajes MyRecord con writeDelimitedTo
en un archivo. Abra el archivo y bucle, haciendo:
if(someCodedInputStream->ReadVarint32(&bytes)) { CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes); if(myRecord->ParseFromCodedStream(someCodedInputStream)) { //do your stuff with the parsed MyRecord instance } else { //handle parse error } someCodedInputStream->PopLimit(msgLimit); } else { //maybe end of file }
Espero eso ayude.
Trabajando con una versión Object-C de buffer de protocolo, me encontré con este problema exacto. Al enviar desde el cliente iOS a un servidor basado en Java que usa parseDelimitedFrom, que espera la longitud como el primer byte, primero tuve que llamar a writeRawByte al CodedOutputStream. Publicando aquí para ayudar a otros a enfrentar este problema. Al trabajar en este tema, uno pensaría que Google proto-bufs vendría con una bandera simple que hace esto por ti ...
Request* request = [rBuild build];
[self sendMessage:request];
}
- (void) sendMessage:(Request *) request {
//** get length
NSData* n = [request data];
uint8_t len = [n length];
PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
//** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
[os writeRawByte:len];
[request writeToCodedOutputStream:os];
[os flush];
}
Ya que no puedo escribir esto como un comentario a la respuesta de Kenton Varda arriba; Creo que hay un error en el código que publicó (así como en otras respuestas que se han proporcionado). El siguiente código:
...
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
...
establece un límite incorrecto porque no tiene en cuenta el tamaño de varint32 que ya se ha leído de la entrada. Esto puede provocar la pérdida / corrupción de datos a medida que se leen bytes adicionales de la transmisión, que pueden formar parte del siguiente mensaje. La forma habitual de manejar esto correctamente es eliminar el CodedInputStream utilizado para leer el tamaño y crear uno nuevo para leer la carga útil:
...
uint32_t size;
{
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
if (!input.ReadVarint32(&size)) return false;
}
google::protobuf::io::CodedInputStream input(rawInput);
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
...