mirror of
https://codeberg.org/Mercury-IM/Smack
synced 2025-09-10 10:49:41 +02:00
Merge Smack 4.1.0-rc2
Conflicts: smack-core/src/main/java/org/jivesoftware/smack/filter/FromMatchesFilter.java smack-extensions/src/main/java/org/jivesoftware/smackx/iqregister/AccountManager.java version.gradle
This commit is contained in:
commit
fbf0ba13ce
48 changed files with 650 additions and 166 deletions
|
@ -16,7 +16,11 @@
|
|||
*/
|
||||
package org.jivesoftware.smack.sm;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.packet.Stanza;
|
||||
|
||||
public abstract class StreamManagementException extends SmackException {
|
||||
|
||||
|
@ -52,5 +56,59 @@ public abstract class StreamManagementException extends SmackException {
|
|||
super("Stream IDs do not match. Expected '" + expected + "', but got '" + got + "'");
|
||||
}
|
||||
}
|
||||
|
||||
public static class StreamManagementCounterError extends StreamManagementException {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final long handledCount;
|
||||
private final long previousServerHandledCount;
|
||||
private final long ackedStanzaCount;
|
||||
private final int outstandingStanzasCount;
|
||||
private final List<Stanza> ackedStanzas;
|
||||
|
||||
public StreamManagementCounterError(long handledCount, long previousServerHandlerCount,
|
||||
long ackedStanzaCount,
|
||||
List<Stanza> ackedStanzas) {
|
||||
super(
|
||||
"There was an error regarding the Stream Mangement counters. Server reported "
|
||||
+ handledCount
|
||||
+ " handled stanzas, which means that the "
|
||||
+ ackedStanzaCount
|
||||
+ " recently send stanzas by client are now acked by the server. But Smack had only "
|
||||
+ ackedStanzas.size()
|
||||
+ " to acknowledge. The stanza id of the last acked outstanding stanza is "
|
||||
+ (ackedStanzas.isEmpty() ? "<no acked stanzas>"
|
||||
: ackedStanzas.get(ackedStanzas.size() - 1).getStanzaId()));
|
||||
this.handledCount = handledCount;
|
||||
this.previousServerHandledCount = previousServerHandlerCount;
|
||||
this.ackedStanzaCount = ackedStanzaCount;
|
||||
this.outstandingStanzasCount = ackedStanzas.size();
|
||||
this.ackedStanzas = Collections.unmodifiableList(ackedStanzas);
|
||||
}
|
||||
|
||||
public long getHandledCount() {
|
||||
return handledCount;
|
||||
}
|
||||
|
||||
public long getPreviousServerHandledCount() {
|
||||
return previousServerHandledCount;
|
||||
}
|
||||
|
||||
public long getAckedStanzaCount() {
|
||||
return ackedStanzaCount;
|
||||
}
|
||||
|
||||
public int getOutstandingStanzasCount() {
|
||||
return outstandingStanzasCount;
|
||||
}
|
||||
|
||||
public List<Stanza> getAckedStanzas() {
|
||||
return ackedStanzas;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2015 Florian Schmaus
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jivesoftware.smack.tcp;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
||||
public class BundleAndDefer {
|
||||
|
||||
private final AtomicBoolean isStopped;
|
||||
|
||||
BundleAndDefer(AtomicBoolean isStopped) {
|
||||
this.isStopped = isStopped;
|
||||
}
|
||||
|
||||
public void stopCurrentBundleAndDefer() {
|
||||
synchronized (isStopped) {
|
||||
if (isStopped.get()) {
|
||||
return;
|
||||
}
|
||||
isStopped.set(true);
|
||||
isStopped.notify();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2015 Florian Schmaus
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jivesoftware.smack.tcp;
|
||||
|
||||
/**
|
||||
* This callback is used to get the current value of the period in which Smack does bundle and defer
|
||||
* outgoing stanzas.
|
||||
* <p>
|
||||
* Smack will bundle and defer stanzas if the connection is authenticated, the send queue is empty
|
||||
* and if a bundle and defer callback is set, either via
|
||||
* {@link XMPPTCPConnection#setDefaultBundleAndDeferCallback(BundleAndDeferCallback)} or
|
||||
* {@link XMPPTCPConnection#setBundleandDeferCallback(BundleAndDeferCallback)}, and
|
||||
* {@link #getBundleAndDeferMillis(BundleAndDefer)} returns a positive value. In a mobile environment, bundling
|
||||
* and deferring outgoing stanzas may reduce battery consumption. It heavily depends on the
|
||||
* environment, but recommend values for the bundle and defer period range from 20-60 seconds. But
|
||||
* keep in mind that longer periods decrease the realtime aspect of Smack.
|
||||
* </p>
|
||||
* <p>
|
||||
* Smack will invoke the callback when it needs to know the length of the bundle and defer period.
|
||||
* If {@link #getBundleAndDeferMillis(BundleAndDefer)} returns 0 or a negative value, then the
|
||||
* stanzas will send immediately. You can also prematurely abort the bundling of stanzas by calling
|
||||
* {@link BundleAndDefer#stopCurrentBundleAndDefer()}.
|
||||
* </p>
|
||||
*/
|
||||
public interface BundleAndDeferCallback {
|
||||
|
||||
/**
|
||||
* Return the bundle and defer period used by Smack in milliseconds.
|
||||
*
|
||||
* @param bundleAndDefer used to premature abort bundle and defer.
|
||||
* @return the bundle and defer period in milliseconds.
|
||||
*/
|
||||
public int getBundleAndDeferMillis(BundleAndDefer bundleAndDefer);
|
||||
|
||||
}
|
|
@ -54,6 +54,7 @@ import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
|
|||
import org.jivesoftware.smack.sm.SMUtils;
|
||||
import org.jivesoftware.smack.sm.StreamManagementException;
|
||||
import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
|
||||
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
|
||||
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
|
||||
import org.jivesoftware.smack.sm.packet.StreamManagement;
|
||||
import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
|
||||
|
@ -126,6 +127,7 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
|
@ -185,6 +187,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
|
||||
this);
|
||||
|
||||
private static BundleAndDeferCallback defaultBundleAndDeferCallback;
|
||||
|
||||
private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
|
||||
|
||||
private static boolean useSmDefault = false;
|
||||
|
||||
private static boolean useSmResumptionDefault = true;
|
||||
|
@ -719,7 +725,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
Socket plain = socket;
|
||||
// Secure the plain connection
|
||||
socket = context.getSocketFactory().createSocket(plain,
|
||||
plain.getInetAddress().getHostAddress(), plain.getPort(), true);
|
||||
host, plain.getPort(), true);
|
||||
// Initialize the reader and writer with the new secured version
|
||||
initReaderAndWriter();
|
||||
|
||||
|
@ -1272,6 +1278,30 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
if (element == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get a local version of the bundle and defer callback, in case it's unset
|
||||
// between the null check and the method invocation
|
||||
final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
|
||||
// If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
|
||||
// empty), then we could wait a bit for further stanzas attempting to decrease
|
||||
// our energy consumption
|
||||
if (localBundleAndDeferCallback != null && isAuthenticated() && queue.isEmpty()) {
|
||||
final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
|
||||
final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
|
||||
bundlingAndDeferringStopped));
|
||||
if (bundleAndDeferMillis > 0) {
|
||||
long remainingWait = bundleAndDeferMillis;
|
||||
final long waitStart = System.currentTimeMillis();
|
||||
synchronized (bundlingAndDeferringStopped) {
|
||||
while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
|
||||
bundlingAndDeferringStopped.wait(remainingWait);
|
||||
remainingWait = bundleAndDeferMillis
|
||||
- (System.currentTimeMillis() - waitStart);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Stanza packet = null;
|
||||
if (element instanceof Stanza) {
|
||||
packet = (Stanza) element;
|
||||
|
@ -1288,6 +1318,8 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
writer.flush();
|
||||
}
|
||||
try {
|
||||
// It is important the we put the stanza in the unacknowledged stanza
|
||||
// queue before we put it on the wire
|
||||
unacknowledgedStanzas.put(packet);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
|
@ -1646,7 +1678,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
return Math.min(clientResumptionTime, serverResumptionTime);
|
||||
}
|
||||
|
||||
private void processHandledCount(long handledCount) throws NotConnectedException {
|
||||
private void processHandledCount(long handledCount) throws NotConnectedException, StreamManagementCounterError {
|
||||
long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
|
||||
final List<Stanza> ackedStanzas = new ArrayList<Stanza>(
|
||||
handledCount <= Integer.MAX_VALUE ? (int) handledCount
|
||||
|
@ -1655,7 +1687,10 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
Stanza ackedStanza = unacknowledgedStanzas.poll();
|
||||
// If the server ack'ed a stanza, then it must be in the
|
||||
// unacknowledged stanza queue. There can be no exception.
|
||||
assert(ackedStanza != null);
|
||||
if (ackedStanza == null) {
|
||||
throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
|
||||
ackedStanzasCount, ackedStanzas);
|
||||
}
|
||||
ackedStanzas.add(ackedStanza);
|
||||
}
|
||||
|
||||
|
@ -1691,7 +1726,7 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
}
|
||||
String id = ackedStanza.getStanzaId();
|
||||
if (StringUtils.isNullOrEmpty(id)) {
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
PacketListener listener = stanzaIdAcknowledgedListeners.remove(id);
|
||||
if (listener != null) {
|
||||
|
@ -1709,4 +1744,31 @@ public class XMPPTCPConnection extends AbstractXMPPConnection {
|
|||
|
||||
serverHandledStanzasCount = handledCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the default bundle and defer callback used for new connections.
|
||||
*
|
||||
* @param defaultBundleAndDeferCallback
|
||||
* @see BundleAndDeferCallback
|
||||
* @since 4.1
|
||||
*/
|
||||
public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
|
||||
XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the bundle and defer callback used for this connection.
|
||||
* <p>
|
||||
* You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
|
||||
* no longer get deferred.
|
||||
* </p>
|
||||
*
|
||||
* @param bundleAndDeferCallback the callback or <code>null</code>.
|
||||
* @see BundleAndDeferCallback
|
||||
* @since 4.1
|
||||
*/
|
||||
public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
|
||||
this.bundleAndDeferCallback = bundleAndDeferCallback;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue