job example chunk batch baeldung spring-batch

spring batch - example - ¿Cómo procesar filas relacionadas lógicamente después de ItemReader en SpringBatch?



spring batch example (5)

Guión

Para hacerlo simple, supongamos que tengo un ItemReader que me devuelve 25 filas.

  1. Las primeras 10 filas pertenecen al estudiante A

  2. Los siguientes 5 pertenecen al estudiante B

  3. y los 10 restantes pertenecen al estudiante C

Quiero agregarlos juntos, lógicamente, por studentId y aplanarlos para terminar con una fila por estudiante.

Problema

Si entiendo correctamente, establecer el intervalo de compromiso en 5 hará lo siguiente:

  1. Envíe 5 filas al Procesador (que las agregará o hará cualquier lógica comercial que le diga).
  2. Después de procesado se escribirán 5 filas.
  3. Luego lo hará de nuevo para las siguientes 5 filas y así sucesivamente.

Si eso es cierto, entonces para los próximos cinco tendré que revisar los ya escritos, sacarlos, agregarlos a los que estoy procesando actualmente y escribirlos de nuevo.

Yo personalmente no me gusta eso.

  1. ¿Cuál es la mejor práctica para manejar una situación como esta en Spring Batch?

Alternativa

A veces siento que es mucho más fácil escribir un programa principal de Spring JDBC y luego tengo el control total de lo que quiero hacer. Sin embargo, quería aprovechar la supervisión del estado del repositorio del trabajo, la capacidad de reinicio, omisión, trabajo y escuchas de pasos ...

Mi código de lote de primavera

Mi módulo-contexto.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <description>Example job to get you started. It provides a skeleton for a typical batch application.</description> <batch:job id="job1"> <batch:step id="step1" > <batch:tasklet transaction-manager="transactionManager" start-limit="100" > <batch:chunk reader="attendanceItemReader" processor="attendanceProcessor" writer="attendanceItemWriter" commit-interval="10" /> </batch:tasklet> </batch:step> </batch:job> <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> <property name="dataSource"> <ref bean="sourceDataSource"/> </property> <property name="sql" value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = ''Active'' and fas.LOCAL_GRADING_PERIOD = ''G1'' and s.student_current_grade_level = ''Gr 9'' order by s.student_id"/> <property name="preparedStatementSetter" ref="attendanceStatementSetter"/> <property name="rowMapper" ref="attendanceRowMapper"/> </bean> <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/> <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/> <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" /> <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> <property name="resource" value="file:target/outputs/passthrough.txt"/> <property name="lineAggregator"> <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> </property> </bean> </beans>

Mis clases de apoyo para el lector.

Un estado de cuenta preparado

package edu.kdc.visioncards.preparedstatements; import java.sql.PreparedStatement; import java.sql.SQLException; import org.springframework.jdbc.core.PreparedStatementSetter; public class AttendanceStatementSetter implements PreparedStatementSetter { public void setValues(PreparedStatement ps) throws SQLException { ps.setInt(1, 7); } }

y un RowMapper

package edu.kdc.visioncards.rowmapper; import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.jdbc.core.RowMapper; import edu.kdc.visioncards.dto.AttendanceDTO; public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> { public static final String STUDENT_NAME = "STUDENT_NAME"; public static final String STUDENT_ID = "STUDENT_ID"; public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS"; public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE"; public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException { AttendanceDTO dto = new AttendanceDTO(); dto.setStudentId(rs.getString(STUDENT_ID)); dto.setStudentName(rs.getString(STUDENT_NAME)); dto.setAttDays(rs.getInt(ATTENDANCE_DAYS)); dto.setAttValue(rs.getInt(ATTENDANCE_VALUE)); return dto; } }

Mi procesador

package edu.kdc.visioncards; import java.util.HashMap; import java.util.Map; import org.springframework.batch.item.ItemProcessor; import edu.kdc.visioncards.dto.AttendanceDTO; public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> { private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>(); public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception { if(map.containsKey(new Integer(dto.getStudentId()))){ AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId())); attDto.setAttDays(attDto.getAttDays() + dto.getAttDays()); attDto.setAttValue(attDto.getAttValue() + dto.getAttValue()); }else{ map.put(new Integer(dto.getStudentId()), dto); } return map; } }

Mis preocupaciones de código arriba

En el Procesador, creo un HashMap y, al procesar las filas, compruebo si ya tengo ese Estudiante en el Mapa, si no está allí, lo agrego. Si ya está allí, tomo los valores que me interesan y los agrego con la fila que estoy procesando actualmente.

Después de eso, Spring Batch Framework escribe en un archivo de acuerdo con mi configuración

Mi pregunta es la siguiente:

  1. No quiero que vaya al escritor. Quiero procesar todas las filas restantes. ¿Cómo mantengo este mapa que he creado en la memoria para el siguiente conjunto de filas que deben pasar por este mismo procesador? Cada vez que se procesa una fila a través de AttendanceProcessor, el mapa se inicializa. ¿Debo poner la inicialización del mapa en un bloque estático?

En mi aplicación, creé un CollectingJdbcCursorItemReader que extiende el JdbcCursorItemReader estándar y realiza exactamente lo que necesitas. Internamente utiliza mi CollectingRowMapper : una extensión del RowMapper estándar que asigna varias filas relacionadas a un objeto.

Aquí está el código del ItemReader, el código de la interfaz CollectingRowMapper , y una implementación abstracta del mismo, está disponible en otra respuesta mía.

import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.batch.item.ReaderNotOpenException; import org.springframework.batch.item.database.JdbcCursorItemReader; import org.springframework.jdbc.core.RowMapper; /** * A JdbcCursorItemReader that uses a {@link CollectingRowMapper}. * Like the superclass this reader is not thread-safe. * * @author Pino Navato **/ public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> { private CollectingRowMapper<T> rowMapper; private boolean firstRead = true; /** * Accepts a {@link CollectingRowMapper} only. **/ @Override public void setRowMapper(RowMapper<T> rowMapper) { this.rowMapper = (CollectingRowMapper<T>)rowMapper; super.setRowMapper(rowMapper); } /** * Read next row and map it to item. **/ @Override protected T doRead() throws Exception { if (rs == null) { throw new ReaderNotOpenException("Reader must be open before it can be read."); } try { if (firstRead) { if (!rs.next()) { //Subsequent calls to next() will be executed by rowMapper return null; } firstRead = false; } else if (!rowMapper.hasNext()) { return null; } T item = readCursor(rs, getCurrentItemCount()); return item; } catch (SQLException se) { throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se); } } @Override protected T readCursor(ResultSet rs, int currentRow) throws SQLException { T result = super.readCursor(rs, currentRow); setCurrentItemCount(rs.getRow()); return result; } }

Puede usarlo como el JdbcCursorItemReader clásico: el único requisito es que le proporcione un CollectingRowMapper lugar del clásico RowMapper .


Siempre sigo este patrón:

  1. Hago que el alcance de mi lector sea "paso", y en @PostConstruct busco los resultados y los coloco en un Mapa
  2. En el procesador, convierto la colección asociada en una lista de escritura, y envío la lista de escritura
  3. En ItemWriter, conservo los elementos grabables según el caso

Use Step Execution Listener y almacene los registros como mapas de StepExecutionContext, luego puede agruparlos en el escritor o en el oyente del escritor y escribirlos a la vez.


básicamente, se habla de procesamiento por lotes con ID cambiantes (1), donde el lote tiene que realizar un seguimiento del cambio

Para primavera / primavera hablamos de:

  • ItemWriter que comprueba la lista de elementos para un cambio de ID
  • antes del cambio, los elementos se almacenan en un almacén de datos temporal (2) (Lista, Mapa, lo que sea), y no se escriben
  • cuando la identificación cambia, el código de negocio de agregación / acoplado se ejecuta en los elementos en el almacén de datos y se debe escribir un elemento, ahora el almacén de datos se puede usar para los siguientes elementos con la siguiente identificación
  • este concepto necesita un lector que indique el paso "Estoy agotado" para vaciar correctamente el almacén de datos temporal en el final de los elementos (archivo / base de datos)

Aquí un ejemplo de código simple y simple

@Override public void write(List<? extends SimpleItem> items) throws Exception { // setup with first sharedId at startup if (currentId == null){ currentId = items.get(0).getSharedId(); } // check for change of sharedId in input // keep items in temporary dataStore until id change of input // call delegate if there is an id change or if the reader is exhausted for (SimpleItem item : items) { // already known sharedId, add to tempData if (item.getSharedId() == currentId) { tempData.add(item); } else { // or new sharedId, write tempData, empty it, keep new id // the delegate does the flattening/aggregating delegate.write(tempData); tempData.clear(); currentId = item.getSharedId(); tempData.add(item); } } // check if reader is exhausted, flush tempData if ((Boolean) stepExecution.getExecutionContext().get("readerExhausted") && tempData.size() > 0) { delegate.write(tempData); // optional delegate.clear(); } }

(1) asumiendo que los artículos están ordenados por una ID (también puede ser compuesto)

(2) un frijol de hashmap para seguridad de hilos


porque cambiaste tu pregunta agrego una nueva respuesta

Si se ordenan a los estudiantes, entonces no hay necesidad de una lista / mapa, puede usar exactamente un objeto StudentObject en el procesador para mantener el "actual" y agregarlo hasta que haya uno nuevo (leer: cambio de identificación)

si los estudiantes no reciben órdenes, nunca sabrá cuándo un estudiante específico está "terminado" y tendrá que mantener a todos los estudiantes en un mapa que no se pueda escribir hasta el final de la secuencia de lectura completa

tener cuidado:

  • El procesador necesita saber cuando el lector está agotado.
  • es difícil hacer que funcione con cualquier concepto de tasa de compromiso e "id" si agrega elementos que de alguna manera son idénticos al procesador, simplemente no puede saber si el elemento procesado actualmente es el último
  • Básicamente, el caso de uso se resuelve completamente a nivel de lector o a nivel de escritor (ver otra respuesta)

private SimpleItem currentItem; private StepExecution stepExecution; @Override public SimpleItem process(SimpleItem newItem) throws Exception { SimpleItem returnItem = null; if (currentItem == null) { currentItem = new SimpleItem(newItem.getId(), newItem.getValue()); } else if (currentItem.getId() == newItem.getId()) { // aggregate somehow String value = currentItem.getValue() + newItem.getValue(); currentItem.setValue(value); } else { // "clone"/copy currentItem returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue()); // replace currentItem currentItem = newItem; } // reader exhausted? if(stepExecution.getExecutionContext().containsKey("readerExhausted") && (Boolean)stepExecution.getExecutionContext().get("readerExhausted") && currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) { returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue()); } return returnItem; }