1
0
Fork 0
mirror of https://github.com/vanitasvitae/Smack.git synced 2025-09-09 17:19:39 +02:00

Remove SynchronizationPoint

This continues the design started with e98d42790 ("SmackReactor/NIO,
Java8/Android19, Pretty print XML, FSM connections"), where the
exceptions that caused an operation to fail, are not recorded within
SynchronizationPoint but within the connection instance itself.
This commit is contained in:
Florian Schmaus 2020-05-26 21:39:08 +02:00
parent b1a4ccfae8
commit 57961a8cc1
15 changed files with 352 additions and 684 deletions

View file

@ -26,10 +26,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.jivesoftware.smack.SmackException.ConnectionException;
import org.jivesoftware.smack.SmackException.EndpointConnectionException;
import org.jivesoftware.smack.SynchronizationPoint;
import org.jivesoftware.smack.c2s.internal.ModularXmppClientToServerConnectionInternal;
import org.jivesoftware.smack.fsm.StateTransitionResult;
import org.jivesoftware.smack.tcp.XmppTcpTransportModule.EstablishingTcpConnectionState;
import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
import org.jivesoftware.smack.util.Async;
@ -48,7 +47,10 @@ public final class ConnectionAttemptState {
final SocketChannel socketChannel;
final List<RemoteConnectionException<?>> connectionExceptions;
final SynchronizationPoint<ConnectionException> tcpConnectionEstablishedSyncPoint;
EndpointConnectionException connectionException;
boolean connected;
long deadline;
final Iterator<Rfc6120TcpRemoteConnectionEndpoint> connectionEndpointIterator;
/** The current connection endpoint we are trying */
@ -65,17 +67,32 @@ public final class ConnectionAttemptState {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
connectionEndpointIterator = discoveredEndpoints.result.discoveredRemoteConnectionEndpoints.iterator();
List<Rfc6120TcpRemoteConnectionEndpoint> endpoints = discoveredEndpoints.result.discoveredRemoteConnectionEndpoints;
connectionEndpointIterator = endpoints.iterator();
connectionEndpoint = connectionEndpointIterator.next();
connectionExceptions = new ArrayList<>(discoveredEndpoints.result.discoveredRemoteConnectionEndpoints.size());
tcpConnectionEstablishedSyncPoint = new SynchronizationPoint<>(connectionInternal.connection,
"TCP connection establishment");
connectionExceptions = new ArrayList<>(endpoints.size());
}
void establishTcpConnection() {
StateTransitionResult.Failure establishTcpConnection() throws InterruptedException {
RemoteConnectionEndpoint.InetSocketAddressCoupling<Rfc6120TcpRemoteConnectionEndpoint> address = nextAddress();
establishTcpConnection(address);
synchronized (this) {
while (!connected && connectionException == null) {
final long now = System.currentTimeMillis();
if (now >= deadline) {
return new StateTransitionResult.FailureCausedByTimeout("Timeout waiting to establish connection");
}
wait (deadline - now);
}
}
if (connected) {
assert connectionException == null;
// Success case: we have been able to establish a connection to one remote endpoint.
return null;
}
return new StateTransitionResult.FailureCausedByException<Exception>(connectionException);
}
private void establishTcpConnection(
@ -84,8 +101,10 @@ public final class ConnectionAttemptState {
establishingTcpConnectionState, address);
connectionInternal.invokeConnectionStateMachineListener(connectingToHostEvent);
final boolean connected;
final InetSocketAddress inetSocketAddress = address.getInetSocketAddress();
// TODO: Should use "connect timeout" instead of reply timeout. But first connect timeout needs to be moved from
// XMPPTCPConnectionConfiguration. into XMPPConnectionConfiguration.
deadline = System.currentTimeMillis() + connectionInternal.connection.getReplyTimeout();
try {
connected = socketChannel.connect(inetSocketAddress);
} catch (IOException e) {
@ -98,7 +117,9 @@ public final class ConnectionAttemptState {
establishingTcpConnectionState, address, true);
connectionInternal.invokeConnectionStateMachineListener(connectedToHostEvent);
tcpConnectionEstablishedSyncPoint.reportSuccess();
synchronized (this) {
notifyAll();
}
return;
}
@ -124,9 +145,10 @@ public final class ConnectionAttemptState {
establishingTcpConnectionState, address, false);
connectionInternal.invokeConnectionStateMachineListener(connectedToHostEvent);
// Do not set 'state' here, since this is processed by a reactor thread, which doesn't hold
// the objects lock.
tcpConnectionEstablishedSyncPoint.reportSuccess();
connected = true;
synchronized (ConnectionAttemptState.this) {
notifyAll();
}
});
} catch (ClosedChannelException e) {
onIOExceptionWhenEstablishingTcpConnection(e, address);
@ -137,14 +159,14 @@ public final class ConnectionAttemptState {
RemoteConnectionEndpoint.InetSocketAddressCoupling<Rfc6120TcpRemoteConnectionEndpoint> failedAddress) {
RemoteConnectionEndpoint.InetSocketAddressCoupling<Rfc6120TcpRemoteConnectionEndpoint> nextInetSocketAddress = nextAddress();
if (nextInetSocketAddress == null) {
EndpointConnectionException connectionException = EndpointConnectionException.from(
connectionException = EndpointConnectionException.from(
discoveredEndpoints.result.lookupFailures, connectionExceptions);
tcpConnectionEstablishedSyncPoint.reportFailure(connectionException);
synchronized (this) {
notifyAll();
}
return;
}
tcpConnectionEstablishedSyncPoint.resetTimeout();
RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
failedAddress, exception);
connectionExceptions.add(rce);

View file

@ -26,11 +26,6 @@ import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
@ -44,7 +39,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@ -65,13 +59,12 @@ import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
import org.jivesoftware.smack.SmackException.ConnectionException;
import org.jivesoftware.smack.SmackException.EndpointConnectionException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NotLoggedInException;
import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.SynchronizationPoint;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
@ -151,8 +144,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private SSLSocket secureSocket;
private final Semaphore readerWriterSemaphore = new Semaphore(2);
/**
* Protected access level because of unit test purposes
*/
@ -166,14 +157,12 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
/**
*
*/
private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
this, "stream compression feature");
private boolean streamFeaturesAfterAuthenticationReceived;
/**
*
*/
private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
this, "stream compression");
private boolean compressSyncPoint;
/**
* The default bundle and defer callback, used for new connections.
@ -197,15 +186,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
/**
* The stream ID of the stream that is currently resumable, ie. the stream we hold the state
* for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
* {@link #unacknowledgedStanzas}.
* {@link #unFailedNonzaExceptionacknowledgedStanzas}.
*/
private String smSessionId;
private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
this, "stream resumed element");
private SyncPointState smResumedSyncPoint;
private Failed smResumptionFailed;
private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
this, "stream enabled element");
private boolean smEnabledSyncPoint;
/**
* The client's preferred maximum resumption time in seconds.
@ -381,15 +369,17 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Wait for stream features after the authentication.
// TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
// renamed to "streamFeaturesAfterAuthenticationReceived".
maybeCompressFeaturesReceived.checkIfSuccessOrWait();
waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
// If compression is enabled then request the server to use stream compression. XEP-170
// recommends to perform stream compression before resource binding.
maybeEnableCompression();
if (isSmResumptionPossible()) {
smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
if (smResumedSyncPoint.wasSuccessful()) {
smResumedSyncPoint = SyncPointState.request_sent;
sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
waitForCondition(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
if (smResumedSyncPoint == SyncPointState.successful) {
// We successfully resumed the stream, be done here
afterSuccessfulLogin(true);
return;
@ -397,7 +387,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// SM resumption failed, what Smack does here is to report success of
// lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
// normal resource binding can be tried.
LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
assert smResumptionFailed != null;
LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
}
List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
@ -421,9 +412,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
if (isSmAvailable() && useSm) {
// Remove what is maybe left from previously stream managed sessions
serverHandledStanzasCount = 0;
sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
// XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
// then this is a non recoverable error and we therefore throw an exception.
smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
synchronized (requestAckPredicates) {
if (requestAckPredicates.isEmpty()) {
// Assure that we have at lest one predicate set up that so that we request acks
@ -503,9 +495,14 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
setWasAuthenticated();
// Wait for reader and writer threads to be terminated.
readerWriterSemaphore.acquireUninterruptibly(2);
readerWriterSemaphore.release(2);
try {
boolean readerAndWriterThreadsTermianted = waitForCondition(() -> !packetWriter.running && !packetReader.running);
if (!readerAndWriterThreadsTermianted) {
LOGGER.severe("Reader and writer threads did not terminate");
}
} catch (InterruptedException e) {
LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
}
if (disconnectedButResumeable) {
return;
@ -536,10 +533,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
@Override
protected void initState() {
super.initState();
maybeCompressFeaturesReceived.init();
compressSyncPoint.init();
smResumedSyncPoint.init();
smEnabledSyncPoint.init();
streamFeaturesAfterAuthenticationReceived = compressSyncPoint = false;
smResumedSyncPoint = SyncPointState.initial;
smResumptionFailed = null;
smEnabledSyncPoint = false;
}
@Override
@ -652,15 +649,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Set the reader and writer instance variables
initReaderAndWriter();
int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits();
if (availableReaderWriterSemaphorePermits < 2) {
Object[] logObjects = new Object[] {
this,
availableReaderWriterSemaphorePermits,
};
LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects);
}
readerWriterSemaphore.acquire(2);
// Start the writer thread. This will open an XMPP stream to the server
packetWriter.init();
// Start the reader thread. The startup() method will block until we
@ -688,17 +676,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* existing plain connection and perform a handshake. This method won't return until the
* connection has finished the handshake or an error occurred while securing the connection.
* @throws IOException if an I/O error occurred.
* @throws CertificateException
* @throws NoSuchAlgorithmException if no such algorithm is available.
* @throws NoSuchProviderException
* @throws KeyStoreException
* @throws UnrecoverableKeyException
* @throws KeyManagementException if there was a key mangement error.
* @throws SmackException if Smack detected an exceptional situation.
* @throws Exception if an exception occurs.
* @throws SecurityNotPossibleException if TLS is not possible.
* @throws CertificateException if there is an issue with the certificate.
*/
@SuppressWarnings("LiteralClassName")
private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
SmackTlsContext smackTlsContext = getSmackTlsContext();
Socket plain = socket;
@ -773,7 +755,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
@Override
public boolean isUsingCompression() {
return compressionHandler != null && compressSyncPoint.wasSuccessful();
return compressionHandler != null && compressSyncPoint;
}
/**
@ -791,10 +773,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*
* @throws NotConnectedException if the XMPP connection is not connected.
* @throws SmackException if Smack detected an exceptional situation.
* @throws NoResponseException if there was no response from the remote entity.
* @throws InterruptedException if the calling thread was interrupted.
* @throws XMPPException if an XMPP protocol error was received.
*/
private void maybeEnableCompression() throws SmackException, InterruptedException {
private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
if (!config.isCompressionEnabled()) {
return;
}
@ -807,7 +789,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// If stream compression was offered by the server and we want to use
// compression then send compression request to the server
if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
sendNonza(new Compress(compressionHandler.getCompressionMethod()));
waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
} else {
LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
}
@ -835,11 +818,11 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// We connected successfully to the servers TCP port
initConnection();
// TLS handled will be successful either if TLS was established, or if it was not mandatory.
tlsHandled.checkIfSuccessOrWaitOrThrow();
// TLS handled will be true either if TLS was established, or if it was not mandatory.
waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
// Wait with SASL auth until the SASL mechanisms have been received
saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
}
/**
@ -857,24 +840,28 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
if (startTlsFeature != null) {
if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
tlsHandled.reportFailure(smackException);
currentSmackException = smackException;
notifyWaitingThreads();
throw smackException;
}
if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
sendNonza(new StartTls());
} else {
tlsHandled.reportSuccess();
tlsHandled = true;
notifyWaitingThreads();
}
} else {
tlsHandled.reportSuccess();
tlsHandled = true;
notifyWaitingThreads();
}
if (isSaslAuthenticated()) {
// If we have received features after the SASL has been successfully completed, then we
// have also *maybe* received, as it is an optional feature, the compression feature
// from the server.
maybeCompressFeaturesReceived.reportSuccess();
streamFeaturesAfterAuthenticationReceived = true;
notifyWaitingThreads();
}
}
@ -899,6 +886,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private volatile boolean done;
private boolean running;
/**
* Initializes the reader in order to be used. The reader is initialized during the
* first connection and when reconnecting due to an abruptly disconnection.
@ -910,11 +899,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
@Override
public void run() {
LOGGER.finer(threadName + " start");
running = true;
try {
parsePackets();
} finally {
LOGGER.finer(threadName + " exit");
XMPPTCPConnection.this.readerWriterSemaphore.release();
running = false;
notifyWaitingThreads();
}
}
}, threadName);
@ -931,10 +922,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* Parse top-level packets in order to process them further.
*/
private void parsePackets() {
boolean initialStreamOpenSend = false;
try {
openStreamAndResetParser();
initialStreamOpenSend = true;
XmlPullParser.Event eventType = parser.getEventType();
while (!done) {
switch (eventType) {
@ -955,27 +944,21 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
break;
case "error":
StreamError streamError = PacketParserUtils.parseStreamError(parser);
saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
currentXmppException = new StreamErrorException(streamError);
// Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
// point to report the error, which is checked immediately after tlsHandled in
// connectInternal().
tlsHandled.reportSuccess();
tlsHandled = true;
notifyWaiters();
throw new StreamErrorException(streamError);
case "features":
parseFeaturesAndNotify(parser);
break;
case "proceed":
try {
// Secure the connection by negotiating TLS
proceedTLSReceived();
// Send a new opening stream to the server
openStreamAndResetParser();
}
catch (Exception e) {
SmackException.SmackWrappedException smackException = new SmackException.SmackWrappedException(e);
tlsHandled.reportFailure(smackException);
throw e;
}
// Secure the connection by negotiating TLS
proceedTLSReceived();
// Send a new opening stream to the server
openStreamAndResetParser();
break;
case "failure":
String namespace = parser.getNamespace(null);
@ -989,8 +972,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// situation. It is still possible to authenticate and
// use the connection but using an uncompressed connection
// TODO Parse failure stanza
compressSyncPoint.reportFailure(new SmackException.SmackMessageException(
"Could not establish compression"));
currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
notifyWaitingThreads();
break;
default:
parseAndProcessNonza(parser);
@ -1004,7 +987,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// Send a new opening stream to the server
openStreamAndResetParser();
// Notify that compression is being used
compressSyncPoint.reportSuccess();
compressSyncPoint = true;
notifyWaitingThreads();
break;
case Enabled.ELEMENT:
Enabled enabled = ParseStreamManagement.enabled(parser);
@ -1012,7 +996,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
smSessionId = enabled.getId();
if (StringUtils.isNullOrEmpty(smSessionId)) {
SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
smEnabledSyncPoint.reportFailure(xmppException);
setCurrentConnectionExceptionAndNotify(xmppException);
throw xmppException;
}
smServerMaxResumptionTime = enabled.getMaxResumptionTime();
@ -1022,28 +1006,18 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
}
clientHandledStanzasCount = 0;
smWasEnabledAtLeastOnce = true;
smEnabledSyncPoint.reportSuccess();
LOGGER.fine("Stream Management (XEP-198): successfully enabled");
smEnabledSyncPoint = true;
notifyWaitingThreads();
break;
case Failed.ELEMENT:
Failed failed = ParseStreamManagement.failed(parser);
FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
// If only XEP-198 would specify different failure elements for the SM
// enable and SM resume failure case. But this is not the case, so we
// need to determine if this is a 'Failed' response for either 'Enable'
// or 'Resume'.
if (smResumedSyncPoint.requestSent()) {
smResumedSyncPoint.reportFailure(xmppException);
}
else {
if (!smEnabledSyncPoint.requestSent()) {
throw new IllegalStateException("Failed element received but SM was not previously enabled");
}
smEnabledSyncPoint.reportFailure(new SmackException.SmackWrappedException(xmppException));
// Report success for last lastFeaturesReceived so that in case a
// failed resumption, we can continue with normal resource binding.
// See text of XEP-198 5. below Example 11.
lastFeaturesReceived.reportSuccess();
if (smResumedSyncPoint == SyncPointState.request_sent) {
// This is a <failed/> nonza in a response to resuming a previous stream, failure to do
// so is non-fatal as we can simply continue with resource binding in this case.
smResumptionFailed = failed;
} else {
FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
setCurrentConnectionExceptionAndNotify(xmppException);
}
break;
case Resumed.ELEMENT:
@ -1052,7 +1026,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
}
// Mark SM as enabled
smEnabledSyncPoint.reportSuccess();
smEnabledSyncPoint = true;
// First, drop the stanzas already handled by the server
processHandledCount(resumed.getHandledCount());
// Then re-send what is left in the unacknowledged queue
@ -1068,8 +1042,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
requestSmAcknowledgementInternal();
}
// Mark SM resumption as successful
smResumedSyncPoint.reportSuccess();
LOGGER.fine("Stream Management (XEP-198): Stream resumed");
smResumedSyncPoint = SyncPointState.successful;
notifyWaitingThreads();
break;
case AckAnswer.ELEMENT:
AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
@ -1077,7 +1051,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
break;
case AckRequest.ELEMENT:
ParseStreamManagement.ackRequest(parser);
if (smEnabledSyncPoint.wasSuccessful()) {
if (smEnabledSyncPoint) {
sendSmAcknowledgementInternal();
} else {
LOGGER.warning("SM Ack Request received while SM is not enabled");
@ -1101,7 +1075,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
// did re-start the queue again, causing this writer to assume that the queue is not
// shutdown, which results in a call to disconnect().
final boolean queueWasShutdown = packetWriter.queue.isShutdown();
closingStreamReceived.reportSuccess();
closingStreamReceived = true;
notifyWaitingThreads();
if (queueWasShutdown) {
// We received a closing stream element *after* we initiated the
@ -1137,12 +1112,9 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
}
}
catch (Exception e) {
// TODO: Move the call closingStreamReceived.reportFailure(e) into notifyConnectionError?
closingStreamReceived.reportFailure(e);
// The exception can be ignored if the the connection is 'done'
// or if the it was caused because the socket got closed. It can not be ignored if it
// happened before (or while) the initial stream opened was send.
if (!(done || packetWriter.queue.isShutdown()) || !initialStreamOpenSend) {
// or if the it was caused because the socket got closed.
if (!done) {
// Close the connection and notify connection listeners of the
// error.
notifyConnectionError(e);
@ -1161,12 +1133,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
QUEUE_SIZE, true);
/**
* Needs to be protected for unit testing purposes.
*/
protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
XMPPTCPConnection.this, "shutdown completed");
/**
* If set, the stanza writer is shut down
*/
@ -1184,12 +1150,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
*/
private boolean shouldBundleAndDefer;
private boolean running;
/**
* Initializes the writer in order to be used. It is called at the first connection and also
* is invoked if the connection is disconnected by an error.
*/
void init() {
shutdownDone.init();
shutdownTimestamp = null;
if (unacknowledgedStanzas != null) {
@ -1204,11 +1171,13 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
@Override
public void run() {
LOGGER.finer(threadName + " start");
running = true;
try {
writePackets();
} finally {
LOGGER.finer(threadName + " exit");
XMPPTCPConnection.this.readerWriterSemaphore.release();
running = false;
notifyWaitingThreads();
}
}
}, threadName);
@ -1262,13 +1231,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
instantShutdown = instant;
queue.shutdown();
shutdownTimestamp = System.currentTimeMillis();
if (shutdownDone.isNotInInitialState()) {
try {
shutdownDone.checkIfSuccessOrWait();
} catch (NoResponseException | InterruptedException e) {
LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
}
}
}
/**
@ -1407,9 +1369,6 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
} else {
LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
}
} finally {
LOGGER.fine("Reporting shutdownDone success in writer thread");
shutdownDone.reportSuccess();
}
// Delay notifyConnectionError after shutdownDone has been reported in the finally block.
if (writerException != null) {
@ -1721,7 +1680,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @return true if Stream Management was negotiated.
*/
public boolean isSmEnabled() {
return smEnabledSyncPoint.wasSuccessful();
return smEnabledSyncPoint;
}
/**
@ -1730,7 +1689,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
* @return true if the stream was resumed.
*/
public boolean streamWasResumed() {
return smResumedSyncPoint.wasSuccessful();
return smResumedSyncPoint == SyncPointState.successful;
}
/**

View file

@ -47,17 +47,13 @@ import javax.net.ssl.SSLSession;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.SmackException.ConnectionException;
import org.jivesoftware.smack.SmackException.ConnectionUnexpectedTerminatedException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
import org.jivesoftware.smack.SmackException.SmackWrappedException;
import org.jivesoftware.smack.SmackException.SmackCertificateException;
import org.jivesoftware.smack.SmackFuture;
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.XmppInputOutputFilter;
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.ConnectedButUnauthenticatedStateDescriptor;
import org.jivesoftware.smack.c2s.ModularXmppClientToServerConnection.LookupRemoteConnectionEndpointsStateDescriptor;
@ -744,23 +740,14 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
throws InterruptedException, ConnectionUnexpectedTerminatedException, NotConnectedException,
NoResponseException, IOException {
throws InterruptedException, IOException, SmackException, XMPPException {
// The fields inetSocketAddress and failedAddresses are handed over from LookupHostAddresses to
// ConnectingToHost.
ConnectionAttemptState connectionAttemptState = new ConnectionAttemptState(connectionInternal, discoveredTcpEndpoints,
this);
connectionAttemptState.establishTcpConnection();
try {
connectionAttemptState.tcpConnectionEstablishedSyncPoint.checkIfSuccessOrWaitOrThrow();
} catch (ConnectionException | NoResponseException e) {
// TODO: It is not really elegant that we catch the exception here. Ideally ConnectionAttemptState would
// simply return a StateTranstionResult.FailureCausedByException.
return new StateTransitionResult.FailureCausedByException<>(e);
} catch (SmackWrappedException e) {
// Should never throw SmackWrappedException.
throw new AssertionError(e);
StateTransitionResult.Failure failure = connectionAttemptState.establishTcpConnection();
if (failure != null) {
return failure;
}
socketChannel = connectionAttemptState.socketChannel;
@ -858,8 +845,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
@Override
public StateTransitionResult.AttemptResult transitionInto(WalkStateGraphContext walkStateGraphContext)
throws SmackWrappedException, FailedNonzaException, IOException, InterruptedException,
ConnectionUnexpectedTerminatedException, NoResponseException, NotConnectedException {
throws IOException, InterruptedException, SmackException, XMPPException {
connectionInternal.sendAndWaitForResponse(StartTls.INSTANCE, TlsProceed.class, TlsFailure.class);
SmackTlsContext smackTlsContext = connectionInternal.getSmackTlsContext();
@ -881,7 +867,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
try {
tlsState.waitForHandshakeFinished();
} catch (CertificateException e) {
throw new SmackWrappedException(e);
throw new SmackCertificateException(e);
}
connectionInternal.newStreamOpenWaitForFeaturesSequence("stream features after TLS established");
@ -1166,43 +1152,20 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
private void handleSslException(SSLException e) {
handshakeException = e;
handshakeStatus = TlsHandshakeStatus.failed;
synchronized (this) {
notifyAll();
}
connectionInternal.notifyWaitingThreads();
}
private void onHandshakeFinished() {
handshakeStatus = TlsHandshakeStatus.successful;
synchronized (this) {
notifyAll();
}
connectionInternal.notifyWaitingThreads();
}
private boolean isHandshakeFinished() {
return handshakeStatus == TlsHandshakeStatus.successful || handshakeStatus == TlsHandshakeStatus.failed;
}
private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, ConnectionUnexpectedTerminatedException, NoResponseException {
final long deadline = System.currentTimeMillis() + connectionInternal.connection.getReplyTimeout();
Exception currentConnectionException = null;
synchronized (this) {
while (!isHandshakeFinished()
&& (currentConnectionException = connectionInternal.getCurrentConnectionException()) == null) {
final long now = System.currentTimeMillis();
if (now >= deadline)
break;
wait(deadline - now);
}
}
if (currentConnectionException != null) {
throw new SmackException.ConnectionUnexpectedTerminatedException(currentConnectionException);
}
if (!isHandshakeFinished()) {
throw NoResponseException.newWith(connectionInternal.connection, "TLS handshake to finish");
}
private void waitForHandshakeFinished() throws InterruptedException, CertificateException, SSLException, SmackException, XMPPException {
connectionInternal.waitForCondition(() -> isHandshakeFinished(), "TLS handshake to finish");
if (handshakeStatus == TlsHandshakeStatus.failed) {
throw handshakeException;
@ -1235,7 +1198,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
@Override
public void waitUntilInputOutputClosed() throws IOException, CertificateException, InterruptedException,
ConnectionUnexpectedTerminatedException, NoResponseException {
SmackException, XMPPException {
waitForHandshakeFinished();
}

View file

@ -1,6 +1,6 @@
/**
*
* Copyright 2014-2019 Florian Schmaus
* Copyright 2014-2020 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -124,8 +124,6 @@ public class PacketWriterTest {
// Not really cool, but may increases the chances for 't' to block in sendStanza.
Thread.sleep(250);
// Set to true for testing purposes, so that shutdown() won't wait packet writer
pw.shutdownDone.reportSuccess();
// Shutdown the packetwriter, this will also interrupt the writer thread, which is what we hope to happen in the
// thread created above.
pw.shutdown(false);