usados simbolos significado reservadas palabras lenguaje comandos claves python elasticsearch elasticsearch-bulk-api

significado - simbolos del lenguaje python



Cómo usar Bulk API para almacenar las palabras clave en ES mediante el uso de Python (3)

Tengo que almacenar un mensaje en ElasticSearch para integrarlo con mi programa python. Ahora lo que intento almacenar el mensaje es:

d={"message":"this is message"} for index_nr in range(1,5): ElasticSearchAPI.addToIndex(index_nr, d) print d

Eso significa que si tengo 10 mensajes, entonces tengo que repetir mi código 10 veces. Entonces, lo que quiero hacer es intentar crear un archivo de script o un archivo por lotes. Reviso ElasticSearch Guide, BULK API es posible de usar. http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html El formato debería ser algo como a continuación:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } } { "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} } { "doc" : {"field2" : "value2"} }

lo que hice fue:

{"index":{"_index":"test1","_type":"message","_id":"1"}} {"message":"it is red"} {"index":{"_index":"test2","_type":"message","_id":"2"}} {"message":"it is green"}

También uso la herramienta Curl para almacenar el documento.

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

Ahora quiero usar mi código Python para almacenar el archivo en Elastic Search.


(los otros enfoques mencionados en este hilo usan la lista de Python para la actualización de ES, que no es una buena solución hoy en día, especialmente cuando necesita agregar millones de datos a ES)

Un mejor enfoque es utilizar generadores de pitón : procesar gigas de datos sin quedarse sin memoria o comprometer demasiado la velocidad .

A continuación se muestra un fragmento de ejemplo de un caso de uso práctico: agregar datos del archivo de registro nginx a ES para su análisis.

def decode_nginx_log(_nginx_fd): for each_line in _nginx_fd: # Filter out the below from each log line remote_addr = ... timestamp = ... ... # Index for elasticsearch. Typically timestamp. idx = ... es_fields_keys = (''remote_addr'', ''timestamp'', ''url'', ''status'') es_fields_vals = (remote_addr, timestamp, url, status) # We return a dict holding values from each line es_nginx_d = dict(zip(es_fields_keys, es_fields_vals)) # Return the row on each iteration yield idx, es_nginx_d # <- Note the usage of ''yield'' def es_add_bulk(nginx_file): # The nginx file can be gzip or just text. Open it appropriately. ... es = Elasticsearch(hosts = [{''host'': ''localhost'', ''port'': 9200}]) # NOTE the (...) round brackets. This is for a generator. k = ({ "_index": "nginx", "_type" : "logs", "_id" : idx, "_source": es_nginx_d, } for idx, es_nginx_d in decode_nginx_log(_nginx_fd)) helpers.bulk(es, k) # Now, just run it. es_add_bulk(''./nginx.1.log.gz'')

Este esqueleto demuestra el uso de generadores. Puede usar esto incluso en una máquina desnuda si es necesario. Y puede continuar ampliando esto para adaptarse a sus necesidades rápidamente.

Referencia de Python Elasticsearch here .


Aunque el código de @justinachen me ayudó a comenzar con py-elasticseearch, después de buscar en el código fuente, permítanme hacer una simple mejora:

es = Elasticsearch() j = 0 actions = [] while (j <= 10): action = { "_index": "tickets-index", "_type": "tickets", "_id": j, "_source": { "any":"data" + str(j), "timestamp": datetime.now() } } actions.append(action) j += 1 helpers.bulk(es, actions)

helpers.bulk() ya hace la segmentación por ti. Y por segmentación me refiero a los mandriles enviados cada vez al servidor. Si desea reducir la cantidad de documentos enviados, hágalo: helpers.bulk(es, actions, chunk_size=100)

Alguna información útil para comenzar:

helpers.bulk() es solo un contenedor de helpers.streaming_bulk pero el primero acepta una lista que lo hace útil.

helpers.streaming_bulk se ha basado en Elasticsearch.bulk() por lo que no tiene que preocuparse por qué elegir.

Entonces, en la mayoría de los casos, helpers.bulk() debería ser todo lo que necesita.


from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [ { "_index": "tickets-index", "_type": "tickets", "_id": j, "_source": { "any":"data" + str(j), "timestamp": datetime.now()} } for j in range(0, 10) ] helpers.bulk(es, actions)