mirror of
https://codeberg.org/Mercury-IM/Smack
synced 2025-12-08 14:11:07 +01:00
Rework incoming packet listeners and Roster
Differentiate between asynchronous and synchronous ones. Asynchronous are the ones where the invocation order may not be the same as the order in which the stanzas arrived. Since it's no longer guaranteed that when a unit test calls processPacket(stanza) the stanza will be completely processed when the call returns, it was necessary to extend the unit tests (mostly Roster and ChatManager) with a packet listener that waits for his invocation. Since we now also use LinkedHashMaps as Map for the packet listeners (SMACK-531, SMACK-424), adding a packet listeners as last also means that it will be called as last. We exploit this behavior change now in the unit tests. Rename 'recvListeners' to 'syncRecvListeners' in AbstractXMPPConnection. Rename 'rosterInitialized' to 'loaded' in Roster. Add Roster.isLoaded(). Reset 'loaded' to false in Roster.setOfflinePresencesAndResetLoaded() (was setOfflinePresences()). Fixes SMACK-583, SMACK-532, SMACK-424
This commit is contained in:
parent
e5c6c9bdf8
commit
717090d272
39 changed files with 443 additions and 306 deletions
|
|
@ -22,6 +22,7 @@ import java.io.Writer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -126,12 +127,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
private final Collection<PacketCollector> collectors = new ConcurrentLinkedQueue<PacketCollector>();
|
||||
|
||||
/**
|
||||
* List of PacketListeners that will be notified when a new packet was received.
|
||||
* List of PacketListeners that will be notified synchronously when a new packet was received.
|
||||
*/
|
||||
private final Map<PacketListener, ListenerWrapper> recvListeners =
|
||||
new HashMap<PacketListener, ListenerWrapper>();
|
||||
private final Map<PacketListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();
|
||||
|
||||
private final Map<PacketListener, ListenerWrapper> asyncRecvListeners = new HashMap<>();
|
||||
/**
|
||||
* List of PacketListeners that will be notified asynchronously when a new packet was received.
|
||||
*/
|
||||
private final Map<PacketListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>();
|
||||
|
||||
/**
|
||||
* List of PacketListeners that will be notified when a new packet was sent.
|
||||
|
|
@ -245,6 +248,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
// @formatter:on
|
||||
);
|
||||
|
||||
/**
|
||||
* A executor service used to invoke the callbacks of synchronous packet listeners. We use a executor service to
|
||||
* decouple incoming stanza processing from callback invocation. It is important that order of callback invocation
|
||||
* is the same as the order of the incoming stanzas. Therefore we use a <i>single</i> threaded executor service.
|
||||
*/
|
||||
private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor(new SmackExecutorThreadFactory(
|
||||
getConnectionCounter(), "Sync PacketListener Callback"));
|
||||
|
||||
private Roster roster;
|
||||
|
||||
/**
|
||||
|
|
@ -593,27 +604,10 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
// changes to the roster. Note: because of this waiting logic, internal
|
||||
// Smack code should be wary about calling the getRoster method, and may need to
|
||||
// access the roster object directly.
|
||||
// Also only check for rosterInitalized is isRosterLoadedAtLogin is set, otherwise the user
|
||||
// Also only check for rosterIsLoaded is isRosterLoadedAtLogin is set, otherwise the user
|
||||
// has to manually call Roster.reload() before he can expect a initialized roster.
|
||||
if (!roster.rosterInitialized && config.isRosterLoadedAtLogin()) {
|
||||
try {
|
||||
synchronized (roster) {
|
||||
long waitTime = getPacketReplyTimeout();
|
||||
long start = System.currentTimeMillis();
|
||||
while (!roster.rosterInitialized) {
|
||||
if (waitTime <= 0) {
|
||||
break;
|
||||
}
|
||||
roster.wait(waitTime);
|
||||
long now = System.currentTimeMillis();
|
||||
waitTime -= now - start;
|
||||
start = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException ie) {
|
||||
// Ignore.
|
||||
}
|
||||
if (!roster.isLoaded() && config.isRosterLoadedAtLogin()) {
|
||||
roster.waitUntilLoaded();
|
||||
}
|
||||
return roster;
|
||||
}
|
||||
|
|
@ -727,19 +721,29 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
|
||||
@Override
|
||||
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
|
||||
if (packetListener == null) {
|
||||
throw new NullPointerException("Packet listener is null.");
|
||||
}
|
||||
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
|
||||
synchronized (recvListeners) {
|
||||
recvListeners.put(packetListener, wrapper);
|
||||
}
|
||||
addAsyncPacketListener(packetListener, packetFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removePacketListener(PacketListener packetListener) {
|
||||
synchronized (recvListeners) {
|
||||
return recvListeners.remove(packetListener) != null;
|
||||
return removeAsyncPacketListener(packetListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSyncPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
|
||||
if (packetListener == null) {
|
||||
throw new NullPointerException("Packet listener is null.");
|
||||
}
|
||||
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
|
||||
synchronized (syncRecvListeners) {
|
||||
syncRecvListeners.put(packetListener, wrapper);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeSyncPacketListener(PacketListener packetListener) {
|
||||
synchronized (syncRecvListeners) {
|
||||
return syncRecvListeners.remove(packetListener) != null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -961,7 +965,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
// First handle the async recv listeners. Note that this code is very similar to what follows a few lines below,
|
||||
// the only difference is that asyncRecvListeners is used here and that the packet listeners are started in
|
||||
// their own thread.
|
||||
Collection<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
|
||||
final Collection<PacketListener> listenersToNotify = new LinkedList<PacketListener>();
|
||||
synchronized (asyncRecvListeners) {
|
||||
for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) {
|
||||
if (listenerWrapper.filterMatches(packet)) {
|
||||
|
|
@ -989,25 +993,33 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
}
|
||||
|
||||
// Notify the receive listeners interested in the packet
|
||||
listenersToNotify = new LinkedList<PacketListener>();
|
||||
synchronized (recvListeners) {
|
||||
for (ListenerWrapper listenerWrapper : recvListeners.values()) {
|
||||
listenersToNotify.clear();
|
||||
synchronized (syncRecvListeners) {
|
||||
for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) {
|
||||
if (listenerWrapper.filterMatches(packet)) {
|
||||
listenersToNotify.add(listenerWrapper.getListener());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (PacketListener listener : listenersToNotify) {
|
||||
try {
|
||||
listener.processPacket(packet);
|
||||
} catch(NotConnectedException e) {
|
||||
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
|
||||
// Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single
|
||||
// threaded executor service and therefore keeps the order.
|
||||
singleThreadedExecutorService.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (PacketListener listener : listenersToNotify) {
|
||||
try {
|
||||
listener.processPacket(packet);
|
||||
} catch(NotConnectedException e) {
|
||||
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1145,6 +1157,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
executorService.shutdownNow();
|
||||
cachedExecutorService.shutdown();
|
||||
removeCallbacksService.shutdownNow();
|
||||
singleThreadedExecutorService.shutdownNow();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.log(Level.WARNING, "finalize() threw trhowable", t);
|
||||
}
|
||||
|
|
@ -1302,14 +1315,14 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
}
|
||||
}
|
||||
finally {
|
||||
removePacketListener(this);
|
||||
removeAsyncPacketListener(this);
|
||||
}
|
||||
}
|
||||
};
|
||||
removeCallbacksService.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean removed = removePacketListener(packetListener);
|
||||
boolean removed = removeAsyncPacketListener(packetListener);
|
||||
// If the packetListener got removed, then it was never run and
|
||||
// we never received a response, inform the exception callback
|
||||
if (removed && exceptionCallback != null) {
|
||||
|
|
@ -1317,7 +1330,7 @@ public abstract class AbstractXMPPConnection implements XMPPConnection {
|
|||
}
|
||||
}
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
addPacketListener(packetListener, replyFilter);
|
||||
addAsyncPacketListener(packetListener, replyFilter);
|
||||
sendPacket(stanza);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ public class ChatManager extends Manager{
|
|||
|
||||
// Add a listener for all message packets so that we can deliver
|
||||
// messages to the best Chat instance available.
|
||||
connection.addPacketListener(new PacketListener() {
|
||||
connection.addSyncPacketListener(new PacketListener() {
|
||||
public void processPacket(Packet packet) {
|
||||
Message message = (Message) packet;
|
||||
Chat chat;
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import org.jivesoftware.smack.packet.Packet;
|
|||
* org.jivesoftware.smack.filter.PacketFilter)}
|
||||
* </p>
|
||||
*
|
||||
* @see XMPPConnection#addPacketListener(PacketListener, org.jivesoftware.smack.filter.PacketFilter)
|
||||
* @see XMPPConnection#addAsyncPacketListener(PacketListener, org.jivesoftware.smack.filter.PacketFilter)
|
||||
* @author Matt Tucker
|
||||
*/
|
||||
public interface PacketListener {
|
||||
|
|
|
|||
|
|
@ -85,9 +85,11 @@ public class Roster {
|
|||
private final List<RosterEntry> unfiledEntries = new CopyOnWriteArrayList<RosterEntry>();
|
||||
private final List<RosterListener> rosterListeners = new CopyOnWriteArrayList<RosterListener>();
|
||||
private final Map<String, Map<String, Presence>> presenceMap = new ConcurrentHashMap<String, Map<String, Presence>>();
|
||||
|
||||
// The roster is marked as initialized when at least a single roster packet
|
||||
// has been received and processed.
|
||||
boolean rosterInitialized = false;
|
||||
private boolean loaded = false;
|
||||
|
||||
private final PresencePacketListener presencePacketListener = new PresencePacketListener();
|
||||
|
||||
private SubscriptionMode subscriptionMode = getDefaultSubscriptionMode();
|
||||
|
|
@ -124,10 +126,13 @@ public class Roster {
|
|||
Roster(final XMPPConnection connection) {
|
||||
this.connection = connection;
|
||||
rosterStore = connection.getRosterStore();
|
||||
|
||||
// Note that we use sync packet listeners because RosterListeners should be invoked in the same order as the
|
||||
// roster stanzas arrive.
|
||||
// Listen for any roster packets.
|
||||
connection.addPacketListener(new RosterPushListener(), ROSTER_PUSH_FILTER);
|
||||
connection.addSyncPacketListener(new RosterPushListener(), ROSTER_PUSH_FILTER);
|
||||
// Listen for any presence packets.
|
||||
connection.addPacketListener(presencePacketListener, PRESENCE_PACKET_FILTER);
|
||||
connection.addSyncPacketListener(presencePacketListener, PRESENCE_PACKET_FILTER);
|
||||
|
||||
// Listen for connection events
|
||||
connection.addConnectionListener(new AbstractConnectionListener() {
|
||||
|
|
@ -151,22 +156,12 @@ public class Roster {
|
|||
|
||||
public void connectionClosed() {
|
||||
// Changes the presence available contacts to unavailable
|
||||
try {
|
||||
setOfflinePresences();
|
||||
}
|
||||
catch (NotConnectedException e) {
|
||||
LOGGER.log(Level.SEVERE, "Not connected exception" ,e);
|
||||
}
|
||||
setOfflinePresencesAndResetLoaded();
|
||||
}
|
||||
|
||||
public void connectionClosedOnError(Exception e) {
|
||||
// Changes the presence available contacts to unavailable
|
||||
try {
|
||||
setOfflinePresences();
|
||||
}
|
||||
catch (NotConnectedException e1) {
|
||||
LOGGER.log(Level.SEVERE, "Not connected exception" ,e);
|
||||
}
|
||||
setOfflinePresencesAndResetLoaded();
|
||||
}
|
||||
|
||||
});
|
||||
|
|
@ -238,6 +233,40 @@ public class Roster {
|
|||
});
|
||||
}
|
||||
|
||||
protected boolean waitUntilLoaded() {
|
||||
long waitTime = connection.getPacketReplyTimeout();
|
||||
long start = System.currentTimeMillis();
|
||||
while (!loaded) {
|
||||
if (waitTime <= 0) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
synchronized (this) {
|
||||
if (!loaded) {
|
||||
wait(waitTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
LOGGER.log(Level.FINE, "spurious interrupt", e);
|
||||
}
|
||||
long now = System.currentTimeMillis();
|
||||
waitTime -= now - start;
|
||||
start = now;
|
||||
}
|
||||
return isLoaded();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the roster is loaded.
|
||||
*
|
||||
* @return true if the roster is loaded.
|
||||
* @since 4.1
|
||||
*/
|
||||
public boolean isLoaded() {
|
||||
return loaded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a listener to this roster. The listener will be fired anytime one or more
|
||||
* changes to the roster are pushed from the server.
|
||||
|
|
@ -696,7 +725,7 @@ public class Roster {
|
|||
* to offline.
|
||||
* @throws NotConnectedException
|
||||
*/
|
||||
private void setOfflinePresences() throws NotConnectedException {
|
||||
private void setOfflinePresencesAndResetLoaded() {
|
||||
Presence packetUnavailable;
|
||||
for (String user : presenceMap.keySet()) {
|
||||
Map<String, Presence> resources = presenceMap.get(user);
|
||||
|
|
@ -704,10 +733,18 @@ public class Roster {
|
|||
for (String resource : resources.keySet()) {
|
||||
packetUnavailable = new Presence(Presence.Type.unavailable);
|
||||
packetUnavailable.setFrom(user + "/" + resource);
|
||||
presencePacketListener.processPacket(packetUnavailable);
|
||||
try {
|
||||
presencePacketListener.processPacket(packetUnavailable);
|
||||
}
|
||||
catch (NotConnectedException e) {
|
||||
throw new IllegalStateException(
|
||||
"presencePakcetListener should never throw a NotConnectedException when processPacket is called with a presence of type unavailable",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
loaded = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -719,8 +756,8 @@ public class Roster {
|
|||
* @param updatedEntries the collection of address of the updated contacts.
|
||||
* @param deletedEntries the collection of address of the deleted contacts.
|
||||
*/
|
||||
private void fireRosterChangedEvent(Collection<String> addedEntries, Collection<String> updatedEntries,
|
||||
Collection<String> deletedEntries) {
|
||||
private void fireRosterChangedEvent(final Collection<String> addedEntries, final Collection<String> updatedEntries,
|
||||
final Collection<String> deletedEntries) {
|
||||
for (RosterListener listener : rosterListeners) {
|
||||
if (!addedEntries.isEmpty()) {
|
||||
listener.entriesAdded(addedEntries);
|
||||
|
|
@ -739,7 +776,7 @@ public class Roster {
|
|||
*
|
||||
* @param presence the presence change.
|
||||
*/
|
||||
private void fireRosterPresenceEvent(Presence presence) {
|
||||
private void fireRosterPresenceEvent(final Presence presence) {
|
||||
for (RosterListener listener : rosterListeners) {
|
||||
listener.presenceChanged(presence);
|
||||
}
|
||||
|
|
@ -1068,7 +1105,7 @@ public class Roster {
|
|||
}
|
||||
}
|
||||
|
||||
rosterInitialized = true;
|
||||
loaded = true;
|
||||
synchronized (Roster.this) {
|
||||
Roster.this.notifyAll();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -246,16 +246,20 @@ public interface XMPPConnection {
|
|||
public void removePacketCollector(PacketCollector collector);
|
||||
|
||||
/**
|
||||
* Registers a packet listener with this connection. A packet listener will be invoked only
|
||||
* when an incoming packet is received. A packet filter determines
|
||||
* which packets will be delivered to the listener. If the same packet listener
|
||||
* is added again with a different filter, only the new filter will be used.
|
||||
*
|
||||
* NOTE: If you want get a similar callback for outgoing packets, see {@link #addPacketInterceptor(PacketListener, PacketFilter)}.
|
||||
*
|
||||
* Registers a packet listener with this connection.
|
||||
* <p>
|
||||
* This method has been deprecated. It is important to differentiate between using an asynchronous packet listener
|
||||
* (preferred where possible) and a synchronous packet lister. Refer
|
||||
* {@link #addAsyncPacketListener(PacketListener, PacketFilter)} and
|
||||
* {@link #addSyncPacketListener(PacketListener, PacketFilter)} for more information.
|
||||
* </p>
|
||||
*
|
||||
* @param packetListener the packet listener to notify of new received packets.
|
||||
* @param packetFilter the packet filter to use.
|
||||
* @param packetFilter the packet filter to use.
|
||||
* @deprecated use {@link #addAsyncPacketListener(PacketListener, PacketFilter)} or
|
||||
* {@link #addSyncPacketListener(PacketListener, PacketFilter)}.
|
||||
*/
|
||||
@Deprecated
|
||||
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter);
|
||||
|
||||
/**
|
||||
|
|
@ -263,22 +267,54 @@ public interface XMPPConnection {
|
|||
*
|
||||
* @param packetListener the packet listener to remove.
|
||||
* @return true if the packet listener was removed
|
||||
* @deprecated use {@link #removeAsyncPacketListener(PacketListener)} or {@link #removeSyncPacketListener(PacketListener)}.
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean removePacketListener(PacketListener packetListener);
|
||||
|
||||
/**
|
||||
* Registers a <b>synchronous</b> packet listener with this connection. A packet listener will be invoked only when
|
||||
* an incoming packet is received. A packet filter determines which packets will be delivered to the listener. If
|
||||
* the same packet listener is added again with a different filter, only the new filter will be used.
|
||||
* <p>
|
||||
* <b>Important:</b> This packet listeners will be called in the same <i>single</i> thread that processes all
|
||||
* incoming stanzas. Only use this kind of packet filter if it does not perform any XMPP activity that waits for a
|
||||
* response. Consider using {@link #addAsyncPacketListener(PacketListener, PacketFilter)} when possible, i.e. when
|
||||
* the invocation order doesn't have to be the same as the order of the arriving packets. If the order of the
|
||||
* arriving packets, consider using a {@link PacketCollector} when possible.
|
||||
* </p>
|
||||
*
|
||||
* @param packetListener the packet listener to notify of new received packets.
|
||||
* @param packetFilter the packet filter to use.
|
||||
* @see {@link #addPacketInterceptor(PacketListener, PacketFilter)} for a similar callback for outgoing stanzas.
|
||||
* @since 4.1
|
||||
*/
|
||||
public void addSyncPacketListener(PacketListener packetListener, PacketFilter packetFilter);
|
||||
|
||||
/**
|
||||
* Removes a packet listener for received packets from this connection.
|
||||
*
|
||||
* @param packetListener the packet listener to remove.
|
||||
* @return true if the packet listener was removed
|
||||
* @since 4.1
|
||||
*/
|
||||
public boolean removeSyncPacketListener(PacketListener packetListener);
|
||||
|
||||
/**
|
||||
* Registers an <b>asynchronous</b> packet listener with this connection. A packet listener will be invoked only
|
||||
* when an incoming packet is received. A packet filter determines which packets will be delivered to the listener.
|
||||
* If the same packet listener is added again with a different filter, only the new filter will be used.
|
||||
* <p>
|
||||
* Unlike {@link #addPacketListener(PacketListener, PacketFilter)} packet listeners added with this method will be
|
||||
* Unlike {@link #addAsyncPacketListener(PacketListener, PacketFilter)} packet listeners added with this method will be
|
||||
* invoked asynchronously in their own thread. Use this method if the order of the packet listeners must not depend
|
||||
* on the order how the stanzas where received.
|
||||
* </p>
|
||||
*
|
||||
* @param packetListener the packet listener to notify of new received packets.
|
||||
* @param packetFilter the packet filter to use.
|
||||
*/
|
||||
* @see {@link #addPacketInterceptor(PacketListener, PacketFilter)} for a similar callback for outgoing stanzas.
|
||||
* @since 4.1
|
||||
*/
|
||||
public void addAsyncPacketListener(PacketListener packetListener, PacketFilter packetFilter);
|
||||
|
||||
/**
|
||||
|
|
@ -286,6 +322,7 @@ public interface XMPPConnection {
|
|||
*
|
||||
* @param packetListener the packet listener to remove.
|
||||
* @return true if the packet listener was removed
|
||||
* @since 4.1
|
||||
*/
|
||||
public boolean removeAsyncPacketListener(PacketListener packetListener);
|
||||
|
||||
|
|
@ -316,7 +353,7 @@ public interface XMPPConnection {
|
|||
* will be delivered to the interceptor.
|
||||
*
|
||||
* <p>
|
||||
* NOTE: For a similar functionality on incoming packets, see {@link #addPacketListener(PacketListener, PacketFilter)}.
|
||||
* NOTE: For a similar functionality on incoming packets, see {@link #addAsyncPacketListener(PacketListener, PacketFilter)}.
|
||||
*
|
||||
* @param packetInterceptor the packet interceptor to notify of packets about to be sent.
|
||||
* @param packetFilter the packet filter to use.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue