Set up a Stomp client in Android with the Spring framework on the server side

I am developing an Android application that exchanges data with a Jetty server configured with Spring. To make the Android application more dynamic, I am trying to use the WebSocket protocol with Stomp messages.

To do this, I configured a web socket message broker in Spring:

@Configuration
//@EnableScheduling
@ComponentScan(
        basePackages = "project.web",
        excludeFilters = @ComponentScan.Filter(type = FilterType.ANNOTATION, value = Configuration.class)
)
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/message");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/client");
    }
}

and a Spring controller that uses SimpMessageSendingOperations to send messages from the server to the client:

@Controller
public class MessageAddController {

    private final Log log = LogFactory.getLog(MessageAddController.class);

    private SimpMessageSendingOperations messagingTemplate;
    private UserManager userManager;
    private MessageManager messageManager;

    @Autowired
    public MessageAddController(SimpMessageSendingOperations messagingTemplate,
                                UserManager userManager,
                                MessageManager messageManager) {
        this.messagingTemplate = messagingTemplate;
        this.userManager = userManager;
        this.messageManager = messageManager;
    }

    @RequestMapping("/Message/Add")
    @ResponseBody
    public SimpleMessage addFriendship(@RequestParam String content,
                                       @RequestParam Long otherUser_id) {
        if (log.isInfoEnabled())
            log.info("Execute MessageAdd action");

        SimpleMessage simpleMessage;
        try {
            User curentUser = userManager.getCurrentUser();
            User otherUser = userManager.findUser(otherUser_id);

            Message message = new Message();
            message.setContent(content);
            message.setUserSender(curentUser);
            message.setUserReceiver(otherUser);
            messageManager.createMessage(message);

            Message newMessage = messageManager.findLastMessageCreated();
            // Send the message through the websocket
            messagingTemplate.convertAndSend("/message/add", newMessage);
            simpleMessage = new SimpleMessage(null, newMessage);
        } catch (Exception e) {
            if (log.isErrorEnabled())
                log.error("A problem of type : " + e.getClass()
                        + " has occurred, with message : " + e.getMessage());
            simpleMessage = new SimpleMessage(
                    new SimpleException(e.getClass(), e.getMessage()), null);
        }
        return simpleMessage;
    }
}

When I test this configuration in a web browser with stomp.js, I have no problem: messages are exchanged correctly between the browser and the Jetty server. This is the JavaScript code used for the browser test:

var stompClient = null;

function setConnected(connected) {
    document.getElementById('connect').disabled = connected;
    document.getElementById('disconnect').disabled = !connected;
    document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
    document.getElementById('response').innerHTML = '';
}

function connect() {
    stompClient = Stomp.client("ws://YOUR_IP/client");
    stompClient.connect({}, function(frame) {
        setConnected(true);
        stompClient.subscribe('/message/add', function(message) {
            showMessage(JSON.parse(message.body).content);
        });
    });
}

function disconnect() {
    stompClient.disconnect();
    setConnected(false);
    console.log("Disconnected");
}

function showMessage(message) {
    var response = document.getElementById('response');
    var p = document.createElement('p');
    p.style.wordWrap = 'break-word';
    p.appendChild(document.createTextNode(message));
    response.appendChild(p);
}

The problems start when I try to use stomp on Android with libraries such as gozirra, activemq-stomp or others: most of the time, the connection to the server does not work. My application stops responding and, after a few minutes, the following message appears in logcat: java.net.UnknownHostException: Unable to resolve host "ws://192.168.1.39/client": No address associated with hostname, and I do not understand why. This is the code, using the Gozzira library, that handles the stomp call in my Android activity:

private void stomp_test() {
    String ip = "ws://192.172.6.39/client";
    int port = 8080;
    String channel = "/message/add";
    Client c;

    try {
        c = new Client(ip, port, "", "");
        Log.i("Stomp", "Connection established");
        c.subscribe(channel, new Listener() {
            public void message(Map header, String message) {
                Log.i("Stomp", "Message received!!!");
            }
        });
    } catch (IOException ex) {
        Log.e("Stomp", ex.getMessage());
        ex.printStackTrace();
    } catch (LoginException ex) {
        Log.e("Stomp", ex.getMessage());
        ex.printStackTrace();
    } catch (Exception ex) {
        Log.e("Stomp", ex.getMessage());
        ex.printStackTrace();
    }
}

After some research, I found that most people who want to use stomp over websocket with a Java client use an ActiveMQ server, as on this site. But the Spring tools are very simple to use, and it would be great if I could keep my server layer as it is now. Does anyone know how to use stomp in Java (Android) on the client side with a Spring configuration on the server side?
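A note on the exception quoted in the question: the message shows the whole string "ws://192.168.1.39/client" being resolved as a host name, which suggests the client treats its first argument as a bare host for a plain TCP socket rather than as a WebSocket URL. The plain-Java sketch below splits such a URL into the parts a socket-based client would expect. WsEndpoint and its fields are hypothetical names, and defaulting the port to 80 or 443 is an assumption based on the ws and wss schemes.

```java
import java.net.URI;

/** Hypothetical helper: splits a WebSocket URL into host, port and path. */
final class WsEndpoint {
    final String host;
    final int port;
    final String path;

    private WsEndpoint(String host, int port, String path) {
        this.host = host;
        this.port = port;
        this.path = path;
    }

    static WsEndpoint parse(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("Not a WebSocket URL: " + url);
        }
        int port = uri.getPort();
        if (port == -1) {
            // Assumption: fall back to the usual default port of the scheme.
            port = "wss".equalsIgnoreCase(scheme) ? 443 : 80;
        }
        String rawPath = uri.getRawPath();
        String path = (rawPath == null || rawPath.isEmpty()) ? "/" : rawPath;
        return new WsEndpoint(uri.getHost(), port, path);
    }

    public static void main(String[] args) {
        WsEndpoint e = WsEndpoint.parse("ws://192.168.1.39:8080/client");
        System.out.println(e.host + " " + e.port + " " + e.path);
    }
}
```

Passing only the host part is probably not enough on its own, because an endpoint registered with registerStompEndpoints carries STOMP frames inside a WebSocket connection, while a plain TCP STOMP client skips the WebSocket handshake entirely.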


I managed to use stomp over web socket with Android and a Spring server.

To do so, I used a web socket library: weberknecht (follow this link to download it). To install it, I ran the Maven command mvn install, which put the jar in my local repository. Then I needed to add a stomp layer on top of the basic web socket one, but I could not find any Java stomp library that could handle stomp over web socket (I had to give up on Gozzira). So I wrote my own, with stomp.js as a model. Feel free to ask if you want to take a look at it, but I wrote it very quickly, so it cannot handle as much as stomp.js. Next, I needed to authenticate against my Spring server. To achieve that, I followed the instructions on this site. Once I had retrieved the JSESSIONID cookie, I only had to declare a header with this cookie when instantiating the weberknecht web socket in my stomp "library".
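As a rough illustration of the last step, the header map can be built like this. It is only a sketch: HandshakeHeaders is a hypothetical helper name, and it assumes the session id has already been obtained from a prior login request.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the extra headers sent with the web socket handshake. */
final class HandshakeHeaders {

    static Map<String, String> withSession(String jsessionId) {
        if (jsessionId == null || jsessionId.isEmpty()) {
            throw new IllegalArgumentException("A session id is required");
        }
        Map<String, String> headers = new HashMap<String, String>();
        // Standard Cookie header carrying the servlet session id.
        headers.put("Cookie", "JSESSIONID=" + jsessionId);
        return headers;
    }
}
```

The resulting map is what would be passed as the headersSetup argument of the Stomp constructor in this answer.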

EDIT: this is the main class of this library, the one that manages the stomp over web socket connection:

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import android.util.Log;

import de.roderick.weberknecht.WebSocket;
import de.roderick.weberknecht.WebSocketEventHandler;
import de.roderick.weberknecht.WebSocketMessage;

public class Stomp {

    private static final String TAG = Stomp.class.getSimpleName();

    public static final int CONNECTED = 1;              // Connection completely established
    public static final int NOT_AGAIN_CONNECTED = 2;    // Connection process is ongoing
    public static final int DECONNECTED_FROM_OTHER = 3; // Error, no more internet connection, etc.
    public static final int DECONNECTED_FROM_APP = 4;   // Application explicitly asked to shut down the connection

    private static final String PREFIX_ID_SUBSCIPTION = "sub-";
    private static final String ACCEPT_VERSION_NAME = "accept-version";
    private static final String ACCEPT_VERSION = "1.1,1.0";
    private static final String COMMAND_CONNECT = "CONNECT";
    private static final String COMMAND_CONNECTED = "CONNECTED";
    private static final String COMMAND_MESSAGE = "MESSAGE";
    private static final String COMMAND_RECEIPT = "RECEIPT";
    private static final String COMMAND_ERROR = "ERROR";
    private static final String COMMAND_DISCONNECT = "DISCONNECT";
    private static final String COMMAND_SEND = "SEND";
    private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
    private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
    private static final String SUBSCRIPTION_ID = "id";
    private static final String SUBSCRIPTION_DESTINATION = "destination";
    private static final String SUBSCRIPTION_SUBSCRIPTION = "subscription";

    private static final Set<String> VERSIONS = new HashSet<String>();
    static {
        VERSIONS.add("V1.0");
        VERSIONS.add("V1.1");
        VERSIONS.add("V1.2");
    }

    private WebSocket websocket;
    private int counter;
    private int connection;
    private Map<String, String> headers;
    private int maxWebSocketFrameSize;
    private Map<String, Subscription> subscriptions;
    private ListenerWSNetwork networkListener;

    /**
     * Constructor of a stomp object.
     *
     * @param url
     *      the url of the server to connect with
     * @param headersSetup
     *      extra headers sent with the web socket handshake (for example a session cookie)
     * @param stompStates
     *      listener notified of every connection state change
     */
    public Stomp(String url, Map<String, String> headersSetup, ListenerWSNetwork stompStates) {
        try {
            this.websocket = new WebSocket(new URI(url), null, headersSetup);
            this.counter = 0;
            this.headers = new HashMap<String, String>();
            this.maxWebSocketFrameSize = 16 * 1024;
            this.connection = NOT_AGAIN_CONNECTED;
            this.networkListener = stompStates;
            this.networkListener.onState(NOT_AGAIN_CONNECTED);
            this.subscriptions = new HashMap<String, Subscription>();

            this.websocket.setEventHandler(new WebSocketEventHandler() {
                @Override
                public void onOpen() {
                    if (Stomp.this.headers != null) {
                        Stomp.this.headers.put(ACCEPT_VERSION_NAME, ACCEPT_VERSION);
                        transmit(COMMAND_CONNECT, Stomp.this.headers, null);
                        Log.d(TAG, "...Web Socket Opened");
                    }
                }

                @Override
                public void onMessage(WebSocketMessage message) {
                    Log.d(TAG, "<<< " + message.getText());
                    Frame frame = Frame.fromString(message.getText());
                    boolean isMessageConnected = false;

                    if (frame.getCommand().equals(COMMAND_CONNECTED)) {
                        Stomp.this.connection = CONNECTED;
                        Stomp.this.networkListener.onState(CONNECTED);
                        Log.d(TAG, "connected to server : " + frame.getHeaders().get("server"));
                        isMessageConnected = true;

                    } else if (frame.getCommand().equals(COMMAND_MESSAGE)) {
                        String subscription = frame.getHeaders().get(SUBSCRIPTION_SUBSCRIPTION);
                        // Look the subscription up first: an unknown id must not cause a NullPointerException
                        Subscription target = Stomp.this.subscriptions.get(subscription);
                        ListenerSubscription onReceive = target != null ? target.getCallback() : null;

                        if (onReceive != null) {
                            onReceive.onMessage(frame.getHeaders(), frame.getBody());
                        } else {
                            Log.e(TAG, "Error : Subscription with id = " + subscription + " had not been subscribed");
                            // ACTION TO DETERMINE TO MANAGE SUBSCRIPTION ERROR
                        }

                    } else if (frame.getCommand().equals(COMMAND_RECEIPT)) {
                        // I DON'T KNOW WHAT A RECEIPT STOMP MESSAGE IS

                    } else if (frame.getCommand().equals(COMMAND_ERROR)) {
                        Log.e(TAG, "Error : Headers = " + frame.getHeaders() + ", Body = " + frame.getBody());
                        // ACTION TO DETERMINE TO MANAGE ERROR MESSAGE

                    } else {

                    }

                    if (isMessageConnected)
                        Stomp.this.subscribe();
                }

                @Override
                public void onClose() {
                    if (connection == DECONNECTED_FROM_APP) {
                        Log.d(TAG, "Web Socket disconnected");
                        disconnectFromApp();
                    } else {
                        Log.w(TAG, "Problem : Web Socket disconnected whereas Stomp disconnect method has never "
                                + "been called.");
                        disconnectFromServer();
                    }
                }

                @Override
                public void onPing() {
                }

                @Override
                public void onPong() {
                }

                @Override
                public void onError(IOException e) {
                    Log.e(TAG, "Error : " + e.getMessage());
                }
            });
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message to the server through the websocket
     *
     * @param command
     *      one of a frame property, see {@link Frame} for more details
     * @param headers
     *      one of a frame property, see {@link Frame} for more details
     * @param body
     *      one of a frame property, see {@link Frame} for more details
     */
    private void transmit(String command, Map<String, String> headers, String body) {
        String out = Frame.marshall(command, headers, body);
        Log.d(TAG, ">>> " + out);
        while (true) {
            if (out.length() > this.maxWebSocketFrameSize) {
                this.websocket.send(out.substring(0, this.maxWebSocketFrameSize));
                out = out.substring(this.maxWebSocketFrameSize);
            } else {
                this.websocket.send(out);
                break;
            }
        }
    }

    /**
     * Set up a web socket connection with a server
     */
    public void connect() {
        if (this.connection != CONNECTED) {
            Log.d(TAG, "Opening Web Socket...");
            try {
                this.websocket.connect();
            } catch (Exception e) {
                Log.w(TAG, "Impossible to establish a connection : " + e.getClass() + ":" + e.getMessage());
            }
        }
    }

    /**
     * The disconnection comes from the server, without any intervention from the client side.
     * The order of the operations is very important
     */
    private void disconnectFromServer() {
        if (this.connection == CONNECTED) {
            this.connection = DECONNECTED_FROM_OTHER;
            this.websocket.close();
            this.networkListener.onState(this.connection);
        }
    }

    /**
     * The disconnection comes from the app, because the public method disconnect was called
     */
    private void disconnectFromApp() {
        if (this.connection == DECONNECTED_FROM_APP) {
            this.websocket.close();
            this.networkListener.onState(this.connection);
        }
    }

    /**
     * Close the web socket connection with the server. The order of the operations is very important
     */
    public void disconnect() {
        if (this.connection == CONNECTED) {
            this.connection = DECONNECTED_FROM_APP;
            transmit(COMMAND_DISCONNECT, null, null);
        }
    }

    /**
     * Send a simple message to the server thanks to the body parameter
     *
     * @param destination
     *      the destination through which a Stomp message will be sent to the server
     * @param headers
     *      headers of the message
     * @param body
     *      body of a message
     */
    public void send(String destination, Map<String, String> headers, String body) {
        if (this.connection == CONNECTED) {
            if (headers == null)
                headers = new HashMap<String, String>();

            if (body == null)
                body = "";

            headers.put(SUBSCRIPTION_DESTINATION, destination);
            transmit(COMMAND_SEND, headers, body);
        }
    }

    /**
     * Allow a client to send a subscription message to the server independently of the initialization
     * of the web socket. If the connection has not been established yet, just save the subscription
     *
     * @param subscription
     *      a subscription object
     */
    public void subscribe(Subscription subscription) {
        subscription.setId(PREFIX_ID_SUBSCIPTION + this.counter++);
        this.subscriptions.put(subscription.getId(), subscription);

        if (this.connection == CONNECTED) {
            Map<String, String> headers = new HashMap<String, String>();
            headers.put(SUBSCRIPTION_ID, subscription.getId());
            headers.put(SUBSCRIPTION_DESTINATION, subscription.getDestination());
            subscribe(headers);
        }
    }

    /**
     * Subscribe to the saved Stomp channels, through which messages will be sent and received.
     * A message sent on a given channel cannot be received on another.
     */
    private void subscribe() {
        if (this.connection == CONNECTED) {
            for (Subscription subscription : this.subscriptions.values()) {
                Map<String, String> headers = new HashMap<String, String>();
                headers.put(SUBSCRIPTION_ID, subscription.getId());
                headers.put(SUBSCRIPTION_DESTINATION, subscription.getDestination());
                subscribe(headers);
            }
        }
    }

    /**
     * Send the subscribe frame to the server with its headers
     *
     * @param headers
     *      headers of a subscribe STOMP message
     */
    private void subscribe(Map<String, String> headers) {
        transmit(COMMAND_SUBSCRIBE, headers, null);
    }

    /**
     * Destroy a subscription with its id
     *
     * @param id
     *      the id of the subscription. This id is set automatically in the subscribe method
     */
    public void unsubscribe(String id) {
        if (this.connection == CONNECTED) {
            Map<String, String> headers = new HashMap<String, String>();
            headers.put(SUBSCRIPTION_ID, id);
            this.subscriptions.remove(id);
            this.transmit(COMMAND_UNSUBSCRIBE, headers, null);
        }
    }
}

This is the Frame of a Stomp message:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Frame {
//  private final static String CONTENT_LENGTH = "content-length";

    private String command;
    private Map<String, String> headers;
    private String body;

    /**
     * Constructor of a Frame object. All parameters of a frame can be set
     *
     * @param command
     * @param headers
     * @param body
     */
    public Frame(String command, Map<String, String> headers, String body) {
        this.command = command;
        this.headers = headers != null ? headers : new HashMap<String, String>();
        this.body = body != null ? body : "";
    }

    public String getCommand() {
        return command;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public String getBody() {
        return body;
    }

    /**
     * Transform a frame object into a String. This method is modeled on the Objective-C one,
     * in the MMPReactiveStompClient library
     *
     * @return a frame object converted into a String
     */
    private String toStringg() {
        String strLines = this.command;
        strLines += Byte.LF;
        for (String key : this.headers.keySet()) {
            strLines += key + ":" + this.headers.get(key);
            strLines += Byte.LF;
        }
        strLines += Byte.LF;
        strLines += this.body;
        strLines += Byte.NULL;

        return strLines;
    }

    /**
     * Create a frame from a received message. This method is modeled on the Objective-C one,
     * in the MMPReactiveStompClient library
     *
     * @param data
     *      a part of the message received from the network, which represents a frame
     * @return
     *      a frame object
     */
    public static Frame fromString(String data) {
        List<String> contents = new ArrayList<String>(Arrays.asList(data.split(Byte.LF)));

        // Skip leading empty lines
        while (contents.size() > 0 && contents.get(0).equals("")) {
            contents.remove(0);
        }

        String command = contents.get(0);
        Map<String, String> headers = new HashMap<String, String>();
        String body = "";

        contents.remove(0);
        boolean hasHeaders = false;
        for (String line : contents) {
            if (hasHeaders) {
                // Body part: copy everything except the terminating NUL character
                for (int i = 0; i < line.length(); i++) {
                    char c = line.charAt(i);
                    if (c != '\u0000')
                        body += c;
                }
            } else {
                if (line.equals("")) {
                    // An empty line separates the headers from the body
                    hasHeaders = true;
                } else {
                    // Split on the first colon only, so that values containing ':' stay intact
                    String[] header = line.split(":", 2);
                    if (header.length == 2)
                        headers.put(header[0], header[1]);
                }
            }
        }
        return new Frame(command, headers, body);
    }

//  No need for this method, a single frame will always be sent because the body of the message will never be excessive
//  /**
//   * Transform a message received from the server into a Set of objects, named frame, manageable by java
//   *
//   * @param datas
//   *        message received from network
//   * @return
//   *        a Set of Frame
//   */
//  public static Set<Frame> unmarshall(String datas){
//      String data;
//      String[] ref = datas.split(Byte.NULL + Byte.LF + "*");//NEED TO VERIFY THIS PARAMETER
//      Set<Frame> results = new HashSet<Frame>();
//
//      for (int i = 0, len = ref.length; i < len; i++) {
//          data = ref[i];
//
//          if ((data != null ? data.length() : 0) > 0){
//              results.add(unmarshallSingle(data));//"unmarshallSingle" is the old name method for "fromString"
//          }
//      }
//      return results;
//  }

    /**
     * Create a frame from the basic frame components and convert it into a string
     *
     * @param command
     * @param headers
     * @param body
     * @return a frame object converted into a String, thanks to the <code>toStringg()</code> method
     */
    public static String marshall(String command, Map<String, String> headers, String body) {
        Frame frame = new Frame(command, headers, body);
        return frame.toStringg();
    }

    // Control characters of the STOMP wire format: line feed and the NUL frame terminator
    private static class Byte {
        public static final String LF = "\n";
        public static final String NULL = "\u0000";
    }
}
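For reference, the wire layout that the Frame class produces and parses is: the command, a line feed, one header per line as key:value, an empty line, the body, and a terminating NUL character. The following self-contained sketch shows that layout in isolation. StompWire and its method names are hypothetical and are not part of the library above, and it ignores details such as header escaping and content-length.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, minimal illustration of the STOMP frame layout. */
final class StompWire {
    static final char LF = '\n';
    static final char NUL = '\u0000';

    /** Serializes command, headers and body into a single frame string. */
    static String marshal(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append(LF);
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append(LF);
        }
        // The empty line ends the headers, the NUL character ends the frame.
        sb.append(LF).append(body).append(NUL);
        return sb.toString();
    }

    /** Extracts the body: everything between the first empty line and the NUL. */
    static String bodyOf(String frame) {
        int headerEnd = frame.indexOf("\n\n");
        if (headerEnd < 0) {
            return "";
        }
        String rest = frame.substring(headerEnd + 2);
        int nul = rest.indexOf(NUL);
        return nul >= 0 ? rest.substring(0, nul) : rest;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<String, String>();
        headers.put("destination", "/app/hello");
        String frame = marshal("SEND", headers, "hi");
        System.out.println(bodyOf(frame));
    }
}
```

Unlike the line-by-line parsing in fromString, bodyOf keeps any line feeds inside the body, which matters if the payload spans several lines.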

This is the object used to establish a subscription via the stomp protocol:

public class Subscription {

    private String id;
    private String destination;
    private ListenerSubscription callback;

    public Subscription(String destination, ListenerSubscription callback) {
        this.destination = destination;
        this.callback = callback;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDestination() {
        return destination;
    }

    public ListenerSubscription getCallback() {
        return callback;
    }
}

Finally, there are two interfaces, used in the same way as the Java "Runnable" class, to listen to the web socket network state and to a given subscription channel:

public interface ListenerWSNetwork {
    public void onState(int state);
}

import java.util.Map;

public interface ListenerSubscription {
    public void onMessage(Map<String, String> headers, String body);
}

For more information, don't hesitate to ask me.


Perfect solution, thank you Eperrin. I would like to complete it: for example, in your Activity or Service you call a connection method like the one below, and of course not on the main thread.

private void connection() {
    Map<String, String> headersSetup = new HashMap<String, String>();
    Stomp stomp = new Stomp(hostUrl, headersSetup, new ListenerWSNetwork() {
        @Override
        public void onState(int state) {
        }
    });
    stomp.connect();
    stomp.subscribe(new Subscription(testUrl, new ListenerSubscription() {
        @Override
        public void onMessage(Map<String, String> headers, String body) {
        }
    }));
}
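One possible way to keep that call off the main thread is a single-thread executor, sketched here in plain Java without any Android classes. BackgroundConnector is a hypothetical name, and the Runnable passed in stands for whatever wraps the connection() method above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: runs a connection task on a dedicated worker thread. */
final class BackgroundConnector {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Submits the task and returns a Future holding the worker thread's name. */
    Future<String> submit(final Runnable connectTask) {
        return executor.submit(new Callable<String>() {
            @Override
            public String call() {
                connectTask.run();
                return Thread.currentThread().getName();
            }
        });
    }

    /** Demo helper only: blocks until the task is done and wraps checked exceptions. */
    static String await(Future<String> future) {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Lets the worker thread finish once the submitted tasks have completed. */
    void shutdown() {
        executor.shutdown();
    }
}
```

In an Activity the caller would normally not block on the Future; await is there only so that the behaviour can be checked in a plain Java program.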

Also be careful: the weberknecht websocket library has a bug in the WebSocketHandshake class. In the method verifyServerHandshakeHeaders, line 124 only checks if(!headers.get("Connection").equals("Upgrade")), so when the server sends upgrade instead of Upgrade you get the error "connection failed: missing header field in server handshake: Connection". You have to make the comparison ignore case: if(!headers.get("Connection").equalsIgnoreCase("Upgrade"))
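A slightly more tolerant version of that check is sketched below. It is not the weberknecht code: HandshakeCheck is a hypothetical helper, written on the assumption that the server may also send a comma-separated list such as "keep-alive, Upgrade", which the WebSocket handshake rules allow as long as one token matches "Upgrade" case-insensitively.

```java
/** Hypothetical helper: validates the Connection header of a handshake response. */
final class HandshakeCheck {

    /** Returns true if any comma-separated token equals "Upgrade", ignoring case. */
    static boolean hasUpgradeToken(String connectionHeader) {
        if (connectionHeader == null) {
            return false;
        }
        for (String token : connectionHeader.split(",")) {
            if (token.trim().equalsIgnoreCase("Upgrade")) {
                return true;
            }
        }
        return false;
    }
}
```

The null check also covers the case where the header is missing altogether, which would otherwise throw a NullPointerException in the one-line comparison.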


My implementation of the STOMP protocol for Android (or plain Java) with RxJava: https://github.com/NaikSoftware/StompProtocolAndroid . Tested against a STOMP server built with SpringBoot. Simple example (with retrolambda):

private StompClient mStompClient;

// ...

mStompClient = Stomp.over(WebSocket.class, "ws://localhost:8080/app/hello/websocket");
mStompClient.connect();

mStompClient.topic("/topic/greetings").subscribe(topicMessage -> {
    Log.d(TAG, topicMessage.getPayload());
});

mStompClient.send("/app/hello", "My first STOMP message!");

// ...

mStompClient.disconnect();

Add the following classpath to the project:

classpath 'me.tatarka:gradle-retrolambda:3.2.0'

Add the following to your app build.gradle:

apply plugin: 'me.tatarka.retrolambda'

android {
    .............
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

dependencies {
    ............................
    compile 'org.java-websocket:Java-WebSocket:1.3.0'
    compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.5'
}

Everything works asynchronously! You can call connect() after subscribe() and send(); the messages will be pushed to a queue.
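The queueing behaviour can be pictured with the following plain-Java sketch. It is not the library's implementation: QueueingSender is a hypothetical class, and the sent list stands in for the real transport, so that the ordering can be observed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical illustration: messages sent before the connection opens are queued. */
final class QueueingSender {
    private final Queue<String> pending = new ArrayDeque<String>();
    private final List<String> sent = new ArrayList<String>(); // stand-in for the transport
    private boolean connected;

    /** Sends immediately when connected, otherwise keeps the message for later. */
    synchronized void send(String message) {
        if (connected) {
            sent.add(message);
        } else {
            pending.add(message);
        }
    }

    /** Called once the connection is open: flushes queued messages in order. */
    synchronized void onConnected() {
        connected = true;
        String message;
        while ((message = pending.poll()) != null) {
            sent.add(message);
        }
    }

    /** Returns a copy of everything handed to the transport so far. */
    synchronized List<String> sentMessages() {
        return new ArrayList<String>(sent);
    }
}
```

Flushing in arrival order keeps a SUBSCRIBE issued before a SEND ahead of it on the wire, which is usually what the caller expects.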

Additional features:

  • extra HTTP headers for the handshake query (to pass an auth token or similar)
  • you can implement your own transport for the library, just implement the ConnectionProvider interface
  • subscribe to connection lifecycle events (connected, closed, error)

For example:

public class MainActivity extends AppCompatActivity {

    private StompClient mStompClient;
    public static final String TAG = "StompClient";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Button view = (Button) findViewById(R.id.button);
        view.setOnClickListener(e -> new LongOperation().execute(""));
    }

    private class LongOperation extends AsyncTask<String, Void, String> {

        private StompClient mStompClient;
        String TAG = "LongOperation";

        @Override
        protected String doInBackground(String... params) {
            mStompClient = Stomp.over(WebSocket.class, "ws://localhost:8080/app/hello/websocket");
            mStompClient.connect();

            mStompClient.topic("/topic/greetings").subscribe(topicMessage -> {
                Log.d(TAG, topicMessage.getPayload());
            });

            mStompClient.send("/app/hello", "My first STOMP message!").subscribe();

            mStompClient.lifecycle().subscribe(lifecycleEvent -> {
                switch (lifecycleEvent.getType()) {
                    case OPENED:
                        Log.d(TAG, "Stomp connection opened");
                        break;
                    case ERROR:
                        Log.e(TAG, "Error", lifecycleEvent.getException());
                        break;
                    case CLOSED:
                        Log.d(TAG, "Stomp connection closed");
                        break;
                }
            });
            return "Executed";
        }

        @Override
        protected void onPostExecute(String result) {
        }
    }
}

Add the Internet permission in AndroidManifest.xml:

<uses-permission android:name="android.permission.INTERNET" />