summary refs log tree commit diff
path: root/cppe/src/IceE/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-x cppe/src/IceE/Connection.cpp | 178
1 file changed, 139 insertions, 39 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index 57f22775c0b..8e783959d80 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -540,6 +540,7 @@ Ice::Connection::prepareBatchRequest(BasicStream* os)
}
_batchStreamInUse = true;
+ _batchMarker = _batchStream.b.size();
_batchStream.swap(*os);
//
@@ -551,21 +552,103 @@ Ice::Connection::prepareBatchRequest(BasicStream* os)
void
Ice::Connection::finishBatchRequest(BasicStream* os)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ bool autoflush = false;
+ vector<Ice::Byte> lastRequest;
- //
- // Get the batch stream back and increment the number of requests
- // in the batch.
- //
- _batchStream.swap(*os);
- ++_batchRequestNum;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ //
+ // Get the batch stream back.
+ //
+ _batchStream.swap(*os);
+
+ if(_batchAutoFlush)
+ {
+ Lock sendSync(_sendMonitor);
+
+ if(!_transceiver)
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
+ //
+ // Throw memory limit exception if the first message added causes us to
+ // go over limit. Otherwise put aside the marshalled message that caused
+ // limit to be exceeded and rollback stream to the marker.
+ //
+ if(_batchStream.b.size() > _instance->messageSizeMax())
+ {
+ if(_batchRequestNum == 0)
+ {
+ resetBatch(true);
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+ vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
+ _batchStream.b.resize(_batchMarker);
+ autoflush = true;
+ }
+ }
+
+ if(!autoflush)
+ {
+ //
+ // Increment the number of requests in the batch.
+ //
+ ++_batchRequestNum;
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ }
+
+ if(autoflush)
+ {
+ //
+ // We have to keep _batchStreamInUse set until after we insert the
+ // saved marshalled data into a new stream.
+ //
+ flushBatchRequestsInternal(true);
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Throw memory limit exception if the message that caused us to go over
+ // limit causes us to exceed the limit by itself.
+ //
+ if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax())
+ {
+ resetBatch(true);
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+
+ //
+ // Start a new batch with the last message that caused us to
+ // go over the limit.
+ //
+ try
+ {
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ ex.ice_throw();
+ }
+
+ //
+ // Notify that the batch stream not in use anymore.
+ //
+ ++_batchRequestNum;
+ _batchStreamInUse = false;
+ notifyAll();
+ }
}
void
@@ -574,34 +657,32 @@ Ice::Connection::abortBatchRequest()
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
- // Destroy and reset the batch stream and batch count. We cannot
- // safe old requests in the batch stream, as they might be
- // corrupted due to incomplete marshaling.
+ // Reset the batch stream. We cannot save old requests
+ // in the batch stream, as they might be corrupted due to
+ // incomplete marshaling.
//
- BasicStream dummy(_instance.get(), _instance->messageSizeMax(), _instance->initializationData().stringConverter,
- _instance->initializationData().wstringConverter);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
-
- //
- // Notify about the batch stream not being in use
- // anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ resetBatch(true);
}
void
Ice::Connection::flushBatchRequests()
{
+ flushBatchRequestsInternal(false);
+}
+
+void
+Ice::Connection::flushBatchRequestsInternal(bool ignoreInUse)
+{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while(_batchStreamInUse && !_exception.get())
- {
- wait();
- }
+ if(!ignoreInUse)
+ {
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
+ }
if(_exception.get())
{
@@ -698,12 +779,28 @@ Ice::Connection::flushBatchRequests()
//
// Reset the batch stream, and notify that flushing is over.
//
- BasicStream dummy(_instance.get(), _instance->messageSizeMax(), _instance->initializationData().stringConverter,
- _instance->initializationData().wstringConverter);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchStreamInUse = false;
- notifyAll();
+ resetBatch(!ignoreInUse);
+ }
+}
+
+void
+Ice::Connection::resetBatch(bool resetInUse)
+{
+ BasicStream dummy(_instance.get(), _instance->messageSizeMax(), _instance->initializationData().stringConverter,
+ _instance->initializationData().wstringConverter, _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchMarker = 0;
+
+ //
+ // Notify about the batch stream not being in use
+ // anymore.
+ //
+ if(resetInUse)
+ {
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
}
}
@@ -918,10 +1015,13 @@ Ice::Connection::Connection(const InstancePtr& instance,
_instance->initializationData().wstringConverter),
#endif
#ifdef ICEE_HAS_BATCH
+ _batchAutoFlush(
+ _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0),
_batchStream(_instance.get(), _instance->messageSizeMax(), _instance->initializationData().stringConverter,
- _instance->initializationData().wstringConverter),
+ _instance->initializationData().wstringConverter, _batchAutoFlush),
_batchStreamInUse(false),
_batchRequestNum(0),
+ _batchMarker(0),
#endif
_dispatchCount(0),
_state(StateNotValidated),