tutorial spark example español ejemplo python apache-spark machine-learning pyspark rdd

python - example - Spark ALS predictAll devuelve vacío



spark 2.3 6 (1)

Hay dos condiciones básicas bajo las cuales MatrixFactorizationMode.predictAll puede devolver un RDD con un número menor de elementos que la entrada:

  • usuario falta en el conjunto de entrenamiento.
  • producto falta en el conjunto de entrenamiento.

Puede reproducir fácilmente este comportamiento y comprobar que no depende de cómo se ha creado RDD. Primero usemos datos de ejemplo para construir un modelo:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating def parse(s): x, y, z = s.split(",") return Rating(int(x), int(y), float(z)) ratings = (sc.textFile("data/mllib/als/test.data") .map(parse) .union(sc.parallelize([Rating(1, 5, 4.0)]))) model = ALS.train(ratings, 10, 10)

A continuación, veamos qué productos y usuarios están presentes en los datos de entrenamiento:

set(ratings.map(lambda r: r.product).collect()) ## {1, 2, 3, 4, 5} set(ratings.map(lambda r: r.user).collect()) ## {1, 2, 3, 4}

Ahora permite crear datos de prueba y verificar predicciones:

valid_test = sc.parallelize([(2, 5), (1, 4), (3, 5)]) valid_test ## ParallelCollectionRDD[434] at parallelize at PythonRDD.scala:423 model.predictAll(valid_test).count() ## 3

Hasta aquí todo bien. Luego permite mapearlo usando la misma lógica que en tu código:

valid_test_ = valid_test.map(lambda xs: tuple(int(x) for x in xs)) valid_test_ ## PythonRDD[497] at RDD at PythonRDD.scala:43 model.predictAll(valid_test_).count() ## 3

Sigo bien. A continuación, creemos datos no válidos y repita el experimento:

invalid_test = sc.parallelize([ (2, 6), # No product in the training data (6, 1) # No user in the training data ]) invalid_test ## ParallelCollectionRDD[500] at parallelize at PythonRDD.scala:423 model.predictAll(invalid_test).count() ## 0 invalid_test_ = invalid_test.map(lambda xs: tuple(int(x) for x in xs)) model.predictAll(invalid_test_).count() ## 0

Como se esperaba, no hay predicciones para entradas no válidas.

Finalmente, puede confirmar que este es realmente el caso utilizando ML modelo que es completamente independiente en la formación / predicción del código de Python:

from pyspark.ml.recommendation import ALS as MLALS model_ml = MLALS(rank=10, maxIter=10).fit( ratings.toDF(["user", "item", "rating"]) ) model_ml.transform((valid_test + invalid_test).toDF(["user", "item"])).show() ## +----+----+----------+ ## |user|item|prediction| ## +----+----+----------+ ## | 6| 1| NaN| ## | 1| 4| 1.0184212| ## | 2| 5| 4.0041084| ## | 3| 5|0.40498763| ## | 2| 6| NaN| ## +----+----+----------+

Como puede ver, ningún usuario / elemento correspondiente en los datos de entrenamiento significa que no hay predicción.

Tengo el siguiente código de prueba de Python (los argumentos para ALS.train están definidos en otra parte):

r1 = (2, 1) r2 = (3, 1) test = sc.parallelize([r1, r2]) model = ALS.train(ratings, rank, numIter, lmbda) predictions = model.predictAll(test) print test.take(1) print predictions.count() print predictions

Lo cual funciona, porque tiene un recuento de 1 en contra de las predicciones variables y salidas:

[(2, 1)] 1 ParallelCollectionRDD[2691] at parallelize at PythonRDD.scala:423

Sin embargo, cuando trato de usar un RDD que creé utilizando el siguiente código, ya no funciona:

model = ALS.train(ratings, rank, numIter, lmbda) validation_data = validation.map(lambda xs: tuple(int(x) for x in xs)) predictions = model.predictAll(validation_data) print validation_data.take(1) print predictions.count() print validation_data

Qué salidas:

[(61, 3864)] 0 PythonRDD[4018] at RDD at PythonRDD.scala:43

Como puede ver, el comando "predecir todo regresa vacío" cuando pasó el RDD mapeado. Los valores que entran son ambos del mismo formato. La única diferencia notable que puedo ver es que el primer ejemplo utiliza parallelize y produce un ParallelCollectionRDD mientras que el segundo ejemplo solo usa un mapa que produce un PythonRDD . ¿Predice que todo solo funcionará si se pasa un cierto tipo de RDD? Si es así, ¿es posible convertir entre tipos de RDD? No estoy seguro de cómo hacerlo funcionar.