diff options
author | Mark Spruiell <mes@zeroc.com> | 2004-12-02 20:49:49 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2004-12-02 20:49:49 +0000 |
commit | 35e3ed3fdf47eb7c260f0b500405d6fa109ebac9 (patch) | |
tree | 86a8cc5d44c47d94b5be53efea0b0dd45eb84796 /java/src | |
parent | Reverted back Object/LocalObject type id for backward compatibility (diff) | |
download | ice-35e3ed3fdf47eb7c260f0b500405d6fa109ebac9.tar.bz2 ice-35e3ed3fdf47eb7c260f0b500405d6fa109ebac9.tar.xz ice-35e3ed3fdf47eb7c260f0b500405d6fa109ebac9.zip |
adding bzip2 support
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 460 | ||||
-rw-r--r-- | java/src/Ice/_ObjectDelM.java | 5 | ||||
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 267 | ||||
-rw-r--r-- | java/src/IceInternal/DefaultsAndOverrides.java | 10 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 5 | ||||
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 24 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 9 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 71 |
8 files changed, 652 insertions, 199 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 2ec74a60c33..f02222732f6 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -109,14 +109,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { throw new ConnectionNotValidatedException(); } - byte compress = is.readByte(); // Ignore compression status for validate connection. + byte compress = is.readByte(); // Ignore compression status for validate connection. int size = is.readInt(); if(size != IceInternal.Protocol.headerSize) { throw new IllegalMessageSizeException(); } - IceInternal.TraceUtil.traceHeader("received validate connection", is, - _logger, _traceLevels); + IceInternal.TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels); } } catch(LocalException ex) @@ -389,10 +388,61 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeBlob(_requestHdr); } + private IceInternal.BasicStream + doCompress(IceInternal.BasicStream uncompressed, boolean compress) + { + if(_compressionSupported) + { + if(compress && uncompressed.size() >= 100) + { + // + // Do compression. + // + IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize); + if(cstream != null) + { + // + // Set compression status. + // + cstream.pos(9); + cstream.writeByte((byte)2); + + // + // Write the size of the compressed stream into the header. + // + cstream.pos(10); + cstream.writeInt(cstream.size()); + + // + // Write the compression status and size of the compressed stream into the header of the + // uncompressed stream -- we need this to trace requests correctly. + // + uncompressed.pos(9); + uncompressed.writeByte((byte)2); + uncompressed.writeInt(cstream.size()); + + return cstream; + } + } + } + + uncompressed.pos(9); + uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0)); + + // + // Not compressed, fill in the message size. + // + uncompressed.pos(10); + uncompressed.writeInt(uncompressed.size()); + + return uncompressed; + } + public void - sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out) + sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out, boolean compress) { int requestId = 0; + IceInternal.BasicStream stream = null; synchronized(this) { @@ -405,12 +455,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); - - // - // Fill in the message size. - // - os.pos(10); - os.writeInt(os.size()); // // Only add to the request map if this is a twoway call. @@ -439,6 +483,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _requests.put(requestId, out); } + stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress); + if(_acmTimeout > 0) { _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; @@ -454,12 +500,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_exception != null); throw _exception; // The exception is immutable at this point. } - + // // Send the request. // IceInternal.TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + _transceiver.write(stream, _endpoint.timeout()); } } catch(LocalException ex) @@ -500,12 +546,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } } + finally + { + if(stream != null && stream != os) + { + stream.destroy(); + } + } } public void - sendAsyncRequest(IceInternal.BasicStream os, IceInternal.OutgoingAsync out) + sendAsyncRequest(IceInternal.BasicStream os, IceInternal.OutgoingAsync out, boolean compress) { int requestId = 0; + IceInternal.BasicStream stream = null; synchronized(this) { @@ -518,12 +572,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); - - // - // Fill in the message size. - // - os.pos(10); - os.writeInt(os.size()); // // Create a new unique request ID. @@ -542,10 +590,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); // - // Add to the requests map. + // Add to the async requests map. // _asyncRequests.put(requestId, out); + stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress); + if(_acmTimeout > 0) { _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; @@ -561,13 +611,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_exception != null); throw _exception; // The exception is immutable at this point. } - + // // Send the request. // - IceInternal.TraceUtil.traceRequest("sending asynchronous request", os, - _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + IceInternal.TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); + _transceiver.write(stream, _endpoint.timeout()); } } catch(LocalException ex) @@ -600,6 +649,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } } + finally + { + if(stream != null && stream != os) + { + stream.destroy(); + } + } } private final static byte[] _requestBatchHdr = @@ -663,7 +719,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } public synchronized void - finishBatchRequest(IceInternal.BasicStream os) + finishBatchRequest(IceInternal.BasicStream os, boolean compress) { // // Get the batch stream back and increment the number of @@ -673,6 +729,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ++_batchRequestNum; // + // We compress the whole batch if there is at least one compressed + // message. + // + if(compress) + { + _batchRequestCompress = true; + } + + // // Notify about the batch stream not being in use anymore. // assert(_batchStreamInUse); @@ -691,6 +756,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.destroy(); _batchStream = new IceInternal.BasicStream(_instance); _batchRequestNum = 0; + _batchRequestCompress = false; // // Notify about the batch stream not being in use anymore. @@ -703,6 +769,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void flushBatchRequests() { + IceInternal.BasicStream stream = null; + synchronized(this) { while(_batchStreamInUse && _exception == null) @@ -728,18 +796,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); - + // // Fill in the message size. // _batchStream.pos(10); _batchStream.writeInt(_batchStream.size()); - + // // Fill in the number of requests in the batch. // _batchStream.writeInt(_batchRequestNum); + stream = doCompress(_batchStream, _overrideCompress ? _overrideCompressValue : _batchRequestCompress); + if(_acmTimeout > 0) { _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; @@ -761,13 +831,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_exception != null); throw _exception; // The exception is immutable at this point. } - + // // Send the batch request. // - IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, - _logger, _traceLevels); - _transceiver.write(_batchStream, _endpoint.timeout()); + IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver.write(stream, _endpoint.timeout()); } } catch(LocalException ex) @@ -784,6 +853,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw _exception; } } + finally + { + if(stream != null && stream != _batchStream) + { + stream.destroy(); + } + } synchronized(this) { @@ -793,14 +869,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.destroy(); _batchStream = new IceInternal.BasicStream(_instance); _batchRequestNum = 0; + _batchRequestCompress = false; _batchStreamInUse = false; notifyAll(); } } public void - sendResponse(IceInternal.BasicStream os, byte compress) + sendResponse(IceInternal.BasicStream os, byte compressFlag) { + IceInternal.BasicStream stream = null; try { synchronized(_sendMutex) @@ -811,17 +889,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw _exception; // The exception is immutable at this point. } - // - // Fill in the message size. - // - os.pos(10); - os.writeInt(os.size()); - + stream = doCompress(os, compressFlag != 0); + // // Send the reply. // IceInternal.TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + _transceiver.write(stream, _endpoint.timeout()); } } catch(LocalException ex) @@ -831,6 +905,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateClosed, ex); } } + finally + { + if(stream != os) + { + stream.destroy(); + } + } synchronized(this) { @@ -1011,6 +1092,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.ServantManager servantManager = null; ObjectAdapter adapter = null; IceInternal.OutgoingAsync outAsync = null; + boolean destroyStream = false; synchronized(this) { @@ -1046,7 +1128,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne compress = stream.readByte(); if(compress == (byte)2) { - throw new CompressionNotSupportedException(); + if(_compressionSupported) + { + IceInternal.BasicStream ustream = stream.uncompress(IceInternal.Protocol.headerSize); + if(ustream != stream) + { + destroyStream = true; + stream = ustream; + } + } + else + { + throw new CompressionNotSupportedException(); + } } stream.pos(IceInternal.Protocol.headerSize); @@ -1054,8 +1148,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { case IceInternal.Protocol.closeConnectionMsg: { - IceInternal.TraceUtil.traceHeader("received close connection", stream, - _logger, _traceLevels); + IceInternal.TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); if(_endpoint.datagram() && _warn) { _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); @@ -1072,13 +1165,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state == StateClosing) { IceInternal.TraceUtil.traceRequest("received request during closing\n" + - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); } else { - IceInternal.TraceUtil.traceRequest("received request", stream, - _logger, _traceLevels); + IceInternal.TraceUtil.traceRequest("received request", stream, _logger, _traceLevels); requestId = stream.readInt(); invokeNum = 1; servantManager = _servantManager; @@ -1092,15 +1184,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateClosing) { - IceInternal.TraceUtil.traceBatchRequest - ("received batch request during closing\n" + - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); + IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" + + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); } else { - IceInternal.TraceUtil.traceBatchRequest("received batch request", - stream, _logger, _traceLevels); + IceInternal.TraceUtil.traceBatchRequest("received batch request", stream, _logger, + _traceLevels); invokeNum = stream.readInt(); if(invokeNum < 0) { @@ -1135,8 +1226,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne case IceInternal.Protocol.validateConnectionMsg: { - IceInternal.TraceUtil.traceHeader("received validate connection", stream, - _logger, _traceLevels); + IceInternal.TraceUtil.traceHeader("received validate connection", stream, _logger, + _traceLevels); if(_warn) { _logger.warning("ignoring unexpected validate connection message:\n" + _desc); @@ -1147,139 +1238,46 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne default: { IceInternal.TraceUtil.traceHeader("received unknown message\n" + - "(invalid, closing connection)", - stream, _logger, _traceLevels); + "(invalid, closing connection)", stream, _logger, + _traceLevels); throw new UnknownMessageException(); } } } catch(LocalException ex) { + if(destroyStream) + { + stream.destroy(); + } setState(StateClosed, ex); return; } } - // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. - // - if(outAsync != null) - { - outAsync.__finished(stream); - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - IceInternal.Incoming in = null; try { - while(invokeNum > 0) - { - - // - // Prepare the invocation. - // - boolean response = !_endpoint.datagram() && requestId != 0; - in = getIncoming(adapter, response, compress); - IceInternal.BasicStream is = in.is(); - stream.swap(is); - IceInternal.BasicStream os = in.os(); - - // - // Prepare the response if necessary. - // - if(response) - { - assert(invokeNum == 1); // No further invocations if a response is expected. - os.writeBlob(_replyHdr); - - // - // Fill in the request ID. - // - os.writeInt(requestId); - } - - in.invoke(servantManager); - - // - // If there are more invocations, we need the stream back. - // - if(--invokeNum > 0) - { - stream.swap(is); - } - - reclaimIncoming(in); - in = null; - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - } - } - catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace. - { - synchronized(this) - { - UnknownException uex = new UnknownException(); - //uex.unknown = ex.toString(); - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - uex.unknown = sw.toString(); - if(ex instanceof java.lang.AssertionError) - { - _logger.error(uex.unknown); - } - setState(StateClosed, uex); - } - } - catch(java.lang.Exception ex) - { - synchronized(this) + // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(outAsync != null) { - UnknownException uex = new UnknownException(); - //uex.unknown = ex.toString(); - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - uex.unknown = sw.toString(); - setState(StateClosed, uex); + outAsync.__finished(stream); } + + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); } finally { - if(in != null) + if(destroyStream) { - reclaimIncoming(in); - } - } - - // - // If invoke() above raised an exception, and therefore - // neither sendResponse() nor sendNoResponse() has been - // called, then we must decrement _dispatchCount here. - // - if(invokeNum > 0) - { - synchronized(this) - { - assert(_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); - if(_dispatchCount == 0) - { - notifyAll(); - } + stream.destroy(); } } } @@ -1416,6 +1414,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream = new IceInternal.BasicStream(instance); _batchStreamInUse = false; _batchRequestNum = 0; + _batchRequestCompress = false; _dispatchCount = 0; _state = StateNotValidated; _stateTime = System.currentTimeMillis(); @@ -1430,6 +1429,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _threadPool = _instance.clientThreadPool(); _servantManager = null; } + + _overrideCompress = _instance.defaultsAndOverrides().overrideCompress; + _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue; } protected void @@ -1648,7 +1650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeByte(IceInternal.Protocol.encodingMajor); os.writeByte(IceInternal.Protocol.encodingMinor); os.writeByte(IceInternal.Protocol.closeConnectionMsg); - os.writeByte((byte)0); // Compression status. + os.writeByte(_compressionSupported ? (byte)1 : (byte)0); os.writeInt(IceInternal.Protocol.headerSize); // Message size. // @@ -1694,6 +1696,120 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } private void + invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, + IceInternal.ServantManager servantManager, ObjectAdapter adapter) + { + // + // Note: In contrast to other private or protected methods, this + // operation must be called *without* the mutex locked. + // + + IceInternal.Incoming in = null; + try + { + while(invokeNum > 0) + { + + // + // Prepare the invocation. + // + boolean response = !_endpoint.datagram() && requestId != 0; + in = getIncoming(adapter, response, compress); + IceInternal.BasicStream is = in.is(); + stream.swap(is); + IceInternal.BasicStream os = in.os(); + + // + // Prepare the response if necessary. + // + if(response) + { + assert(invokeNum == 1); // No further invocations if a response is expected. + os.writeBlob(_replyHdr); + + // + // Add the request ID. + // + os.writeInt(requestId); + } + + in.invoke(servantManager); + + // + // If there are more invocations, we need the stream back. + // + if(--invokeNum > 0) + { + stream.swap(is); + } + + reclaimIncoming(in); + in = null; + } + } + catch(LocalException ex) + { + synchronized(this) + { + setState(StateClosed, ex); + } + } + catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace. + { + synchronized(this) + { + UnknownException uex = new UnknownException(); + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + uex.unknown = sw.toString(); + _logger.error(uex.unknown); + setState(StateClosed, uex); + } + } + catch(java.lang.Exception ex) + { + synchronized(this) + { + UnknownException uex = new UnknownException(); + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + uex.unknown = sw.toString(); + setState(StateClosed, uex); + } + } + finally + { + if(in != null) + { + reclaimIncoming(in); + } + } + + // + // If invoke() above raised an exception, and therefore + // neither sendResponse() nor sendNoResponse() has been + // called, then we must decrement _dispatchCount here. + // + if(invokeNum > 0) + { + synchronized(this) + { + assert(_dispatchCount > 0); + _dispatchCount -= invokeNum; + assert(_dispatchCount >= 0); + if(_dispatchCount == 0) + { + notifyAll(); + } + } + } + } + + private void warning(String msg, Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); @@ -1775,6 +1891,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private IceInternal.BasicStream _batchStream; private boolean _batchStreamInUse; private int _batchRequestNum; + private boolean _batchRequestCompress; private int _dispatchCount; @@ -1789,4 +1906,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private IceInternal.Incoming _incomingCache; private java.lang.Object _incomingCacheMutex = new java.lang.Object(); + + private static boolean _compressionSupported = IceInternal.BasicStream.compressible(); + + private boolean _overrideCompress; + private boolean _overrideCompressValue; } diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index 970e529d386..1fa57dd08e0 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -176,10 +176,12 @@ public class _ObjectDelM implements _ObjectDel __reference = from.__reference; __connection = from.__connection; + __compress = from.__compress; } protected IceInternal.Reference __reference; protected ConnectionI __connection; + protected boolean __compress; public void setup(IceInternal.Reference ref) @@ -195,6 +197,7 @@ public class _ObjectDelM implements _ObjectDel __reference = ref; BooleanHolder compress = new BooleanHolder(); __connection = __reference.getConnection(compress); + __compress = compress.value; } protected IceInternal.Outgoing @@ -206,7 +209,7 @@ public class _ObjectDelM implements _ObjectDel { if(__outgoingCache == null) { - out = new IceInternal.Outgoing(__connection, __reference, operation, mode, context); + out = new IceInternal.Outgoing(__connection, __reference, operation, mode, context, __compress); } else { diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index dc6aff676e0..1dc909aad2a 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -611,6 +611,13 @@ public class BasicStream _buf.put(v); } + public void + writeBlob(byte[] v, int off, int len) + { + expand(len); + _buf.put(v, off, len); + } + public byte[] readBlob(int sz) { @@ -1657,6 +1664,239 @@ public class BasicStream return _limit == 0; } + private static class BufferedOutputStream extends java.io.OutputStream + { + BufferedOutputStream(byte[] data) + { + _data = data; + } + + public void + close() + throws java.io.IOException + { + } + + public void + flush() + throws java.io.IOException + { + } + + public void + write(byte[] b) + throws java.io.IOException + { + assert(_data.length - _pos >= b.length); + System.arraycopy(b, 0, _data, _pos, b.length); + _pos += b.length; + } + + public void + write(byte[] b, int off, int len) + throws java.io.IOException + { + assert(_data.length - _pos >= len); + System.arraycopy(b, off, _data, _pos, len); + _pos += len; + } + + public void + write(int b) + throws java.io.IOException + { + assert(_data.length - _pos >= 1); + _data[_pos] = (byte)b; + ++_pos; + } + + int + pos() + { + return _pos; + } + + private byte[] _data; + private int _pos; + } + + public BasicStream + compress(int headerSize) + { + assert(compressible()); + + int uncompressedLen = size() - headerSize; + int compressedLen = (int)(uncompressedLen * 1.01 + 600); + byte[] compressed = new byte[compressedLen]; + + byte[] data = null; + int offset = 0; + try + { + // + // If the ByteBuffer is backed by an array then we can avoid + // an extra copy by using the array directly. + // + data = _buf.array(); + offset = _buf.arrayOffset(); + } + catch(Exception ex) + { + // + // Otherwise, allocate an array to hold a copy of the uncompressed data. + // + data = new byte[size()]; + _buf.get(data); + } + + try + { + // + // Compress the data using the class org.apache.tools.bzip2.CBZip2OutputStream. + // Its constructor requires an OutputStream argument, therefore we pass the + // compressed BasicStream in an OutputStream wrapper. + // + BufferedOutputStream bos = new BufferedOutputStream(compressed); + // + // For interoperability with the bzip2 C library, we insert the magic bytes + // 'B', 'Z' before invoking the Java implementation. + // + bos.write((int)'B'); + bos.write((int)'Z'); + java.lang.Object[] args = new java.lang.Object[]{ bos }; + java.io.OutputStream os = (java.io.OutputStream)_bzOutputStreamCtor.newInstance(args); + os.write(data, offset + headerSize, uncompressedLen); + os.close(); + compressedLen = bos.pos(); + } + catch(Exception ex) + { + Ice.CompressionException e = new Ice.CompressionException(); + e.reason = "bzip2 compression failure"; + e.initCause(ex); + throw e; + } + + // + // Don't bother if the compressed data is larger than the + // uncompressed data. + // + if(compressedLen >= uncompressedLen) + { + return null; + } + + BasicStream cstream = new BasicStream(_instance); + cstream.resize(headerSize + 4 + compressedLen, false); + cstream.pos(0); + + // + // Copy the header from the uncompressed stream to the + // compressed one. + // + cstream._buf.put(data, offset, headerSize); + + // + // Add the size of the uncompressed stream before the + // message body. + // + cstream.writeInt(size()); + + // + // Add the compressed message body. + // + cstream._buf.put(compressed, 0, compressedLen); + + return cstream; + } + + public BasicStream + uncompress(int headerSize) + { + assert(compressible()); + + pos(headerSize); + int uncompressedSize = readInt(); + if(uncompressedSize <= headerSize) + { + throw new Ice.IllegalMessageSizeException(); + } + + int compressedLen = size() - headerSize - 4; + + byte[] compressed = null; + int offset = 0; + try + { + // + // If the ByteBuffer is backed by an array then we can avoid + // an extra copy by using the array directly. + // + compressed = _buf.array(); + offset = _buf.arrayOffset(); + } + catch(Exception ex) + { + // + // Otherwise, allocate an array to hold a copy of the compressed data. + // + compressed = new byte[size()]; + _buf.get(compressed); + } + + BasicStream ucStream = new BasicStream(_instance); + ucStream.resize(uncompressedSize, false); + + try + { + // + // Uncompress the data using the class org.apache.tools.bzip2.CBZip2InputStream. + // Its constructor requires an InputStream argument, therefore we pass the + // compressed data in a ByteArrayInputStream. + // + java.io.ByteArrayInputStream bais = + new java.io.ByteArrayInputStream(compressed, offset + headerSize + 4, compressedLen); + // + // For interoperability with the bzip2 C library, we insert the magic bytes + // 'B', 'Z' during compression and therefore must extract them before we + // invoke the Java implementation. + // + byte magicB = (byte)bais.read(); + byte magicZ = (byte)bais.read(); + if(magicB != (byte)'B' || magicZ != (byte)'Z') + { + Ice.CompressionException e = new Ice.CompressionException(); + e.reason = "bzip2 uncompression failure: invalid magic bytes"; + throw e; + } + java.lang.Object[] args = new java.lang.Object[]{ bais }; + java.io.InputStream is = (java.io.InputStream)_bzInputStreamCtor.newInstance(args); + ucStream.pos(headerSize); + byte[] arr = new byte[8 * 1024]; + int n; + while((n = is.read(arr)) != -1) + { + ucStream.writeBlob(arr, 0, n); + } + is.close(); + } + catch(Exception ex) + { + Ice.CompressionException e = new Ice.CompressionException(); + e.reason = "bzip2 uncompression failure"; + e.initCause(ex); + throw e; + } + + // + // Copy the header from the compressed stream to the uncompressed one. + // + ucStream.pos(0); + ucStream._buf.put(compressed, offset, headerSize); + + return ucStream; + } + private void expand(int size) { @@ -2036,4 +2276,31 @@ public class BasicStream private static java.util.HashMap _exceptionFactories = new java.util.HashMap(); private static java.lang.Object _factoryMutex = new java.lang.Object(); // Protects _exceptionFactories. + + public static boolean + compressible() + { + return _bzInputStreamCtor != null && _bzOutputStreamCtor != null; + } + + private static java.lang.reflect.Constructor _bzInputStreamCtor; + private static java.lang.reflect.Constructor _bzOutputStreamCtor; + static + { + try + { + Class cls; + Class[] types = new Class[1]; + cls = Class.forName("org.apache.tools.bzip2.CBZip2InputStream"); + types[0] = java.io.InputStream.class; + _bzInputStreamCtor = cls.getDeclaredConstructor(types); + cls = Class.forName("org.apache.tools.bzip2.CBZip2OutputStream"); + types[0] = java.io.OutputStream.class; + _bzOutputStreamCtor = cls.getDeclaredConstructor(types); + } + catch(Exception ex) + { + // Ignore - bzip2 compression not available. + } + } } diff --git a/java/src/IceInternal/DefaultsAndOverrides.java b/java/src/IceInternal/DefaultsAndOverrides.java index 357ec174f97..64370b9beed 100644 --- a/java/src/IceInternal/DefaultsAndOverrides.java +++ b/java/src/IceInternal/DefaultsAndOverrides.java @@ -57,15 +57,17 @@ public final class DefaultsAndOverrides if(value.length() > 0) { overrideCompress = true; - if(properties.getPropertyAsInt("Ice.Override.Compress") != 0) + boolean b = properties.getPropertyAsInt("Ice.Override.Compress") > 0; + if(!BasicStream.compressible() && b) { - System.err.println("warning: compression not supported, Ice.Override.Compress ignored"); + System.err.println("warning: bzip2 support not available, Ice.Override.Compress ignored"); + b = false; } - overrideCompressValue = false; + overrideCompressValue = b; } else { - overrideCompress = false; + overrideCompress = !BasicStream.compressible(); overrideCompressValue = false; } diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index ef9a21c1272..399573dbb55 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -330,6 +330,11 @@ public class IncomingConnectionFactory extends EventHandler _endpoint = _endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue); } + if(defaultsAndOverrides.overrideCompress) + { + _endpoint = _endpoint.compress(defaultsAndOverrides.overrideCompressValue); + } + try { EndpointHolder h = new EndpointHolder(); diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index aed85cc9c5a..eea8a2b4c1a 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -13,13 +13,14 @@ public final class Outgoing { public Outgoing(Ice.ConnectionI connection, Reference ref, String operation, Ice.OperationMode mode, - java.util.Map context) + java.util.Map context, boolean compress) { _connection = connection; _reference = ref; _state = StateUnsent; _is = new BasicStream(ref.getInstance()); _os = new BasicStream(ref.getInstance()); + _compress = compress; writeHeader(operation, mode, context); } @@ -81,7 +82,7 @@ public final class Outgoing // call back on this object, so we don't need to lock // the mutex, keep track of state, or save exceptions. // - _connection.sendRequest(_os, this); + _connection.sendRequest(_os, this, _compress); // // Wait until the request has completed, or until the @@ -193,16 +194,15 @@ public final class Outgoing case Reference.ModeDatagram: { // - // For oneway and datagram requests, the connection - // object never calls back on this object. Therefore - // we don't need to lock the mutex or save - // exceptions. We simply let all exceptions from - // sending propagate to the caller, because such - // exceptions can be retried without violating - // "at-most-once". + // For oneway and datagram requests, the connection object + // never calls back on this object. Therefore we don't + // need to lock the mutex or save exceptions. We simply + // let all exceptions from sending propagate to the + // caller, because such exceptions can be retried without + // violating "at-most-once". // _state = StateInProgress; - _connection.sendRequest(_os, null); + _connection.sendRequest(_os, null, _compress); break; } @@ -215,7 +215,7 @@ public final class Outgoing // apply. // _state = StateInProgress; - _connection.finishBatchRequest(_os); + _connection.finishBatchRequest(_os, _compress); break; } } @@ -503,5 +503,7 @@ public final class Outgoing private BasicStream _is; private BasicStream _os; + private boolean _compress; + public Outgoing next; // For use by Ice._ObjectDelM } diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 12fde2daa76..fd82778e1fc 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -283,8 +283,7 @@ public abstract class OutgoingAsync _reference = ((Ice.ObjectPrxHelperBase)prx).__reference(); assert(_connection == null); - Ice.BooleanHolder compress = new Ice.BooleanHolder(); - _connection = _reference.getConnection(compress); + _connection = _reference.getConnection(_compress); _cnt = 0; _mode = mode; assert(__is == null); @@ -371,8 +370,7 @@ public abstract class OutgoingAsync { if(_connection == null) { - Ice.BooleanHolder compress = new Ice.BooleanHolder(); - _connection = _reference.getConnection(compress); + _connection = _reference.getConnection(_compress); } if(_connection.timeout() >= 0) @@ -386,7 +384,7 @@ public abstract class OutgoingAsync try { - _connection.sendAsyncRequest(__os, this); + _connection.sendAsyncRequest(__os, this, _compress.value); // // Don't do anything after sendAsyncRequest() returned @@ -479,6 +477,7 @@ public abstract class OutgoingAsync private Ice.ConnectionI _connection; private int _cnt; private Ice.OperationMode _mode; + private Ice.BooleanHolder _compress = new Ice.BooleanHolder(); // // Must be volatile, because we don't want to lock the monitor diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 2264c7566e2..6e8bc384b24 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -87,11 +87,13 @@ public class OutgoingConnectionFactory } public Ice.ConnectionI - create(Endpoint[] endpoints, Ice.BooleanHolder compress) + create(Endpoint[] endpts, Ice.BooleanHolder compress) { - assert(endpoints.length > 0); + assert(endpts.length > 0); + Endpoint[] endpoints = new Endpoint[endpts.length]; + System.arraycopy(endpts, 0, endpoints, 0, endpts.length); - compress.value = false; // TODO: compression is not supported yet. + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); synchronized(this) { @@ -127,13 +129,23 @@ public class OutgoingConnectionFactory // // Modify endpoints with overrides. // - DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); for(int i = 0; i < endpoints.length; i++) { if(defaultsAndOverrides.overrideTimeout) { endpoints[i] = endpoints[i].timeout(defaultsAndOverrides.overrideTimeoutValue); } + + // + // The Connection object does not take the compression flag of + // endpoints into account, but instead gets the information + // about whether messages should be compressed or not from + // other sources. In order to allow connection sharing for + // endpoints that differ in the value of the compression flag + // only, we always set the compression flag to false here in + // this connection factory. + // + endpoints[i] = endpoints[i].compress(false); } // @@ -151,11 +163,20 @@ public class OutgoingConnectionFactory Ice.ConnectionI connection = (Ice.ConnectionI)q.next(); // - // Don't return connections for which destruction - // has been initiated. + // Don't return connections for which destruction has + // been initiated. // if(!connection.isDestroyed()) { + if(defaultsAndOverrides.overrideCompress) + { + compress.value = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress.value = endpts[i].compress(); + } + return connection; } } @@ -219,11 +240,20 @@ public class OutgoingConnectionFactory Ice.ConnectionI connection = (Ice.ConnectionI)q.next(); // - // Don't return connections for which - // destruction has been initiated. + // Don't return connections for which destruction has + // been initiated. // if(!connection.isDestroyed()) { + if(defaultsAndOverrides.overrideCompress) + { + compress.value = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress.value = endpts[i].compress(); + } + return connection; } } @@ -259,7 +289,6 @@ public class OutgoingConnectionFactory assert(connector != null); int timeout; - DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); if(defaultsAndOverrides.overrideConnectTimeout) { timeout = defaultsAndOverrides.overrideConnectTimeoutValue; @@ -277,6 +306,15 @@ public class OutgoingConnectionFactory } connection = new Ice.ConnectionI(_instance, transceiver, endpoint, null); connection.validate(); + + if(defaultsAndOverrides.overrideCompress) + { + compress.value = defaultsAndOverrides.overrideCompressValue; + } + else + { + compress.value = endpts[i].compress(); + } break; } catch(Ice.LocalException ex) @@ -369,11 +407,26 @@ public class OutgoingConnectionFactory for(int i = 0; i < endpoints.length; i++) { Endpoint endpoint = endpoints[i]; + + // + // Modify endpoints with overrides. + // if(defaultsAndOverrides.overrideTimeout) { endpoint = endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue); } + // + // The Connection object does not take the compression flag of + // endpoints into account, but instead gets the information + // about whether messages should be compressed or not from + // other sources. In order to allow connection sharing for + // endpoints that differ in the value of the compression flag + // only, we always set the compression flag to false here in + // this connection factory. + // + endpoint = endpoint.compress(false); + java.util.LinkedList connectionList = (java.util.LinkedList)_connections.get(endpoints[i]); if(connectionList != null) { |