Clustering (fkmeans) with Mahout using Clojure
I am trying to write a short script to cluster my data through Clojure (calling Mahout classes, though). My input data is in this format (it is the output of a PHP script):
format: (tag) (image) (frequency)
tag_sit image_a 0
tag_sit image_b 1
tag_lorem image_a 1
tag_lorem image_b 0
tag_dolor image_a 0
tag_dolor image_b 1
tag_ipsum image_a 1
tag_ipsum image_b 1
tag_amit image_a 1
tag_amit image_b 0
... (more)
Then I write them into a SequenceFile using this script (Clojure):
#!./bin/clj
(ns sensei.sequence.core)
(require 'clojure.string)
(require 'clojure.java.io)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.hadoop.io.SequenceFile)
(import org.apache.hadoop.io.Text)
(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)
(with-open [reader (clojure.java.io/reader *in*)]
  (let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                  conf)))
        hadoop_fs (FileSystem/get hadoop_configuration)]
    (reduce
      (fn [writer [index value]]
        (. writer append index value)
        writer)
      (SequenceFile/createWriter
        hadoop_fs
        hadoop_configuration
        (new Path "test/sensei")
        Text
        VectorWritable)
      (map
        (fn [[tag row_vector]]
          (let [input_index (new Text tag)
                input_vector (new VectorWritable)]
            (. input_vector set row_vector)
            [input_index input_vector]))
        (map
          (fn [[tag photo_list]]
            (let [photo_map (apply hash-map photo_list)
                  input_vector (new SequentialAccessSparseVector (count (vals photo_map)))]
              (loop [frequency_list (vals photo_map)]
                (if (zero? (count frequency_list))
                  [tag input_vector]
                  (when-not (zero? (count frequency_list))
                    (. input_vector set
                       (mod (count frequency_list) (count (vals photo_map)))
                       (Integer/parseInt (first frequency_list)))
                    (recur (rest frequency_list)))))))
          (reduce
            (fn [result next_line]
              (let [[tag photo frequency] (clojure.string/split next_line #" ")]
                (update-in result [tag]
                  #(if (nil? %)
                     [photo frequency]
                     (conj % photo frequency)))))
            {}
            (line-seq reader)))))))
Basically, it turns the input into a sequence file in this format:
Key (Text): $tag_uri
Value (VectorWritable): a vector (cardinality = number of documents) with numeric indices and the respective frequencies <0:1 1:0 2:0 3:1 4:0 ...>
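To make the intended key/value layout concrete, here is a minimal sketch in plain Clojure, with no Hadoop or Mahout involved, of how the input lines could be grouped into one frequency vector per tag. It is not the script above: the function names `parse-line` and `tag-vectors` are hypothetical, and it assumes that images are assigned to vector slots in sorted order, which the original script does not guarantee.

```clojure
(require '[clojure.string :as str])

(defn parse-line
  "Splits a \"tag image frequency\" line into [tag image frequency-as-long]."
  [line]
  (let [[tag image frequency] (str/split (str/trim line) #"\s+")]
    [tag image (Long/parseLong frequency)]))

(defn tag-vectors
  "Returns a map from tag to a vector of frequencies, one slot per image.
  Assumption: images are indexed in sorted order, so every tag shares
  the same column layout."
  [lines]
  (let [rows     (map parse-line lines)
        images   (vec (sort (distinct (map second rows))))
        index-of (zipmap images (range))
        zero-row (vec (repeat (count images) 0))]
    (reduce (fn [acc [tag image frequency]]
              ;; Start from an all-zero row the first time a tag is seen,
              ;; then store the frequency in the slot of its image.
              (update-in acc [tag] (fnil assoc zero-row) (index-of image) frequency))
            {}
            rows)))
```

For example, `(tag-vectors ["tag_sit image_a 0" "tag_sit image_b 1"])` returns `{"tag_sit" [0 1]}`. Each map entry corresponds to one Text key and one vector value in the sequence file.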
Then I proceed to do the actual clustering with this script (written with reference to this blog post):
#!./bin/clj
(ns sensei.clustering.fkmeans)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)
(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://127.0.0.1:9000/")
                                conf)))
      input_path (new Path "test/sensei")
      output_path (new Path "test/clusters")
      clusters_in_path (new Path "test/clusters/cluster-0")]
  (FuzzyKMeansDriver/run
    hadoop_configuration
    input_path
    (RandomSeedGenerator/buildRandom
      hadoop_configuration
      input_path
      clusters_in_path
      (int 2)
      (new EuclideanDistanceMeasure))
    output_path
    (new EuclideanDistanceMeasure)
    (double 0.5)
    (int 10)
    (float 5.0)
    true
    false
    (double 0.0)
    false)) ;; runSequential
However, I am getting output like this:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001
java.lang.IllegalStateException: No clusters found. Check your -c path.
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
11/08/25 15:20:18 INFO mapred.JobClient: map 0% reduce 0%
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
at clojure.lang.Util.runtimeException(Util.java:153)
at clojure.lang.Compiler.eval(Compiler.java:6417)
at clojure.lang.Compiler.load(Compiler.java:6843)
at clojure.lang.Compiler.loadFile(Compiler.java:6804)
at clojure.main$load_script.invoke(main.clj:282)
at clojure.main$script_opt.invoke(main.clj:342)
at clojure.main$main.doInvoke(main.clj:426)
at clojure.lang.RestFn.invoke(RestFn.java:436)
at clojure.lang.Var.invoke(Var.java:409)
at clojure.lang.AFn.applyToHelper(AFn.java:167)
at clojure.lang.Var.applyTo(Var.java:518)
at clojure.main.main(main.java:37)
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252)
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421)
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345)
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
at clojure.lang.Compiler.eval(Compiler.java:6406)
... 10 more
When runSequential is set to true:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty!
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(FuzzyKMeansDriver.java:361)
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:343)
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
at clojure.lang.Compiler.eval(Compiler.java:6465)
at clojure.lang.Compiler.load(Compiler.java:6902)
at clojure.lang.Compiler.loadFile(Compiler.java:6863)
at clojure.main$load_script.invoke(main.clj:282)
at clojure.main$script_opt.invoke(main.clj:342)
at clojure.main$main.doInvoke(main.clj:426)
at clojure.lang.RestFn.invoke(RestFn.java:436)
at clojure.lang.Var.invoke(Var.java:409)
at clojure.lang.AFn.applyToHelper(AFn.java:167)
at clojure.lang.Var.applyTo(Var.java:518)
at clojure.main.main(main.java:37)
I have also rewritten the fkmeans script in this form:
#!./bin/clj
(ns sensei.clustering.fkmeans)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)
(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                conf)))
      driver (new FuzzyKMeansDriver)]
  (. driver setConf hadoop_configuration)
  (. driver
     run
     (into-array String ["--input" "test/sensei"
                         "--output" "test/clusters"
                         "--clusters" "test/clusters/clusters-0"
                         "--clustering"
                         "--overwrite"
                         "--emitMostLikely" "false"
                         "--numClusters" "3"
                         "--maxIter" "10"
                         "--m" "5"])))
but I still get the same error as with the first version :/
The command-line tool runs fine:
$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5
However, it does not return the points when I try clusterdump, even though the --clustering option is present in the previous command and --pointsDir is defined here:
$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt
Mahout version used: 0.6-snapshot, Clojure 1.3.0-snapshot
Please let me know if I am missing something.
My guess is that Mahout's implementation of fuzzy c-means needs initial clusters to start from, which you may not have supplied.
It also sounds a bit as if you are running on a single node? Note that on a single-node system you should avoid all the Mahout/Hadoop overhead and just use a regular clustering algorithm. Hadoop/Mahout comes at a considerable cost that only pays off when you can no longer process the data on a single system. It is not "map reduce" unless you do it on a large number of systems.
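As a rough illustration of the single-node route, here is a minimal in-memory sketch of textbook fuzzy c-means in plain Clojure. It is not Mahout's implementation; the function names are hypothetical, the initial centers must be supplied by the caller, and it runs a fixed number of iterations instead of checking a convergence delta. It also assumes every center keeps some membership weight, so it does not guard against a center that no point belongs to.

```clojure
(defn distance
  "Euclidean distance between two equal-length numeric vectors."
  [a b]
  (Math/sqrt (reduce + (map (fn [x y] (let [d (- x y)] (* d d))) a b))))

(defn memberships
  "Membership of one point in every center, for fuzziness m > 1.
  A point that coincides with one or more centers is split evenly among them."
  [point centers m]
  (let [ds    (mapv #(distance point %) centers)
        zeros (filter zero? ds)]
    (if (seq zeros)
      (let [share (/ 1.0 (count zeros))]
        (mapv #(if (zero? %) share 0.0) ds))
      (let [exponent (/ 2.0 (- m 1.0))]
        (mapv (fn [d-j]
                (/ 1.0 (reduce + (map #(Math/pow (/ d-j %) exponent) ds))))
              ds)))))

(defn update-centers
  "Recomputes each center as the mean of all points weighted by membership^m."
  [points u m]
  (vec
    (for [j (range (count (first u)))]
      (let [weights (map #(Math/pow (nth % j) m) u)
            total   (reduce + weights)]
        ;; apply mapv walks the points coordinate by coordinate.
        (apply mapv
               (fn [& column] (/ (reduce + (map * weights column)) total))
               points)))))

(defn fuzzy-c-means
  "Runs a fixed number of iterations from the given initial centers.
  Returns the final centers and the membership row of every point."
  [points initial-centers m iterations]
  (loop [centers initial-centers
         i       0]
    (let [u (mapv #(memberships % centers m) points)]
      (if (= i iterations)
        {:centers centers :memberships u}
        (recur (update-centers points u m) (inc i))))))
```

A call such as `(fuzzy-c-means [[0.0 0.0] [0.0 1.0] [10.0 10.0] [10.0 11.0]] [[0.0 0.0] [10.0 10.0]] 2.0 10)` returns a map with two centers, one near each pair of points, and one membership row per point. Because the initial centers are an explicit argument, the sketch also shows why the algorithm cannot start when that collection is empty.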