diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
commit | 47f800495093fd7679a315e2d730fea22f6135b7 (patch) | |
tree | a7b8d3488f3841367dd03d10cae293f36fd10481 /java/src/IceInternal/BasicStream.java | |
parent | Fixed SystemException to no longer derive from LocalException (diff) | |
download | ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2 ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz ice-47f800495093fd7679a315e2d730fea22f6135b7.zip |
- Added support for non-blocking AMI/batch requests, connection
creation.
- Added support for AMI oneway requests.
- Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'java/src/IceInternal/BasicStream.java')
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 303 |
1 files changed, 107 insertions, 196 deletions
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index d7a33113069..16409506b80 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -12,27 +12,24 @@ package IceInternal; public class BasicStream { public - BasicStream(IceInternal.Instance instance) + BasicStream(Instance instance) { initialize(instance, false); } public - BasicStream(IceInternal.Instance instance, boolean unlimited) + BasicStream(Instance instance, boolean unlimited) { initialize(instance, unlimited); } private void - initialize(IceInternal.Instance instance, boolean unlimited) + initialize(Instance instance, boolean unlimited) { _instance = instance; + _buf = new Buffer(_instance.messageSizeMax()); _closure = null; _unlimited = unlimited; - allocate(1500); - _capacity = _buf.capacity(); - _limit = 0; - assert(_buf.limit() == _capacity); _readEncapsStack = null; _writeEncapsStack = null; @@ -56,9 +53,7 @@ public class BasicStream public void reset() { - _limit = 0; - _buf.limit(_capacity); - _buf.position(0); + _buf.reset(); if(_readEncapsStack != null) { @@ -75,7 +70,7 @@ public class BasicStream } } - public IceInternal.Instance + public Instance instance() { return _instance; @@ -104,18 +99,10 @@ public class BasicStream other._closure = _closure; _closure = tmpClosure; - java.nio.ByteBuffer tmpBuf = other._buf; + Buffer tmpBuf = other._buf; other._buf = _buf; _buf = tmpBuf; - int tmpCapacity = other._capacity; - other._capacity = _capacity; - _capacity = tmpCapacity; - - int tmpLimit = other._limit; - other._limit = _limit; - _limit = tmpLimit; - ReadEncaps tmpRead = other._readEncapsStack; other._readEncapsStack = _readEncapsStack; _readEncapsStack = tmpRead; @@ -154,49 +141,31 @@ public class BasicStream } public void - resize(int total, boolean reading) + resize(int sz, boolean reading) { - if(!_unlimited && total > _messageSizeMax) - { - throw new Ice.MemoryLimitException(); - } - if(total > _capacity) - { - final int cap2 = _capacity << 1; - int newCapacity = cap2 > total ? cap2 : total; - _buf.limit(_limit); - reallocate(newCapacity); - _capacity = _buf.capacity(); - } // - // If this stream is used for reading, then we want to set the - // buffer's limit to the new total size. If this buffer is - // used for writing, then we must set the buffer's limit to - // the buffer's capacity. + // Check memory limit if stream is not unlimited. // - if(reading) + if(!_unlimited && sz > _messageSizeMax) { - _buf.limit(total); - } - else - { - _buf.limit(_capacity); + throw new Ice.MemoryLimitException(); } - _buf.position(total); - _limit = total; + + _buf.resize(sz, reading); + _buf.b.position(sz); } - public java.nio.ByteBuffer - prepareRead() + public Buffer + prepareWrite() { + _buf.b.limit(_buf.size()); + _buf.b.position(0); return _buf; } - public java.nio.ByteBuffer - prepareWrite() + public Buffer + getBuffer() { - _buf.limit(_limit); - _buf.position(0); return _buf; } @@ -261,7 +230,7 @@ public class BasicStream sd.previous = _seqDataStack; _seqDataStack = sd; - int bytesLeft = _buf.remaining(); + int bytesLeft = _buf.b.remaining(); if(_seqDataStack.previous == null) // Outermost sequence { // @@ -287,7 +256,7 @@ public class BasicStream public void checkSeq() { - checkSeq(_buf.remaining()); + checkSeq(_buf.b.remaining()); } public void @@ -311,7 +280,7 @@ public class BasicStream public void checkFixedSeq(int numElements, int elemSize) { - int bytesLeft = _buf.remaining(); + int bytesLeft = _buf.b.remaining(); if(_seqDataStack == null) // Outermost sequence { // @@ -376,7 +345,7 @@ public class BasicStream _writeEncapsStack = curr; } - _writeEncapsStack.start = _buf.position(); + _writeEncapsStack.start = _buf.size(); writeBlob(_encapsBlob); } @@ -385,8 +354,8 @@ public class BasicStream { assert(_writeEncapsStack != null); int start = _writeEncapsStack.start; - int sz = _buf.position() - start; // Size includes size and version. - _buf.putInt(start, sz); + int sz = _buf.size() - start; // Size includes size and version. + _buf.b.putInt(start, sz); WriteEncaps curr = _writeEncapsStack; _writeEncapsStack = curr.next; @@ -413,8 +382,8 @@ public class BasicStream _readEncapsStack = curr; } - _readEncapsStack.start = _buf.position(); - + _readEncapsStack.start = _buf.b.position(); + // // I don't use readSize() and writeSize() for encapsulations, // because when creating an encapsulation, I must know in @@ -428,7 +397,7 @@ public class BasicStream throw new Ice.NegativeSizeException(); } - if(sz - 4 > _buf.limit()) + if(sz - 4 > _buf.b.limit()) { throw new Ice.UnmarshalOutOfBoundsException(); } @@ -457,7 +426,7 @@ public class BasicStream int sz = _readEncapsStack.sz; try { - _buf.position(start + sz); + _buf.b.position(start + sz); } catch(IllegalArgumentException ex) { @@ -477,7 +446,7 @@ public class BasicStream assert(_readEncapsStack != null); int start = _readEncapsStack.start; int sz = _readEncapsStack.sz; - if(_buf.position() != start + sz) + if(_buf.b.position() != start + sz) { throw new Ice.EncapsulationException(); } @@ -500,7 +469,7 @@ public class BasicStream } try { - _buf.position(_buf.position() + sz - 4); + _buf.b.position(_buf.b.position() + sz - 4); } catch(IllegalArgumentException ex) { @@ -512,13 +481,13 @@ public class BasicStream startWriteSlice() { writeInt(0); // Placeholder for the slice length. - _writeSlice = _buf.position(); + _writeSlice = _buf.size(); } public void endWriteSlice() { - final int sz = _buf.position() - _writeSlice + 4; - _buf.putInt(_writeSlice - 4, sz); + final int sz = _buf.size() - _writeSlice + 4; + _buf.b.putInt(_writeSlice - 4, sz); } public void startReadSlice() @@ -528,7 +497,7 @@ public class BasicStream { throw new Ice.NegativeSizeException(); } - _readSlice = _buf.position(); + _readSlice = _buf.b.position(); } public void endReadSlice() @@ -544,7 +513,7 @@ public class BasicStream } try { - _buf.position(_buf.position() + sz - 4); + _buf.b.position(_buf.b.position() + sz - 4); } catch(IllegalArgumentException ex) { @@ -558,13 +527,13 @@ public class BasicStream if(v > 254) { expand(5); - _buf.put((byte)-1); - _buf.putInt(v); + _buf.b.put((byte)-1); + _buf.b.putInt(v); } else { expand(1); - _buf.put((byte)v); + _buf.b.put((byte)v); } } @@ -573,10 +542,10 @@ public class BasicStream { try { - byte b = _buf.get(); + byte b = _buf.b.get(); if(b == -1) { - int v = _buf.getInt(); + int v = _buf.b.getInt(); if(v < 0) { throw new Ice.NegativeSizeException(); @@ -640,14 +609,14 @@ public class BasicStream writeBlob(byte[] v) { expand(v.length); - _buf.put(v); + _buf.b.put(v); } public void writeBlob(byte[] v, int off, int len) { expand(len); - _buf.put(v, off, len); + _buf.b.put(v, off, len); } public byte[] @@ -656,7 +625,7 @@ public class BasicStream byte[] v = new byte[sz]; try { - _buf.get(v); + _buf.b.get(v); return v; } catch(java.nio.BufferUnderflowException ex) @@ -669,7 +638,7 @@ public class BasicStream writeByte(byte v) { expand(1); - _buf.put(v); + _buf.b.put(v); } public void @@ -693,7 +662,7 @@ public class BasicStream { writeSize(v.length); expand(v.length); - _buf.put(v); + _buf.b.put(v); } } @@ -702,7 +671,7 @@ public class BasicStream { try { - return _buf.get(); + return _buf.b.get(); } catch(java.nio.BufferUnderflowException ex) { @@ -729,7 +698,7 @@ public class BasicStream final int sz = readSize(); checkFixedSeq(sz, 1); byte[] v = new byte[sz]; - _buf.get(v); + _buf.b.get(v); return v; } catch(java.nio.BufferUnderflowException ex) @@ -742,7 +711,7 @@ public class BasicStream writeBool(boolean v) { expand(1); - _buf.put(v ? (byte)1 : (byte)0); + _buf.b.put(v ? (byte)1 : (byte)0); } public void @@ -758,7 +727,7 @@ public class BasicStream expand(v.length); for(int i = 0; i < v.length; i++) { - _buf.put(v[i] ? (byte)1 : (byte)0); + _buf.b.put(v[i] ? (byte)1 : (byte)0); } } } @@ -768,7 +737,7 @@ public class BasicStream { try { - return _buf.get() == 1; + return _buf.b.get() == 1; } catch(java.nio.BufferUnderflowException ex) { @@ -786,7 +755,7 @@ public class BasicStream boolean[] v = new boolean[sz]; for(int i = 0; i < sz; i++) { - v[i] = _buf.get() == 1; + v[i] = _buf.b.get() == 1; } return v; } @@ -800,7 +769,7 @@ public class BasicStream writeShort(short v) { expand(2); - _buf.putShort(v); + _buf.b.putShort(v); } public void @@ -824,9 +793,9 @@ public class BasicStream { writeSize(v.length); expand(v.length * 2); - java.nio.ShortBuffer shortBuf = _buf.asShortBuffer(); + java.nio.ShortBuffer shortBuf = _buf.b.asShortBuffer(); shortBuf.put(v); - _buf.position(_buf.position() + v.length * 2); + _buf.b.position(_buf.b.position() + v.length * 2); } } @@ -835,7 +804,7 @@ public class BasicStream { try { - return _buf.getShort(); + return _buf.b.getShort(); } catch(java.nio.BufferUnderflowException ex) { @@ -862,9 +831,9 @@ public class BasicStream final int sz = readSize(); checkFixedSeq(sz, 2); short[] v = new short[sz]; - java.nio.ShortBuffer shortBuf = _buf.asShortBuffer(); + java.nio.ShortBuffer shortBuf = _buf.b.asShortBuffer(); shortBuf.get(v); - _buf.position(_buf.position() + sz * 2); + _buf.b.position(_buf.b.position() + sz * 2); return v; } catch(java.nio.BufferUnderflowException ex) @@ -877,7 +846,7 @@ public class BasicStream writeInt(int v) { expand(4); - _buf.putInt(v); + _buf.b.putInt(v); } public void @@ -901,9 +870,9 @@ public class BasicStream { writeSize(v.length); expand(v.length * 4); - java.nio.IntBuffer intBuf = _buf.asIntBuffer(); + java.nio.IntBuffer intBuf = _buf.b.asIntBuffer(); intBuf.put(v); - _buf.position(_buf.position() + v.length * 4); + _buf.b.position(_buf.b.position() + v.length * 4); } } @@ -912,7 +881,7 @@ public class BasicStream { try { - return _buf.getInt(); + return _buf.b.getInt(); } catch(java.nio.BufferUnderflowException ex) { @@ -939,9 +908,9 @@ public class BasicStream final int sz = readSize(); checkFixedSeq(sz, 4); int[] v = new int[sz]; - java.nio.IntBuffer intBuf = _buf.asIntBuffer(); + java.nio.IntBuffer intBuf = _buf.b.asIntBuffer(); intBuf.get(v); - _buf.position(_buf.position() + sz * 4); + _buf.b.position(_buf.b.position() + sz * 4); return v; } catch(java.nio.BufferUnderflowException ex) @@ -954,7 +923,7 @@ public class BasicStream writeLong(long v) { expand(8); - _buf.putLong(v); + _buf.b.putLong(v); } public void @@ -968,9 +937,9 @@ public class BasicStream { writeSize(v.length); expand(v.length * 8); - java.nio.LongBuffer longBuf = _buf.asLongBuffer(); + java.nio.LongBuffer longBuf = _buf.b.asLongBuffer(); longBuf.put(v); - _buf.position(_buf.position() + v.length * 8); + _buf.b.position(_buf.b.position() + v.length * 8); } } @@ -979,7 +948,7 @@ public class BasicStream { try { - return _buf.getLong(); + return _buf.b.getLong(); } catch(java.nio.BufferUnderflowException ex) { @@ -995,9 +964,9 @@ public class BasicStream final int sz = readSize(); checkFixedSeq(sz, 8); long[] v = new long[sz]; - java.nio.LongBuffer longBuf = _buf.asLongBuffer(); + java.nio.LongBuffer longBuf = _buf.b.asLongBuffer(); longBuf.get(v); - _buf.position(_buf.position() + sz * 8); + _buf.b.position(_buf.b.position() + sz * 8); return v; } catch(java.nio.BufferUnderflowException ex) @@ -1010,7 +979,7 @@ public class BasicStream writeFloat(float v) { expand(4); - _buf.putFloat(v); + _buf.b.putFloat(v); } public void @@ -1024,9 +993,9 @@ public class BasicStream { writeSize(v.length); expand(v.length * 4); - java.nio.FloatBuffer floatBuf = _buf.asFloatBuffer(); + java.nio.FloatBuffer floatBuf = _buf.b.asFloatBuffer(); floatBuf.put(v); - _buf.position(_buf.position() + v.length * 4); + _buf.b.position(_buf.b.position() + v.length * 4); } } @@ -1035,7 +1004,7 @@ public class BasicStream { try { - return _buf.getFloat(); + return _buf.b.getFloat(); } catch(java.nio.BufferUnderflowException ex) { @@ -1051,9 +1020,9 @@ public class BasicStream final int sz = readSize(); checkFixedSeq(sz, 4); float[] v = new float[sz]; - java.nio.FloatBuffer floatBuf = _buf.asFloatBuffer(); + java.nio.FloatBuffer floatBuf = _buf.b.asFloatBuffer(); floatBuf.get(v); - _buf.position(_buf.position() + sz * 4); + _buf.b.position(_buf.b.position() + sz * 4); return v; } catch(java.nio.BufferUnderflowException ex) @@ -1066,7 +1035,7 @@ public class BasicStream writeDouble(double v) { expand(8); - _buf.putDouble(v); + _buf.b.putDouble(v); } public void @@ -1080,9 +1049,9 @@ public class BasicStream { writeSize(v.length); expand(v.length * 8); - java.nio.DoubleBuffer doubleBuf = _buf.asDoubleBuffer(); + java.nio.DoubleBuffer doubleBuf = _buf.b.asDoubleBuffer(); doubleBuf.put(v); - _buf.position(_buf.position() + v.length * 8); + _buf.b.position(_buf.b.position() + v.length * 8); } } @@ -1091,7 +1060,7 @@ public class BasicStream { try { - return _buf.getDouble(); + return _buf.b.getDouble(); } catch(java.nio.BufferUnderflowException ex) { @@ -1107,9 +1076,9 @@ public class BasicStream final int sz = readSize(); checkFixedSeq(sz, 8); double[] v = new double[sz]; - java.nio.DoubleBuffer doubleBuf = _buf.asDoubleBuffer(); + java.nio.DoubleBuffer doubleBuf = _buf.b.asDoubleBuffer(); doubleBuf.get(v); - _buf.position(_buf.position() + sz * 8); + _buf.b.position(_buf.b.position() + sz * 8); return v; } catch(java.nio.BufferUnderflowException ex) @@ -1170,14 +1139,14 @@ public class BasicStream } writeSize(b.limit()); expand(b.limit()); - _buf.put(b); + _buf.b.put(b); return; } _stringBytes[i] = (byte)_stringChars[i]; } writeSize(len); expand(len); - _buf.put(_stringBytes, 0, len); + _buf.b.put(_stringBytes, 0, len); } else { @@ -1228,7 +1197,7 @@ public class BasicStream { _stringChars = new char[len]; } - _buf.get(_stringBytes, 0, len); + _buf.b.get(_stringBytes, 0, len); // // It's more efficient to construct a string using a @@ -1356,7 +1325,7 @@ public class BasicStream } public void - readObject(IceInternal.Patcher patcher) + readObject(Patcher patcher) { Ice.Object v = null; @@ -1747,7 +1716,7 @@ public class BasicStream // for(java.util.Iterator i = patchlist.iterator(); i.hasNext(); ) { - IceInternal.Patcher p = (IceInternal.Patcher)i.next(); + Patcher p = (Patcher)i.next(); try { p.patch(v); @@ -1771,25 +1740,25 @@ public class BasicStream public int pos() { - return _buf.position(); + return _buf.b.position(); } public void pos(int n) { - _buf.position(n); + _buf.b.position(n); } public int size() { - return _limit; + return _buf.size(); } public boolean isEmpty() { - return _limit == 0; + return _buf.empty(); } private static class BufferedOutputStream extends java.io.OutputStream @@ -1865,8 +1834,8 @@ public class BasicStream // If the ByteBuffer is backed by an array then we can avoid // an extra copy by using the array directly. // - data = _buf.array(); - offset = _buf.arrayOffset(); + data = _buf.b.array(); + offset = _buf.b.arrayOffset(); } catch(Exception ex) { @@ -1874,7 +1843,7 @@ public class BasicStream // Otherwise, allocate an array to hold a copy of the uncompressed data. // data = new byte[size()]; - _buf.get(data); + _buf.b.get(data); } try @@ -1922,7 +1891,7 @@ public class BasicStream // Copy the header from the uncompressed stream to the // compressed one. // - cstream._buf.put(data, offset, headerSize); + cstream._buf.b.put(data, offset, headerSize); // // Add the size of the uncompressed stream before the @@ -1933,7 +1902,7 @@ public class BasicStream // // Add the compressed message body. // - cstream._buf.put(compressed, 0, compressedLen); + cstream._buf.b.put(compressed, 0, compressedLen); return cstream; } @@ -1960,8 +1929,8 @@ public class BasicStream // If the ByteBuffer is backed by an array then we can avoid // an extra copy by using the array directly. // - compressed = _buf.array(); - offset = _buf.arrayOffset(); + compressed = _buf.b.array(); + offset = _buf.b.arrayOffset(); } catch(Exception ex) { @@ -1969,7 +1938,7 @@ public class BasicStream // Otherwise, allocate an array to hold a copy of the compressed data. // compressed = new byte[size()]; - _buf.get(compressed); + _buf.b.get(compressed); } BasicStream ucStream = new BasicStream(_instance); @@ -2020,34 +1989,19 @@ public class BasicStream // Copy the header from the compressed stream to the uncompressed one. // ucStream.pos(0); - ucStream._buf.put(compressed, offset, headerSize); + ucStream._buf.b.put(compressed, offset, headerSize); return ucStream; } private void - expand(int size) + expand(int n) { - if(_buf.position() == _limit) + if(!_unlimited && _buf.b != null && _buf.b.position() + n > _messageSizeMax) { - int oldLimit = _limit; - _limit += size; - if(!_unlimited && _limit > _messageSizeMax) - { - throw new Ice.MemoryLimitException(); - } - if(_limit > _capacity) - { - final int cap2 = _capacity << 1; - int newCapacity = cap2 > _limit ? cap2 : _limit; - _buf.limit(oldLimit); - int pos = _buf.position(); - reallocate(newCapacity); - _capacity = _buf.capacity(); - _buf.limit(_capacity); - _buf.position(pos); - } + throw new Ice.MemoryLimitException(); } + _buf.expand(n); } private static final class DynamicObjectFactory implements Ice.ObjectFactory @@ -2334,52 +2288,9 @@ public class BasicStream return buf.toString(); } - private void - allocate(int size) - { - java.nio.ByteBuffer buf = null; - try - { - //buf = java.nio.ByteBuffer.allocateDirect(size); - buf = java.nio.ByteBuffer.allocate(size); - } - catch(OutOfMemoryError ex) - { - Ice.MarshalException e = new Ice.MarshalException(); - e.reason = "OutOfMemoryError occurred while allocating a ByteBuffer"; - e.initCause(ex); - throw e; - } - buf.order(java.nio.ByteOrder.LITTLE_ENDIAN); - _buf = buf; - } - - private void - reallocate(int size) - { - // - // Limit the buffer size to MessageSizeMax - // - if(!_unlimited) - { - size = size > _messageSizeMax ? _messageSizeMax : size; - } - - java.nio.ByteBuffer old = _buf; - assert(old != null); - - allocate(size); - assert(_buf != null); - - old.position(0); - _buf.put(old); - } - - private IceInternal.Instance _instance; + private Instance _instance; + private Buffer _buf; private Object _closure; - private java.nio.ByteBuffer _buf; - private int _capacity; // Cache capacity to avoid excessive method calls. - private int _limit; // Cache limit to avoid excessive method calls. private byte[] _stringBytes; // Reusable array for reading strings. private char[] _stringChars; // Reusable array for reading strings. |