clojure cluster-analysis mahout

Clustering (fkmeans) with Mahout using Clojure




I'm trying to write a short script to cluster my data via Clojure (though it calls Mahout classes). I have my input data in this format (it's the output of a PHP script):

format: (tag) (image) (frequency)

tag_sit image_a 0
tag_sit image_b 1
tag_lorem image_a 1
tag_lorem image_b 0
tag_dolor image_a 0
tag_dolor image_b 1
tag_ipsum image_a 1
tag_ipsum image_b 1
tag_amit image_a 1
tag_amit image_b 0
... (more)

Then I write them into a SequenceFile using this (Clojure) script:

#!./bin/clj

(ns sensei.sequence.core)

(require 'clojure.string)
(require 'clojure.java.io)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.hadoop.io.SequenceFile)
(import org.apache.hadoop.io.Text)
(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)

(with-open [reader (clojure.java.io/reader *in*)]
  (let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                  conf)))
        hadoop_fs (FileSystem/get hadoop_configuration)]
    (reduce
      (fn [writer [index value]]
        (. writer append index value)
        writer)
      (SequenceFile/createWriter hadoop_fs hadoop_configuration
                                 (new Path "test/sensei") Text VectorWritable)
      (map
        (fn [[tag row_vector]]
          (let [input_index (new Text tag)
                input_vector (new VectorWritable)]
            (. input_vector set row_vector)
            [input_index input_vector]))
        (map
          (fn [[tag photo_list]]
            (let [photo_map (apply hash-map photo_list)
                  input_vector (new SequentialAccessSparseVector (count (vals photo_map)))]
              (loop [frequency_list (vals photo_map)]
                (if (zero? (count frequency_list))
                  [tag input_vector]
                  (when-not (zero? (count frequency_list))
                    (. input_vector set
                       (mod (count frequency_list) (count (vals photo_map)))
                       (Integer/parseInt (first frequency_list)))
                    (recur (rest frequency_list)))))))
          (reduce
            (fn [result next_line]
              (let [[tag photo frequency] (clojure.string/split next_line #" ")]
                (update-in result [tag]
                           #(if (nil? %)
                              [photo frequency]
                              (conj % photo frequency)))))
            {}
            (line-seq reader)))))))

It basically converts the input into a sequence file, in this format:

Key (Text): the $tag_uri value
Value (VectorWritable): a vector (cardinality = number of documents) with numeric indices and the respective frequencies <0:1 1:0 2:0 3:1 4:0 ...>
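To make the record layout concrete, here is a minimal sketch of what building one such key/value pair looks like with the Mahout API (assuming the Mahout 0.6 jars are on the classpath; the tag name and frequencies are illustrative):

```clojure
(import org.apache.hadoop.io.Text)
(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)

;; one row for tag_sit over 5 documents: docs 0 and 3 have frequency 1
(let [row (new SequentialAccessSparseVector 5)]  ; cardinality = number of documents
  (. row set 0 1.0)
  (. row set 3 1.0)
  [(new Text "tag_sit")                          ; key: the tag
   (doto (new VectorWritable) (. set row))])     ; value: the wrapped sparse vector
```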

Then I proceed to do the actual clustering with this script (referring to this blog post):

#!./bin/clj

(ns sensei.clustering.fkmeans)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)

(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://127.0.0.1:9000/")
                                conf)))
      input_path (new Path "test/sensei")
      output_path (new Path "test/clusters")
      clusters_in_path (new Path "test/clusters/cluster-0")]
  (FuzzyKMeansDriver/run
    hadoop_configuration
    input_path
    (RandomSeedGenerator/buildRandom hadoop_configuration input_path
                                     clusters_in_path (int 2)
                                     (new EuclideanDistanceMeasure))
    output_path
    (new EuclideanDistanceMeasure)
    (double 0.5)
    (int 10)
    (float 5.0)
    true
    false
    (double 0.0)
    false))  ; runSequential

However, I'm getting output like this:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001
java.lang.IllegalStateException: No clusters found. Check your -c path.
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
11/08/25 15:20:18 INFO mapred.JobClient: map 0% reduce 0%
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
    at clojure.lang.Util.runtimeException(Util.java:153)
    at clojure.lang.Compiler.eval(Compiler.java:6417)
    at clojure.lang.Compiler.load(Compiler.java:6843)
    at clojure.lang.Compiler.loadFile(Compiler.java:6804)
    at clojure.main$load_script.invoke(main.clj:282)
    at clojure.main$script_opt.invoke(main.clj:342)
    at clojure.main$main.doInvoke(main.clj:426)
    at clojure.lang.RestFn.invoke(RestFn.java:436)
    at clojure.lang.Var.invoke(Var.java:409)
    at clojure.lang.AFn.applyToHelper(AFn.java:167)
    at clojure.lang.Var.applyTo(Var.java:518)
    at clojure.main.main(main.java:37)
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
    at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
    at clojure.lang.Compiler.eval(Compiler.java:6406)
    ... 10 more

When runSequential is set to true:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty!
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(FuzzyKMeansDriver.java:361)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:343)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
    at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
    at clojure.lang.Compiler.eval(Compiler.java:6465)
    at clojure.lang.Compiler.load(Compiler.java:6902)
    at clojure.lang.Compiler.loadFile(Compiler.java:6863)
    at clojure.main$load_script.invoke(main.clj:282)
    at clojure.main$script_opt.invoke(main.clj:342)
    at clojure.main$main.doInvoke(main.clj:426)
    at clojure.lang.RestFn.invoke(RestFn.java:436)
    at clojure.lang.Var.invoke(Var.java:409)
    at clojure.lang.AFn.applyToHelper(AFn.java:167)
    at clojure.lang.Var.applyTo(Var.java:518)
    at clojure.main.main(main.java:37)

I've also rewritten the fkmeans script in this form:

#!./bin/clj

(ns sensei.clustering.fkmeans)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)

(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                conf)))
      driver (new FuzzyKMeansDriver)]
  (. driver setConf hadoop_configuration)
  (. driver run
     (into-array String ["--input" "test/sensei"
                         "--output" "test/clusters"
                         "--clusters" "test/clusters/clusters-0"
                         "--clustering"
                         "--overwrite"
                         "--emitMostLikely" "false"
                         "--numClusters" "3"
                         "--maxIter" "10"
                         "--m" "5"])))

but it still produces the same error as the first version :/

The command-line tool works fine:

$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5

However, it does not return the points when I try clusterdump, even though the --clustering option is present in the command above and --pointsDir is given here:

$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt

Mahout version used: 0.6-snapshot, Clojure 1.3.0-snapshot

Please let me know if I'm missing something.


My guess is that Mahout's fuzzy-c-means implementation needs initial clusters to start from, which you may not have supplied.
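One way to rule this out is to run the seed generation as its own step and verify the seeds actually exist on the filesystem the job reads from, before handing the path to the driver. This is only a sketch, reusing the argument order from the question's own FuzzyKMeansDriver/run call (Mahout 0.6; paths are illustrative):

```clojure
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)

(let [conf (doto (new Configuration)
             (. set "fs.default.name" "hdfs://localhost:9000/"))
      fs (FileSystem/get conf)
      input (new Path "test/sensei")
      ;; step 1: write k random seed clusters and keep the returned path
      clusters-in (RandomSeedGenerator/buildRandom conf input
                    (new Path "test/clusters/clusters-0") (int 2)
                    (new EuclideanDistanceMeasure))]
  ;; sanity check: the seeds must be visible on the SAME filesystem the job uses
  (assert (. fs exists clusters-in) "no seed clusters were written")
  ;; step 2: only then run the driver against the verified seed path
  (FuzzyKMeansDriver/run conf input clusters-in (new Path "test/clusters")
    (new EuclideanDistanceMeasure)
    (double 0.5)
    (int 10)
    (float 5.0)
    true
    false
    (double 0.0)
    false))  ; runSequential
```

If the assert fails, the problem is in seed generation (or an HDFS-vs-local-filesystem mismatch) rather than in the clustering step itself.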

It also sounds a bit like you are running on a single node? Note that for single-node systems you should avoid all the Mahout/Hadoop overhead and just use a regular clustering algorithm. Hadoop/Mahout comes at a considerable cost that only pays off once you can no longer process the data on a single system. It isn't "map reduce" unless you do it on a large number of systems.
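For data this small, a plain Lloyd-style k-means in Clojure alone would do, with no Hadoop or Mahout at all. A minimal sketch (points and centroids as plain vectors; the example data is illustrative):

```clojure
(defn dist2
  "Squared Euclidean distance between two points."
  [a b]
  (reduce + (map (fn [x y] (let [d (- x y)] (* d d))) a b)))

(defn nearest
  "The centroid closest to point p."
  [centroids p]
  (apply min-key #(dist2 % p) centroids))

(defn centroid
  "Component-wise mean of a non-empty collection of points."
  [points]
  (let [n (count points)]
    (map #(/ % n) (apply map + points))))

(defn k-means
  "Iterate: assign points to nearest centroid, recompute centroids,
   stop when the centroids no longer change. Returns the clusters."
  [points centroids]
  (let [clusters (vals (group-by #(nearest centroids %) points))
        new-centroids (map centroid clusters)]
    (if (= (set new-centroids) (set centroids))
      clusters
      (recur points new-centroids))))

;; usage: two obvious groups in 2D (cluster order may vary)
(k-means [[0 0] [0 1] [10 10] [10 11]] [[0 0] [10 10]])
```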