form docs spring spring-scheduled

docs - taglib spring jsp



Spring Scheduled Task ejecutándose en un entorno en clúster (7)

Estoy escribiendo una aplicación que tiene un trabajo cron que se ejecuta cada 60 segundos. La aplicación está configurada para escalar cuando sea necesario en varias instancias. Solo quiero ejecutar la tarea en 1 instancia cada 60 segundos (en cualquier nodo). Fuera de la caja no puedo encontrar una solución a esto y me sorprende que no se haya preguntado varias veces antes. Estoy usando Spring 4.1.6.

<task:scheduled-tasks> <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks>



Esta es otra forma simple y robusta de ejecutar de manera segura un trabajo en un clúster. Puede basarse en la base de datos y ejecutar la tarea solo si el nodo es el "líder" en el clúster.

Además, cuando falla un nodo o se apaga el clúster, otro nodo se convierte en el líder.

Todo lo que tiene es crear un mecanismo de "elección de líder" y cada vez para verificar si usted es el líder:

@Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }

Sigue esos pasos:

1. Defina el objeto y la tabla que contiene una entrada por nodo en el clúster:

@Entity(name = "SYS_NODE") public class SystemNode { /** The id. */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** The name. */ @Column(name = "TIMESTAMP") private String timestamp; /** The ip. */ @Column(name = "IP") private String ip; /** The last ping. */ @Column(name = "LAST_PING") private Date lastPing; /** The last ping. */ @Column(name = "CREATED_AT") private Date createdAt = new Date(); /** The last ping. */ @Column(name = "IS_LEADER") private Boolean isLeader = Boolean.FALSE; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getTimestamp() { return timestamp; } public void setTimestamp(final String timestamp) { this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(final String ip) { this.ip = ip; } public Date getLastPing() { return lastPing; } public void setLastPing(final Date lastPing) { this.lastPing = lastPing; } public Date getCreatedAt() { return createdAt; } public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt; } public Boolean getIsLeader() { return isLeader; } public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader; } @Override public String toString() { return "SystemNode{" + "id=" + id + ", timestamp=''" + timestamp + ''/''' + ", ip=''" + ip + ''/''' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + ''}''; }

}

2.Cree el servicio que a) inserte el nodo en la base de datos, b) verifique el líder

@Service @Transactional public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener { /** The logger. */ private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class); /** The constant NO_ALIVE_NODES. */ private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}"; /** The ip. */ private String ip; /** The system service. */ private SystemService systemService; /** The system node repository. */ private SystemNodeRepository systemNodeRepository; @Autowired public void setSystemService(final SystemService systemService) { this.systemService = systemService; } @Autowired public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository; } @Override public void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); } } @Override public void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } } /** * Returns the leaded * @param list * the list * @return the leader */ private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null; } @Override public boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader(); } @Override public void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); } } /** * Creates the node */ private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node); } /** * Updates the node */ private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node); } /** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */ private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList; } /** * Finds the min name node. * * @param list * the list * @return the min node */ private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min; } /** * Sets the leader flag. * * @param list * the list * @param value * the value */ private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); } }

}

3.diar la base de datos para enviar que estás vivo

@Override @Scheduled(cron = "0 0/5 * * * ?") public void executeSystemNodePing() { systemNodeService.pingNode(); } @Override @Scheduled(cron = "0 0/10 * * * ?") public void executeLeaderResolution() { systemNodeService.checkLeaderShip(); }

4.estás listo! Solo verifique si usted es el líder antes de ejecutar la tarea:

@Override @Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }


Estoy usando una tabla de base de datos para hacer el bloqueo. Solo una tarea a la vez puede hacer una inserción en la tabla. El otro obtendrá una DuplicateKeyException. La lógica de inserción y eliminación se maneja mediante un aspecto alrededor de la anotación @Scheduled. Estoy usando Spring Boot 2.0

@Component @Aspect public class SchedulerLock { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class); @Autowired private JdbcTemplate jdbcTemplate; @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))") public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable { String jobSignature = joinPoint.getSignature().toString(); try { jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()}); Object proceed = joinPoint.proceed(); jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature}); return proceed; }catch (DuplicateKeyException e) { LOGGER.warn("Job is currently locked: "+jobSignature); return null; } } }


@Component public class EveryTenSecondJob { @Scheduled(cron = "0/10 * * * * *") public void taskExecution() { System.out.println("Hello World"); } }


CREATE TABLE scheduler_lock( signature varchar(255) NOT NULL, date datetime DEFAULT NULL, PRIMARY KEY(signature) );


Hay un proyecto ShedLock que sirve exactamente para este propósito. Simplemente anota las tareas que deben bloquearse cuando se ejecutan

@Scheduled( ... ) @SchedulerLock(name = "scheduledTaskName") public void scheduledTask() { // do something }

Configure Spring y un LockProvider (SQL y Mongo soportados actualmente)

@Bean public TaskScheduler taskScheduler(LockProvider lockProvider) { int poolSize = 10; return SpringLockableTaskSchedulerFactory .newLockableTaskScheduler(poolSize, lockProvider); }


Los trabajos por lotes y programados generalmente se ejecutan en sus propios servidores independientes, lejos de las aplicaciones orientadas al cliente, por lo que no es un requisito común incluir un trabajo en una aplicación que se espera que se ejecute en un clúster. Además, los trabajos en entornos agrupados generalmente no necesitan preocuparse por otras instancias del mismo trabajo que se ejecutan en paralelo, por lo que otra razón por la cual el aislamiento de instancias de trabajo no es un gran requisito.

Una solución simple sería configurar sus trabajos dentro de un Perfil Spring. Por ejemplo, si su configuración actual es:

<beans> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans>

cámbielo a:

<beans> <beans profile="scheduled"> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans> </beans>

Luego, inicie su aplicación en una sola máquina con el perfil scheduled activado ( -Dspring.profiles.active=scheduled ).

Si el servidor primario no está disponible por alguna razón, simplemente inicie otro servidor con el perfil habilitado y las cosas continuarán funcionando bien.

Las cosas cambian si también desea una conmutación por error automática para los trabajos. Luego, deberá mantener el trabajo ejecutándose en todos los servidores y verificar la sincronización a través de un recurso común, como una tabla de base de datos, una caché en clúster, una variable JMX, etc.


Puede usar un planificador db-scheduler como db-scheduler para lograr esto. Tiene ejecuciones persistentes y utiliza un mecanismo de bloqueo optimista simple para garantizar la ejecución por un solo nodo.

Código de ejemplo de cómo se puede lograr el caso de uso:

RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60))) .execute((taskInstance, executionContext) -> { System.out.println("Executing " + taskInstance.getTaskAndInstance()); }); final Scheduler scheduler = Scheduler .create(dataSource) .startTasks(recurring1) .build(); scheduler.start();


dlock está diseñado para ejecutar tareas solo una vez mediante el uso de índices y restricciones de bases de datos. Simplemente puede hacer algo como a continuación.

@Scheduled(cron = "30 30 3 * * *") @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES) public void execute() { }

Vea el article sobre su uso.