1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2025-09-09 00:59:39 +02:00

Add non-blocking send

This commit is contained in:
Florian Schmaus 2022-06-02 15:55:24 +02:00
parent 711d7d92bd
commit c77948bb91
9 changed files with 237 additions and 101 deletions

View file

@ -29,17 +29,19 @@ import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.GenericConnectionException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
import org.jivesoftware.smack.SmackException.SmackWrappedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.StreamErrorException;
import org.jivesoftware.smack.packet.Element;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Nonza;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StanzaError;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
import org.jivesoftware.smack.util.Async;
import org.jivesoftware.smack.util.CloseableUtil;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smack.xml.XmlPullParser;
@ -90,6 +92,10 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
@SuppressWarnings("HidingField")
private final BOSHConfiguration config;
private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true);
private Thread writerThread;
// Some flags which provides some info about the current state.
private boolean isFirstInitialization = true;
private boolean done = false;
@ -194,11 +200,16 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
}
}
assert writerThread == null || !writerThread.isAlive();
outgoingQueue.start();
writerThread = Async.go(this::writeElements, this + " Writer");
// If there is no feedback, throw an remote server timeout error
if (!connected && !done) {
done = true;
String errorMessage = "Timeout reached for the connection to "
+ getHost() + ":" + getPort() + ".";
instantShutdown();
throw new SmackException.SmackMessageException(errorMessage);
}
@ -207,6 +218,7 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
"<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>");
onStreamOpen(parser);
} catch (XmlPullParserException | IOException e) {
instantShutdown();
throw new AssertionError("Failed to setup stream environment", e);
}
}
@ -234,40 +246,92 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
afterSuccessfulLogin(false);
}
@Override
public void sendNonza(Nonza element) throws NotConnectedException {
if (done) {
throw new NotConnectedException();
}
sendElement(element);
}
private volatile boolean writerThreadRunning;
@Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException {
sendElement(packet);
}
private void sendElement(Element element) {
private void writeElements() {
writerThreadRunning = true;
try {
send(ComposableBody.builder().setPayloadXML(element.toXML(BOSH_URI).toString()).build());
if (element instanceof Stanza) {
firePacketSendingListeners((Stanza) element);
while (true) {
TopLevelStreamElement element;
try {
element = outgoingQueue.take();
} catch (InterruptedException e) {
LOGGER.log(Level.FINE,
"Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception",
e);
return;
}
String xmlPayload = element.toXML(BOSH_URI).toString();
ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload);
if (sessionID != null) {
BodyQName qName = BodyQName.create(BOSH_URI, "sid");
composableBodyBuilder.setAttribute(qName, sessionID);
}
ComposableBody composableBody = composableBodyBuilder.build();
try {
client.send(composableBody);
} catch (BOSHException e) {
LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e);
// TODO: Signal the user that there was an unexpected exception.
return;
}
if (element instanceof Stanza) {
Stanza stanza = (Stanza) element;
firePacketSendingListeners(stanza);
}
}
}
catch (BOSHException e) {
LOGGER.log(Level.SEVERE, "BOSHException in sendStanzaInternal", e);
} catch (Exception exception) {
LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception);
} finally {
writerThreadRunning = false;
notifyWaitingThreads();
}
}
@Override
protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
throwNotConnectedExceptionIfAppropriate();
try {
outgoingQueue.put(element);
} catch (InterruptedException e) {
throwNotConnectedExceptionIfAppropriate();
// If the method above did not throw, then the sending thread was interrupted
throw e;
}
}
@Override
protected void sendNonBlockingInternal(TopLevelStreamElement element)
throws NotConnectedException, OutgoingQueueFullException {
throwNotConnectedExceptionIfAppropriate();
boolean enqueued = outgoingQueue.offer(element);
if (!enqueued) {
throwNotConnectedExceptionIfAppropriate();
throw new OutgoingQueueFullException();
}
}
/**
* Closes the connection by setting presence to unavailable and closing the
* HTTP client. The shutdown logic will be used during a planned disconnection or when
* dealing with an unexpected disconnection. Unlike {@link #disconnect()} the connection's
* BOSH stanza reader will not be removed; thus connection's state is kept.
*
*/
@Override
protected void shutdown() {
instantShutdown();
}
@Override
public void instantShutdown() {
outgoingQueue.shutdown();
try {
boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning);
if (!writerThreadTerminated) {
LOGGER.severe("Writer thread of " + this + " did not terminate timely");
}
} catch (InterruptedException e) {
LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e);
}
if (client != null) {
try {
@ -275,20 +339,15 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
} catch (Exception e) {
LOGGER.log(Level.WARNING, "shutdown", e);
}
client = null;
}
instantShutdown();
}
@Override
public void instantShutdown() {
setWasAuthenticated();
sessionID = null;
done = true;
authenticated = false;
connected = false;
isFirstInitialization = false;
client = null;
// Close down the readers and writers.
CloseableUtil.maybeClose(readerPipe, LOGGER);
@ -410,14 +469,15 @@ public class XMPPBOSHConnection extends AbstractXMPPConnection {
// XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it
// requires a special XML element ot be send after successful SASL authentication.
// See XEP-0206 § 5., especially the following is example 5 of XEP-0206.
ComposableBody composeableBody = ComposableBody.builder().setNamespaceDefinition("xmpp",
XMPPBOSHConnection.XMPP_BOSH_NS).setAttribute(
BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart",
"xmpp"), "true").setAttribute(
BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()).build();
ComposableBody composeableBody = ComposableBody.builder()
.setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS)
.setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true")
.setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString())
.setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID)
.build();
try {
send(composeableBody);
client.send(composeableBody);
} catch (BOSHException e) {
// jbosh's exception API does not really match the one of Smack.
throw new SmackException.SmackWrappedException(e);