java multithreading thread-safety executorservice callable

java - Ejecutar un método en paralelo desde un método de llamada.



multithreading thread-safety (2)

Como sé, RestTemplate está bloqueando, se dice en ForkJoinPool JavaDoc en ForkJoinTask :

Los cálculos deben evitar los métodos o bloques sincronizados, y deben minimizar otros bloqueos de sincronización, además de unirse a otras tareas o usar sincronizadores como los Phasers que se anuncian para cooperar con la programación de fork / join. ...
Las tareas tampoco deben realizar bloqueo IO, ...

Llamada en llamada es redundante.
Y no necesitas dos ejecutores. También puede devolver un resultado parcial en getSyncData(DataRequest key) . Esto se puede hacer así.

DataClient.java

public class DataClient implements Client { private RestTemplate restTemplate = new RestTemplate(); // first executor private ExecutorService service = Executors.newFixedThreadPool(15); @Override public List<DataResponse> getSyncData(DataRequest key) { List<DataResponse> responseList = null; DataFetcherResult response = null; try { response = getAsyncData(key); responseList = response.get(key.getTimeout(), key.getTimeoutUnit()); } catch (TimeoutException ex) { response.cancel(true); responseList = response.getPartialResult(); } return responseList; } @Override public DataFetcherResult getAsyncData(DataRequest key) { List<DataRequest> keys = generateKeys(key); final List<Future<DataResponse>> responseList = new ArrayList<>(); final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null for (final DataRequest _key : keys) { responseList.add(service.submit(new Callable<DataResponse>() { @Override public DataResponse call() throws Exception { DataResponse response = null; try { response = performDataRequest(_key); } finally { latch.countDown(); return response; } } })); } return new DataFetcherResult(responseList, latch); } // In this method I am making a HTTP call to another service // and then I will make List<DataRequest> accordingly. private List<DataRequest> generateKeys(DataRequest key) { List<DataRequest> keys = new ArrayList<>(); // use key object which is passed in contructor to make HTTP call to another service // and then make List of DataRequest object and return keys. return keys; } private DataResponse performDataRequest(DataRequest key) { // This will have all LogicA code here which is shown in my original design. // everything as it is same.. return null; } }

DataFetcherResult.java

public class DataFetcherResult implements Future<List<DataResponse>> { final List<Future<DataResponse>> futures; final CountDownLatch latch; public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) { this.futures = futures; this.latch = latch; } //non-blocking public List<DataResponse> getPartialResult() { List<DataResponse> result = new ArrayList<>(futures.size()); for (Future<DataResponse> future : futures) { try { result.add(future.isDone() ? future.get() : null); //instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); //ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled //you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum } } return result; } @Override public List<DataResponse> get() throws ExecutionException, InterruptedException { List<DataResponse> result = new ArrayList<>(futures.size()); for (Future<DataResponse> future : futures) { result.add(future.get()); } return result; } @Override public List<DataResponse> get(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { if (latch.await(timeout, timeUnit)) { return get(); } throw new TimeoutException();//or getPartialResult() } @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = true; for (Future<DataResponse> future : futures) { cancelled &= future.cancel(mayInterruptIfRunning); } return cancelled; } @Override public boolean isCancelled() { boolean cancelled = true; for (Future<DataResponse> future : futures) { cancelled &= future.isCancelled(); } return cancelled; } @Override public boolean isDone() { boolean done = true; for (Future<DataResponse> future : futures) { done &= future.isDone(); } return done; } //and etc. }

Lo escribí con CountDownLatch y se ve genial, pero note que hay un matiz. Puede quedarse atascado por un momento en DataFetcherResult.get(long timeout, TimeUnit timeUnit) porque DataFetcherResult.get(long timeout, TimeUnit timeUnit) no está sincronizado con el estado del futuro. Y podría suceder que latch.getCount() == 0 pero no todos los futuros devolverían future.isDone() == true al mismo tiempo. Porque ya han pasado latch.countDown(); inside por finally {} El bloque de Callable pero no cambió el state interno state que todavía es igual a NEW .
Y así, las llamadas a get() interior de get(long timeout, TimeUnit timeUnit) pueden causar un pequeño retraso.
Caso similar fue descrito aquí .

Obtener con timeout DataFetcherResult.get(...) puede reescribirse usando futures future.get(long timeout, TimeUnit timeUnit) y puede eliminar CountDownLatch de una clase.

public List<DataResponse> get(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException{ List<DataResponse> result = new ArrayList<>(futures.size()); long timeoutMs = timeUnit.toMillis(timeout); boolean timeout = false; for (Future<DataResponse> future : futures) { long beforeGet = System.currentTimeMillis(); try { if (!timeout && timeoutMs > 0) { result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS)); timeoutMs -= System.currentTimeMillis() - beforeGet; } else { if (future.isDone()) { result.add(future.get()); } else { //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ? } } } catch (TimeoutException e) { result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR)); timeout = true; } //you can also handle ExecutionException or CancellationException here } return result; }

Este código se dio como ejemplo y debe probarse antes de usarlo en producción, pero parece legítimo :)

Tengo una biblioteca que está siendo utilizada por el cliente y están pasando el objeto DataRequest que tiene userid , timeout y algunos otros campos. Ahora uso este objeto DataRequest para crear una URL y luego hago una llamada HTTP utilizando RestTemplate y mi servicio devuelve una respuesta JSON, la uso para hacer un objeto DataResponse y les devuelvo este objeto DataResponse .

A continuación se encuentra mi clase DataClient utilizada por el cliente al DataRequest objeto DataRequest . Estoy utilizando el valor de tiempo de espera pasado por el cliente en DataRequest para DataRequest tiempo de la solicitud si está tomando demasiado tiempo en el método getSyncData .

public class DataClient implements Client { private RestTemplate restTemplate = new RestTemplate(); // first executor private ExecutorService service = Executors.newFixedThreadPool(15); @Override public DataResponse getSyncData(DataRequest key) { DataResponse response = null; Future<DataResponse> responseFuture = null; try { responseFuture = getAsyncData(key); response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit()); } catch (TimeoutException ex) { response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR); responseFuture.cancel(true); // logging exception here } return response; } @Override public Future<DataResponse> getAsyncData(DataRequest key) { DataFetcherTask task = new DataFetcherTask(key, restTemplate); Future<DataResponse> future = service.submit(task); return future; } }

Clase DataFetcherTask :

public class DataFetcherTask implements Callable<DataResponse> { private DataRequest key; private RestTemplate restTemplate; public DataFetcherTask(DataRequest key, RestTemplate restTemplate) { this.key = key; this.restTemplate = restTemplate; } @Override public DataResponse call() throws Exception { // In a nutshell below is what I am doing here. // 1. Make an url using DataRequest key. // 2. And then execute the url RestTemplate. // 3. Make a DataResponse object and return it. // I am calling this whole logic in call method as LogicA } }

A partir de ahora, mi clase DataFetcherTask es responsable de una clave DataRequest como se muestra arriba.

Planteamiento del problema:-

Ahora tengo un pequeño cambio de diseño. El cliente pasará el objeto DataRequest (por ejemplo, keyA) a mi biblioteca y luego haré una nueva llamada http a otro servicio (que no estoy haciendo en mi diseño actual) usando el ID de usuario presente en el objeto DataRequest (keyA) que dará Vuelvo a la lista de ID de usuario, así que DataRequest esos ID de usuario y haré algunos otros DataRequest (keyB, keyC, keyD) uno por cada ID de usuario devuelto en la respuesta. Y luego List<DataRequest> objeto List<DataRequest> que tendrá los objetos keyB, keyC y keyD DataRequest . El elemento máximo en la List<DataRequest> será tres, eso es todo.

Ahora, para cada uno de esos objetos DataRequest en la List<DataRequest> , quiero ejecutar el método DataFetcherTask.call en paralelo y luego hacer la List<DataResponse> agregando cada DataResponse para cada clave. Así que tendré tres llamadas paralelas a DataFetcherTask.call . La idea detrás de esta llamada paralela es obtener los datos de todas esas tres claves como máximo en el mismo valor de tiempo de espera global.

Por lo tanto, mi propuesta es: la clase DataFetcherTask devolverá el objeto List<DataResponse> lugar de DataResponse y luego la firma de getSyncData y el método getAsyncData también cambiará. Así que aquí está el algoritmo:

  • Use el objeto DataRequest pasado por el cliente para hacer la List<DataRequest> llamando a otro servicio HTTP.
  • Realice una llamada paralela para cada DataRequest en el DataRequest List<DataRequest> a DataFetcherTask.call y devuelva el objeto List<DataResponse> al cliente en lugar de a DataResponse .

De esta manera, puedo aplicar el mismo tiempo de espera global en el paso 1 junto con el paso 2 también. Si cualquiera de los pasos anteriores está tomando tiempo, solo tendremos un tiempo de espera en el método getSyncData .

Clase DataFetcherTask después del cambio de diseño:

public class DataFetcherTask implements Callable<List<DataResponse>> { private DataRequest key; private RestTemplate restTemplate; // second executor here private ExecutorService executorService = Executors.newFixedThreadPool(10); public DataFetcherTask(DataRequest key, RestTemplate restTemplate) { this.key = key; this.restTemplate = restTemplate; } @Override public List<DataResponse> call() throws Exception { List<DataRequest> keys = generateKeys(); CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService); int count = 0; for (final DataRequest key : keys) { comp.submit(new Callable<DataResponse>() { @Override public DataResponse call() throws Exception { return performDataRequest(key); } }); } List<DataResponse> responseList = new ArrayList<DataResponse>(); while (count-- > 0) { Future<DataResponse> future = comp.take(); responseList.add(future.get()); } return responseList; } // In this method I am making a HTTP call to another service // and then I will make List<DataRequest> accordingly. private List<DataRequest> generateKeys() { List<DataRequest> keys = new ArrayList<>(); // use key object which is passed in contructor to make HTTP call to another service // and then make List of DataRequest object and return keys. return keys; } private DataResponse performDataRequest(DataRequest key) { // This will have all LogicA code here which is shown in my original design. // everything as it is same.. } }

Ahora mi pregunta es -

  • ¿Tiene que ser así? ¿Cuál es el diseño correcto para resolver este problema? Me refiero a que tener el método de call en otro método de call parece extraño?
  • ¿Necesitamos tener dos ejecutores como los que tengo en mi código? ¿Hay alguna forma mejor de resolver este problema o cualquier tipo de simplificación / cambio de diseño que podamos hacer aquí?

He simplificado el código para que la idea aclare lo que estoy tratando de hacer ...


Como ya se mencionó en los comentarios de su pregunta, puede usar el framework ForkJoin de Java. Esto le ahorrará el grupo de subprocesos extra dentro de su DataFetcherTask .

Simplemente necesita usar un ForkJoinPool en su DataClient y convertir su DataFetcherTask en un RecursiveTask (uno de los subtipos de ForkJoinTask ). Esto le permite ejecutar fácilmente otras subtareas en paralelo.

Entonces, después de estas modificaciones, su código tendrá un aspecto similar al siguiente:

DataFetcherTask

La DataFetcherTask ahora es una DataFetcherTask RecursiveTask que primero genera las claves e invoca subtareas para cada clave generada. Estas subtareas se ejecutan en el mismo ForkJoinPool que la tarea principal.

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> { private final DataRequest key; private final RestTemplate restTemplate; public DataFetcherTask(DataRequest key, RestTemplate restTemplate) { this.key = key; this.restTemplate = restTemplate; } @Override protected List<DataResponse> compute() { // Create subtasks for the key and invoke them List<DataRequestTask> requestTasks = requestTasks(generateKeys()); invokeAll(requestTasks); // All tasks are finished if invokeAll() returns. List<DataResponse> responseList = new ArrayList<>(requestTasks.size()); for (DataRequestTask task : requestTasks) { try { responseList.add(task.get()); } catch (InterruptedException | ExecutionException e) { // TODO - Handle exception properly Thread.currentThread().interrupt(); return Collections.emptyList(); } } return responseList; } private List<DataRequestTask> requestTasks(List<DataRequest> keys) { List<DataRequestTask> tasks = new ArrayList<>(keys.size()); for (DataRequest key : keys) { tasks.add(new DataRequestTask(key)); } return tasks; } // In this method I am making a HTTP call to another service // and then I will make List<DataRequest> accordingly. private List<DataRequest> generateKeys() { List<DataRequest> keys = new ArrayList<>(); // use key object which is passed in contructor to make HTTP call to another service // and then make List of DataRequest object and return keys. return keys; } /** Inner class for the subtasks. */ private static class DataRequestTask extends RecursiveTask<DataResponse> { private final DataRequest request; public DataRequestTask(DataRequest request) { this.request = request; } @Override protected DataResponse compute() { return performDataRequest(this.request); } private DataResponse performDataRequest(DataRequest key) { // This will have all LogicA code here which is shown in my original design. // everything as it is same.. return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK); } } }

DataClient

El DataClient no cambiará mucho a excepción del nuevo conjunto de subprocesos:

public class DataClient implements Client { private final RestTemplate restTemplate = new RestTemplate(); // Replace the ExecutorService with a ForkJoinPool private final ForkJoinPool service = new ForkJoinPool(15); @Override public List<DataResponse> getSyncData(DataRequest key) { List<DataResponse> responsList = null; Future<List<DataResponse>> responseFuture = null; try { responseFuture = getAsyncData(key); responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit()); } catch (TimeoutException | ExecutionException | InterruptedException ex) { responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR)); responseFuture.cancel(true); // logging exception here } return responsList; } @Override public Future<List<DataResponse>> getAsyncData(DataRequest key) { DataFetcherTask task = new DataFetcherTask(key, this.restTemplate); return this.service.submit(task); } }

Una vez que esté en Java8, puede considerar cambiar la implementación a CompletableFuture s. Entonces se vería algo así:

DataClientCF

public class DataClientCF { private final RestTemplate restTemplate = new RestTemplate(); private final ExecutorService executor = Executors.newFixedThreadPool(15); public List<DataResponse> getData(DataRequest initialKey) { return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor) .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList())) .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList())) .exceptionally(t -> { throw new RuntimeException(t); }) .join(); } private List<DataRequest> generateKeys(DataRequest key) { return new ArrayList<>(); } private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) { return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor); } }

Como se mencionó en los comentarios, Guava''s ListenableFuture s proporcionaría una funcionalidad similar para Java7, pero sin Lambdas tienden a volverse torpes.