diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2006-12-22 15:56:26 +0000 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2006-12-22 15:56:26 +0000 |
commit | cdad6934fd09e809cdb02404e98e1c63ebbfcb63 (patch) | |
tree | d449267709aff3b81ca155b9e11373850859825a /java/src | |
parent | adding Ice.CacheMessageBuffers (diff) | |
download | ice-cdad6934fd09e809cdb02404e98e1c63ebbfcb63.tar.bz2 ice-cdad6934fd09e809cdb02404e98e1c63ebbfcb63.tar.xz ice-cdad6934fd09e809cdb02404e98e1c63ebbfcb63.zip |
Added autoflushing of batches
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 201 | ||||
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 27 |
2 files changed, 173 insertions, 55 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 6d9b24f4f98..ec7e571f4e4 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -831,6 +831,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } _batchStreamInUse = true; + _batchMarker = _batchStream.size(); _batchStream.swap(os); // @@ -839,70 +840,151 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // } - public synchronized void + public void finishBatchRequest(IceInternal.BasicStream os, boolean compress) { - // - // Get the batch stream back and increment the number of - // requests in the batch. - // - _batchStream.swap(os); - ++_batchRequestNum; + boolean autoflush = false; + byte[] lastRequest = null; - // - // We compress the whole batch if there is at least one compressed - // message. - // - if(compress) - { - _batchRequestCompress = true; - } + synchronized(this) + { + // + // Get the batch stream back. + // + _batchStream.swap(os); - // - // Notify about the batch stream not being in use anymore. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + if(_batchStream.size() > _instance.messageSizeMax()) + { + // + // Throw memory limit exception if the first message added causes us to + // go over limit. Otherwise put aside the marshalled message that caused + // limit to be exceeded and rollback stream to the marker. + // + if(_batchRequestNum == 0) + { + resetBatch(true); + throw new MemoryLimitException(); + } + + lastRequest = new byte[_batchStream.size() - _batchMarker]; + java.nio.ByteBuffer buffer = _batchStream.prepareRead(); + buffer.position(_batchMarker); + buffer.get(lastRequest); + _batchStream.resize(_batchMarker, false); + autoflush = true; + } + else + { + // + // Increment the number of requests in the batch. + // + ++_batchRequestNum; + + // + // We compress the whole batch if there is at least one compressed + // message. + // + if(compress) + { + _batchRequestCompress = true; + } + + // + // Notify about the batch stream not being in use anymore. + // + assert(_batchStreamInUse); + _batchStreamInUse = false; + notifyAll(); + } + } + + if(autoflush) + { + // + // We have to keep _batchStreamInUse set until after we insert the + // saved marshalled data into a new stream. + // + flushBatchRequestsInternal(true); + + synchronized(this) + { + // + // Throw memory limit exception if the message that caused us to go over + // limit causes us to exceed the limit by itself. + // + if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax()) + { + resetBatch(true); + throw new MemoryLimitException(); + } + + // + // Start a new batch with the last message that caused us to + // go over the limit. + // + try + { + _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); + _batchStream.writeBlob(lastRequest); + } + catch(LocalException ex) + { + setState(StateClosed, ex); + throw ex; + } + + if(compress) + { + _batchRequestCompress = true; + } + + // + // Notify that the batch stream not in use anymore. + // + ++_batchRequestNum; + _batchStreamInUse = false; + notifyAll(); + } + } } public synchronized void abortBatchRequest() { - // - // Destroy and reset the batch stream and batch count. We - // cannot save old requests in the batch stream, as they might - // be corrupted due to incomplete marshaling. - // - _batchStream = new IceInternal.BasicStream(_instance); - _batchRequestNum = 0; - _batchRequestCompress = false; - - // - // Notify about the batch stream not being in use anymore. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + // + // Reset the batch stream. We cannot save old requests + // in the batch stream, as they might be corrupted due to + // incomplete marshaling. + // + resetBatch(true); } public void flushBatchRequests() { + flushBatchRequestsInternal(false); + } + + private void + flushBatchRequestsInternal(boolean ignoreInUse) + { IceInternal.BasicStream stream = null; synchronized(this) { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } + if(!ignoreInUse) + { + while(_batchStreamInUse && _exception == null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + } if(_exception != null) { @@ -993,14 +1075,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Reset the batch stream, and notify that flushing is over. // - _batchStream = new IceInternal.BasicStream(_instance); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchStreamInUse = false; - notifyAll(); + resetBatch(!ignoreInUse); } } + private void + resetBatch(boolean resetInUse) + { + _batchStream = new IceInternal.BasicStream(_instance, true); + _batchRequestNum = 0; + _batchRequestCompress = false; + + // + // Notify about the batch stream not being in use anymore. + // + if(resetInUse) + { + assert(_batchStreamInUse); + _batchStreamInUse = false; + notifyAll(); + } + } + public void sendResponse(IceInternal.BasicStream os, byte compressFlag) { @@ -1374,7 +1470,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne "Ice.CacheMessageBuffers", 1) == 1; _acmAbsoluteTimeoutMillis = 0; _nextRequestId = 1; - _batchStream = new IceInternal.BasicStream(instance); + _batchStream = new IceInternal.BasicStream(instance, true); _batchStreamInUse = false; _batchRequestNum = 0; _batchRequestCompress = false; @@ -2621,6 +2717,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private boolean _batchStreamInUse; private int _batchRequestNum; private boolean _batchRequestCompress; + private int _batchMarker; private int _dispatchCount; diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index 12c20c90daf..ad8aaabb1fb 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -14,7 +14,20 @@ public class BasicStream public BasicStream(IceInternal.Instance instance) { + initialize(instance, false); + } + + public + BasicStream(IceInternal.Instance instance, boolean unlimited) + { + initialize(instance, unlimited); + } + + private void + initialize(IceInternal.Instance instance, boolean unlimited) + { _instance = instance; + _unlimited = unlimited; allocate(1500); _capacity = _buf.capacity(); _limit = 0; @@ -115,12 +128,16 @@ public class BasicStream java.util.ArrayList tmpObjectList = other._objectList; other._objectList = _objectList; _objectList = tmpObjectList; + + boolean tmpUnlimited = other._unlimited; + other._unlimited = _unlimited; + _unlimited = tmpUnlimited; } public void resize(int total, boolean reading) { - if(total > _messageSizeMax) + if(!_unlimited && total > _messageSizeMax) { throw new Ice.MemoryLimitException(); } @@ -1934,7 +1951,7 @@ public class BasicStream { int oldLimit = _limit; _limit += size; - if(_limit > _messageSizeMax) + if(!_unlimited && _limit > _messageSizeMax) { throw new Ice.MemoryLimitException(); } @@ -2262,7 +2279,10 @@ public class BasicStream // // Limit the buffer size to MessageSizeMax // - size = size > _messageSizeMax ? _messageSizeMax : size; + if(!_unlimited) + { + size = size > _messageSizeMax ? _messageSizeMax : size; + } java.nio.ByteBuffer old = _buf; assert(old != null); @@ -2347,6 +2367,7 @@ public class BasicStream private boolean _sliceObjects; private int _messageSizeMax; + private boolean _unlimited; private static final class SeqData { |