summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2006-12-22 15:56:26 +0000
committerDwayne Boone <dwayne@zeroc.com>2006-12-22 15:56:26 +0000
commitcdad6934fd09e809cdb02404e98e1c63ebbfcb63 (patch)
treed449267709aff3b81ca155b9e11373850859825a /java/src
parentadding Ice.CacheMessageBuffers (diff)
downloadice-cdad6934fd09e809cdb02404e98e1c63ebbfcb63.tar.bz2
ice-cdad6934fd09e809cdb02404e98e1c63ebbfcb63.tar.xz
ice-cdad6934fd09e809cdb02404e98e1c63ebbfcb63.zip
Added autoflushing of batches
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/ConnectionI.java201
-rw-r--r--java/src/IceInternal/BasicStream.java27
2 files changed, 173 insertions, 55 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 6d9b24f4f98..ec7e571f4e4 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -831,6 +831,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
_batchStreamInUse = true;
+ _batchMarker = _batchStream.size();
_batchStream.swap(os);
//
@@ -839,70 +840,151 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
}
- public synchronized void
+ public void
finishBatchRequest(IceInternal.BasicStream os, boolean compress)
{
- //
- // Get the batch stream back and increment the number of
- // requests in the batch.
- //
- _batchStream.swap(os);
- ++_batchRequestNum;
+ boolean autoflush = false;
+ byte[] lastRequest = null;
- //
- // We compress the whole batch if there is at least one compressed
- // message.
- //
- if(compress)
- {
- _batchRequestCompress = true;
- }
+ synchronized(this)
+ {
+ //
+ // Get the batch stream back.
+ //
+ _batchStream.swap(os);
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ if(_batchStream.size() > _instance.messageSizeMax())
+ {
+ //
+ // Throw memory limit exception if the first message added causes us to
+ // go over limit. Otherwise put aside the marshalled message that caused
+ // limit to be exceeded and rollback stream to the marker.
+ //
+ if(_batchRequestNum == 0)
+ {
+ resetBatch(true);
+ throw new MemoryLimitException();
+ }
+
+ lastRequest = new byte[_batchStream.size() - _batchMarker];
+ java.nio.ByteBuffer buffer = _batchStream.prepareRead();
+ buffer.position(_batchMarker);
+ buffer.get(lastRequest);
+ _batchStream.resize(_batchMarker, false);
+ autoflush = true;
+ }
+ else
+ {
+ //
+ // Increment the number of requests in the batch.
+ //
+ ++_batchRequestNum;
+
+ //
+ // We compress the whole batch if there is at least one compressed
+ // message.
+ //
+ if(compress)
+ {
+ _batchRequestCompress = true;
+ }
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ }
+
+ if(autoflush)
+ {
+ //
+ // We have to keep _batchStreamInUse set until after we insert the
+ // saved marshalled data into a new stream.
+ //
+ flushBatchRequestsInternal(true);
+
+ synchronized(this)
+ {
+ //
+ // Throw memory limit exception if the message that caused us to go over
+ // limit causes us to exceed the limit by itself.
+ //
+ if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax())
+ {
+ resetBatch(true);
+ throw new MemoryLimitException();
+ }
+
+ //
+ // Start a new batch with the last message that caused us to
+ // go over the limit.
+ //
+ try
+ {
+ _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
+ _batchStream.writeBlob(lastRequest);
+ }
+ catch(LocalException ex)
+ {
+ setState(StateClosed, ex);
+ throw ex;
+ }
+
+ if(compress)
+ {
+ _batchRequestCompress = true;
+ }
+
+ //
+ // Notify that the batch stream not in use anymore.
+ //
+ ++_batchRequestNum;
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ }
}
public synchronized void
abortBatchRequest()
{
- //
- // Destroy and reset the batch stream and batch count. We
- // cannot save old requests in the batch stream, as they might
- // be corrupted due to incomplete marshaling.
- //
- _batchStream = new IceInternal.BasicStream(_instance);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
-
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ //
+ // Reset the batch stream. We cannot save old requests
+ // in the batch stream, as they might be corrupted due to
+ // incomplete marshaling.
+ //
+ resetBatch(true);
}
public void
flushBatchRequests()
{
+ flushBatchRequestsInternal(false);
+ }
+
+ private void
+ flushBatchRequestsInternal(boolean ignoreInUse)
+ {
IceInternal.BasicStream stream = null;
synchronized(this)
{
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
+ if(!ignoreInUse)
+ {
+ while(_batchStreamInUse && _exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ }
if(_exception != null)
{
@@ -993,14 +1075,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Reset the batch stream, and notify that flushing is over.
//
- _batchStream = new IceInternal.BasicStream(_instance);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchStreamInUse = false;
- notifyAll();
+ resetBatch(!ignoreInUse);
}
}
+ private void
+ resetBatch(boolean resetInUse)
+ {
+ _batchStream = new IceInternal.BasicStream(_instance, true);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ if(resetInUse)
+ {
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ }
+
public void
sendResponse(IceInternal.BasicStream os, byte compressFlag)
{
@@ -1374,7 +1470,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
"Ice.CacheMessageBuffers", 1) == 1;
_acmAbsoluteTimeoutMillis = 0;
_nextRequestId = 1;
- _batchStream = new IceInternal.BasicStream(instance);
+ _batchStream = new IceInternal.BasicStream(instance, true);
_batchStreamInUse = false;
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -2621,6 +2717,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private boolean _batchStreamInUse;
private int _batchRequestNum;
private boolean _batchRequestCompress;
+ private int _batchMarker;
private int _dispatchCount;
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index 12c20c90daf..ad8aaabb1fb 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -14,7 +14,20 @@ public class BasicStream
public
BasicStream(IceInternal.Instance instance)
{
+ initialize(instance, false);
+ }
+
+ public
+ BasicStream(IceInternal.Instance instance, boolean unlimited)
+ {
+ initialize(instance, unlimited);
+ }
+
+ private void
+ initialize(IceInternal.Instance instance, boolean unlimited)
+ {
_instance = instance;
+ _unlimited = unlimited;
allocate(1500);
_capacity = _buf.capacity();
_limit = 0;
@@ -115,12 +128,16 @@ public class BasicStream
java.util.ArrayList tmpObjectList = other._objectList;
other._objectList = _objectList;
_objectList = tmpObjectList;
+
+ boolean tmpUnlimited = other._unlimited;
+ other._unlimited = _unlimited;
+ _unlimited = tmpUnlimited;
}
public void
resize(int total, boolean reading)
{
- if(total > _messageSizeMax)
+ if(!_unlimited && total > _messageSizeMax)
{
throw new Ice.MemoryLimitException();
}
@@ -1934,7 +1951,7 @@ public class BasicStream
{
int oldLimit = _limit;
_limit += size;
- if(_limit > _messageSizeMax)
+ if(!_unlimited && _limit > _messageSizeMax)
{
throw new Ice.MemoryLimitException();
}
@@ -2262,7 +2279,10 @@ public class BasicStream
//
// Limit the buffer size to MessageSizeMax
//
- size = size > _messageSizeMax ? _messageSizeMax : size;
+ if(!_unlimited)
+ {
+ size = size > _messageSizeMax ? _messageSizeMax : size;
+ }
java.nio.ByteBuffer old = _buf;
assert(old != null);
@@ -2347,6 +2367,7 @@ public class BasicStream
private boolean _sliceObjects;
private int _messageSizeMax;
+ private boolean _unlimited;
private static final class SeqData
{