apache spark - textfile - Cómo asignar números contiguos únicos a elementos en un Spark RDD
spark parallelize (5)
Tengo un conjunto de datos de (user, product, review)
y deseo alimentarlo con el algoritmo ALS de mllib.
El algoritmo necesita usuarios y productos para ser números, mientras que los míos son nombres de usuario de cadenas y SKU de cadenas.
En este momento, obtengo los distintos usuarios y SKU, luego les asigno ID numéricos fuera de Spark.
Me preguntaba si había una mejor manera de hacerlo. El único enfoque en el que he pensado es escribir un RDD personalizado que enumere esencialmente 1 a n
, luego llame a zip en los dos RDD.
Comenzando con Spark 1.0 hay dos métodos que puede usar para resolver esto fácilmente:
-
RDD.zipWithIndex
es comoSeq.zipWithIndex
, agrega números contiguos (Long
). Esto necesita contar los elementos en cada partición primero, por lo que su entrada se evaluará dos veces. Guarde en caché su entrada RDD si desea usar esto. -
RDD.zipWithUniqueId
también le proporciona IDLong
únicos, pero no se garantiza que sean contiguos. (Solo serán contiguos si cada partición tiene el mismo número de elementos). Lo bueno es que no es necesario que sepa nada sobre la entrada, por lo que no causará una doble evaluación.
La gente ya ha recomendado MonotonicallyIncreasingID , y mencionó el problema de que crea Longs, no Ints.
Sin embargo, en mi experiencia (advertencia - Spark 1.6) - si lo usas en un solo ejecutor (repartición en 1 antes), no hay un prefijo de ejecutor usado, y el número puede ser transferido a Int. Obviamente, debe tener menos de Integer.MAX_VALUE filas.
Otra opción fácil, si usa DataFrames y solo le preocupa la singularidad, es usar la función MonotonicallyIncreasingID
import org.apache.spark.sql.functions.monotonicallyIncreasingId
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)
Editar: MonotonicallyIncreasingID
fue desaprobado y eliminado desde Spark 2.0 ; ahora se conoce como monotonically_increasing_id
.
Para un caso de uso de ejemplo similar, simplemente modifiqué los valores de cadena. Ver http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/
def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))
Parece que ya estás haciendo algo como esto, aunque el hashing puede ser más fácil de manejar.
Matei sugirió aquí un enfoque para emular zipWithIndex
en un RDD, lo que equivale a asignar identificadores dentro de cada partición que serán globalmente únicos: https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E
monotonically_increasing_id() parece ser la respuesta, pero desafortunadamente no funcionará para ALS ya que produce números de 64 bits y ALS espera de 32 bits (vea mi comentario debajo de la respuesta de radek1st para los deets).
La solución que encontré es usar zipWithIndex() , como se menciona en la respuesta de Darabos. Aquí le mostramos cómo implementarlo:
Si ya tiene un DataFrame de una sola columna con sus usuarios distintos llamados userids
, puede crear una tabla de búsqueda (LUT) de la siguiente manera:
# PySpark code
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userid", StringType(), True),StructField("user_als_id", IntegerType(), True)]))
Ahora usted puede:
- Utilice esta LUT para obtener ID enteros compatibles con ALS para proporcionar a ALS
- Use esta LUT para hacer una búsqueda inversa cuando necesite regresar de ALS ID a la ID original
Haz lo mismo para los artículos, obviamente.