diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2014-09-05 10:42:18 -0230 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2014-09-05 10:42:18 -0230 |
commit | 9786853ab2d88598021aaec5c0409d3a45a50a13 (patch) | |
tree | d64858749513c529fdb84a98d8637d19f2c125e4 /cpp/src | |
parent | Minor change to JS print stack traces (diff) | |
download | ice-9786853ab2d88598021aaec5c0409d3a45a50a13.tar.bz2 ice-9786853ab2d88598021aaec5c0409d3a45a50a13.tar.xz ice-9786853ab2d88598021aaec5c0409d3a45a50a13.zip |
ICE-4891 Refactor network tracing
Diffstat (limited to 'cpp/src')
44 files changed, 1220 insertions, 1335 deletions
diff --git a/cpp/src/Ice/Acceptor.h b/cpp/src/Ice/Acceptor.h index ab3aefe1c8f..0b7342ad88f 100644 --- a/cpp/src/Ice/Acceptor.h +++ b/cpp/src/Ice/Acceptor.h @@ -13,6 +13,7 @@ #include <IceUtil/Shared.h> #include <Ice/AcceptorF.h> #include <Ice/TransceiverF.h> +#include <Ice/EndpointIF.h> #include <Ice/Network.h> namespace IceInternal @@ -24,7 +25,7 @@ public: virtual NativeInfoPtr getNativeInfo() = 0; virtual void close() = 0; - virtual void listen() = 0; + virtual EndpointIPtr listen(const EndpointIPtr& endp) = 0; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual void startAccept() = 0; virtual void finishAccept() = 0; @@ -32,6 +33,7 @@ public: virtual TransceiverPtr accept() = 0; virtual std::string protocol() const = 0; virtual std::string toString() const = 0; + virtual std::string toDetailedString() const = 0; }; } diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 044e4082bc2..23ae8d63429 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -63,11 +63,11 @@ remove(multimap<K, V>& m, K k, V v) } template <typename K, typename V> ::IceInternal::Handle<V> -find(const multimap<K,::IceInternal::Handle<V> >& m, - K k, +find(const multimap<K,::IceInternal::Handle<V> >& m, + K k, const ::IceUtilInternal::ConstMemFun<bool, V, ::IceInternal::Handle<V> >& predicate) { - pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator, + pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator, typename multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = m.equal_range(k); for(typename multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q) { @@ -96,7 +96,7 @@ IceInternal::OutgoingConnectionFactory::destroy() { return; } - + for_each(_connections.begin(), _connections.end(), bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); @@ -122,7 +122,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + // // First we wait until the factory is destroyed. We also wait // until there are no pending connections anymore. Only then @@ -161,7 +161,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt Ice::EndpointSelectionType selType, bool& compress) { assert(!endpts.empty()); - + // // Apply the overrides. // @@ -169,7 +169,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // // Try to find a connection to one of the given endpoints. - // + // Ice::ConnectionIPtr connection = findConnection(endpoints, compress); if(connection) { @@ -210,7 +210,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt assert(exception.get()); exception->ice_throw(); } - + // // Try to get a connection to one of the connectors. A null result indicates that no // connection was found and that we should try to establish the connection (and that @@ -243,6 +243,12 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt try { + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "trying to establish " << q->endpoint->protocol() << " connection to " + << q->connector->toString(); + } connection = createConnection(q->connector->connect(), *q); connection->start(0); @@ -277,6 +283,13 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt } catch(const Ice::LocalException& ex) { + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "failed to establish " << q->endpoint->protocol() << " connection to " + << q->connector->toString() << "\n" << ex; + } + if(observer) { observer->failed(ex.ice_name()); @@ -312,7 +325,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt void IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, - Ice::EndpointSelectionType selType, + Ice::EndpointSelectionType selType, const CreateConnectionCallbackPtr& callback) { assert(!endpts.empty()); @@ -324,7 +337,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // // Try to find a connection to one of the given endpoints. - // + // try { bool compress; @@ -340,8 +353,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt callback->setException(ex); return; } - - ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType); + + ConnectCallbackPtr cb = new ConnectCallback(_instance, this, endpoints, hasMore, callback, selType); cb->getConnectors(); } @@ -388,7 +401,7 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route // endpoint = endpoint->compress(false); - for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q = _connections.begin(); + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q = _connections.begin(); q != _connections.end(); ++q) { if(q->second->endpoint() == endpoint) @@ -403,12 +416,12 @@ void IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& adapter) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + if(_destroyed) { return; } - + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { if(p->second->getAdapter() == adapter) @@ -515,7 +528,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr return 0; } -ConnectionIPtr +ConnectionIPtr IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress) { // This must be called with the mutex locked. @@ -542,7 +555,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInf return connection; } } - + return 0; } @@ -550,7 +563,7 @@ void IceInternal::OutgoingConnectionFactory::incPendingConnectCount() { // - // Keep track of the number of pending connects. The outgoing connection factory + // Keep track of the number of pending connects. The outgoing connection factory // waitUntilFinished() method waits for all the pending connects to terminate before // to return. This ensures that the communicator client thread pool isn't destroyed // too soon and will still be available to execute the ice_exception() callbacks for @@ -577,9 +590,9 @@ IceInternal::OutgoingConnectionFactory::decPendingConnectCount() } } -ConnectionIPtr -IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, - const ConnectCallbackPtr& cb, +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, + const ConnectCallbackPtr& cb, bool& compress) { { @@ -588,7 +601,7 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo { throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); } - + // // Reap closed connections // @@ -623,15 +636,15 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo } // - // Determine whether another thread/request is currently attempting to connect to + // Determine whether another thread/request is currently attempting to connect to // one of our endpoints; if so we wait until it's done. // if(addToPending(cb, connectors)) { // - // If a callback is not specified we wait until another thread notifies us about a - // change to the pending list. Otherwise, if a callback is provided we're done: - // when the pending list changes the callback will be notified and will try to + // If a callback is not specified we wait until another thread notifies us about a + // change to the pending list. Otherwise, if a callback is provided we're done: + // when the pending list changes the callback will be notified and will try to // get the connection again. // if(!cb) @@ -656,7 +669,7 @@ IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo } // - // At this point, we're responsible for establishing the connection to one of + // At this point, we're responsible for establishing the connection to one of // the given connectors. If it's a non-blocking connect, calling nextConnector // will start the connection establishment. Otherwise, we return null to get // the caller to establish the connection. @@ -706,7 +719,7 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t _connections.insert(pair<const ConnectorPtr, ConnectionIPtr>(ci.connector, connection)); _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); - _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true), + _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true), connection)); return connection; } @@ -778,7 +791,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect (*p)->setConnection(connection, compress); } } - + void IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, const Ice::LocalException& ex, @@ -812,7 +825,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect _pending.erase(q); } } - + for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r) { assert(failedCallbacks.find(*r) == failedCallbacks.end()); @@ -820,7 +833,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect } notifyAll(); } - + for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) { (*p)->getConnection(); @@ -832,7 +845,7 @@ IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<Connect } bool -IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb, +IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb, const vector<ConnectorInfo>& connectors) { // @@ -847,7 +860,7 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c found = true; if(cb) { - q->second.insert(cb); + q->second.insert(cb); } } } @@ -858,8 +871,8 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c } // - // If there's no pending connection for the given connectors, we're - // responsible for its establishment. We add empty pending lists, + // If there's no pending connection for the given connectors, we're + // responsible for its establishment. We add empty pending lists, // other callbacks to the same connectors will be queued. // for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) @@ -873,7 +886,7 @@ IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& c } void -IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb, +IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb, const vector<ConnectorInfo>& connectors) { for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) @@ -942,11 +955,13 @@ IceInternal::OutgoingConnectionFactory::handleConnectionException(const LocalExc } } -IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const OutgoingConnectionFactoryPtr& factory, +IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const InstancePtr& instance, + const OutgoingConnectionFactoryPtr& factory, const vector<EndpointIPtr>& endpoints, bool hasMore, const CreateConnectionCallbackPtr& cb, Ice::EndpointSelectionType selType) : + _instance(instance), _factory(factory), _endpoints(endpoints), _hasMore(hasMore), @@ -966,13 +981,13 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartComplete { _observer->detach(); } - + connection->activate(); _factory->finishGetConnection(_connectors, *_iter, connection, this); } void -IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& /*connection*/, +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& /*connection*/, const LocalException& ex) { assert(_iter != _connectors.end()); @@ -982,7 +997,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(c _observer->failed(ex.ice_name()); _observer->detach(); } - + _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1); if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. { @@ -1091,7 +1106,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() try { // - // If all the connectors have been created, we ask the factory to get a + // If all the connectors have been created, we ask the factory to get a // connection. // bool compress; @@ -1101,12 +1116,12 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() // // A null return value from getConnection indicates that the connection // is being established and that everthing has been done to ensure that - // the callback will be notified when the connection establishment is + // the callback will be notified when the connection establishment is // done or that the callback already obtain the connection. - // + // return; } - + _callback->setConnection(connection, compress); _factory->decPendingConnectCount(); // Must be called last. } @@ -1135,17 +1150,30 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() } assert(_iter != _connectors.end()); + + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "trying to establish " << _iter->endpoint->protocol() << " connection to " + << _iter->connector->toString(); + } connection = _factory->createConnection(_iter->connector->connect(), *_iter); connection->start(this); } catch(const Ice::LocalException& ex) { + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "failed to establish " << _iter->endpoint->protocol() << " connection to " + << _iter->connector->toString() << "\n" << ex; + } connectionStartFailed(connection, ex); } } void -IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection, +IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection, bool compress) { // @@ -1177,7 +1205,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const { // // Callback from the factory: connecting to the given connectors - // failed, we remove the connectors and return true if there's + // failed, we remove the connectors and return true if there's // no more connectors left to try. // for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) @@ -1234,7 +1262,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + // // First we wait until the connection factory itself is in holding // state. @@ -1263,7 +1291,7 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() set<ConnectionIPtr> connections; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + // // First we wait until the factory is destroyed. If we are using // an acceptor, we also wait for it to be closed. @@ -1383,7 +1411,7 @@ IceInternal::IncomingConnectionFactory::finishAsync(SocketOperation) Error out(_instance->initializationData().logger); out << "couldn't accept connection:\n" << ex << '\n' << _acceptor->toString(); return false; - } + } return _state < StateClosed; } #endif @@ -1413,7 +1441,7 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) IceUtil::ThreadControl::yield(); return; } - + // // Reap closed connections // @@ -1431,6 +1459,12 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) try { transceiver = _acceptor->accept(); + + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "trying to accept " << _endpoint->protocol() << " connection\n" << transceiver->toString(); + } } catch(const SocketException& ex) { @@ -1507,7 +1541,7 @@ IceInternal::IncomingConnectionFactory::toString() const { return _transceiver->toString(); } - + assert(_acceptor); return _acceptor->toString(); } @@ -1588,19 +1622,40 @@ IceInternal::IncomingConnectionFactory::initialize(const string& oaName) try { - const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(const_cast<EndpointIPtr&>(_endpoint)); + const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(); if(_transceiver) { + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "attempting to bind to " << _endpoint->protocol() << " socket\n" << _transceiver->toString(); + } + const_cast<EndpointIPtr&>(_endpoint) = _transceiver->bind(_endpoint); + ConnectionIPtr connection = new ConnectionI(_adapter->getCommunicator(), _instance, 0, _transceiver, 0, _endpoint, _adapter); - connection->start(0); + connection->start(0); _connections.insert(connection); } else { - const_cast<AcceptorPtr&>(_acceptor) = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), oaName); + const_cast<AcceptorPtr&>(_acceptor) = _endpoint->acceptor(oaName); assert(_acceptor); - _acceptor->listen(); + + if(_instance->traceLevels()->network >= 2) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "attempting to bind to " << _endpoint->protocol() << " socket " << _acceptor->toString(); + } + + const_cast<EndpointIPtr&>(_endpoint) = _acceptor->listen(_endpoint); + + if(_instance->traceLevels()->network >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "listening for " << _endpoint->protocol() << " connections\n" << _acceptor->toDetailedString(); + } + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->initialize(this); } } @@ -1623,14 +1678,14 @@ IceInternal::IncomingConnectionFactory::initialize(const string& oaName) { try { - _acceptor->close(); + closeAcceptor(); } catch(const Ice::LocalException&) { // Ignore } } - + _state = StateFinished; _monitor->destroy(); _connections.clear(); @@ -1666,13 +1721,13 @@ IceInternal::IncomingConnectionFactory::setState(State state) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "accepting " << _endpoint->protocol() << " connections at " << _acceptor->toString(); - } + } dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(this, SocketOperationRead); } for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); break; } - + case StateHolding: { if(_state != StateActive) // Can only switch from active to holding. @@ -1685,13 +1740,13 @@ IceInternal::IncomingConnectionFactory::setState(State state) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "holding " << _endpoint->protocol() << " connections at " << _acceptor->toString(); - } + } dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(this, SocketOperationRead); } for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); break; } - + case StateClosed: { if(_acceptor) @@ -1705,15 +1760,15 @@ IceInternal::IncomingConnectionFactory::setState(State state) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // - // With IOCP and WinRT, we close the acceptor now to cancel all the pending - // asynchronous operations. It's important to wait for the pending asynchronous - // operations to return before ConnectionI::finished(). Otherwise, if there was - // a pending message waiting to be sent, the connection wouldn't know whether + // With IOCP and WinRT, we close the acceptor now to cancel all the pending + // asynchronous operations. It's important to wait for the pending asynchronous + // operations to return before ConnectionI::finished(). Otherwise, if there was + // a pending message waiting to be sent, the connection wouldn't know whether // or not the send failed or succeeded, potentially breaking at-most-once semantics. // if(_acceptor) { - _acceptor->close(); + closeAcceptor(); } #endif for_each(_connections.begin(), _connections.end(), @@ -1727,7 +1782,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) if(_acceptor) { - _acceptor->close(); + closeAcceptor(); } #endif break; @@ -1738,3 +1793,15 @@ IceInternal::IncomingConnectionFactory::setState(State state) notifyAll(); } +void +IceInternal::IncomingConnectionFactory::closeAcceptor() +{ + if(_instance->traceLevels()->network >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "stopping to accept " << _endpoint->protocol() << " connections at " << _acceptor->toString(); + } + + _acceptor->close(); +} + diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 037c21ff2c4..8f318914600 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -49,11 +49,11 @@ public: class CreateConnectionCallback : virtual public IceUtil::Shared { public: - + virtual void setConnection(const Ice::ConnectionIPtr&, bool) = 0; virtual void setException(const Ice::LocalException&) = 0; }; - typedef IceUtil::Handle<CreateConnectionCallback> CreateConnectionCallbackPtr; + typedef IceUtil::Handle<CreateConnectionCallback> CreateConnectionCallbackPtr; void destroy(); @@ -62,7 +62,7 @@ public: void waitUntilFinished(); Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, bool&); - void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, + void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, const CreateConnectionCallbackPtr&); void setRouterInfo(const RouterInfoPtr&); void removeAdapter(const Ice::ObjectAdapterPtr&); @@ -79,9 +79,9 @@ private: ConnectorInfo(const ConnectorPtr& c, const EndpointIPtr& e) : connector(c), endpoint(e) { } - + bool operator==(const ConnectorInfo& other) const; - + ConnectorPtr connector; EndpointIPtr endpoint; }; @@ -90,7 +90,7 @@ private: { public: - ConnectCallback(const OutgoingConnectionFactoryPtr&, const std::vector<EndpointIPtr>&, bool, + ConnectCallback(const InstancePtr&, const OutgoingConnectionFactoryPtr&, const std::vector<EndpointIPtr>&, bool, const CreateConnectionCallbackPtr&, Ice::EndpointSelectionType); virtual void connectionStartCompleted(const Ice::ConnectionIPtr&); @@ -116,6 +116,7 @@ private: private: + const InstancePtr _instance; const OutgoingConnectionFactoryPtr _factory; const std::vector<EndpointIPtr> _endpoints; const bool _hasMore; @@ -134,10 +135,10 @@ private: void incPendingConnectCount(); void decPendingConnectCount(); Ice::ConnectionIPtr getConnection(const std::vector<ConnectorInfo>&, const ConnectCallbackPtr&, bool&); - void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectorInfo&, const Ice::ConnectionIPtr&, + void finishGetConnection(const std::vector<ConnectorInfo>&, const ConnectorInfo&, const Ice::ConnectionIPtr&, const ConnectCallbackPtr&); void finishGetConnection(const std::vector<ConnectorInfo>&, const Ice::LocalException&, const ConnectCallbackPtr&); - + bool addToPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&); void removeFromPending(const ConnectCallbackPtr&, const std::vector<ConnectorInfo>&); @@ -159,10 +160,10 @@ private: int _pendingConnectCount; }; -class IncomingConnectionFactory : public EventHandler, +class IncomingConnectionFactory : public EventHandler, public Ice::ConnectionI::StartCallback, public IceUtil::Monitor<IceUtil::Mutex> - + { public: @@ -178,7 +179,7 @@ public: EndpointIPtr endpoint() const; std::list<Ice::ConnectionIPtr> connections() const; void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&); - + // // Operations from EventHandler // @@ -212,6 +213,7 @@ private: }; void setState(State); + void closeAcceptor(); const InstancePtr _instance; const FactoryACMMonitorPtr _monitor; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index fcf05ef9db0..fad11c7cd0f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -13,6 +13,7 @@ #include <Ice/LoggerUtil.h> #include <Ice/Properties.h> #include <Ice/TraceUtil.h> +#include <Ice/TraceLevels.h> #include <Ice/DefaultsAndOverrides.h> #include <Ice/Transceiver.h> #include <Ice/ThreadPool.h> @@ -56,7 +57,7 @@ public: { _connection->timedOut(); } - + private: Ice::ConnectionI* _connection; @@ -66,9 +67,9 @@ class DispatchCall : public DispatchWorkItem { public: - DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, - const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, - Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, + DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, + const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, + Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) : DispatchWorkItem(connection), @@ -90,7 +91,7 @@ public: virtual void run() { - _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, + _connection->dispatch(_startCB, _sentCBs, _compress, _requestId, _invokeNum, _servantManager, _adapter, _outAsync, _heartbeatCallback, _stream); } @@ -156,7 +157,7 @@ Ice::ConnectionI::Observer::startRead(const Buffer& buf) _readStreamPos = buf.b.empty() ? 0 : buf.i; } -void +void Ice::ConnectionI::Observer::finishRead(const Buffer& buf) { if(_readStreamPos == 0) @@ -179,7 +180,7 @@ Ice::ConnectionI::Observer::startWrite(const Buffer& buf) _writeStreamPos = buf.b.empty() ? 0 : buf.i; } -void +void Ice::ConnectionI::Observer::finishWrite(const Buffer& buf) { if(_writeStreamPos == 0) @@ -264,7 +265,7 @@ Ice::ConnectionI::OutgoingMessage::sent() delete stream; } stream = 0; - + if(out) { out->sent(); @@ -531,7 +532,7 @@ Ice::ConnectionI::updateObserver() assert(_instance->getObserver()); _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), - _endpoint, + _endpoint, toConnectionState(_state), _observer.get())); } @@ -550,8 +551,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) { // // If writing or reading, nothing to do, the connection - // timeout will kick-in if writes or reads don't progress. - // This check is necessary because the actitivy timer is + // timeout will kick-in if writes or reads don't progress. + // This check is necessary because the actitivy timer is // only set when a message is fully read/written. // return; @@ -570,7 +571,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // called every (timeout / 2) period. // - if(acm.heartbeat == HeartbeatAlways || + if(acm.heartbeat == HeartbeatAlways || (acm.heartbeat != HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4))) { if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0) @@ -578,10 +579,10 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) heartbeat(); } } - + if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout)) { - if(acm.close == CloseOnIdleForceful || + if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty()))) { // @@ -650,7 +651,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) #endif } - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, static_cast<Int>(os->b.size() - headerSize - 4)); // @@ -730,7 +731,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } - out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, + out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, static_cast<Int>(os->b.size() - headerSize - 4)); AsyncStatus status = AsyncStatusQueued; @@ -773,8 +774,8 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) if(_exception.get()) { // - // If there were no batch requests queued when the connection failed, we can safely - // retry with a new connection. Otherwise, we must throw to notify the caller that + // If there were no batch requests queued when the connection failed, we can safely + // retry with a new connection. Otherwise, we must throw to notify the caller that // some previous batch requests were not sent. // if(_batchStream.b.empty()) @@ -984,23 +985,23 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR } #ifdef ICE_CPP11 -AsyncResultPtr +AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (const Exception&)>& exception, const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC { public: - + Cpp11CB(const IceInternal::Function<void (const Exception&)>& excb, const IceInternal::Function<void (bool)>& sentcb) : IceInternal::Cpp11FnCallbackNC(excb, sentcb) { CallbackBase::checkCallback(true, excb != nullptr); } - + virtual void completed(const AsyncResultPtr& __result) const { @@ -1017,7 +1018,7 @@ Ice::ConnectionI::begin_flushBatchRequests( } } }; - + return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0); } #endif @@ -1075,7 +1076,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - out->attachRemoteObserver(initConnectionInfo(), _endpoint, + out->attachRemoteObserver(initConnectionInfo(), _endpoint, static_cast<Int>(_batchStream.b.size() - headerSize - 4)); _batchStream.swap(*out->os()); @@ -1173,7 +1174,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) return status; } -void +void Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1184,9 +1185,9 @@ Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) _callback = callback; } -void -Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, - const IceUtil::Optional<Ice::ACMClose>& close, +void +Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, + const IceUtil::Optional<Ice::ACMClose>& close, const IceUtil::Optional<Ice::ACMHeartbeat>& heartbeat) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1201,7 +1202,7 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, if(_monitor->getACM().timeout <= 0) { _acmLastActivity = IceUtil::Time(); // Disable the recording of last activity. - } + } else if(_acmLastActivity == IceUtil::Time() && _state == StateActive) { _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); @@ -1225,7 +1226,7 @@ Ice::ConnectionI::getACM() return _monitor ? _monitor->getACM() : acm; } -void +void Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1247,10 +1248,10 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } // - // If the request is being sent, don't remove it from the send streams, + // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - if(o == _sendStreams.begin()) + if(o == _sendStreams.begin()) { o->timedOut(true); // true = adopt the stream. } @@ -1293,18 +1294,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } } -void +void Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->outAsync.get() == outAsync.get()) { if(o->requestId) { - if(_asyncRequestsHint != _asyncRequests.end() && + if(_asyncRequestsHint != _asyncRequests.end() && _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) { _asyncRequests.erase(_asyncRequestsHint); @@ -1315,12 +1316,12 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou _asyncRequests.erase(o->requestId); } } - + // - // If the request is being sent, don't remove it from the send streams, + // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - if(o == _sendStreams.begin()) + if(o == _sendStreams.begin()) { o->timedOut(true); // true = adopt the stream } @@ -1347,7 +1348,7 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou return; // We're done } } - + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { if(p->second.get() == o.get()) @@ -1366,7 +1367,7 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) @@ -1377,21 +1378,21 @@ Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag) } notifyAll(); } - + if(_state >= StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - + OutgoingMessage message(os, compressFlag > 0); sendMessage(message); - + if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); } - + return; } catch(const LocalException& ex) @@ -1405,7 +1406,7 @@ Ice::ConnectionI::sendNoResponse() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state > StateNotValidated); - + try { if(--_dispatchCount == 0) @@ -1416,13 +1417,13 @@ Ice::ConnectionI::sendNoResponse() } notifyAll(); } - + if(_state >= StateClosed) { assert(_exception.get()); _exception->ice_throw(); } - + if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); @@ -1518,13 +1519,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) try { - if(operation & SocketOperationWrite) + if(operation & SocketOperationWrite) { if(_observer) { _observer.startWrite(_writeStream); } - + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) { // The whole message is written, assume it's sent now for at-most-once semantics. @@ -1539,7 +1540,7 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { _observer.startRead(_readStream); } - + _transceiver->startRead(_readStream); } else @@ -1563,7 +1564,19 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + Buffer::Container::iterator start = _writeStream.i; _transceiver->finishWrite(_writeStream); + if(_instance->traceLevels()->network >= 3 && _writeStream.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "sent " << (_writeStream.i - start); + if(!_endpoint->datagram()) + { + out << " of " << (_writeStream.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + if(_observer) { _observer.finishWrite(_writeStream); @@ -1573,7 +1586,23 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(!_hasMoreData) { + Buffer::Container::iterator start = _readStream.i; _transceiver->finishRead(_readStream, _hasMoreData); + if(_instance->traceLevels()->network >= 3 && _readStream.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "received "; + if(_endpoint->datagram()) + { + out << _readStream.b.size(); + } + else + { + out << (_readStream.i - start) << " of " << (_readStream.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + if(_observer && !_readHeader) { _observer.finishRead(_readStream); @@ -1632,7 +1661,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { _observer.startWrite(_writeStream); } - writeOp = _transceiver->write(_writeStream); + writeOp = write(_writeStream); if(_observer && !(writeOp & SocketOperationWrite)) { _observer.finishWrite(_writeStream); @@ -1646,7 +1675,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _observer.startRead(_readStream); } - readOp = _transceiver->read(_readStream, _hasMoreData); + readOp = read(_readStream); if(readOp & SocketOperationRead) { break; @@ -1665,7 +1694,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { _observer->receivedBytes(static_cast<int>(headerSize)); } - + ptrdiff_t pos = _readStream.i - _readStream.b.begin(); if(pos < headerSize) { @@ -1674,7 +1703,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // throw IllegalMessageSizeException(__FILE__, __LINE__); } - + _readStream.i = _readStream.b.begin(); const Byte* m; _readStream.readBlob(m, static_cast<Int>(sizeof(magic))); @@ -1690,7 +1719,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) EncodingVersion ev; _readStream.read(ev); checkSupportedProtocolEncoding(ev); - + Byte messageType; _readStream.read(messageType); Byte compress; @@ -1711,7 +1740,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } _readStream.i = _readStream.b.begin() + pos; } - + if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) @@ -1772,15 +1801,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // // We parse messages first, if we receive a close // connection message we won't send more messages. - // + // if(readyOp & SocketOperationRead) { newOp = static_cast<SocketOperation>(newOp | parseMessage(current.stream, - invokeNum, - requestId, - compress, - servantManager, - adapter, + invokeNum, + requestId, + compress, + servantManager, + adapter, outAsync, heartbeatCallback, dispatchCount)); @@ -1867,15 +1896,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) else { _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum, - servantManager, adapter, outAsync, heartbeatCallback, + servantManager, adapter, outAsync, heartbeatCallback, current.stream)); } } void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, - Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, - const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, + Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, + const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) { int dispatchedCount = 0; @@ -1954,7 +1983,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess if(invokeNum) { invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); - + // // Don't increase count, the dispatch count is // decreased when the incoming reply is sent. @@ -2007,7 +2036,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) // // If there are no callbacks to call, we don't call ioCompleted() since we're not going - // to call code that will potentially block (this avoids promoting a new leader and + // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) @@ -2030,6 +2059,24 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) void Ice::ConnectionI::finish() { + if(!_initialized) + { + if(_instance->traceLevels()->network >= 2) + { + string verb = _connector ? "establish" : "accept"; + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() << "\n" << *_exception.get(); + } + } + else + { + if(_instance->traceLevels()->network >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "closed " << _endpoint->protocol() << " connection\n" << toString(); + } + } + if(_startCallback) { _startCallback->connectionStartFailed(this, *_exception.get()); @@ -2041,16 +2088,16 @@ Ice::ConnectionI::finish() if(!_writeStream.b.empty()) { // - // Return the stream to the outgoing call. This is important for + // Return the stream to the outgoing call. This is important for // retriable AMI calls which are not marshalled again. // OutgoingMessage* message = &_sendStreams.front(); _writeStream.swap(*message->stream); - + #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // // The current message might be sent but not yet removed from _sendStreams. If - // the response has been received in the meantime, we remove the message from + // the response has been received in the meantime, we remove the message from // _sendStreams to not call finished on a message which is already done. // if(message->isSent || message->receivedReply) @@ -2128,6 +2175,7 @@ Ice::ConnectionI::finish() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateFinished); + if(_dispatchCount == 0) { reap(); @@ -2264,6 +2312,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _dispatchCount(0), _state(StateNotInitialized), _shutdownInitiated(false), + _initialized(false), _validated(false) { int& compressionLevel = const_cast<int&>(_compressionLevel); @@ -2343,7 +2392,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) assert(_state != StateClosed); _exception.reset(ex.ice_clone()); - + // // We don't warn if we are not validated. // @@ -2522,7 +2571,7 @@ Ice::ConnectionI::setState(State state) if(oldState != newState) { _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), - _endpoint, + _endpoint, newState, _observer.get())); } @@ -2561,11 +2610,11 @@ Ice::ConnectionI::initiateShutdown() { assert(_state == StateClosing); assert(_dispatchCount == 0); - + if(_shutdownInitiated) { return; - } + } _shutdownInitiated = true; if(!_endpoint->datagram()) @@ -2591,7 +2640,7 @@ Ice::ConnectionI::initiateShutdown() // // Notify the the transceiver of the graceful connection closure. - // + // SocketOperation op = _transceiver->closing(true, *_exception.get()); if(op) { @@ -2648,8 +2697,8 @@ Ice::ConnectionI::initialize(SocketOperation operation) // Update the connection description once the transceiver is initialized. // const_cast<string&>(_desc) = _transceiver->toString(); + _initialized = true; setState(StateNotValidated); - return true; } @@ -2682,7 +2731,7 @@ Ice::ConnectionI::validate(SocketOperation operation) if(_writeStream.i != _writeStream.b.end()) { - SocketOperation op = _transceiver->write(_writeStream); + SocketOperation op = write(_writeStream); if(op) { scheduleTimeout(op); @@ -2711,7 +2760,7 @@ Ice::ConnectionI::validate(SocketOperation operation) if(_readStream.i != _readStream.b.end()) { - SocketOperation op = _transceiver->read(_readStream, _hasMoreData); + SocketOperation op = read(_readStream); if(op) { scheduleTimeout(op); @@ -2771,6 +2820,21 @@ Ice::ConnectionI::validate(SocketOperation operation) _readStream.i = _readStream.b.begin(); _readHeader = true; + if(_instance->traceLevels()->network >= 1) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + if(_endpoint->datagram()) + { + out << "starting to " << (_connector ? "send" : "receive") << " " << _endpoint->protocol() << " messages\n"; + out << _transceiver->toDetailedString(); + } + else + { + out << (_connector ? "established" : "accepted") << " " << _endpoint->protocol() << " connection\n"; + out << toString(); + } + } + return true; } @@ -2793,7 +2857,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) try { while(true) - { + { // // Notify the message that it was sent. // @@ -2822,7 +2886,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) // // This can occur if parseMessage (called before // sendNextMessage by message()) closes the connection. - // + // if(_state >= StateClosingPending) { return SocketOperationNone; @@ -2904,7 +2968,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) assert(_writeStream.i); if(_writeStream.i != _writeStream.b.end()) { - SocketOperation op = _transceiver->write(_writeStream); + SocketOperation op = write(_writeStream); if(op) { return op; @@ -2917,7 +2981,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) } // - // If all the messages were sent and we are in the closing state, we schedule + // If all the messages were sent and we are in the closing state, we schedule // the close timeout to wait for the peer to close the connection. // if(_state == StateClosing && _shutdownInitiated) @@ -2981,7 +3045,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { traceSend(*message.stream, _logger, _traceLevels); } - + // // Send the message without blocking. // @@ -2989,7 +3053,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { _observer.startWrite(stream); } - op = _transceiver->write(stream); + op = write(stream); if(!op) { if(_observer) @@ -3051,7 +3115,7 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) { _observer.startWrite(*message.stream); } - op = _transceiver->write(*message.stream); + op = write(*message.stream); if(!op) { if(_observer) @@ -3232,7 +3296,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3294,7 +3358,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request // // Notify the the transceiver of the graceful connection closure. - // + // SocketOperation op = _transceiver->closing(false, *_exception.get()); if(op) { @@ -3418,8 +3482,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // - // If we just received the reply of a request which isn't acknowledge as - // sent yet, we queue the reply instead of processing it right away. It + // If we just received the reply of a request which isn't acknowledge as + // sent yet, we queue the reply instead of processing it right away. It // will be processed once the write callback is invoked for the message. // OutgoingMessage* message = _sendStreams.empty() ? 0 : &_sendStreams.front(); @@ -3445,7 +3509,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { outAsync = 0; } -#endif +#endif notifyAll(); // Notify threads blocked in close(false) } @@ -3562,12 +3626,12 @@ Ice::ConnectionI::scheduleTimeout(SocketOperation status) timeout = _endpoint->timeout(); } } - + if(timeout < 0) { return; } - + try { if(status & IceInternal::SocketOperationRead) @@ -3636,6 +3700,46 @@ ConnectionI::toConnectionState(State state) const return connectionStateMap[static_cast<int>(state)]; } +SocketOperation +ConnectionI::read(Buffer& buf) +{ + Buffer::Container::iterator start = buf.i; + SocketOperation op = _transceiver->read(buf, _hasMoreData); + if(_instance->traceLevels()->network >= 3 && buf.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "received "; + if(_endpoint->datagram()) + { + out << buf.b.size(); + } + else + { + out << (buf.i - start) << " of " << (buf.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + return op; +} + +SocketOperation +ConnectionI::write(Buffer& buf) +{ + Buffer::Container::iterator start = buf.i; + SocketOperation op = _transceiver->write(buf); + if(_instance->traceLevels()->network >= 3 && buf.i != start) + { + Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); + out << "sent " << (buf.i - start); + if(!_endpoint->datagram()) + { + out << " of " << (buf.b.end() - start); + } + out << " bytes via " << _endpoint->protocol() << "\n" << toString(); + } + return op; +} + void ConnectionI::reap() { diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index d9ff91c5f53..4d0f6caf4f4 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -53,8 +53,8 @@ namespace Ice class LocalException; -class ConnectionI : public Connection, - public IceInternal::EventHandler, +class ConnectionI : public Connection, + public IceInternal::EventHandler, public IceInternal::ResponseHandler, public IceUtil::Monitor<IceUtil::Mutex> { @@ -165,14 +165,14 @@ public: void abortBatchRequest(); virtual void flushBatchRequests(); // From Connection. - + virtual AsyncResultPtr begin_flushBatchRequests(); virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0); virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr&, - const LocalObjectPtr& = 0); + const LocalObjectPtr& = 0); #ifdef ICE_CPP11 virtual AsyncResultPtr begin_flushBatchRequests( - const ::IceInternal::Function<void (const ::Ice::Exception&)>&, + const ::IceInternal::Function<void (const ::Ice::Exception&)>&, const ::IceInternal::Function<void (bool)>& = ::IceInternal::Function<void (bool)>()); #endif @@ -183,7 +183,7 @@ public: virtual void setCallback(const ConnectionCallbackPtr&); virtual void setACM(const IceUtil::Optional<int>&, - const IceUtil::Optional<ACMClose>&, + const IceUtil::Optional<ACMClose>&, const IceUtil::Optional<ACMHeartbeat>&); virtual ACM getACM(); @@ -223,9 +223,9 @@ public: void exception(const LocalException&); virtual void invokeException(Ice::Int, const LocalException&, int); - + void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int, - const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&, + const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&, const ConnectionCallbackPtr&, IceInternal::BasicStream&); void finish(); @@ -267,7 +267,7 @@ private: void doUncompress(IceInternal::BasicStream&, IceInternal::BasicStream&); #endif IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&, - IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, + IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&, int&); void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); @@ -278,8 +278,11 @@ private: Ice::ConnectionInfoPtr initConnectionInfo() const; Ice::Instrumentation::ConnectionState toConnectionState(State) const; + IceInternal::SocketOperation read(IceInternal::Buffer&); + IceInternal::SocketOperation write(IceInternal::Buffer&); + void reap(); - + AsyncResultPtr __begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); Ice::CommunicatorPtr _communicator; @@ -292,7 +295,7 @@ private: const IceInternal::EndpointIPtr _endpoint; mutable Ice::ConnectionInfoPtr _info; - + ObjectAdapterPtr _adapter; IceInternal::ServantManagerPtr _servantManager; @@ -345,6 +348,7 @@ private: State _state; // The current state. bool _shutdownInitiated; + bool _initialized; bool _validated; Ice::ConnectionCallbackPtr _callback; diff --git a/cpp/src/Ice/EndpointI.h b/cpp/src/Ice/EndpointI.h index 08e0d4fe6a4..68aa622bc1a 100644 --- a/cpp/src/Ice/EndpointI.h +++ b/cpp/src/Ice/EndpointI.h @@ -99,12 +99,9 @@ public: // // Return a server side transceiver for this endpoint, or null if a - // transceiver can only be created by an acceptor. In case a - // transceiver is created, this operation also returns a new - // "effective" endpoint, which might differ from this endpoint, - // for example, if a dynamic port number is assigned. + // transceiver can only be created by an acceptor. // - virtual TransceiverPtr transceiver(EndpointIPtr&) const = 0; + virtual TransceiverPtr transceiver() const = 0; // // Return connectors for this endpoint, or empty vector if no @@ -117,12 +114,17 @@ public: // // Return an acceptor for this endpoint, or null if no acceptors - // is available. In case an acceptor is created, this operation - // also returns a new "effective" endpoint, which might differ - // from this endpoint, for example, if a dynamic port number is - // assigned. + // is available. // - virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const = 0; + virtual AcceptorPtr acceptor(const std::string&) const = 0; + + // + // Return (potentially) new endpoint based on info from associated + // Transceiver or Acceptor, which might differ from this endpoint, + // for example, if a dynamic port number was assigned. + // + virtual EndpointIPtr endpoint(const TransceiverPtr&) const = 0; + virtual EndpointIPtr endpoint(const AcceptorPtr&) const = 0; // // Expand endpoint out in to separate endpoints for each local diff --git a/cpp/src/Ice/OpaqueEndpointI.cpp b/cpp/src/Ice/OpaqueEndpointI.cpp index 6b86a20ee2c..efc5170d4cd 100644 --- a/cpp/src/Ice/OpaqueEndpointI.cpp +++ b/cpp/src/Ice/OpaqueEndpointI.cpp @@ -27,7 +27,7 @@ static string opaqueEndpointConnectionId; } -IceInternal::OpaqueEndpointI::OpaqueEndpointI(vector<string>& args) : +IceInternal::OpaqueEndpointI::OpaqueEndpointI(vector<string>& args) : _type(-1), _rawEncoding(Encoding_1_0) { initWithOptions(args); @@ -170,9 +170,8 @@ IceInternal::OpaqueEndpointI::secure() const } TransceiverPtr -IceInternal::OpaqueEndpointI::transceiver(EndpointIPtr& endp) const +IceInternal::OpaqueEndpointI::transceiver() const { - endp = const_cast<OpaqueEndpointI*>(this); return 0; } @@ -190,12 +189,23 @@ IceInternal::OpaqueEndpointI::connectors_async(Ice::EndpointSelectionType, const } AcceptorPtr -IceInternal::OpaqueEndpointI::acceptor(EndpointIPtr& endp, const string&) const +IceInternal::OpaqueEndpointI::acceptor(const string&) const { - endp = const_cast<OpaqueEndpointI*>(this); return 0; } +EndpointIPtr +IceInternal::OpaqueEndpointI::endpoint(const TransceiverPtr&) const +{ + return const_cast<OpaqueEndpointI*>(this); +} + +EndpointIPtr +IceInternal::OpaqueEndpointI::endpoint(const AcceptorPtr&) const +{ + return const_cast<OpaqueEndpointI*>(this); +} + vector<EndpointIPtr> IceInternal::OpaqueEndpointI::expand() const { diff --git a/cpp/src/Ice/OpaqueEndpointI.h b/cpp/src/Ice/OpaqueEndpointI.h index 79ef92b7a97..99e7815e846 100644 --- a/cpp/src/Ice/OpaqueEndpointI.h +++ b/cpp/src/Ice/OpaqueEndpointI.h @@ -22,7 +22,7 @@ public: OpaqueEndpointI(std::vector<std::string>&); OpaqueEndpointI(Ice::Short, BasicStream*); - + virtual void streamWrite(BasicStream*) const; virtual Ice::EndpointInfoPtr getInfo() const; virtual Ice::Short type() const; @@ -37,10 +37,12 @@ public: virtual bool datagram() const; virtual bool secure() const; - virtual TransceiverPtr transceiver(EndpointIPtr&) const; + virtual TransceiverPtr transceiver() const; virtual std::vector<ConnectorPtr> connectors(Ice::EndpointSelectionType) const; virtual void connectors_async(Ice::EndpointSelectionType, const EndpointI_connectorsPtr&) const; - virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; + virtual AcceptorPtr acceptor(const std::string&) const; + virtual EndpointIPtr endpoint(const TransceiverPtr&) const; + virtual EndpointIPtr endpoint(const AcceptorPtr&) const; virtual std::vector<EndpointIPtr> expand() const; virtual bool equivalent(const EndpointIPtr&) const; virtual Ice::Int hash() const; diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp index 29445ff48af..c77afdef2b1 100644 --- a/cpp/src/Ice/TcpAcceptor.cpp +++ b/cpp/src/Ice/TcpAcceptor.cpp @@ -9,6 +9,7 @@ #include <Ice/TcpAcceptor.h> #include <Ice/TcpTransceiver.h> +#include <Ice/EndpointI.h> #include <Ice/ProtocolInstance.h> #include <Ice/LoggerUtil.h> #include <Ice/LocalException.h> @@ -45,20 +46,16 @@ IceInternal::TcpAcceptor::getAsyncInfo(SocketOperation) void IceInternal::TcpAcceptor::close() { - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "stopping to accept " << _instance->protocol() << " connections at " << toString(); - } - SOCKET fd = _fd; _fd = INVALID_SOCKET; closeSocket(fd); } -void -IceInternal::TcpAcceptor::listen() +EndpointIPtr +IceInternal::TcpAcceptor::listen(const EndpointIPtr& endp) { + const_cast<Address&>(_addr) = doBind(_fd, _addr); + try { doListen(_fd, _backlog); @@ -68,19 +65,8 @@ IceInternal::TcpAcceptor::listen() _fd = INVALID_SOCKET; throw; } - - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "listening for " << _instance->protocol() << " connections at " << toString(); - vector<string> intfs = getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); - if(!intfs.empty()) - { - out << "\nlocal interfaces: "; - out << IceUtilInternal::joinString(intfs, ", "); - } - } + return endp->endpoint(this); } #ifdef ICE_USE_IOCP @@ -90,20 +76,20 @@ IceInternal::TcpAcceptor::startAccept() LPFN_ACCEPTEX AcceptEx = NULL; // a pointer to the 'AcceptEx()' function GUID GuidAcceptEx = WSAID_ACCEPTEX; // The Guid DWORD dwBytes; - if(WSAIoctl(_fd, + if(WSAIoctl(_fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &AcceptEx, sizeof(AcceptEx), &dwBytes, - NULL, + NULL, NULL) == SOCKET_ERROR) { SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; - } + } assert(_acceptFd == INVALID_SOCKET); _acceptFd = createSocket(false, _addr); @@ -140,16 +126,16 @@ IceInternal::TcpAcceptor::accept() { SocketException ex(__FILE__, __LINE__); ex.error = _acceptError; - throw ex; + throw ex; } - if(setsockopt(_acceptFd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&_acceptFd, sizeof(_acceptFd)) == + if(setsockopt(_acceptFd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&_acceptFd, sizeof(_acceptFd)) == SOCKET_ERROR) { closeSocketNoThrow(_acceptFd); _acceptFd = INVALID_SOCKET; SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); - throw ex; + throw ex; } SOCKET fd = _acceptFd; @@ -158,11 +144,6 @@ IceInternal::TcpAcceptor::accept() SOCKET fd = doAccept(_fd); #endif - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "accepted " << _instance->protocol() << " connection\n" << fdToString(fd); - } return new TcpTransceiver(_instance, fd); } @@ -178,6 +159,20 @@ IceInternal::TcpAcceptor::toString() const return addrToString(_addr); } +string +IceInternal::TcpAcceptor::toDetailedString() const +{ + ostringstream os; + os << "local address = " << toString(); + vector<string> intfs = getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); + if(!intfs.empty()) + { + os << "\nlocal interfaces = "; + os << IceUtilInternal::joinString(intfs, ", "); + } + return os.str(); +} + int IceInternal::TcpAcceptor::effectivePort() const { @@ -220,13 +215,6 @@ IceInternal::TcpAcceptor::TcpAcceptor(const ProtocolInstancePtr& instance, const // setReuseAddress(_fd, true); #endif - - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "attempting to bind to " << _instance->protocol() << " socket " << toString(); - } - const_cast<Address&>(_addr) = doBind(_fd, _addr); } IceInternal::TcpAcceptor::~TcpAcceptor() diff --git a/cpp/src/Ice/TcpAcceptor.h b/cpp/src/Ice/TcpAcceptor.h index 32a54af9182..7248bb5a533 100644 --- a/cpp/src/Ice/TcpAcceptor.h +++ b/cpp/src/Ice/TcpAcceptor.h @@ -30,7 +30,7 @@ public: #endif virtual void close(); - virtual void listen(); + virtual EndpointIPtr listen(const EndpointIPtr&); #ifdef ICE_USE_IOCP virtual void startAccept(); virtual void finishAccept(); @@ -39,6 +39,7 @@ public: virtual TransceiverPtr accept(); virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; int effectivePort() const; diff --git a/cpp/src/Ice/TcpConnector.cpp b/cpp/src/Ice/TcpConnector.cpp index 41579d374df..cf79f0dd5df 100644 --- a/cpp/src/Ice/TcpConnector.cpp +++ b/cpp/src/Ice/TcpConnector.cpp @@ -22,28 +22,10 @@ using namespace IceInternal; TransceiverPtr IceInternal::TcpConnector::connect() { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "trying to establish " << _instance->protocol() << " connection to " << toString(); - } + TransceiverPtr transceiver = new TcpTransceiver(_instance, createSocket(false, _addr), _proxy, _addr, _sourceAddr); + dynamic_cast<TcpTransceiver*>(transceiver.get())->connect(); + return transceiver; - try - { - TransceiverPtr transceiver = - new TcpTransceiver(_instance, createSocket(false, _addr), _proxy, _addr, _sourceAddr); - dynamic_cast<TcpTransceiver*>(transceiver.get())->connect(); - return transceiver; - } - catch(const Ice::LocalException& ex) - { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection to " << toString() << "\n" << ex; - } - throw; - } } Short diff --git a/cpp/src/Ice/TcpEndpointI.cpp b/cpp/src/Ice/TcpEndpointI.cpp index a47c956c6c2..49b44649038 100644 --- a/cpp/src/Ice/TcpEndpointI.cpp +++ b/cpp/src/Ice/TcpEndpointI.cpp @@ -136,18 +136,28 @@ IceInternal::TcpEndpointI::secure() const } TransceiverPtr -IceInternal::TcpEndpointI::transceiver(EndpointIPtr& endp) const +IceInternal::TcpEndpointI::transceiver() const { - endp = const_cast<TcpEndpointI*>(this); return 0; } AcceptorPtr -IceInternal::TcpEndpointI::acceptor(EndpointIPtr& endp, const string&) const +IceInternal::TcpEndpointI::acceptor(const string&) const { - TcpAcceptor* p = new TcpAcceptor(_instance, _host, _port); - endp = createEndpoint(_host, p->effectivePort(), _connectionId); - return p; + return new TcpAcceptor(_instance, _host, _port); +} + +EndpointIPtr +IceInternal::TcpEndpointI::endpoint(const TransceiverPtr& transceiver) const +{ + return const_cast<TcpEndpointI*>(this); +} + +EndpointIPtr +IceInternal::TcpEndpointI::endpoint(const AcceptorPtr& acceptor) const +{ + TcpAcceptor* p = dynamic_cast<TcpAcceptor*>(acceptor.get()); + return createEndpoint(_host, p->effectivePort(), _connectionId); } string diff --git a/cpp/src/Ice/TcpEndpointI.h b/cpp/src/Ice/TcpEndpointI.h index 0e94a1d1de7..fd566bd2235 100644 --- a/cpp/src/Ice/TcpEndpointI.h +++ b/cpp/src/Ice/TcpEndpointI.h @@ -36,8 +36,10 @@ public: virtual bool datagram() const; virtual bool secure() const; - virtual TransceiverPtr transceiver(EndpointIPtr&) const; - virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; + virtual TransceiverPtr transceiver() const; + virtual AcceptorPtr acceptor(const std::string&) const; + virtual EndpointIPtr endpoint(const TransceiverPtr&) const; + virtual EndpointIPtr endpoint(const AcceptorPtr&) const; virtual std::string options() const; virtual bool operator==(const Ice::LocalObject&) const; diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index ec2078c279d..76440f5f935 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -46,115 +46,97 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) SocketOperation IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) { - try + if(_state == StateNeedConnect) + { + _state = StateConnectPending; + return SocketOperationConnect; + } + else if(_state <= StateConnectPending) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return SocketOperationConnect; - } - else if(_state <= StateConnectPending) - { #ifdef ICE_USE_IOCP - doFinishConnectAsync(_fd, _write); + doFinishConnectAsync(_fd, _write); #else - doFinishConnect(_fd); + doFinishConnect(_fd); #endif - _desc = fdToString(_fd, _proxy, _addr, true); + _desc = fdToString(_fd, _proxy, _addr, true); - if(_proxy) - { - // - // Prepare the read & write buffers in advance. - // - _proxy->beginWriteConnectRequest(_addr, writeBuffer); - _proxy->beginReadConnectRequestResponse(readBuffer); + if(_proxy) + { + // + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); #ifdef ICE_USE_IOCP + // + // Return SocketOperationWrite to indicate we need to start a write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; +#else + // + // Write the proxy connection message. + // + if(write(writeBuffer)) + { // - // Return SocketOperationWrite to indicate we need to start a write. + // Write completed without blocking. // - _state = StateProxyConnectRequest; // Send proxy connect request - return IceInternal::SocketOperationWrite; -#else + _proxy->endWriteConnectRequest(writeBuffer); + // - // Write the proxy connection message. + // Try to read the response. // - if(write(writeBuffer)) + if(read(readBuffer, hasMoreData)) { // - // Write completed without blocking. - // - _proxy->endWriteConnectRequest(writeBuffer); - + // Read completed without blocking - fall through. // - // Try to read the response. - // - if(read(readBuffer, hasMoreData)) - { - // - // Read completed without blocking - fall through. - // - _proxy->endReadConnectRequestResponse(readBuffer); - } - else - { - // - // Return SocketOperationRead to indicate we need to complete the read. - // - _state = StateProxyConnectRequestPending; // Wait for proxy response - return SocketOperationRead; - } + _proxy->endReadConnectRequestResponse(readBuffer); } else { // - // Return SocketOperationWrite to indicate we need to complete the write. + // Return SocketOperationRead to indicate we need to complete the read. // - _state = StateProxyConnectRequest; // Send proxy connect request - return SocketOperationWrite; + _state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperationRead; } -#endif } - - _state = StateConnected; - } - else if(_state == StateProxyConnectRequest) - { - // - // Write completed. - // - _proxy->endWriteConnectRequest(writeBuffer); - _state = StateProxyConnectRequestPending; // Wait for proxy response - return SocketOperationRead; - } - else if(_state == StateProxyConnectRequestPending) - { - // - // Read completed. - // - _proxy->endReadConnectRequestResponse(readBuffer); - _state = StateConnected; + else + { + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return SocketOperationWrite; + } +#endif } + + _state = StateConnected; } - catch(const Ice::LocalException& ex) + else if(_state == StateProxyConnectRequest) { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection\n" - << fdToString(_fd, _proxy, _addr, false) << "\n" << ex; - } - throw; + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperationRead; } - - assert(_state == StateConnected); - if(_instance->traceLevel() >= 1) + else if(_state == StateProxyConnectRequestPending) { - Trace out(_instance->logger(), _instance->traceCategory()); - out << _instance->protocol() << " connection established\n" << _desc; + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; } + + assert(_state == StateConnected); return SocketOperationNone; } @@ -169,16 +151,6 @@ IceInternal::TcpTransceiver::closing(bool initiator, const Ice::LocalException&) void IceInternal::TcpTransceiver::close() { - // - // If the transceiver is not connected, its description is simply "not connected", - // which isn't very helpful. - // - if(_state == StateConnected && _instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "closing " << _instance->protocol() << " connection\n" << toString(); - } - assert(_fd != INVALID_SOCKET); try { @@ -257,13 +229,6 @@ IceInternal::TcpTransceiver::write(Buffer& buf) } } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << ret << " of " << packetSize << " bytes via " << _instance->protocol() << '\n' - << toString(); - } - buf.i += ret; if(packetSize > buf.b.end() - buf.i) @@ -331,13 +296,6 @@ IceInternal::TcpTransceiver::read(Buffer& buf, bool&) } } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << ret << " of " << packetSize << " bytes via " << _instance->protocol() << '\n' - << toString(); - } - buf.i += ret; packetSize = static_cast<int>(buf.b.end() - buf.i); @@ -414,19 +372,6 @@ IceInternal::TcpTransceiver::finishWrite(Buffer& buf) } } - if(_instance->traceLevel() >= 3) - { - int packetSize = static_cast<int>(buf.b.end() - buf.i); - if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { - packetSize = _maxSendPacketSize; - } - Trace out(_instance->logger(), _instance->traceCategory()); - - out << "sent " << _write.count << " of " << packetSize << " bytes via " << _instance->protocol() << '\n' - << toString(); - } - buf.i += _write.count; } @@ -489,18 +434,6 @@ IceInternal::TcpTransceiver::finishRead(Buffer& buf, bool&) throw ex; } - if(_instance->traceLevel() >= 3) - { - int packetSize = static_cast<int>(buf.b.end() - buf.i); - if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) - { - packetSize = _maxReceivePacketSize; - } - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << _read.count << " of " << packetSize << " bytes via " << _instance->protocol() << '\n' - << toString(); - } - buf.i += _read.count; } #endif @@ -517,6 +450,12 @@ IceInternal::TcpTransceiver::toString() const return _desc; } +string +IceInternal::TcpTransceiver::toDetailedString() const +{ + return toString(); +} + Ice::ConnectionInfoPtr IceInternal::TcpTransceiver::getInfo() const { @@ -622,11 +561,6 @@ IceInternal::TcpTransceiver::connect() { _state = StateConnected; _desc = fdToString(_fd, _proxy, _addr, true); - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << _instance->protocol() << " connection established\n" << _desc; - } } else { diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index 4e0f36b0056..0e1189a3d31 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -51,6 +51,7 @@ public: #endif virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; virtual void checkSendSize(const Buffer&, size_t); diff --git a/cpp/src/Ice/Transceiver.cpp b/cpp/src/Ice/Transceiver.cpp index e739e00961b..962aefeca1d 100644 --- a/cpp/src/Ice/Transceiver.cpp +++ b/cpp/src/Ice/Transceiver.cpp @@ -15,3 +15,9 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(Transceiver* p) { return p; } +EndpointIPtr +IceInternal::Transceiver::bind(const EndpointIPtr&) +{ + assert(false); + return 0; +} diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index 3e645a7793e..377f99b8f82 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -13,6 +13,7 @@ #include <IceUtil/Shared.h> #include <Ice/TransceiverF.h> #include <Ice/ConnectionF.h> +#include <Ice/EndpointIF.h> #include <Ice/Network.h> namespace IceInternal @@ -23,13 +24,13 @@ class Buffer; class ICE_API Transceiver : virtual public ::IceUtil::Shared { public: - + virtual NativeInfoPtr getNativeInfo() = 0; - + virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0; virtual SocketOperation closing(bool, const Ice::LocalException&) = 0; virtual void close() = 0; - + virtual EndpointIPtr bind(const EndpointIPtr&); virtual SocketOperation write(Buffer&) = 0; virtual SocketOperation read(Buffer&, bool&) = 0; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) @@ -41,6 +42,7 @@ public: virtual std::string protocol() const = 0; virtual std::string toString() const = 0; + virtual std::string toDetailedString() const = 0; virtual Ice::ConnectionInfoPtr getInfo() const = 0; virtual void checkSendSize(const Buffer&, size_t) = 0; }; diff --git a/cpp/src/Ice/UdpEndpointI.cpp b/cpp/src/Ice/UdpEndpointI.cpp index 22d87e20174..dacaef1adb9 100644 --- a/cpp/src/Ice/UdpEndpointI.cpp +++ b/cpp/src/Ice/UdpEndpointI.cpp @@ -144,20 +144,30 @@ IceInternal::UdpEndpointI::secure() const } TransceiverPtr -IceInternal::UdpEndpointI::transceiver(EndpointIPtr& endp) const +IceInternal::UdpEndpointI::transceiver() const { - UdpTransceiver* p = new UdpTransceiver(_instance, _host, _port, _mcastInterface, _connect); - endp = createEndpoint(_host, p->effectivePort(), _connectionId); - return p; + return new UdpTransceiver(_instance, _host, _port, _mcastInterface, _connect); } AcceptorPtr -IceInternal::UdpEndpointI::acceptor(EndpointIPtr& endp, const string&) const +IceInternal::UdpEndpointI::acceptor(const string&) const { - endp = const_cast<UdpEndpointI*>(this); return 0; } +EndpointIPtr +IceInternal::UdpEndpointI::endpoint(const TransceiverPtr& transceiver) const +{ + UdpTransceiver* p = dynamic_cast<UdpTransceiver*>(transceiver.get()); + return createEndpoint(_host, p->effectivePort(), _connectionId); +} + +EndpointIPtr +IceInternal::UdpEndpointI::endpoint(const AcceptorPtr& acceptor) const +{ + return const_cast<UdpEndpointI*>(this); +} + string IceInternal::UdpEndpointI::options() const { diff --git a/cpp/src/Ice/UdpEndpointI.h b/cpp/src/Ice/UdpEndpointI.h index ae43d412a32..d2531fce206 100644 --- a/cpp/src/Ice/UdpEndpointI.h +++ b/cpp/src/Ice/UdpEndpointI.h @@ -36,8 +36,10 @@ public: virtual bool datagram() const; virtual bool secure() const; - virtual TransceiverPtr transceiver(EndpointIPtr&) const; - virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; + virtual TransceiverPtr transceiver() const; + virtual AcceptorPtr acceptor(const std::string&) const; + virtual EndpointIPtr endpoint(const TransceiverPtr&) const; + virtual EndpointIPtr endpoint(const AcceptorPtr&) const; virtual std::string options() const; virtual bool operator==(const Ice::LocalObject&) const; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index 932c04282ee..418c2528285 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -8,6 +8,7 @@ // ********************************************************************** #include <Ice/UdpTransceiver.h> +#include <Ice/EndpointI.h> #include <Ice/Connection.h> #include <Ice/ProtocolInstance.h> #include <Ice/LoggerUtil.h> @@ -87,39 +88,19 @@ IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeB } else if(_state <= StateConnectPending) { - try - { #if defined(ICE_USE_IOCP) - doFinishConnectAsync(_fd, _write); + doFinishConnectAsync(_fd, _write); #elif defined(ICE_OS_WINRT) - if(_write.count == SOCKET_ERROR) - { - checkConnectErrorCode(__FILE__, __LINE__, _write.error, _addr.host); - } -#else - doFinishConnect(_fd); -#endif - _state = StateConnected; - } - catch(const Ice::LocalException& ex) + if(_write.count == SOCKET_ERROR) { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to connect " << _instance->protocol() << " socket\n" << toString() << "\n" << ex; - } - throw; + checkConnectErrorCode(__FILE__, __LINE__, _write.error, _addr.host); } +#else + doFinishConnect(_fd); +#endif + _state = StateConnected; } - if(_state == StateConnected) - { - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "starting to send " << _instance->protocol() << " packets\n" << toString(); - } - } assert(_state >= StateConnected); return SocketOperationNone; } @@ -134,12 +115,6 @@ IceInternal::UdpTransceiver::closing(bool, const Ice::LocalException&) void IceInternal::UdpTransceiver::close() { - if(_state >= StateConnected && _instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "closing " << _instance->protocol() << " connection\n" << toString(); - } - #ifdef ICE_OS_WINRT IceUtil::Mutex::Lock lock(_mutex); if(_readPending) @@ -156,6 +131,55 @@ IceInternal::UdpTransceiver::close() _fd = INVALID_SOCKET; } +EndpointIPtr +IceInternal::UdpTransceiver::bind(const EndpointIPtr& endp) +{ + if(isMulticast(_addr)) + { + setReuseAddress(_fd, true); + _mcastAddr = _addr; + +#ifdef _WIN32 + // + // Windows does not allow binding to the mcast address itself + // so we bind to INADDR_ANY (0.0.0.0) instead. As a result, + // bi-directional connection won't work because the source + // address won't be the multicast address and the client will + // therefore reject the datagram. + // + const_cast<Address&>(_addr) = getAddressForServer("", _port, getProtocolSupport(_addr), false); +#endif + + const_cast<Address&>(_addr) = doBind(_fd, _addr); + if(getPort(_mcastAddr) == 0) + { + setPort(_mcastAddr, getPort(_addr)); + } + setMcastGroup(_fd, _mcastAddr, _mcastInterface); + } + else + { +#ifndef _WIN32 + // + // Enable SO_REUSEADDR on Unix platforms to allow re-using + // the socket even if it's in the TIME_WAIT state. On + // Windows, this doesn't appear to be necessary and + // enabling SO_REUSEADDR would actually not be a good + // thing since it allows a second process to bind to an + // address even it's already bound by another process. + // + // TODO: using SO_EXCLUSIVEADDRUSE on Windows would + // probably be better but it's only supported by recent + // Windows versions (XP SP2, Windows Server 2003). + // + setReuseAddress(_fd, true); +#endif + const_cast<Address&>(_addr) = doBind(_fd, _addr); + } + + _bound = true; + return endp->endpoint(this); +} SocketOperation IceInternal::UdpTransceiver::write(Buffer& buf) @@ -229,12 +253,6 @@ repeat: throw ex; } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << ret << " bytes via " << _instance->protocol() << '\n' << toString(); - } - assert(ret == static_cast<ssize_t>(buf.b.size())); buf.i = buf.b.end(); return SocketOperationNone; @@ -341,12 +359,6 @@ repeat: } } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << ret << " bytes via " << _instance->protocol() << '\n' << toString(); - } - buf.b.resize(ret); buf.i = buf.b.end(); return SocketOperationNone; @@ -597,12 +609,6 @@ IceInternal::UdpTransceiver::finishWrite(Buffer& buf) #endif } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << _write.count << " bytes via " << _instance->protocol() << '\n' << toString(); - } - assert(_write.count == buf.b.size()); buf.i = buf.b.end(); } @@ -738,12 +744,6 @@ IceInternal::UdpTransceiver::finishRead(Buffer& buf, bool&) int ret = _read.count; #endif - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << ret << " bytes via " << _instance->protocol() << '\n' << toString(); - } - buf.b.resize(ret); buf.i = buf.b.end(); } @@ -764,7 +764,11 @@ IceInternal::UdpTransceiver::toString() const } ostringstream s; - if(_state == StateNotConnected) + if(_incoming && !_bound) + { + s << "local address = " << addrToString(_addr); + } + else if(_state == StateNotConnected) { Address localAddr; fdToLocalAddress(_fd, localAddr); @@ -793,6 +797,20 @@ IceInternal::UdpTransceiver::toString() const return s.str(); } +string +IceInternal::UdpTransceiver::toDetailedString() const +{ + ostringstream os; + os << toString(); + vector<string> intfs = getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); + if(!intfs.empty()) + { + os << "\nlocal interfaces = "; + os << IceUtilInternal::joinString(intfs, ", "); + } + return os.str(); +} + Ice::ConnectionInfoPtr IceInternal::UdpTransceiver::getInfo() const { @@ -866,7 +884,6 @@ IceInternal::UdpTransceiver::effectivePort() const return getPort(_addr); } - IceInternal::UdpTransceiver::UdpTransceiver(const ProtocolInstancePtr& instance, const Address& addr, #ifdef ICE_OS_WINRT @@ -881,6 +898,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const ProtocolInstancePtr& instance, ) : _instance(instance), _incoming(false), + _bound(false), _addr(addr), _state(StateNeedConnect) #if defined(ICE_USE_IOCP) @@ -950,7 +968,10 @@ IceInternal::UdpTransceiver::UdpTransceiver(const ProtocolInstancePtr& instance, const string& mcastInterface, bool connect) : _instance(instance), _incoming(true), + _bound(false), _addr(getAddressForServer(host, port, instance->protocolSupport(), instance->preferIPv6())), + _mcastInterface(mcastInterface), + _port(port), _state(connect ? StateNeedConnect : StateNotConnected) #ifdef ICE_OS_WINRT , _readPending(false) @@ -976,69 +997,6 @@ IceInternal::UdpTransceiver::UdpTransceiver(const ProtocolInstancePtr& instance, this->appendMessage(args); }); #endif - - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "attempting to bind to " << _instance->protocol() << " socket " << addrToString(_addr); - } - - if(isMulticast(_addr)) - { - setReuseAddress(_fd, true); - _mcastAddr = _addr; - -#ifdef _WIN32 - // - // Windows does not allow binding to the mcast address itself - // so we bind to INADDR_ANY (0.0.0.0) instead. As a result, - // bi-directional connection won't work because the source - // address won't be the multicast address and the client will - // therefore reject the datagram. - // - const_cast<Address&>(_addr) = getAddressForServer("", port, getProtocolSupport(_addr), false); -#endif - - const_cast<Address&>(_addr) = doBind(_fd, _addr); - if(getPort(_mcastAddr) == 0) - { - setPort(_mcastAddr, getPort(_addr)); - } - setMcastGroup(_fd, _mcastAddr, mcastInterface); - } - else - { -#ifndef _WIN32 - // - // Enable SO_REUSEADDR on Unix platforms to allow re-using - // the socket even if it's in the TIME_WAIT state. On - // Windows, this doesn't appear to be necessary and - // enabling SO_REUSEADDR would actually not be a good - // thing since it allows a second process to bind to an - // address even it's already bound by another process. - // - // TODO: using SO_EXCLUSIVEADDRUSE on Windows would - // probably be better but it's only supported by recent - // Windows versions (XP SP2, Windows Server 2003). - // - setReuseAddress(_fd, true); -#endif - const_cast<Address&>(_addr) = doBind(_fd, _addr); - } - - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "starting to receive " << _instance->protocol() << " packets\n" << toString(); - - vector<string> interfaces = - getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); - if(!interfaces.empty()) - { - out << "\nlocal interfaces: "; - out << IceUtilInternal::joinString(interfaces, ", "); - } - } } IceInternal::UdpTransceiver::~UdpTransceiver() diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index 618d788480c..e9f4b6b1b55 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -47,6 +47,7 @@ public: virtual SocketOperation initialize(Buffer&, Buffer&, bool&); virtual SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); + virtual EndpointIPtr bind(const EndpointIPtr&); virtual SocketOperation write(Buffer&); virtual SocketOperation read(Buffer&, bool&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) @@ -57,6 +58,7 @@ public: #endif virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; virtual void checkSendSize(const Buffer&, size_t); @@ -82,10 +84,13 @@ private: const ProtocolInstancePtr _instance; const bool _incoming; + bool _bound; const Address _addr; Address _mcastAddr; + const std::string _mcastInterface; Address _peerAddr; + int _port; State _state; int _rcvSize; diff --git a/cpp/src/Ice/WSAcceptor.cpp b/cpp/src/Ice/WSAcceptor.cpp index a6d5f793fc5..8aa995a028d 100644 --- a/cpp/src/Ice/WSAcceptor.cpp +++ b/cpp/src/Ice/WSAcceptor.cpp @@ -9,6 +9,7 @@ #include <Ice/WSAcceptor.h> #include <Ice/WSTransceiver.h> +#include <Ice/WSEndpoint.h> using namespace std; using namespace Ice; @@ -27,7 +28,7 @@ IceInternal::WSAcceptor::getAsyncInfo(IceInternal::SocketOperation status) return _delegate->getNativeInfo()->getAsyncInfo(status); } #elif defined(ICE_OS_WINRT) -void +void IceInternal::WSAcceptor::setCompletedHandler(IceInternal::SocketOperationCompletedHandler^ handler) { _delegate->getNativeInfo()->setCompletedHandler(handler); @@ -40,10 +41,12 @@ IceInternal::WSAcceptor::close() _delegate->close(); } -void -IceInternal::WSAcceptor::listen() +EndpointIPtr +IceInternal::WSAcceptor::listen(const EndpointIPtr& endp) { - _delegate->listen(); + WSEndpoint* p = dynamic_cast<WSEndpoint*>(endp.get()); + EndpointIPtr endpoint = _delegate->listen(p->delegate()); + return endp->endpoint(this); } #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) @@ -82,6 +85,12 @@ IceInternal::WSAcceptor::toString() const return _delegate->toString(); } +string +IceInternal::WSAcceptor::toDetailedString() const +{ + return _delegate->toDetailedString(); +} + IceInternal::WSAcceptor::WSAcceptor(const ProtocolInstancePtr& instance, const IceInternal::AcceptorPtr& del) : _instance(instance), _delegate(del) { diff --git a/cpp/src/Ice/WSAcceptor.h b/cpp/src/Ice/WSAcceptor.h index 5b4347a25e9..a58cb496046 100644 --- a/cpp/src/Ice/WSAcceptor.h +++ b/cpp/src/Ice/WSAcceptor.h @@ -33,7 +33,7 @@ public: #endif virtual void close(); - virtual void listen(); + virtual EndpointIPtr listen(const EndpointIPtr&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual void startAccept(); virtual void finishAccept(); @@ -41,6 +41,9 @@ public: virtual TransceiverPtr accept(); virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; + + virtual AcceptorPtr delegate() const { return _delegate; } private: diff --git a/cpp/src/Ice/WSEndpoint.cpp b/cpp/src/Ice/WSEndpoint.cpp index 46a21372ce2..b128fe13cc2 100644 --- a/cpp/src/Ice/WSEndpoint.cpp +++ b/cpp/src/Ice/WSEndpoint.cpp @@ -35,7 +35,7 @@ IceInternal::WSEndpoint::WSEndpoint(const ProtocolInstancePtr& instance, const E } } -IceInternal::WSEndpoint::WSEndpoint(const ProtocolInstancePtr& instance, const EndpointIPtr& del, +IceInternal::WSEndpoint::WSEndpoint(const ProtocolInstancePtr& instance, const EndpointIPtr& del, BasicStream* s) : _instance(instance), _delegate(IPEndpointIPtr::dynamicCast(del)) { @@ -70,9 +70,9 @@ IceInternal::WSEndpoint::getInfo() const { return _endpoint->secure(); } - + private: - + const EndpointIPtr _endpoint; }; @@ -111,7 +111,7 @@ IceInternal::WSEndpoint::timeout() const return _delegate->timeout(); } -IceInternal::EndpointIPtr +EndpointIPtr IceInternal::WSEndpoint::timeout(Int timeout) const { if(timeout == _delegate->timeout()) @@ -130,7 +130,7 @@ IceInternal::WSEndpoint::connectionId() const return _delegate->connectionId(); } -IceInternal::EndpointIPtr +EndpointIPtr IceInternal::WSEndpoint::connectionId(const string& connectionId) const { if(connectionId == _delegate->connectionId()) @@ -149,7 +149,7 @@ IceInternal::WSEndpoint::compress() const return _delegate->compress(); } -IceInternal::EndpointIPtr +EndpointIPtr IceInternal::WSEndpoint::compress(bool compress) const { if(compress == _delegate->compress()) @@ -174,10 +174,9 @@ IceInternal::WSEndpoint::secure() const return _delegate->secure(); } -IceInternal::TransceiverPtr -IceInternal::WSEndpoint::transceiver(EndpointIPtr& endp) const +TransceiverPtr +IceInternal::WSEndpoint::transceiver() const { - endp = const_cast<WSEndpoint*>(this); return 0; } @@ -200,8 +199,8 @@ IceInternal::WSEndpoint::connectors_async(Ice::EndpointSelectionType selType, { public: - CallbackI(const EndpointI_connectorsPtr& callback, const ProtocolInstancePtr& instance, - const string& host, int port, const string& resource) : + CallbackI(const EndpointI_connectorsPtr& callback, const ProtocolInstancePtr& instance, + const string& host, int port, const string& resource) : _callback(callback), _instance(instance), _host(host), _port(port), _resource(resource) { } @@ -222,29 +221,38 @@ IceInternal::WSEndpoint::connectors_async(Ice::EndpointSelectionType selType, } private: - + const EndpointI_connectorsPtr _callback; const ProtocolInstancePtr _instance; const string _host; const int _port; const string _resource; }; - _delegate->connectors_async(selType, new CallbackI(callback, _instance, _delegate->host(), _delegate->port(), + _delegate->connectors_async(selType, new CallbackI(callback, _instance, _delegate->host(), _delegate->port(), _resource)); } -IceInternal::AcceptorPtr -IceInternal::WSEndpoint::acceptor(EndpointIPtr& endp, const string& adapterName) const +AcceptorPtr +IceInternal::WSEndpoint::acceptor(const string& adapterName) const { - EndpointIPtr delEndp; - AcceptorPtr delAcc = _delegate->acceptor(delEndp, adapterName); - if(delEndp) - { - endp = new WSEndpoint(_instance, delEndp, _resource); - } + AcceptorPtr delAcc = _delegate->acceptor(adapterName); return new WSAcceptor(_instance, delAcc); } +EndpointIPtr +IceInternal::WSEndpoint::endpoint(const TransceiverPtr& transceiver) const +{ + return const_cast<WSEndpoint*>(this); +} + +EndpointIPtr +IceInternal::WSEndpoint::endpoint(const AcceptorPtr& acceptor) const +{ + WSAcceptor* p = dynamic_cast<WSAcceptor*>(acceptor.get()); + EndpointIPtr delEndp = _delegate->endpoint(p->delegate()); + return new WSEndpoint(_instance, delEndp, _resource); +} + vector<EndpointIPtr> IceInternal::WSEndpoint::expand() const { @@ -306,6 +314,12 @@ IceInternal::WSEndpoint::options() const return s.str(); } +EndpointIPtr +IceInternal::WSEndpoint::delegate() const +{ + return EndpointIPtr::dynamicCast(_delegate); +} + bool IceInternal::WSEndpoint::operator==(const Ice::LocalObject& r) const { @@ -372,7 +386,7 @@ IceInternal::WSEndpoint::operator<(const Ice::LocalObject& r) const return false; -} +} bool IceInternal::WSEndpoint::checkOption(const string& option, const string& argument, const string& endpoint) @@ -420,13 +434,13 @@ IceInternal::WSEndpointFactory::protocol() const return _instance->protocol(); } -IceInternal::EndpointIPtr +EndpointIPtr IceInternal::WSEndpointFactory::create(vector<string>& args, bool oaEndpoint) const { return new WSEndpoint(_instance, _delegate->create(args, oaEndpoint), args); } -IceInternal::EndpointIPtr +EndpointIPtr IceInternal::WSEndpointFactory::read(BasicStream* s) const { return new WSEndpoint(_instance, _delegate->read(s), s); @@ -439,7 +453,7 @@ IceInternal::WSEndpointFactory::destroy() _instance = 0; } -IceInternal::EndpointFactoryPtr +EndpointFactoryPtr IceInternal::WSEndpointFactory::clone(const ProtocolInstancePtr&) const { assert(false); // We don't support cloning this transport. diff --git a/cpp/src/Ice/WSEndpoint.h b/cpp/src/Ice/WSEndpoint.h index fec60753963..c344c614575 100644 --- a/cpp/src/Ice/WSEndpoint.h +++ b/cpp/src/Ice/WSEndpoint.h @@ -42,15 +42,19 @@ public: virtual bool datagram() const; virtual bool secure() const; - virtual TransceiverPtr transceiver(EndpointIPtr&) const; + virtual TransceiverPtr transceiver() const; virtual std::vector<ConnectorPtr> connectors(Ice::EndpointSelectionType) const; virtual void connectors_async(Ice::EndpointSelectionType, const EndpointI_connectorsPtr&) const; - virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; + virtual AcceptorPtr acceptor(const std::string&) const; + virtual EndpointIPtr endpoint(const TransceiverPtr&) const; + virtual EndpointIPtr endpoint(const AcceptorPtr&) const; virtual std::vector<EndpointIPtr> expand() const; virtual bool equivalent(const EndpointIPtr&) const; virtual ::Ice::Int hash() const; virtual std::string options() const; + EndpointIPtr delegate() const; + virtual bool operator==(const Ice::LocalObject&) const; virtual bool operator<(const Ice::LocalObject&) const; diff --git a/cpp/src/Ice/WSTransceiver.cpp b/cpp/src/Ice/WSTransceiver.cpp index cb05c34d75b..441ef688abb 100644 --- a/cpp/src/Ice/WSTransceiver.cpp +++ b/cpp/src/Ice/WSTransceiver.cpp @@ -780,6 +780,12 @@ IceInternal::WSTransceiver::toString() const return _delegate->toString(); } +string +IceInternal::WSTransceiver::toDetailedString() const +{ + return _delegate->toDetailedString(); +} + Ice::ConnectionInfoPtr IceInternal::WSTransceiver::getInfo() const { diff --git a/cpp/src/Ice/WSTransceiver.h b/cpp/src/Ice/WSTransceiver.h index ea4f714d87c..655dbdb7023 100644 --- a/cpp/src/Ice/WSTransceiver.h +++ b/cpp/src/Ice/WSTransceiver.h @@ -48,6 +48,7 @@ public: #endif virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; virtual void checkSendSize(const Buffer&, size_t); diff --git a/cpp/src/Ice/winrt/StreamAcceptor.cpp b/cpp/src/Ice/winrt/StreamAcceptor.cpp index f7bf238c159..a30a7f4fae6 100644 --- a/cpp/src/Ice/winrt/StreamAcceptor.cpp +++ b/cpp/src/Ice/winrt/StreamAcceptor.cpp @@ -10,6 +10,7 @@ #include <Ice/winrt/StreamAcceptor.h> #include <Ice/winrt/StreamTransceiver.h> +#include <Ice/EndpointI.h> #include <Ice/ProtocolInstance.h> #include <Ice/LocalException.h> #include <Ice/LoggerUtil.h> @@ -42,13 +43,6 @@ IceInternal::StreamAcceptor::setCompletedHandler(SocketOperationCompletedHandler void IceInternal::StreamAcceptor::close() { - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "stopping to accept " << _instance->protocol() << " connections at " << toString(); - } - - IceUtil::Mutex::Lock lock(_mutex); if(_acceptPending) { @@ -70,22 +64,11 @@ IceInternal::StreamAcceptor::close() closeSocket(fd); } -void -IceInternal::StreamAcceptor::listen() +EndpointIPtr +IceInternal::StreamAcceptor::listen(const EndpointIPtr& endp) { - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "accepting " << _instance->protocol() << " connections at " << toString(); - - vector<string> interfaces = - getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); - if(!interfaces.empty()) - { - out << "\nlocal interfaces: "; - out << IceUtilInternal::joinString(interfaces, ", "); - } - } + const_cast<Address&>(_addr) = doBind(_fd, _addr); + return endp->endpoint(this); } void @@ -140,11 +123,6 @@ IceInternal::StreamAcceptor::accept() _accepted.pop_front(); } - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "accepted " << _instance->protocol() << " connection\n" << fdToString(fd); - } return new StreamTransceiver(_instance, fd, true); } @@ -160,6 +138,20 @@ IceInternal::StreamAcceptor::toString() const return addrToString(_addr); } +string +IceInternal::StreamAcceptor::toDetailedString() const +{ + ostringstream os; + os << "local address = " << toString(); + vector<string> intfs = getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); + if(!intfs.empty()) + { + os << "\nlocal interfaces = "; + os << IceUtilInternal::joinString(intfs, ", "); + } + return os.str(); +} + int IceInternal::StreamAcceptor::effectivePort() const { @@ -179,13 +171,6 @@ IceInternal::StreamAcceptor::StreamAcceptor(const ProtocolInstancePtr& instance, { queueAcceptedSocket(args->Socket); }); - - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "attempting to bind to " << _instance->protocol() << " socket " << toString(); - } - const_cast<Address&>(_addr) = doBind(_fd, _addr); } IceInternal::StreamAcceptor::~StreamAcceptor() diff --git a/cpp/src/Ice/winrt/StreamAcceptor.h b/cpp/src/Ice/winrt/StreamAcceptor.h index bdb233eda1e..f798ceee7f5 100644 --- a/cpp/src/Ice/winrt/StreamAcceptor.h +++ b/cpp/src/Ice/winrt/StreamAcceptor.h @@ -32,7 +32,7 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); virtual void close(); - virtual void listen(); + virtual EndpointIPtr listen(const EndpointIPtr&); virtual void startAccept(); virtual void finishAccept(); @@ -40,6 +40,7 @@ public: virtual TransceiverPtr accept(); virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; int effectivePort() const; diff --git a/cpp/src/Ice/winrt/StreamConnector.cpp b/cpp/src/Ice/winrt/StreamConnector.cpp index 546574397b1..2f9be7b274e 100644 --- a/cpp/src/Ice/winrt/StreamConnector.cpp +++ b/cpp/src/Ice/winrt/StreamConnector.cpp @@ -23,27 +23,9 @@ using namespace IceInternal; TransceiverPtr IceInternal::StreamConnector::connect() { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "trying to establish " << _instance->protocol() << " connection to " << toString(); - } - - try - { - TransceiverPtr transceiver = new StreamTransceiver(_instance, createSocket(false, _addr), false); - dynamic_cast<StreamTransceiver*>(transceiver.get())->connect(_addr); - return transceiver; - } - catch(const Ice::LocalException& ex) - { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection to " << toString() << "\n" << ex; - } - throw; - } + TransceiverPtr transceiver = new StreamTransceiver(_instance, createSocket(false, _addr), false); + dynamic_cast<StreamTransceiver*>(transceiver.get())->connect(_addr); + return transceiver; } Short @@ -76,12 +58,12 @@ IceInternal::StreamConnector::operator==(const Connector& r) const { return false; } - + if(_timeout != p->_timeout) { return false; } - + if(_connectionId != p->_connectionId) { return false; @@ -122,7 +104,7 @@ IceInternal::StreamConnector::operator<(const Connector& r) const { return false; } - + if(_connectionId < p->_connectionId) { return true; @@ -134,7 +116,7 @@ IceInternal::StreamConnector::operator<(const Connector& r) const return compareAddress(_addr, p->_addr) < 0; } -IceInternal::StreamConnector::StreamConnector(const ProtocolInstancePtr& instance, const Address& addr, +IceInternal::StreamConnector::StreamConnector(const ProtocolInstancePtr& instance, const Address& addr, Ice::Int timeout, const string& connectionId) : _instance(instance), _addr(addr), diff --git a/cpp/src/Ice/winrt/StreamEndpointI.cpp b/cpp/src/Ice/winrt/StreamEndpointI.cpp index 961f8859604..927e4a3c063 100644 --- a/cpp/src/Ice/winrt/StreamEndpointI.cpp +++ b/cpp/src/Ice/winrt/StreamEndpointI.cpp @@ -165,18 +165,28 @@ IceInternal::StreamEndpointI::secure() const } TransceiverPtr -IceInternal::StreamEndpointI::transceiver(EndpointIPtr& endp) const +IceInternal::StreamEndpointI::transceiver() const { - endp = const_cast<StreamEndpointI*>(this); return 0; } AcceptorPtr -IceInternal::StreamEndpointI::acceptor(EndpointIPtr& endp, const string&) const +IceInternal::StreamEndpointI::acceptor(const string&) const { - StreamAcceptor* p = new StreamAcceptor(_instance, _host, _port); - endp = createEndpoint(_host, p->effectivePort(), _connectionId); - return p; + return new StreamAcceptor(_instance, _host, _port); +} + +EndpointIPtr +IceInternal::StreamEndpointI::endpoint(const TransceiverPtr&) const +{ + return const_cast<StreamEndpointI*>(this); +} + +EndpointIPtr +IceInternal::StreamEndpointI::endpoint(const AcceptorPtr& acceptor) const +{ + StreamAcceptor* p = dynamic_cast<StreamAcceptor*>(acceptor.get()); + return createEndpoint(_host, p->effectivePort(), _connectionId); } string diff --git a/cpp/src/Ice/winrt/StreamEndpointI.h b/cpp/src/Ice/winrt/StreamEndpointI.h index 58de59fd410..6fc35c8f9e9 100644 --- a/cpp/src/Ice/winrt/StreamEndpointI.h +++ b/cpp/src/Ice/winrt/StreamEndpointI.h @@ -36,8 +36,10 @@ public: virtual bool datagram() const; virtual bool secure() const; - virtual TransceiverPtr transceiver(EndpointIPtr&) const; - virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; + virtual TransceiverPtr transceiver() const; + virtual AcceptorPtr acceptor(const std::string&) const; + virtual EndpointIPtr endpoint(const TransceiverPtr&) const; + virtual EndpointIPtr endpoint(const AcceptorPtr&) const; virtual std::string options() const; virtual bool operator==(const Ice::LocalObject&) const; diff --git a/cpp/src/Ice/winrt/StreamTransceiver.cpp b/cpp/src/Ice/winrt/StreamTransceiver.cpp index b76421465ef..50edac7ed02 100644 --- a/cpp/src/Ice/winrt/StreamTransceiver.cpp +++ b/cpp/src/Ice/winrt/StreamTransceiver.cpp @@ -10,7 +10,6 @@ #include <Ice/winrt/StreamTransceiver.h> #include <Ice/Connection.h> #include <Ice/ProtocolInstance.h> -#include <Ice/TraceLevels.h> #include <Ice/LoggerUtil.h> #include <Ice/Buffer.h> #include <Ice/LocalException.h> @@ -61,7 +60,7 @@ IceInternal::StreamTransceiver::getNativeInfo() return this; } -void +void IceInternal::StreamTransceiver::setCompletedHandler(SocketOperationCompletedHandler^ handler) { _completedHandler = handler; @@ -79,32 +78,12 @@ IceInternal::StreamTransceiver::initialize(Buffer&, Buffer&,bool&) } else if(_state <= StateConnectPending) { - try - { - if(_write.count == SOCKET_ERROR) - { - checkConnectErrorCode(__FILE__, __LINE__, _write.error, _connectAddr.host); - } - _state = StateConnected; - _desc = fdToString(_fd); - } - catch(const Ice::LocalException& ex) - { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection\n"; - out << "local address: <not available>\n"; - out << "remote address: " << addrToString(_connectAddr) << "\n" << ex; - } - throw; - } - - if(_instance->traceLevel() >= 1) + if(_write.count == SOCKET_ERROR) { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "" << _instance->protocol() << " connection established\n" << _desc; + checkConnectErrorCode(__FILE__, __LINE__, _write.error, _connectAddr.host); } + _state = StateConnected; + _desc = fdToString(_fd); } assert(_state == StateConnected); return SocketOperationNone; @@ -121,12 +100,6 @@ IceInternal::StreamTransceiver::closing(bool initiator, const Ice::LocalExceptio void IceInternal::StreamTransceiver::close() { - if(_state == StateConnected && _instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "closing " << _instance->protocol() << " connection\n" << toString(); - } - assert(_fd != INVALID_SOCKET); try { @@ -162,14 +135,14 @@ IceInternal::StreamTransceiver::startWrite(Buffer& buf) IAsyncAction^ action = safe_cast<StreamSocket^>(_fd)->ConnectAsync( _connectAddr.host, _connectAddr.port, - _instance->type() == IceSSL::EndpointType ? + _instance->type() == IceSSL::EndpointType ? // // SocketProtectionLevel::Tls12 is new in Windows 8.1 SDK // #if defined(_MSC_VER) && _MSC_VER >= 1800 SocketProtectionLevel::Tls12 : #else - SocketProtectionLevel::Ssl : + SocketProtectionLevel::Ssl : #endif SocketProtectionLevel::PlainSocket); @@ -203,7 +176,7 @@ IceInternal::StreamTransceiver::startWrite(Buffer& buf) int packetSize = static_cast<int>(buf.b.end() - buf.i); if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } assert(packetSize > 0); @@ -240,18 +213,6 @@ IceInternal::StreamTransceiver::finishWrite(Buffer& buf) checkErrorCode(__FILE__, __LINE__, _write.error); } - if(_instance->traceLevel() >= 3) - { - int packetSize = static_cast<int>(buf.b.end() - buf.i); - if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { - packetSize = _maxSendPacketSize; - } - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << _write.count << " of " << packetSize << " bytes via " << _instance->protocol() << "\n" - << toString(); - } - buf.i += _write.count; } @@ -308,18 +269,6 @@ IceInternal::StreamTransceiver::finishRead(Buffer& buf, bool& hasMoreData) checkErrorCode(__FILE__, __LINE__, ex->HResult); } - if(_instance->traceLevel() >= 3) - { - int packetSize = static_cast<int>(buf.b.end() - buf.i); - if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) - { - packetSize = _maxReceivePacketSize; - } - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << _read.count << " of " << packetSize << " bytes via " << _instance->protocol() << "\n" - << toString(); - } - buf.i += _read.count; } @@ -335,7 +284,13 @@ IceInternal::StreamTransceiver::toString() const return _desc; } -Ice::ConnectionInfoPtr +string +IceInternal::StreamTransceiver::toDetailedString() const +{ + return toString(); +} + +Ice::ConnectionInfoPtr IceInternal::StreamTransceiver::getInfo() const { Ice::IPConnectionInfoPtr info; @@ -422,7 +377,7 @@ IceInternal::StreamTransceiver::checkIfErrorOrCompleted(SocketOperation op, IAsy { checkConnectErrorCode(__FILE__, __LINE__, info->ErrorCode.Value, _connectAddr.host); } - else + else { checkErrorCode(__FILE__, __LINE__, info->ErrorCode.Value); } diff --git a/cpp/src/Ice/winrt/StreamTransceiver.h b/cpp/src/Ice/winrt/StreamTransceiver.h index c37c6eb8f54..e7c63589e5f 100644 --- a/cpp/src/Ice/winrt/StreamTransceiver.h +++ b/cpp/src/Ice/winrt/StreamTransceiver.h @@ -33,7 +33,7 @@ public: virtual NativeInfoPtr getNativeInfo(); virtual void setCompletedHandler(SocketOperationCompletedHandler^); - + virtual SocketOperation initialize(Buffer&, Buffer&, bool&); virtual SocketOperation closing(bool, const Ice::LocalException&); virtual void close(); @@ -47,6 +47,7 @@ public: virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; virtual void checkSendSize(const Buffer&, size_t); @@ -61,9 +62,9 @@ private: friend class StreamConnector; friend class StreamAcceptor; - + const ProtocolInstancePtr _instance; - + State _state; std::string _desc; Address _connectAddr; diff --git a/cpp/src/IceSSL/AcceptorI.cpp b/cpp/src/IceSSL/AcceptorI.cpp index 8a7fb3ca187..f55f1b2b2f4 100644 --- a/cpp/src/IceSSL/AcceptorI.cpp +++ b/cpp/src/IceSSL/AcceptorI.cpp @@ -16,6 +16,7 @@ #include <IceSSL/Util.h> +#include <Ice/EndpointI.h> #include <Ice/Communicator.h> #include <Ice/Exception.h> #include <Ice/LocalException.h> @@ -54,20 +55,16 @@ IceSSL::AcceptorI::getAsyncInfo(IceInternal::SocketOperation) void IceSSL::AcceptorI::close() { - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "stopping to accept " << _instance->protocol() << " connections at " << toString(); - } - SOCKET fd = _fd; _fd = INVALID_SOCKET; IceInternal::closeSocket(fd); } -void -IceSSL::AcceptorI::listen() +IceInternal::EndpointIPtr +IceSSL::AcceptorI::listen(const IceInternal::EndpointIPtr& endp) { + const_cast<IceInternal::Address&>(_addr) = IceInternal::doBind(_fd, _addr); + try { IceInternal::doListen(_fd, _backlog); @@ -78,20 +75,7 @@ IceSSL::AcceptorI::listen() throw; } - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "listening for " << _instance->protocol() << " connections at " << toString(); - - vector<string> interfaces = - IceInternal::getHostsForEndpointExpand(IceInternal::inetAddrToString(_addr), _instance->protocolSupport(), - true); - if(!interfaces.empty()) - { - out << "\nlocal interfaces: "; - out << IceUtilInternal::joinString(interfaces, ", "); - } - } + return endp->endpoint(this); } #ifdef ICE_USE_IOCP @@ -101,20 +85,20 @@ IceSSL::AcceptorI::startAccept() LPFN_ACCEPTEX AcceptEx = NULL; // a pointer to the 'AcceptEx()' function GUID GuidAcceptEx = WSAID_ACCEPTEX; // The Guid DWORD dwBytes; - if(WSAIoctl(_fd, + if(WSAIoctl(_fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &AcceptEx, sizeof(AcceptEx), &dwBytes, - NULL, + NULL, NULL) == SOCKET_ERROR) { SocketException ex(__FILE__, __LINE__); ex.error = IceInternal::getSocketErrno(); throw ex; - } + } assert(_acceptFd == INVALID_SOCKET); _acceptFd = IceInternal::createSocket(false, _addr); @@ -130,7 +114,7 @@ IceSSL::AcceptorI::startAccept() } } -void +void IceSSL::AcceptorI::finishAccept() { if(static_cast<int>(_info.count) == SOCKET_ERROR || _fd == INVALID_SOCKET) @@ -162,10 +146,10 @@ IceSSL::AcceptorI::accept() { SocketException ex(__FILE__, __LINE__); ex.error = _acceptError; - throw ex; + throw ex; } - if(setsockopt(_acceptFd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&_acceptFd, sizeof(_acceptFd)) == + if(setsockopt(_acceptFd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&_acceptFd, sizeof(_acceptFd)) == SOCKET_ERROR) { IceInternal::closeSocketNoThrow(_acceptFd); @@ -179,12 +163,6 @@ IceSSL::AcceptorI::accept() _acceptFd = INVALID_SOCKET; #endif - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "attempting to accept " << _instance->protocol() << " connection\n" << IceInternal::fdToString(fd); - } - // // SSL handshaking is performed in TransceiverI::initialize, since // accept must not block. @@ -204,6 +182,20 @@ IceSSL::AcceptorI::toString() const return IceInternal::addrToString(_addr); } +string +IceSSL::AcceptorI::toDetailedString() const +{ + ostringstream os; + os << "local address = " << toString(); + vector<string> intfs = getHostsForEndpointExpand(inetAddrToString(_addr), _instance->protocolSupport(), true); + if(!intfs.empty()) + { + os << "\nlocal interfaces = "; + os << IceUtilInternal::joinString(intfs, ", "); + } + return os.str(); +} + int IceSSL::AcceptorI::effectivePort() const { @@ -254,12 +246,6 @@ IceSSL::AcceptorI::AcceptorI(const InstancePtr& instance, const string& adapterN // IceInternal::setReuseAddress(_fd, true); #endif - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "attempting to bind to " << _instance->protocol() << " socket " << toString(); - } - const_cast<IceInternal::Address&>(_addr) = IceInternal::doBind(_fd, _addr); } IceSSL::AcceptorI::~AcceptorI() diff --git a/cpp/src/IceSSL/AcceptorI.h b/cpp/src/IceSSL/AcceptorI.h index 6496ee27bb3..e86e73897d6 100644 --- a/cpp/src/IceSSL/AcceptorI.h +++ b/cpp/src/IceSSL/AcceptorI.h @@ -32,7 +32,7 @@ public: #endif virtual void close(); - virtual void listen(); + virtual IceInternal::EndpointIPtr listen(const IceInternal::EndpointIPtr&); #ifdef ICE_USE_IOCP virtual void startAccept(); virtual void finishAccept(); @@ -40,6 +40,7 @@ public: virtual IceInternal::TransceiverPtr accept(); virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; int effectivePort() const; diff --git a/cpp/src/IceSSL/ConnectorI.cpp b/cpp/src/IceSSL/ConnectorI.cpp index b8cb1144b5b..e7914663088 100644 --- a/cpp/src/IceSSL/ConnectorI.cpp +++ b/cpp/src/IceSSL/ConnectorI.cpp @@ -36,26 +36,7 @@ IceSSL::ConnectorI::connect() ex.reason = "IceSSL: plug-in is not initialized"; throw ex; } - - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "trying to establish " << _instance->protocol() << " connection to " << toString(); - } - - try - { - return new TransceiverI(_instance, IceInternal::createSocket(false, _addr), _proxy, _host, _addr, _sourceAddr); - } - catch(const Ice::LocalException& ex) - { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection to " << toString() << "\n" << ex; - } - throw; - } + return new TransceiverI(_instance, IceInternal::createSocket(false, _addr), _proxy, _host, _addr, _sourceAddr); } Short diff --git a/cpp/src/IceSSL/EndpointI.cpp b/cpp/src/IceSSL/EndpointI.cpp index aab9bb2efc6..cf395733097 100644 --- a/cpp/src/IceSSL/EndpointI.cpp +++ b/cpp/src/IceSSL/EndpointI.cpp @@ -138,18 +138,28 @@ IceSSL::EndpointI::secure() const } IceInternal::TransceiverPtr -IceSSL::EndpointI::transceiver(IceInternal::EndpointIPtr& endp) const +IceSSL::EndpointI::transceiver() const { - endp = const_cast<EndpointI*>(this); return 0; } IceInternal::AcceptorPtr -IceSSL::EndpointI::acceptor(IceInternal::EndpointIPtr& endp, const string& adapterName) const +IceSSL::EndpointI::acceptor(const string& adapterName) const { - AcceptorI* p = new AcceptorI(_instance, adapterName, _host, _port); - endp = new EndpointI(_instance, _host, p->effectivePort(), _sourceAddr, _timeout, _connectionId, _compress); - return p; + return new AcceptorI(_instance, adapterName, _host, _port); +} + +IceInternal::EndpointIPtr +IceSSL::EndpointI::endpoint(const IceInternal::TransceiverPtr& transceiver) const +{ + return const_cast<EndpointI*>(this); +} + +IceInternal::EndpointIPtr +IceSSL::EndpointI::endpoint(const IceInternal::AcceptorPtr& acceptor) const +{ + AcceptorI* p = dynamic_cast<AcceptorI*>(acceptor.get()); + return new EndpointI(_instance, _host, p->effectivePort(), _sourceAddr, _timeout, _connectionId, _compress); } string diff --git a/cpp/src/IceSSL/EndpointI.h b/cpp/src/IceSSL/EndpointI.h index a760e8d3ae8..1d69bba838a 100644 --- a/cpp/src/IceSSL/EndpointI.h +++ b/cpp/src/IceSSL/EndpointI.h @@ -37,8 +37,10 @@ public: virtual bool datagram() const; virtual bool secure() const; - virtual IceInternal::TransceiverPtr transceiver(IceInternal::EndpointIPtr&) const; - virtual IceInternal::AcceptorPtr acceptor(IceInternal::EndpointIPtr&, const std::string&) const; + virtual IceInternal::TransceiverPtr transceiver() const; + virtual IceInternal::AcceptorPtr acceptor(const std::string&) const; + virtual IceInternal::EndpointIPtr endpoint(const IceInternal::TransceiverPtr&) const; + virtual IceInternal::EndpointIPtr endpoint(const IceInternal::AcceptorPtr&) const; virtual std::string options() const; virtual bool operator==(const Ice::LocalObject&) const; diff --git a/cpp/src/IceSSL/OpenSSLTransceiverI.cpp b/cpp/src/IceSSL/OpenSSLTransceiverI.cpp index a8341206ebf..8df471c87eb 100644 --- a/cpp/src/IceSSL/OpenSSLTransceiverI.cpp +++ b/cpp/src/IceSSL/OpenSSLTransceiverI.cpp @@ -74,275 +74,242 @@ IceSSL::TransceiverI::getNativeInfo() IceInternal::SocketOperation IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) { - try + if(_state == StateNeedConnect) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return IceInternal::SocketOperationConnect; - } - else if(_state <= StateConnectPending) - { - IceInternal::doFinishConnect(_fd); + _state = StateConnectPending; + return IceInternal::SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { + IceInternal::doFinishConnect(_fd); - _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + + if(_proxy) + { + // + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); - if(_proxy) + // + // Write the proxy connection message using TCP. + // + if(writeRaw(writeBuffer)) { // - // Prepare the read & write buffers in advance. + // Write completed without blocking. // - _proxy->beginWriteConnectRequest(_addr, writeBuffer); - _proxy->beginReadConnectRequestResponse(readBuffer); + _proxy->endWriteConnectRequest(writeBuffer); // - // Write the proxy connection message using TCP. + // Try to read the response using TCP. // - if(writeRaw(writeBuffer)) + if(readRaw(readBuffer)) { // - // Write completed without blocking. - // - _proxy->endWriteConnectRequest(writeBuffer); - - // - // Try to read the response using TCP. + // Read completed without blocking - fall through. // - if(readRaw(readBuffer)) - { - // - // Read completed without blocking - fall through. - // - _proxy->endReadConnectRequestResponse(readBuffer); - } - else - { - // - // Return SocketOperationRead to indicate we need to complete the read. - // - _state = StateProxyConnectRequestPending; // Wait for proxy response - return IceInternal::SocketOperationRead; - } + _proxy->endReadConnectRequestResponse(readBuffer); } else { // - // Return SocketOperationWrite to indicate we need to complete the write. + // Return SocketOperationRead to indicate we need to complete the read. // - _state = StateProxyConnectRequest; // Send proxy connect request - return IceInternal::SocketOperationWrite; + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; } } - - _state = StateConnected; - } - else if(_state == StateProxyConnectRequest) - { - // - // Write completed. - // - _proxy->endWriteConnectRequest(writeBuffer); - _state = StateProxyConnectRequestPending; // Wait for proxy response - return IceInternal::SocketOperationRead; + else + { + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; + } } - else if(_state == StateProxyConnectRequestPending) + + _state = StateConnected; + } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + + assert(_state == StateConnected); + + if(!_ssl) + { + // + // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type. + // + BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0); + if(!bio) { - // - // Read completed. - // - _proxy->endReadConnectRequestResponse(readBuffer); - _state = StateConnected; + SecurityException ex(__FILE__, __LINE__); + ex.reason = "openssl failure"; + throw ex; } - assert(_state == StateConnected); - + _ssl = SSL_new(_engine->context()); if(!_ssl) { - // - // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type. - // - BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0); - if(!bio) - { - SecurityException ex(__FILE__, __LINE__); - ex.reason = "openssl failure"; - throw ex; - } - - _ssl = SSL_new(_engine->context()); - if(!_ssl) - { - BIO_free(bio); - SecurityException ex(__FILE__, __LINE__); - ex.reason = "openssl failure"; - throw ex; - } - SSL_set_bio(_ssl, bio, bio); + BIO_free(bio); + SecurityException ex(__FILE__, __LINE__); + ex.reason = "openssl failure"; + throw ex; } + SSL_set_bio(_ssl, bio, bio); + } - while(!SSL_is_init_finished(_ssl)) - { - // - // Only one thread calls initialize(), so synchronization is not necessary here. - // + while(!SSL_is_init_finished(_ssl)) + { + // + // Only one thread calls initialize(), so synchronization is not necessary here. + // - // - // BUGFIX: an openssl bug that affects OpensSSL < 1.0.0k - // could cause a deadlock when decoding public keys. - // - // See: http://cvs.openssl.org/chngview?cn=22569 - // + // + // BUGFIX: an openssl bug that affects OpensSSL < 1.0.0k + // could cause a deadlock when decoding public keys. + // + // See: http://cvs.openssl.org/chngview?cn=22569 + // #if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < 0x100000bfL - IceUtilInternal::MutexPtrLock<IceUtil::Mutex> sync(sslMutex); + IceUtilInternal::MutexPtrLock<IceUtil::Mutex> sync(sslMutex); #endif - int ret = _incoming ? SSL_accept(_ssl) : SSL_connect(_ssl); + int ret = _incoming ? SSL_accept(_ssl) : SSL_connect(_ssl); #if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < 0x100000bfL - sync.release(); + sync.release(); #endif - if(ret <= 0) + if(ret <= 0) + { + switch(SSL_get_error(_ssl, ret)) { - switch(SSL_get_error(_ssl, ret)) - { - case SSL_ERROR_NONE: - assert(SSL_is_init_finished(_ssl)); - break; - case SSL_ERROR_ZERO_RETURN: + case SSL_ERROR_NONE: + assert(SSL_is_init_finished(_ssl)); + break; + case SSL_ERROR_ZERO_RETURN: + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + case SSL_ERROR_WANT_READ: + { + return IceInternal::SocketOperationRead; + } + case SSL_ERROR_WANT_WRITE: + { + return IceInternal::SocketOperationWrite; + } + case SSL_ERROR_SYSCALL: + { + if(ret == 0) { ConnectionLostException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); + ex.error = 0; throw ex; } - case SSL_ERROR_WANT_READ: - { - return IceInternal::SocketOperationRead; - } - case SSL_ERROR_WANT_WRITE: - { - return IceInternal::SocketOperationWrite; - } - case SSL_ERROR_SYSCALL: + + if(ret == -1) { - if(ret == 0) + if(IceInternal::interrupted()) { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; + break; } - if(ret == -1) + if(IceInternal::wouldBlock()) { - if(IceInternal::interrupted()) + if(SSL_want_read(_ssl)) { - break; + return IceInternal::SocketOperationRead; } - - if(IceInternal::wouldBlock()) + else if(SSL_want_write(_ssl)) { - if(SSL_want_read(_ssl)) - { - return IceInternal::SocketOperationRead; - } - else if(SSL_want_write(_ssl)) - { - return IceInternal::SocketOperationWrite; - } - - break; + return IceInternal::SocketOperationWrite; } - if(IceInternal::connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } + break; } - SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - case SSL_ERROR_SSL: - { - IceInternal::Address remoteAddr; - string desc = "<not available>"; - if(IceInternal::fdToRemoteAddress(_fd, remoteAddr)) + + if(IceInternal::connectionLost()) { - desc = IceInternal::addrToString(remoteAddr); + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; } - ostringstream ostr; - ostr << "SSL error occurred for new " << (_incoming ? "incoming" : "outgoing") - << " connection:\nremote address = " << desc << "\n" << _engine->sslErrors(); - ProtocolException ex(__FILE__, __LINE__); - ex.reason = ostr.str(); - throw ex; - } } + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; } - } - - long result = SSL_get_verify_result(_ssl); - if(result != X509_V_OK) - { - if(_engine->getVerifyPeer() == 0) + case SSL_ERROR_SSL: { - if(_engine->securityTraceLevel() >= 1) + IceInternal::Address remoteAddr; + string desc = "<not available>"; + if(IceInternal::fdToRemoteAddress(_fd, remoteAddr)) { - ostringstream ostr; - ostr << "IceSSL: ignoring certificate verification failure:\n" - << X509_verify_cert_error_string(result); - _instance->logger()->trace(_instance->traceCategory(), ostr.str()); + desc = IceInternal::addrToString(remoteAddr); } - } - else - { ostringstream ostr; - ostr << "IceSSL: certificate verification failed:\n" << X509_verify_cert_error_string(result); - string msg = ostr.str(); - if(_engine->securityTraceLevel() >= 1) - { - _instance->logger()->trace(_instance->traceCategory(), msg); - } - SecurityException ex(__FILE__, __LINE__); - ex.reason = msg; + ostr << "SSL error occurred for new " << (_incoming ? "incoming" : "outgoing") + << " connection:\nremote address = " << desc << "\n" << _engine->sslErrors(); + ProtocolException ex(__FILE__, __LINE__); + ex.reason = ostr.str(); throw ex; } - } - _engine->verifyPeer(_fd, _host, getNativeConnectionInfo()); - _state = StateHandshakeComplete; - } - catch(const Ice::LocalException& ex) - { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection\n"; - if(_incoming) - { - out << IceInternal::fdToString(_fd) << "\n" << ex; - } - else - { - out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex; } } - throw; } - if(_instance->traceLevel() >= 1) + long result = SSL_get_verify_result(_ssl); + if(result != X509_V_OK) { - Trace out(_instance->logger(), _instance->traceCategory()); - if(_incoming) + if(_engine->getVerifyPeer() == 0) { - out << "accepted " << _instance->protocol() << " connection\n" << _desc; + if(_engine->securityTraceLevel() >= 1) + { + ostringstream ostr; + ostr << "IceSSL: ignoring certificate verification failure:\n" + << X509_verify_cert_error_string(result); + _instance->logger()->trace(_instance->traceCategory(), ostr.str()); + } } else { - out << _instance->protocol() << " connection established\n" << _desc; + ostringstream ostr; + ostr << "IceSSL: certificate verification failed:\n" << X509_verify_cert_error_string(result); + string msg = ostr.str(); + if(_engine->securityTraceLevel() >= 1) + { + _instance->logger()->trace(_instance->traceCategory(), msg); + } + SecurityException ex(__FILE__, __LINE__); + ex.reason = msg; + throw ex; } } + _engine->verifyPeer(_fd, _host, getNativeConnectionInfo()); + _state = StateHandshakeComplete; if(_engine->securityTraceLevel() >= 1) { @@ -706,6 +673,12 @@ IceSSL::TransceiverI::toString() const return _desc; } +string +IceSSL::TransceiverI::toDetailedString() const +{ + return toString(); +} + Ice::ConnectionInfoPtr IceSSL::TransceiverI::getInfo() const { diff --git a/cpp/src/IceSSL/SChannelTransceiverI.cpp b/cpp/src/IceSSL/SChannelTransceiverI.cpp index 5905077dd2f..8b3867d571e 100644 --- a/cpp/src/IceSSL/SChannelTransceiverI.cpp +++ b/cpp/src/IceSSL/SChannelTransceiverI.cpp @@ -637,180 +637,147 @@ IceSSL::TransceiverI::encryptMessage(IceInternal::Buffer& buffer) IceInternal::SocketOperation IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData) { - try + if(_state == StateNeedConnect) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return IceInternal::SocketOperationConnect; - } - else if(_state <= StateConnectPending) - { - IceInternal::doFinishConnectAsync(_fd, _write); - - _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + _state = StateConnectPending; + return IceInternal::SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { + IceInternal::doFinishConnectAsync(_fd, _write); - if(_proxy) - { - // - // Prepare the read & write buffers in advance. - // - _proxy->beginWriteConnectRequest(_addr, writeBuffer); - _proxy->beginReadConnectRequestResponse(readBuffer); - - // - // Return SocketOperationWrite to indicate we need to start a write. - // - _state = StateProxyConnectRequest; // Send proxy connect request - return IceInternal::SocketOperationWrite; - } + _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); - _state = StateConnected; - } - else if(_state == StateProxyConnectRequest) + if(_proxy) { // - // Write completed. + // Prepare the read & write buffers in advance. // - _proxy->endWriteConnectRequest(writeBuffer); - _state = StateProxyConnectRequestPending; // Wait for proxy response - return IceInternal::SocketOperationRead; - } - else if(_state == StateProxyConnectRequestPending) - { + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); + // - // Read completed. + // Return SocketOperationWrite to indicate we need to start a write. // - _proxy->endReadConnectRequestResponse(readBuffer); - _state = StateConnected; + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; } - assert(_state >= StateConnected && _state <= StateHandshakeWriteContinue); + _state = StateConnected; + } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + + assert(_state >= StateConnected && _state <= StateHandshakeWriteContinue); - if(!_credentialsInitialized) - { - _readBuffer.b.resize(2048); - _readBuffer.i = _readBuffer.b.begin(); + if(!_credentialsInitialized) + { + _readBuffer.b.resize(2048); + _readBuffer.i = _readBuffer.b.begin(); - _credentials = _engine->newCredentialsHandle(_incoming); - _credentialsInitialized = true; - } + _credentials = _engine->newCredentialsHandle(_incoming); + _credentialsInitialized = true; + } + + IceInternal::SocketOperation op = sslHandshake(); + if(op != IceInternal::SocketOperationNone) + { + return op; + } - IceInternal::SocketOperation op = sslHandshake(); - if(op != IceInternal::SocketOperationNone) + if(!_incoming || _engine->getVerifyPeer() > 0) + { + // + // Build the peer certificate chain and verify it. + // + PCCERT_CONTEXT cert = 0; + SECURITY_STATUS err = QueryContextAttributes(&_ssl, SECPKG_ATTR_REMOTE_CERT_CONTEXT, &cert); + if(err && err != SEC_E_NO_CREDENTIALS) { - return op; + throw ProtocolException(__FILE__, __LINE__, "IceSSL: certificate verification failure:" + + IceUtilInternal::lastErrorToString()); } - if(!_incoming || _engine->getVerifyPeer() > 0) + if(!cert && (!_incoming || _engine->getVerifyPeer() == 2)) { - // - // Build the peer certificate chain and verify it. - // - PCCERT_CONTEXT cert = 0; - SECURITY_STATUS err = QueryContextAttributes(&_ssl, SECPKG_ATTR_REMOTE_CERT_CONTEXT, &cert); - if(err && err != SEC_E_NO_CREDENTIALS) + // Clients require server certificate if VerifyPeer>0 + // and servers require client certificate if + // VerifyPeer=2 + throw ProtocolException(__FILE__, __LINE__, "IceSSL: certificate required:" + + IceUtilInternal::lastErrorToString()); + } + else if(cert) // Verify the remote certificate + { + try { - throw ProtocolException(__FILE__, __LINE__, "IceSSL: certificate verification failure:" + - IceUtilInternal::lastErrorToString()); - } + CERT_CHAIN_PARA chainP; + memset(&chainP, 0, sizeof(chainP)); + chainP.cbSize = sizeof(chainP); - if(!cert && (!_incoming || _engine->getVerifyPeer() == 2)) - { - // Clients require server certificate if VerifyPeer>0 - // and servers require client certificate if - // VerifyPeer=2 - throw ProtocolException(__FILE__, __LINE__, "IceSSL: certificate required:" + - IceUtilInternal::lastErrorToString()); - } - else if(cert) // Verify the remote certificate - { - try + PCCERT_CHAIN_CONTEXT certChain; + if(!CertGetCertificateChain(_engine->chainEngine(), cert, 0, 0, &chainP, + CERT_CHAIN_REVOCATION_CHECK_CACHE_ONLY, 0, &certChain)) { - CERT_CHAIN_PARA chainP; - memset(&chainP, 0, sizeof(chainP)); - chainP.cbSize = sizeof(chainP); - - PCCERT_CHAIN_CONTEXT certChain; - if(!CertGetCertificateChain(_engine->chainEngine(), cert, 0, 0, &chainP, - CERT_CHAIN_REVOCATION_CHECK_CACHE_ONLY, 0, &certChain)) - { - CertFreeCertificateContext(cert); - throw IceUtilInternal::lastErrorToString(); - } + CertFreeCertificateContext(cert); + throw IceUtilInternal::lastErrorToString(); + } - CERT_SIMPLE_CHAIN* simpleChain = certChain->rgpChain[0]; + CERT_SIMPLE_CHAIN* simpleChain = certChain->rgpChain[0]; - string trustError; - if(simpleChain->TrustStatus.dwErrorStatus != CERT_TRUST_NO_ERROR) - { - trustError = trustStatusToString(certChain->TrustStatus.dwErrorStatus); - } + string trustError; + if(simpleChain->TrustStatus.dwErrorStatus != CERT_TRUST_NO_ERROR) + { + trustError = trustStatusToString(certChain->TrustStatus.dwErrorStatus); + } - CertFreeCertificateChain(certChain); - CertFreeCertificateContext(cert); - if(!trustError.empty()) - { - throw trustError; - } + CertFreeCertificateChain(certChain); + CertFreeCertificateContext(cert); + if(!trustError.empty()) + { + throw trustError; } - catch(const string& reason) + } + catch(const string& reason) + { + if(_engine->getVerifyPeer() == 0) { - if(_engine->getVerifyPeer() == 0) + if(_instance->traceLevel() >= 1) { - if(_instance->traceLevel() >= 1) - { - _instance->logger()->trace(_instance->traceCategory(), - "IceSSL: ignoring certificate verification failure\n" + reason); - } + _instance->logger()->trace(_instance->traceCategory(), + "IceSSL: ignoring certificate verification failure\n" + reason); } - else + } + else + { + ostringstream os; + os << "IceSSL: certificate verification failure\n" << reason; + string msg = os.str(); + if(_instance->traceLevel() >= 1) { - ostringstream os; - os << "IceSSL: certificate verification failure\n" << reason; - string msg = os.str(); - if(_instance->traceLevel() >= 1) - { - _instance->logger()->trace(_instance->traceCategory(), msg); - } - throw ProtocolException(__FILE__, __LINE__, msg); + _instance->logger()->trace(_instance->traceCategory(), msg); } + throw ProtocolException(__FILE__, __LINE__, msg); } } } - _engine->verifyPeer(_fd, _host, getNativeConnectionInfo()); - _state = StateHandshakeComplete; - } - catch(const Ice::LocalException& ex) - { - if(_instance->traceLevel() >= 2) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection\n"; - if(_incoming) - { - out << IceInternal::fdToString(_fd) << "\n" << ex; - } - else - { - out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex; - } - } - throw; - } - - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - if(_incoming) - { - out << "accepted " << _instance->protocol() << " connection\n" << _desc; - } - else - { - out << _instance->protocol() << " connection established\n" << _desc; - } } + _engine->verifyPeer(_fd, _host, getNativeConnectionInfo()); + _state = StateHandshakeComplete; if(_instance->engine()->securityTraceLevel() >= 1) { @@ -855,11 +822,6 @@ IceSSL::TransceiverI::closing(bool initiator, const Ice::LocalException&) void IceSSL::TransceiverI::close() { - if(_state == StateHandshakeComplete && _instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "closing " << _instance->protocol() << " connection\n" << toString(); - } if(_sslInitialized) { DeleteSecurityContext(&_ssl); @@ -907,13 +869,6 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) assert(_writeBuffer.i == _writeBuffer.b.end()); // Finished writing the encrypted data - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << _bufferedW << " of " << (buf.b.end() - buf.i) << " bytes via " << _instance->protocol() - << '\n' << toString(); - } - buf.i += _bufferedW; _bufferedW = 0; } @@ -947,12 +902,6 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool& hasMoreData) continue; } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << decrypted << " of " << buf.b.end() - buf.i << " bytes via " << _instance->protocol() - << '\n' << toString(); - } buf.i += decrypted; } hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin(); @@ -1044,12 +993,6 @@ IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& buf) _writeBuffer.i += _write.count; if(_writeBuffer.i == _writeBuffer.b.end()) { - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << _bufferedW << " of " << (buf.b.end() - buf.i) << " bytes via " - << _instance->protocol() << '\n' << toString(); - } buf.i += _bufferedW; _bufferedW = 0; } @@ -1130,12 +1073,6 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf, bool& hasMoreData) size_t decrypted = decryptMessage(buf); if(decrypted > 0) { - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << decrypted << " of " << buf.b.end() - buf.i << " bytes via " - << _instance->protocol() << '\n' << toString(); - } buf.i += decrypted; hasMoreData = !_readUnprocessed.b.empty() || _readBuffer.i != _readBuffer.b.begin(); } @@ -1160,6 +1097,12 @@ IceSSL::TransceiverI::toString() const return _desc; } +string +IceSSL::TransceiverI::toDetailedString() const +{ + return toString(); +} + Ice::ConnectionInfoPtr IceSSL::TransceiverI::getInfo() const { @@ -1224,11 +1167,6 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const { _state = StateConnected; _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << _instance->protocol() << " connection established\n" << _desc; - } } else { diff --git a/cpp/src/IceSSL/SChannelTransceiverI.h b/cpp/src/IceSSL/SChannelTransceiverI.h index 85752b929a3..53f5d2765ae 100644 --- a/cpp/src/IceSSL/SChannelTransceiverI.h +++ b/cpp/src/IceSSL/SChannelTransceiverI.h @@ -71,6 +71,7 @@ public: virtual void finishRead(IceInternal::Buffer&, bool&); virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; virtual void checkSendSize(const IceInternal::Buffer&, size_t); @@ -84,19 +85,19 @@ private: virtual NativeConnectionInfoPtr getNativeConnectionInfo() const; IceInternal::SocketOperation sslHandshake(); - + size_t decryptMessage(IceInternal::Buffer&); size_t encryptMessage(IceInternal::Buffer&); - + bool writeRaw(IceInternal::Buffer&); bool readRaw(IceInternal::Buffer&); - + friend class ConnectorI; friend class AcceptorI; const InstancePtr _instance; const SChannelEnginePtr _engine; - + const IceInternal::NetworkProxyPtr _proxy; const std::string _host; const IceInternal::Address _addr; @@ -107,13 +108,13 @@ private: State _state; std::string _desc; - + // // Buffered encrypted data that has not been written. // IceInternal::Buffer _writeBuffer; size_t _bufferedW; - + // // Buffered data that has not been decrypted. // diff --git a/cpp/src/IceSSL/SecureTransportTransceiverI.cpp b/cpp/src/IceSSL/SecureTransportTransceiverI.cpp index 3a74ee44e5f..9a8534e70e5 100644 --- a/cpp/src/IceSSL/SecureTransportTransceiverI.cpp +++ b/cpp/src/IceSSL/SecureTransportTransceiverI.cpp @@ -176,185 +176,152 @@ IceSSL::TransceiverI::getNativeInfo() IceInternal::SocketOperation IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) { - try + if(_state == StateNeedConnect) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return IceInternal::SocketOperationConnect; - } - else if(_state <= StateConnectPending) + _state = StateConnectPending; + return IceInternal::SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { + IceInternal::doFinishConnect(_fd); + _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + + if(_proxy) { - IceInternal::doFinishConnect(_fd); - _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + // + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); - if(_proxy) + // + // Write the proxy connection message using TCP. + // + if(writeRaw(writeBuffer)) { // - // Prepare the read & write buffers in advance. + // Write completed without blocking. // - _proxy->beginWriteConnectRequest(_addr, writeBuffer); - _proxy->beginReadConnectRequestResponse(readBuffer); + _proxy->endWriteConnectRequest(writeBuffer); // - // Write the proxy connection message using TCP. + // Try to read the response using TCP. // - if(writeRaw(writeBuffer)) + if(readRaw(readBuffer)) { // - // Write completed without blocking. + // Read completed without blocking - fall through. // - _proxy->endWriteConnectRequest(writeBuffer); - - // - // Try to read the response using TCP. - // - if(readRaw(readBuffer)) - { - // - // Read completed without blocking - fall through. - // - _proxy->endReadConnectRequestResponse(readBuffer); - } - else - { - // - // Return SocketOperationRead to indicate we need to complete the read. - // - _state = StateProxyConnectRequestPending; // Wait for proxy response - return IceInternal::SocketOperationRead; - } + _proxy->endReadConnectRequestResponse(readBuffer); } else { // - // Return SocketOperationWrite to indicate we need to complete the write. + // Return SocketOperationRead to indicate we need to complete the read. // - _state = StateProxyConnectRequest; // Send proxy connect request - return IceInternal::SocketOperationWrite; + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; } } - - _state = StateConnected; - } - else if(_state == StateProxyConnectRequest) - { - // - // Write completed. - // - _proxy->endWriteConnectRequest(writeBuffer); - _state = StateProxyConnectRequestPending; // Wait for proxy response - return IceInternal::SocketOperationRead; - } - else if(_state == StateProxyConnectRequestPending) - { - // - // Read completed. - // - _proxy->endReadConnectRequestResponse(readBuffer); - _state = StateConnected; - } - - assert(_state == StateConnected); - - OSStatus err = 0; - if(!_ssl) - { - // - // Initialize SSL context - // - _ssl = _engine->newContext(_incoming); - if((err = SSLSetIOFuncs(_ssl, socketRead, socketWrite))) - { - throw SecurityException(__FILE__, __LINE__, "IceSSL: setting IO functions failed\n" + - errorToString(err)); - } - - if((err = SSLSetConnection(_ssl, reinterpret_cast<SSLConnectionRef>(this)))) + else { - throw SecurityException(__FILE__, __LINE__, "IceSSL: setting SSL connection failed\n" + - errorToString(err)); + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; } } - SSLSessionState state; - SSLGetSessionState(_ssl, &state); + _state = StateConnected; + } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + + assert(_state == StateConnected); + OSStatus err = 0; + if(!_ssl) + { // - // SSL Handshake + // Initialize SSL context // - while(state == kSSLHandshake || state == kSSLIdle) + _ssl = _engine->newContext(_incoming); + if((err = SSLSetIOFuncs(_ssl, socketRead, socketWrite))) { - err = SSLHandshake(_ssl); - if(err == noErr) - { - break; // We're done! - } - else if(err == errSSLWouldBlock) - { - assert(_flags & SSLWantRead || _flags & SSLWantWrite); - return _flags & SSLWantRead ? IceInternal::SocketOperationRead : IceInternal::SocketOperationWrite; - } - else if(err == errSSLPeerAuthCompleted) - { - assert(!_trust); - err = SSLCopyPeerTrust(_ssl, &_trust); - if(err == noErr) - { - checkTrustResult(_trust, _engine, _instance); - continue; // Call SSLHandshake to resume the handsake. - } - // Let it fall through, this will raise a SecurityException with the SSLCopyPeerTrust error. - } - else if(err == errSSLClosedGraceful || err == errSSLClosedAbort) - { - throw ConnectionLostException(__FILE__, __LINE__, 0); - } + throw SecurityException(__FILE__, __LINE__, "IceSSL: setting IO functions failed\n" + + errorToString(err)); + } - IceInternal::Address remoteAddr; - string desc = "<not available>"; - if(IceInternal::fdToRemoteAddress(_fd, remoteAddr)) - { - desc = IceInternal::addrToString(remoteAddr); - } - ostringstream os; - os << "IceSSL: ssl error occurred for new " << (_incoming ? "incoming" : "outgoing") << " connection:\n" - << "remote address = " << desc << "\n" << errorToString(err); - throw ProtocolException(__FILE__, __LINE__, os.str()); + if((err = SSLSetConnection(_ssl, reinterpret_cast<SSLConnectionRef>(this)))) + { + throw SecurityException(__FILE__, __LINE__, "IceSSL: setting SSL connection failed\n" + + errorToString(err)); } - _engine->verifyPeer(_fd, _host, getNativeConnectionInfo()); - _state = StateHandshakeComplete; } - catch(const Ice::LocalException& ex) + + SSLSessionState state; + SSLGetSessionState(_ssl, &state); + + // + // SSL Handshake + // + while(state == kSSLHandshake || state == kSSLIdle) { - if(_instance->traceLevel() >= 2) + err = SSLHandshake(_ssl); + if(err == noErr) { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "failed to establish " << _instance->protocol() << " connection\n"; - if(_incoming) - { - out << IceInternal::fdToString(_fd) << "\n" << ex; - } - else + break; // We're done! + } + else if(err == errSSLWouldBlock) + { + assert(_flags & SSLWantRead || _flags & SSLWantWrite); + return _flags & SSLWantRead ? IceInternal::SocketOperationRead : IceInternal::SocketOperationWrite; + } + else if(err == errSSLPeerAuthCompleted) + { + assert(!_trust); + err = SSLCopyPeerTrust(_ssl, &_trust); + if(err == noErr) { - out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex; + checkTrustResult(_trust, _engine, _instance); + continue; // Call SSLHandshake to resume the handsake. } + // Let it fall through, this will raise a SecurityException with the SSLCopyPeerTrust error. } - throw; - } - - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - if(_incoming) + else if(err == errSSLClosedGraceful || err == errSSLClosedAbort) { - out << "accepted " << _instance->protocol() << " connection\n" << _desc; + throw ConnectionLostException(__FILE__, __LINE__, 0); } - else + + IceInternal::Address remoteAddr; + string desc = "<not available>"; + if(IceInternal::fdToRemoteAddress(_fd, remoteAddr)) { - out << _instance->protocol() << " connection established\n" << _desc; + desc = IceInternal::addrToString(remoteAddr); } + ostringstream os; + os << "IceSSL: ssl error occurred for new " << (_incoming ? "incoming" : "outgoing") << " connection:\n" + << "remote address = " << desc << "\n" << errorToString(err); + throw ProtocolException(__FILE__, __LINE__, os.str()); } + _engine->verifyPeer(_fd, _host, getNativeConnectionInfo()); + _state = StateHandshakeComplete; if(_instance->engine()->securityTraceLevel() >= 1) { @@ -396,12 +363,6 @@ IceSSL::TransceiverI::closing(bool initiator, const Ice::LocalException&) void IceSSL::TransceiverI::close() { - if(_state == StateHandshakeComplete && _instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "closing " << _instance->protocol() << " connection\n" << toString(); - } - if(_trust) { CFRelease(_trust); @@ -493,21 +454,6 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent "; - if(_buffered) - { - out << _buffered << " of " << _buffered; - } - else - { - out << processed << " of " << packetSize; - } - out << " bytes via " << protocol() << "\n" << toString(); - } - if(_buffered) { buf.i += _buffered; @@ -559,16 +505,6 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) size_t processed = 0; OSStatus err = SSLRead(_ssl, reinterpret_cast<void*>(buf.i), packetSize, &processed); - if(processed) - { - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << processed << " of " << packetSize << " bytes via " << protocol() << "\n" - << toString(); - } - } - if(err) { if(err == errSSLWouldBlock) @@ -630,6 +566,12 @@ IceSSL::TransceiverI::toString() const return _desc; } +string +IceSSL::TransceiverI::toDetailedString() const +{ + return toString(); +} + Ice::ConnectionInfoPtr IceSSL::TransceiverI::getInfo() const { @@ -670,11 +612,6 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const { _state = StateConnected; _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); - if(_instance->traceLevel() >= 1) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << _instance->protocol() << " connection established\n" << _desc; - } } else { @@ -790,12 +727,6 @@ IceSSL::TransceiverI::writeRaw(IceInternal::Buffer& buf) } } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "sent " << ret << " of " << packetSize << " bytes via " << protocol() << "\n" << toString(); - } - buf.i += ret; if(packetSize > buf.b.end() - buf.i) @@ -849,12 +780,6 @@ IceSSL::TransceiverI::readRaw(IceInternal::Buffer& buf) } } - if(_instance->traceLevel() >= 3) - { - Trace out(_instance->logger(), _instance->traceCategory()); - out << "received " << ret << " of " << packetSize << " bytes via " << protocol() << "\n" << toString(); - } - buf.i += ret; if(packetSize > buf.b.end() - buf.i) { diff --git a/cpp/src/IceSSL/SecureTransportTransceiverI.h b/cpp/src/IceSSL/SecureTransportTransceiverI.h index 17ff0bb4c7c..e0221db2123 100644 --- a/cpp/src/IceSSL/SecureTransportTransceiverI.h +++ b/cpp/src/IceSSL/SecureTransportTransceiverI.h @@ -53,6 +53,7 @@ public: virtual std::string protocol() const; virtual std::string toString() const; + virtual std::string toDetailedString() const; virtual Ice::ConnectionInfoPtr getInfo() const; virtual void checkSendSize(const IceInternal::Buffer&, size_t); |