java - tipos - ¿Es seguro este hilo de llamada de cliente JAX-WS?
web service java netbeans (4)
Como puede ver en las respuestas anteriores, los proxies de cliente JAX-WS no son seguros para subprocesos, así que solo quería compartir mi implementación con otras personas para almacenar en caché los proxies de cliente. De hecho, enfrenté el mismo problema y decidí crear un bean spring que haga el almacenamiento en caché de los proxies del Cliente JAX-WS. Puede ver más detalles en http://programtalk.com/java/using-spring-and-scheduler-to-store/
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;
/**
* This keeps the cache of MAX_CUNCURRENT_THREADS number of
* appConnections and tries to shares them equally amongst the threads. All the
* connections are created right at the start and if an error occurs then the
* cache is created again.
*
*/
/*
*
* Are JAX-WS client proxies thread safe? <br/> According to the JAX-WS spec,
* the client proxies are NOT thread safe. To write portable code, you should
* treat them as non-thread safe and synchronize access or use a pool of
* instances or similar.
*
*/
@Component
public class AppConnectionCache {
private static final Logger logger = org.apache.logging.log4j.LogManager.getLogger(AppConnectionCache.class);
private final Map<Integer, MyService> connectionCache = new ConcurrentHashMap<Integer, MyService>();
private int cachedConnectionId = 1;
private static final int MAX_CUNCURRENT_THREADS = 20;
private ScheduledExecutorService scheduler;
private boolean forceRecaching = true; // first time cache
@PostConstruct
public void init() {
logger.info("starting appConnectionCache");
logger.info("start caching connections"); ;;
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("appconnectioncache-scheduler-thread-%d").build();
scheduler = Executors.newScheduledThreadPool(1, factory);
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
initializeCache();
}
}, 0, 10, TimeUnit.MINUTES);
}
public void destroy() {
scheduler.shutdownNow();
}
private void initializeCache() {
if (!forceRecaching) {
return;
}
try {
loadCache();
forceRecaching = false; // this flag is used for initializing
logger.info("connections creation finished successfully!");
} catch (MyAppException e) {
logger.error("error while initializing the cache");
}
}
private void loadCache() throws MyAppException {
logger.info("create and cache appservice connections");
for (int i = 0; i < MAX_CUNCURRENT_THREADS; i++) {
tryConnect(i, true);
}
}
public MyPort getMyPort() throws MyAppException {
if (cachedConnectionId++ == MAX_CUNCURRENT_THREADS) {
cachedConnectionId = 1;
}
return tryConnect(cachedConnectionId, forceRecaching);
}
private MyPort tryConnect(int threadNum, boolean forceConnect) throws MyAppException {
boolean connect = true;
int tryNum = 0;
MyPort app = null;
while (connect && !Thread.currentThread().isInterrupted()) {
try {
app = doConnect(threadNum, forceConnect);
connect = false;
} catch (Exception e) {
tryNum = tryReconnect(tryNum, e);
}
}
return app;
}
private int tryReconnect(int tryNum, Exception e) throws MyAppException {
logger.warn(Thread.currentThread().getName() + " appservice service not available! : " + e);
// try 10 times, if
if (tryNum++ < 10) {
try {
logger.warn(Thread.currentThread().getName() + " wait 1 second");
Thread.sleep(1000);
} catch (InterruptedException f) {
// restore interrupt
Thread.currentThread().interrupt();
}
} else {
logger.warn(" appservice could not connect, number of times tried: " + (tryNum - 1));
this.forceRecaching = true;
throw new MyAppException(e);
}
logger.info(" try reconnect number: " + tryNum);
return tryNum;
}
private MyPort doConnect(int threadNum, boolean forceConnect) throws InterruptedException {
MyService service = connectionCache.get(threadNum);
if (service == null || forceConnect) {
logger.info("app service connects : " + (threadNum + 1) );
service = new MyService();
connectionCache.put(threadNum, service);
logger.info("connect done for " + (threadNum + 1));
}
return service.getAppPort();
}
}
Dado que la inicialización del servicio y el puerto del cliente WS lleva años, me gusta inicializarlos una vez en el inicio y reutilizar la misma instancia del puerto. La inicialización sería algo así:
private static RequestContext requestContext = null;
static
{
MyService service = new MyService();
MyPort myPort = service.getMyServicePort();
Map<String, Object> requestContextMap = ((BindingProvider) myPort).getRequestContext();
requestContextMap = ((BindingProvider)myPort).getRequestContext();
requestContextMap.put(BindingProvider.USERNAME_PROPERTY, uName);
requestContextMap.put(BindingProvider.PASSWORD_PROPERTY, pWord);
rc = new RequestContext();
rc.setApplication("test");
rc.setUserId("test");
}
La llamada en algún lugar de mi clase:
myPort.someFunctionCall(requestContext, "someValue");
Mi pregunta: ¿Será esta llamada segura para subprocesos?
En general, no.
De acuerdo con las preguntas frecuentes de CXF http://cxf.apache.org/faq.html#FAQ-AreJAX-WSclientproxiesthreadsafe?
Respuesta oficial de JAX-WS: No. Según la especificación de JAX-WS, los proxies del cliente NO son seguros para subprocesos. Para escribir código portátil, debe tratarlos como seguros sin subprocesos y sincronizar el acceso o usar un conjunto de instancias o similar.
Respuesta de CXF: los proxies CXF son seguros para subprocesos para MUCHOS casos de uso.
Para obtener una lista de excepciones, consulte las preguntas frecuentes.
Según las preguntas frecuentes de CXF :
¿Son seguros los subprocesos de cliente JAX-WS seguros?
Respuesta oficial de JAX-WS: No. Según la especificación de JAX-WS, los proxies del cliente NO son seguros para subprocesos. Para escribir código portátil, debe tratarlos como seguros sin subprocesos y sincronizar el acceso o usar un conjunto de instancias o similar.
Respuesta de CXF: los proxies CXF son seguros para subprocesos para MUCHOS casos de uso. Las excepciones son:
Uso de
((BindingProvider)proxy).getRequestContext()
- según la especificación de JAX-WS, el contexto de la solicitud es POR INSTANCIA. Por lo tanto, cualquier cosa que se establezca allí afectará las solicitudes en otros subprocesos. Con CXF, puedes hacer:
((BindingProvider)proxy).getRequestContext().put("thread.local.request.context","true");
y las llamadas futuras a getRequestContext () utilizarán un contexto de solicitud local del hilo. Eso permite que el contexto de la solicitud sea seguro para subprocesos. (Nota: el contexto de respuesta siempre es un hilo local en CXF)
Configuración en el conducto: si usa un código o configuración para manipular directamente el conducto (como para establecer la configuración TLS o similar), no es seguro para subprocesos. El conducto es por instancia y, por lo tanto, esas configuraciones se compartirían. Además, si usa FailoverFeature y LoadBalanceFeatures, el conducto se reemplaza sobre la marcha. Por lo tanto, la configuración establecida en el conducto podría perderse antes de ser utilizada en el hilo de configuración.
- Soporte de sesión: si activa el soporte de sesiones (consulte las especificaciones de jaxws), la cookie de sesión se almacena en el conducto. Por lo tanto, caería dentro de las reglas anteriores sobre la configuración de los conductos y, por lo tanto, se compartiría en subprocesos.
- Tokens WS-Security: si se usa WS-SecureConversation o WS-Trust, el token recuperado se almacena en caché en el Endpoint / Proxy para evitar las llamadas adicionales (y caras) al STS para obtener tokens. Así, múltiples hilos compartirán el token. Si cada subproceso tiene requisitos o credenciales de seguridad diferentes, debe usar instancias de proxy separadas.
Para los problemas de conductos, PODRÍA instalar un nuevo ConduitSelector que use un hilo local o similar. Aunque eso es un poco complejo.
Para la mayoría de los casos de uso "simples", puede usar proxies CXF en múltiples hilos. Lo anterior describe las soluciones para los demás.
Una solución general para esto es usar varios objetos de cliente en un grupo, luego usar un proxy que actúa como una fachada.
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
class ServiceObjectPool<T> extends GenericObjectPool<T> {
public ServiceObjectPool(java.util.function.Supplier<T> factory) {
super(new BasePooledObjectFactory<T>() {
@Override
public T create() throws Exception {
return factory.get();
}
@Override
public PooledObject<T> wrap(T obj) {
return new DefaultPooledObject<>(obj);
}
});
}
public static class PooledServiceProxy<T> implements InvocationHandler {
private ServiceObjectPool<T> pool;
public PooledServiceProxy(ServiceObjectPool<T> pool) {
this.pool = pool;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
T t = null;
try {
t = this.pool.borrowObject();
return method.invoke(t, args);
} finally {
if (t != null)
this.pool.returnObject(t);
}
}
}
@SuppressWarnings("unchecked")
public T getProxy(Class<? super T> interfaceType) {
PooledServiceProxy<T> handler = new PooledServiceProxy<>(this);
return (T) Proxy.newProxyInstance(interfaceType.getClassLoader(),
new Class<?>[]{interfaceType}, handler);
}
}
Para utilizar el proxy:
ServiceObjectPool<SomeNonThreadSafeService> servicePool = new ServiceObjectPool<>(createSomeNonThreadSafeService);
nowSafeService = servicePool .getProxy(SomeNonThreadSafeService.class);