diff options
author | Mark Spruiell <mes@zeroc.com> | 2013-04-04 16:02:42 -0700 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2013-04-04 16:02:42 -0700 |
commit | 9cb665138c7d2422739e32b40a249c64fd3b6cd5 (patch) | |
tree | 94759d916599ca08761b98580185a230744ac67a /cpp | |
parent | x64 VC10 icexml35d.dll was linked to wrong file (diff) | |
download | ice-9cb665138c7d2422739e32b40a249c64fd3b6cd5.tar.bz2 ice-9cb665138c7d2422739e32b40a249c64fd3b6cd5.tar.xz ice-9cb665138c7d2422739e32b40a249c64fd3b6cd5.zip |
* SOCKS support for C++
* Minor cleanup in C#
* Unity fixes
Diffstat (limited to 'cpp')
38 files changed, 1043 insertions, 275 deletions
diff --git a/cpp/allTests.py b/cpp/allTests.py index 43500fbf8d5..edcb7533a8f 100755 --- a/cpp/allTests.py +++ b/cpp/allTests.py @@ -40,7 +40,7 @@ tests = [ ("Ice/operations", ["core"]), ("Ice/exceptions", ["core"]), ("Ice/ami", ["core"]), - ("Ice/info", ["core", "noipv6", "nocompress"]), + ("Ice/info", ["core", "noipv6", "nocompress", "nosocks"]), ("Ice/inheritance", ["core"]), ("Ice/facets", ["core"]), ("Ice/objects", ["core"]), @@ -70,7 +70,7 @@ tests = [ ("Ice/plugin", ["core", "nomingw"]), ("Ice/hash", ["once"]), ("Ice/admin", ["core", "noipv6"]), - ("Ice/metrics", ["core", "nossl", "noipv6", "nocompress", "nomingw"]), + ("Ice/metrics", ["core", "nossl", "noipv6", "nocompress", "nomingw", "nosocks"]), ("Ice/enums", ["once"]), ("IceSSL/configuration", ["once", "novalgrind"]), # valgrind doesn't work well with openssl ("IceBox/configuration", ["core", "noipv6", "novc90", "nomingw", "nomx"]), diff --git a/cpp/include/Ice/ProtocolPluginFacade.h b/cpp/include/Ice/ProtocolPluginFacade.h index b18016f1f8a..17034f1d516 100644 --- a/cpp/include/Ice/ProtocolPluginFacade.h +++ b/cpp/include/Ice/ProtocolPluginFacade.h @@ -17,6 +17,7 @@ #include <Ice/EndpointFactoryF.h> #include <Ice/InstanceF.h> #include <Ice/EndpointIF.h> +#include <Ice/NetworkF.h> #include <Ice/Protocol.h> namespace IceInternal @@ -74,6 +75,11 @@ public: bool preferIPv6() const; // + // Get the network proxy. + // + NetworkProxyPtr getNetworkProxy() const; + + // // Register an EndpointFactory. // void addEndpointFactory(const EndpointFactoryPtr&) const; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index f6c52ee7c91..9be395b3f91 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -2280,7 +2280,7 @@ Ice::ConnectionI::initiateShutdown() bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream); if(s != SocketOperationNone) { scheduleTimeout(s, connectTimeout()); diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp index aabcb14b2b0..9cd4823c9ae 100644 --- a/cpp/src/Ice/EndpointI.cpp +++ b/cpp/src/Ice/EndpointI.cpp @@ -46,6 +46,17 @@ Init init; Ice::LocalObject* IceInternal::upCast(EndpointI* p) { return p; } IceUtil::Shared* IceInternal::upCast(EndpointHostResolver* p) { return p; } +vector<ConnectorPtr> +IceInternal::EndpointI::connectors(const vector<Address>& /*addrs*/, const NetworkProxyPtr& /*proxy*/) const +{ + // + // This method must be extended by endpoints which use the EndpointHostResolver to create + // connectors from IP addresses. + // + assert(false); + return vector<ConnectorPtr>(); +} + const string& IceInternal::EndpointI::connectionId() const { @@ -63,17 +74,6 @@ IceInternal::EndpointI::internal_getHash() const return _hashValue; } -vector<ConnectorPtr> -IceInternal::EndpointI::connectors(const vector<Address>& /*addrs*/) const -{ - // - // This method must be extended by endpoints which use the EndpointHostResolver to create - // connectors from IP addresses. - // - assert(false); - return vector<ConnectorPtr>(); -} - IceInternal::EndpointI::EndpointI(const std::string& connectionId) : _connectionId(connectionId), _hashInitialized(false) @@ -128,10 +128,14 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En // Try to get the addresses without DNS lookup. If this doesn't // work, we retry with DNS lookup (and observer). // - vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false); - if(!addrs.empty()) + NetworkProxyPtr networkProxy = _instance->networkProxy(); + if(!networkProxy) { - return endpoint->connectors(addrs); + vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false); + if(!addrs.empty()) + { + return endpoint->connectors(addrs, 0); + } } ObserverHelperT<> observer; @@ -144,7 +148,13 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En vector<ConnectorPtr> connectors; try { - connectors = endpoint->connectors(getAddresses(host, port, _protocol, selType, _preferIPv6, true)); + if(networkProxy) + { + networkProxy = networkProxy->resolveHost(); + } + + connectors = endpoint->connectors(getAddresses(host, port, _protocol, selType, _preferIPv6, true), + networkProxy); } catch(const Ice::LocalException& ex) { @@ -162,20 +172,24 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En // Try to get the addresses without DNS lookup. If this doesn't work, we queue a resolve // entry and the thread will take care of getting the endpoint addresses. // - try + NetworkProxyPtr networkProxy = _instance->networkProxy(); + if(!networkProxy) { - vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false); - if(!addrs.empty()) + try { - callback->connectors(endpoint->connectors(addrs)); + vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false); + if(!addrs.empty()) + { + callback->connectors(endpoint->connectors(addrs, 0)); + return; + } + } + catch(const Ice::LocalException& ex) + { + callback->exception(ex); return; } } - catch(const Ice::LocalException& ex) - { - callback->exception(ex); - return; - } Lock sync(*this); assert(!_destroyed); @@ -241,11 +255,18 @@ IceInternal::EndpointHostResolver::run() threadObserver->stateChanged(ThreadStateIdle, ThreadStateInUseForOther); } + NetworkProxyPtr networkProxy = _instance->networkProxy(); + if(networkProxy) + { + networkProxy = networkProxy->resolveHost(); + } + r.callback->connectors(r.endpoint->connectors(getAddresses(r.host, r.port, _protocol, r.selType, - _preferIPv6, true))); + _preferIPv6, true), + networkProxy)); if(threadObserver) { diff --git a/cpp/src/Ice/EndpointI.h b/cpp/src/Ice/EndpointI.h index ea0b577c5ca..3e14f4d30b9 100644 --- a/cpp/src/Ice/EndpointI.h +++ b/cpp/src/Ice/EndpointI.h @@ -143,6 +143,8 @@ public: // virtual bool equivalent(const EndpointIPtr&) const = 0; + virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&, const NetworkProxyPtr&) const; + // // Compare endpoints for sorting purposes. // @@ -155,7 +157,6 @@ protected: virtual ::Ice::Int internal_getHash() const; - virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&) const; friend class EndpointHostResolver; EndpointI(const std::string&); diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index f42b4d10a69..b682ce39cff 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -309,6 +309,12 @@ IceInternal::Instance::preferIPv6() const return _preferIPv6; } +NetworkProxyPtr +IceInternal::Instance::networkProxy() const +{ + return _networkProxy; +} + ThreadPoolPtr IceInternal::Instance::clientThreadPool() { @@ -1045,8 +1051,17 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _proxyFactory = new ProxyFactory(this); + string proxyHost = _initData.properties->getProperty("Ice.SOCKSProxyHost"); + int defaultIPv6 = 1; // IPv6 enabled by default. + if(!proxyHost.empty()) + { + int proxyPort = _initData.properties->getPropertyAsIntWithDefault("Ice.SOCKSProxyPort", 1080); + _networkProxy = new SOCKSNetworkProxy(proxyHost, proxyPort); + defaultIPv6 = 0; // IPv6 is not supported with SOCKS + } + bool ipv4 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0; - bool ipv6 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv6", 1) > 0; + bool ipv6 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv6", defaultIPv6) > 0; if(!ipv4 && !ipv6) { throw InitializationException(__FILE__, __LINE__, "Both IPV4 and IPv6 support cannot be disabled."); @@ -1064,6 +1079,12 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _protocolSupport = EnableIPv6; } _preferIPv6 = _initData.properties->getPropertyAsInt("Ice.PreferIPv6Address") > 0; + + if(ipv6 && SOCKSNetworkProxyPtr::dynamicCast(_networkProxy)) + { + throw InitializationException(__FILE__, __LINE__, "IPv6 is not supported with SOCKS4 proxies"); + } + _endpointFactoryManager = new EndpointFactoryManager(this); #ifndef ICE_OS_WINRT EndpointFactoryPtr tcpEndpointFactory = new TcpEndpointFactory(this); diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index 0437f4d10ab..d092e49dc3d 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -34,6 +34,7 @@ #include <Ice/RetryQueueF.h> #include <Ice/DynamicLibraryF.h> #include <Ice/PluginF.h> +#include <Ice/NetworkF.h> #include <Ice/Initialize.h> #include <Ice/ImplicitContextI.h> #include <Ice/FacetMap.h> @@ -73,6 +74,7 @@ public: ObjectAdapterFactoryPtr objectAdapterFactory() const; ProtocolSupport protocolSupport() const; bool preferIPv6() const; + NetworkProxyPtr networkProxy() const; ThreadPoolPtr clientThreadPool(); ThreadPoolPtr serverThreadPool(bool create = true); EndpointHostResolverPtr endpointHostResolver(); @@ -136,6 +138,7 @@ private: ObjectAdapterFactoryPtr _objectAdapterFactory; ProtocolSupport _protocolSupport; bool _preferIPv6; + NetworkProxyPtr _networkProxy; ThreadPoolPtr _clientThreadPool; ThreadPoolPtr _serverThreadPool; EndpointHostResolverPtr _endpointHostResolver; diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index e60712ab51a..b1a921220e2 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -35,6 +35,7 @@ #include <Ice/LocalException.h> #include <Ice/Properties.h> // For setTcpBufSize #include <Ice/LoggerUtil.h> // For setTcpBufSize +#include <Ice/Buffer.h> #include <IceUtil/Random.h> #if defined(ICE_OS_WINRT) @@ -544,6 +545,114 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s) } #endif +IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; } +IceUtil::Shared* IceInternal::upCast(SOCKSNetworkProxy* p) { return p; } + +IceInternal::SOCKSNetworkProxy::SOCKSNetworkProxy(const string& host, int port) : + _host(host), _port(port), _haveAddress(false) +{ + memset(&_address, 0, sizeof(_address)); +} + +IceInternal::SOCKSNetworkProxy::SOCKSNetworkProxy(const Address& addr) : + _port(0), _address(addr), _haveAddress(true) +{ +} + +void +IceInternal::SOCKSNetworkProxy::beginWriteConnectRequest(const Address& addr, Buffer& buf) +{ + if(addr.saStorage.ss_family != AF_INET) + { + throw FeatureNotSupportedException(__FILE__, __LINE__, "SOCKS4 only supports IPv4 addresses"); + } + + // + // SOCKS connect request + // + buf.b.resize(9); + buf.i = buf.b.begin(); + Byte* dest = &buf.b[0]; + *dest++ = 0x04; // SOCKS version 4. + *dest++ = 0x01; // Command, establish a TCP/IP stream connection + + const Byte* src; + + // + // Port (already in big-endian order) + // + src = reinterpret_cast<const Byte*>(&addr.saIn.sin_port); + *dest++ = *src++; + *dest++ = *src; + + // + // IPv4 address (already in big-endian order) + // + src = reinterpret_cast<const Ice::Byte*>(&addr.saIn.sin_addr.s_addr); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src; + + *dest = 0x00; // User ID. +} + +void +IceInternal::SOCKSNetworkProxy::endWriteConnectRequest(Buffer& buf) +{ + buf.b.reset(); +} + +void +IceInternal::SOCKSNetworkProxy::beginReadConnectRequestResponse(Buffer& buf) +{ + // + // Read the SOCKS4 response whose size is 8 bytes. + // + buf.b.resize(8); + buf.i = buf.b.begin(); +} + +void +IceInternal::SOCKSNetworkProxy::endReadConnectRequestResponse(Buffer& buf) +{ + buf.i = buf.b.begin(); + + if(buf.b.end() - buf.i < 2) + { + throw UnmarshalOutOfBoundsException(__FILE__, __LINE__); + } + + const Byte* src = &(*buf.i); + const Byte b1 = *src++; + const Byte b2 = *src++; + if(b1 != 0x00 || b2 != 0x5a) + { + throw ConnectFailedException(__FILE__, __LINE__); + } + buf.b.reset(); +} + +NetworkProxyPtr +IceInternal::SOCKSNetworkProxy::resolveHost() const +{ + assert(!_host.empty()); + return new SOCKSNetworkProxy(getAddresses(_host, _port, EnableIPv4, Random, false, true)[0]); +} + +Address +IceInternal::SOCKSNetworkProxy::getAddress() const +{ + assert(_haveAddress); // Host must be resolved. + return _address; +} + +string +IceInternal::SOCKSNetworkProxy::getName() const +{ + return "SOCKS"; +} + bool IceInternal::noMoreFds(int error) { @@ -1020,6 +1129,64 @@ IceInternal::fdToRemoteAddress(SOCKET fd, Address& addr) #endif } +std::string +IceInternal::fdToString(SOCKET fd, const NetworkProxyPtr& proxy, const Address& target, +#if defined(_WIN32) + bool connected) +#else + bool /*connected*/) +#endif +{ + if(fd == INVALID_SOCKET) + { + return "<closed>"; + } + + ostringstream s; + +#if defined(_WIN32) + if(!connected) + { + // + // The local address is only accessible with connected sockets on Windows. + // + s << "local address = <not available>"; + } + else + { + Address localAddr; + fdToLocalAddress(fd, localAddr); + s << "local address = " << addrToString(localAddr); + } +#else + Address localAddr; + fdToLocalAddress(fd, localAddr); + s << "local address = " << addrToString(localAddr); +#endif + + Address remoteAddr; + bool peerConnected = fdToRemoteAddress(fd, remoteAddr); + + if(proxy) + { + if(!peerConnected) + { + remoteAddr = proxy->getAddress(); + } + s << "\n" + proxy->getName() + " proxy address = " << addrToString(remoteAddr); + s << "\nremote address = " << addrToString(target); + } + else + { + if(!peerConnected) + { + remoteAddr = target; + } + s << "\nremote address = " << addrToString(remoteAddr); + } + + return s.str(); +} std::string IceInternal::fdToString(SOCKET fd) @@ -1036,7 +1203,7 @@ IceInternal::fdToString(SOCKET fd) bool peerConnected = fdToRemoteAddress(fd, remoteAddr); return addressesToString(localAddr, remoteAddr, peerConnected); -}; +} void IceInternal::fdToAddressAndPort(SOCKET fd, string& localAddress, int& localPort, string& remoteAddress, int& remotePort) diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index 8d7cac3a665..55036bcc986 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -16,6 +16,7 @@ #include <Ice/Config.h> +#include <Ice/NetworkF.h> #include <Ice/PropertiesF.h> // For setTcpBufSize #include <Ice/LoggerF.h> // For setTcpBufSize #include <Ice/Protocol.h> @@ -187,6 +188,71 @@ protected: }; typedef IceUtil::Handle<NativeInfo> NativeInfoPtr; +class ICE_API Buffer; + +class ICE_API NetworkProxy : virtual public IceUtil::Shared +{ +public: + + // + // Write the connection request on the connection established + // with the network proxy server. This is called right after + // the connection establishment succeeds. + // + virtual void beginWriteConnectRequest(const Address&, Buffer&) = 0; + virtual void endWriteConnectRequest(Buffer&) = 0; + + // + // Once the connection request has been sent, this is called + // to prepare and read the response from the proxy server. + // + virtual void beginReadConnectRequestResponse(Buffer&) = 0; + virtual void endReadConnectRequestResponse(Buffer&) = 0; + + // + // If the proxy host needs to be resolved, this should return + // a new NetworkProxy containing the IP address of the proxy. + // This is called from the endpoint host resolver thread, so + // it's safe if this this method blocks. + // + virtual NetworkProxyPtr resolveHost() const = 0; + + // + // Returns the IP address of the network proxy. This method + // must not block. It's only called on a network proxy object + // returned by resolveHost(). + // + virtual Address getAddress() const = 0; + + // + // Returns the name of the proxy, used for tracing purposes. + // + virtual std::string getName() const = 0; +}; + +class ICE_API SOCKSNetworkProxy : virtual public NetworkProxy +{ +public: + + SOCKSNetworkProxy(const std::string&, int); + SOCKSNetworkProxy(const Address&); + + virtual void beginWriteConnectRequest(const Address&, Buffer&); + virtual void endWriteConnectRequest(Buffer&); + virtual void beginReadConnectRequestResponse(Buffer&); + virtual void endReadConnectRequestResponse(Buffer&); + virtual NetworkProxyPtr resolveHost() const; + virtual Address getAddress() const; + virtual std::string getName() const; + +private: + + std::string _host; + int _port; + Address _address; + bool _haveAddress; +}; + ICE_API bool noMoreFds(int); ICE_API std::string errorToStringDNS(int); ICE_API std::vector<Address> getAddresses(const std::string&, int, ProtocolSupport, Ice::EndpointSelectionType, bool, @@ -203,6 +269,7 @@ ICE_API void closeSocket(SOCKET); ICE_API std::string addrToString(const Address&); ICE_API void fdToLocalAddress(SOCKET, Address&); ICE_API bool fdToRemoteAddress(SOCKET, Address&); +ICE_API std::string fdToString(SOCKET, const NetworkProxyPtr&, const Address&, bool); ICE_API std::string fdToString(SOCKET); ICE_API void fdToAddressAndPort(SOCKET, std::string&, int&, std::string&, int&); ICE_API void addrToAddressAndPort(const Address&, std::string&, int&); diff --git a/cpp/src/Ice/NetworkF.h b/cpp/src/Ice/NetworkF.h new file mode 100644 index 00000000000..4b8cb2d4921 --- /dev/null +++ b/cpp/src/Ice/NetworkF.h @@ -0,0 +1,30 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_NETWORK_F_H +#define ICE_NETWORK_F_H + +#include <IceUtil/Shared.h> + +#include <Ice/Handle.h> + +namespace IceInternal +{ + +class NetworkProxy; +ICE_API IceUtil::Shared* upCast(NetworkProxy*); +typedef Handle<NetworkProxy> NetworkProxyPtr; + +class SOCKSNetworkProxy; +ICE_API IceUtil::Shared* upCast(SOCKSNetworkProxy*); +typedef Handle<SOCKSNetworkProxy> SOCKSNetworkProxyPtr; + +} + +#endif diff --git a/cpp/src/Ice/ProtocolPluginFacade.cpp b/cpp/src/Ice/ProtocolPluginFacade.cpp index 483a9aa0df2..e7ca3f5c2f9 100644 --- a/cpp/src/Ice/ProtocolPluginFacade.cpp +++ b/cpp/src/Ice/ProtocolPluginFacade.cpp @@ -74,6 +74,12 @@ IceInternal::ProtocolPluginFacade::preferIPv6() const return _instance->preferIPv6(); } +NetworkProxyPtr +IceInternal::ProtocolPluginFacade::getNetworkProxy() const +{ + return _instance->networkProxy(); +} + void IceInternal::ProtocolPluginFacade::addEndpointFactory(const EndpointFactoryPtr& factory) const { diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp index ec295b6a6bc..eeb74e10cb5 100644 --- a/cpp/src/Ice/TcpAcceptor.cpp +++ b/cpp/src/Ice/TcpAcceptor.cpp @@ -165,7 +165,7 @@ IceInternal::TcpAcceptor::accept() Trace out(_logger, _traceLevels->networkCat); out << "accepted tcp connection\n" << fdToString(fd); } - return new TcpTransceiver(_instance, fd, true); + return new TcpTransceiver(_instance, fd); } string diff --git a/cpp/src/Ice/TcpConnector.cpp b/cpp/src/Ice/TcpConnector.cpp index 435db7f2b7a..b269035a1b7 100644 --- a/cpp/src/Ice/TcpConnector.cpp +++ b/cpp/src/Ice/TcpConnector.cpp @@ -31,8 +31,8 @@ IceInternal::TcpConnector::connect() try { - TransceiverPtr transceiver = new TcpTransceiver(_instance, createSocket(false, _addr), false); - dynamic_cast<TcpTransceiver*>(transceiver.get())->connect(_addr); + TransceiverPtr transceiver = new TcpTransceiver(_instance, createSocket(false, _addr), _proxy, _addr); + dynamic_cast<TcpTransceiver*>(transceiver.get())->connect(); return transceiver; } catch(const Ice::LocalException& ex) @@ -55,7 +55,7 @@ IceInternal::TcpConnector::type() const string IceInternal::TcpConnector::toString() const { - return addrToString(_addr); + return addrToString(!_proxy ? _addr : _proxy->getAddress()); } bool @@ -120,12 +120,13 @@ IceInternal::TcpConnector::operator<(const Connector& r) const return compareAddress(_addr, p->_addr) < 0; } -IceInternal::TcpConnector::TcpConnector(const InstancePtr& instance, const Address& addr, +IceInternal::TcpConnector::TcpConnector(const InstancePtr& instance, const Address& addr, const NetworkProxyPtr& proxy, Ice::Int timeout, const string& connectionId) : _instance(instance), _traceLevels(instance->traceLevels()), _logger(instance->initializationData().logger), _addr(addr), + _proxy(proxy), _timeout(timeout), _connectionId(connectionId) { diff --git a/cpp/src/Ice/TcpConnector.h b/cpp/src/Ice/TcpConnector.h index 9c504a0ebca..e25b9f2e3a8 100644 --- a/cpp/src/Ice/TcpConnector.h +++ b/cpp/src/Ice/TcpConnector.h @@ -36,7 +36,7 @@ public: private: - TcpConnector(const InstancePtr&, const Address&, Ice::Int, const std::string&); + TcpConnector(const InstancePtr&, const Address&, const NetworkProxyPtr&, Ice::Int, const std::string&); virtual ~TcpConnector(); friend class TcpEndpointI; @@ -44,6 +44,7 @@ private: const TraceLevelsPtr _traceLevels; const ::Ice::LoggerPtr _logger; const Address _addr; + const NetworkProxyPtr _proxy; const Ice::Int _timeout; const std::string _connectionId; }; diff --git a/cpp/src/Ice/TcpEndpointI.cpp b/cpp/src/Ice/TcpEndpointI.cpp index 27324314a60..1cf09f15e3e 100644 --- a/cpp/src/Ice/TcpEndpointI.cpp +++ b/cpp/src/Ice/TcpEndpointI.cpp @@ -413,6 +413,17 @@ IceInternal::TcpEndpointI::equivalent(const EndpointIPtr& endpoint) const return tcpEndpointI->_host == _host && tcpEndpointI->_port == _port; } +vector<ConnectorPtr> +IceInternal::TcpEndpointI::connectors(const vector<Address>& addresses, const NetworkProxyPtr& proxy) const +{ + vector<ConnectorPtr> connectors; + for(unsigned int i = 0; i < addresses.size(); ++i) + { + connectors.push_back(new TcpConnector(_instance, addresses[i], proxy, _timeout, _connectionId)); + } + return connectors; +} + bool IceInternal::TcpEndpointI::operator==(const LocalObject& r) const { @@ -535,17 +546,6 @@ IceInternal::TcpEndpointI::hashInit() const return h; } -vector<ConnectorPtr> -IceInternal::TcpEndpointI::connectors(const vector<Address>& addresses) const -{ - vector<ConnectorPtr> connectors; - for(unsigned int i = 0; i < addresses.size(); ++i) - { - connectors.push_back(new TcpConnector(_instance, addresses[i], _timeout, _connectionId)); - } - return connectors; -} - IceInternal::TcpEndpointFactory::TcpEndpointFactory(const InstancePtr& instance) : _instance(instance) { diff --git a/cpp/src/Ice/TcpEndpointI.h b/cpp/src/Ice/TcpEndpointI.h index 294bf79a00e..5d3cb602ed0 100644 --- a/cpp/src/Ice/TcpEndpointI.h +++ b/cpp/src/Ice/TcpEndpointI.h @@ -44,6 +44,7 @@ public: virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; virtual std::vector<EndpointIPtr> expand() const; virtual bool equivalent(const EndpointIPtr&) const; + virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&, const NetworkProxyPtr&) const; virtual bool operator==(const Ice::LocalObject&) const; virtual bool operator<(const Ice::LocalObject&) const; @@ -55,7 +56,6 @@ public: private: virtual ::Ice::Int hashInit() const; - virtual std::vector<ConnectorPtr> connectors(const std::vector<IceInternal::Address>&) const; // // All members are const, because endpoints are immutable. diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index ee49f1f7e74..c577367c5fb 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -46,59 +46,126 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) #endif SocketOperation -IceInternal::TcpTransceiver::initialize() +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return SocketOperationConnect; - } - else if(_state <= StateConnectPending) + try { - try + if(_state == StateNeedConnect) { -#if defined(ICE_USE_IOCP) + _state = StateConnectPending; + return SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { +#ifdef ICE_USE_IOCP doFinishConnectAsync(_fd, _write); #else doFinishConnect(_fd); #endif - _state = StateConnected; - _desc = fdToString(_fd); - } - catch(const Ice::LocalException& ex) - { - if(_traceLevels->network >= 2) + + _desc = fdToString(_fd, _proxy, _addr, true); + + if(_proxy) { - Trace out(_logger, _traceLevels->networkCat); - out << "failed to establish tcp connection\n"; -#if !defined(_WIN32) // - // The local address is only accessible with connected sockets on Windows. + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); + +#ifdef ICE_USE_IOCP + // + // Return SocketOperationWrite to indicate we need to start a write. // - Address localAddr; - fdToLocalAddress(_fd, localAddr); - out << "local address: " << addrToString(localAddr) << "\n"; + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; #else - out << "local address: <not available>\n"; + // + // Write the proxy connection message. + // + if(write(writeBuffer)) + { + // + // Write completed without blocking. + // + _proxy->endWriteConnectRequest(writeBuffer); + + // + // Try to read the response. + // + if(read(readBuffer)) + { + // + // Read completed without blocking - fall through. + // + _proxy->endReadConnectRequestResponse(readBuffer); + } + else + { + // + // Return SocketOperationRead to indicate we need to complete the read. + // + _state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperationRead; + } + } + else + { + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return SocketOperationWrite; + } #endif - out << "remote address: " << addrToString(_connectAddr) << "\n" << ex; } - throw; - } - if(_traceLevels->network >= 1) + _state = StateConnected; + } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + } + catch(const Ice::LocalException& ex) + { + if(_traceLevels->network >= 2) { Trace out(_logger, _traceLevels->networkCat); - out << "tcp connection established\n" << _desc; + out << "failed to establish tcp connection\n" << fdToString(_fd, _proxy, _addr, false) << "\n" << ex; } + throw; } + assert(_state == StateConnected); + if(_traceLevels->network >= 1) + { + Trace out(_logger, _traceLevels->networkCat); + out << "tcp connection established\n" << _desc; + } return SocketOperationNone; } void IceInternal::TcpTransceiver::close() { + // + // If the transceiver is not connected, its description is simply "not connected", + // which isn't very helpful. + // if(_state == StateConnected && _traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); @@ -121,17 +188,19 @@ IceInternal::TcpTransceiver::close() bool IceInternal::TcpTransceiver::write(Buffer& buf) { - // Its impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); -# ifdef ICE_USE_IOCP +#ifdef ICE_USE_IOCP // // Limit packet size to avoid performance problems on WIN32 // if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } -# endif +#endif while(buf.i != buf.b.end()) { assert(_fd != INVALID_SOCKET); @@ -161,7 +230,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf) { return false; } - + if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); @@ -194,13 +263,16 @@ IceInternal::TcpTransceiver::write(Buffer& buf) packetSize = static_cast<int>(buf.b.end() - buf.i); } } + return true; } bool IceInternal::TcpTransceiver::read(Buffer& buf) { - // Its impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { @@ -220,7 +292,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf) { continue; } - + if(noBuffers() && packetSize > 1024) { packetSize /= 2; @@ -231,7 +303,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf) { return false; } - + if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); @@ -264,13 +336,14 @@ IceInternal::TcpTransceiver::read(Buffer& buf) return true; } -#if defined(ICE_USE_IOCP) +#ifdef ICE_USE_IOCP bool IceInternal::TcpTransceiver::startWrite(Buffer& buf) { - if(_state < StateConnected) + if(_state == StateConnectPending) { - doConnectAsync(_fd, _connectAddr, _write); + Address addr = _proxy ? _proxy->getAddress() : _addr; + doConnectAsync(_fd, addr, _write); return false; } @@ -279,7 +352,7 @@ IceInternal::TcpTransceiver::startWrite(Buffer& buf) int packetSize = static_cast<int>(buf.b.end() - buf.i); if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } assert(packetSize > 0); @@ -310,7 +383,7 @@ IceInternal::TcpTransceiver::startWrite(Buffer& buf) void IceInternal::TcpTransceiver::finishWrite(Buffer& buf) { - if(_state < StateConnected) + if(_state < StateConnected && _state != StateProxyConnectRequest) { return; } @@ -336,18 +409,19 @@ IceInternal::TcpTransceiver::finishWrite(Buffer& buf) { int packetSize = static_cast<int>(buf.b.end() - buf.i); if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } Trace out(_logger, _traceLevels->networkCat); out << "sent " << _write.count << " of " << packetSize << " bytes via tcp\n" << toString(); } - + if(_stats) { _stats->bytesSent(type(), _write.count); } + buf.i += _write.count; } @@ -409,7 +483,7 @@ IceInternal::TcpTransceiver::finishRead(Buffer& buf) ex.error = 0; throw ex; } - + if(_traceLevels->network >= 3) { int packetSize = static_cast<int>(buf.b.end() - buf.i); @@ -442,7 +516,7 @@ IceInternal::TcpTransceiver::toString() const return _desc; } -Ice::ConnectionInfoPtr +Ice::ConnectionInfoPtr IceInternal::TcpTransceiver::getInfo() const { Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo(); @@ -459,15 +533,53 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize } } -IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, bool connected) : +IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, const NetworkProxyPtr& proxy, + const Address& addr) : + NativeInfo(fd), + _proxy(proxy), + _addr(addr), + _traceLevels(instance->traceLevels()), + _logger(instance->initializationData().logger), + _stats(instance->initializationData().stats), + _state(StateNeedConnect) +#ifdef ICE_USE_IOCP + , _read(SocketOperationRead), + _write(SocketOperationWrite) +#endif +{ + setBlock(_fd, false); + + setTcpBufSize(_fd, instance->initializationData().properties, _logger); + +#ifdef ICE_USE_IOCP + // + // On Windows, limiting the buffer size is important to prevent + // poor throughput performances when transfering large amount of + // data. See Microsoft KB article KB823764. + // + _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2; + if(_maxSendPacketSize < 512) + { + _maxSendPacketSize = 0; + } + + _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd); + if(_maxReceivePacketSize < 512) + { + _maxReceivePacketSize = 0; + } +#endif +} + +IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd) : NativeInfo(fd), _traceLevels(instance->traceLevels()), _logger(instance->initializationData().logger), _stats(instance->initializationData().stats), - _state(connected ? StateConnected : StateNeedConnect), - _desc(connected ? fdToString(_fd) : string()) + _state(StateConnected), + _desc(fdToString(_fd)) #ifdef ICE_USE_IOCP - , _read(SocketOperationRead), + , _read(SocketOperationRead), _write(SocketOperationWrite) #endif { @@ -501,15 +613,15 @@ IceInternal::TcpTransceiver::~TcpTransceiver() } void -IceInternal::TcpTransceiver::connect(const Address& addr) +IceInternal::TcpTransceiver::connect() { #if !defined(ICE_USE_IOCP) try { - if(doConnect(_fd, addr)) + if(doConnect(_fd, _addr)) { _state = StateConnected; - _desc = fdToString(_fd); + _desc = fdToString(_fd, _proxy, _addr); if(_traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); @@ -518,7 +630,7 @@ IceInternal::TcpTransceiver::connect(const Address& addr) } else { - _desc = fdToString(_fd); + _desc = fdToString(_fd, _proxy, _addr); } } catch(...) @@ -527,5 +639,4 @@ IceInternal::TcpTransceiver::connect(const Address& addr) throw; } #endif - _connectAddr = addr; } diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index 3005da1a0e7..a2afe4bb514 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -29,6 +29,8 @@ class TcpTransceiver : public Transceiver, public NativeInfo { StateNeedConnect, StateConnectPending, + StateProxyConnectRequest, + StateProxyConnectRequestPending, StateConnected }; @@ -39,7 +41,7 @@ public: virtual AsyncInfo* getAsyncInfo(SocketOperation); #endif - virtual SocketOperation initialize(); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual void close(); virtual bool write(Buffer&); virtual bool read(Buffer&); @@ -56,21 +58,23 @@ public: private: - TcpTransceiver(const InstancePtr&, SOCKET, bool); + TcpTransceiver(const InstancePtr&, SOCKET, const NetworkProxyPtr&, const Address&); + TcpTransceiver(const InstancePtr&, SOCKET); virtual ~TcpTransceiver(); - void connect(const Address&); + void connect(); friend class TcpConnector; friend class TcpAcceptor; + const NetworkProxyPtr _proxy; + const Address _addr; const TraceLevelsPtr _traceLevels; const Ice::LoggerPtr _logger; const Ice::StatsPtr _stats; State _state; std::string _desc; - Address _connectAddr; #ifdef ICE_USE_IOCP AsyncInfo _read; diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index e08159e1d32..c2c00cf151d 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -25,7 +25,7 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared public: virtual NativeInfoPtr getNativeInfo() = 0; - virtual SocketOperation initialize() = 0; + virtual SocketOperation initialize(Buffer&, Buffer&) = 0; virtual void close() = 0; virtual bool write(Buffer&) = 0; virtual bool read(Buffer&) = 0; diff --git a/cpp/src/Ice/UdpEndpointI.cpp b/cpp/src/Ice/UdpEndpointI.cpp index 56828588a2d..f36d54ece5c 100644 --- a/cpp/src/Ice/UdpEndpointI.cpp +++ b/cpp/src/Ice/UdpEndpointI.cpp @@ -491,6 +491,17 @@ IceInternal::UdpEndpointI::equivalent(const EndpointIPtr& endpoint) const return udpEndpointI->_host == _host && udpEndpointI->_port == _port; } +vector<ConnectorPtr> +IceInternal::UdpEndpointI::connectors(const vector<Address>& addresses, const NetworkProxyPtr&) const +{ + vector<ConnectorPtr> connectors; + for(unsigned int i = 0; i < addresses.size(); ++i) + { + connectors.push_back(new UdpConnector(_instance, addresses[i], _mcastInterface, _mcastTtl, _connectionId)); + } + return connectors; +} + bool IceInternal::UdpEndpointI::operator==(const LocalObject& r) const { @@ -643,17 +654,6 @@ IceInternal::UdpEndpointI::hashInit() const return h; } -vector<ConnectorPtr> -IceInternal::UdpEndpointI::connectors(const vector<Address>& addresses) const -{ - vector<ConnectorPtr> connectors; - for(unsigned int i = 0; i < addresses.size(); ++i) - { - connectors.push_back(new UdpConnector(_instance, addresses[i], _mcastInterface, _mcastTtl, _connectionId)); - } - return connectors; -} - IceInternal::UdpEndpointFactory::UdpEndpointFactory(const InstancePtr& instance) : _instance(instance) diff --git a/cpp/src/Ice/UdpEndpointI.h b/cpp/src/Ice/UdpEndpointI.h index 86f2f514212..81e78fec3aa 100644 --- a/cpp/src/Ice/UdpEndpointI.h +++ b/cpp/src/Ice/UdpEndpointI.h @@ -45,6 +45,7 @@ public: virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const; virtual std::vector<EndpointIPtr> expand() const; virtual bool equivalent(const EndpointIPtr&) const; + virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&, const NetworkProxyPtr&) const; virtual bool operator==(const Ice::LocalObject&) const; virtual bool operator<(const Ice::LocalObject&) const; @@ -56,7 +57,6 @@ public: private: virtual ::Ice::Int hashInit() const; - virtual std::vector<ConnectorPtr> connectors(const std::vector<IceInternal::Address>&) const; // // All members are const, because endpoints are immutable. diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index 5b127660735..ead3dde9371 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler #endif SocketOperation -IceInternal::UdpTransceiver::initialize() +IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/) { if(_state == StateNeedConnect) { diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index 59cba440870..ad6bf128619 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -47,7 +47,7 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); #endif - virtual SocketOperation initialize(); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual void close(); virtual bool write(Buffer&); virtual bool read(Buffer&); diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index fefad619293..dc406a700dc 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -83,7 +83,7 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) { _observers->replicaAdded(session->getInternalRegistry()); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { // Expected if the replica is being shutdown. } @@ -122,7 +122,7 @@ ReplicaCache::remove(const string& name, bool shutdown) { _observers->replicaRemoved(entry->getProxy()); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { // Expected if the replica is being shutdown. } @@ -171,7 +171,7 @@ ReplicaCache::subscribe(const ReplicaObserverPrx& observer) Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway()); ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { // The replica is being shutdown. } @@ -193,7 +193,7 @@ ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer) { _topic->unsubscribe(observer); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { // The replica is being shutdown. } diff --git a/cpp/src/IceSSL/ConnectorI.cpp b/cpp/src/IceSSL/ConnectorI.cpp index f327795ef04..fe75e4f12ea 100644 --- a/cpp/src/IceSSL/ConnectorI.cpp +++ b/cpp/src/IceSSL/ConnectorI.cpp @@ -41,7 +41,7 @@ IceSSL::ConnectorI::connect() try { - return new TransceiverI(_instance, IceInternal::createSocket(false, _addr), _host, _addr); + return new TransceiverI(_instance, IceInternal::createSocket(false, _addr), _proxy, _host, _addr); } catch(const Ice::LocalException& ex) { @@ -63,7 +63,7 @@ IceSSL::ConnectorI::type() const string IceSSL::ConnectorI::toString() const { - return IceInternal::addrToString(_addr); + return IceInternal::addrToString(!_proxy ? _addr : _proxy->getAddress()); } bool @@ -130,11 +130,13 @@ IceSSL::ConnectorI::operator<(const IceInternal::Connector& r) const } IceSSL::ConnectorI::ConnectorI(const InstancePtr& instance, const string& host, const IceInternal::Address& addr, - Ice::Int timeout, const string& connectionId) : + const IceInternal::NetworkProxyPtr& proxy, Ice::Int timeout, + const string& connectionId) : _instance(instance), _logger(instance->communicator()->getLogger()), _host(host), _addr(addr), + _proxy(proxy), _timeout(timeout), _connectionId(connectionId) { diff --git a/cpp/src/IceSSL/ConnectorI.h b/cpp/src/IceSSL/ConnectorI.h index fd7d3fffcaf..10048b42015 100644 --- a/cpp/src/IceSSL/ConnectorI.h +++ b/cpp/src/IceSSL/ConnectorI.h @@ -37,14 +37,16 @@ public: private: - ConnectorI(const InstancePtr&, const std::string&, const IceInternal::Address&, Ice::Int, const std::string&); + ConnectorI(const InstancePtr&, const std::string&, const IceInternal::Address&, + const IceInternal::NetworkProxyPtr&, Ice::Int, const std::string&); virtual ~ConnectorI(); friend class EndpointI; const InstancePtr _instance; const Ice::LoggerPtr _logger; const std::string _host; - IceInternal::Address _addr; + const IceInternal::Address _addr; + const IceInternal::NetworkProxyPtr _proxy; const Ice::Int _timeout; const std::string _connectionId; }; diff --git a/cpp/src/IceSSL/EndpointI.cpp b/cpp/src/IceSSL/EndpointI.cpp index ce55f30a306..1c71eb560ab 100644 --- a/cpp/src/IceSSL/EndpointI.cpp +++ b/cpp/src/IceSSL/EndpointI.cpp @@ -411,6 +411,18 @@ IceSSL::EndpointI::equivalent(const IceInternal::EndpointIPtr& endpoint) const return sslEndpointI->_host == _host && sslEndpointI->_port == _port; } +vector<IceInternal::ConnectorPtr> +IceSSL::EndpointI::connectors(const vector<IceInternal::Address>& addresses, + const IceInternal::NetworkProxyPtr& proxy) const +{ + vector<IceInternal::ConnectorPtr> connectors; + for(unsigned int i = 0; i < addresses.size(); ++i) + { + connectors.push_back(new ConnectorI(_instance, _host, addresses[i], proxy, _timeout, _connectionId)); + } + return connectors; +} + bool IceSSL::EndpointI::operator==(const Ice::LocalObject& r) const { @@ -533,17 +545,6 @@ IceSSL::EndpointI::hashInit() const return h; } -vector<IceInternal::ConnectorPtr> -IceSSL::EndpointI::connectors(const vector<IceInternal::Address>& addresses) const -{ - vector<IceInternal::ConnectorPtr> connectors; - for(unsigned int i = 0; i < addresses.size(); ++i) - { - connectors.push_back(new ConnectorI(_instance, _host, addresses[i], _timeout, _connectionId)); - } - return connectors; -} - IceSSL::EndpointFactoryI::EndpointFactoryI(const InstancePtr& instance) : _instance(instance) { diff --git a/cpp/src/IceSSL/EndpointI.h b/cpp/src/IceSSL/EndpointI.h index a54ac39f1e1..d5ffa52eccd 100644 --- a/cpp/src/IceSSL/EndpointI.h +++ b/cpp/src/IceSSL/EndpointI.h @@ -45,6 +45,8 @@ public: virtual IceInternal::AcceptorPtr acceptor(IceInternal::EndpointIPtr&, const std::string&) const; virtual std::vector<IceInternal::EndpointIPtr> expand() const; virtual bool equivalent(const IceInternal::EndpointIPtr&) const; + virtual std::vector<IceInternal::ConnectorPtr> connectors(const std::vector<IceInternal::Address>&, + const IceInternal::NetworkProxyPtr&) const; virtual bool operator==(const Ice::LocalObject&) const; virtual bool operator<(const Ice::LocalObject&) const; @@ -56,7 +58,6 @@ public: private: virtual ::Ice::Int hashInit() const; - virtual std::vector<IceInternal::ConnectorPtr> connectors(const std::vector<IceInternal::Address>&) const; // // All members are const, because endpoints are immutable. diff --git a/cpp/src/IceSSL/Instance.cpp b/cpp/src/IceSSL/Instance.cpp index bd56fd7eb99..eb2ae99f78f 100644 --- a/cpp/src/IceSSL/Instance.cpp +++ b/cpp/src/IceSSL/Instance.cpp @@ -812,6 +812,12 @@ IceSSL::Instance::preferIPv6() const return _facade->preferIPv6(); } +IceInternal::NetworkProxyPtr +IceSSL::Instance::networkProxy() const +{ + return _facade->getNetworkProxy(); +} + string IceSSL::Instance::defaultHost() const { diff --git a/cpp/src/IceSSL/Instance.h b/cpp/src/IceSSL/Instance.h index eacad08d549..ff63c33b951 100644 --- a/cpp/src/IceSSL/Instance.h +++ b/cpp/src/IceSSL/Instance.h @@ -40,6 +40,7 @@ public: IceInternal::EndpointHostResolverPtr endpointHostResolver() const; IceInternal::ProtocolSupport protocolSupport() const; bool preferIPv6() const; + IceInternal::NetworkProxyPtr networkProxy() const; std::string defaultHost() const; Ice::EncodingVersion defaultEncoding() const; int networkTraceLevel() const; diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index fdb6fe229ae..ec862faf22b 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -32,7 +32,7 @@ IceSSL::TransceiverI::getNativeInfo() return this; } -#if defined(ICE_USE_IOCP) +#ifdef ICE_USE_IOCP IceInternal::AsyncInfo* IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) { @@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) #endif IceInternal::SocketOperation -IceSSL::TransceiverI::initialize() +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { try { @@ -61,14 +61,89 @@ IceSSL::TransceiverI::initialize() } else if(_state <= StateConnectPending) { -#ifndef ICE_USE_IOCP +#ifdef ICE_USE_IOCP + IceInternal::doFinishConnectAsync(_fd, _write); +#else IceInternal::doFinishConnect(_fd); +#endif + + _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + + if(_proxy) + { + // + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); + +#ifdef ICE_USE_IOCP + // + // Return SocketOperationWrite to indicate we need to start a write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; #else - IceInternal::doFinishConnectAsync(_fd, _write); + // + // Write the proxy connection message using TCP. + // + if(writeRaw(writeBuffer)) + { + // + // Write completed without blocking. + // + _proxy->endWriteConnectRequest(writeBuffer); + + // + // Try to read the response using TCP. + // + if(readRaw(readBuffer)) + { + // + // Read completed without blocking - fall through. + // + _proxy->endReadConnectRequestResponse(readBuffer); + } + else + { + // + // Return SocketOperationRead to indicate we need to complete the read. + // + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + } + else + { + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; + } #endif + } + _state = StateConnected; - _desc = IceInternal::fdToString(_fd); } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + assert(_state == StateConnected); if(!_ssl) @@ -96,16 +171,18 @@ IceSSL::TransceiverI::initialize() _sentBytes = 0; #endif -#ifndef ICE_USE_IOCP - // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type. - BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0); -#else +#ifdef ICE_USE_IOCP BIO* bio; if(!BIO_new_bio_pair(&bio, _maxSendPacketSize, &_iocpBio, _maxReceivePacketSize)) { bio = 0; _iocpBio = 0; } +#else + // + // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type. + // + BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0); #endif if(!bio) { @@ -255,17 +332,7 @@ IceSSL::TransceiverI::initialize() } else { -#ifndef _WIN32 - // - // The local address is only accessible with connected sockets on Windows. - // - IceInternal::Address localAddr; - IceInternal::fdToLocalAddress(_fd, localAddr); - out << "local address: " << IceInternal::addrToString(localAddr) << "\n"; -#else - out << "local address: <not available>\n"; -#endif - out << "remote address: " << IceInternal::addrToString(_connectAddr) << "\n" << ex; + out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex; } } throw; @@ -341,6 +408,14 @@ IceSSL::TransceiverI::close() bool IceSSL::TransceiverI::write(IceInternal::Buffer& buf) { + if(_state == StateProxyConnectRequest) + { + // + // We need to write the proxy message, but we have to use TCP and not SSL. + // + return writeRaw(buf); + } + #ifdef ICE_USE_IOCP if(_writeI != _writeBuffer.end()) { @@ -351,15 +426,15 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } #endif - // Its impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { ERR_clear_error(); // Clear any spurious errors. assert(_fd != INVALID_SOCKET); -#ifndef ICE_USE_IOCP - int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize); -#else +#ifdef ICE_USE_IOCP int ret; if(_sentBytes) { @@ -378,6 +453,8 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } } } +#else + int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize); #endif if(ret <= 0) { @@ -482,6 +559,14 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) bool IceSSL::TransceiverI::read(IceInternal::Buffer& buf) { + if(_state == StateProxyConnectRequestPending) + { + // + // We need to read the proxy reply, but we have to use TCP and not SSL. + // + return readRaw(buf); + } + #ifdef ICE_USE_IOCP if(_readI != _readBuffer.end()) { @@ -492,7 +577,9 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf) } #endif - // It's impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { @@ -644,51 +731,40 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf) } #ifdef ICE_USE_IOCP + bool -IceSSL::TransceiverI::startWrite(IceInternal::Buffer& /*buf*/) +IceSSL::TransceiverI::startWrite(IceInternal::Buffer& buf) { - if(_state < StateConnected) + if(_state == StateConnectPending) { - IceInternal::doConnectAsync(_fd, _connectAddr, _write); + IceInternal::Address addr = _proxy ? _proxy->getAddress() : _addr; + IceInternal::doConnectAsync(_fd, addr, _write); return false; } + else if(_state == StateProxyConnectRequest) + { + // + // We need to write the proxy message, but we have to use TCP and not SSL. + // + assert(!buf.b.empty() && buf.i != buf.b.end()); + + const int packetSize = static_cast<int>(buf.b.end() - buf.i); + const int actualSize = writeAsync(reinterpret_cast<char*>(&*buf.i), packetSize); + return packetSize == actualSize; + } assert(!_writeBuffer.empty() && _writeI != _writeBuffer.end()); - int packetSize = static_cast<int>(_writeBuffer.end() - _writeI); - if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { - packetSize = _maxSendPacketSize; - } + const int packetSize = static_cast<int>(_writeBuffer.end() - _writeI); + const int actualSize = writeAsync(reinterpret_cast<char*>(&*_writeI), packetSize); - _write.buf.len = packetSize; - _write.buf.buf = reinterpret_cast<char*>(&*_writeI); - int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); - if(err == SOCKET_ERROR) - { - if(!IceInternal::wouldBlock()) - { - if(IceInternal::connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - } - } - return packetSize == static_cast<int>(_writeBuffer.end() - _writeI); + return packetSize == actualSize; } void -IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/) +IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& buf) { - if(_state < StateConnected) + if(_state < StateConnected && _state != StateProxyConnectRequest) { return; } @@ -710,12 +786,30 @@ IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/) } } - _writeI += _write.count; + if(_state == StateProxyConnectRequest) + { + buf.i += _write.count; + } + else + { + _writeI += _write.count; + } } void IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf) { + if(_state == StateProxyConnectRequestPending) + { + // + // We need to read the proxy reply, but we have to use TCP and not SSL. + // + assert(!buf.b.empty() && buf.i != buf.b.end()); + const int packetSize = static_cast<int>(buf.b.end() - buf.i); + readAsync(reinterpret_cast<char*>(&*buf.i), packetSize); + return; + } + if(_readI == _readBuffer.end()) { assert(!buf.b.empty() && buf.i != buf.b.end()); @@ -723,7 +817,7 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf) ERR_clear_error(); // Clear any spurious errors. #ifndef NDEBUG - int ret = + int ret = #endif SSL_read(_ssl, reinterpret_cast<void*>(&*buf.i), static_cast<int>(buf.b.end() - buf.i)); assert(ret <= 0 && SSL_get_error(_ssl, ret) == SSL_ERROR_WANT_READ); @@ -735,37 +829,12 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf) assert(!_readBuffer.empty() && _readI != _readBuffer.end()); - int packetSize = static_cast<int>(_readBuffer.end() - _readI); - if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) - { - packetSize = _maxReceivePacketSize; - } - - _read.buf.len = packetSize; - _read.buf.buf = reinterpret_cast<char*>(&*_readI); - int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); - if(err == SOCKET_ERROR) - { - if(!IceInternal::wouldBlock()) - { - if(IceInternal::connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - } - } + const int packetSize = static_cast<int>(_readBuffer.end() - _readI); + readAsync(reinterpret_cast<char*>(&*_readI), packetSize); } void -IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/) +IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf) { if(static_cast<int>(_read.count) == SOCKET_ERROR) { @@ -783,22 +852,36 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/) throw ex; } } + else if(_read.count == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } - _readI += _read.count; - - if(_iocpBio && _readI == _readBuffer.end()) + if(_state == StateProxyConnectRequestPending) { - assert(_readI == _readBuffer.end()); - int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size())); - if(n < 0) // Expected if the transceiver was closed. + buf.i += _read.count; + } + else + { + _readI += _read.count; + + if(_iocpBio && _readI == _readBuffer.end()) { - SecurityException ex(__FILE__, __LINE__); - ex.reason = "SSL bio write failed"; - throw ex; + assert(_readI == _readBuffer.end()); + int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size())); + if(n < 0) // Expected if the transceiver was closed. + { + SecurityException ex(__FILE__, __LINE__); + ex.reason = "SSL bio write failed"; + throw ex; + } + assert(n == static_cast<int>(_readBuffer.size())); } - assert(n == static_cast<int>(_readBuffer.size())); } } + #endif string @@ -828,15 +911,17 @@ IceSSL::TransceiverI::checkSendSize(const IceInternal::Buffer& buf, size_t messa } } -IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& host, - const IceInternal::Address& addr) : +IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const IceInternal::NetworkProxyPtr& proxy, + const string& host, const IceInternal::Address& addr) : IceInternal::NativeInfo(fd), _instance(instance), _logger(instance->communicator()->getLogger()), _stats(instance->communicator()->getStats()), - _ssl(0), + _proxy(proxy), _host(host), + _addr(addr), _incoming(false), + _ssl(0), _state(StateNeedConnect) #ifdef ICE_USE_IOCP , _iocpBio(0), @@ -863,7 +948,6 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const _desc = IceInternal::fdToString(_fd); } #endif - _connectAddr = addr; } IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& adapterName) : @@ -871,9 +955,9 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const _instance(instance), _logger(instance->communicator()->getLogger()), _stats(instance->communicator()->getStats()), - _ssl(0), - _incoming(true), _adapterName(adapterName), + _incoming(true), + _ssl(0), _state(StateConnected), _desc(IceInternal::fdToString(fd)) #ifdef ICE_USE_IOCP @@ -919,7 +1003,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const { X509_free(cert); } - + if(chain != 0) { for(int i = 0; i < sk_X509_num(chain); ++i) @@ -942,6 +1026,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const } #ifdef ICE_USE_IOCP + bool IceSSL::TransceiverI::receive() { @@ -1005,7 +1090,7 @@ IceSSL::TransceiverI::receive() assert(_readI == _readBuffer.end()); #ifndef NDEBUG - int n = + int n = #endif BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size())); @@ -1021,7 +1106,7 @@ IceSSL::TransceiverI::send() assert(BIO_ctrl_pending(_iocpBio)); _writeBuffer.resize(BIO_ctrl_pending(_iocpBio)); #ifndef NDEBUG - int n = + int n = #endif BIO_read(_iocpBio, &_writeBuffer[0], static_cast<int>(_writeBuffer.size())); assert(n == static_cast<int>(_writeBuffer.size())); @@ -1085,4 +1170,226 @@ IceSSL::TransceiverI::send() return true; } +int +IceSSL::TransceiverI::writeAsync(char* buf, int packetSize) +{ + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) + { + packetSize = _maxSendPacketSize; + } + + _write.buf.len = packetSize; + _write.buf.buf = buf; + + int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); + + if(err == SOCKET_ERROR) + { + if(!IceInternal::wouldBlock()) + { + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + } + + return packetSize; +} + +int +IceSSL::TransceiverI::readAsync(char* buf, int packetSize) +{ + if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) + { + packetSize = _maxReceivePacketSize; + } + + _read.buf.len = packetSize; + _read.buf.buf = buf; + + int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); + + if(err == SOCKET_ERROR) + { + if(!IceInternal::wouldBlock()) + { + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + } + + return packetSize; +} + +#endif + +bool +IceSSL::TransceiverI::writeRaw(IceInternal::Buffer& buf) +{ + // + // It's impossible for packetSize to be more than an Int. + // + int packetSize = static_cast<int>(buf.b.end() - buf.i); +#ifdef ICE_USE_IOCP + // + // Limit packet size to avoid performance problems on WIN32 + // + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) + { + packetSize = _maxSendPacketSize; + } #endif + while(buf.i != buf.b.end()) + { + assert(_fd != INVALID_SOCKET); + + ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); + if(ret == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + + if(ret == SOCKET_ERROR) + { + if(IceInternal::interrupted()) + { + continue; + } + + if(IceInternal::noBuffers() && packetSize > 1024) + { + packetSize /= 2; + continue; + } + + if(IceInternal::wouldBlock()) + { + return false; + } + + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + + if(_instance->networkTraceLevel() >= 3) + { + Trace out(_logger, _instance->networkTraceCategory()); + out << "sent " << ret << " of " << packetSize << " bytes via tcp\n" << toString(); + } + + if(_stats) + { + _stats->bytesSent("tcp", static_cast<Int>(ret)); + } + + buf.i += ret; + + if(packetSize > buf.b.end() - buf.i) + { + packetSize = static_cast<int>(buf.b.end() - buf.i); + } + } + + return true; +} + +bool +IceSSL::TransceiverI::readRaw(IceInternal::Buffer& buf) +{ + // + // It's impossible for packetSize to be more than an Int. + // + int packetSize = static_cast<int>(buf.b.end() - buf.i); + while(buf.i != buf.b.end()) + { + assert(_fd != INVALID_SOCKET); + ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); + + if(ret == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + + if(ret == SOCKET_ERROR) + { + if(IceInternal::interrupted()) + { + continue; + } + + if(IceInternal::noBuffers() && packetSize > 1024) + { + packetSize /= 2; + continue; + } + + if(IceInternal::wouldBlock()) + { + return false; + } + + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + + if(_instance->networkTraceLevel() >= 3) + { + Trace out(_logger, _instance->networkTraceCategory()); + out << "received " << ret << " of " << packetSize << " bytes via tcp\n" << toString(); + } + + if(_stats) + { + _stats->bytesReceived("tcp", static_cast<Int>(ret)); + } + + buf.i += ret; + + packetSize = static_cast<int>(buf.b.end() - buf.i); + } + + return true; +} diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h index c96265f58f4..23d6da3975e 100644 --- a/cpp/src/IceSSL/TransceiverI.h +++ b/cpp/src/IceSSL/TransceiverI.h @@ -33,6 +33,8 @@ class TransceiverI : public IceInternal::Transceiver, public IceInternal::Native { StateNeedConnect, StateConnectPending, + StateProxyConnectRequest, + StateProxyConnectRequestPending, StateConnected, StateHandshakeComplete }; @@ -44,7 +46,7 @@ public: virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation); #endif - virtual IceInternal::SocketOperation initialize(); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual void close(); virtual bool write(IceInternal::Buffer&); virtual bool read(IceInternal::Buffer&); @@ -61,17 +63,23 @@ public: private: - TransceiverI(const InstancePtr&, SOCKET, const std::string&, const IceInternal::Address&); + TransceiverI(const InstancePtr&, SOCKET, const IceInternal::NetworkProxyPtr&, const std::string&, + const IceInternal::Address&); TransceiverI(const InstancePtr&, SOCKET, const std::string&); virtual ~TransceiverI(); virtual NativeConnectionInfoPtr getNativeConnectionInfo() const; #ifdef ICE_USE_IOCP - bool send(); bool receive(); + bool send(); + int writeAsync(char*, int); + int readAsync(char*, int); #endif + bool writeRaw(IceInternal::Buffer&); + bool readRaw(IceInternal::Buffer&); + friend class ConnectorI; friend class AcceptorI; @@ -79,16 +87,17 @@ private: const Ice::LoggerPtr _logger; const Ice::StatsPtr _stats; - SSL* _ssl; - + const IceInternal::NetworkProxyPtr _proxy; const std::string _host; + const IceInternal::Address _addr; - const bool _incoming; const std::string _adapterName; + const bool _incoming; + + SSL* _ssl; State _state; std::string _desc; - IceInternal::Address _connectAddr; #ifdef ICE_USE_IOCP int _maxSendPacketSize; int _maxReceivePacketSize; diff --git a/cpp/test/Glacier2/sessionHelper/Client.cpp b/cpp/test/Glacier2/sessionHelper/Client.cpp index 669fb7d7b32..7f38d70db77 100644 --- a/cpp/test/Glacier2/sessionHelper/Client.cpp +++ b/cpp/test/Glacier2/sessionHelper/Client.cpp @@ -160,7 +160,7 @@ public: { ex.ice_throw();; } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { cout << "ok" << endl; instance->notify(); diff --git a/cpp/test/Ice/background/Transceiver.cpp b/cpp/test/Ice/background/Transceiver.cpp index 84114816101..a398f170bad 100644 --- a/cpp/test/Ice/background/Transceiver.cpp +++ b/cpp/test/Ice/background/Transceiver.cpp @@ -18,7 +18,7 @@ Transceiver::getNativeInfo() } IceInternal::SocketOperation -Transceiver::initialize() +Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { #ifndef ICE_USE_IOCP IceInternal::SocketOperation status = _configuration->initializeSocketOperation(); @@ -48,7 +48,7 @@ Transceiver::initialize() _configuration->checkInitializeException(); if(!_initialized) { - IceInternal::SocketOperation status = _transceiver->initialize(); + IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer); if(status != IceInternal::SocketOperationNone) { return status; diff --git a/cpp/test/Ice/background/Transceiver.h b/cpp/test/Ice/background/Transceiver.h index 03b873130d0..770272557b2 100644 --- a/cpp/test/Ice/background/Transceiver.h +++ b/cpp/test/Ice/background/Transceiver.h @@ -31,7 +31,7 @@ public: virtual std::string type() const; virtual std::string toString() const; virtual Ice::ConnectionInfoPtr getInfo() const; - virtual IceInternal::SocketOperation initialize(); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual void checkSendSize(const IceInternal::Buffer&, size_t); private: diff --git a/cpp/test/Ice/binding/AllTests.cpp b/cpp/test/Ice/binding/AllTests.cpp index 0ab430667c5..2388bde4122 100644 --- a/cpp/test/Ice/binding/AllTests.cpp +++ b/cpp/test/Ice/binding/AllTests.cpp @@ -124,7 +124,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test3->ice_ping(); test(false); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { } } @@ -468,7 +468,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { test->getAdapterName(); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { } @@ -516,7 +516,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test(test3->ice_getConnection() == test1->ice_getConnection()); test(false); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { } } @@ -627,7 +627,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { test->getAdapterName(); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { } @@ -686,7 +686,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { test->getAdapterName(); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { } @@ -779,7 +779,7 @@ allTests(const Ice::CommunicatorPtr& communicator) testSecure->ice_ping(); test(false); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { } diff --git a/cpp/test/Ice/proxy/AllTests.cpp b/cpp/test/Ice/proxy/AllTests.cpp index ffc03b2fc7d..96d8c5df79a 100644 --- a/cpp/test/Ice/proxy/AllTests.cpp +++ b/cpp/test/Ice/proxy/AllTests.cpp @@ -906,7 +906,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { test(!ssl); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectFailedException&) { test(ssl); } diff --git a/cpp/test/IceStorm/rep1/run.py b/cpp/test/IceStorm/rep1/run.py index a202c9da68d..e030fa03ab3 100755 --- a/cpp/test/IceStorm/rep1/run.py +++ b/cpp/test/IceStorm/rep1/run.py @@ -102,7 +102,7 @@ icestorm.admin("create single") for replica in range(1, 3): icestorm.adminForReplica(replica, "create single", "error: topic `single' exists") -icestorm.adminForReplica(0, "create single", "ConnectionRefused") +icestorm.adminForReplica(0, "create single", "ConnectFailed") icestorm.startReplica(0, echo=False) @@ -120,7 +120,7 @@ icestorm.admin("create single") for replica in range(0, 2): icestorm.adminForReplica(replica, "create single", "error: topic `single' exists") -icestorm.adminForReplica(2, "create single", "ConnectionRefused") +icestorm.adminForReplica(2, "create single", "ConnectFailed") icestorm.startReplica(2, echo=False) @@ -138,7 +138,7 @@ icestorm.admin("destroy single") for replica in range(1, 3): icestorm.adminForReplica(replica, "destroy single", "error: couldn't find topic `single'") -icestorm.adminForReplica(0, "destroy single", "ConnectionRefused") +icestorm.adminForReplica(0, "destroy single", "ConnectFailed") icestorm.startReplica(0, echo=False) @@ -156,7 +156,7 @@ icestorm.admin("destroy single") for replica in range(0, 2): icestorm.adminForReplica(replica, "destroy single", "error: couldn't find topic `single'") -icestorm.adminForReplica(2, "destroy single", "ConnectionRefused") +icestorm.adminForReplica(2, "destroy single", "ConnectFailed") icestorm.startReplica(2, echo=False) @@ -192,7 +192,7 @@ runsub2() for replica in range(0, 2): runsub2(replica, "IceStorm::AlreadySubscribed") -runsub2(2, "ConnectionRefused") +runsub2(2, "ConnectFailed") icestorm.startReplica(2, echo=False) @@ -208,7 +208,7 @@ rununsub2() for replica in range(0, 2): rununsub2(replica) -rununsub2(2, "ConnectionRefused") +rununsub2(2, "ConnectFailed") icestorm.startReplica(2, echo=False) @@ -224,7 +224,7 @@ runsub2() for replica in range(1, 3): runsub2(replica, "IceStorm::AlreadySubscribed") -runsub2(0, "ConnectionRefused") +runsub2(0, "ConnectFailed") icestorm.startReplica(0, echo=False) @@ -240,7 +240,7 @@ rununsub2() for replica in range(1, 3): rununsub2(replica) -rununsub2(0, "ConnectionRefused") +rununsub2(0, "ConnectFailed") icestorm.startReplica(0, echo=False) |