python - example - Reindexing Búsqueda elástica a través de Bulk API, escaneo y desplazamiento
python elasticsearch delete index (3)
Estoy intentando volver a indexar mi configuración de búsqueda elástica, actualmente estoy mirando la documentación de búsqueda Elastic y un ejemplo con la API de Python
Sin embargo, estoy un poco confundido sobre cómo funciona todo esto. Pude obtener la ID de espiral de la API de Python:
es = Elasticsearch("myhost")
index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")
scroll_id = response["_scroll_id"]
Ahora mi pregunta es, ¿de qué sirve esto para mí? ¿Qué me da conocer el ID de desplazamiento? La documentación dice que use la "API a granel", pero no tengo idea de cómo los factores scoll_id en esto, fue un poco confuso.
¿Alguien podría dar un breve ejemplo que muestre cómo volver a indexar desde este punto, teniendo en cuenta que tengo el scroll_id correctamente?
Hola, puedes usar scroll api para recorrer todos los documentos de la forma más eficiente. Usando scroll_id puede encontrar una sesión que está almacenada en el servidor para su solicitud de desplazamiento específica. Por lo tanto, debe proporcionar scroll_id con cada solicitud para obtener más elementos.
La API a granel es para documentos de indexación más eficientes. Cuando copia e indexa necesita ambos, pero en realidad no están relacionados.
Tengo un código de Java que podría ayudarte a obtener una mejor idea sobre cómo funciona.
public void reIndex() {
logger.info("Start creating a new index based on the old index.");
SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
.setQuery(matchAllQuery())
.setSearchType(SearchType.SCAN)
.setScroll(createScrollTimeoutValue())
.setSize(SCROLL_SIZE).execute().actionGet();
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
.setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
.setFlushInterval(createFlushIntervalTime())
.build();
while (true) {
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(createScrollTimeoutValue()).execute().actionGet();
if (searchResponse.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break; //Break condition: No hits are returned
}
for (SearchHit hit : searchResponse.getHits()) {
IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
request.source(hit.sourceRef());
bulkProcessor.add(request);
}
}
}
Para cualquiera que se encuentre con este problema, puede usar la siguiente API del cliente de Python para reindexar:
https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex
Esto le ayudaría a evitar tener que desplazarse y buscar para obtener todos los datos y usar la API masiva para colocar datos en el nuevo índice.
aquí hay un ejemplo de reindexación a otro nodo elasticsearch usando elasticsearch-py:
from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])
helpers.reindex(es_src, ''src_index_name'', ''des_index_name'', target_client=es_des)
también puede reindexar el resultado de una consulta en un índice diferente, aquí está cómo hacerlo:
from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])
body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, ''src_index_name'', ''des_index_name'', target_client=es_des, query=body)