scala - produccion - rendimientos marginales crecientes
Ejemplo de la función agregada de Scala (7)
Aquí está el blog sobre cómo el rendimiento de habilitación agregada en el procesador multinúcleo con benchmark. http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/
Aquí hay un video de "Scala parallel collections" de "Scala Days 2011". http://days2011.scala-lang.org/node/138/272
La descripción en el video
Scala Parallel Collections
Aleksandar Prokopec
Las abstracciones de programación en paralelo se vuelven cada vez más importantes a medida que crece el número de núcleos de procesador. Un modelo de programación de alto nivel permite al programador enfocarse más en el programa y menos en detalles de bajo nivel como la sincronización y el equilibrio de carga. Las colecciones paralelas de Scala extienden el modelo de programación del marco de la colección Scala, proporcionando operaciones paralelas en conjuntos de datos. La charla describirá la arquitectura del marco de recopilación paralelo, explicando sus decisiones de implementación y diseño. Se describirán las implementaciones de recolección concreta, tales como los mapas hash paralelos y los intentos paralelos de hash. Finalmente, se mostrarán varias aplicaciones de ejemplo, demostrando el modelo de programación en la práctica.
He estado buscando y no puedo encontrar un ejemplo o discusión de la función aggregate
en Scala que pueda entender. Parece bastante poderoso.
¿Se puede usar esta función para reducir los valores de tuplas para hacer una colección de tipo multimap? Por ejemplo:
val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))
Después de aplicar agregado:
Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv"))
Además, ¿puedes dar ejemplos de los parámetros z
, segop
y combop
? No estoy seguro de lo que hacen estos parámetros.
La definición de aggregate
en la fuente TraversableOnce
es:
def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B =
foldLeft(z)(seqop)
que no es diferente de un simple foldLeft
. combop
no parece usarse en ninguna parte. Yo mismo estoy confundido en cuanto a cuál es el propósito de este método.
La firma de una colección con elementos de tipo A es:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
-
z
es un objeto de tipo B que actúa como un elemento neutral. Si desea contar algo, puede usar 0, si desea compilar una lista, comenzar con una lista vacía, etc. -
segop
es análogo a la función que pasa a los métodos defold
. Se necesitan dos argumentos, el primero es del mismo tipo que el elemento neutral que pasó y representa el material que ya se agregó en la iteración anterior, el segundo es el siguiente elemento de su colección. El resultado también debe ser de tipoB
-
combop
: es una función que combina dos resultados en uno.
En la mayoría de las colecciones, agregado se implementa en TraversableOnce
como:
def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B
= foldLeft(z)(seqop)
Por combop
tanto, combop
se ignora. Sin embargo, tiene sentido para colecciones paralelas , porque seqop
primero se aplicará localmente en paralelo, y luego se llama a combop
para terminar la agregación.
Entonces, para su ejemplo, puede intentar con un doblez primero:
val seqOp =
(map:Map[String,Set[String]],tuple: (String,String)) =>
map + ( tuple._1 -> ( map.getOrElse( tuple._1, Set[String]() ) + tuple._2 ) )
list.foldLeft( Map[String,Set[String]]() )( seqOp )
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
Entonces debes encontrar una manera de colapsar dos multimapas:
val combOp = (map1: Map[String,Set[String]], map2: Map[String,Set[String]]) =>
(map1.keySet ++ map2.keySet).foldLeft( Map[String,Set[String]]() ) {
(result,k) =>
result + ( k -> ( map1.getOrElse(k,Set[String]() ) ++ map2.getOrElse(k,Set[String]() ) ) )
}
Ahora puede usar agregado en paralelo:
list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))
Aplicando el método "par" a la lista, usando la colección paralela (scala.collection.parallel.immutable.ParSeq) de la lista para aprovechar realmente los procesadores multi-core. Sin "par", no habrá ninguna ganancia de rendimiento ya que el agregado no se realiza en la colección paralela.
La función de agregado no hace eso (excepto que es una función muy general, y podría usarse para hacer eso). Quieres groupBy
. Cerca de al menos. Al comenzar con Seq[(String, String)]
, y se agrupa tomando el primer elemento de la tupla (que es (String, String) => String)
, se devolverá un Map[String, Seq[(String, String)]
). Luego debe descartar el primer parámetro en los valores Seq [String, String)].
Asi que
list.groupBy(_._1).mapValues(_.map(_._2))
Allí se obtiene un Map[String, Seq[(String, String)]
. Si desea un Seq
lugar de Map
, llame a toSeq
para obtener el resultado. No creo que tengas una garantía en el orden en el Seq resultante, aunque
Agregado es una función más difícil.
Considere primero reduceLeft y reduceRight. Sea una secuencia no vacía as = Seq(a1, ... an)
de elementos de tipo A, y f: (A,A) => A
sea de alguna manera para combinar dos elementos de tipo A
en uno. Lo notaré como un operador binario @
, a1 @ a2
lugar de f(a1, a2)
. as.reduceLeft(@)
calculará (((a1 @ a2) @ a3)... @ an)
. reduceRight
pondrá los paréntesis en el otro lado, (a1 @ (a2 @... @ an))))
. Si @
pasa a ser asociativo, a uno no le importan los paréntesis. Uno podría calcularlo como (a1 @... @ ap) @ (ap+1 @...@an)
(también habría parantheses dentro de las 2 grandes parantheses, pero no nos preocupemos por eso). Entonces uno podría hacer las dos partes en paralelo, mientras que el horquillado anidado en reduceLeft o reduceRight fuerza un cálculo completamente secuencial. Pero el cálculo en paralelo solo es posible cuando se sabe que @
es asociativo, y el método reductionLeft no puede saberlo.
Aún así, podría haber una reduce
método, cuya llamada sería responsable de garantizar que la operación sea asociativa. Luego reduce
las llamadas que desee, posiblemente haciéndolo en paralelo. De hecho, existe tal método.
Sin embargo, existe una limitación con los diversos métodos de reducción. Los elementos de Seq solo se pueden combinar con un resultado del mismo tipo: @
tiene que ser (A,A) => A
Pero uno podría tener el problema más general de combinarlos en un B
Uno comienza con un valor b
de tipo B
y lo combina con todos los elementos de la secuencia. El operador @
es (B,A) => B
, y uno calcula (((b @ a1) @ a2) ... @ an)
. foldLeft
hace eso. foldRight
hace lo mismo, pero comienza con an
. Allí, la operación @
no tiene posibilidad de ser asociativa. Cuando uno escribe b @ a1 @ a2
, debe significar (b @ a1) @ a2
, ya que (a1 @ a2)
estaría mal escrito. Entonces foldLeft y foldRight deben ser secuenciales.
Sin embargo, supongamos que cada A
se puede convertir en B
, ¡vamos a escribirlo !
, a!
es de tipo B
Supongamos, además, que hay una operación +
(B,B) => B
, y que @
es tal que b @ a
es de hecho b + a!
. ¡En lugar de combinar elementos con @, uno podría primero transformarlos a B con !
, luego combínalos con +
. Eso sería como as.map(!).reduceLeft(+)
. Y si +
es asociativo, entonces se puede hacer con reducir, y no ser secuencial: as.map (!). Reduce (+). Podría haber un método hipotético como partición asociativa (b,!, +).
El agregado está muy cerca de eso. Sin embargo, puede ser que exista una forma más eficiente de implementar b@a
que b+a!
Por ejemplo, si el tipo B
es la List[A]
, y b @ a es a :: b, entonces a!
será a::Nil
, y b1 + b2
será b2 ::: b1
. a :: b es mucho mejor que (a :: Nil) ::: b. Para beneficiarse de la asociatividad, pero aún usar @
, ¡primero se divide b + a1! + ... + an!
b + a1! + ... + an!
, en (b + a1! + ap!) + (ap+1! + ..+ an!)
, luego vuelve a usar @
con (b @ a1 @ an) + (ap+1! @ @ an)
. ¡Uno todavía necesita el! en ap + 1, porque uno debe comenzar con algo b. Y el + también es necesario, apareciendo entre las parantheses. Para hacer eso, como as.associativeFold(!, +)
Podría cambiarse a as.optimizedAssociativeFold(b, !, @, +)
.
Volver a +
. +
es asociativo, o equivalentemente, (B, +)
es un semigrupo. En la práctica, la mayoría de los semigrupos utilizados en la programación también son monoides, es decir, contienen un elemento neutro z
(para cero ) en B, de modo que para cada b
, z + b
= b + z
= b
. En ese caso, el !
operación que tiene sentido es probable que sea a! = z @ a
a! = z @ a
. Además, como z es un elemento neutral b @ a1 ..@ an = (b + z) @ a1 @ an
que es b + (z + a1 @ an)
. Entonces, siempre es posible comenzar la agregación con z. Si se quiere b
su lugar, se obtiene b + result
al final. Con todas esas hipótesis, podemos hacer un s.aggregate(z, @, +)
. Eso es lo que hace aggregate
. @
es el argumento seqop
(aplicado en una secuencia z @ a1 @ a2 @ ap
), y +
es combop
(aplicado a resultados ya parcialmente combinados , como en (z + a1@...@ap) + (z + ap+1@...@an)
).
Para resumir, como as.aggregate(z)(seqop, combop)
calcula lo mismo que as.foldLeft(z)( seqop)
siempre que
-
(B, combop, z)
es un monoide -
seqop(b,a) = combop(b, seqop(z,a))
la implementación agregada puede usar la asociatividad de combop para agrupar los cálculos como quiera (no intercambiando elementos, sin embargo, + no tiene que ser conmutativa, ::: no lo es). Puede ejecutarlos en paralelo.
Finalmente, resolver el problema inicial usando el aggregate
se deja como un ejercicio para el lector. Una pista: implemente usando foldLeft
, luego encuentre z
y combo
que satisfagan las condiciones indicadas anteriormente.
Solo para aclarar las explicaciones de los que me precedieron, en teoría la idea es que el agregado debería funcionar así, (he cambiado los nombres de los parámetros para hacerlos más claros):
Seq(1,2,3,4).aggragate(0)(
addToPrev = (prev,curr) => prev + curr,
combineSums = (sumA,sumB) => sumA + sumB)
Debería traducirse lógicamente a
Seq(1,2,3,4)
.grouped(2) // split into groups of 2 members each
.map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
.foldLeft(0)(sumA,sumB => sumA + sumB)
Debido a que la agregación y el mapeo están separados, la lista original podría dividirse teóricamente en diferentes grupos de diferentes tamaños y ejecutarse en paralelo o incluso en máquinas diferentes. En la práctica, la implementación actual de scala no admite esta función de forma predeterminada, pero puede hacerlo en su propio código.
Veamos si algún arte ascii no ayuda. Considere la firma de tipo de aggregate
:
def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
Además, tenga en cuenta que A
refiere al tipo de la colección. Entonces, digamos que tenemos 4 elementos en esta colección, entonces aggregate
podría funcionar así:
z A z A z A z A
/ / / /seqop/ / / /
B B B B
/ / combop / /
B _ _ B
/ combop /
B
Veamos un ejemplo práctico de eso. Digamos que tengo un GenSeq("This", "is", "an", "example")
, y quiero saber cuántos caracteres hay en él. Puedo escribir lo siguiente:
Tenga en cuenta el uso de par
en el siguiente fragmento de código. La segunda función que se pasa a agregar es lo que se llama después de calcular las secuencias individuales. Scala solo puede hacer esto para conjuntos que se pueden paralelizar.
import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)
Entonces, primero calcularía esto:
0 + "This".length // 4
0 + "is".length // 2
0 + "an".length // 2
0 + "example".length // 7
Lo que hace a continuación no puede predecirse (hay más de una forma de combinar los resultados), pero podría hacer esto (como en el arte ascii anterior):
4 + 2 // 6
2 + 7 // 9
En ese punto, concluye con
6 + 9 // 15
que da el resultado final. Ahora, esto es un poco similar en estructura a foldLeft
, pero tiene una función adicional (B, B) => B
, que fold no tiene. ¡Esta función, sin embargo, le permite trabajar en paralelo!
Considere, por ejemplo, que cada uno de los cálculos iniciales de los cuatro cálculos es independiente entre sí y puede hacerse en paralelo. Los siguientes dos (que resultan en 6 y 9) se pueden iniciar una vez que sus cálculos de los que dependen hayan finalizado, pero estos dos también pueden ejecutarse en paralelo.
Los 7 cómputos, paralelizados como se indicó anteriormente, podrían tomar tan poco como el mismo tiempo 3 cálculos seriales.
En realidad, con una colección tan pequeña, el costo de sincronizar el cálculo sería lo suficientemente grande como para eliminar cualquier ganancia. Además, si plegó esto, solo tomaría 4 cómputos en total. Sin embargo, una vez que tus colecciones se hacen más grandes, comienzas a ver algunas ganancias reales.
Considere, por otro lado, foldLeft
. Debido a que no tiene la función adicional, no puede paralelizar ningún cálculo:
(((0 + "This".length) + "is".length) + "an".length) + "example".length
Cada uno de los paréntesis internos debe computarse antes de que el externo pueda continuar.
aggregate
es como foldLeft
pero puede ejecutarse en paralelo.
Como dice missingfactor , la versión lineal de aggregate(z)(seqop, combop)
es equivalente a foldleft(z)(seqop)
. Sin embargo, esto no es práctico en el caso paralelo, donde necesitaríamos combinar no solo el siguiente elemento con el resultado anterior (como en un doblez normal) sino que queremos dividir los iterables en sub-iterables en los que llamamos agregado y necesidad de combinarlos de nuevo. (De izquierda a derecha, pero no asociativo, ya que podríamos haber combinado las últimas partes antes de las primeras partes de la iterable.) Esta re-combinación en general no es trivial, y por lo tanto, se necesita un método (S, S) => S
para lograr eso.
La definición en ParIterableLike
es:
def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}
que de hecho usa combop
.
Como referencia, Aggregate
se define como:
protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
extends Accessor[S, Aggregate[S]] {
@volatile var result: S = null.asInstanceOf[S]
def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
override def merge(that: Aggregate[S]) = result = combop(result, that.result)
}
La parte importante es merge
donde se aplica combop
con dos sub-resultados.