java - Ejecutar un método en paralelo desde un método de llamada.
multithreading thread-safety (2)
Como sé, RestTemplate está bloqueando, se dice en ForkJoinPool JavaDoc en ForkJoinTask :
Los cálculos deben evitar los métodos o bloques sincronizados, y deben minimizar otros bloqueos de sincronización, además de unirse a otras tareas o usar sincronizadores como los Phasers que se anuncian para cooperar con la programación de fork / join. ...
Las tareas tampoco deben realizar bloqueo IO, ...
Llamada en llamada es redundante.
Y no necesitas dos ejecutores. También puede devolver un resultado parcial en getSyncData(DataRequest key)
. Esto se puede hacer así.
DataClient.java
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> responseList = null;
DataFetcherResult response = null;
try {
response = getAsyncData(key);
responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response.cancel(true);
responseList = response.getPartialResult();
}
return responseList;
}
@Override
public DataFetcherResult getAsyncData(DataRequest key) {
List<DataRequest> keys = generateKeys(key);
final List<Future<DataResponse>> responseList = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
for (final DataRequest _key : keys) {
responseList.add(service.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
DataResponse response = null;
try {
response = performDataRequest(_key);
} finally {
latch.countDown();
return response;
}
}
}));
}
return new DataFetcherResult(responseList, latch);
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys(DataRequest key) {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
return null;
}
}
DataFetcherResult.java
public class DataFetcherResult implements Future<List<DataResponse>> {
final List<Future<DataResponse>> futures;
final CountDownLatch latch;
public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
this.futures = futures;
this.latch = latch;
}
//non-blocking
public List<DataResponse> getPartialResult() {
List<DataResponse> result = new ArrayList<>(futures.size());
for (Future<DataResponse> future : futures) {
try {
result.add(future.isDone() ? future.get() : null);
//instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
//ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
//you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
}
}
return result;
}
@Override
public List<DataResponse> get() throws ExecutionException, InterruptedException {
List<DataResponse> result = new ArrayList<>(futures.size());
for (Future<DataResponse> future : futures) {
result.add(future.get());
}
return result;
}
@Override
public List<DataResponse> get(long timeout, TimeUnit timeUnit)
throws ExecutionException, InterruptedException, TimeoutException {
if (latch.await(timeout, timeUnit)) {
return get();
}
throw new TimeoutException();//or getPartialResult()
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = true;
for (Future<DataResponse> future : futures) {
cancelled &= future.cancel(mayInterruptIfRunning);
}
return cancelled;
}
@Override
public boolean isCancelled() {
boolean cancelled = true;
for (Future<DataResponse> future : futures) {
cancelled &= future.isCancelled();
}
return cancelled;
}
@Override
public boolean isDone() {
boolean done = true;
for (Future<DataResponse> future : futures) {
done &= future.isDone();
}
return done;
}
//and etc.
}
Lo escribí con CountDownLatch
y se ve genial, pero note que hay un matiz. Puede quedarse atascado por un momento en DataFetcherResult.get(long timeout, TimeUnit timeUnit)
porque DataFetcherResult.get(long timeout, TimeUnit timeUnit)
no está sincronizado con el estado del futuro. Y podría suceder que latch.getCount() == 0
pero no todos los futuros devolverían future.isDone() == true
al mismo tiempo. Porque ya han pasado latch.countDown();
inside por finally {}
El bloque de Callable pero no cambió el state
interno state
que todavía es igual a NEW
.
Y así, las llamadas a get()
interior de get(long timeout, TimeUnit timeUnit)
pueden causar un pequeño retraso.
Caso similar fue descrito aquí .
Obtener con timeout DataFetcherResult.get(...)
puede reescribirse usando futures future.get(long timeout, TimeUnit timeUnit)
y puede eliminar CountDownLatch
de una clase.
public List<DataResponse> get(long timeout, TimeUnit timeUnit)
throws ExecutionException, InterruptedException{
List<DataResponse> result = new ArrayList<>(futures.size());
long timeoutMs = timeUnit.toMillis(timeout);
boolean timeout = false;
for (Future<DataResponse> future : futures) {
long beforeGet = System.currentTimeMillis();
try {
if (!timeout && timeoutMs > 0) {
result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
timeoutMs -= System.currentTimeMillis() - beforeGet;
} else {
if (future.isDone()) {
result.add(future.get());
} else {
//result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
}
}
} catch (TimeoutException e) {
result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
timeout = true;
}
//you can also handle ExecutionException or CancellationException here
}
return result;
}
Este código se dio como ejemplo y debe probarse antes de usarlo en producción, pero parece legítimo :)
Tengo una biblioteca que está siendo utilizada por el cliente y están pasando el objeto DataRequest
que tiene userid
, timeout
y algunos otros campos. Ahora uso este objeto DataRequest
para crear una URL y luego hago una llamada HTTP utilizando RestTemplate
y mi servicio devuelve una respuesta JSON, la uso para hacer un objeto DataResponse
y les devuelvo este objeto DataResponse
.
A continuación se encuentra mi clase DataClient
utilizada por el cliente al DataRequest
objeto DataRequest
. Estoy utilizando el valor de tiempo de espera pasado por el cliente en DataRequest
para DataRequest
tiempo de la solicitud si está tomando demasiado tiempo en el método getSyncData
.
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataRequest key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
Clase DataFetcherTask
:
public class DataFetcherTask implements Callable<DataResponse> {
private DataRequest key;
private RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
// In a nutshell below is what I am doing here.
// 1. Make an url using DataRequest key.
// 2. And then execute the url RestTemplate.
// 3. Make a DataResponse object and return it.
// I am calling this whole logic in call method as LogicA
}
}
A partir de ahora, mi clase DataFetcherTask
es responsable de una clave DataRequest
como se muestra arriba.
Planteamiento del problema:-
Ahora tengo un pequeño cambio de diseño. El cliente pasará el objeto DataRequest
(por ejemplo, keyA) a mi biblioteca y luego haré una nueva llamada http a otro servicio (que no estoy haciendo en mi diseño actual) usando el ID de usuario presente en el objeto DataRequest
(keyA) que dará Vuelvo a la lista de ID de usuario, así que DataRequest
esos ID de usuario y haré algunos otros DataRequest
(keyB, keyC, keyD) uno por cada ID de usuario devuelto en la respuesta. Y luego List<DataRequest>
objeto List<DataRequest>
que tendrá los objetos keyB, keyC y keyD DataRequest
. El elemento máximo en la List<DataRequest>
será tres, eso es todo.
Ahora, para cada uno de esos objetos DataRequest
en la List<DataRequest>
, quiero ejecutar el método DataFetcherTask.call
en paralelo y luego hacer la List<DataResponse>
agregando cada DataResponse
para cada clave. Así que tendré tres llamadas paralelas a DataFetcherTask.call
. La idea detrás de esta llamada paralela es obtener los datos de todas esas tres claves como máximo en el mismo valor de tiempo de espera global.
Por lo tanto, mi propuesta es: la clase DataFetcherTask
devolverá el objeto List<DataResponse>
lugar de DataResponse
y luego la firma de getSyncData
y el método getAsyncData
también cambiará. Así que aquí está el algoritmo:
- Use el objeto DataRequest pasado por el cliente para hacer la
List<DataRequest>
llamando a otro servicio HTTP. - Realice una llamada paralela para cada
DataRequest
en elDataRequest
List<DataRequest>
aDataFetcherTask.call
y devuelva el objetoList<DataResponse>
al cliente en lugar de aDataResponse
.
De esta manera, puedo aplicar el mismo tiempo de espera global en el paso 1 junto con el paso 2 también. Si cualquiera de los pasos anteriores está tomando tiempo, solo tendremos un tiempo de espera en el método getSyncData
.
Clase DataFetcherTask
después del cambio de diseño:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
// second executor here
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = generateKeys();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataRequest key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
}
}
Ahora mi pregunta es -
- ¿Tiene que ser así? ¿Cuál es el diseño correcto para resolver este problema? Me refiero a que tener el método de
call
en otro método decall
parece extraño? - ¿Necesitamos tener dos ejecutores como los que tengo en mi código? ¿Hay alguna forma mejor de resolver este problema o cualquier tipo de simplificación / cambio de diseño que podamos hacer aquí?
He simplificado el código para que la idea aclare lo que estoy tratando de hacer ...
Como ya se mencionó en los comentarios de su pregunta, puede usar el framework ForkJoin de Java. Esto le ahorrará el grupo de subprocesos extra dentro de su DataFetcherTask
.
Simplemente necesita usar un ForkJoinPool
en su DataClient
y convertir su DataFetcherTask
en un RecursiveTask
(uno de los subtipos de ForkJoinTask
). Esto le permite ejecutar fácilmente otras subtareas en paralelo.
Entonces, después de estas modificaciones, su código tendrá un aspecto similar al siguiente:
DataFetcherTask
La DataFetcherTask
ahora es una DataFetcherTask
RecursiveTask
que primero genera las claves e invoca subtareas para cada clave generada. Estas subtareas se ejecutan en el mismo ForkJoinPool
que la tarea principal.
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
// TODO - Handle exception properly
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
}
}
}
DataClient
El DataClient
no cambiará mucho a excepción del nuevo conjunto de subprocesos:
public class DataClient implements Client {
private final RestTemplate restTemplate = new RestTemplate();
// Replace the ExecutorService with a ForkJoinPool
private final ForkJoinPool service = new ForkJoinPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = getAsyncData(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
return this.service.submit(task);
}
}
Una vez que esté en Java8, puede considerar cambiar la implementación a CompletableFuture
s. Entonces se vería algo así:
DataClientCF
public class DataClientCF {
private final RestTemplate restTemplate = new RestTemplate();
private final ExecutorService executor = Executors.newFixedThreadPool(15);
public List<DataResponse> getData(DataRequest initialKey) {
return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
.thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
.thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
.exceptionally(t -> { throw new RuntimeException(t); })
.join();
}
private List<DataRequest> generateKeys(DataRequest key) {
return new ArrayList<>();
}
private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
}
}
Como se mencionó en los comentarios, Guava''s ListenableFuture
s proporcionaría una funcionalidad similar para Java7, pero sin Lambdas tienden a volverse torpes.