author | Marc Laukien <marc@zeroc.com> | 2004-02-17 15:18:50 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-17 15:18:50 +0000 |
commit | 188cf85f56f900ee7eef607e3d4d4dcd271a59d3 (patch) | |
tree | d1cde4fff235f3e0269eeaa5a4d6026b2f3c4204 /cpp/src/Ice/Connection.cpp | |
parent | more ami (diff) | |
download | ice-188cf85f56f900ee7eef607e3d4d4dcd271a59d3.tar.bz2, ice-188cf85f56f900ee7eef607e3d4d4dcd271a59d3.tar.xz, ice-188cf85f56f900ee7eef607e3d4d4dcd271a59d3.zip |
more ami
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 260 |
1 file changed, 122 insertions, 138 deletions
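The patch below reworks how sendRequest() and sendAsyncRequest() assign request IDs and record pending requests: the ID is now generated, written into the marshalled message, and inserted into the _requests/_asyncRequests maps in one place, with a cached iterator (_requestsHint/_asyncRequestsHint) used as an insertion and erase hint. As a minimal, standalone sketch of that hinted std::map pattern (the RequestTable class and its method names are hypothetical, not part of the Ice sources):

```cpp
#include <map>
#include <utility>

typedef int Int;
class Outgoing { };  // stand-in for the real Ice Outgoing class

class RequestTable
{
public:

    RequestTable() :
        _hint(_requests.end())
    {
    }

    void add(Int requestId, Outgoing* out)
    {
        // Request IDs only ever grow, so end() is always a correct insertion
        // hint; the returned iterator (the new element) becomes the hint for
        // the matching erase.
        _hint = _requests.insert(_requests.end(),
                                 std::pair<const Int, Outgoing*>(requestId, out));
    }

    bool remove(Int requestId)
    {
        std::map<Int, Outgoing*>::iterator p = _requests.find(requestId);
        if(p == _requests.end())
        {
            return false; // Already removed, e.g. because finished() ran first.
        }

        if(p == _hint)
        {
            _requests.erase(p++); // Pre-C++11 idiom: p is advanced before the erase.
            _hint = p;
        }
        else
        {
            _requests.erase(p);
        }
        return true;
    }

private:

    std::map<Int, Outgoing*> _requests;
    std::map<Int, Outgoing*>::iterator _hint;
};
```

Because the IDs are handed out in increasing order, inserting with end() as the hint stays cheap, and advancing the hint while erasing keeps it valid without a second lookup; this mirrors the erase(p++) bookkeeping visible in the diff.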
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 78e29294a75..7fc15aa0cd9 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -58,7 +58,6 @@ IceInternal::Connection::validate()
     if(_adapter)
     {
         IceUtil::Mutex::Lock sendSync(_sendMutex);
-        assert(_transceiver); // The transceiver cannot be closed already.
 
         //
         // Incoming connections play the active role with respect to
@@ -290,8 +289,8 @@ IceInternal::Connection::waitUntilFinished()
     }
 
     //
-    // Now we must wait for connection closure. If there is a timeout,
-    // we force the connection closure.
+    // Now we must wait until close() has been called on the
+    // transceiver.
     //
     while(_transceiver)
     {
@@ -321,8 +320,8 @@ IceInternal::Connection::waitUntilFinished()
         }
 
         //
-        // No return here, we must still wait until _transceiver
-        // becomes null.
+        // No return here, we must still wait until close() is
+        // called on the _transceiver.
         //
     }
     else
@@ -337,8 +336,8 @@ IceInternal::Connection::waitUntilFinished()
 void
 IceInternal::Connection::monitor()
 {
-
     IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
+
     if(!sync.acquired())
     {
         return;
@@ -364,10 +363,7 @@ IceInternal::Connection::monitor()
     //
     // Active connection management for idle connections.
     //
-    // TODO: Hack: ACM for incoming connections doesn't work right
-    // with AMI.
-    //
-    if(_acmTimeout > 0 && closingOK() && !_adapter)
+    if(_acmTimeout > 0 && closingOK())
     {
         if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
         {
@@ -421,14 +417,38 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
     assert(_state > StateNotValidated);
     assert(_state < StateClosing);
-
-    requestId = _nextRequestId++;
-    if(requestId <= 0)
+
+    //
+    // Only add to the request map if this is a twoway call.
+    //
+    if(out)
     {
-        _nextRequestId = 1;
+        //
+        // Create a new unique request ID.
+        //
         requestId = _nextRequestId++;
+        if(requestId <= 0)
+        {
+            _nextRequestId = 1;
+            requestId = _nextRequestId++;
+        }
+
+        //
+        // Fill in the request ID.
+        //
+        const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+        reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+        copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+
+        //
+        // Add to the requests map.
+        //
+        _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
     }
-
+
     if(_acmTimeout > 0)
     {
         _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
@@ -445,19 +465,6 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
         _exception->ice_throw();
     }
 
-    //
-    // Fill in the request ID.
-    //
-    if(!_endpoint->datagram() && out)
-    {
-        const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
-        reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
-        copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
-    }
-
     bool compress;
     if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
     {
@@ -515,39 +522,42 @@ IceInternal::Connection::sendRequest(BasicStream* os, Outgoing* out)
         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
         setState(StateClosed, ex);
         assert(_exception.get());
-        _exception->ice_throw();
-    }
-    //
-    // Only add to the request map if this was a twoway call, and if
-    // there was no exception above.
-    //
-    if(out)
-    {
-        auto_ptr<LocalException> exception;
-
+        if(out)
         {
-            IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
             //
-            // If there is already an exception set, we call
-            // finished() directly (below, outside the thread
-            // synchronization), because it's possible that finished()
-            // has already been called on the request map.
+            // If the request has already been removed from the
+            // request map, we are out of luck. It would mean that
+            // finished() has been called already, and therefore the
+            // exception has been set using the Outgoing::finished()
+            // callback. In this case, we cannot throw the exception
+            // here, because we must not both raise an exception and
+            // have Outgoing::finished() called with an
+            // exception. This means that in some rare cases, a
+            // request will not be retried even though it could. But I
+            // honestly don't know how I could avoid this, without a
+            // very elaborate and complex design, which would be bad
+            // for performance.
            //
-            if(_exception.get())
-            {
-                exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone()));
-            }
-            else
+            map<Int, Outgoing*>::iterator p = _requests.find(requestId);
+            if(p != _requests.end())
             {
-                _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+                if(p == _requestsHint)
+                {
+                    _requests.erase(p++);
+                    _requestsHint = p;
+                }
+                else
+                {
+                    _requests.erase(p);
+                }
+
+                _exception->ice_throw();
             }
         }
-
-        if(exception.get())
+        else
         {
-            out->finished(*exception.get());
+            _exception->ice_throw();
         }
     }
 }
@@ -570,6 +580,9 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
     assert(_state > StateNotValidated);
     assert(_state < StateClosing);
+    //
+    // Create a new unique request ID.
+    //
     requestId = _nextRequestId++;
     if(requestId <= 0)
     {
@@ -577,6 +590,22 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
         requestId = _nextRequestId++;
     }
+    //
+    // Fill in the request ID.
+    //
+    const Byte* p = reinterpret_cast<const Byte*>(&requestId);
+#ifdef ICE_BIG_ENDIAN
+    reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+    copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+
+    //
+    // Add to the async requests map.
+    //
+    _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
+                                               pair<const Int, OutgoingAsyncPtr>(requestId, out));
+
     if(_acmTimeout > 0)
     {
         _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
@@ -593,16 +622,6 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
         _exception->ice_throw();
     }
 
-    //
-    // Fill in the request ID.
-    //
-    const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
-    reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
-    copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
-
     bool compress;
     if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
     {
@@ -640,7 +659,7 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
         // No compression, just fill in the message size.
         //
         Int sz = static_cast<Int>(os->b.size());
-        p = reinterpret_cast<const Byte*>(&sz);
+        const Byte* p = reinterpret_cast<const Byte*>(&sz);
 #ifdef ICE_BIG_ENDIAN
         reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
 #else
@@ -660,38 +679,37 @@ IceInternal::Connection::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPt
         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
         setState(StateClosed, ex);
         assert(_exception.get());
-        _exception->ice_throw();
-    }
-    //
-    // Only add to the request map if there was no exception above.
-    //
-    auto_ptr<LocalException> exception;
-
-    {
-        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
         //
-        // If there is already an exception set, we call __finished()
-        // directly (below, outside the thread synchronization),
-        // because it's possible that __finished() has already been
-        // called on the request map.
+        // If the request has already been removed from the async
+        // request map, we are out of luck. It would mean that
+        // finished() has been called already, and therefore the
+        // exception has been set using the
+        // OutgoingAsync::__finished() callback. In this case, we
+        // cannot throw the exception here, because we must not both
+        // raise an exception and have OutgoingAsync::__finished()
+        // called with an exception. This means that in some rare
+        // cases, a request will not be retried even though it
+        // could. But I honestly don't know how I could avoid this,
+        // without a very elaborate and complex design, which would be
+        // bad for performance.
         //
-        if(_exception.get())
-        {
-            exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(_exception->ice_clone()));
-        }
-        else
+        map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.find(requestId);
+        if(p != _asyncRequests.end())
        {
-            _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
-                                                       pair<const Int, OutgoingAsyncPtr>(requestId, out));
+            if(p == _asyncRequestsHint)
+            {
+                _asyncRequests.erase(p++);
+                _asyncRequestsHint = p;
+            }
+            else
+            {
+                _asyncRequests.erase(p);
+            }
+
+            _exception->ice_throw();
         }
     }
-
-    if(exception.get())
-    {
-        out->__finished(*exception.get());
-    }
 }
 
 void
@@ -866,7 +884,7 @@ IceInternal::Connection::flushBatchRequest()
         // No compression, just fill in the message size.
         //
         Int sz = static_cast<Int>(_batchStream.b.size());
-        p = reinterpret_cast<const Byte*>(&sz);
+        const Byte* p = reinterpret_cast<const Byte*>(&sz);
 #ifdef ICE_BIG_ENDIAN
         reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
 #else
@@ -1174,8 +1192,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
             if(_warn)
             {
                 Warning out(_logger);
-                out << "ignoring close connection message for datagram connection:\n"
-                    << _transceiver->toString();
+                out << "ignoring close connection message for datagram connection:\n" << _desc;
             }
         }
         else
@@ -1347,8 +1364,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
             if(_warn)
            {
                 Warning out(_logger);
-                out << "ignoring unexpected validate connection message:\n"
-                    << _transceiver->toString();
+                out << "ignoring unexpected validate connection message:\n" << _desc;
             }
             break;
         }
@@ -1440,7 +1456,7 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
 {
     threadPool->promoteFollower();
 
-    auto_ptr<LocalException> closeException;
+    auto_ptr<LocalException> exception;
 
     map<Int, Outgoing*> requests;
     map<Int, OutgoingAsyncPtr> asyncRequests;
@@ -1466,13 +1482,11 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
         }
         catch(const LocalException& ex)
         {
-            closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
+            exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone()));
         }
 
         assert(_transceiver);
         _transceiver = 0;
-        assert(_threadPool);
-        _threadPool = 0;
 
         notifyAll();
     }
@@ -1488,17 +1502,17 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
     for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p)
     {
-        p->second->finished(*_exception.get()); // Exception is immutable at this point.
+        p->second->finished(*_exception.get()); // The exception is immutable at this point.
     }
 
     for(map<Int, OutgoingAsyncPtr>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q)
     {
-        q->second->__finished(*_exception.get()); // Exception is immutable at this point.
+        q->second->__finished(*_exception.get()); // The exception is immutable at this point.
     }
 
-    if(closeException.get())
+    if(exception.get())
     {
-        closeException->ice_throw();
+        exception->ice_throw();
     }
 }
@@ -1512,7 +1526,7 @@ IceInternal::Connection::exception(const LocalException& ex)
 string
 IceInternal::Connection::toString() const
 {
-    return _transceiverToString;
+    return _desc;
 }
 
 bool
@@ -1539,7 +1553,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
                                     const ObjectAdapterPtr& adapter) :
     EventHandler(instance),
     _transceiver(transceiver),
-    _transceiverToString(transceiver->toString()),
+    _desc(transceiver->toString()),
     _endpoint(endpoint),
     _adapter(adapter),
     _logger(_instance->logger()), // Cached for better performance.
@@ -1563,12 +1577,12 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
 {
     if(_adapter)
     {
-        _threadPool = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool();
+        const_cast<ThreadPoolPtr&>(_threadPool) = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool();
         _servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager();
     }
     else
     {
-        _threadPool = _instance->clientThreadPool();
+        const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
         _servantManager = 0;
     }
@@ -1613,7 +1627,6 @@ IceInternal::Connection::~Connection()
 {
     assert(_state == StateClosed);
     assert(!_transceiver);
-    assert(!_threadPool);
     assert(_dispatchCount == 0);
     assert(_proxyCount == 0);
 }
@@ -1653,7 +1666,7 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
              (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
         {
             Warning out(_logger);
-            out << "connection exception:\n" << *_exception.get() << '\n' << _transceiver->toString();
+            out << "connection exception:\n" << *_exception.get() << '\n' << _desc;
         }
     }
 }
@@ -1664,30 +1677,6 @@ IceInternal::Connection::setState(State state, const LocalException& ex)
     // that is not yet marked as closed or closing.
     //
     setState(state);
-
-    //
-    // We can't call __finished() on async requests here, because
-    // it's possible that the callback will trigger another call,
-    // which then might block on this connection. Calling
-    // finished() at this place works for non-async calls, but in
-    // order to keep the logic of sending exceptions to requests
-    // in one place, we don't do this here either. Instead, we
-    // move all the code into the finished() operation.
-    /*
-    for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
-    {
-        p->second->finished(*_exception.get());
-    }
-    _requests.clear();
-    _requestsHint = _requests.end();
-
-    for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
-    {
-        q->second->__finished(*_exception.get());
-    }
-    _asyncRequests.clear();
-    _asyncRequestsHint = _asyncRequests.end();
-    */
 }
@@ -1785,8 +1774,6 @@ IceInternal::Connection::setState(State state)
             assert(_transceiver);
             _transceiver = 0;
-            assert(_threadPool);
-            _threadPool = 0;
             //notifyAll(); // We notify already below.
         }
         else
@@ -1825,7 +1812,6 @@ IceInternal::Connection::initiateShutdown() const
     if(!_endpoint->datagram())
     {
         IceUtil::Mutex::Lock sendSync(_sendMutex);
-        assert(_transceiver); // The transceiver cannot be closed already.
 
         //
        // Before we shut down, we send a close connection message.
@@ -1855,7 +1841,6 @@ IceInternal::Connection::registerWithPool()
 {
     if(!_registeredWithPool)
     {
-        assert(_threadPool);
         _threadPool->_register(_transceiver->fd(), this);
         _registeredWithPool = true;
@@ -1872,7 +1857,6 @@ IceInternal::Connection::unregisterWithPool()
 {
     if(_registeredWithPool)
     {
-        assert(_threadPool);
         _threadPool->unregister(_transceiver->fd());
         _registeredWithPool = false;
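The copy()/reverse_copy() pair that the patch moves into the send paths writes the 32-bit request ID into a fixed slot of the already-marshalled message, producing a little-endian encoding on either kind of host. A rough standalone sketch of that idea follows; the buffer type, the writeRequestId() helper, and the headerSize value are illustrative assumptions rather than the actual BasicStream API, though the ICE_BIG_ENDIAN guard is the one used in the diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

typedef unsigned char Byte;
typedef int Int;

// Hypothetical fixed header size; the real value comes from the Ice protocol
// definition, not from this sketch.
const std::size_t headerSize = 14;

// Overwrite the 4-byte request-ID slot that directly follows the header of an
// already-marshalled message, producing a little-endian encoding regardless of
// the host byte order.
void
writeRequestId(std::vector<Byte>& buf, Int requestId)
{
    assert(buf.size() >= headerSize + sizeof(Int));

    const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
    // Big-endian host: reverse the native bytes to get the little-endian wire order.
    std::reverse_copy(p, p + sizeof(Int), buf.begin() + headerSize);
#else
    // Little-endian host: the native bytes already match the wire order.
    std::copy(p, p + sizeof(Int), buf.begin() + headerSize);
#endif
}
```

Writing the ID into a fixed offset after the rest of the message has been marshalled is what lets the diff patch the request ID in place via os->b.begin() + headerSize.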