summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2004-12-02 20:49:49 +0000
committerMark Spruiell <mes@zeroc.com>2004-12-02 20:49:49 +0000
commit35e3ed3fdf47eb7c260f0b500405d6fa109ebac9 (patch)
tree86a8cc5d44c47d94b5be53efea0b0dd45eb84796 /java/src
parentReverted back Object/LocalObject type id for backward compatibility (diff)
downloadice-35e3ed3fdf47eb7c260f0b500405d6fa109ebac9.tar.bz2
ice-35e3ed3fdf47eb7c260f0b500405d6fa109ebac9.tar.xz
ice-35e3ed3fdf47eb7c260f0b500405d6fa109ebac9.zip
adding bzip2 support
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/ConnectionI.java460
-rw-r--r--java/src/Ice/_ObjectDelM.java5
-rw-r--r--java/src/IceInternal/BasicStream.java267
-rw-r--r--java/src/IceInternal/DefaultsAndOverrides.java10
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java5
-rw-r--r--java/src/IceInternal/Outgoing.java24
-rw-r--r--java/src/IceInternal/OutgoingAsync.java9
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java71
8 files changed, 652 insertions, 199 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 2ec74a60c33..f02222732f6 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -109,14 +109,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
throw new ConnectionNotValidatedException();
}
- byte compress = is.readByte(); // Ignore compression status for validate connection.
+ byte compress = is.readByte(); // Ignore compression status for validate connection.
int size = is.readInt();
if(size != IceInternal.Protocol.headerSize)
{
throw new IllegalMessageSizeException();
}
- IceInternal.TraceUtil.traceHeader("received validate connection", is,
- _logger, _traceLevels);
+ IceInternal.TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels);
}
}
catch(LocalException ex)
@@ -389,10 +388,61 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeBlob(_requestHdr);
}
+ private IceInternal.BasicStream
+ doCompress(IceInternal.BasicStream uncompressed, boolean compress)
+ {
+ if(_compressionSupported)
+ {
+ if(compress && uncompressed.size() >= 100)
+ {
+ //
+ // Do compression.
+ //
+ IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize);
+ if(cstream != null)
+ {
+ //
+ // Set compression status.
+ //
+ cstream.pos(9);
+ cstream.writeByte((byte)2);
+
+ //
+ // Write the size of the compressed stream into the header.
+ //
+ cstream.pos(10);
+ cstream.writeInt(cstream.size());
+
+ //
+ // Write the compression status and size of the compressed stream into the header of the
+ // uncompressed stream -- we need this to trace requests correctly.
+ //
+ uncompressed.pos(9);
+ uncompressed.writeByte((byte)2);
+ uncompressed.writeInt(cstream.size());
+
+ return cstream;
+ }
+ }
+ }
+
+ uncompressed.pos(9);
+ uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0));
+
+ //
+ // Not compressed, fill in the message size.
+ //
+ uncompressed.pos(10);
+ uncompressed.writeInt(uncompressed.size());
+
+ return uncompressed;
+ }
+
public void
- sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out)
+ sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out, boolean compress)
{
int requestId = 0;
+ IceInternal.BasicStream stream = null;
synchronized(this)
{
@@ -405,12 +455,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_state > StateNotValidated);
assert(_state < StateClosing);
-
- //
- // Fill in the message size.
- //
- os.pos(10);
- os.writeInt(os.size());
//
// Only add to the request map if this is a twoway call.
@@ -439,6 +483,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_requests.put(requestId, out);
}
+ stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress);
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
@@ -454,12 +500,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_exception != null);
throw _exception; // The exception is immutable at this point.
}
-
+
//
// Send the request.
//
IceInternal.TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ _transceiver.write(stream, _endpoint.timeout());
}
}
catch(LocalException ex)
@@ -500,12 +546,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
}
+ finally
+ {
+ if(stream != null && stream != os)
+ {
+ stream.destroy();
+ }
+ }
}
public void
- sendAsyncRequest(IceInternal.BasicStream os, IceInternal.OutgoingAsync out)
+ sendAsyncRequest(IceInternal.BasicStream os, IceInternal.OutgoingAsync out, boolean compress)
{
int requestId = 0;
+ IceInternal.BasicStream stream = null;
synchronized(this)
{
@@ -518,12 +572,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_state > StateNotValidated);
assert(_state < StateClosing);
-
- //
- // Fill in the message size.
- //
- os.pos(10);
- os.writeInt(os.size());
//
// Create a new unique request ID.
@@ -542,10 +590,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
//
- // Add to the requests map.
+ // Add to the async requests map.
//
_asyncRequests.put(requestId, out);
+ stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress);
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
@@ -561,13 +611,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_exception != null);
throw _exception; // The exception is immutable at this point.
}
-
+
//
// Send the request.
//
- IceInternal.TraceUtil.traceRequest("sending asynchronous request", os,
- _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ IceInternal.TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels);
+ _transceiver.write(stream, _endpoint.timeout());
}
}
catch(LocalException ex)
@@ -600,6 +649,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
}
+ finally
+ {
+ if(stream != null && stream != os)
+ {
+ stream.destroy();
+ }
+ }
}
private final static byte[] _requestBatchHdr =
@@ -663,7 +719,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
public synchronized void
- finishBatchRequest(IceInternal.BasicStream os)
+ finishBatchRequest(IceInternal.BasicStream os, boolean compress)
{
//
// Get the batch stream back and increment the number of
@@ -673,6 +729,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
++_batchRequestNum;
//
+ // We compress the whole batch if there is at least one compressed
+ // message.
+ //
+ if(compress)
+ {
+ _batchRequestCompress = true;
+ }
+
+ //
// Notify about the batch stream not being in use anymore.
//
assert(_batchStreamInUse);
@@ -691,6 +756,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.destroy();
_batchStream = new IceInternal.BasicStream(_instance);
_batchRequestNum = 0;
+ _batchRequestCompress = false;
//
// Notify about the batch stream not being in use anymore.
@@ -703,6 +769,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
flushBatchRequests()
{
+ IceInternal.BasicStream stream = null;
+
synchronized(this)
{
while(_batchStreamInUse && _exception == null)
@@ -728,18 +796,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_state > StateNotValidated);
assert(_state < StateClosing);
-
+
//
// Fill in the message size.
//
_batchStream.pos(10);
_batchStream.writeInt(_batchStream.size());
-
+
//
// Fill in the number of requests in the batch.
//
_batchStream.writeInt(_batchRequestNum);
+ stream = doCompress(_batchStream, _overrideCompress ? _overrideCompressValue : _batchRequestCompress);
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
@@ -761,13 +831,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_exception != null);
throw _exception; // The exception is immutable at this point.
}
-
+
//
// Send the batch request.
//
- IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream,
- _logger, _traceLevels);
- _transceiver.write(_batchStream, _endpoint.timeout());
+ IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver.write(stream, _endpoint.timeout());
}
}
catch(LocalException ex)
@@ -784,6 +853,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw _exception;
}
}
+ finally
+ {
+ if(stream != null && stream != _batchStream)
+ {
+ stream.destroy();
+ }
+ }
synchronized(this)
{
@@ -793,14 +869,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.destroy();
_batchStream = new IceInternal.BasicStream(_instance);
_batchRequestNum = 0;
+ _batchRequestCompress = false;
_batchStreamInUse = false;
notifyAll();
}
}
public void
- sendResponse(IceInternal.BasicStream os, byte compress)
+ sendResponse(IceInternal.BasicStream os, byte compressFlag)
{
+ IceInternal.BasicStream stream = null;
try
{
synchronized(_sendMutex)
@@ -811,17 +889,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw _exception; // The exception is immutable at this point.
}
- //
- // Fill in the message size.
- //
- os.pos(10);
- os.writeInt(os.size());
-
+ stream = doCompress(os, compressFlag != 0);
+
//
// Send the reply.
//
IceInternal.TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ _transceiver.write(stream, _endpoint.timeout());
}
}
catch(LocalException ex)
@@ -831,6 +905,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateClosed, ex);
}
}
+ finally
+ {
+ if(stream != os)
+ {
+ stream.destroy();
+ }
+ }
synchronized(this)
{
@@ -1011,6 +1092,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.ServantManager servantManager = null;
ObjectAdapter adapter = null;
IceInternal.OutgoingAsync outAsync = null;
+ boolean destroyStream = false;
synchronized(this)
{
@@ -1046,7 +1128,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
compress = stream.readByte();
if(compress == (byte)2)
{
- throw new CompressionNotSupportedException();
+ if(_compressionSupported)
+ {
+ IceInternal.BasicStream ustream = stream.uncompress(IceInternal.Protocol.headerSize);
+ if(ustream != stream)
+ {
+ destroyStream = true;
+ stream = ustream;
+ }
+ }
+ else
+ {
+ throw new CompressionNotSupportedException();
+ }
}
stream.pos(IceInternal.Protocol.headerSize);
@@ -1054,8 +1148,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
case IceInternal.Protocol.closeConnectionMsg:
{
- IceInternal.TraceUtil.traceHeader("received close connection", stream,
- _logger, _traceLevels);
+ IceInternal.TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
if(_endpoint.datagram() && _warn)
{
_logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
@@ -1072,13 +1165,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_state == StateClosing)
{
IceInternal.TraceUtil.traceRequest("received request during closing\n" +
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
}
else
{
- IceInternal.TraceUtil.traceRequest("received request", stream,
- _logger, _traceLevels);
+ IceInternal.TraceUtil.traceRequest("received request", stream, _logger, _traceLevels);
requestId = stream.readInt();
invokeNum = 1;
servantManager = _servantManager;
@@ -1092,15 +1184,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state == StateClosing)
{
- IceInternal.TraceUtil.traceBatchRequest
- ("received batch request during closing\n" +
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
+ IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" +
+ "(ignored by server, client will retry)",
+ stream, _logger, _traceLevels);
}
else
{
- IceInternal.TraceUtil.traceBatchRequest("received batch request",
- stream, _logger, _traceLevels);
+ IceInternal.TraceUtil.traceBatchRequest("received batch request", stream, _logger,
+ _traceLevels);
invokeNum = stream.readInt();
if(invokeNum < 0)
{
@@ -1135,8 +1226,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
case IceInternal.Protocol.validateConnectionMsg:
{
- IceInternal.TraceUtil.traceHeader("received validate connection", stream,
- _logger, _traceLevels);
+ IceInternal.TraceUtil.traceHeader("received validate connection", stream, _logger,
+ _traceLevels);
if(_warn)
{
_logger.warning("ignoring unexpected validate connection message:\n" + _desc);
@@ -1147,139 +1238,46 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
default:
{
IceInternal.TraceUtil.traceHeader("received unknown message\n" +
- "(invalid, closing connection)",
- stream, _logger, _traceLevels);
+ "(invalid, closing connection)", stream, _logger,
+ _traceLevels);
throw new UnknownMessageException();
}
}
}
catch(LocalException ex)
{
+ if(destroyStream)
+ {
+ stream.destroy();
+ }
setState(StateClosed, ex);
return;
}
}
- //
- // Asynchronous replies must be handled outside the thread
- // synchronization, so that nested calls are possible.
- //
- if(outAsync != null)
- {
- outAsync.__finished(stream);
- }
-
- //
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
- //
- IceInternal.Incoming in = null;
try
{
- while(invokeNum > 0)
- {
-
- //
- // Prepare the invocation.
- //
- boolean response = !_endpoint.datagram() && requestId != 0;
- in = getIncoming(adapter, response, compress);
- IceInternal.BasicStream is = in.is();
- stream.swap(is);
- IceInternal.BasicStream os = in.os();
-
- //
- // Prepare the response if necessary.
- //
- if(response)
- {
- assert(invokeNum == 1); // No further invocations if a response is expected.
- os.writeBlob(_replyHdr);
-
- //
- // Fill in the request ID.
- //
- os.writeInt(requestId);
- }
-
- in.invoke(servantManager);
-
- //
- // If there are more invocations, we need the stream back.
- //
- if(--invokeNum > 0)
- {
- stream.swap(is);
- }
-
- reclaimIncoming(in);
- in = null;
- }
- }
- catch(LocalException ex)
- {
- synchronized(this)
- {
- setState(StateClosed, ex);
- }
- }
- catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace.
- {
- synchronized(this)
- {
- UnknownException uex = new UnknownException();
- //uex.unknown = ex.toString();
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- uex.unknown = sw.toString();
- if(ex instanceof java.lang.AssertionError)
- {
- _logger.error(uex.unknown);
- }
- setState(StateClosed, uex);
- }
- }
- catch(java.lang.Exception ex)
- {
- synchronized(this)
+ //
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(outAsync != null)
{
- UnknownException uex = new UnknownException();
- //uex.unknown = ex.toString();
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- uex.unknown = sw.toString();
- setState(StateClosed, uex);
+ outAsync.__finished(stream);
}
+
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
}
finally
{
- if(in != null)
+ if(destroyStream)
{
- reclaimIncoming(in);
- }
- }
-
- //
- // If invoke() above raised an exception, and therefore
- // neither sendResponse() nor sendNoResponse() has been
- // called, then we must decrement _dispatchCount here.
- //
- if(invokeNum > 0)
- {
- synchronized(this)
- {
- assert(_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert(_dispatchCount >= 0);
- if(_dispatchCount == 0)
- {
- notifyAll();
- }
+ stream.destroy();
}
}
}
@@ -1416,6 +1414,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream = new IceInternal.BasicStream(instance);
_batchStreamInUse = false;
_batchRequestNum = 0;
+ _batchRequestCompress = false;
_dispatchCount = 0;
_state = StateNotValidated;
_stateTime = System.currentTimeMillis();
@@ -1430,6 +1429,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_threadPool = _instance.clientThreadPool();
_servantManager = null;
}
+
+ _overrideCompress = _instance.defaultsAndOverrides().overrideCompress;
+ _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue;
}
protected void
@@ -1648,7 +1650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeByte(IceInternal.Protocol.encodingMajor);
os.writeByte(IceInternal.Protocol.encodingMinor);
os.writeByte(IceInternal.Protocol.closeConnectionMsg);
- os.writeByte((byte)0); // Compression status.
+ os.writeByte(_compressionSupported ? (byte)1 : (byte)0);
os.writeInt(IceInternal.Protocol.headerSize); // Message size.
//
@@ -1694,6 +1696,120 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
private void
+ invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress,
+ IceInternal.ServantManager servantManager, ObjectAdapter adapter)
+ {
+ //
+ // Note: In contrast to other private or protected methods, this
+ // operation must be called *without* the mutex locked.
+ //
+
+ IceInternal.Incoming in = null;
+ try
+ {
+ while(invokeNum > 0)
+ {
+
+ //
+ // Prepare the invocation.
+ //
+ boolean response = !_endpoint.datagram() && requestId != 0;
+ in = getIncoming(adapter, response, compress);
+ IceInternal.BasicStream is = in.is();
+ stream.swap(is);
+ IceInternal.BasicStream os = in.os();
+
+ //
+ // Prepare the response if necessary.
+ //
+ if(response)
+ {
+ assert(invokeNum == 1); // No further invocations if a response is expected.
+ os.writeBlob(_replyHdr);
+
+ //
+ // Add the request ID.
+ //
+ os.writeInt(requestId);
+ }
+
+ in.invoke(servantManager);
+
+ //
+ // If there are more invocations, we need the stream back.
+ //
+ if(--invokeNum > 0)
+ {
+ stream.swap(is);
+ }
+
+ reclaimIncoming(in);
+ in = null;
+ }
+ }
+ catch(LocalException ex)
+ {
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+ catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace.
+ {
+ synchronized(this)
+ {
+ UnknownException uex = new UnknownException();
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ uex.unknown = sw.toString();
+ _logger.error(uex.unknown);
+ setState(StateClosed, uex);
+ }
+ }
+ catch(java.lang.Exception ex)
+ {
+ synchronized(this)
+ {
+ UnknownException uex = new UnknownException();
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ uex.unknown = sw.toString();
+ setState(StateClosed, uex);
+ }
+ }
+ finally
+ {
+ if(in != null)
+ {
+ reclaimIncoming(in);
+ }
+ }
+
+ //
+ // If invoke() above raised an exception, and therefore
+ // neither sendResponse() nor sendNoResponse() has been
+ // called, then we must decrement _dispatchCount here.
+ //
+ if(invokeNum > 0)
+ {
+ synchronized(this)
+ {
+ assert(_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert(_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+ }
+ }
+ }
+
+ private void
warning(String msg, Exception ex)
{
java.io.StringWriter sw = new java.io.StringWriter();
@@ -1775,6 +1891,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private IceInternal.BasicStream _batchStream;
private boolean _batchStreamInUse;
private int _batchRequestNum;
+ private boolean _batchRequestCompress;
private int _dispatchCount;
@@ -1789,4 +1906,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private IceInternal.Incoming _incomingCache;
private java.lang.Object _incomingCacheMutex = new java.lang.Object();
+
+ private static boolean _compressionSupported = IceInternal.BasicStream.compressible();
+
+ private boolean _overrideCompress;
+ private boolean _overrideCompressValue;
}
diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java
index 970e529d386..1fa57dd08e0 100644
--- a/java/src/Ice/_ObjectDelM.java
+++ b/java/src/Ice/_ObjectDelM.java
@@ -176,10 +176,12 @@ public class _ObjectDelM implements _ObjectDel
__reference = from.__reference;
__connection = from.__connection;
+ __compress = from.__compress;
}
protected IceInternal.Reference __reference;
protected ConnectionI __connection;
+ protected boolean __compress;
public void
setup(IceInternal.Reference ref)
@@ -195,6 +197,7 @@ public class _ObjectDelM implements _ObjectDel
__reference = ref;
BooleanHolder compress = new BooleanHolder();
__connection = __reference.getConnection(compress);
+ __compress = compress.value;
}
protected IceInternal.Outgoing
@@ -206,7 +209,7 @@ public class _ObjectDelM implements _ObjectDel
{
if(__outgoingCache == null)
{
- out = new IceInternal.Outgoing(__connection, __reference, operation, mode, context);
+ out = new IceInternal.Outgoing(__connection, __reference, operation, mode, context, __compress);
}
else
{
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index dc6aff676e0..1dc909aad2a 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -611,6 +611,13 @@ public class BasicStream
_buf.put(v);
}
+ public void
+ writeBlob(byte[] v, int off, int len)
+ {
+ expand(len);
+ _buf.put(v, off, len);
+ }
+
public byte[]
readBlob(int sz)
{
@@ -1657,6 +1664,239 @@ public class BasicStream
return _limit == 0;
}
+ private static class BufferedOutputStream extends java.io.OutputStream
+ {
+ BufferedOutputStream(byte[] data)
+ {
+ _data = data;
+ }
+
+ public void
+ close()
+ throws java.io.IOException
+ {
+ }
+
+ public void
+ flush()
+ throws java.io.IOException
+ {
+ }
+
+ public void
+ write(byte[] b)
+ throws java.io.IOException
+ {
+ assert(_data.length - _pos >= b.length);
+ System.arraycopy(b, 0, _data, _pos, b.length);
+ _pos += b.length;
+ }
+
+ public void
+ write(byte[] b, int off, int len)
+ throws java.io.IOException
+ {
+ assert(_data.length - _pos >= len);
+ System.arraycopy(b, off, _data, _pos, len);
+ _pos += len;
+ }
+
+ public void
+ write(int b)
+ throws java.io.IOException
+ {
+ assert(_data.length - _pos >= 1);
+ _data[_pos] = (byte)b;
+ ++_pos;
+ }
+
+ int
+ pos()
+ {
+ return _pos;
+ }
+
+ private byte[] _data;
+ private int _pos;
+ }
+
+ public BasicStream
+ compress(int headerSize)
+ {
+ assert(compressible());
+
+ int uncompressedLen = size() - headerSize;
+ int compressedLen = (int)(uncompressedLen * 1.01 + 600);
+ byte[] compressed = new byte[compressedLen];
+
+ byte[] data = null;
+ int offset = 0;
+ try
+ {
+ //
+ // If the ByteBuffer is backed by an array then we can avoid
+ // an extra copy by using the array directly.
+ //
+ data = _buf.array();
+ offset = _buf.arrayOffset();
+ }
+ catch(Exception ex)
+ {
+ //
+ // Otherwise, allocate an array to hold a copy of the uncompressed data.
+ //
+ data = new byte[size()];
+ _buf.get(data);
+ }
+
+ try
+ {
+ //
+ // Compress the data using the class org.apache.tools.bzip2.CBZip2OutputStream.
+ // Its constructor requires an OutputStream argument, therefore we pass the
+ // compressed BasicStream in an OutputStream wrapper.
+ //
+ BufferedOutputStream bos = new BufferedOutputStream(compressed);
+ //
+ // For interoperability with the bzip2 C library, we insert the magic bytes
+ // 'B', 'Z' before invoking the Java implementation.
+ //
+ bos.write((int)'B');
+ bos.write((int)'Z');
+ java.lang.Object[] args = new java.lang.Object[]{ bos };
+ java.io.OutputStream os = (java.io.OutputStream)_bzOutputStreamCtor.newInstance(args);
+ os.write(data, offset + headerSize, uncompressedLen);
+ os.close();
+ compressedLen = bos.pos();
+ }
+ catch(Exception ex)
+ {
+ Ice.CompressionException e = new Ice.CompressionException();
+ e.reason = "bzip2 compression failure";
+ e.initCause(ex);
+ throw e;
+ }
+
+ //
+ // Don't bother if the compressed data is larger than the
+ // uncompressed data.
+ //
+ if(compressedLen >= uncompressedLen)
+ {
+ return null;
+ }
+
+ BasicStream cstream = new BasicStream(_instance);
+ cstream.resize(headerSize + 4 + compressedLen, false);
+ cstream.pos(0);
+
+ //
+ // Copy the header from the uncompressed stream to the
+ // compressed one.
+ //
+ cstream._buf.put(data, offset, headerSize);
+
+ //
+ // Add the size of the uncompressed stream before the
+ // message body.
+ //
+ cstream.writeInt(size());
+
+ //
+ // Add the compressed message body.
+ //
+ cstream._buf.put(compressed, 0, compressedLen);
+
+ return cstream;
+ }
+
+ public BasicStream
+ uncompress(int headerSize)
+ {
+ assert(compressible());
+
+ pos(headerSize);
+ int uncompressedSize = readInt();
+ if(uncompressedSize <= headerSize)
+ {
+ throw new Ice.IllegalMessageSizeException();
+ }
+
+ int compressedLen = size() - headerSize - 4;
+
+ byte[] compressed = null;
+ int offset = 0;
+ try
+ {
+ //
+ // If the ByteBuffer is backed by an array then we can avoid
+ // an extra copy by using the array directly.
+ //
+ compressed = _buf.array();
+ offset = _buf.arrayOffset();
+ }
+ catch(Exception ex)
+ {
+ //
+ // Otherwise, allocate an array to hold a copy of the compressed data.
+ //
+ compressed = new byte[size()];
+ _buf.get(compressed);
+ }
+
+ BasicStream ucStream = new BasicStream(_instance);
+ ucStream.resize(uncompressedSize, false);
+
+ try
+ {
+ //
+ // Uncompress the data using the class org.apache.tools.bzip2.CBZip2InputStream.
+ // Its constructor requires an InputStream argument, therefore we pass the
+ // compressed data in a ByteArrayInputStream.
+ //
+ java.io.ByteArrayInputStream bais =
+ new java.io.ByteArrayInputStream(compressed, offset + headerSize + 4, compressedLen);
+ //
+ // For interoperability with the bzip2 C library, we insert the magic bytes
+ // 'B', 'Z' during compression and therefore must extract them before we
+ // invoke the Java implementation.
+ //
+ byte magicB = (byte)bais.read();
+ byte magicZ = (byte)bais.read();
+ if(magicB != (byte)'B' || magicZ != (byte)'Z')
+ {
+ Ice.CompressionException e = new Ice.CompressionException();
+ e.reason = "bzip2 uncompression failure: invalid magic bytes";
+ throw e;
+ }
+ java.lang.Object[] args = new java.lang.Object[]{ bais };
+ java.io.InputStream is = (java.io.InputStream)_bzInputStreamCtor.newInstance(args);
+ ucStream.pos(headerSize);
+ byte[] arr = new byte[8 * 1024];
+ int n;
+ while((n = is.read(arr)) != -1)
+ {
+ ucStream.writeBlob(arr, 0, n);
+ }
+ is.close();
+ }
+ catch(Exception ex)
+ {
+ Ice.CompressionException e = new Ice.CompressionException();
+ e.reason = "bzip2 uncompression failure";
+ e.initCause(ex);
+ throw e;
+ }
+
+ //
+ // Copy the header from the compressed stream to the uncompressed one.
+ //
+ ucStream.pos(0);
+ ucStream._buf.put(compressed, offset, headerSize);
+
+ return ucStream;
+ }
+
private void
expand(int size)
{
@@ -2036,4 +2276,31 @@ public class BasicStream
private static java.util.HashMap _exceptionFactories = new java.util.HashMap();
private static java.lang.Object _factoryMutex = new java.lang.Object(); // Protects _exceptionFactories.
+
+ public static boolean
+ compressible()
+ {
+ return _bzInputStreamCtor != null && _bzOutputStreamCtor != null;
+ }
+
+ private static java.lang.reflect.Constructor _bzInputStreamCtor;
+ private static java.lang.reflect.Constructor _bzOutputStreamCtor;
+ static
+ {
+ try
+ {
+ Class cls;
+ Class[] types = new Class[1];
+ cls = Class.forName("org.apache.tools.bzip2.CBZip2InputStream");
+ types[0] = java.io.InputStream.class;
+ _bzInputStreamCtor = cls.getDeclaredConstructor(types);
+ cls = Class.forName("org.apache.tools.bzip2.CBZip2OutputStream");
+ types[0] = java.io.OutputStream.class;
+ _bzOutputStreamCtor = cls.getDeclaredConstructor(types);
+ }
+ catch(Exception ex)
+ {
+ // Ignore - bzip2 compression not available.
+ }
+ }
}
diff --git a/java/src/IceInternal/DefaultsAndOverrides.java b/java/src/IceInternal/DefaultsAndOverrides.java
index 357ec174f97..64370b9beed 100644
--- a/java/src/IceInternal/DefaultsAndOverrides.java
+++ b/java/src/IceInternal/DefaultsAndOverrides.java
@@ -57,15 +57,17 @@ public final class DefaultsAndOverrides
if(value.length() > 0)
{
overrideCompress = true;
- if(properties.getPropertyAsInt("Ice.Override.Compress") != 0)
+ boolean b = properties.getPropertyAsInt("Ice.Override.Compress") > 0;
+ if(!BasicStream.compressible() && b)
{
- System.err.println("warning: compression not supported, Ice.Override.Compress ignored");
+ System.err.println("warning: bzip2 support not available, Ice.Override.Compress ignored");
+ b = false;
}
- overrideCompressValue = false;
+ overrideCompressValue = b;
}
else
{
- overrideCompress = false;
+ overrideCompress = !BasicStream.compressible();
overrideCompressValue = false;
}
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index ef9a21c1272..399573dbb55 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -330,6 +330,11 @@ public class IncomingConnectionFactory extends EventHandler
_endpoint = _endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue);
}
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ _endpoint = _endpoint.compress(defaultsAndOverrides.overrideCompressValue);
+ }
+
try
{
EndpointHolder h = new EndpointHolder();
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index aed85cc9c5a..eea8a2b4c1a 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -13,13 +13,14 @@ public final class Outgoing
{
public
Outgoing(Ice.ConnectionI connection, Reference ref, String operation, Ice.OperationMode mode,
- java.util.Map context)
+ java.util.Map context, boolean compress)
{
_connection = connection;
_reference = ref;
_state = StateUnsent;
_is = new BasicStream(ref.getInstance());
_os = new BasicStream(ref.getInstance());
+ _compress = compress;
writeHeader(operation, mode, context);
}
@@ -81,7 +82,7 @@ public final class Outgoing
// call back on this object, so we don't need to lock
// the mutex, keep track of state, or save exceptions.
//
- _connection.sendRequest(_os, this);
+ _connection.sendRequest(_os, this, _compress);
//
// Wait until the request has completed, or until the
@@ -193,16 +194,15 @@ public final class Outgoing
case Reference.ModeDatagram:
{
//
- // For oneway and datagram requests, the connection
- // object never calls back on this object. Therefore
- // we don't need to lock the mutex or save
- // exceptions. We simply let all exceptions from
- // sending propagate to the caller, because such
- // exceptions can be retried without violating
- // "at-most-once".
+ // For oneway and datagram requests, the connection object
+ // never calls back on this object. Therefore we don't
+ // need to lock the mutex or save exceptions. We simply
+ // let all exceptions from sending propagate to the
+ // caller, because such exceptions can be retried without
+ // violating "at-most-once".
//
_state = StateInProgress;
- _connection.sendRequest(_os, null);
+ _connection.sendRequest(_os, null, _compress);
break;
}
@@ -215,7 +215,7 @@ public final class Outgoing
// apply.
//
_state = StateInProgress;
- _connection.finishBatchRequest(_os);
+ _connection.finishBatchRequest(_os, _compress);
break;
}
}
@@ -503,5 +503,7 @@ public final class Outgoing
private BasicStream _is;
private BasicStream _os;
+ private boolean _compress;
+
public Outgoing next; // For use by Ice._ObjectDelM
}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 12fde2daa76..fd82778e1fc 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -283,8 +283,7 @@ public abstract class OutgoingAsync
_reference = ((Ice.ObjectPrxHelperBase)prx).__reference();
assert(_connection == null);
- Ice.BooleanHolder compress = new Ice.BooleanHolder();
- _connection = _reference.getConnection(compress);
+ _connection = _reference.getConnection(_compress);
_cnt = 0;
_mode = mode;
assert(__is == null);
@@ -371,8 +370,7 @@ public abstract class OutgoingAsync
{
if(_connection == null)
{
- Ice.BooleanHolder compress = new Ice.BooleanHolder();
- _connection = _reference.getConnection(compress);
+ _connection = _reference.getConnection(_compress);
}
if(_connection.timeout() >= 0)
@@ -386,7 +384,7 @@ public abstract class OutgoingAsync
try
{
- _connection.sendAsyncRequest(__os, this);
+ _connection.sendAsyncRequest(__os, this, _compress.value);
//
// Don't do anything after sendAsyncRequest() returned
@@ -479,6 +477,7 @@ public abstract class OutgoingAsync
private Ice.ConnectionI _connection;
private int _cnt;
private Ice.OperationMode _mode;
+ private Ice.BooleanHolder _compress = new Ice.BooleanHolder();
//
// Must be volatile, because we don't want to lock the monitor
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 2264c7566e2..6e8bc384b24 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -87,11 +87,13 @@ public class OutgoingConnectionFactory
}
public Ice.ConnectionI
- create(Endpoint[] endpoints, Ice.BooleanHolder compress)
+ create(Endpoint[] endpts, Ice.BooleanHolder compress)
{
- assert(endpoints.length > 0);
+ assert(endpts.length > 0);
+ Endpoint[] endpoints = new Endpoint[endpts.length];
+ System.arraycopy(endpts, 0, endpoints, 0, endpts.length);
- compress.value = false; // TODO: compression is not supported yet.
+ DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
synchronized(this)
{
@@ -127,13 +129,23 @@ public class OutgoingConnectionFactory
//
// Modify endpoints with overrides.
//
- DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
for(int i = 0; i < endpoints.length; i++)
{
if(defaultsAndOverrides.overrideTimeout)
{
endpoints[i] = endpoints[i].timeout(defaultsAndOverrides.overrideTimeoutValue);
}
+
+ //
+ // The Connection object does not take the compression flag of
+ // endpoints into account, but instead gets the information
+ // about whether messages should be compressed or not from
+ // other sources. In order to allow connection sharing for
+ // endpoints that differ in the value of the compression flag
+ // only, we always set the compression flag to false here in
+ // this connection factory.
+ //
+ endpoints[i] = endpoints[i].compress(false);
}
//
@@ -151,11 +163,20 @@ public class OutgoingConnectionFactory
Ice.ConnectionI connection = (Ice.ConnectionI)q.next();
//
- // Don't return connections for which destruction
- // has been initiated.
+ // Don't return connections for which destruction has
+ // been initiated.
//
if(!connection.isDestroyed())
{
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress.value = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress.value = endpts[i].compress();
+ }
+
return connection;
}
}
@@ -219,11 +240,20 @@ public class OutgoingConnectionFactory
Ice.ConnectionI connection = (Ice.ConnectionI)q.next();
//
- // Don't return connections for which
- // destruction has been initiated.
+ // Don't return connections for which destruction has
+ // been initiated.
//
if(!connection.isDestroyed())
{
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress.value = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress.value = endpts[i].compress();
+ }
+
return connection;
}
}
@@ -259,7 +289,6 @@ public class OutgoingConnectionFactory
assert(connector != null);
int timeout;
- DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideConnectTimeout)
{
timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
@@ -277,6 +306,15 @@ public class OutgoingConnectionFactory
}
connection = new Ice.ConnectionI(_instance, transceiver, endpoint, null);
connection.validate();
+
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress.value = defaultsAndOverrides.overrideCompressValue;
+ }
+ else
+ {
+ compress.value = endpts[i].compress();
+ }
break;
}
catch(Ice.LocalException ex)
@@ -369,11 +407,26 @@ public class OutgoingConnectionFactory
for(int i = 0; i < endpoints.length; i++)
{
Endpoint endpoint = endpoints[i];
+
+ //
+ // Modify endpoints with overrides.
+ //
if(defaultsAndOverrides.overrideTimeout)
{
endpoint = endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue);
}
+ //
+ // The Connection object does not take the compression flag of
+ // endpoints into account, but instead gets the information
+ // about whether messages should be compressed or not from
+ // other sources. In order to allow connection sharing for
+ // endpoints that differ in the value of the compression flag
+ // only, we always set the compression flag to false here in
+ // this connection factory.
+ //
+ endpoint = endpoint.compress(false);
+
java.util.LinkedList connectionList = (java.util.LinkedList)_connections.get(endpoints[i]);
if(connectionList != null)
{