mirror of
https://codeberg.org/Mercury-IM/Smack
synced 2025-09-09 10:19:41 +02:00
Bump Error Prone version to 2.3.4 and fix new bug patterns
This commit is contained in:
parent
1c0bdfae40
commit
d65f2c932e
19 changed files with 499 additions and 585 deletions
|
@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
|
|||
import java.nio.Buffer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.KeyManagementException;
|
||||
|
@ -61,7 +62,6 @@ import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
|
|||
import org.jivesoftware.smack.SmackException.SmackWrappedException;
|
||||
import org.jivesoftware.smack.SmackFuture;
|
||||
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
|
||||
import org.jivesoftware.smack.SmackReactor.ChannelSelectedCallback;
|
||||
import org.jivesoftware.smack.SmackReactor.SelectionKeyAttachment;
|
||||
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
|
||||
import org.jivesoftware.smack.XmppInputOutputFilter;
|
||||
|
@ -280,294 +280,292 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
}
|
||||
};
|
||||
|
||||
private final ChannelSelectedCallback channelSelectedCallback =
|
||||
(selectedChannel, selectedSelectionKey) -> {
|
||||
assert selectionKey == null || selectionKey == selectedSelectionKey;
|
||||
SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel;
|
||||
// We are *always* interested in OP_READ.
|
||||
int newInterestedOps = SelectionKey.OP_READ;
|
||||
boolean newPendingOutputFilterData = false;
|
||||
private void onChannelSelected(SelectableChannel selectedChannel, SelectionKey selectedSelectionKey) {
|
||||
assert selectionKey == null || selectionKey == selectedSelectionKey;
|
||||
SocketChannel selectedSocketChannel = (SocketChannel) selectedChannel;
|
||||
// We are *always* interested in OP_READ.
|
||||
int newInterestedOps = SelectionKey.OP_READ;
|
||||
boolean newPendingOutputFilterData = false;
|
||||
|
||||
if (!channelSelectedCallbackLock.tryLock()) {
|
||||
rejectedChannelSelectedCallbacks.incrementAndGet();
|
||||
if (!channelSelectedCallbackLock.tryLock()) {
|
||||
rejectedChannelSelectedCallbacks.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
|
||||
handledChannelSelectedCallbacks++;
|
||||
|
||||
long callbackBytesRead = 0;
|
||||
long callbackBytesWritten = 0;
|
||||
|
||||
try {
|
||||
boolean destinationAddressChanged = false;
|
||||
boolean isLastPartOfElement = false;
|
||||
TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null;
|
||||
StringBuilder outgoingStreamForDebugger = null;
|
||||
|
||||
writeLoop: while (true) {
|
||||
final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty();
|
||||
|
||||
if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) {
|
||||
if (filteredOutgoingBuffer != null) {
|
||||
networkOutgoingBuffers.add(filteredOutgoingBuffer);
|
||||
networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining();
|
||||
|
||||
filteredOutgoingBuffer = null;
|
||||
if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]);
|
||||
long bytesWritten;
|
||||
try {
|
||||
bytesWritten = selectedSocketChannel.write(output);
|
||||
} catch (IOException e) {
|
||||
// We have seen here so far
|
||||
// - IOException "Broken pipe"
|
||||
handleReadWriteIoException(e);
|
||||
break;
|
||||
}
|
||||
|
||||
if (bytesWritten == 0) {
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
break;
|
||||
}
|
||||
|
||||
callbackBytesWritten += bytesWritten;
|
||||
|
||||
networkOutgoingBuffersBytes -= bytesWritten;
|
||||
|
||||
List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers);
|
||||
|
||||
for (Buffer prunedBuffer : prunedBuffers) {
|
||||
List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer);
|
||||
if (sendElements == null) {
|
||||
continue;
|
||||
}
|
||||
for (TopLevelStreamElement elementJustSend : sendElements) {
|
||||
connectionInternal.fireFirstLevelElementSendListeners(elementJustSend);
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have
|
||||
// written a certain amount.
|
||||
if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) {
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
callbackPreemtBecauseBytesWritten++;
|
||||
break;
|
||||
}
|
||||
} else if (outgoingBuffer != null || pendingOutputFilterData) {
|
||||
pendingOutputFilterData = false;
|
||||
|
||||
if (outgoingBuffer != null) {
|
||||
totalBytesWrittenBeforeFilter += outgoingBuffer.remaining();
|
||||
if (isLastPartOfElement) {
|
||||
assert currentlyOutgonigTopLevelStreamElement != null;
|
||||
currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement);
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuffer outputFilterInputData = outgoingBuffer;
|
||||
// We can now null the outgoingBuffer since the filter step will take care of it from now on.
|
||||
outgoingBuffer = null;
|
||||
|
||||
for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
|
||||
XmppInputOutputFilter inputOutputFilter = it.next();
|
||||
XmppInputOutputFilter.OutputResult outputResult;
|
||||
try {
|
||||
outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement,
|
||||
destinationAddressChanged, moreDataAvailable);
|
||||
} catch (IOException e) {
|
||||
connectionInternal.notifyConnectionError(e);
|
||||
break writeLoop;
|
||||
}
|
||||
newPendingOutputFilterData |= outputResult.pendingFilterData;
|
||||
outputFilterInputData = outputResult.filteredOutputData;
|
||||
if (outputFilterInputData != null) {
|
||||
outputFilterInputData.flip();
|
||||
}
|
||||
}
|
||||
|
||||
// It is ok if outpuFilterInputData is 'null' here, this is expected behavior.
|
||||
if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) {
|
||||
filteredOutgoingBuffer = outputFilterInputData;
|
||||
} else {
|
||||
filteredOutgoingBuffer = null;
|
||||
}
|
||||
|
||||
// If the filters did eventually not produce any output data but if there is
|
||||
// pending output data then we have a pending write request after read.
|
||||
if (filteredOutgoingBuffer == null && newPendingOutputFilterData) {
|
||||
pendingWriteInterestAfterRead = true;
|
||||
}
|
||||
|
||||
if (filteredOutgoingBuffer != null && isLastPartOfElement) {
|
||||
bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements));
|
||||
currentlyOutgoingElements.clear();
|
||||
}
|
||||
|
||||
// Reset that the destination address has changed.
|
||||
if (destinationAddressChanged) {
|
||||
destinationAddressChanged = false;
|
||||
}
|
||||
} else if (outgoingCharSequenceIterator != null) {
|
||||
CharSequence nextCharSequence = outgoingCharSequenceIterator.next();
|
||||
outgoingBuffer = UTF8.encode(nextCharSequence);
|
||||
if (!outgoingCharSequenceIterator.hasNext()) {
|
||||
outgoingCharSequenceIterator = null;
|
||||
isLastPartOfElement = true;
|
||||
} else {
|
||||
isLastPartOfElement = false;
|
||||
}
|
||||
|
||||
final SmackDebugger debugger = connectionInternal.smackDebugger;
|
||||
if (debugger != null) {
|
||||
if (outgoingStreamForDebugger == null) {
|
||||
outgoingStreamForDebugger = new StringBuilder();
|
||||
}
|
||||
outgoingStreamForDebugger.append(nextCharSequence);
|
||||
|
||||
if (isLastPartOfElement) {
|
||||
try {
|
||||
outputDebugSplitter.append(outgoingStreamForDebugger);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
debugger.onOutgoingElementCompleted();
|
||||
outgoingStreamForDebugger = null;
|
||||
}
|
||||
}
|
||||
} else if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
|
||||
currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll();
|
||||
if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) {
|
||||
Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement;
|
||||
Jid currentDestinationAddress = currentlyOutgoingStanza.getTo();
|
||||
destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress);
|
||||
lastDestinationAddress = currentDestinationAddress;
|
||||
}
|
||||
CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE);
|
||||
if (nextCharSequence instanceof XmlStringBuilder) {
|
||||
XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence;
|
||||
XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment();
|
||||
outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator();
|
||||
} else {
|
||||
outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator();
|
||||
}
|
||||
assert outgoingCharSequenceIterator != null;
|
||||
} else {
|
||||
// There is nothing more to write.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pendingOutputFilterData = newPendingOutputFilterData;
|
||||
if (!pendingWriteInterestAfterRead && pendingOutputFilterData) {
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
}
|
||||
|
||||
readLoop: while (true) {
|
||||
// Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have
|
||||
// read a certain amount.
|
||||
if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) {
|
||||
callbackPreemtBecauseBytesRead++;
|
||||
break;
|
||||
}
|
||||
|
||||
int bytesRead;
|
||||
incomingBuffer.clear();
|
||||
try {
|
||||
bytesRead = selectedSocketChannel.read(incomingBuffer);
|
||||
} catch (IOException e) {
|
||||
handleReadWriteIoException(e);
|
||||
return;
|
||||
}
|
||||
|
||||
handledChannelSelectedCallbacks++;
|
||||
|
||||
long callbackBytesRead = 0;
|
||||
long callbackBytesWritten = 0;
|
||||
|
||||
try {
|
||||
boolean destinationAddressChanged = false;
|
||||
boolean isLastPartOfElement = false;
|
||||
TopLevelStreamElement currentlyOutgonigTopLevelStreamElement = null;
|
||||
StringBuilder outgoingStreamForDebugger = null;
|
||||
|
||||
writeLoop: while (true) {
|
||||
final boolean moreDataAvailable = !isLastPartOfElement || !connectionInternal.outgoingElementsQueue.isEmpty();
|
||||
|
||||
if (filteredOutgoingBuffer != null || !networkOutgoingBuffers.isEmpty()) {
|
||||
if (filteredOutgoingBuffer != null) {
|
||||
networkOutgoingBuffers.add(filteredOutgoingBuffer);
|
||||
networkOutgoingBuffersBytes += filteredOutgoingBuffer.remaining();
|
||||
|
||||
filteredOutgoingBuffer = null;
|
||||
if (moreDataAvailable && networkOutgoingBuffersBytes < 8096) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuffer[] output = networkOutgoingBuffers.toArray(new ByteBuffer[networkOutgoingBuffers.size()]);
|
||||
long bytesWritten;
|
||||
try {
|
||||
bytesWritten = selectedSocketChannel.write(output);
|
||||
} catch (IOException e) {
|
||||
// We have seen here so far
|
||||
// - IOException "Broken pipe"
|
||||
handleReadWriteIoException(e);
|
||||
break;
|
||||
}
|
||||
|
||||
if (bytesWritten == 0) {
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
break;
|
||||
}
|
||||
|
||||
callbackBytesWritten += bytesWritten;
|
||||
|
||||
networkOutgoingBuffersBytes -= bytesWritten;
|
||||
|
||||
List<? extends Buffer> prunedBuffers = pruneBufferList(networkOutgoingBuffers);
|
||||
|
||||
for (Buffer prunedBuffer : prunedBuffers) {
|
||||
List<TopLevelStreamElement> sendElements = bufferToElementMap.remove(prunedBuffer);
|
||||
if (sendElements == null) {
|
||||
continue;
|
||||
}
|
||||
for (TopLevelStreamElement elementJustSend : sendElements) {
|
||||
connectionInternal.fireFirstLevelElementSendListeners(elementJustSend);
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent one callback from dominating the reactor thread. Break out of the write-loop if we have
|
||||
// written a certain amount.
|
||||
if (callbackBytesWritten > CALLBACK_MAX_BYTES_WRITEN) {
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
callbackPreemtBecauseBytesWritten++;
|
||||
break;
|
||||
}
|
||||
} else if (outgoingBuffer != null || pendingOutputFilterData) {
|
||||
pendingOutputFilterData = false;
|
||||
|
||||
if (outgoingBuffer != null) {
|
||||
totalBytesWrittenBeforeFilter += outgoingBuffer.remaining();
|
||||
if (isLastPartOfElement) {
|
||||
assert currentlyOutgonigTopLevelStreamElement != null;
|
||||
currentlyOutgoingElements.add(currentlyOutgonigTopLevelStreamElement);
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuffer outputFilterInputData = outgoingBuffer;
|
||||
// We can now null the outgoingBuffer since the filter step will take care of it from now on.
|
||||
outgoingBuffer = null;
|
||||
|
||||
for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterBeginIterator(); it.hasNext();) {
|
||||
XmppInputOutputFilter inputOutputFilter = it.next();
|
||||
XmppInputOutputFilter.OutputResult outputResult;
|
||||
try {
|
||||
outputResult = inputOutputFilter.output(outputFilterInputData, isLastPartOfElement,
|
||||
destinationAddressChanged, moreDataAvailable);
|
||||
} catch (IOException e) {
|
||||
connectionInternal.notifyConnectionError(e);
|
||||
break writeLoop;
|
||||
}
|
||||
newPendingOutputFilterData |= outputResult.pendingFilterData;
|
||||
outputFilterInputData = outputResult.filteredOutputData;
|
||||
if (outputFilterInputData != null) {
|
||||
outputFilterInputData.flip();
|
||||
}
|
||||
}
|
||||
|
||||
// It is ok if outpuFilterInputData is 'null' here, this is expected behavior.
|
||||
if (outputFilterInputData != null && outputFilterInputData.hasRemaining()) {
|
||||
filteredOutgoingBuffer = outputFilterInputData;
|
||||
} else {
|
||||
filteredOutgoingBuffer = null;
|
||||
}
|
||||
|
||||
// If the filters did eventually not produce any output data but if there is
|
||||
// pending output data then we have a pending write request after read.
|
||||
if (filteredOutgoingBuffer == null && newPendingOutputFilterData) {
|
||||
pendingWriteInterestAfterRead = true;
|
||||
}
|
||||
|
||||
if (filteredOutgoingBuffer != null && isLastPartOfElement) {
|
||||
bufferToElementMap.put(filteredOutgoingBuffer, new ArrayList<>(currentlyOutgoingElements));
|
||||
currentlyOutgoingElements.clear();
|
||||
}
|
||||
|
||||
// Reset that the destination address has changed.
|
||||
if (destinationAddressChanged) {
|
||||
destinationAddressChanged = false;
|
||||
}
|
||||
} else if (outgoingCharSequenceIterator != null) {
|
||||
CharSequence nextCharSequence = outgoingCharSequenceIterator.next();
|
||||
outgoingBuffer = UTF8.encode(nextCharSequence);
|
||||
if (!outgoingCharSequenceIterator.hasNext()) {
|
||||
outgoingCharSequenceIterator = null;
|
||||
isLastPartOfElement = true;
|
||||
} else {
|
||||
isLastPartOfElement = false;
|
||||
}
|
||||
|
||||
final SmackDebugger debugger = connectionInternal.smackDebugger;
|
||||
if (debugger != null) {
|
||||
if (outgoingStreamForDebugger == null) {
|
||||
outgoingStreamForDebugger = new StringBuilder();
|
||||
}
|
||||
outgoingStreamForDebugger.append(nextCharSequence);
|
||||
|
||||
if (isLastPartOfElement) {
|
||||
try {
|
||||
outputDebugSplitter.append(outgoingStreamForDebugger);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
debugger.onOutgoingElementCompleted();
|
||||
outgoingStreamForDebugger = null;
|
||||
}
|
||||
}
|
||||
} else if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
|
||||
currentlyOutgonigTopLevelStreamElement = connectionInternal.outgoingElementsQueue.poll();
|
||||
if (currentlyOutgonigTopLevelStreamElement instanceof Stanza) {
|
||||
Stanza currentlyOutgoingStanza = (Stanza) currentlyOutgonigTopLevelStreamElement;
|
||||
Jid currentDestinationAddress = currentlyOutgoingStanza.getTo();
|
||||
destinationAddressChanged = !JidUtil.equals(lastDestinationAddress, currentDestinationAddress);
|
||||
lastDestinationAddress = currentDestinationAddress;
|
||||
}
|
||||
CharSequence nextCharSequence = currentlyOutgonigTopLevelStreamElement.toXML(StreamOpen.CLIENT_NAMESPACE);
|
||||
if (nextCharSequence instanceof XmlStringBuilder) {
|
||||
XmlStringBuilder xmlStringBuilder = (XmlStringBuilder) nextCharSequence;
|
||||
XmlEnvironment outgoingStreamXmlEnvironment = connectionInternal.getOutgoingStreamXmlEnvironment();
|
||||
outgoingCharSequenceIterator = xmlStringBuilder.toList(outgoingStreamXmlEnvironment).iterator();
|
||||
} else {
|
||||
outgoingCharSequenceIterator = Collections.singletonList(nextCharSequence).iterator();
|
||||
}
|
||||
assert outgoingCharSequenceIterator != null;
|
||||
} else {
|
||||
// There is nothing more to write.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pendingOutputFilterData = newPendingOutputFilterData;
|
||||
if (!pendingWriteInterestAfterRead && pendingOutputFilterData) {
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
}
|
||||
|
||||
readLoop: while (true) {
|
||||
// Prevent one callback from dominating the reactor thread. Break out of the read-loop if we have
|
||||
// read a certain amount.
|
||||
if (callbackBytesRead > CALLBACK_MAX_BYTES_READ) {
|
||||
callbackPreemtBecauseBytesRead++;
|
||||
break;
|
||||
}
|
||||
|
||||
int bytesRead;
|
||||
incomingBuffer.clear();
|
||||
try {
|
||||
bytesRead = selectedSocketChannel.read(incomingBuffer);
|
||||
} catch (IOException e) {
|
||||
handleReadWriteIoException(e);
|
||||
return;
|
||||
}
|
||||
|
||||
if (bytesRead < 0) {
|
||||
LOGGER.finer("NIO read() returned " + bytesRead
|
||||
+ " for " + this + ". This probably means that the TCP connection was terminated.");
|
||||
// According to the socket channel javadoc section about "asynchronous reads" a socket channel's
|
||||
// read() may return -1 if the input side of a socket is shut down.
|
||||
|
||||
// Note that we do not call notifyConnectionError() here because the connection may be
|
||||
// cleanly shutdown which would also cause read() to return '-1. I assume that this socket
|
||||
// will be selected again, on which read() would throw an IOException, which will be catched
|
||||
// and invoke notifyConnectionError() (see a few lines above).
|
||||
/*
|
||||
IOException exception = new IOException("NIO read() returned " + bytesRead);
|
||||
notifyConnectionError(exception);
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pendingInputFilterData) {
|
||||
if (bytesRead == 0) {
|
||||
// Nothing more to read.
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
pendingInputFilterData = false;
|
||||
}
|
||||
|
||||
// We have successfully read something. It is now possible that a filter is now also able to write
|
||||
// additional data (for example SSLEngine).
|
||||
if (pendingWriteInterestAfterRead) {
|
||||
pendingWriteInterestAfterRead = false;
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
}
|
||||
|
||||
callbackBytesRead += bytesRead;
|
||||
|
||||
ByteBuffer filteredIncomingBuffer = incomingBuffer;
|
||||
for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) {
|
||||
filteredIncomingBuffer.flip();
|
||||
|
||||
ByteBuffer newFilteredIncomingBuffer;
|
||||
try {
|
||||
newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer);
|
||||
} catch (IOException e) {
|
||||
connectionInternal.notifyConnectionError(e);
|
||||
return;
|
||||
}
|
||||
if (newFilteredIncomingBuffer == null) {
|
||||
break readLoop;
|
||||
}
|
||||
filteredIncomingBuffer = newFilteredIncomingBuffer;
|
||||
}
|
||||
|
||||
final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining();
|
||||
|
||||
totalBytesReadAfterFilter += bytesReadAfterFilter;
|
||||
|
||||
try {
|
||||
splitter.write(filteredIncomingBuffer);
|
||||
} catch (IOException e) {
|
||||
connectionInternal.notifyConnectionError(e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
totalBytesWritten += callbackBytesWritten;
|
||||
totalBytesRead += callbackBytesRead;
|
||||
|
||||
channelSelectedCallbackLock.unlock();
|
||||
if (bytesRead < 0) {
|
||||
LOGGER.finer("NIO read() returned " + bytesRead
|
||||
+ " for " + this + ". This probably means that the TCP connection was terminated.");
|
||||
// According to the socket channel javadoc section about "asynchronous reads" a socket channel's
|
||||
// read() may return -1 if the input side of a socket is shut down.
|
||||
// Note that we do not call notifyConnectionError() here because the connection may be
|
||||
// cleanly shutdown which would also cause read() to return '-1. I assume that this socket
|
||||
// will be selected again, on which read() would throw an IOException, which will be catched
|
||||
// and invoke notifyConnectionError() (see a few lines above).
|
||||
/*
|
||||
IOException exception = new IOException("NIO read() returned " + bytesRead);
|
||||
notifyConnectionError(exception);
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
// Indicate that there is no reactor thread racing towards handling this selection key.
|
||||
final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
|
||||
if (selectionKeyAttachment != null) {
|
||||
selectionKeyAttachment.resetReactorThreadRacing();
|
||||
if (!pendingInputFilterData) {
|
||||
if (bytesRead == 0) {
|
||||
// Nothing more to read.
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
pendingInputFilterData = false;
|
||||
}
|
||||
|
||||
// Check the queue again to prevent lost wakeups caused by elements inserted before we
|
||||
// called resetReactorThreadRacing() a few lines above.
|
||||
if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
|
||||
setWriteInterestAfterChannelSelectedCallback.incrementAndGet();
|
||||
// We have successfully read something. It is now possible that a filter is now also able to write
|
||||
// additional data (for example SSLEngine).
|
||||
if (pendingWriteInterestAfterRead) {
|
||||
pendingWriteInterestAfterRead = false;
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
}
|
||||
|
||||
connectionInternal.setInterestOps(selectionKey, newInterestedOps);
|
||||
};
|
||||
callbackBytesRead += bytesRead;
|
||||
|
||||
ByteBuffer filteredIncomingBuffer = incomingBuffer;
|
||||
for (ListIterator<XmppInputOutputFilter> it = connectionInternal.getXmppInputOutputFilterEndIterator(); it.hasPrevious();) {
|
||||
filteredIncomingBuffer.flip();
|
||||
|
||||
ByteBuffer newFilteredIncomingBuffer;
|
||||
try {
|
||||
newFilteredIncomingBuffer = it.previous().input(filteredIncomingBuffer);
|
||||
} catch (IOException e) {
|
||||
connectionInternal.notifyConnectionError(e);
|
||||
return;
|
||||
}
|
||||
if (newFilteredIncomingBuffer == null) {
|
||||
break readLoop;
|
||||
}
|
||||
filteredIncomingBuffer = newFilteredIncomingBuffer;
|
||||
}
|
||||
|
||||
final int bytesReadAfterFilter = filteredIncomingBuffer.flip().remaining();
|
||||
|
||||
totalBytesReadAfterFilter += bytesReadAfterFilter;
|
||||
|
||||
try {
|
||||
splitter.write(filteredIncomingBuffer);
|
||||
} catch (IOException e) {
|
||||
connectionInternal.notifyConnectionError(e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
totalBytesWritten += callbackBytesWritten;
|
||||
totalBytesRead += callbackBytesRead;
|
||||
|
||||
channelSelectedCallbackLock.unlock();
|
||||
}
|
||||
|
||||
// Indicate that there is no reactor thread racing towards handling this selection key.
|
||||
final SelectionKeyAttachment selectionKeyAttachment = this.selectionKeyAttachment;
|
||||
if (selectionKeyAttachment != null) {
|
||||
selectionKeyAttachment.resetReactorThreadRacing();
|
||||
}
|
||||
|
||||
// Check the queue again to prevent lost wakeups caused by elements inserted before we
|
||||
// called resetReactorThreadRacing() a few lines above.
|
||||
if (!connectionInternal.outgoingElementsQueue.isEmpty()) {
|
||||
setWriteInterestAfterChannelSelectedCallback.incrementAndGet();
|
||||
newInterestedOps |= SelectionKey.OP_WRITE;
|
||||
}
|
||||
|
||||
connectionInternal.setInterestOps(selectionKey, newInterestedOps);
|
||||
}
|
||||
|
||||
private void handleReadWriteIoException(IOException e) {
|
||||
if (e instanceof ClosedChannelException && !tcpNioTransport.isConnected()) {
|
||||
|
@ -677,7 +675,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
}
|
||||
|
||||
@Override
|
||||
public Stats getStats() {
|
||||
public XmppTcpTransportModule.Stats getStats() {
|
||||
return XmppTcpTransportModule.this.getStats();
|
||||
}
|
||||
|
||||
|
@ -774,7 +772,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
remoteAddress = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
|
||||
|
||||
selectionKey = connectionInternal.registerWithSelector(socketChannel, SelectionKey.OP_READ,
|
||||
channelSelectedCallback);
|
||||
XmppTcpTransportModule.this::onChannelSelected);
|
||||
selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
|
||||
|
||||
connectionInternal.setTransport(tcpNioTransport);
|
||||
|
@ -1316,7 +1314,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
pendingOutputFilterData = true;
|
||||
}
|
||||
|
||||
channelSelectedCallback.onChannelSelected(channel, key);
|
||||
onChannelSelected(channel, key);
|
||||
} finally {
|
||||
channelSelectedCallbackLock.unlock();
|
||||
}
|
||||
|
@ -1347,7 +1345,7 @@ public class XmppTcpTransportModule extends ModularXmppClientToServerConnectionM
|
|||
return CollectionUtil.removeUntil(buffers, b -> b.hasRemaining());
|
||||
}
|
||||
|
||||
public Stats getStats() {
|
||||
public XmppTcpTransportModule.Stats getStats() {
|
||||
return new Stats(this);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue