twitter - Escaldado: ¿Cómo retener el otro campo, después de un groupBy(''field){. Size}?
cascading scalding (1)
No puedes hacer esto de una manera agradable, me temo. Piense en cómo funciona debajo del capó: divide los datos para contarlos en fragmentos y los envía a diferentes procesos, cada proceso cuenta su parte, luego un solo reductor los suma al final. Mientras que cada proceso cuenta, no conoce el tamaño completo, por lo que no puede agregar el campo. La única forma es volver y agregarlo a los datos una vez que se conoce el tamaño completo (es decir, una unión).
Si cada grupo cabe en la memoria (y puede configurar la memoria), puede:
Tsv(args("input"), (''id1, ''id2))
.groupBy(''id2)(_.size.toList[(String, String)]((''id1, ''id2) -> ''list))
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)]((''list, ''size) -> (''id1, ''id2, ''size)) {
case (list, size) => list.map(record => (record._1, record._2, size))
}
.write(Tsv(args("output")))
Pero si su sistema no tiene suficiente memoria, tendrá que usar una unión costosa.
Observación: Puede usar Tsv en lugar de TextLine seguido de mapTo y división.
Entonces mis datos de entrada tienen dos campos / columnas: id1 e id2, y mi código es el siguiente:
TextLine(args("input"))
.read
.mapTo(''line->(''id1,''id2)) {line: String =>
val fields = line.split("/t")
(fields(0),fields(1))
}
.groupBy(''id2){.size}
.write(Tsv(args("output")))
La salida da como resultado (lo que supongo) dos campos: tamaño id2 *. Estoy un poco atascado en averiguar si es posible retener el valor id1 que también se ha agrupado con id2 y agregarlo como otro campo?