java interrupt java-9 filechannel

java - ¿Hay una manera de prevenir ClosedByInterruptException?



java-9 filechannel (3)

Al utilizar un canal de archivos asíncrono, la excepción ClosedByInterruptException nunca se produce. Simplemente no parece preocuparse por la interrupción.

Prueba realizada usando jdk 1.8.0_72

import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.atomic.AtomicLong; public class A { static volatile boolean running = true; public static void main(String[] args) throws IOException, InterruptedException { String name = "delete.me"; Path path = new File(name).toPath(); AtomicLong position = new AtomicLong(0); AsynchronousFileChannel fc = AsynchronousFileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.DELETE_ON_CLOSE , StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.WRITE, StandardOpenOption.SYNC); CompletionHandler<Integer, Object> handler = new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { //System.out.println(attachment + " completed with " + result + " bytes written"); position.getAndAdd(result); } @Override public void failed(Throwable e, Object attachment) { System.err.println(attachment + " failed with:"); e.printStackTrace(); } }; Runnable monitorRun = () -> { try { while (running) { System.out.println(name + " is " + (fc.size() >> 10) + " KB"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Interrupted"); Thread.currentThread().interrupt(); System.out.println("Interrupt call failed so return"); return; } } } catch (IOException e) { System.err.println("Monitor thread died"); e.printStackTrace(); } }; Thread monitor = new Thread(monitorRun); monitor.setDaemon(true); monitor.start(); Thread writer = new Thread(() -> { ByteBuffer bb = ByteBuffer.allocateDirect(32); try { while (running) { bb.position(0).limit(32); fc.write(bb,position.get(),null,handler); try { Thread.sleep(10); } catch (InterruptedException e) { System.out.println("Interrupted"); Thread.currentThread().interrupt(); } } } catch (Exception e) { System.err.println("Writer thread died"); e.printStackTrace(); } }); writer.setDaemon(true); writer.start(); Thread.sleep(5000); monitor.interrupt(); Thread.sleep(2000); monitor = new Thread(monitorRun); monitor.start(); Thread.sleep(5000); running = false; fc.close(); } }

Genera la siguiente salida:

delete.me is 0 KB delete.me is 3 KB delete.me is 6 KB delete.me is 9 KB delete.me is 12 KB Interrupted Interrupt call failed so return delete.me is 21 KB delete.me is 24 KB delete.me is 27 KB delete.me is 30 KB delete.me is 33 KB

En el siguiente ejemplo, tengo un archivo utilizado por dos subprocesos (en el ejemplo real podría tener cualquier número de subprocesos)

import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class A { static volatile boolean running = true; public static void main(String[] args) throws IOException, InterruptedException { String name = "delete.me"; new File(name).deleteOnExit(); RandomAccessFile raf = new RandomAccessFile(name, "rw"); FileChannel fc = raf.getChannel(); Thread monitor = new Thread(() -> { try { while (running) { System.out.println(name + " is " + (fc.size() >> 10) + " KB"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Interrupted"); Thread.currentThread().interrupt(); } } } catch (IOException e) { System.err.println("Monitor thread died"); e.printStackTrace(); } }); monitor.setDaemon(true); monitor.start(); Thread writer = new Thread(() -> { ByteBuffer bb = ByteBuffer.allocateDirect(32); try { while (running) { bb.position(0).limit(32); fc.write(bb); try { Thread.sleep(10); } catch (InterruptedException e) { System.out.println("Interrupted"); Thread.currentThread().interrupt(); } } } catch (IOException e) { System.err.println("Writer thread died"); e.printStackTrace(); } }); writer.setDaemon(true); writer.start(); Thread.sleep(5000); monitor.interrupt(); Thread.sleep(2000); running = false; raf.close(); } }

En lugar de crear un RandomAccessFile y una asignación de memoria para cada subproceso, tengo un archivo y una asignación de memoria compartida entre subprocesos, pero hay un problema: si se interrumpe algún subproceso, se cierra el recurso.

delete.me is 0 KB delete.me is 2 KB delete.me is 4 KB delete.me is 6 KB delete.me is 8 KB Interrupted Monitor thread died java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315) at A.lambda$main$0(A.java:19) at java.lang.Thread.run(Thread.java:748) Writer thread died java.nio.channels.ClosedChannelException at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199) at A.lambda$main$1(A.java:41) at java.lang.Thread.run(Thread.java:748)

¿Hay alguna manera de evitar que FileChannel se cierre solo porque se interrumpió un hilo que lo usaba?

EDITAR Lo que quiero evitar es porque sospecho que no funcionará para Java 9+

private void doNotCloseOnInterrupt(FileChannel fc) { try { Field field = AbstractInterruptibleChannel.class .getDeclaredField("interruptor"); field.setAccessible(true); field.set(fc, (Interruptible) thread -> Jvm.warn().on(getClass(), fc + " not closed on interrupt")); } catch (Exception e) { Jvm.warn().on(getClass(), "Couldn''t disable close on interrupt", e); } }

Por cierto, la llamada a fc.size() devuelve el tamaño como se esperaba con el truco anterior.


Como dijo que desea "una asignación de memoria compartida entre subprocesos", no existe ningún problema de este tipo, ya que la asignación de memoria no se ve afectada por el cierre de un FileChannel . De hecho, es una buena estrategia cerrar el canal lo antes posible, para reducir los recursos que tiene la aplicación.

P.ej

static volatile boolean running = true; public static void main(String[] args) throws IOException { Path name = Paths.get("delete.me"); MappedByteBuffer mapped; try(FileChannel fc1 = FileChannel.open(name, READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE)) { mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096); } Thread thread1 = new Thread(() -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); while(running && !Thread.interrupted()) { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); byte[] b = new byte[5]; mapped.position(4000); mapped.get(b); System.out.println("read "+new String(b, StandardCharsets.US_ASCII)); } }); thread1.setDaemon(true); thread1.start(); Thread thread2 = new Thread(() -> { byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII); while(running && !Thread.interrupted()) { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); mapped.position(4000); mapped.put(b); System.out.println("wrote "+new String(b, StandardCharsets.US_ASCII)); byte b1 = b[0]; System.arraycopy(b, 1, b, 0, b.length-1); b[b.length-1] = b1; } mapped.force(); }); thread2.setDaemon(true); thread2.start(); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); thread2.interrupt(); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)); running = false;

Esto demuestra cómo los hilos pueden leer y escribir sus datos después de que el canal se haya cerrado e interrumpir el hilo de escritura no detiene el hilo de lectura.

Si necesita realizar operaciones de FileChannel además de la E / S asignada en memoria, no hay ningún problema en el uso de varias instancias de FileChannel , por lo que cerrar un canal no afecta al otro. P.ej

static volatile boolean running = true; public static void main(String[] args) throws IOException { Path name = Paths.get("delete.me"); try(FileChannel fc1 = FileChannel.open(name,READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE); FileChannel fc2 = FileChannel.open(name,READ,WRITE)) { Thread thread1 = new Thread(() -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); try { MappedByteBuffer mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096); while(running && !Thread.interrupted()) { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); byte[] b = new byte[5]; mapped.position(4000); mapped.get(b); System.out.println("read from map " +new String(b, StandardCharsets.US_ASCII) +", file size "+fc1.size()); } }catch(IOException ex) { ex.printStackTrace(); } }); thread1.setDaemon(true); thread1.start(); Thread thread2 = new Thread(() -> { byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII); try { MappedByteBuffer mapped = fc2.map(FileChannel.MapMode.READ_WRITE, 0, 4096); fc2.position(4096); try { while(running && !Thread.interrupted()) { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); mapped.position(4000); mapped.put(b); System.out.println("wrote to mapped " +new String(b, StandardCharsets.US_ASCII)); byte b1 = b[0]; System.arraycopy(b, 1, b, 0, b.length-1); b[b.length-1] = b1; fc2.write(ByteBuffer.wrap(b)); } } finally { mapped.force(); } }catch(IOException ex) { ex.printStackTrace(); } }); thread2.setDaemon(true); thread2.start(); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); thread2.interrupt(); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)); running = false; } }

Aquí, la interrupción de un hilo cierra su canal, pero no afecta al otro. Además, incluso cuando cada hilo adquiere su propio MappedByteBuffer de su propio canal, los cambios se muestran a través del otro, incluso sin el uso de force() . Por supuesto, este último se define como un comportamiento dependiente del sistema, no se garantiza que funcione en todos los sistemas.

Pero como se muestra con el primer ejemplo, aún puede crear búferes compartidos desde solo uno de los canales al comienzo, mientras realiza las operaciones de E / S en un canal diferente, uno por hilo, y no importa si y qué canales se cierran, los buffers asignados no se ven afectados por ello.


Puede utilizar la reflexión para acceder ilegalmente al campo interruptor y obtener el tipo de clase sun.nio.ch.Interruptible desde allí para crear una instancia de proxy:

private void doNotCloseOnInterrupt(FileChannel fc) { try { Field field = AbstractInterruptibleChannel.class.getDeclaredField("interruptor"); Class<?> interruptibleClass = field.getType(); field.setAccessible(true); field.set(fc, Proxy.newProxyInstance( interruptibleClass.getClassLoader(), new Class[] { interruptibleClass }, new InterruptibleInvocationHandler())); } catch (final Exception e) { Jvm.warn().on(getClass(), "Couldn''t disable close on interrupt", e); } } public class InterruptibleInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO: Check method and handle accordingly return null; } }

En Java9, esto funciona con una sola advertencia, ya que se ejecuta por defecto con --illegal-access=permit .

Sin embargo, este indicador podría eliminarse en futuras versiones y la mejor manera de garantizar que funcione a largo plazo es usar el indicador --add-opens :

--add-opens java.base/sun.nio.ch=your-module --add-opens java.base/java.nio.channels.spi=your-module

O, si no está trabajando con módulos (no recomendado):

--add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens java.base/java.nio.channels.spi=ALL-UNNAMED

Esto funciona con Java 9, Java 10 y la compilación actual de acceso anticipado JDK 11 (28 (2018/8/23)).