mirror of
https://codeberg.org/Mercury-IM/Smack
synced 2025-09-10 18:59:41 +02:00
merged branch improve_bytestreams in trunk
git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@11821 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
parent
ef74695a1b
commit
8b54f34153
74 changed files with 11866 additions and 1902 deletions
|
@ -0,0 +1,795 @@
|
|||
/**
|
||||
* All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.jivesoftware.smackx.bytestreams.ibb;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jivesoftware.smack.Connection;
|
||||
import org.jivesoftware.smack.PacketListener;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.filter.AndFilter;
|
||||
import org.jivesoftware.smack.filter.PacketFilter;
|
||||
import org.jivesoftware.smack.filter.PacketTypeFilter;
|
||||
import org.jivesoftware.smack.packet.IQ;
|
||||
import org.jivesoftware.smack.packet.Message;
|
||||
import org.jivesoftware.smack.packet.Packet;
|
||||
import org.jivesoftware.smack.packet.PacketExtension;
|
||||
import org.jivesoftware.smack.packet.XMPPError;
|
||||
import org.jivesoftware.smack.util.StringUtils;
|
||||
import org.jivesoftware.smackx.bytestreams.BytestreamSession;
|
||||
import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
|
||||
import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
|
||||
import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
|
||||
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
|
||||
import org.jivesoftware.smackx.packet.SyncPacketSend;
|
||||
|
||||
/**
|
||||
* InBandBytestreamSession class represents an In-Band Bytestream session.
|
||||
* <p>
|
||||
* In-band bytestreams are bidirectional and this session encapsulates the streams for both
|
||||
* directions.
|
||||
* <p>
|
||||
* Note that closing the In-Band Bytestream session will close both streams. If both streams are
|
||||
* closed individually the session will be closed automatically once the second stream is closed.
|
||||
* Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
|
||||
* automatically if one of them is closed.
|
||||
*
|
||||
* @author Henning Staib
|
||||
*/
|
||||
public class InBandBytestreamSession implements BytestreamSession {
|
||||
|
||||
/* XMPP connection */
|
||||
private final Connection connection;
|
||||
|
||||
/* the In-Band Bytestream open request for this session */
|
||||
private final Open byteStreamRequest;
|
||||
|
||||
/*
|
||||
* the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
|
||||
*/
|
||||
private IBBInputStream inputStream;
|
||||
|
||||
/*
|
||||
* the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
|
||||
*/
|
||||
private IBBOutputStream outputStream;
|
||||
|
||||
/* JID of the remote peer */
|
||||
private String remoteJID;
|
||||
|
||||
/* flag to close both streams if one of them is closed */
|
||||
private boolean closeBothStreamsEnabled = false;
|
||||
|
||||
/* flag to indicate if session is closed */
|
||||
private boolean isClosed = false;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param connection the XMPP connection
|
||||
* @param byteStreamRequest the In-Band Bytestream open request for this session
|
||||
* @param remoteJID JID of the remote peer
|
||||
*/
|
||||
protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
|
||||
String remoteJID) {
|
||||
this.connection = connection;
|
||||
this.byteStreamRequest = byteStreamRequest;
|
||||
this.remoteJID = remoteJID;
|
||||
|
||||
// initialize streams dependent to the uses stanza type
|
||||
switch (byteStreamRequest.getStanza()) {
|
||||
case IQ:
|
||||
this.inputStream = new IQIBBInputStream();
|
||||
this.outputStream = new IQIBBOutputStream();
|
||||
break;
|
||||
case MESSAGE:
|
||||
this.inputStream = new MessageIBBInputStream();
|
||||
this.outputStream = new MessageIBBOutputStream();
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public InputStream getInputStream() {
|
||||
return this.inputStream;
|
||||
}
|
||||
|
||||
public OutputStream getOutputStream() {
|
||||
return this.outputStream;
|
||||
}
|
||||
|
||||
public int getReadTimeout() {
|
||||
return this.inputStream.readTimeout;
|
||||
}
|
||||
|
||||
public void setReadTimeout(int timeout) {
|
||||
if (timeout < 0) {
|
||||
throw new IllegalArgumentException("Timeout must be >= 0");
|
||||
}
|
||||
this.inputStream.readTimeout = timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether both streams should be closed automatically if one of the streams is closed.
|
||||
* Default is <code>false</code>.
|
||||
*
|
||||
* @return <code>true</code> if both streams will be closed if one of the streams is closed,
|
||||
* <code>false</code> if both streams can be closed independently.
|
||||
*/
|
||||
public boolean isCloseBothStreamsEnabled() {
|
||||
return closeBothStreamsEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether both streams should be closed automatically if one of the streams is closed.
|
||||
* Default is <code>false</code>.
|
||||
*
|
||||
* @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
|
||||
* the streams is closed, <code>false</code> if both streams should be closed
|
||||
* independently
|
||||
*/
|
||||
public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
|
||||
this.closeBothStreamsEnabled = closeBothStreamsEnabled;
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
closeByLocal(true); // close input stream
|
||||
closeByLocal(false); // close output stream
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is invoked if a request to close the In-Band Bytestream has been received.
|
||||
*
|
||||
* @param closeRequest the close request from the remote peer
|
||||
*/
|
||||
protected void closeByPeer(Close closeRequest) {
|
||||
|
||||
/*
|
||||
* close streams without flushing them, because stream is already considered closed on the
|
||||
* remote peers side
|
||||
*/
|
||||
this.inputStream.closeInternal();
|
||||
this.inputStream.cleanup();
|
||||
this.outputStream.closeInternal(false);
|
||||
|
||||
// acknowledge close request
|
||||
IQ confirmClose = IQ.createResultIQ(closeRequest);
|
||||
this.connection.sendPacket(confirmClose);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is invoked if one of the streams has been closed locally, if an error occurred
|
||||
* locally or if the whole session should be closed.
|
||||
*
|
||||
* @throws IOException if an error occurs while sending the close request
|
||||
*/
|
||||
protected synchronized void closeByLocal(boolean in) throws IOException {
|
||||
if (this.isClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.closeBothStreamsEnabled) {
|
||||
this.inputStream.closeInternal();
|
||||
this.outputStream.closeInternal(true);
|
||||
}
|
||||
else {
|
||||
if (in) {
|
||||
this.inputStream.closeInternal();
|
||||
}
|
||||
else {
|
||||
// close stream but try to send any data left
|
||||
this.outputStream.closeInternal(true);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.inputStream.isClosed && this.outputStream.isClosed) {
|
||||
this.isClosed = true;
|
||||
|
||||
// send close request
|
||||
Close close = new Close(this.byteStreamRequest.getSessionID());
|
||||
close.setTo(this.remoteJID);
|
||||
try {
|
||||
SyncPacketSend.getReply(this.connection, close);
|
||||
}
|
||||
catch (XMPPException e) {
|
||||
throw new IOException("Error while closing stream: " + e.getMessage());
|
||||
}
|
||||
|
||||
this.inputStream.cleanup();
|
||||
|
||||
// remove session from manager
|
||||
InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
|
||||
* Subclasses of this input stream must provide a packet listener along with a packet filter to
|
||||
* collect the In-Band Bytestream data packets.
|
||||
*/
|
||||
private abstract class IBBInputStream extends InputStream {
|
||||
|
||||
/* the data packet listener to fill the data queue */
|
||||
private final PacketListener dataPacketListener;
|
||||
|
||||
/* queue containing received In-Band Bytestream data packets */
|
||||
protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
|
||||
|
||||
/* buffer containing the data from one data packet */
|
||||
private byte[] buffer;
|
||||
|
||||
/* pointer to the next byte to read from buffer */
|
||||
private int bufferPointer = -1;
|
||||
|
||||
/* data packet sequence (range from 0 to 65535) */
|
||||
private long seq = -1;
|
||||
|
||||
/* flag to indicate if input stream is closed */
|
||||
private boolean isClosed = false;
|
||||
|
||||
/* flag to indicate if close method was invoked */
|
||||
private boolean closeInvoked = false;
|
||||
|
||||
/* timeout for read operations */
|
||||
private int readTimeout = 0;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
public IBBInputStream() {
|
||||
// add data packet listener to connection
|
||||
this.dataPacketListener = getDataPacketListener();
|
||||
connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the packet listener that processes In-Band Bytestream data packets.
|
||||
*
|
||||
* @return the data packet listener
|
||||
*/
|
||||
protected abstract PacketListener getDataPacketListener();
|
||||
|
||||
/**
|
||||
* Returns the packet filter that accepts In-Band Bytestream data packets.
|
||||
*
|
||||
* @return the data packet filter
|
||||
*/
|
||||
protected abstract PacketFilter getDataPacketFilter();
|
||||
|
||||
public synchronized int read() throws IOException {
|
||||
checkClosed();
|
||||
|
||||
// if nothing read yet or whole buffer has been read fill buffer
|
||||
if (bufferPointer == -1 || bufferPointer >= buffer.length) {
|
||||
// if no data available and stream was closed return -1
|
||||
if (!loadBuffer()) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// return byte and increment buffer pointer
|
||||
return (int) buffer[bufferPointer++];
|
||||
}
|
||||
|
||||
public synchronized int read(byte[] b, int off, int len) throws IOException {
|
||||
if (b == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|
||||
|| ((off + len) < 0)) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
else if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
checkClosed();
|
||||
|
||||
// if nothing read yet or whole buffer has been read fill buffer
|
||||
if (bufferPointer == -1 || bufferPointer >= buffer.length) {
|
||||
// if no data available and stream was closed return -1
|
||||
if (!loadBuffer()) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// if more bytes wanted than available return all available
|
||||
int bytesAvailable = buffer.length - bufferPointer;
|
||||
if (len > bytesAvailable) {
|
||||
len = bytesAvailable;
|
||||
}
|
||||
|
||||
System.arraycopy(buffer, bufferPointer, b, off, len);
|
||||
bufferPointer += len;
|
||||
return len;
|
||||
}
|
||||
|
||||
public synchronized int read(byte[] b) throws IOException {
|
||||
return read(b, 0, b.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method blocks until a data packet is received, the stream is closed or the current
|
||||
* thread is interrupted.
|
||||
*
|
||||
* @return <code>true</code> if data was received, otherwise <code>false</code>
|
||||
* @throws IOException if data packets are out of sequence
|
||||
*/
|
||||
private synchronized boolean loadBuffer() throws IOException {
|
||||
|
||||
// wait until data is available or stream is closed
|
||||
DataPacketExtension data = null;
|
||||
try {
|
||||
if (this.readTimeout == 0) {
|
||||
while (data == null) {
|
||||
if (isClosed && this.dataQueue.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
else {
|
||||
data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
|
||||
if (data == null) {
|
||||
throw new SocketTimeoutException();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
|
||||
// handle sequence overflow
|
||||
if (this.seq == 65535) {
|
||||
this.seq = -1;
|
||||
}
|
||||
|
||||
// check if data packets sequence is successor of last seen sequence
|
||||
long seq = data.getSeq();
|
||||
if (seq - 1 != this.seq) {
|
||||
// packets out of order; close stream/session
|
||||
InBandBytestreamSession.this.close();
|
||||
throw new IOException("Packets out of sequence");
|
||||
}
|
||||
else {
|
||||
this.seq = seq;
|
||||
}
|
||||
|
||||
// set buffer to decoded data
|
||||
buffer = data.getDecodedData();
|
||||
bufferPointer = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if this stream is closed and throws an IOException if necessary
|
||||
*
|
||||
* @throws IOException if stream is closed and no data should be read anymore
|
||||
*/
|
||||
private void checkClosed() throws IOException {
|
||||
/* throw no exception if there is data available, but not if close method was invoked */
|
||||
if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
|
||||
// clear data queue in case additional data was received after stream was closed
|
||||
this.dataQueue.clear();
|
||||
throw new IOException("Stream is closed");
|
||||
}
|
||||
}
|
||||
|
||||
public boolean markSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.closeInvoked = true;
|
||||
|
||||
InBandBytestreamSession.this.closeByLocal(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method sets the close flag and removes the data packet listener.
|
||||
*/
|
||||
private void closeInternal() {
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
isClosed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked if the session is closed.
|
||||
*/
|
||||
private void cleanup() {
|
||||
connection.removePacketListener(this.dataPacketListener);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
|
||||
* data packets.
|
||||
*/
|
||||
private class IQIBBInputStream extends IBBInputStream {
|
||||
|
||||
protected PacketListener getDataPacketListener() {
|
||||
return new PacketListener() {
|
||||
|
||||
private long lastSequence = -1;
|
||||
|
||||
public void processPacket(Packet packet) {
|
||||
// get data packet extension
|
||||
DataPacketExtension data = (DataPacketExtension) packet.getExtension(
|
||||
DataPacketExtension.ELEMENT_NAME,
|
||||
InBandBytestreamManager.NAMESPACE);
|
||||
|
||||
/*
|
||||
* check if sequence was not used already (see XEP-0047 Section 2.2)
|
||||
*/
|
||||
if (data.getSeq() <= this.lastSequence) {
|
||||
IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
|
||||
XMPPError.Condition.unexpected_request));
|
||||
connection.sendPacket(unexpectedRequest);
|
||||
return;
|
||||
|
||||
}
|
||||
|
||||
// check if encoded data is valid (see XEP-0047 Section 2.2)
|
||||
if (data.getDecodedData() == null) {
|
||||
// data is invalid; respond with bad-request error
|
||||
IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
|
||||
XMPPError.Condition.bad_request));
|
||||
connection.sendPacket(badRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
// data is valid; add to data queue
|
||||
dataQueue.offer(data);
|
||||
|
||||
// confirm IQ
|
||||
IQ confirmData = IQ.createResultIQ((IQ) packet);
|
||||
connection.sendPacket(confirmData);
|
||||
|
||||
// set last seen sequence
|
||||
this.lastSequence = data.getSeq();
|
||||
if (this.lastSequence == 65535) {
|
||||
this.lastSequence = -1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
protected PacketFilter getDataPacketFilter() {
|
||||
/*
|
||||
* filter all IQ stanzas having type 'SET' (represented by Data class), containing a
|
||||
* data packet extension, matching session ID and recipient
|
||||
*/
|
||||
return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
|
||||
* encapsulating the data packets.
|
||||
*/
|
||||
private class MessageIBBInputStream extends IBBInputStream {
|
||||
|
||||
protected PacketListener getDataPacketListener() {
|
||||
return new PacketListener() {
|
||||
|
||||
public void processPacket(Packet packet) {
|
||||
// get data packet extension
|
||||
DataPacketExtension data = (DataPacketExtension) packet.getExtension(
|
||||
DataPacketExtension.ELEMENT_NAME,
|
||||
InBandBytestreamManager.NAMESPACE);
|
||||
|
||||
// check if encoded data is valid
|
||||
if (data.getDecodedData() == null) {
|
||||
/*
|
||||
* TODO once a majority of XMPP server implementation support XEP-0079
|
||||
* Advanced Message Processing the invalid message could be answered with an
|
||||
* appropriate error. For now we just ignore the packet. Subsequent packets
|
||||
* with an increased sequence will cause the input stream to close the
|
||||
* stream/session.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
// data is valid; add to data queue
|
||||
dataQueue.offer(data);
|
||||
|
||||
// TODO confirm packet once XMPP servers support XEP-0079
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PacketFilter getDataPacketFilter() {
|
||||
/*
|
||||
* filter all message stanzas containing a data packet extension, matching session ID
|
||||
* and recipient
|
||||
*/
|
||||
return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IBBDataPacketFilter class filters all packets from the remote peer of this session,
|
||||
* containing an In-Band Bytestream data packet extension whose session ID matches this sessions
|
||||
* ID.
|
||||
*/
|
||||
private class IBBDataPacketFilter implements PacketFilter {
|
||||
|
||||
public boolean accept(Packet packet) {
|
||||
// sender equals remote peer
|
||||
if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// stanza contains data packet extension
|
||||
PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
|
||||
InBandBytestreamManager.NAMESPACE);
|
||||
if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// session ID equals this session ID
|
||||
DataPacketExtension data = (DataPacketExtension) packetExtension;
|
||||
if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
|
||||
* Subclasses of this output stream must provide a method to send data over XMPP stream.
|
||||
*/
|
||||
private abstract class IBBOutputStream extends OutputStream {
|
||||
|
||||
/* buffer with the size of this sessions block size */
|
||||
protected final byte[] buffer;
|
||||
|
||||
/* pointer to next byte to write to buffer */
|
||||
protected int bufferPointer = 0;
|
||||
|
||||
/* data packet sequence (range from 0 to 65535) */
|
||||
protected long seq = 0;
|
||||
|
||||
/* flag to indicate if output stream is closed */
|
||||
protected boolean isClosed = false;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
public IBBOutputStream() {
|
||||
this.buffer = new byte[byteStreamRequest.getBlockSize()];
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the given data packet to the XMPP stream.
|
||||
*
|
||||
* @param data the data packet
|
||||
* @throws IOException if an I/O error occurred while sending or if the stream is closed
|
||||
*/
|
||||
protected abstract void writeToXML(DataPacketExtension data) throws IOException;
|
||||
|
||||
public synchronized void write(int b) throws IOException {
|
||||
if (this.isClosed) {
|
||||
throw new IOException("Stream is closed");
|
||||
}
|
||||
|
||||
// if buffer is full flush buffer
|
||||
if (bufferPointer >= buffer.length) {
|
||||
flushBuffer();
|
||||
}
|
||||
|
||||
buffer[bufferPointer++] = (byte) b;
|
||||
}
|
||||
|
||||
public synchronized void write(byte b[], int off, int len) throws IOException {
|
||||
if (b == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|
||||
|| ((off + len) < 0)) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
else if (len == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isClosed) {
|
||||
throw new IOException("Stream is closed");
|
||||
}
|
||||
|
||||
// is data to send greater than buffer size
|
||||
if (len >= buffer.length) {
|
||||
|
||||
// "byte" off the first chunk to write out
|
||||
writeOut(b, off, buffer.length);
|
||||
|
||||
// recursively call this method with the lesser amount
|
||||
write(b, off + buffer.length, len - buffer.length);
|
||||
}
|
||||
else {
|
||||
writeOut(b, off, len);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void write(byte[] b) throws IOException {
|
||||
write(b, 0, b.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fills the buffer with the given data and sends it over the XMPP stream if the buffers
|
||||
* capacity has been reached. This method is only called from this class so it is assured
|
||||
* that the amount of data to send is <= buffer capacity
|
||||
*
|
||||
* @param b the data
|
||||
* @param off the data
|
||||
* @param len the number of bytes to write
|
||||
* @throws IOException if an I/O error occurred while sending or if the stream is closed
|
||||
*/
|
||||
private synchronized void writeOut(byte b[], int off, int len) throws IOException {
|
||||
if (this.isClosed) {
|
||||
throw new IOException("Stream is closed");
|
||||
}
|
||||
|
||||
// set to 0 in case the next 'if' block is not executed
|
||||
int available = 0;
|
||||
|
||||
// is data to send greater that buffer space left
|
||||
if (len > buffer.length - bufferPointer) {
|
||||
// fill buffer to capacity and send it
|
||||
available = buffer.length - bufferPointer;
|
||||
System.arraycopy(b, off, buffer, bufferPointer, available);
|
||||
bufferPointer += available;
|
||||
flushBuffer();
|
||||
}
|
||||
|
||||
// copy the data left to buffer
|
||||
System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
|
||||
bufferPointer += len - available;
|
||||
}
|
||||
|
||||
public synchronized void flush() throws IOException {
|
||||
if (this.isClosed) {
|
||||
throw new IOException("Stream is closed");
|
||||
}
|
||||
flushBuffer();
|
||||
}
|
||||
|
||||
private synchronized void flushBuffer() throws IOException {
|
||||
|
||||
// do nothing if no data to send available
|
||||
if (bufferPointer == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// create data packet
|
||||
String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
|
||||
DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
|
||||
this.seq, enc);
|
||||
|
||||
// write to XMPP stream
|
||||
writeToXML(data);
|
||||
|
||||
// reset buffer pointer
|
||||
bufferPointer = 0;
|
||||
|
||||
// increment sequence, considering sequence overflow
|
||||
this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
|
||||
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
InBandBytestreamSession.this.closeByLocal(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the close flag and optionally flushes the stream.
|
||||
*
|
||||
* @param flush if <code>true</code> flushes the stream
|
||||
*/
|
||||
protected void closeInternal(boolean flush) {
|
||||
if (this.isClosed) {
|
||||
return;
|
||||
}
|
||||
this.isClosed = true;
|
||||
|
||||
try {
|
||||
if (flush) {
|
||||
flushBuffer();
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
/*
|
||||
* ignore, because writeToXML() will not throw an exception if stream is already
|
||||
* closed
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
|
||||
* the data packets.
|
||||
*/
|
||||
private class IQIBBOutputStream extends IBBOutputStream {
|
||||
|
||||
@Override
|
||||
protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
|
||||
// create IQ stanza containing data packet
|
||||
IQ iq = new Data(data);
|
||||
iq.setTo(remoteJID);
|
||||
|
||||
try {
|
||||
SyncPacketSend.getReply(connection, iq);
|
||||
}
|
||||
catch (XMPPException e) {
|
||||
// close session unless it is already closed
|
||||
if (!this.isClosed) {
|
||||
InBandBytestreamSession.this.close();
|
||||
throw new IOException("Error while sending Data: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
|
||||
* encapsulating the data packets.
|
||||
*/
|
||||
private class MessageIBBOutputStream extends IBBOutputStream {
|
||||
|
||||
@Override
|
||||
protected synchronized void writeToXML(DataPacketExtension data) {
|
||||
// create message stanza containing data packet
|
||||
Message message = new Message(remoteJID);
|
||||
message.addExtension(data);
|
||||
|
||||
connection.sendPacket(message);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue