java - @Async does not work in Spring API with interfaces
multithreading asynchronous (5)
I am using @Async to store some data in the database in parallel with Hibernate. I need this because, before saving the information to the database, I have to run a task that takes several minutes. So I implemented @Async.
The problem is that @Async does not seem to be working. Please find the code below:
WebConfig
@Configuration
@EnableAsync
@EnableWebMvc
public class WebConfig extends WebMvcConfigurerAdapter {
}
StudentServiceImpl:
@Autowired
RunSomeTaskService runSomeTaskService;
@Override
@Transactional
public Response saveWithoutWaiting(StudentBO[] students, String username) throws Exception {
...
for (StudentBO student : students) {
....
Future<Response> response = runSomeTaskService.doTasks(student);
//Finish without waiting for doTasks().
}
@Override
@Transactional
public Response saveWithWaiting(StudentBO[] students, String username) throws Exception {
...
for (StudentBO student : students) {
....
Future<Response> response = runSomeTaskService.doTasks(student);
//Finish and wait for doTasks().
response.get();
}
RunSomeTaskService:
public interface RunSomeTaskService{
@Async
public Future<Response> doTasks(Student student);
}
RunSomeTaskServiceImpl:
public class RunSomeTaskServiceImpl extends CommonService implements RunSomeTaskService{
Student student;
@Override
public Future<Response> doTasks(Student student) {
Response response = new Response();
this.student = student;
//do Task
return new AsyncResult<Response>(response);
}
}
web.xml
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>Sample Spring Maven Project</display-name>
<servlet>
<servlet-name>mvc-dispatcher</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/spring-config.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
<servlet-mapping>
<servlet-name>mvc-dispatcher</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>
org.springframework.web.filter.CharacterEncodingFilter
</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter>
<filter-name>jwtTokenAuthFilter</filter-name>
<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
</filter>
<filter-mapping>
<filter-name>jwtTokenAuthFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>
spring-config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<context:annotation-config />
<context:component-scan base-package="com.app.controller" />
<tx:annotation-driven transaction-manager="transactionManager"/>
<mvc:annotation-driven />
<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
...
</bean>
<bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
...
</bean>
<bean id="sessionFactory"
class="org.springframework.orm.hibernate5.LocalSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="annotatedClasses">
<list>
<value>//every model generated with Hibernate</value>
</list>
</property>
<property name="hibernateProperties">
<props>
<prop key="hibernate.dialect">org.hibernate.dialect.MySQL5Dialect</prop>
<prop key="hibernate.show_sql">true</prop>
</props>
</property>
</bean>
<bean id="transactionManager"
class="org.springframework.orm.hibernate5.HibernateTransactionManager">
<property name="sessionFactory" ref="sessionFactory" />
</bean>
<bean id="persistenceExceptionTranslationPostProcessor"
class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor" />
<bean id="studentService" class="com.app.services.StudentServiceImpl"></bean>
<bean id="studentDao" class="com.app.dao.StudentDaoImpl"></bean>
...
<bean id="jwtTokenAuthFilter" class="com.app.security.JWTTokenAuthFilter" />
</beans>
So, could you help me understand why @Async is not working?
UPDATE: @Async is now working, but I am not getting the expected results.
In the case where I have to wait for the result (the synchronous case), CompletableFuture.get() does not wait for the response and I get an error:
My code:
CompletableFuture<Response> res = extractDataService.doTask(student);
The error:
org.hibernate.HibernateException: Illegal attempt to associate a collection with two open sessions. Collection : <unknown>
Collection contents: [[]]
at org.hibernate.collection.internal.AbstractPersistentCollection.setCurrentSession(AbstractPersistentCollection.java:627)
at org.hibernate.event.internal.OnUpdateVisitor.processCollection(OnUpdateVisitor.java:46)
at org.hibernate.event.internal.AbstractVisitor.processValue(AbstractVisitor.java:104)
at org.hibernate.event.internal.AbstractVisitor.processValue(AbstractVisitor.java:65)
at org.hibernate.event.internal.AbstractVisitor.processEntityPropertyValues(AbstractVisitor.java:59)
at org.hibernate.event.internal.AbstractVisitor.process(AbstractVisitor.java:126)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.performUpdate(DefaultSaveOrUpdateEventListener.java:293)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsDetached(DefaultSaveOrUpdateEventListener.java:227)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.performSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:92)
at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:73)
at org.hibernate.internal.SessionImpl.fireSaveOrUpdate(SessionImpl.java:648)
at org.hibernate.internal.SessionImpl.saveOrUpdate(SessionImpl.java:640)
at org.hibernate.internal.SessionImpl.saveOrUpdate(SessionImpl.java:635)
at com.app.dao.CommonDaoImpl.addOrUpdate(CommonDaoImpl.java:28)
at com.app.services.ExtractDataServiceImpl.doExtraction(ExtractDataServiceImpl.java:361)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:108)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport$CompletableFutureDelegate$1.get(AsyncExecutionAspectSupport.java:237)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
When I do not have to wait and I run several cases asynchronously, I get:
12:17:44.040 [DEMO-4] DEBUG o.h.r.t.b.j.i.JdbcResourceLocalTransactionCoordinatorImpl - JDBC transaction marked for rollback-only (exception provided for stack trace)
java.lang.Exception: exception just for purpose of providing stack trace
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.markRollbackOnly(JdbcResourceLocalTransactionCoordinatorImpl.java:265) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:156) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$100(JdbcResourceLocalTransactionCoordinatorImpl.java:38) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:231) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:65) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
at org.springframework.orm.hibernate5.HibernateTransactionManager.doCommit(HibernateTransactionManager.java:581) [spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:485) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:108) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport$CompletableFutureDelegate$1.get(AsyncExecutionAspectSupport.java:237) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
12:17:44.043 [DEMO-4] DEBUG o.s.o.h.HibernateTransactionManager - Initiating transaction rollback after commit exception
org.hibernate.AssertionFailure: null id in com.app.model.FieldValue entry (don't flush the Session after an exception occurs)
Well, I finally got it working...
I used Executors as follows:
ExecutorService executor = Executors.newFixedThreadPool(students.size());
for (StudentBO student : students) {
executor.submit(() -> extractDataService.doTask(student));
}
Here doTask is a regular method. When I do not need it to run on a different thread, I simply call it directly. When I need threads, I use the code above.
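The executor approach above can be sketched with plain JDK classes. This is only a minimal sketch, not the poster's actual code: `doTask` and the student names are hypothetical stand-ins for `extractDataService.doTask(student)`. It also collects the results and shuts the pool down afterwards, which the snippet above leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorSketch {

    // Hypothetical stand-in for extractDataService.doTask(student).
    static String doTask(String student) {
        return "processed " + student;
    }

    // Runs doTask for every student on a pool thread and waits for all results.
    static List<String> runAll(List<String> students)
            throws InterruptedException, ExecutionException {
        ExecutorService executor =
                Executors.newFixedThreadPool(Math.max(1, students.size()));
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String student : students) {
                Callable<String> task = () -> doTask(student);
                futures.add(executor.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until this task has finished
            }
            return results;
        } finally {
            executor.shutdown(); // no new tasks accepted; submitted tasks still finish
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(Arrays.asList("alice", "bob")));
    }
}
```

Skipping the `future.get()` loop gives the fire-and-forget variant, but the pool should still be shut down at some point so its threads do not linger.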
It is possible that the @EnableAsync annotation in WebConfig.java is never processed, because web.xml points to spring-config.xml.
You can change the DispatcherServlet definition in web.xml to:
<servlet>
<servlet-name>mvc-dispatcher</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextClass</param-name>
<param-value>
org.springframework.web.context.support.AnnotationConfigWebApplicationContext
</param-value>
</init-param>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>
com.yourpackage.WebConfig
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
Then move all the configuration from spring-config.xml into this class.
Or add <task:annotation-driven /> to spring-config.xml (this requires declaring the task namespace on the <beans> element).
Update
Currently, only the package com.app.controller is scanned in spring-config.xml. Make sure WebConfig.java is in this package or in one of its subpackages. If it is not, add WebConfig's package to the base-package attribute, separated by a comma.
You can also control the thread pool used by the async task. Create an executor bean:
@Bean
public Executor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setThreadNamePrefix("asynctaskpool-");
executor.initialize();
return executor;
}
And in your async method, reference the bean by name like this:
@Async("asyncTaskExecutor")
public Future<Response> doTasks(Student student);
This ensures that all the tasks run in this thread pool.
Just use:
servlet.setAsyncSupported(true);
For example:
public class WebAppInitializer implements WebApplicationInitializer {
@Override
public void onStartup(ServletContext servletContext) throws ServletException {
AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext();
ctx.register(WebConfig.class);
ctx.setServletContext(servletContext);
ServletRegistration.Dynamic servlet = servletContext.addServlet("dispatcher",
new DispatcherServlet(ctx));
servlet.setLoadOnStartup(1);
servlet.addMapping("/");
servlet.setAsyncSupported(true); // mark the servlet as supporting async processing
// For CORS preflight requests
servlet.setInitParameter("dispatchOptionsRequest", "true");
}
}
A more sophisticated approach is to implement AsyncConfigurer and return a ThreadPoolTaskExecutor as the async executor.
Sample code below:
@Configuration
@EnableAsync(proxyTargetClass=true) //detects @Async annotation
public class AsyncConfig implements AsyncConfigurer {
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // core size: up to 10 threads are started as tasks arrive and kept alive
executor.setQueueCapacity(10); // up to 10 tasks wait here once the core threads are busy
executor.setMaxPoolSize(25); // once the queue is full, extra threads are created, up to 25
executor.setThreadNamePrefix("DEMO-");
executor.initialize();//Set up the ExecutorService.
return executor;
}
@Override
public Executor getAsyncExecutor() {
return threadPoolTaskExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new YOUR_CUSTOM_EXCEPTION_HANDLER();
}
}
The configuration above will detect the @Async annotation wherever it is used.
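The pool-sizing settings above follow the rules of the JDK's `java.util.concurrent.ThreadPoolExecutor`, which `ThreadPoolTaskExecutor` configures under the hood. The following is a minimal sketch with the plain JDK class, using smaller, purely illustrative sizes (core 2, queue 2, max 4) so the effect is easy to observe: threads beyond the core size are started only once the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingSketch {

    // Submits `tasks` blocking tasks to a pool with core=2, queue=2, max=4
    // and returns how many threads the pool has created for them.
    static int threadsStartedFor(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadsStartedFor(2)); // 2: core threads only
        System.out.println(threadsStartedFor(4)); // 2: the extra tasks wait in the queue
        System.out.println(threadsStartedFor(6)); // 4: queue is full, pool grows to max
    }
}
```

With these sizes a seventh concurrent task would be rejected, so a queue capacity that is too small for the expected load shows up as a `RejectedExecutionException` rather than as slower processing.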
You can use CompletableFuture; with it you know when all of your tasks have completed:
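List<CompletableFuture<T>> futureList = new ArrayList<>();
for (Student student : studentList) {
CompletableFuture<T> returnedFuture = CompletableFuture.supplyAsync(() -> doSomething(student), executor).exceptionally(e -> {
log.error("Error occurred in print something future", e);
return null; // fallback result when the task fails
});
futureList.add(returnedFuture);
}
// allOf takes an array and completes once every future in it has completed
CompletableFuture<Void> allDone = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));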
You can then chain further steps with thenCompose or thenApply (or thenAccept, which takes a consumer) to keep full control over the task pipeline. Once everything has finished, you can safely shut down the executors.
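As a sketch of the pipeline described above, using only JDK classes: `doSomething` and the student names are hypothetical placeholders, not part of the original code. Each task runs on the supplied executor, `exceptionally` provides a fallback value, `allOf` signals when every task is done, and `thenApply` gathers the results before the executor is shut down.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AllOfSketch {

    // Hypothetical task: fails for an empty name to exercise the fallback.
    static int doSomething(String student) {
        if (student.isEmpty()) {
            throw new IllegalArgumentException("empty student name");
        }
        return student.length();
    }

    static List<Integer> processAll(List<String> students) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> futures = students.stream()
                    .map(s -> CompletableFuture.supplyAsync(() -> doSomething(s), executor)
                            .exceptionally(e -> -1)) // fallback value when a task fails
                    .collect(Collectors.toList());

            // allOf completes once every future has completed.
            CompletableFuture<Void> allDone =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

            // thenApply runs after allDone, so join() on each future does not block.
            return allDone.thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()))
                    .join();
        } finally {
            executor.shutdown(); // safe here: all tasks have already completed
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("ana", "", "pedro"))); // [3, -1, 5]
    }
}
```

The final `join()` blocks the calling thread. To stay fully asynchronous, return the `CompletableFuture<List<Integer>>` instead and shut the executor down in a later stage.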