usar tutorial ejemplo codigo java multithreading aop aspectj executorservice

tutorial - text box java



¿Cómo puedo hacer que los métodos externos sean interrumpibles? (8)

Como dijo mhaller , la mejor opción es lanzar un nuevo Proceso. Dado que sus trabajos no son tan cooperativos, nunca tendrá garantías sobre la terminación del subproceso.

Una buena solución para su problema sería usar una biblioteca que admita la pausa / detención arbitraria de '' hilos ligeros'' como Akka lugar del servicio del ejecutor, aunque esto puede ser un poco exagerado.

Aunque nunca he usado Akka y no puedo confirmar que funcione como esperabas, los documentos indican que hay un método stop() para detener a los actores.

El problema

Estoy ejecutando múltiples invocaciones de algún método externo a través de un ExecutorService . Me gustaría poder interrumpir estos métodos, pero desafortunadamente no revisan el indicador de interrupción por sí mismos. ¿Hay alguna manera de forzar una excepción a partir de estos métodos?

Soy consciente de que lanzar una excepción desde una ubicación arbitraria es potencialmente peligroso ; en mi caso específico, estoy dispuesto a aprovechar esta oportunidad y estoy preparado para enfrentar las consecuencias.

Detalles

Por "método externo" me refiero a algún método (s) que provienen de una biblioteca externa, y no puedo modificar su código (así puedo hacerlo, pero eso hará que sea una pesadilla de mantenimiento cada vez que se lance una nueva versión).

Los métodos externos son computacionalmente caros, no vinculados a IO, por lo que no responden a las interrupciones regulares y no puedo forzar el cierre de un canal o un socket o algo así. Como mencioné anteriormente, tampoco revisan el indicador de interrupción.

El código es conceptualmente algo así como:

// my code public void myMethod() { Object o = externalMethod(x); } // External code public class ExternalLibrary { public Object externalMethod(Object) { innerMethod1(); innerMethod1(); innerMethod1(); } private void innerMethod1() { innerMethod2(); // computationally intensive operations } private void innerMethod2() { // computationally intensive operations } }

Lo que he intentado

Thread.stop() teóricamente hará lo que yo quiera, pero no solo está en desuso, sino que también está disponible solo para hilos reales, mientras estoy trabajando con tareas de ejecutor (que también pueden compartir hilos con tareas futuras, por ejemplo, cuando se trabaja en un grupo de subprocesos). Sin embargo, si no se encuentra una solución mejor, convertiré mi código para usar Threads antiguos y usaré este método.

Otra opción que he intentado es marcar myMethod() y métodos similares con una anotación especial "Interruptable" y luego usar AspectJ (de lo cual soy un novato) para capturar todas las invocaciones de métodos allí, algo así como:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))") public void checkInterrupt(JoinPoint thisJoinPoint) { if (Thread.interrupted()) throw new ForcefulInterruption(); }

Pero withincode no es recurrente a los métodos invocados por los métodos de coincidencia, por lo que tendría que editar esta anotación en el código externo.

Finalmente, esto es similar a una pregunta previa mía , aunque una diferencia notable es que ahora estoy tratando con una biblioteca externa.


Esta solución tampoco es fácil, pero podría funcionar: con Javassist o CGLIB, puede insertar código al comienzo de cada método interno (los que presumiblemente son invocados por el método principal run) para verificar si el hilo está activo. , o alguna otra bandera (si es alguna otra bandera, tendrás que agregarla también, junto con un método para configurarla).

Estoy proponiendo Javassist / CGLIB en lugar de extender la clase a través del código porque mencionas que es externo y no deseas cambiar el código fuente, y puede cambiar en el futuro. Por lo tanto, agregar las comprobaciones de interrupción en el tiempo de ejecución funcionará para la versión actual y también en versiones futuras, incluso si los nombres de los métodos internos cambian (o sus parámetros, valores de retorno, etc.). Solo tiene que tomar la clase y agregar comprobaciones de interrupción al comienzo de cada método que no sea el método run ().


He pirateado una fea solución a mi problema. No es bonito, pero funciona en mi caso, así que lo estoy publicando aquí en caso de que ayude a alguien más.

Lo que hice fue perfilar las partes de la biblioteca de mi aplicación, con la esperanza de poder aislar un pequeño grupo de métodos que son llamados repetidamente, por ejemplo, algunos métodos get o equals() o algo así; y luego podría insertar el siguiente segmento de código allí:

if (Thread.interrupted()) { // Not really necessary, but could help if the library does check it itself in some other place: Thread.currentThread().interrupt(); // Wrapping the checked InterruptedException because the signature doesn''t declare it: throw new RuntimeException(new InterruptedException()); }

O insertarlo manualmente editando el código de la biblioteca, o automáticamente escribiendo un aspecto apropiado. Tenga en cuenta que si la biblioteca intenta atrapar y tragar una RuntimeException , la excepción lanzada podría reemplazarse por otra cosa que la biblioteca no intenta atrapar.

Afortunadamente para mí, al usar VisualVM , pude encontrar un único método llamado un número muy elevado de veces durante el uso específico que estaba haciendo de la biblioteca. Después de agregar el segmento de código anterior, ahora responde adecuadamente a las interrupciones.

Esto, por supuesto, no se puede mantener, y nada garantiza que la biblioteca llamará a este método repetidamente en otros escenarios; pero funcionó para mí, y dado que es relativamente fácil hacer un perfil de otras aplicaciones e insertar las verificaciones allí, considero que esta es una solución genérica, aunque fea.


Las siguientes ideas extrañas vienen a mi mente:

  • Usar una biblioteca de modificación de código de bytes , como Javassist, para introducir las comprobaciones de interrupción en varios puntos dentro del bytecode. Justo al comienzo de los métodos puede no ser suficiente, ya que menciona que estos métodos externos no son recursivos, por lo que es posible que desee detenerlos por la fuerza en cualquier momento. Hacer esto en el nivel de código de bytes también lo haría muy receptivo, por ejemplo, incluso si el código externo se ejecuta dentro de un bucle o algo así, sería posible introducir las comprobaciones de interrupción. Sin embargo, esto agregará algunos gastos generales, por lo que el rendimiento general será más lento.
  • Lanzar procesos separados (por ejemplo, máquinas virtuales separadas) para el código externo. Los procesos abortivos pueden ser mucho más fáciles de codificar que las otras soluciones. La desventaja es que necesitaría algún tipo de canal de comunicación entre el código externo y su código, por ejemplo, IPC, sockets, etc. La segunda desventaja es que necesita muchos más recursos (CPU, memoria) para iniciar nuevas máquinas virtuales y puede ser entorno específico. Esto funcionaría si comienzas un par de tareas usando el código externo, pero no cientos de tareas. Además, el rendimiento sufriría, pero el cálculo en sí mismo sería tan rápido como el original. Los procesos se pueden detener con fuerza utilizando java.lang.Process.destroy ()
  • Utilizando un SecurityManager personalizado , que realiza la comprobación de interrupciones en cada uno de los métodos checkXXX. Si el código externo de alguna manera llama a métodos privilegiados, puede ser suficiente que abortes en estas ubicaciones. Un ejemplo sería java.lang.SecurityManager.checkPropertyAccess (String) si el código externo lee periódicamente una propiedad del sistema.

Que yo sepa, hay dos formas de usar el aspecto

  • AspecJ
  • Spring AOP

AspectJ es un compilador personalizado que intercepta métodos que son compilados (lo que significa que no puede obtener "métodos externos". Spring AOP (por defecto) usa proxying de clase para interceptar métodos en tiempo de ejecución (para interceptar "métodos externos"). con Spring AOP es que no puede representar por proxy las clases ya aprobadas (lo que AspectJ puede hacer porque no tiene clases proxy). Creo que AspectJ podría ayudarlo en este caso.


Si los métodos internos tienen nombres similares, entonces podría usar la definición de punto de corte en xml (spring / AspectJ) en lugar de anotaciones, por lo que no se necesitaría modificar el código de la biblioteca externa.


Tu escribiste:

Otra opción que he intentado es marcar myMethod() y métodos similares con una anotación especial "Interruptable" y luego usar AspectJ (de lo cual soy un novato) para capturar todas las invocaciones de métodos allí, algo así como:

@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))") public void checkInterrupt(JoinPoint thisJoinPoint) { if (Thread.interrupted()) throw new ForcefulInterruption(); }

Pero withincode no es recurrente a los métodos invocados por los métodos de coincidencia, por lo que tendría que editar esta anotación en el código externo.

La idea de AspectJ es buena, pero necesitas

  • use cflow() o cflowbelow() para cflowbelow() recursivamente un cierto flujo de control (por ejemplo, algo como @Before("cflow(execution(@Interruptable * *(..)))") ).
  • asegúrese de tejer también su biblioteca externa, no solo su propio código. Esto se puede hacer utilizando el tejido binario, instrumentando las clases del archivo JAR y volviéndolas a empaquetar en un nuevo archivo JAR, o aplicando LTW (tejer tiempo de carga) durante el inicio de la aplicación (es decir, durante la carga de la clase).

Es posible que ni siquiera necesite su anotación de marcador si su biblioteca externa tiene un nombre de paquete con el que puede identificar within() . AspectJ es realmente poderoso, y a menudo hay más de una forma de resolver un problema. Recomendaría usarlo porque fue hecho para esfuerzos tales como los tuyos.


Una opción es:

  1. Haga que la VM se conecte a sí misma usando JDI.
  2. Busque el hilo que está ejecutando su tarea. Esto no es trivial, pero ya que tiene acceso a todos los marcos de pila, es ciertamente factible. (Si coloca un campo de id exclusivo en los objetos de su tarea, podrá identificar el hilo que lo está ejecutando).
  3. Detener el hilo de forma asincrónica.

Aunque no creo que un hilo detenido interfiera seriamente con los ejecutores (deben ser a prueba de fallas después de todo), existe una solución alternativa que no implica detener el hilo.

Siempre que sus tareas no modifiquen nada en otras partes del sistema (lo cual es una suposición razonable, de lo contrario no estaría tratando de derribarlas), lo que puede hacer es usar JDI para hacer estallar marcos de pila no deseados y salir de la tarea normalmente.

public class StoppableTask implements Runnable { private boolean stopped; private Runnable targetTask; private volatile Thread runner; private String id; public StoppableTask(TestTask targetTask) { this.targetTask = targetTask; this.id = UUID.randomUUID().toString(); } @Override public void run() { if( !stopped ) { runner = Thread.currentThread(); targetTask.run(); } else { System.out.println( "Task "+id+" stopped."); } } public Thread getRunner() { return runner; } public String getId() { return id; } }

Este es el ejecutable que envuelve todos tus otros ejecutables. Almacena una referencia al hilo de ejecución (será importante más adelante) y una identificación para que podamos encontrarlo con una llamada de JDI.

public class Main { public static void main(String[] args) throws IOException, IllegalConnectorArgumentsException, InterruptedException, IncompatibleThreadStateException, InvalidTypeException, ClassNotLoadedException { //connect to the virtual machine VirtualMachineManager manager = Bootstrap.virtualMachineManager(); VirtualMachine vm = null; for( AttachingConnector con : manager.attachingConnectors() ) { if( con instanceof SocketAttachingConnector ) { SocketAttachingConnector smac = (SocketAttachingConnector)con; Map<String,? extends Connector.Argument> arg = smac.defaultArguments(); arg.get( "port" ).setValue( "8000"); arg.get( "hostname" ).setValue( "localhost" ); vm = smac.attach( arg ); } } //start the test task ExecutorService service = Executors.newCachedThreadPool(); StoppableTask task = new StoppableTask( new TestTask() ); service.execute( task ); Thread.sleep( 1000 ); // iterate over all the threads for( ThreadReference thread : vm.allThreads() ) { //iterate over all the objects referencing the thread //could take a long time, limiting the number of referring //objects scanned is possible though, as not many objects will //reference our runner thread for( ObjectReference ob : thread.referringObjects( 0 ) ) { //this cast is safe, as no primitive values can reference a thread ReferenceType obType = (ReferenceType)ob.type(); //if thread is referenced by a stoppable task if( obType.name().equals( StoppableTask.class.getName() ) ) { StringReference taskId = (StringReference)ob.getValue( obType.fieldByName( "id" )); if( task.getId().equals( taskId.value() ) ) { //task with matching id found System.out.println( "Task "+task.getId()+" found."); //suspend thread thread.suspend(); Iterator<StackFrame> it = thread.frames().iterator(); while( it.hasNext() ) { StackFrame frame = it.next(); //find stack frame containing StoppableTask.run() if( ob.equals( frame.thisObject() ) ) { //pop all frames up to the frame below run() thread.popFrames( it.next() ); //set stopped to true ob.setValue( obType.fieldByName( "stopped") , vm.mirrorOf( true ) ); break; } } //resume thread thread.resume(); } } } } } }

Y como referencia, la llamada a la "biblioteca" con la que lo probé:

public class TestTask implements Runnable { @Override public void run() { long l = 0; while( true ) { l++; if( l % 1000000L == 0 ) System.out.print( "."); } } }

Puede probarlo iniciando la clase Main con la opción de línea de comando -agentlib:jdwp=transport=dt_socket,server=y,address=localhost:8000,timeout=5000,suspend=n . Funciona con dos advertencias. En primer lugar, si se está ejecutando un código nativo (este thisObject de un marco es nulo), tendrá que esperar hasta que haya terminado. En segundo lugar, finally no se invocan los bloques, por lo que varios recursos pueden estar filtrando.