spring batch - example - ¿Cómo procesar filas relacionadas lógicamente después de ItemReader en SpringBatch?
spring batch example (5)
Guión
Para hacerlo simple, supongamos que tengo un ItemReader que me devuelve 25 filas.
Las primeras 10 filas pertenecen al estudiante A
Los siguientes 5 pertenecen al estudiante B
y los 10 restantes pertenecen al estudiante C
Quiero agregarlos juntos, lógicamente, por studentId y aplanarlos para terminar con una fila por estudiante.
Problema
Si entiendo correctamente, establecer el intervalo de compromiso en 5 hará lo siguiente:
- Envíe 5 filas al Procesador (que las agregará o hará cualquier lógica comercial que le diga).
- Después de procesado se escribirán 5 filas.
- Luego lo hará de nuevo para las siguientes 5 filas y así sucesivamente.
Si eso es cierto, entonces para los próximos cinco tendré que revisar los ya escritos, sacarlos, agregarlos a los que estoy procesando actualmente y escribirlos de nuevo.
Yo personalmente no me gusta eso.
- ¿Cuál es la mejor práctica para manejar una situación como esta en Spring Batch?
Alternativa
A veces siento que es mucho más fácil escribir un programa principal de Spring JDBC y luego tengo el control total de lo que quiero hacer. Sin embargo, quería aprovechar la supervisión del estado del repositorio del trabajo, la capacidad de reinicio, omisión, trabajo y escuchas de pasos ...
Mi código de lote de primavera
Mi módulo-contexto.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<description>Example job to get you started. It provides a skeleton for a typical batch application.</description>
<batch:job id="job1">
<batch:step id="step1" >
<batch:tasklet transaction-manager="transactionManager" start-limit="100" >
<batch:chunk reader="attendanceItemReader"
processor="attendanceProcessor"
writer="attendanceItemWriter"
commit-interval="10"
/>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource">
<ref bean="sourceDataSource"/>
</property>
<property name="sql"
value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = ''Active'' and fas.LOCAL_GRADING_PERIOD = ''G1'' and s.student_current_grade_level = ''Gr 9'' order by s.student_id"/>
<property name="preparedStatementSetter" ref="attendanceStatementSetter"/>
<property name="rowMapper" ref="attendanceRowMapper"/>
</bean>
<bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>
<bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>
<bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />
<bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:target/outputs/passthrough.txt"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
</property>
</bean>
</beans>
Mis clases de apoyo para el lector.
Un estado de cuenta preparado
package edu.kdc.visioncards.preparedstatements;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.jdbc.core.PreparedStatementSetter;
public class AttendanceStatementSetter implements PreparedStatementSetter {
public void setValues(PreparedStatement ps) throws SQLException {
ps.setInt(1, 7);
}
}
y un RowMapper
package edu.kdc.visioncards.rowmapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import edu.kdc.visioncards.dto.AttendanceDTO;
public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {
public static final String STUDENT_NAME = "STUDENT_NAME";
public static final String STUDENT_ID = "STUDENT_ID";
public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";
public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {
AttendanceDTO dto = new AttendanceDTO();
dto.setStudentId(rs.getString(STUDENT_ID));
dto.setStudentName(rs.getString(STUDENT_NAME));
dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));
return dto;
}
}
Mi procesador
package edu.kdc.visioncards;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.ItemProcessor;
import edu.kdc.visioncards.dto.AttendanceDTO;
public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {
private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();
public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {
if(map.containsKey(new Integer(dto.getStudentId()))){
AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());
}else{
map.put(new Integer(dto.getStudentId()), dto);
}
return map;
}
}
Mis preocupaciones de código arriba
En el Procesador, creo un HashMap y, al procesar las filas, compruebo si ya tengo ese Estudiante en el Mapa, si no está allí, lo agrego. Si ya está allí, tomo los valores que me interesan y los agrego con la fila que estoy procesando actualmente.
Después de eso, Spring Batch Framework escribe en un archivo de acuerdo con mi configuración
Mi pregunta es la siguiente:
- No quiero que vaya al escritor. Quiero procesar todas las filas restantes. ¿Cómo mantengo este mapa que he creado en la memoria para el siguiente conjunto de filas que deben pasar por este mismo procesador? Cada vez que se procesa una fila a través de AttendanceProcessor, el mapa se inicializa. ¿Debo poner la inicialización del mapa en un bloque estático?
En mi aplicación, creé un CollectingJdbcCursorItemReader
que extiende el JdbcCursorItemReader
estándar y realiza exactamente lo que necesitas. Internamente utiliza mi CollectingRowMapper
: una extensión del RowMapper
estándar que asigna varias filas relacionadas a un objeto.
Aquí está el código del ItemReader, el código de la interfaz CollectingRowMapper
, y una implementación abstracta del mismo, está disponible en otra respuesta mía.
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;
/**
* A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
* Like the superclass this reader is not thread-safe.
*
* @author Pino Navato
**/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {
private CollectingRowMapper<T> rowMapper;
private boolean firstRead = true;
/**
* Accepts a {@link CollectingRowMapper} only.
**/
@Override
public void setRowMapper(RowMapper<T> rowMapper) {
this.rowMapper = (CollectingRowMapper<T>)rowMapper;
super.setRowMapper(rowMapper);
}
/**
* Read next row and map it to item.
**/
@Override
protected T doRead() throws Exception {
if (rs == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
try {
if (firstRead) {
if (!rs.next()) { //Subsequent calls to next() will be executed by rowMapper
return null;
}
firstRead = false;
} else if (!rowMapper.hasNext()) {
return null;
}
T item = readCursor(rs, getCurrentItemCount());
return item;
}
catch (SQLException se) {
throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
}
}
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
T result = super.readCursor(rs, currentRow);
setCurrentItemCount(rs.getRow());
return result;
}
}
Puede usarlo como el JdbcCursorItemReader
clásico: el único requisito es que le proporcione un CollectingRowMapper
lugar del clásico RowMapper
.
Siempre sigo este patrón:
- Hago que el alcance de mi lector sea "paso", y en @PostConstruct busco los resultados y los coloco en un Mapa
- En el procesador, convierto la colección asociada en una lista de escritura, y envío la lista de escritura
- En ItemWriter, conservo los elementos grabables según el caso
Use Step Execution Listener y almacene los registros como mapas de StepExecutionContext, luego puede agruparlos en el escritor o en el oyente del escritor y escribirlos a la vez.
básicamente, se habla de procesamiento por lotes con ID cambiantes (1), donde el lote tiene que realizar un seguimiento del cambio
Para primavera / primavera hablamos de:
- ItemWriter que comprueba la lista de elementos para un cambio de ID
- antes del cambio, los elementos se almacenan en un almacén de datos temporal (2) (Lista, Mapa, lo que sea), y no se escriben
- cuando la identificación cambia, el código de negocio de agregación / acoplado se ejecuta en los elementos en el almacén de datos y se debe escribir un elemento, ahora el almacén de datos se puede usar para los siguientes elementos con la siguiente identificación
- este concepto necesita un lector que indique el paso "Estoy agotado" para vaciar correctamente el almacén de datos temporal en el final de los elementos (archivo / base de datos)
Aquí un ejemplo de código simple y simple
@Override
public void write(List<? extends SimpleItem> items) throws Exception {
// setup with first sharedId at startup
if (currentId == null){
currentId = items.get(0).getSharedId();
}
// check for change of sharedId in input
// keep items in temporary dataStore until id change of input
// call delegate if there is an id change or if the reader is exhausted
for (SimpleItem item : items) {
// already known sharedId, add to tempData
if (item.getSharedId() == currentId) {
tempData.add(item);
} else {
// or new sharedId, write tempData, empty it, keep new id
// the delegate does the flattening/aggregating
delegate.write(tempData);
tempData.clear();
currentId = item.getSharedId();
tempData.add(item);
}
}
// check if reader is exhausted, flush tempData
if ((Boolean) stepExecution.getExecutionContext().get("readerExhausted")
&& tempData.size() > 0) {
delegate.write(tempData);
// optional delegate.clear();
}
}
(1) asumiendo que los artículos están ordenados por una ID (también puede ser compuesto)
(2) un frijol de hashmap para seguridad de hilos
porque cambiaste tu pregunta agrego una nueva respuesta
Si se ordenan a los estudiantes, entonces no hay necesidad de una lista / mapa, puede usar exactamente un objeto StudentObject en el procesador para mantener el "actual" y agregarlo hasta que haya uno nuevo (leer: cambio de identificación)
si los estudiantes no reciben órdenes, nunca sabrá cuándo un estudiante específico está "terminado" y tendrá que mantener a todos los estudiantes en un mapa que no se pueda escribir hasta el final de la secuencia de lectura completa
tener cuidado:
- El procesador necesita saber cuando el lector está agotado.
- es difícil hacer que funcione con cualquier concepto de tasa de compromiso e "id" si agrega elementos que de alguna manera son idénticos al procesador, simplemente no puede saber si el elemento procesado actualmente es el último
- Básicamente, el caso de uso se resuelve completamente a nivel de lector o a nivel de escritor (ver otra respuesta)
private SimpleItem currentItem;
private StepExecution stepExecution;
@Override
public SimpleItem process(SimpleItem newItem) throws Exception {
SimpleItem returnItem = null;
if (currentItem == null) {
currentItem = new SimpleItem(newItem.getId(), newItem.getValue());
} else if (currentItem.getId() == newItem.getId()) {
// aggregate somehow
String value = currentItem.getValue() + newItem.getValue();
currentItem.setValue(value);
} else {
// "clone"/copy currentItem
returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
// replace currentItem
currentItem = newItem;
}
// reader exhausted?
if(stepExecution.getExecutionContext().containsKey("readerExhausted")
&& (Boolean)stepExecution.getExecutionContext().get("readerExhausted")
&& currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) {
returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
}
return returnItem;
}