Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--  cpp/src/Ice/ConnectionI.cpp  180
1 file changed, 136 insertions(+), 44 deletions(-)
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index c66e096b376..95a6fb66267 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -878,6 +878,7 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
}
_batchStreamInUse = true;
+ _batchMarker = _batchStream.b.size();
_batchStream.swap(*os);
//
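
The new _batchMarker member records the size of the batch stream before each
request is marshalled, so that finishBatchRequest() can roll the stream back
to that point if the request pushes the batch past messageSizeMax. A minimal
sketch of the marker/rollback idea, using illustrative names rather than the
actual Ice types:

    // Minimal sketch (not the Ice types): remember the buffer size before
    // marshalling a request so the buffer can be truncated back to that
    // point if the request has to be put aside.
    #include <cstddef>
    #include <vector>

    struct BatchBuffer
    {
        std::vector<unsigned char> b; // marshalled batch data
        std::size_t marker = 0;       // stream size before the current request

        void beginRequest() { marker = b.size(); } // == prepareBatchRequest()

        // Extract the request in progress and truncate the buffer back to
        // the marker, mirroring the resize(_batchMarker) in the patch.
        std::vector<unsigned char> rollback()
        {
            std::vector<unsigned char> last(b.begin() + marker, b.end());
            b.resize(marker);
            return last;
        }
    };
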
@@ -889,30 +890,106 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
void
Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ bool autoflush = false;
+ vector<Ice::Byte> lastRequest;
- //
- // Get the batch stream back and increment the number of requests
- // in the batch.
- //
- _batchStream.swap(*os);
- ++_batchRequestNum;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // We compress the whole batch if there is at least one compressed
- // message.
- //
- if(compress)
+ //
+ // Get the batch stream back.
+ //
+ _batchStream.swap(*os);
+
+ if(_batchStream.b.size() > _instance->messageSizeMax())
+ {
+ //
+ // Throw a memory limit exception if the first message added causes us
+ // to go over the limit. Otherwise, put aside the marshalled message that
+ // caused the limit to be exceeded and roll the stream back to the marker.
+ //
+ if(_batchRequestNum == 0)
+ {
+ resetBatch(true);
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+
+ vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
+ _batchStream.b.resize(_batchMarker);
+ autoflush = true;
+ }
+ else
+ {
+ //
+ // Increment the number of requests in the batch.
+ //
+ ++_batchRequestNum;
+
+ //
+ // We compress the whole batch if there is at least one compressed
+ // message.
+ //
+ if(compress)
+ {
+ _batchRequestCompress = true;
+ }
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ }
+
+ if(autoflush)
{
- _batchRequestCompress = true;
+ //
+ // We have to keep _batchStreamInUse set until after we insert the
+ // saved marshalled data into a new stream.
+ //
+ flushBatchRequestsInternal(true);
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ //
+ // Throw a memory limit exception if the message that caused us to go
+ // over the limit exceeds the limit by itself.
+ //
+ if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax())
+ {
+ resetBatch(true);
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+
+ //
+ // Start a new batch with the last message that caused us to
+ // go over the limit.
+ //
+ try
+ {
+ _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
+ _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ ex.ice_throw();
+ }
+
+ if(compress)
+ {
+ _batchRequestCompress = true;
+ }
+
+ //
+ // Notify about the batch stream not being in use anymore.
+ //
+ ++_batchRequestNum;
+ _batchStreamInUse = false;
+ notifyAll();
}
-
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
}
void
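
Taken together, the rewritten finishBatchRequest() turns the old hard failure
into an auto-flush: if the batch would exceed messageSizeMax, the requests
accumulated so far are flushed and a fresh batch is started with the request
that overflowed, so MemoryLimitException is now thrown only when a single
request cannot fit in a batch by itself. A hedged sketch of that decision
flow, reusing the BatchBuffer type from the sketch above; flushAccumulated()
and startNewBatchWith() are hypothetical stand-ins for
flushBatchRequestsInternal(true) and the two writeBlob() calls:

    #include <cstddef>
    #include <stdexcept>
    #include <vector>

    // Hypothetical stand-ins for the real calls in the patch (declarations only).
    void flushAccumulated(BatchBuffer& batch); // == flushBatchRequestsInternal(true)
    void startNewBatchWith(BatchBuffer& batch, // == the two writeBlob() calls
                           const std::vector<unsigned char>& last);

    void finishRequest(BatchBuffer& batch, int& requestNum,
                       std::size_t headerSize, std::size_t messageSizeMax)
    {
        if(batch.b.size() <= messageSizeMax)
        {
            ++requestNum;              // the request fits: keep batching
            return;
        }
        if(requestNum == 0)
        {
            // The very first request exceeds the limit on its own.
            throw std::length_error("MemoryLimitException");
        }
        std::vector<unsigned char> last = batch.rollback(); // put the request aside
        flushAccumulated(batch);       // send everything batched so far
        if(headerSize + last.size() > messageSizeMax)
        {
            // Even in an otherwise empty batch the saved request cannot fit.
            throw std::length_error("MemoryLimitException");
        }
        startNewBatchWith(batch, last);
        requestNum = 1;                // the saved request opens the new batch
    }
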
@@ -921,33 +998,31 @@ Ice::ConnectionI::abortBatchRequest()
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
- // Destroy and reset the batch stream and batch count. We cannot
- // safe old requests in the batch stream, as they might be
- // corrupted due to incomplete marshaling.
+ // Reset the batch stream. We cannot save old requests
+ // in the batch stream, as they might be corrupted due to
+ // incomplete marshaling.
//
- BasicStream dummy(_instance.get());
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
-
- //
- // Notify about the batch stream not being in use
- // anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ resetBatch(true);
}
void
Ice::ConnectionI::flushBatchRequests()
{
+ flushBatchRequestsInternal(false);
+}
+
+void
+Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse)
+{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while(_batchStreamInUse && !_exception.get())
+ if(!ignoreInUse)
{
- wait();
+ while(_batchStreamInUse && !_exception.get())
+ {
+ wait();
+ }
}
if(_exception.get())
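
The ignoreInUse flag exists for the auto-flush path: finishBatchRequest()
deliberately keeps _batchStreamInUse set while it flushes, so the internal
flush must skip the usual wait for the stream to become free. A sketch of
that wait pattern, with standard C++ primitives standing in for IceUtil's
Monitor (an assumption that the Monitor behaves like a mutex paired with a
condition variable):

    #include <condition_variable>
    #include <mutex>

    std::mutex monitor;
    std::condition_variable cond;
    bool batchStreamInUse = false;
    bool hasException = false;

    void flushInternal(bool ignoreInUse)
    {
        std::unique_lock<std::mutex> lock(monitor);
        if(!ignoreInUse)
        {
            // Normal flush: wait until no other thread is marshalling into
            // the batch stream, or until the connection has failed.
            cond.wait(lock, [] { return !batchStreamInUse || hasException; });
        }
        // ... rethrow any stored exception, send the accumulated batch,
        // then call resetBatch(!ignoreInUse) as in the patch.
    }
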
@@ -1066,12 +1141,28 @@ Ice::ConnectionI::flushBatchRequests()
//
// Reset the batch stream, and notify that flushing is over.
//
- BasicStream dummy(_instance.get());
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchStreamInUse = false;
- notifyAll();
+ resetBatch(!ignoreInUse);
+ }
+}
+
+void
+Ice::ConnectionI::resetBatch(bool resetInUse)
+{
+ BasicStream dummy(_instance.get(), true);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchRequestCompress = false;
+ _batchMarker = 0;
+
+ //
+ // Notify about the batch stream not being in use
+ // anymore.
+ //
+ if(resetInUse)
+ {
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
}
}
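
resetBatch() centralizes the cleanup that abortBatchRequest() and the flush
path previously duplicated, and its resetInUse parameter mirrors the
ignoreInUse flag: the auto-flush path passes resetInUse == false so the
stream stays reserved until the saved request has been written into the new
batch. Continuing the sketch above (called with the monitor already held, as
in the patch):

    // Continuation of the sketch above; the caller already holds the monitor.
    void resetBatch(BatchBuffer& batch, int& requestNum, bool& compressFlag,
                    bool resetInUse)
    {
        batch.b.clear();          // stands in for swapping in a fresh BasicStream
        batch.marker = 0;
        requestNum = 0;
        compressFlag = false;
        if(resetInUse)
        {
            // Release the stream only when the caller actually reserved it;
            // the auto-flush path keeps it reserved until the saved request
            // has been written into the new batch.
            batchStreamInUse = false;
            cond.notify_all();
        }
    }
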
@@ -1465,10 +1556,11 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_nextRequestId(1),
_requestsHint(_requests.end()),
_asyncRequestsHint(_asyncRequests.end()),
- _batchStream(_instance.get()),
+ _batchStream(_instance.get(), true),
_batchStreamInUse(false),
_batchRequestNum(0),
_batchRequestCompress(false),
+ _batchMarker(0),
_dispatchCount(0),
_state(StateNotValidated),
_stateTime(IceUtil::Time::now())
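
The constructor changes initialize the new _batchMarker member and pass an
extra boolean to the BasicStream constructor. From the patch this reads as
marking the batch stream exempt from the stream-level size check, since
messageSizeMax is now enforced explicitly in finishBatchRequest(); that is an
inference, not confirmed here. For callers, the visible effect is that
queuing batched requests no longer fails once messageSizeMax worth of data
has accumulated. A usage sketch, assuming the standard Ice proxy API (proxy
and loop bound are illustrative):

    // 'proxy' is assumed to be an existing Ice::ObjectPrx.
    // With this patch, full batches are flushed automatically instead of
    // throwing MemoryLimitException as requests accumulate.
    Ice::ObjectPrx batch = proxy->ice_batchOneway();
    for(int i = 0; i < 100000; ++i)
    {
        batch->ice_ping();          // queued in the connection's batch stream
    }
    batch->ice_getConnection()->flushBatchRequests(); // send whatever remains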