summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-01-23 17:50:31 +0000
committerMarc Laukien <marc@zeroc.com>2004-01-23 17:50:31 +0000
commitd558a3af64c1b1cbc8e188d7f13705dad3677ced (patch)
tree176be43bf5b11db91f9778b3df709d99d7a81ed6 /cpp/src/Ice/Connection.cpp
parentcomment edit (diff)
downloadice-d558a3af64c1b1cbc8e188d7f13705dad3677ced.tar.bz2
ice-d558a3af64c1b1cbc8e188d7f13705dad3677ced.tar.xz
ice-d558a3af64c1b1cbc8e188d7f13705dad3677ced.zip
fixes
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp69
1 files changed, 45 insertions, 24 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index f684f3525b2..c7b08b24487 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -666,6 +666,11 @@ IceInternal::Connection::prepareBatchRequest(BasicStream* os)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ while(_batchFlushInProgress)
+ {
+ wait();
+ }
+
if(_batchStream.b.empty())
{
try
@@ -714,8 +719,6 @@ IceInternal::Connection::abortBatchRequest()
void
IceInternal::Connection::flushBatchRequest()
{
- BasicStream batchStream(_instance.get());
-
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
@@ -736,38 +739,34 @@ IceInternal::Connection::flushBatchRequest()
return; // Nothing to do.
}
+ _batchStream.i = _batchStream.b.begin();
+
if(_acmTimeout > 0)
{
_acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
}
//
- // Reset _batchStream and _batchRequestNum, so that new batch
- // messages can be sent.
+ // Prevent that new batch requests are added while we are
+ // flushing.
//
- _batchStream.swap(batchStream);
- assert(_batchStream.b.empty());
- _batchRequestNum = 0;
+ _batchFlushInProgress = true;
}
- assert(!batchStream.b.empty());
-
try
{
- batchStream.i = batchStream.b.begin();
-
//
// Fill in the number of requests in the batch.
//
const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize);
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#else
- copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
bool compress;
- if(batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
+ if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
compress = false;
}
@@ -781,20 +780,20 @@ IceInternal::Connection::flushBatchRequest()
//
// Set compression status.
//
- batchStream.b[9] = 2; // Message is compressed.
+ _batchStream.b[9] = 2; // Message is compressed.
//
// Do compression.
//
BasicStream cstream(_instance.get());
- doCompress(batchStream, cstream);
+ doCompress(_batchStream, cstream);
//
// Send the batch request.
//
IceUtil::Mutex::Lock sendSync(_sendMutex);
- batchStream.i = batchStream.b.begin();
- traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels);
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
cstream.i = cstream.b.begin();
_transceiver->write(cstream, _endpoint->timeout());
}
@@ -803,21 +802,21 @@ IceInternal::Connection::flushBatchRequest()
//
// No compression, just fill in the message size.
//
- Int sz = static_cast<Int>(batchStream.b.size());
+ Int sz = static_cast<Int>(_batchStream.b.size());
p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + 10);
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#else
- copy(p, p + sizeof(Int), batchStream.b.begin() + 10);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#endif
//
// Send the batch request.
//
IceUtil::Mutex::Lock sendSync(_sendMutex);
- batchStream.i = batchStream.b.begin();
- traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels);
- _transceiver->write(batchStream, _endpoint->timeout());
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver->write(_batchStream, _endpoint->timeout());
}
}
catch(const LocalException& ex)
@@ -827,6 +826,21 @@ IceInternal::Connection::flushBatchRequest()
assert(_exception.get());
//
+ // Reset _batchStream and _batchRequestNum, so that new batch
+ // messages can be sent.
+ //
+ BasicStream dummy(_instance.get());
+ _batchStream.swap(dummy);
+ assert(_batchStream.b.empty());
+ _batchRequestNum = 0;
+
+ //
+ // Notify that flushing is over.
+ //
+ _batchFlushInProgress = false;
+ notifyAll();
+
+ //
// Since batch requests are all oneways (or datagrams), we
// must report the exception to the caller.
//
@@ -836,6 +850,12 @@ IceInternal::Connection::flushBatchRequest()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+ //
+ // Notify that flushing is over.
+ //
+ _batchFlushInProgress = false;
+ notifyAll();
+
if(_proxyCount == 0 && !_adapter && closingOK())
{
setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
@@ -1459,6 +1479,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_asyncRequestsHint(_asyncRequests.end()),
_batchStream(_instance.get()),
_batchRequestNum(0),
+ _batchFlushInProgress(false),
_dispatchCount(0),
_proxyCount(0),
_state(StateNotValidated),