spring-boot - jobbuilderfactory - spring batch processes
Las entidades no persisten. ¿RepositoryItemWriter y SimpleJpaWriter son seguros para subprocesos? (1)
Parece que lo arreglé agregando un PlatformTransactionManager. Ver los cambios a continuación. Espero que esto ayude a alguien, ya que este es uno con el que he estado luchando durante un par de semanas. Lo que no entiendo es por qué Spring Boot puede proporcionar un JtaTransactionManager sin Bitronix o Atomikos en mi pom.xml.
@SuppressWarnings("SpringJavaAutowiringInspection")
@Bean
public Step orderStep(StepBuilderFactory stepBuilderFactory, ItemReader<OrderEncounter> orderEncounterReader, ItemWriter<List<Order>> orderWriter,
ItemProcessor<OrderEncounter, List<Order>> orderProcessor, TaskExecutor taskExecutor, PlatformTransactionManager platformTransactionManager) {
return stepBuilderFactory.get("orderStep")
.<OrderEncounter, List<Order>> chunk(10)
.reader(orderEncounterReader)
.processor(orderProcessor)
.writer(orderWriter)
.taskExecutor(taskExecutor)
.transactionManager(platformTransactionManager)
.build();
}
Y la clase de configuración:
@Configuration
public class JTOpenDataSourceConfiguration {
@Bean(name="distillerDataSource")
@Primary
@ConfigurationProperties(prefix="spring.datasource.distiller")
public DataSource distillerDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix="spring.datasource.target")
public DataSource targetDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public PlatformTransactionManager platformTransactionManager(@Qualifier("targetDataSource") DataSource targetDataSource) {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setDataSource(targetDataSource);
return transactionManager;
}
He encontrado un problema extraño con un RepositoryItemWriter, donde no parece ser entidades persistentes correctamente a través de mi repositorio JPA Spring Data configurado a la fuente de datos.
Configuración de paso
@Bean
public Step orderStep(StepBuilderFactory stepBuilderFactory, ItemReader<OrderEncounter> orderEncounterReader, ItemWriter<List<Order>> orderWriter,
ItemProcessor<OrderEncounter, List<Order>> orderProcessor, TaskExecutor taskExecutor) {
return stepBuilderFactory.get("orderStep")
.<OrderEncounter, List<Order>> chunk(10)
.reader(orderEncounterReader)
.processor(orderProcessor)
.writer(orderWriter)
.taskExecutor(taskExecutor)
.build();
}
Task Executor Bean (configurado para 1 hilo para fines de prueba)
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(1);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
OrderRepository
@Repository
public interface OrderRepository extends PagingAndSortingRepository<Order, Long> {
}
orderWriter bean
@Bean
public ItemWriter<List<Order>> orderWriter(OrderRepository orderRepository) {
RepositoryListItemWriter<List<Order>> writer = new RepositoryListItemWriter<>();
writer.setRepository(orderRepository);
writer.setMethodName("save");
try {
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
RepositoryListItemWriter (RepositoryItemWriter modificado para admitir varios elementos devueltos por un conjunto de resultados de procedimientos almacenados)
public class RepositoryListItemWriter<T> implements ItemWriter<T>, InitializingBean {
protected static final Log logger = LogFactory.getLog(RepositoryListItemWriter.class);
private CrudRepository<?, ?> repository;
private String methodName;
public RepositoryListItemWriter() {
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public void setRepository(CrudRepository<?, ?> repository) {
this.repository = repository;
}
public void write(List<? extends T> items) throws Exception {
if(!CollectionUtils.isEmpty(items)) {
this.doWrite(items);
}
}
protected void doWrite(List<? extends T> items) throws Exception {
if(logger.isDebugEnabled()) {
logger.debug("Writing to the repository with " + items.size() + " items.");
}
MethodInvoker invoker = this.createMethodInvoker(this.repository, this.methodName);
Iterator i$ = items.iterator();
while(i$.hasNext()) {
Object object = i$.next();
invoker.setArguments(new Object[]{object});
this.doInvoke(invoker);
}
}
public void afterPropertiesSet() throws Exception {
Assert.state(this.repository != null, "A CrudRepository implementation is required");
}
private Object doInvoke(MethodInvoker invoker) throws Exception {
try {
invoker.prepare();
} catch (ClassNotFoundException var3) {
throw new DynamicMethodInvocationException(var3);
} catch (NoSuchMethodException var4) {
throw new DynamicMethodInvocationException(var4);
}
try {
return invoker.invoke();
} catch (InvocationTargetException var5) {
if(var5.getCause() instanceof Exception) {
throw (Exception)var5.getCause();
} else {
throw new InvocationTargetThrowableWrapper(var5.getCause());
}
} catch (IllegalAccessException var6) {
throw new DynamicMethodInvocationException(var6);
}
}
private MethodInvoker createMethodInvoker(Object targetObject, String targetMethod) {
MethodInvoker invoker = new MethodInvoker();
invoker.setTargetObject(targetObject);
invoker.setTargetMethod(targetMethod);
return invoker;
}
}
Cuando orderStep se configura sin taskExecutor, las entidades permanecen bien a través de RepositoryItemWriter en la base de datos; sin embargo, cuando se configura incluso con un task-ejecutor de subproceso único, no se conserva nada en la base de datos durante la ejecución completa del trabajo, incluso después de la finalización.
He realizado una cantidad considerable de investigación y RepositoryItemWriter parece ser seguro para subprocesos, junto con PagingAndSortingRepository y SimpleJpaWriter. ¿Alguna sugerencia?
Salida de registro de nivel de seguimiento
No TaskExecutor en el trabajo:
2015-11-23 20:51:07.589 TRACE 31126 --- [nio-8080-exec-1] o.s.beans.CachedIntrospectionResults : Found bean property ''verified
ChangedByUsername'' of type [java.lang.String]2015-11-23 20:51:07.589 TRACE 31126 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springfr
amework.orm.jpa.EntityManagerHolder@56ca0bbc] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@5ffffca4] bound to thread [http-nio-8080-exec-1]2015-11-23 20:51:07.590 TRACE 31126 --- [nio-8080-exec-1] o.h.e.i.AbstractSaveEventListener : Transient instance of: com.ii
massociates.distiller.domain.Order2015-11-23 20:51:07.590 TRACE 31126 --- [nio-8080-exec-1] o.h.e.i.DefaultPersistEventListener : Saving transient instance
2015-11-23 20:51:07.590 TRACE 31126 --- [nio-8080-exec-1] o.h.e.i.AbstractSaveEventListener : Saving [com.iimassociates.distiller.domain.Order#<null>]
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue : Adding an EntityIdentityInsertAction for [com.iimassociates.distiller.domain.Order] object
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue : Executing inserts before finding non-nullable transient entities for early insert: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<null>]
]
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue : Adding insert with no non-nullable, transient entities: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<null>]]
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue : Executing insertions before resolved early-insert
2015-11-23 20:51:07.591 DEBUG 31126 --- [nio-8080-exec-1] org.hibernate.engine.spi.ActionQueue : Executing identity-insert immediately
2015-11-23 20:51:07.591 TRACE 31126 --- [nio-8080-exec-1] o.h.p.entity.AbstractEntityPersister : Inserting entity: com.iimassociates.distiller.domain.Order (native id)
2015-11-23 20:51:07.592 DEBUG 31126 --- [nio-8080-exec-1] org.hibernate.SQL : insert into orders (blah blah SQL)
TaskExecutor en el trabajo
2015-11-23 21:02:34.628 TRACE 31257 --- [ taskExecutor-1] o.s.beans.CachedIntrospectionResults : Found bean property ''verifiedChangedByUsername'' of type [java.lang.String]
2015-11-23 21:02:34.628 TRACE 31257 --- [ taskExecutor-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@2b151d8a] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@2706e09c] bound to thread [taskExecutor-1]
2015-11-23 21:02:34.629 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener : Transient instance of: com.iimassociates.distiller.domain.Order
2015-11-23 21:02:34.629 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.DefaultPersistEventListener : Saving transient instance
2015-11-23 21:02:34.629 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener : Saving [com.iimassociates.distiller.domain.Order#<null>]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue : Adding an EntityIdentityInsertAction for [com.iimassociates.distiller.domain.Order] object
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue : Adding insert with no non-nullable, transient entities: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<delayed:1>]]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue : Adding resolved non-early insert action.
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.a.i.UnresolvedEntityInsertActions : No unresolved entity inserts that depended on [[com.iimassociates.distiller.domain.Order#<delayed:1>]]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.a.i.UnresolvedEntityInsertActions : No entity insert actions have non-nullable, transient entity dependencies.
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@2b151d8a] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@2706e09c] bound to thread [taskExecutor-1]
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener : Transient instance of: com.iimassociates.distiller.domain.Order
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.DefaultPersistEventListener : Saving transient instance
2015-11-23 21:02:34.630 TRACE 31257 --- [ taskExecutor-1] o.h.e.i.AbstractSaveEventListener : Saving [com.iimassociates.distiller.domain.Order#<null>]
2015-11-23 21:02:34.631 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue : Adding an EntityIdentityInsertAction for [com.iimassociates.distiller.domain.Order] object
2015-11-23 21:02:34.631 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue : Adding insert with no non-nullable, transient entities: [EntityIdentityInsertAction[com.iimassociates.distiller.domain.Order#<delayed:2>]]
2015-11-23 21:02:34.631 TRACE 31257 --- [ taskExecutor-1] org.hibernate.engine.spi.ActionQueue : Adding resolved non-early insert action.