query property ndb how google create google-app-engine mapreduce memcached google-cloud-datastore app-engine-ndb

google-app-engine - property - ndb put



Los modelos ndb no se guardan en Memcache cuando se usa MapReduce (2)

Como Slawek Rewaj ya mencionó, esto es causado por el caché en contexto. Al recuperar una entidad, NDB primero prueba la memoria caché en contexto, luego Memcache y, finalmente, recupera la entidad del almacén de datos si no se encontró ni en la memoria caché en contexto ni en Memcache. La memoria caché en contexto es solo un diccionario de Python y su duración y visibilidad están limitadas a la solicitud actual, pero MapReduce realiza múltiples llamadas a product_bulk_import_map () dentro de una única solicitud.

Puede encontrar más información sobre la memoria caché en contexto aquí: https://cloud.google.com/appengine/docs/python/ndb/cache#incontext

Creé dos tuberías de MapReduce para cargar archivos CSV para crear categorías y productos a granel. Cada producto se vincula a una categoría a través de KeyProperty. Los modelos de Categoría y Producto se basan en ndb.Model, por lo tanto, según la documentación, creo que se almacenarían en caché automáticamente en Memcache cuando se recuperaran del Datastore.

He ejecutado estas secuencias de comandos en el servidor para subir 30 categorías y, posteriormente, 3000 productos. Todos los datos aparecen en el almacén de datos como se esperaba.

Sin embargo, no parece que la carga del Producto esté usando Memcache para obtener las Categorías. Cuando reviso el visor de Memcache en el portal, dice algo parecido a que el recuento de aciertos fue de alrededor de 180 y el recuento de la desaparición alrededor de 60. Si estaba cargando 3000 productos y recuperando la categoría cada vez, ¿no debería tener alrededor de 3000? hits + pierde de buscar la categoría (es decir, Category.get_by_id (category_id))? Y es probable que falten 3000 más por intentar recuperar el producto existente antes de crear uno nuevo (el algoritmo maneja tanto la creación de la entidad como las actualizaciones).

Aquí está la función de mapeo relevante del producto, que toma una línea del archivo CSV para crear o actualizar el producto:

def product_bulk_import_map(data): """Product Bulk Import map function.""" result = {"status" : "CREATED"} product_data = data try: # parse input parameter tuple byteoffset, line_data = data # parse base product data product_data = [x for x in csv.reader([line_data])][0] (p_id, c_id, p_type, p_description) = product_data # process category category = Category.get_by_id(c_id) if category is None: raise Exception(product_import_error_messages["category"] % c_id) # store in datastore product = Product.get_by_id(p_id) if product is not None: result["status"] = "UPDATED" product.category = category.key product.product_type = p_type product.description = p_description else: product = Product( id = p_id, category = category.key, product_type = p_type, description = p_description ) product.put() result["entity"] = product.to_dict() except Exception as e: # catch any exceptions, and note failure in output result["status"] = "FAILED" result["entity"] = str(e) # return results yield (str(product_data), result)


MapReduce desactiva intencionalmente Memcache para NDB.

Consulte mapreduce / util.py ln 373, _set_ndb_cache_policy() (desde 2015-05-01):

def _set_ndb_cache_policy(): """Tell NDB to never cache anything in memcache or in-process. This ensures that entities fetched from Datastore input_readers via NDB will not bloat up the request memory size and Datastore Puts will avoid doing calls to memcache. Without this you get soft memory limit exits, which hurts overall throughput. """ ndb_ctx = ndb.get_context() ndb_ctx.set_cache_policy(lambda key: False) ndb_ctx.set_memcache_policy(lambda key: False)

Puede forzar get_by_id() y put() para usar Memcache, por ejemplo:

product = Product.get_by_id(p_id, use_memcache=True) ... product.put(use_memcache=True)

Alternativamente, puede modificar el contexto de NDB si está trabajando en lotes junto con mapreduce.operation . Sin embargo, no sé lo suficiente como para decir si esto tiene otros efectos no deseados:

ndb_ctx = ndb.get_context() ndb_ctx.set_memcache_policy(lambda key: True) ... yield operation.db.Put(product)

En cuanto a la docstring sobre "salidas de límite de memoria flexible", no entiendo por qué eso ocurriría si solo se habilitara Memcache (es decir, sin memoria caché en contexto).

En realidad, parece que quieres que Memcache esté habilitado para las puts, de lo contrario, tu aplicación termina leyendo datos obsoletos de Memcache de NDB después de que tu mapper haya modificado los datos que se encuentran debajo.