java - thread - ¿Cómo asegurarme de que no estoy compartiendo el mismo socket entre dos hilos al mismo tiempo?
hilos y sockets en java (4)
Yo diría que este código tiene varios problemas:
- getLiveSocket () puede devolver el mismo socket para múltiples hilos.
- java.util.Random no funciona bien con múltiples hilos.
- La instantánea de los sockets activos en getNextSocket () puede ser obsoleta debido a la invocación simultánea del método updateLiveSockets () que modifica esa instantánea.
- Si connectToZMQSockets () no verifica el liveness de los sockets, no hay sockets en vivo durante 60 segundos debido al retraso en el método scheduleAtFixedRate.
Además, no hay indicador para verificar si el socket está en uso y no está claro si el socket regresa al pool luego de que un hilo termina su trabajo con él.
Considere simplificar el código de la siguiente manera:
- Sus clases tienen referencias cíclicas entre sí, para mí es una señal de que debe haber una sola clase.
- No creo que tenga sentido comprobar periódicamente si todos los sockets están activos porque no garantiza que el estado del socket no cambie después de la verificación y antes del envío real, la mejor estrategia es verificar un socket en particular si se lo enviara. ha fallado.
- Es mejor limitar la administración de sockets en la estructura de datos segura de subprocesos en lugar de utilizar bloqueos explícitos, por ejemplo, en una cola de bloqueo. Tal estrategia permite utilizar todos los enchufes disponibles.
Aquí hay una muestra de código:
public class SendToSocket {
private final BlockingQueue<Socket> queue;
public SendToSocket() {
this.queue = new LinkedBlockingQueue<>();
// collect all available sockets
List<Socket> sockets = new ArrayList<>();
for (Socket socket : sockets) {
queue.add(socket);
}
}
public boolean send(final byte[] reco) throws InterruptedException {
// can be replaced with poll() method
Socket socket = queue.take();
// handle exceptions if needed
boolean status = sendInternal(socket, reco);
if (!status) {
// check whether socket is live
boolean live = ping(socket);
if (!live) {
// log error
return status;
}
}
// return socket back to pool
queue.add(socket);
return status;
}
private boolean sendInternal(Socket socket, byte[] reco) {
return true;
}
private boolean ping(Socket socket) {
return true;
}
}
Esta pregunta ya tiene una respuesta aquí:
Tengo un código en el que estoy tratando con sockets y necesito asegurarme de que no comparto el mismo socket entre dos hilos . En mi código de abajo, tengo un hilo de fondo que se ejecuta cada 60 segundos y llama updateLiveSockets()
método updateLiveSockets()
. En el método updateLiveSockets()
, repito todos los sockets que tengo y luego comienzo a hacer ping uno por uno llamando al método SendToQueue
clase SendToQueue
y SendToQueue
en la respuesta, los marque como en vivo o como muertos.
Ahora, todos los hilos del lector llamarán getNextSocket()
método getNextSocket()
simultáneamente para obtener el siguiente socket disponible, por lo que debe ser seguro para subprocesos y necesito asegurarme de que todos los subprocesos del lector vean el mismo estado de SocketHolder
y Socket
.
Debajo está mi clase de SocketManager
:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
Y aquí está mi clase SendToQueue
:
// this method will be called by multiple threads concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
Planteamiento del problema
Ahora, como pueden ver, estoy compartiendo el mismo socket entre dos hilos. Parece que getNextSocket()
podría devolver un 0MQ socket
al thread A
Al mismo tiempo, el timer thread
del timer thread
puede acceder al mismo 0MQ socket
para hacer ping. En este caso, el thread A
y el timer thread
del timer thread
están mutando el mismo 0MQ socket
, lo que puede ocasionar problemas. Así que estoy tratando de encontrar una manera para evitar que diferentes hilos envíen datos al mismo socket al mismo tiempo y arruinar mis datos.
Así que decidí sincronizar el socket para que no haya dos subprocesos que puedan acceder al mismo socket al mismo tiempo. Debajo está el cambio que hice en el método updateLiveSockets
. Me sincronicé en el socket en el siguiente método:
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// using the socket as its own lock
synchronized (socket) {
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
Y debajo está el cambio que hice en el método doSendAsync
. En esto también sincronicé en socket antes de enviarlo.
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A by synchronizing on it
synchronized (socket) {
return msg.send(socket);
}
} finally {
msg.destroy();
}
}
¿Cuál es la mejor manera de asegurarme de que no estoy compartiendo los mismos sockets entre dos hilos? En general, tengo alrededor de 60 sockets y 20 threads accediendo a esos sockets.
Si muchos subprocesos utilizan el mismo socket, los recursos no se utilizan bien. Además, si msg.send(socket);
está bloqueado (técnicamente no debería) todos los hilos que esperan este socket están bloqueados. Así que supongo que podría haber una forma mejor de asegurar que cada subproceso usa un socket único y directo diferente al mismo tiempo en lugar de la sincronización en un socket particular. ¿También hay alguna caja de esquina o caja de borde que me haya perdido, lo que puede provocar algún error?
Como expliqué en su otra pregunta, la mejor solución para su problema es usar ConcurrentQueue
Por ejemplo, así es como eliminaría las tomas que no están vivas y mantendría las que están vivas.
private final Map<Datacenters, ConcurrentLinkedQueue<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
// llena la cola y el mapa
// runs every 60 seconds to ping 70 sockets the socket to make sure whether they are alive or not (it does not matter if you ping more sockets than there are in the list because you are rotating the que)
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
Queue<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
for (int i = 0; i<70; i++) {
SocketHolder s = liveSockets.poll();
Socket socket = s.getSocket();
String endpoint = s.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, s.getContext(), endpoint, isLive);
liveSockets.add(zmq);
}
}
}
No es necesario bloquear la actualización del estado activo ya que la actualización de un booleano es una operación atómica. Simplemente actualice de forma periódica activa en un hilo de fondo después de verificarlo fuera del grupo. Es posible que desee agregar una marca de tiempo en el nivel de instancia de Socket para cuando se envíe un mensaje, por lo que puede esperar 30 segundos adicionales para hacer ping. Un socket sin usar tendría que hacer ping más rápido que un socket usado.
Supongo que la idea es que el bloque sincronizado lea 2 booleanos y establezca un valor booleano, por lo que debería regresar inmediatamente. No desea enviar mientras está sincronizado porque bloqueará otros hilos durante un tiempo realmente largo.
Cualquier tipo de sincronización que haya visto realmente solo requiere la sincronización de dos o tres operaciones atómicas .
boolean got = false;
synchronized(obj) {
if(alive && available) {
available = false;
got = true;
}
}
if(got) {
... // all thread safe because available must be false
available = true; // atomic, no need to synchronize when done
}
Por lo general, puede encontrar una manera de evitar la sincronización al tener cuidado con el orden en que realiza las actualizaciones atómicas.
Por ejemplo, probablemente podría hacer que esto funcione sin ninguna sincronización utilizando un mapa en lugar de una lista para almacenar los sockets. Tendría que pensarlo, pero probablemente puedas hacer que este hilo sea seguro sin ninguna sincronización y luego sería mucho más rápido.
Nunca consideraría usar una colección segura para subprocesos como Hashtable en lugar de HashMap.
class Socket {
Socket() {
alive = true;
available = true;
last = System.currentTimeMillis();
}
private synchronized boolean tryToGet() {
// should return pretty fast - only 3 atomic operations
if(alive && available) {
available = false;
return true;
}
return false;
}
public boolean send() {
if(tryToGet()) {
// do it
last = System.currentTimeMillis();
available = true; // no need to lock atomic operation
return true;
}
return false;
}
private boolean ping() { // ... }
public void pingIfNecessary() {
// long update may not be atomic, cast to int if not
if(alive && (System.currentTimeMillis() - last) > 30000) {
if(tryToGet()) {
// other pingIfNecessary() calls have to wait
if(ping()) {
last = System.currentTimeMillis();
} else {
alive = false;
}
available = true;
}
}
}
private boolean alive;
private boolean available;
private long last;
};
void sendUsingPool(String s) {
boolean sent = false;
while(!sent) {
for(Socket socket : sockets) {
if(socket.send(s)) {
sent = true;
break;
}
}
if(!sent) {
// increase this number if you want to be nicer
try { Thread.sleep(1); } catch (Exception e) { }
}
}
}
public void run() {
while(true) {
for(Socket socket : sockets) {
socket.pingIfNecessary();
}
try { Thread.sleep(100); } catch (Exception e) { }
}
}
En primer lugar, necesita una forma para que los clientes le notifiquen que han terminado de usar un Socket
. Podría agregar un método que les permita señalar esto. Eso es legítimo y funcionará, pero tendrá que confiar en que sus clientes se porten bien. O mejor dicho, que el programador que usa su socket no se olvide de devolverlo. Hay un patrón que ayuda a resolver esto: el patrón de ejecución alrededor . En lugar de dar un Socket
, se crea un método que acepta un Consumer<Socket>
, y luego ejecuta el consumidor, y se devuelve el Socket
.
public void useSocket(Consumer<Socket> socketUser) {
Socket socket = getSocket();
try {
socketUser.accept(socket);
} finally {
returnSocket(socket);
}
}
Ahora veamos cómo implementaremos getSocket()
y returnSocket()
. Claramente, implica obtenerlos de algún tipo de colección y devolverlos a esa colección. Una Queue
es una buena opción aquí (como otros también han notado). Permite obtenerlo desde un lado, y regresar por el otro, además de que hay muchas implementaciones seguras de subprocesos eficientes, y los usuarios y sumadores generalmente no están en disputa entre sí. Como sabes la cantidad de sockets de antemano, optaría por un ArrayBlockingQueue
.
Una preocupación adicional aquí es que su implementación devuelve un Optional
. No estoy seguro de lo que harán sus clientes si no hay un Socket
disponible, pero si está esperando y reintentando, le sugiero que simplemente haga el bloqueo de getSocket()
en la cola. Tal como están las cosas, respetaré este aspecto de su enfoque y tendré en cuenta que es posible que no haya un Socket
disponible. Para el enfoque de ejecución, esto traducirá esto en el método useSocket()
devolviendo false
si no hay ningún Socket
disponible.
private final BlockingQueue<Socket> queue;
public SocketPool(Set<Socket> sockets) {
queue = new ArrayBlockingQueue<>(sockets.size());
queue.addAll(sockets);
}
public boolean useSocket(Consumer<Socket> socketUser) throws InterruptedException {
Optional<Socket> maybeSocket = getSocket();
try {
maybeSocket.ifPresent(socketUser);
return maybeSocket.isPresent();
} finally {
maybeSocket.ifPresent(this::returnSocket);
}
}
private void returnSocket(Socket socket) {
queue.add(socket);
}
private Optional<Socket> getSocket() throws InterruptedException {
return Optional.ofNullable(queue.poll());
}
Ahí, eso es, ese es tu SocketPool
.
Ah, pero luego el tacaño: la comprobación de la vitalidad. Es mezquino porque tu control de vida en realidad compite con tus clientes habituales.
Para abordar esto, sugiero lo siguiente: deje que sus clientes informen si el Socket
que recibieron fue en vivo o no. Dado que la verificación de la vitalidad se reduce a usar el Socket, esto debería ser sencillo para sus clientes.
Entonces, en lugar de un Consumer<Socket>
, tomaremos una Function<Socket, Boolean>
. Y si la función devuelve false
, consideraremos que el Socket
ya no está activo. En ese caso, en lugar de volver a agregarlo a la cola normal, lo agregamos a una colección de Sockets muertos, y tendremos una tarea programada, que volverá a verificar los sockets muertos de forma intermitente. Como esto sucede en una colección separada, la verificación programada ya no compite con los clientes habituales.
Ahora puede crear un SocketManager
con un Map
que SocketPool
centros de datos a instancias de SocketPool
. Este mapa no necesita cambiar, por lo que puede hacer que sea definitivo e inicializarlo en el SocketManager
del SocketManager
.
Este es mi código preliminar para SocketPool
(no probado):
class SocketPool implements AutoCloseable {
private final BlockingQueue<Socket> queue;
private final Queue<Socket> deadSockets = new ConcurrentLinkedQueue<>();
private final ScheduledFuture<?> scheduledFuture;
public SocketPool(Set<Socket> sockets, ScheduledExecutorService scheduledExecutorService) {
queue = new ArrayBlockingQueue<>(sockets.size());
queue.addAll(sockets);
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS);
}
public boolean useSocket(Function<Socket, Boolean> socketUser) throws InterruptedException {
Optional<Socket> maybeSocket = getSocket();
boolean wasLive = true;
try {
wasLive = maybeSocket.map(socketUser).orElse(false);
return wasLive && maybeSocket.isPresent();
} finally {
boolean isLive = wasLive;
maybeSocket.ifPresent(socket -> {
if (isLive) {
returnSocket(socket);
} else {
reportDead(socket);
}
});
}
}
private void reportDead(Socket socket) {
deadSockets.add(socket);
}
private void returnSocket(Socket socket) {
queue.add(socket);
}
private Optional<Socket> getSocket() throws InterruptedException {
return Optional.ofNullable(queue.poll());
}
private void recheckDeadSockets() {
for (int i = 0; i < deadSockets.size(); i++) {
Socket socket = deadSockets.poll();
if (checkAlive(socket)) {
queue.add(socket);
} else {
deadSockets.add(socket);
}
}
}
private boolean checkAlive(Socket socket) {
// do actual live check with SendSocket class, or implement directly in this one
return true;
}
@Override
public void close() throws Exception {
scheduledFuture.cancel(true);
}
}