mirror of
https://codeberg.org/Mercury-IM/Smack
synced 2025-09-10 18:59:41 +02:00
File transfer upgrade, 1.5 and beautification.
Fixed fault tolerant negotiator. SMACK-128 git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7616 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
parent
93766ee788
commit
c95c8b7e3a
10 changed files with 629 additions and 453 deletions
|
@ -31,20 +31,17 @@ import org.jivesoftware.smack.packet.IQ;
|
|||
import org.jivesoftware.smack.packet.Packet;
|
||||
import org.jivesoftware.smack.packet.XMPPError;
|
||||
import org.jivesoftware.smack.util.StringUtils;
|
||||
import org.jivesoftware.smack.util.Cache;
|
||||
import org.jivesoftware.smackx.ServiceDiscoveryManager;
|
||||
import org.jivesoftware.smackx.packet.Bytestream;
|
||||
import org.jivesoftware.smackx.packet.Bytestream.StreamHost;
|
||||
import org.jivesoftware.smackx.packet.Bytestream.StreamHostUsed;
|
||||
import org.jivesoftware.smackx.packet.DiscoverInfo;
|
||||
import org.jivesoftware.smackx.packet.DiscoverInfo.Identity;
|
||||
import org.jivesoftware.smackx.packet.DiscoverItems;
|
||||
import org.jivesoftware.smackx.packet.DiscoverItems.Item;
|
||||
import org.jivesoftware.smackx.packet.StreamInitiation;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.*;
|
||||
import java.util.*;
|
||||
import java.net.InetAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* A SOCKS5 bytestream is negotiated partly over the XMPP XML stream and partly
|
||||
|
@ -80,28 +77,17 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
*/
|
||||
private static final int CONNECT_FAILURE_THRESHOLD = 2;
|
||||
|
||||
private static final long BLACKLIST_LIFETIME = 60 * 1000 * 120;
|
||||
|
||||
public static boolean isAllowLocalProxyHost = true;
|
||||
|
||||
private final XMPPConnection connection;
|
||||
|
||||
private List<String> proxies;
|
||||
private Socks5TransferNegotiatorManager transferNegotiatorManager;
|
||||
|
||||
private List<String> streamHosts;
|
||||
|
||||
// locks the proxies during their initialization process
|
||||
private final Object proxyLock = new Object();
|
||||
|
||||
private ProxyProcess proxyProcess;
|
||||
|
||||
// locks on the proxy process during its initiatilization process
|
||||
private final Object processLock = new Object();
|
||||
|
||||
private final Cache addressBlacklist = new Cache(100, BLACKLIST_LIFETIME);
|
||||
|
||||
public Socks5TransferNegotiator(final XMPPConnection connection) {
|
||||
public Socks5TransferNegotiator(Socks5TransferNegotiatorManager transferNegotiatorManager,
|
||||
final XMPPConnection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
this.transferNegotiatorManager = transferNegotiatorManager;
|
||||
}
|
||||
|
||||
public PacketFilter getInitiationPacketFilter(String from, String sessionID) {
|
||||
|
@ -117,7 +103,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
*/
|
||||
InputStream negotiateIncomingStream(Packet streamInitiation)
|
||||
throws XMPPException {
|
||||
|
||||
Bytestream streamHostsInfo = (Bytestream) streamInitiation;
|
||||
|
||||
if (streamHostsInfo.getType().equals(IQ.Type.ERROR)) {
|
||||
|
@ -135,7 +120,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
ex.getXMPPError());
|
||||
connection.sendPacket(errorPacket);
|
||||
}
|
||||
throw(ex);
|
||||
throw (ex);
|
||||
}
|
||||
|
||||
// send used-host confirmation
|
||||
|
@ -145,7 +130,11 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
connection.sendPacket(streamResponse);
|
||||
|
||||
try {
|
||||
return selectedHost.establishedSocket.getInputStream();
|
||||
PushbackInputStream stream = new PushbackInputStream(
|
||||
selectedHost.establishedSocket.getInputStream());
|
||||
int firstByte = stream.read();
|
||||
stream.unread(firstByte);
|
||||
return stream;
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new XMPPException("Error establishing input stream", e);
|
||||
|
@ -184,13 +173,12 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
* Selects a host to connect to over which the file will be transmitted.
|
||||
*
|
||||
* @param streamHostsInfo the packet recieved from the initiator containing the available hosts
|
||||
* to transfer the file
|
||||
* to transfer the file
|
||||
* @return the selected host and socket that were created.
|
||||
* @throws XMPPException when there is no appropriate host.
|
||||
*/
|
||||
private SelectedHostInfo selectHost(Bytestream streamHostsInfo)
|
||||
throws XMPPException
|
||||
{
|
||||
throws XMPPException {
|
||||
Iterator it = streamHostsInfo.getStreamHosts().iterator();
|
||||
StreamHost selectedHost = null;
|
||||
Socket socket = null;
|
||||
|
@ -200,7 +188,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
|
||||
// Check to see if this address has been blacklisted
|
||||
int failures = getConnectionFailures(address);
|
||||
if(failures >= CONNECT_FAILURE_THRESHOLD) {
|
||||
if (failures >= CONNECT_FAILURE_THRESHOLD) {
|
||||
continue;
|
||||
}
|
||||
// establish socket
|
||||
|
@ -219,7 +207,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
socket = null;
|
||||
}
|
||||
}
|
||||
if (selectedHost == null || socket == null || !socket.isConnected()) {
|
||||
if (selectedHost == null || socket == null || !socket.isConnected()) {
|
||||
String errorMessage = "Could not establish socket with any provided host";
|
||||
throw new XMPPException(errorMessage, new XMPPError(
|
||||
XMPPError.Condition.no_acceptable, errorMessage));
|
||||
|
@ -229,19 +217,11 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
}
|
||||
|
||||
private void incrementConnectionFailures(String address) {
|
||||
Integer count = (Integer) addressBlacklist.get(address);
|
||||
if(count == null) {
|
||||
count = new Integer(1);
|
||||
}
|
||||
else {
|
||||
count = new Integer(count.intValue() + 1);
|
||||
}
|
||||
addressBlacklist.put(address, count);
|
||||
transferNegotiatorManager.incrementConnectionFailures(address);
|
||||
}
|
||||
|
||||
private int getConnectionFailures(String address) {
|
||||
Integer count = (Integer) addressBlacklist.get(address);
|
||||
return (count != null ? count.intValue() : 0);
|
||||
return transferNegotiatorManager.getConnectionFailures(address);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -292,9 +272,8 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
}
|
||||
|
||||
private Socket initBytestreamSocket(final String sessionID,
|
||||
String initiator, String target) throws Exception
|
||||
{
|
||||
ProxyProcess process;
|
||||
String initiator, String target) throws Exception {
|
||||
Socks5TransferNegotiatorManager.ProxyProcess process;
|
||||
try {
|
||||
process = establishListeningSocket();
|
||||
}
|
||||
|
@ -328,14 +307,14 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
* @param sessionID The session id of the stream.
|
||||
* @param proxy The server socket which will listen locally for remote
|
||||
* connections.
|
||||
* @param digest the digest of the userids and the session id
|
||||
* @param query the query which the response is being awaited
|
||||
* @param digest the digest of the userids and the session id
|
||||
* @param query the query which the response is being awaited
|
||||
* @return the selected host
|
||||
* @throws XMPPException when the response from the peer is an error or doesn't occur
|
||||
* @throws IOException when there is an error establishing the local socket
|
||||
* @throws IOException when there is an error establishing the local socket
|
||||
*/
|
||||
private SelectedHostInfo waitForUsedHostResponse(String sessionID,
|
||||
final ProxyProcess proxy, final String digest,
|
||||
final Socks5TransferNegotiatorManager.ProxyProcess proxy, final String digest,
|
||||
final Bytestream query) throws XMPPException, IOException
|
||||
{
|
||||
SelectedHostInfo info = new SelectedHostInfo();
|
||||
|
@ -394,22 +373,13 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
}
|
||||
}
|
||||
|
||||
private ProxyProcess establishListeningSocket() throws IOException {
|
||||
synchronized (processLock) {
|
||||
if (proxyProcess == null) {
|
||||
proxyProcess = new ProxyProcess(new ServerSocket(7777));
|
||||
proxyProcess.start();
|
||||
}
|
||||
}
|
||||
proxyProcess.addTransfer();
|
||||
return proxyProcess;
|
||||
private Socks5TransferNegotiatorManager.ProxyProcess establishListeningSocket()
|
||||
throws IOException {
|
||||
return transferNegotiatorManager.addTransfer();
|
||||
}
|
||||
|
||||
private void cleanupListeningSocket() {
|
||||
if (proxyProcess == null) {
|
||||
return;
|
||||
}
|
||||
proxyProcess.removeTransfer();
|
||||
transferNegotiatorManager.removeTransfer();
|
||||
}
|
||||
|
||||
private String discoverLocalIP() throws UnknownHostException {
|
||||
|
@ -439,34 +409,31 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
* </iq>
|
||||
* </pre>
|
||||
*
|
||||
* @param from initiator@host1/foo - the file transfer initiator.
|
||||
* @param to target@host2/bar - the file transfer target.
|
||||
* @param sid 'mySID' - the unique identifier for this file transfer
|
||||
* @param from initiator@host1/foo - the file transfer initiator.
|
||||
* @param to target@host2/bar - the file transfer target.
|
||||
* @param sid 'mySID' - the unique identifier for this file transfer
|
||||
* @param localIP the IP of the local machine if it is being provided, null otherwise.
|
||||
* @param port the port of the local mahine if it is being provided, null otherwise.
|
||||
* @param port the port of the local mahine if it is being provided, null otherwise.
|
||||
* @return the created <b><i>Bytestream</b></i> packet
|
||||
*/
|
||||
private Bytestream createByteStreamInit(final String from, final String to,
|
||||
final String sid, final String localIP, final int port) {
|
||||
final String sid, final String localIP, final int port)
|
||||
{
|
||||
Bytestream bs = new Bytestream();
|
||||
bs.setTo(to);
|
||||
bs.setFrom(from);
|
||||
bs.setSessionID(sid);
|
||||
bs.setType(IQ.Type.SET);
|
||||
bs.setMode(Bytestream.Mode.TCP);
|
||||
bs.setMode(Bytestream.Mode.tcp);
|
||||
if (localIP != null && port > 0) {
|
||||
bs.addStreamHost(from, localIP, port);
|
||||
}
|
||||
// make sure the proxies have been initialized completely
|
||||
synchronized (proxyLock) {
|
||||
if (proxies == null) {
|
||||
initProxies();
|
||||
}
|
||||
}
|
||||
Collection<Bytestream.StreamHost> streamHosts = transferNegotiatorManager.getStreamHosts();
|
||||
|
||||
if (streamHosts != null) {
|
||||
Iterator it = streamHosts.iterator();
|
||||
while (it.hasNext()) {
|
||||
bs.addStreamHost((StreamHost) it.next());
|
||||
for (StreamHost host : streamHosts) {
|
||||
bs.addStreamHost(host);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -474,101 +441,20 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Checks the service discovery item returned from a server component to verify if it is
|
||||
* a File Transfer proxy or not.
|
||||
*
|
||||
* @param manager the service discovery manager which will be used to query the component
|
||||
* @param item the discovered item on the server relating
|
||||
* @return returns the JID of the proxy if it is a proxy or null if the item is not a proxy.
|
||||
*/
|
||||
private String checkIsProxy(ServiceDiscoveryManager manager, Item item) {
|
||||
DiscoverInfo info;
|
||||
try {
|
||||
info = manager.discoverInfo(item.getEntityID());
|
||||
}
|
||||
catch (XMPPException e) {
|
||||
return null;
|
||||
}
|
||||
Iterator itx = info.getIdentities();
|
||||
while (itx.hasNext()) {
|
||||
DiscoverInfo.Identity identity = (Identity) itx.next();
|
||||
if ("proxy".equalsIgnoreCase(identity.getCategory())
|
||||
&& "bytestreams".equalsIgnoreCase(
|
||||
identity.getType())) {
|
||||
return info.getFrom();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void initProxies() {
|
||||
proxies = new ArrayList<String>();
|
||||
ServiceDiscoveryManager manager = ServiceDiscoveryManager
|
||||
.getInstanceFor(connection);
|
||||
try {
|
||||
DiscoverItems discoItems = manager.discoverItems(connection.getServiceName());
|
||||
Iterator it = discoItems.getItems();
|
||||
while (it.hasNext()) {
|
||||
DiscoverItems.Item item = (Item) it.next();
|
||||
String proxy = checkIsProxy(manager, item);
|
||||
if(proxy != null) {
|
||||
proxies.add(proxy);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (XMPPException e) {
|
||||
return;
|
||||
}
|
||||
if (proxies.size() > 0) {
|
||||
initStreamHosts();
|
||||
}
|
||||
}
|
||||
|
||||
private void initStreamHosts() {
|
||||
List streamHosts = new ArrayList();
|
||||
Iterator it = proxies.iterator();
|
||||
IQ query;
|
||||
PacketCollector collector;
|
||||
Bytestream response;
|
||||
while (it.hasNext()) {
|
||||
String jid = it.next().toString();
|
||||
query = new IQ() {
|
||||
public String getChildElementXML() {
|
||||
return "<query xmlns=\"http://jabber.org/protocol/bytestreams\"/>";
|
||||
}
|
||||
};
|
||||
query.setType(IQ.Type.GET);
|
||||
query.setTo(jid);
|
||||
|
||||
collector = connection.createPacketCollector(new PacketIDFilter(
|
||||
query.getPacketID()));
|
||||
connection.sendPacket(query);
|
||||
|
||||
response = (Bytestream) collector.nextResult(SmackConfiguration
|
||||
.getPacketReplyTimeout());
|
||||
if (response != null) {
|
||||
streamHosts.addAll(response.getStreamHosts());
|
||||
}
|
||||
collector.cancel();
|
||||
}
|
||||
this.streamHosts = streamHosts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the packet to send notification to the stream host to activate
|
||||
* the stream.
|
||||
*
|
||||
* @param sessionID the session ID of the file transfer to activate.
|
||||
* @param from
|
||||
* @param to the JID of the stream host
|
||||
* @param target the JID of the file transfer target.
|
||||
* @param from the sender of the bytestreeam
|
||||
* @param to the JID of the stream host
|
||||
* @param target the JID of the file transfer target.
|
||||
* @return the packet to send notification to the stream host to
|
||||
* activate the stream.
|
||||
*/
|
||||
private static Bytestream createByteStreamActivate(final String sessionID,
|
||||
final String from, final String to, final String target) {
|
||||
final String from, final String to, final String target)
|
||||
{
|
||||
Bytestream activate = new Bytestream(sessionID);
|
||||
activate.setMode(null);
|
||||
activate.setToActivate(target);
|
||||
|
@ -578,61 +464,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
return activate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Negotiates the Socks 5 bytestream when the local computer is acting as
|
||||
* the proxy.
|
||||
*
|
||||
* @param connection the socket connection with the peer.
|
||||
* @return the SHA-1 digest that is used to uniquely identify the file
|
||||
* transfer.
|
||||
* @throws XMPPException
|
||||
* @throws IOException
|
||||
*/
|
||||
private String establishSocks5UploadConnection(Socket connection) throws XMPPException, IOException {
|
||||
OutputStream out = new DataOutputStream(connection.getOutputStream());
|
||||
InputStream in = new DataInputStream(connection.getInputStream());
|
||||
|
||||
// first byte is version should be 5
|
||||
int b = in.read();
|
||||
if (b != 5) {
|
||||
throw new XMPPException("Only SOCKS5 supported");
|
||||
}
|
||||
|
||||
// second byte number of authentication methods supported
|
||||
b = in.read();
|
||||
int[] auth = new int[b];
|
||||
for (int i = 0; i < b; i++) {
|
||||
auth[i] = in.read();
|
||||
}
|
||||
|
||||
int authMethod = -1;
|
||||
for (int i = 0; i < auth.length; i++) {
|
||||
authMethod = (auth[i] == 0 ? 0 : -1); // only auth method
|
||||
// 0, no
|
||||
// authentication,
|
||||
// supported
|
||||
if (authMethod == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (authMethod != 0) {
|
||||
throw new XMPPException("Authentication method not supported");
|
||||
}
|
||||
byte[] cmd = new byte[2];
|
||||
cmd[0] = (byte) 0x05;
|
||||
cmd[1] = (byte) 0x00;
|
||||
out.write(cmd);
|
||||
|
||||
String responseDigest = createIncomingSocks5Message(in);
|
||||
cmd = createOutgoingSocks5Message(0, responseDigest);
|
||||
|
||||
if (!connection.isConnected()) {
|
||||
throw new XMPPException("Socket closed by remote user");
|
||||
}
|
||||
out.write(cmd);
|
||||
return responseDigest;
|
||||
}
|
||||
|
||||
public String[] getNamespaces() {
|
||||
return new String[]{NAMESPACE};
|
||||
}
|
||||
|
@ -659,7 +490,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
createIncomingSocks5Message(in);
|
||||
}
|
||||
|
||||
private String createIncomingSocks5Message(InputStream in)
|
||||
static String createIncomingSocks5Message(InputStream in)
|
||||
throws IOException {
|
||||
byte[] cmd = new byte[5];
|
||||
in.read(cmd, 0, 5);
|
||||
|
@ -673,7 +504,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
return digest;
|
||||
}
|
||||
|
||||
private byte[] createOutgoingSocks5Message(int cmd, String digest) {
|
||||
static byte[] createOutgoingSocks5Message(int cmd, String digest) {
|
||||
byte addr[] = digest.getBytes();
|
||||
|
||||
byte[] data = new byte[7 + addr.length];
|
||||
|
@ -691,14 +522,7 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
}
|
||||
|
||||
public void cleanup() {
|
||||
synchronized (processLock) {
|
||||
if (proxyProcess != null) {
|
||||
proxyProcess.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
}
|
||||
|
||||
private static class SelectedHostInfo {
|
||||
|
@ -718,132 +542,6 @@ public class Socks5TransferNegotiator extends StreamNegotiator {
|
|||
}
|
||||
}
|
||||
|
||||
private class ProxyProcess implements Runnable {
|
||||
|
||||
private final ServerSocket listeningSocket;
|
||||
|
||||
private final Map connectionMap = new HashMap();
|
||||
|
||||
private boolean done = false;
|
||||
|
||||
private Thread thread;
|
||||
private int transfers;
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
try {
|
||||
listeningSocket.setSoTimeout(10000);
|
||||
}
|
||||
catch (SocketException e) {
|
||||
// There was a TCP error, lets print the stack trace
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
while (!done) {
|
||||
Socket conn = null;
|
||||
synchronized (ProxyProcess.this) {
|
||||
while (transfers <= 0 && !done) {
|
||||
transfers = -1;
|
||||
try {
|
||||
ProxyProcess.this.wait();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
/* Do nothing */
|
||||
}
|
||||
}
|
||||
}
|
||||
if(done) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
synchronized (listeningSocket) {
|
||||
conn = listeningSocket.accept();
|
||||
}
|
||||
if (conn == null) {
|
||||
continue;
|
||||
}
|
||||
String digest = establishSocks5UploadConnection(conn);
|
||||
synchronized (connectionMap) {
|
||||
connectionMap.put(digest, conn);
|
||||
}
|
||||
}
|
||||
catch (SocketTimeoutException e) {
|
||||
/* Do Nothing */
|
||||
}
|
||||
catch (IOException e) {
|
||||
/* Do Nothing */
|
||||
}
|
||||
catch (XMPPException e) {
|
||||
e.printStackTrace();
|
||||
if (conn != null) {
|
||||
try {
|
||||
conn.close();
|
||||
}
|
||||
catch (IOException e1) {
|
||||
/* Do Nothing */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
listeningSocket.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
/* Do Nothing */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void start() {
|
||||
thread.start();
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
done = true;
|
||||
synchronized (this) {
|
||||
this.notify();
|
||||
}
|
||||
synchronized (listeningSocket) {
|
||||
listeningSocket.notify();
|
||||
}
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return listeningSocket.getLocalPort();
|
||||
}
|
||||
|
||||
ProxyProcess(ServerSocket listeningSocket) {
|
||||
thread = new Thread(this, "File Transfer Connection Listener");
|
||||
this.listeningSocket = listeningSocket;
|
||||
}
|
||||
|
||||
public Socket getSocket(String digest) {
|
||||
synchronized (connectionMap) {
|
||||
return (Socket) connectionMap.get(digest);
|
||||
}
|
||||
}
|
||||
|
||||
public void addTransfer() {
|
||||
synchronized (this) {
|
||||
if (transfers == -1) {
|
||||
transfers = 1;
|
||||
this.notify();
|
||||
}
|
||||
else {
|
||||
transfers++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void removeTransfer() {
|
||||
synchronized (this) {
|
||||
transfers--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class BytestreamSIDFilter implements PacketFilter {
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue