ruby-on-rails ruby multithreading concurrency promise

ruby on rails - ¿Ocurre la concurrencia incluso cuando solo hay un subproceso en un grupo de subprocesos?



ruby-on-rails multithreading (3)

Dado que está definiendo el grupo de subprocesos fijos para tener un solo subproceso, asumo que no está logrando ningún tipo de concurrencia. Al observar su error, parece que el único subproceso disponible del grupo estuvo ocupado durante demasiado tiempo y causó la excepción de tiempo de espera de conexión.

Cuando modificó la implementación del código para que no contuviera un grupo de subprocesos, la aplicación fue explícitamente un solo subproceso sin la posibilidad de un tiempo de espera de conexión debido a la espera de subprocesos de un grupo. Intente aumentar el tamaño de su grupo de subprocesos (quizás a 3 o 5) y vea si todavía está recibiendo la misma excepción.

Estoy usando Rails 5 y Ruby 2.4. ¿Cómo puedo averiguar, o puede decir, mirando a continuación, si hay varios subprocesos ejecutándose al mismo tiempo?

pool = Concurrent::FixedThreadPool.new(1) promises = links.map do |link| Concurrent::Promise.execute(executor: pool) do result = process_link(link) if result if result.kind_of?(Array) result.each do |my_obj| my_obj.update_attributes({ :a => a }) records_processed = records_processed + my_obj.matches.count end else records_processed = records_processed + result.matches.count result.update_attributes({ :a => a }) end end end end promises.map(&:wait).map(&:value!)

Como configuré mi grupo a "1", mi suposición es que no se ejecuta nada simultáneamente, pero sigo recibiendo este error ...

Error during processing: (ActiveRecord::ConnectionTimeoutError) could not obtain a connection from the pool within 5.000 seconds (waited 5.002 seconds); all pooled connections were in use /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:202:in `block in wait_poll'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:193:in `loop'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:193:in `wait_poll'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:154:in `internal_poll'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:278:in `internal_poll'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:148:in `block in poll'' /Users/nataliab/.rvm/rubies/ruby-2.4.0/lib/ruby/2.4.0/monitor.rb:214:in `mon_synchronize'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:158:in `synchronize'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:148:in `poll'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:717:in `acquire_connection'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:490:in `checkout'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:364:in `connection'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_adapters/abstract/connection_pool.rb:883:in `retrieve_connection'' /Users/nataliab/.rvm/gems/ruby-2.4.0@global/gems/activerecord-5.0.1/lib/active_record/connection_handling.rb:128:in `retrieve_connection''

No obtengo el error anterior si modifico mi código para ejecutarlo donde estoy seguro de que no hay concurrencia ...

links.each do |link| result = process_link(link) if result if result.kind_of?(Array) result.each do |race| my_obj.update_attributes({ :a => a }) records_processed = records_processed + my_obj.matches.count end else records_processed = records_processed + result.matches.count result.update_attributes({ :a => a }) end end end

Editar: Esta es la configuración de mi base de datos para mi entorno de desarrollo. También tenga en cuenta que todo esto se está ejecutando en la consola de rieles.

development: adapter: postgresql encoding: utf8 database: sims username: postgres password: password pool: 5 timeout: 15000 host: 127.0.0.1


Su suposición de que varios subprocesos deben ejecutarse simultáneamente porque la agrupación de conexiones se está agotando no es correcta. El hecho de que una conexión todavía esté "desprotegida" en el grupo de conexiones no significa que una consulta se esté ejecutando actualmente en la conexión desprotegida en un subproceso, simplemente significa que la conexión del subproceso no se ha verificado de nuevo. El subproceso podría estar inactivo, pero aún aferrarse a una conexión desde el grupo de conexiones, siempre y cuando no haya finalizado explícitamente.

Dado que las Conexiones de ActiveRecord son de subprocesos locales, puede agotar el grupo de conexiones ejecutando consultas de ActiveRecord en varios subprocesos, como lo está haciendo en este caso. (Cada vez que se llama a Concurrent::FixedThreadPool.new(1) , se crea un nuevo subproceso.) Incluso si solo está ejecutando consultas en un solo subproceso a la vez, de forma predeterminada, la conexión se mantendrá abierta en cada subproceso hasta que se terminen.

Para evitar esto, puede verificar las conexiones manualmente después de usarlas, o asegurarse de que sus hilos terminen (se eliminen) para que sus conexiones puedan ser recuperadas por la agrupación.

  • Para verificar manualmente las conexiones, consulte la documentación de ConnectionPool para conocer sus opciones. La forma más fácil es envolver su código ActiveRecord en un bloque with_connection :

    Concurrent::Promise.execute(executor: pool) do ActiveRecord::Base.connection_pool.with_connection do # update_attributes, etc end end

  • Para asegurarse de que todos los subprocesos terminen, llame a #shutdown seguido de #wait_for_termination en el grupo de subprocesos una vez que haya terminado de usarlo:

    values = promises.map(&:value!) pool.shutdown pool.wait_for_termination


Usted asume que solo hay un hilo es incorrecto. Hay dos: el del grupo de subprocesos y el principal que generó el del grupo de subprocesos.

Podría confundirse al hacer que el subproceso principal esperara y no debería acceder a la base de datos. Eso no significa que todavía no tenga una conexión, por lo que evitará que el otro subproceso la adquiera.

Como regla general, el conjunto de conexiones de la base de datos debe configurarse al menos en el número de subprocesos generados + 1. En este caso - 2.

Código para reproducir fácilmente:

# migration class CreateFoos < ActiveRecord::Migration[5.0] def change create_table :foos do |t| t.integer :bar end end end # model class Foo < ApplicationRecord end # rake task task experiment: :environment do Foo.create pool = Concurrent::FixedThreadPool.new(1) promise = Concurrent::Promise.execute(executor: pool) do Foo.first.update_attributes!(bar: rand(-42..42)) end promise.wait.value! end

Establezca pool en 1 en su config/database.yml y ejecute la tarea. Obtendrá un error. Establézcalo en 2 - estará bien.

Puede aumentar el número de subprocesos en el grupo y agregar al menos esa cantidad de promesas para el procesamiento. Siempre fallará para el conjunto de conexiones de la base de datos = número de subprocesos en el conjunto de subprocesos y tendrá éxito si agrega uno más en config/database.yml .