1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2025-09-09 00:59:39 +02:00

Rework WebSocket code

Related to SMACK-835.
This commit is contained in:
Florian Schmaus 2021-01-25 19:51:45 +01:00
parent 0c013e4f29
commit c5a546554b
38 changed files with 953 additions and 498 deletions

View file

@ -1,6 +1,6 @@
/**
*
* Copyright 2020 Aditya Borikar
* Copyright 2020 Aditya Borikar, 2021 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package org.jivesoftware.smack.websocket.okhttp;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -27,39 +26,34 @@ import org.jivesoftware.smack.debugger.SmackDebugger;
import okhttp3.Headers;
import okhttp3.Response;
import org.jxmpp.xml.splitter.Utf8ByteXmppXmlSplitter;
import org.jxmpp.xml.splitter.XmlPrettyPrinter;
import org.jxmpp.xml.splitter.XmppXmlSplitter;
public final class LoggingInterceptor {
private static final Logger LOGGER = Logger.getAnonymousLogger();
private static final int MAX_ELEMENT_SIZE = 64 * 1024;
private final SmackDebugger debugger;
private final Utf8ByteXmppXmlSplitter incomingTextSplitter;
private final Utf8ByteXmppXmlSplitter outgoingTextSplitter;
private static final Logger LOGGER = Logger.getLogger(LoggingInterceptor.class.getName());
public LoggingInterceptor(SmackDebugger smackDebugger) {
private final SmackDebugger debugger;
private final XmppXmlSplitter incomingXmlSplitter;
private final XmppXmlSplitter outgoingXmlSplitter;
LoggingInterceptor(SmackDebugger smackDebugger) {
this.debugger = smackDebugger;
XmlPrettyPrinter incomingTextPrinter = XmlPrettyPrinter.builder()
.setPrettyWriter(sb -> debugger.incomingStreamSink(sb))
.setTabWidth(4)
.build();
XmppXmlSplitter incomingXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, null,
incomingTextPrinter);
incomingTextSplitter = new Utf8ByteXmppXmlSplitter(incomingXmlSplitter);
incomingXmlSplitter = new XmppXmlSplitter(incomingTextPrinter);
XmlPrettyPrinter outgoingTextPrinter = XmlPrettyPrinter.builder()
.setPrettyWriter(sb -> debugger.outgoingStreamSink(sb))
.setTabWidth(4)
.build();
XmppXmlSplitter outgoingXmlSplitter = new XmppXmlSplitter(MAX_ELEMENT_SIZE, null,
outgoingTextPrinter);
outgoingTextSplitter = new Utf8ByteXmppXmlSplitter(outgoingXmlSplitter);
outgoingXmlSplitter = new XmppXmlSplitter(outgoingTextPrinter);
}
// Open response received here isn't in the form of an Xml an so, there isn't much to format.
public void interceptOpenResponse(Response response) {
void interceptOpenResponse(Response response) {
Headers headers = response.headers();
Iterator<?> iterator = headers.iterator();
StringBuilder sb = new StringBuilder();
@ -70,18 +64,18 @@ public final class LoggingInterceptor {
debugger.incomingStreamSink(sb);
}
public void interceptReceivedText(String text) {
void interceptReceivedText(String text) {
try {
incomingTextSplitter.write(text.getBytes(Charset.defaultCharset()));
incomingXmlSplitter.write(text);
} catch (IOException e) {
// Connections shouldn't be terminated due to exceptions encountered during debugging. hence only log them.
LOGGER.log(Level.WARNING, "IOException encountered while parsing received text: " + text, e);
}
}
public void interceptSentText(String text) {
void interceptSentText(String text) {
try {
outgoingTextSplitter.write(text.getBytes(Charset.defaultCharset()));
outgoingXmlSplitter.write(text);
} catch (IOException e) {
// Connections shouldn't be terminated due to exceptions encountered during debugging, hence only log them.
LOGGER.log(Level.WARNING, "IOException encountered while parsing outgoing text: " + text, e);

View file

@ -1,6 +1,6 @@
/**
*
* Copyright 2020 Aditya Borikar
* Copyright 2020 Aditya Borikar, 2020-2021 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,22 +16,17 @@
*/
package org.jivesoftware.smack.websocket.okhttp;
import java.io.IOException;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.SSLSession;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.websocket.WebSocketException;
import org.jivesoftware.smack.websocket.elements.WebSocketOpenElement;
import org.jivesoftware.smack.websocket.impl.AbstractWebSocket;
import org.jivesoftware.smack.websocket.rce.WebSocketRemoteConnectionEndpoint;
import org.jivesoftware.smack.xml.XmlPullParserException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@ -43,135 +38,119 @@ public final class OkHttpWebSocket extends AbstractWebSocket {
private static final Logger LOGGER = Logger.getLogger(OkHttpWebSocket.class.getName());
private static OkHttpClient okHttpClient = null;
private static final OkHttpClient okHttpClient = new OkHttpClient();
// This is a potential candidate to be placed into AbstractWebSocket, but I keep it here until smack-websocket-java11
// arrives.
private final SmackFuture.InternalSmackFuture<AbstractWebSocket, Exception> future = new SmackFuture.InternalSmackFuture<>();
private final ModularXmppClientToServerConnectionInternal connectionInternal;
private final LoggingInterceptor interceptor;
private String openStreamHeader;
private WebSocket currentWebSocket;
private WebSocketConnectionPhase phase;
private WebSocketRemoteConnectionEndpoint connectedEndpoint;
private final WebSocket okHttpWebSocket;
public OkHttpWebSocket(ModularXmppClientToServerConnectionInternal connectionInternal) {
this.connectionInternal = connectionInternal;
public OkHttpWebSocket(WebSocketRemoteConnectionEndpoint endpoint,
ModularXmppClientToServerConnectionInternal connectionInternal) {
super(endpoint, connectionInternal);
if (okHttpClient == null) {
// Creates an instance of okHttp client.
OkHttpClient.Builder builder = new OkHttpClient.Builder();
okHttpClient = builder.build();
}
// Add some mechanism to enable and disable this interceptor.
if (connectionInternal.smackDebugger != null) {
interceptor = new LoggingInterceptor(connectionInternal.smackDebugger);
} else {
interceptor = null;
}
}
@Override
public void connect(WebSocketRemoteConnectionEndpoint endpoint) throws InterruptedException, SmackException, XMPPException {
final String currentUri = endpoint.getWebSocketEndpoint().toString();
final URI uri = endpoint.getUri();
final String url = uri.toString();
Request request = new Request.Builder()
.url(currentUri)
.url(url)
.header("Sec-WebSocket-Protocol", "xmpp")
.build();
WebSocketListener listener = new WebSocketListener() {
okHttpWebSocket = okHttpClient.newWebSocket(request, listener);
}
@Override
public void onOpen(WebSocket webSocket, Response response) {
LOGGER.log(Level.FINER, "WebSocket is open");
phase = WebSocketConnectionPhase.openFrameSent;
if (interceptor != null) {
interceptor.interceptOpenResponse(response);
}
send(new WebSocketOpenElement(connectionInternal.connection.getXMPPServiceDomain()));
private final WebSocketListener listener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
LOGGER.log(Level.FINER, "OkHttp invoked onOpen() for {0}. Response: {1}",
new Object[] { webSocket, response });
if (interceptor != null) {
interceptor.interceptOpenResponse(response);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
if (interceptor != null) {
interceptor.interceptReceivedText(text);
}
if (isCloseElement(text)) {
connectionInternal.onStreamClosed();
return;
}
future.setResult(OkHttpWebSocket.this);
}
String closingStream = "</stream>";
switch (phase) {
case openFrameSent:
if (isOpenElement(text)) {
// Converts the <open> element received into <stream> element.
openStreamHeader = getStreamFromOpenElement(text);
phase = WebSocketConnectionPhase.exchangingTopLevelStreamElements;
try {
connectionInternal.onStreamOpen(PacketParserUtils.getParserFor(openStreamHeader));
} catch (XmlPullParserException | IOException e) {
LOGGER.log(Level.WARNING, "Exception caught:", e);
}
} else {
LOGGER.log(Level.WARNING, "Unexpected Frame received", text);
}
break;
case exchangingTopLevelStreamElements:
connectionInternal.parseAndProcessElement(openStreamHeader + text + closingStream);
break;
default:
LOGGER.log(Level.INFO, "Default text: " + text);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
if (interceptor != null) {
interceptor.interceptReceivedText(text);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
LOGGER.log(Level.INFO, "Exception caught", t);
WebSocketException websocketException = new WebSocketException(t);
if (connectionInternal.connection.isConnected()) {
connectionInternal.notifyConnectionError(websocketException);
} else {
connectionInternal.setCurrentConnectionExceptionAndNotify(websocketException);
}
onIncomingWebSocketElement(text);
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
LOGGER.log(Level.FINER, "OkHttp invoked onFailure() for " + webSocket + ". Response: " + response, throwable);
WebSocketException websocketException = new WebSocketException(throwable);
// If we are already connected, then we need to notify the connection that it got tear down. Otherwise we
// need to notify the thread calling connect() that the connection failed.
if (future.wasSuccessful()) {
connectionInternal.notifyConnectionError(websocketException);
} else {
future.setException(websocketException);
}
};
}
// Creates an instance of websocket through okHttpClient.
currentWebSocket = okHttpClient.newWebSocket(request, listener);
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
LOGGER.log(Level.FINER, "OkHttp invoked onClosing() for " + webSocket + ". Code: " + code + ". Reason: " + reason);
}
// Open a new stream and wait until features are received.
connectionInternal.waitForFeaturesReceived("Waiting to receive features");
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
LOGGER.log(Level.FINER, "OkHttp invoked onClosed() for " + webSocket + ". Code: " + code + ". Reason: " + reason);
}
connectedEndpoint = endpoint;
};
@Override
public SmackFuture<AbstractWebSocket, Exception> getFuture() {
return future;
}
@Override
public void send(TopLevelStreamElement element) {
String textToBeSent = element.toXML().toString();
public void send(String element) {
if (interceptor != null) {
interceptor.interceptSentText(textToBeSent);
interceptor.interceptSentText(element);
}
currentWebSocket.send(textToBeSent);
okHttpWebSocket.send(element);
}
@Override
public void disconnect(int code, String message) {
currentWebSocket.close(code, message);
LOGGER.log(Level.INFO, "WebSocket has been closed with message: " + message);
LOGGER.log(Level.INFO, "WebSocket closing with code: " + code + " and message: " + message);
okHttpWebSocket.close(code, message);
}
@Override
public boolean isConnectionSecure() {
return connectedEndpoint.isSecureEndpoint();
return endpoint.isSecureEndpoint();
}
@Override
public boolean isConnected() {
return connectedEndpoint == null ? false : true;
// TODO: Do we need this method at all if we create an AbstractWebSocket object for every endpoint?
return true;
}
@Override
public SSLSession getSSLSession() {
// TODO: What shall we do about this method, as it appears that OkHttp does not provide access to the used SSLSession?
return null;
}
}

View file

@ -1,6 +1,6 @@
/**
*
* Copyright 2020 Florian Schmaus.
* Copyright 2020-2021 Florian Schmaus.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,12 +19,13 @@ package org.jivesoftware.smack.websocket.okhttp;
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
import org.jivesoftware.smack.websocket.impl.AbstractWebSocket;
import org.jivesoftware.smack.websocket.impl.WebSocketFactory;
import org.jivesoftware.smack.websocket.rce.WebSocketRemoteConnectionEndpoint;
public class OkHttpWebSocketFactory implements WebSocketFactory {
@Override
public AbstractWebSocket create(ModularXmppClientToServerConnectionInternal connectionInternal) {
return new OkHttpWebSocket(connectionInternal);
public AbstractWebSocket create(WebSocketRemoteConnectionEndpoint endpoint, ModularXmppClientToServerConnectionInternal connectionInternal) {
return new OkHttpWebSocket(endpoint, connectionInternal);
}
}

View file

@ -1,6 +1,6 @@
/**
*
* Copyright 2020 Florian Schmaus.
* Copyright 2020-2021 Florian Schmaus.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
*/
package org.jivesoftware.smack.websocket.okhttp;
import java.net.URISyntaxException;
import org.jivesoftware.smack.websocket.test.WebSocketFactoryServiceTestUtil;
import org.junit.jupiter.api.Test;
@ -23,7 +25,7 @@ import org.junit.jupiter.api.Test;
public class OkHttpWebSocketFactoryServiceTest {
@Test
public void createWebSocketTest() {
public void createWebSocketTest() throws URISyntaxException {
WebSocketFactoryServiceTestUtil.createWebSocketTest(OkHttpWebSocket.class);
}