java multithreading asynchronous

java - @Async not working in a Spring API with interfaces




I'm using @Async to store some data in the database in parallel with Hibernate. I need this because, before saving the information to the database, I have to run a task that takes several minutes. So I implemented @Async.

The problem is that @Async doesn't seem to be working. Please find the code below:

WebConfig

@Configuration
@EnableAsync
@EnableWebMvc
public class WebConfig extends WebMvcConfigurerAdapter {
}

StudentServiceImpl:

@Autowired
RunSomeTaskService runSomeTaskService;

@Override
@Transactional
public Response saveWithoutWaiting(StudentBO[] students, String username) throws Exception {
    ...
    for (StudentBO student : students) {
        ....
        Future<Response> response = runSomeTaskService.doTasks(student);
        // Finish without waiting for doTasks().
    }
}

@Override
@Transactional
public Response saveWithWaiting(StudentBO[] students, String username) throws Exception {
    ...
    for (StudentBO student : students) {
        ....
        Future<Response> response = runSomeTaskService.doTasks(student);
        // Finish and wait for doTasks().
        response.get();
    }
}

RunSomeTaskService:

public interface RunSomeTaskService {

    @Async
    public Future<Response> doTasks(Student student);
}

RunSomeTaskServiceImpl:

public class RunSomeTaskServiceImpl extends CommonService implements RunSomeTaskService {

    Student student;

    @Override
    public Future<Response> doTasks(Student student) {
        Response response = new Response();
        this.student = student;
        // do Task
        return new AsyncResult<Response>(response);
    }
}

web.xml

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">

    <display-name>Sample Spring Maven Project</display-name>

    <servlet>
        <servlet-name>mvc-dispatcher</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>/WEB-INF/spring-config.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>

    <servlet-mapping>
        <servlet-name>mvc-dispatcher</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
    </filter>

    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <filter>
        <filter-name>jwtTokenAuthFilter</filter-name>
        <filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
    </filter>

    <filter-mapping>
        <filter-name>jwtTokenAuthFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
</web-app>

spring.config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.app.controller" />
    <tx:annotation-driven transaction-manager="transactionManager"/>
    <mvc:annotation-driven />

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        ...
    </bean>

    <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
        ...
    </bean>

    <bean id="sessionFactory" class="org.springframework.orm.hibernate5.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="annotatedClasses">
            <list>
                <value>//every model generated with Hibernate</value>
            </list>
        </property>
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.dialect">org.hibernate.dialect.MySQL5Dialect</prop>
                <prop key="hibernate.show_sql">true</prop>
            </props>
        </property>
    </bean>

    <bean id="transactionManager" class="org.springframework.orm.hibernate5.HibernateTransactionManager">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>

    <bean id="persistenceExceptionTranslationPostProcessor"
          class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor" />

    <bean id="studentService" class="com.app.services.StudentServiceImpl"></bean>
    <bean id="studentDao" class="com.app.dao.StudentDaoImpl"></bean>
    ...
    <bean id="jwtTokenAuthFilter" class="com.app.security.JWTTokenAuthFilter" />
</beans>

So, could you help me understand why @Async is not working?

UPDATE: @Async is now working, but I'm not getting the expected results.

In the case where I have to wait for the result (the synchronous case), CompletableFuture.get() does not wait for the response and I get an error:

My code:

CompletableFuture<Response> res = extractDataService.doTask(student);

The error:

org.hibernate.HibernateException: Illegal attempt to associate a collection with two open sessions. Collection : <unknown> Collection contents: [[]]
    at org.hibernate.collection.internal.AbstractPersistentCollection.setCurrentSession(AbstractPersistentCollection.java:627)
    at org.hibernate.event.internal.OnUpdateVisitor.processCollection(OnUpdateVisitor.java:46)
    at org.hibernate.event.internal.AbstractVisitor.processValue(AbstractVisitor.java:104)
    at org.hibernate.event.internal.AbstractVisitor.processValue(AbstractVisitor.java:65)
    at org.hibernate.event.internal.AbstractVisitor.processEntityPropertyValues(AbstractVisitor.java:59)
    at org.hibernate.event.internal.AbstractVisitor.process(AbstractVisitor.java:126)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.performUpdate(DefaultSaveOrUpdateEventListener.java:293)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.entityIsDetached(DefaultSaveOrUpdateEventListener.java:227)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.performSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:92)
    at org.hibernate.event.internal.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:73)
    at org.hibernate.internal.SessionImpl.fireSaveOrUpdate(SessionImpl.java:648)
    at org.hibernate.internal.SessionImpl.saveOrUpdate(SessionImpl.java:640)
    at org.hibernate.internal.SessionImpl.saveOrUpdate(SessionImpl.java:635)
    at com.app.dao.CommonDaoImpl.addOrUpdate(CommonDaoImpl.java:28)
    at com.app.services.ExtractDataServiceImpl.doExtraction(ExtractDataServiceImpl.java:361)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:108)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport$CompletableFutureDelegate$1.get(AsyncExecutionAspectSupport.java:237)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

When I don't have to wait and I run several cases asynchronously, I get:

12:17:44.040 [DEMO-4] DEBUG o.h.r.t.b.j.i.JdbcResourceLocalTransactionCoordinatorImpl - JDBC transaction marked for rollback-only (exception provided for stack trace)
java.lang.Exception: exception just for purpose of providing stack trace
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.markRollbackOnly(JdbcResourceLocalTransactionCoordinatorImpl.java:265) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:156) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$100(JdbcResourceLocalTransactionCoordinatorImpl.java:38) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:231) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:65) [hibernate-core-5.0.6.Final.jar:5.0.6.Final]
    at org.springframework.orm.hibernate5.HibernateTransactionManager.doCommit(HibernateTransactionManager.java:581) [spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:485) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) [spring-tx-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:108) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport$CompletableFutureDelegate$1.get(AsyncExecutionAspectSupport.java:237) [spring-aop-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
12:17:44.043 [DEMO-4] DEBUG o.s.o.h.HibernateTransactionManager - Initiating transaction rollback after commit exception
org.hibernate.AssertionFailure: null id in com.app.model.FieldValue entry (don't flush the Session after an exception occurs)


Well, I finally got it working ...

I used Executors as follows:

ExecutorService executor = Executors.newFixedThreadPool(students.size());
for (StudentBO student : students) {
    executor.submit(() -> extractDataService.doTask(student));
}

Here doTask is a regular method. When I don't need it to run on a different thread, I just call it directly. When I need the threads, I use the code above.
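The snippet above discards the futures returned by submit and never shuts the pool down. Here is a minimal JDK-only sketch of the same idea that keeps the futures, waits only when asked, and releases the pool afterwards. The class name, the String "students", and doTask are hypothetical stand-ins for extractDataService.doTask and the real types, and the cap of 4 threads is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorSketch {

    // Hypothetical stand-in for extractDataService.doTask(student).
    static String doTask(String student) {
        return "processed " + student;
    }

    // Submits one task per student; collects results only when waitForResults is true.
    static List<String> runAll(List<String> students, boolean waitForResults)
            throws InterruptedException, ExecutionException {
        // Cap the pool size instead of creating one thread per student (assumed limit: 4).
        int poolSize = Math.max(1, Math.min(students.size(), 4));
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        List<String> results = new ArrayList<>();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String student : students) {
                futures.add(executor.submit(() -> doTask(student)));
            }
            if (waitForResults) {
                for (Future<String> future : futures) {
                    results.add(future.get()); // blocks until that task has finished
                }
            }
        } finally {
            // Stops accepting new tasks; tasks already submitted still run to completion.
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(Arrays.asList("ana", "luis"), true));
    }
}
```

Calling runAll(students, false) corresponds to the fire-and-forget case, and runAll(students, true) to the case where the caller needs the results before continuing.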


It is possible that the @EnableAsync annotation in WebConfig.java is never processed. The web.xml points to spring-config.xml.

You can change the DispatcherServlet definition in web.xml to:

<servlet>
    <servlet-name>mvc-dispatcher</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextClass</param-name>
        <param-value>org.springframework.web.context.support.AnnotationConfigWebApplicationContext</param-value>
    </init-param>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>com.yourpackage.WebConfig</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
</servlet>

And move all the configuration from spring-config.xml into this class.

Or add <task:annotation-driven> to spring-config.xml.

Updated

Currently, only the com.app.controller package is component-scanned in spring-config.xml. Make sure WebConfig.java is in this package or in one of its subpackages. If it is not, add WebConfig's package to the base-package attribute, separated by a comma.

You can also control the thread pool used by the asynchronous task. Create an executor bean:
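Some background on why the configuration has to be picked up at all: @Async only takes effect when calls go through a proxy that Spring creates once @EnableAsync (or <task:annotation-driven>) has been processed. The following is a deliberately simplified, JDK-only sketch of that idea and not Spring's actual implementation. TaskService, TaskServiceImpl, asyncProxy, and the thread name "async-sketch-1" are all hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class AsyncProxySketch {

    // Hypothetical service interface, loosely modeled on RunSomeTaskService.
    public interface TaskService {
        Future<String> doTasks(String student);
    }

    // Plain implementation: reports the thread it was executed on.
    static class TaskServiceImpl implements TaskService {
        @Override
        public Future<String> doTasks(String student) {
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }
    }

    // Wraps the target so that each interface call is handed to the executor.
    static TaskService asyncProxy(TaskService target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args); // toString, hashCode, equals
            }
            Supplier<Object> call = () -> {
                try {
                    // Invoke the real method on a pool thread and unwrap its result.
                    return ((Future<?>) method.invoke(target, args)).get();
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            };
            return CompletableFuture.supplyAsync(call, executor);
        };
        return (TaskService) Proxy.newProxyInstance(
                TaskService.class.getClassLoader(),
                new Class<?>[] {TaskService.class},
                handler);
    }

    // Returns the name of the thread that executed doTasks when called through the proxy.
    static String threadUsedViaProxy() throws Exception {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-sketch-1"));
        try {
            TaskService proxied = asyncProxy(new TaskServiceImpl(), executor);
            return proxied.doTasks("ana").get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Direct call: executes on the caller's thread.
        System.out.println(new TaskServiceImpl().doTasks("ana").get());
        // Proxied call: executes on the pool thread.
        System.out.println(threadUsedViaProxy());
    }
}
```

The point of the sketch is that the implementation class is unchanged in both calls. Only the call that goes through the proxy is moved to another thread, which is why a bean that is not proxied (because the async configuration was never loaded) runs synchronously.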

@Bean
public Executor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setThreadNamePrefix("asynctaskpool-");
    executor.initialize();
    return executor;
}

Then reference the bean name on your async method, like this:

@Async("asyncTaskExecutor")
public Future<Response> doTasks(Student student);

This ensures that all of these tasks run in this thread pool.
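For readers who want to see what those settings amount to, here is a rough plain-JDK analogue of the executor bean above. It is a sketch under stated assumptions, not what Spring builds internally: the class and method names are hypothetical, the 60-second idle timeout is an assumed value, and the unbounded queue mirrors the default queue capacity of ThreadPoolTaskExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolSketch {

    // Builds a pool with 5 core threads, at most 10 threads, and prefixed thread names.
    static ExecutorService newAsyncTaskPool() {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory namedThreads =
                runnable -> new Thread(runnable, "asynctaskpool-" + counter.getAndIncrement());
        return new ThreadPoolExecutor(
                5,                           // core pool size
                10,                          // maximum pool size
                60, TimeUnit.SECONDS,        // idle timeout for threads above the core size (assumed)
                new LinkedBlockingQueue<>(), // unbounded queue: tasks wait here once 5 threads are busy
                namedThreads);
    }

    // Runs one task and reports which thread executed it.
    static String runningThreadName() throws Exception {
        ExecutorService pool = newAsyncTaskPool();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runningThreadName());
    }
}
```

One consequence worth knowing: with an unbounded queue, a ThreadPoolExecutor only adds threads beyond the core size when the queue is full, so the maximum of 10 is never reached unless a bounded queue capacity is also configured.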


Just use:

servlet.setAsyncSupported(true);

For example:

public class WebAppInitializer implements WebApplicationInitializer {

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext();
        ctx.register(WebConfig.class);
        ctx.setServletContext(servletContext);

        ServletRegistration.Dynamic servlet =
                servletContext.addServlet("dispatcher", new DispatcherServlet(ctx));
        servlet.setLoadOnStartup(1);
        servlet.addMapping("/");
        servlet.setAsyncSupported(true); // mark the servlet as supporting async
        // For CORS preflight requests
        servlet.setInitParameter("dispatchOptionsRequest", "true");
    }
}


A more sophisticated approach would be to implement AsyncConfigurer and set the async executor to a ThreadPoolTaskExecutor.

Sample code below:

@Configuration
@EnableAsync(proxyTargetClass = true) // enables detection of @Async annotations
public class AsyncConfig implements AsyncConfigurer {

    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);  // up to 10 core threads, started on demand as tasks arrive
        executor.setQueueCapacity(10); // queue capacity
        executor.setMaxPoolSize(25);   // once the queue is full, add threads up to a maximum of 25
        executor.setThreadNamePrefix("DEMO-");
        executor.initialize();         // set up the underlying ExecutorService
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return threadPoolTaskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new YOUR_CUSTOM_EXCEPTION_HANDLER();
    }
}

With the configuration above, the @Async annotation is detected wherever it is used.


You can use CompletableFuture; with it you know when all of your tasks are complete.

List<CompletableFuture<T>> futureList = new ArrayList<>();
for (Student student : studentList) {
    CompletableFuture<T> returnedFuture = CompletableFuture
            .supplyAsync(() -> doSomething(student), executor)
            .exceptionally(e -> {
                log.error("Error occurred in print something future", e);
                return null; // fallback value; it must be of type T
            });
    futureList.add(returnedFuture);
}
// allOf takes an array (varargs), not a List
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));

You can then chain with thenCompose or thenApply (or thenAccept, which takes a consumer) to have full control over the task pipeline. You can safely shut down the executors once you are done.

See the CompletableFuture.allOf Javadoc for more information.
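As a runnable illustration of the approach in this answer, here is a small JDK-only sketch. The class name, doSomething, the String "students", the pool size of 4, and the fallback value -1 are hypothetical choices made for the example, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AllOfSketch {

    // Hypothetical task: fails for empty names so the fallback path is exercised.
    static int doSomething(String student) {
        if (student.isEmpty()) {
            throw new IllegalArgumentException("empty student name");
        }
        return student.length();
    }

    // Runs one task per student in parallel and returns the results in input order.
    static List<Integer> processAll(List<String> students) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (String student : students) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> doSomething(student), executor)
                        .exceptionally(e -> -1)); // fallback value when the task fails
            }
            // allOf takes an array (varargs); join() blocks until every future is done.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> future : futures) {
                results.add(future.join()); // already complete, returns immediately
            }
            return results;
        } finally {
            executor.shutdown(); // safe here: all tasks have completed
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("ana", "", "luis"))); // prints [3, -1, 4]
    }
}
```

Because each future has its own exceptionally stage, one failing task does not make the combined allOf future fail; the caller sees the fallback value for that entry instead.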