summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-02-20 17:44:41 +0000
committerMarc Laukien <marc@zeroc.com>2004-02-20 17:44:41 +0000
commitf86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad (patch)
tree520786ae72c4376b505f21f8adf9f5ea522cf9bf /cpp/src/Ice/Connection.cpp
parentWin32 fixes (diff)
downloadice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.bz2
ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.xz
ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.zip
C++ -> Java
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp87
1 files changed, 35 insertions, 52 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index bfb0fbcff39..39e4f3772c3 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -343,9 +343,11 @@ IceInternal::Connection::monitor()
if(_acmTimeout > 0 &&
_requests.empty() &&
_asyncRequests.empty() &&
- _batchStream.b.empty() &&
+ !_batchStreamInUse &&
_dispatchCount == 0)
{
+ assert(_batchStream.b.empty());
+
if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
{
setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__));
@@ -354,6 +356,9 @@ IceInternal::Connection::monitor()
}
}
+//
+// TODO: Should not be a member function of Connection.
+//
void
IceInternal::Connection::prepareRequest(BasicStream* os)
{
@@ -418,11 +423,11 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
try
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
+
if(!_transceiver) // Has the transceiver already been closed?
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_exception.get());
- _exception->ice_throw();
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
}
bool compress;
@@ -575,11 +580,11 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
try
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
+
if(!_transceiver) // Has the transceiver already been closed?
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_exception.get());
- _exception->ice_throw();
+ _exception->ice_throw(); // The exception is immutable at this point.
}
bool compress;
@@ -675,21 +680,21 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
void
IceInternal::Connection::prepareBatchRequest(BasicStream* os)
{
- lock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
// Wait if flushing is currently in progress.
//
- while(_batchFlushInProgress)
+ while(_batchStreamInUse && !_exception.get())
{
wait();
}
if(_exception.get())
{
- unlock();
_exception->ice_throw();
}
+
assert(_state > StateNotValidated);
assert(_state < StateClosing);
@@ -702,44 +707,41 @@ IceInternal::Connection::prepareBatchRequest(BasicStream* os)
catch(const LocalException& ex)
{
setState(StateClosed, ex);
- unlock();
ex.ice_throw();
}
}
+ _batchStreamInUse = true;
_batchStream.swap(*os);
//
- // The Connection and _batchStream now belongs to the caller,
- // until finishBatchRequest() or abortBatchRequest() is called.
+ // _batchStream now belongs to the caller, until
+ // finishBatchRequest() is called.
//
}
void
IceInternal::Connection::finishBatchRequest(BasicStream* os)
{
- assert(!_batchFlushInProgress);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
{
- unlock();
_exception->ice_throw();
}
+
assert(_state > StateNotValidated);
assert(_state < StateClosing);
_batchStream.swap(*os); // Get the batch stream back.
++_batchRequestNum; // Increment the number of requests in the batch.
- unlock(); // Give the Connection back.
-}
-
-void
-IceInternal::Connection::abortBatchRequest()
-{
- assert(!_batchFlushInProgress);
- setState(StateClosed, AbortBatchRequestException(__FILE__, __LINE__));
- unlock(); // Give the Connection back.
+ //
+ // Give the Connection back.
+ //
+ assert(_batchStreamInUse);
+ _batchStreamInUse = false;
+ notifyAll();
}
void
@@ -748,20 +750,13 @@ IceInternal::Connection::flushBatchRequest()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // Wait if flushing is currently in progress.
- //
- while(_batchFlushInProgress)
+ while(_batchStreamInUse && !_exception.get())
{
wait();
}
if(_exception.get())
{
- //
- // Since batch requests are all oneways (or datagrams), we
- // must report the exception to the caller.
- //
_exception->ice_throw();
}
@@ -784,17 +779,17 @@ IceInternal::Connection::flushBatchRequest()
// Prevent that new batch requests are added while we are
// flushing.
//
- _batchFlushInProgress = true;
+ _batchStreamInUse = true;
}
try
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
+
if(!_transceiver) // Has the transceiver already been closed?
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_exception.get());
- _exception->ice_throw();
+ _exception->ice_throw(); // The exception is immutable at this point.
}
//
@@ -866,16 +861,6 @@ IceInternal::Connection::flushBatchRequest()
assert(_exception.get());
//
- // Reset the batch stream, and notify that flushing is over.
- //
- BasicStream dummy(_instance.get());
- _batchStream.swap(dummy);
- assert(_batchStream.b.empty());
- _batchRequestNum = 0;
- _batchFlushInProgress = false;
- notifyAll();
-
- //
// Since batch requests are all oneways (or datagrams), we
// must report the exception to the caller.
//
@@ -892,7 +877,7 @@ IceInternal::Connection::flushBatchRequest()
_batchStream.swap(dummy);
assert(_batchStream.b.empty());
_batchRequestNum = 0;
- _batchFlushInProgress = false;
+ _batchStreamInUse = false;
notifyAll();
}
}
@@ -903,11 +888,11 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
try
{
IceUtil::Mutex::Lock sendSync(_sendMutex);
+
if(!_transceiver) // Has the transceiver already been closed?
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_exception.get());
- _exception->ice_throw();
+ _exception->ice_throw(); // The exception is immutable at this point.
}
bool compress;
@@ -1197,7 +1182,6 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
try
{
-
switch(messageType)
{
case requestMsg:
@@ -1484,7 +1468,7 @@ IceInternal::Connection::exception(const LocalException& ex)
string
IceInternal::Connection::toString() const
{
- return _desc;
+ return _desc; // No mutex lock, _desc is immutable.
}
bool
@@ -1526,8 +1510,8 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_requestsHint(_requests.end()),
_asyncRequestsHint(_asyncRequests.end()),
_batchStream(_instance.get()),
+ _batchStreamInUse(false),
_batchRequestNum(0),
- _batchFlushInProgress(false),
_dispatchCount(0),
_state(StateNotValidated),
_stateTime(IceUtil::Time::now())
@@ -1718,7 +1702,7 @@ IceInternal::Connection::setState(State state)
// close the transceiver.
//
IceUtil::Mutex::Lock sendSync(_sendMutex);
-
+
try
{
_transceiver->close();
@@ -1728,7 +1712,6 @@ IceInternal::Connection::setState(State state)
// Here we ignore any exceptions in close().
}
- assert(_transceiver);
_transceiver = 0;
//notifyAll(); // We notify already below.
}