java multithreading threadpool resteasy thread-local

java - Propagar ThreadLocal a un nuevo Thread obtenido de un ExecutorService



multithreading threadpool (6)

Estoy ejecutando un proceso en un subproceso separado con un tiempo de espera, utilizando un ExecutorService y un Futuro (código de ejemplo here ) (el subproceso "engendrar" se lleva a cabo en un aspecto AOP).

Ahora, el hilo principal es una solicitud de Resteasy . Resteasy usa una o más variables ThreadLocal para almacenar cierta información de contexto que necesito recuperar en algún momento de mi llamada al método Rest. El problema es que, como el subproceso Resteasy se está ejecutando en un nuevo subproceso, las variables ThreadLocal se pierden.

¿Cuál sería la mejor manera de "propagar" cualquier variable ThreadLocal utilizada por Resteasy al nuevo hilo? Parece que Resteasy utiliza más de una variable ThreadLocal para realizar un seguimiento de la información de contexto y me gustaría transferir "a ciegas" toda la información al nuevo hilo.

He observado las subclases de ThreadPoolExecutor y ThreadPoolExecutor utilizado el método beforeExecute para pasar el subproceso actual a la agrupación, pero no pude encontrar una manera de pasar las variables de ThreadLocal a la agrupación.

¿Cualquier sugerencia?

Gracias


Aquí hay un ejemplo para pasar el LocaleContext actual en el subproceso primario al subproceso secundario abarcado por CompletableFuture [De forma predeterminada, usó ForkJoinPool].

Simplemente define todas las cosas que querías hacer en un subproceso secundario dentro de un bloque Ejecutable. Entonces, cuando CompletableFuture ejecuta el bloque Runnable, es el subproceso secundario el que tiene el control y listo, tiene el conjunto ThreadLocal de los padres establecido en ThreadLocal de Child.

El problema aquí no es que se haya copiado todo el ThreadLocal. Sólo se copia el LocaleContext. Dado que ThreadLocal es de acceso privado solo para el Thread al que pertenece, usar Reflection y tratar de obtener y establecer en Child es demasiado extravagante, lo que puede provocar fugas de memoria o un impacto en el rendimiento.

Entonces, si conoce los parámetros que le interesan de ThreadLocal, esta solución funciona de manera más limpia.

public void parentClassMethod(Request request) { LocaleContext currentLocale = LocaleContextHolder.getLocaleContext(); executeInChildThread(() -> { LocaleContextHolder.setLocaleContext(currentLocale); //Do whatever else you wanna do })); //Continue stuff you want to do with parent thread } private void executeInChildThread(Runnable runnable) { try { CompletableFuture.runAsync(runnable) .get(); } catch (Exception e) { LOGGER.error("something is wrong"); } }


Basado en la respuesta de @erickson escribí este código. Está trabajando para inheritableThreadLocals. Crea una lista de objetos de ruta heredables utilizando el mismo método que se usa en el contructor de subprocesos. Por supuesto que uso la reflexión para hacer esto. También anulo la clase ejecutora.

public class MyThreadPoolExecutor extends ThreadPoolExecutor { @Override public void execute(Runnable command) { super.execute(new Wrapped(command, Thread.currentThread())); } }

Envoltura:

private class Wrapped implements Runnable { private final Runnable task; private final Thread caller; public Wrapped(Runnable task, Thread caller) { this.task = task; this.caller = caller; } public void run() { Iterable<ThreadLocal<?>> vars = null; try { vars = copy(caller); } catch (Exception e) { throw new RuntimeException("error when coping Threads", e); } try { task.run(); } finally { for (ThreadLocal<?> var : vars) var.remove(); } } }

método de copia:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception { List<ThreadLocal<?>> threadLocals = new ArrayList<>(); Field field = Thread.class.getDeclaredField("inheritableThreadLocals"); field.setAccessible(true); Object map = field.get(caller); Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table"); table.setAccessible(true); Method method = ThreadLocal.class .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap")); method.setAccessible(true); Object o = method.invoke(null, map); Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals"); field2.setAccessible(true); field2.set(Thread.currentThread(), o); Object tbl = table.get(o); int length = Array.getLength(tbl); for (int i = 0; i < length; i++) { Object entry = Array.get(tbl, i); Object value = null; if (entry != null) { Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod( "get"); referentField.setAccessible(true); value = referentField.invoke(entry); threadLocals.add((ThreadLocal<?>) value); } } return threadLocals; }


El conjunto de instancias de ThreadLocal asociadas con un hilo se mantienen en miembros privados de cada Thread . Su única oportunidad de enumerarlos es hacer una reflexión sobre el tema; De esta manera, puede anular las restricciones de acceso en los campos del hilo.

Una vez que pueda obtener el conjunto de ThreadLocal , puede copiar en los subprocesos en segundo plano utilizando los beforeExecute() y afterExecute() de ThreadPoolExecutor , o creando un contenedor Runnable para sus tareas que intercepta la llamada run() para establecer un desarmado Instancias ThreadLocal necesarias. En realidad, la última técnica podría funcionar mejor, ya que le brindaría un lugar conveniente para almacenar los valores de ThreadLocal en el momento en que la tarea se pone en cola.

Actualización: Aquí hay una ilustración más concreta del segundo enfoque. Contrariamente a mi descripción original, todo lo que está almacenado en el contenedor es el subproceso que llama, que se consulta cuando se ejecuta la tarea.

static Runnable wrap(Runnable task) { Thread caller = Thread.currentThread(); return () -> { Iterable<ThreadLocal<?>> vars = copy(caller); try { task.run(); } finally { for (ThreadLocal<?> var : vars) var.remove(); } }; } /** * For each {@code ThreadLocal} in the specified thread, copy the thread''s * value to the current thread. * * @param caller the calling thread * @return all of the {@code ThreadLocal} instances that are set on current thread */ private static Collection<ThreadLocal<?>> copy(Thread caller) { /* Use a nasty bunch of reflection to do this. */ throw new UnsupportedOperationException(); }


No me gusta el enfoque de reflexión. Una solución alternativa sería implementar el envoltorio del ejecutor y pasar el objeto directamente como un contexto ThreadLocal a todos los subprocesos secundarios que propagan un contexto principal.

public class PropagatedObject { private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>(); //put, set, merge methods, etc }

==>

public class ObjectAwareExecutor extends AbstractExecutorService { private final ExecutorService delegate; private final PropagatedObject objectAbsorber; public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){ this.delegate = delegate; this.objectAbsorber = objectAbsorber; } @Override public void execute(final Runnable command) { final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get(); delegate.execute(() -> { try{ objectAbsorber.set(parentContext); command.run(); }finally { parentContext.putAll(objectAbsorber.get()); objectAbsorber.clean(); } }); objectAbsorber.merge(parentContext); }


Según entiendo su problema, puede echarle un vistazo a InheritableThreadLocal que está destinado a pasar ThreadLocal variables ThreadLocal del contexto del subproceso principal al contexto del subproceso ThreadLocal


Si miras el código ThreadLocal puedes ver:

public T get() { Thread t = Thread.currentThread(); ... }

el hilo actual no se puede sobrescribir.

Soluciones posibles:

  1. Mira el mecanismo de fork / join de java 7 (pero creo que es una mala manera)

  2. Mire el mecanismo endorsed para sobrescribir la clase ThreadLocal en su JVM.

  3. Intente volver a escribir RESTEasy (puede usar las herramientas Refactor en su IDE para reemplazar todo el uso de ThreadLocal, parece fácil)