summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2013-04-04 16:02:42 -0700
committerMark Spruiell <mes@zeroc.com>2013-04-04 16:02:42 -0700
commit9cb665138c7d2422739e32b40a249c64fd3b6cd5 (patch)
tree94759d916599ca08761b98580185a230744ac67a /cpp/src
parentx64 VC10 icexml35d.dll was linked to wrong file (diff)
downloadice-9cb665138c7d2422739e32b40a249c64fd3b6cd5.tar.bz2
ice-9cb665138c7d2422739e32b40a249c64fd3b6cd5.tar.xz
ice-9cb665138c7d2422739e32b40a249c64fd3b6cd5.zip
* SOCKS support for C++
* Minor cleanup in C# * Unity fixes
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp2
-rw-r--r--cpp/src/Ice/EndpointI.cpp71
-rw-r--r--cpp/src/Ice/EndpointI.h3
-rw-r--r--cpp/src/Ice/Instance.cpp23
-rw-r--r--cpp/src/Ice/Instance.h3
-rw-r--r--cpp/src/Ice/Network.cpp169
-rw-r--r--cpp/src/Ice/Network.h67
-rw-r--r--cpp/src/Ice/NetworkF.h30
-rw-r--r--cpp/src/Ice/ProtocolPluginFacade.cpp6
-rw-r--r--cpp/src/Ice/TcpAcceptor.cpp2
-rw-r--r--cpp/src/Ice/TcpConnector.cpp9
-rw-r--r--cpp/src/Ice/TcpConnector.h3
-rw-r--r--cpp/src/Ice/TcpEndpointI.cpp22
-rw-r--r--cpp/src/Ice/TcpEndpointI.h2
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp219
-rw-r--r--cpp/src/Ice/TcpTransceiver.h12
-rw-r--r--cpp/src/Ice/Transceiver.h2
-rw-r--r--cpp/src/Ice/UdpEndpointI.cpp22
-rw-r--r--cpp/src/Ice/UdpEndpointI.h2
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp2
-rw-r--r--cpp/src/Ice/UdpTransceiver.h2
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp8
-rw-r--r--cpp/src/IceSSL/ConnectorI.cpp8
-rw-r--r--cpp/src/IceSSL/ConnectorI.h6
-rw-r--r--cpp/src/IceSSL/EndpointI.cpp23
-rw-r--r--cpp/src/IceSSL/EndpointI.h3
-rw-r--r--cpp/src/IceSSL/Instance.cpp6
-rw-r--r--cpp/src/IceSSL/Instance.h1
-rw-r--r--cpp/src/IceSSL/TransceiverI.cpp519
-rw-r--r--cpp/src/IceSSL/TransceiverI.h23
30 files changed, 1016 insertions, 254 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index f6c52ee7c91..9be395b3f91 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -2280,7 +2280,7 @@ Ice::ConnectionI::initiateShutdown()
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
- SocketOperation s = _transceiver->initialize();
+ SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
if(s != SocketOperationNone)
{
scheduleTimeout(s, connectTimeout());
diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp
index aabcb14b2b0..9cd4823c9ae 100644
--- a/cpp/src/Ice/EndpointI.cpp
+++ b/cpp/src/Ice/EndpointI.cpp
@@ -46,6 +46,17 @@ Init init;
Ice::LocalObject* IceInternal::upCast(EndpointI* p) { return p; }
IceUtil::Shared* IceInternal::upCast(EndpointHostResolver* p) { return p; }
+vector<ConnectorPtr>
+IceInternal::EndpointI::connectors(const vector<Address>& /*addrs*/, const NetworkProxyPtr& /*proxy*/) const
+{
+ //
+ // This method must be extended by endpoints which use the EndpointHostResolver to create
+ // connectors from IP addresses.
+ //
+ assert(false);
+ return vector<ConnectorPtr>();
+}
+
const string&
IceInternal::EndpointI::connectionId() const
{
@@ -63,17 +74,6 @@ IceInternal::EndpointI::internal_getHash() const
return _hashValue;
}
-vector<ConnectorPtr>
-IceInternal::EndpointI::connectors(const vector<Address>& /*addrs*/) const
-{
- //
- // This method must be extended by endpoints which use the EndpointHostResolver to create
- // connectors from IP addresses.
- //
- assert(false);
- return vector<ConnectorPtr>();
-}
-
IceInternal::EndpointI::EndpointI(const std::string& connectionId) :
_connectionId(connectionId),
_hashInitialized(false)
@@ -128,10 +128,14 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
// Try to get the addresses without DNS lookup. If this doesn't
// work, we retry with DNS lookup (and observer).
//
- vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false);
- if(!addrs.empty())
+ NetworkProxyPtr networkProxy = _instance->networkProxy();
+ if(!networkProxy)
{
- return endpoint->connectors(addrs);
+ vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false);
+ if(!addrs.empty())
+ {
+ return endpoint->connectors(addrs, 0);
+ }
}
ObserverHelperT<> observer;
@@ -144,7 +148,13 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
vector<ConnectorPtr> connectors;
try
{
- connectors = endpoint->connectors(getAddresses(host, port, _protocol, selType, _preferIPv6, true));
+ if(networkProxy)
+ {
+ networkProxy = networkProxy->resolveHost();
+ }
+
+ connectors = endpoint->connectors(getAddresses(host, port, _protocol, selType, _preferIPv6, true),
+ networkProxy);
}
catch(const Ice::LocalException& ex)
{
@@ -162,20 +172,24 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
// Try to get the addresses without DNS lookup. If this doesn't work, we queue a resolve
// entry and the thread will take care of getting the endpoint addresses.
//
- try
+ NetworkProxyPtr networkProxy = _instance->networkProxy();
+ if(!networkProxy)
{
- vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false);
- if(!addrs.empty())
+ try
{
- callback->connectors(endpoint->connectors(addrs));
+ vector<Address> addrs = getAddresses(host, port, _protocol, selType, _preferIPv6, false);
+ if(!addrs.empty())
+ {
+ callback->connectors(endpoint->connectors(addrs, 0));
+ return;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ callback->exception(ex);
return;
}
}
- catch(const Ice::LocalException& ex)
- {
- callback->exception(ex);
- return;
- }
Lock sync(*this);
assert(!_destroyed);
@@ -241,11 +255,18 @@ IceInternal::EndpointHostResolver::run()
threadObserver->stateChanged(ThreadStateIdle, ThreadStateInUseForOther);
}
+ NetworkProxyPtr networkProxy = _instance->networkProxy();
+ if(networkProxy)
+ {
+ networkProxy = networkProxy->resolveHost();
+ }
+
r.callback->connectors(r.endpoint->connectors(getAddresses(r.host,
r.port,
_protocol,
r.selType,
- _preferIPv6, true)));
+ _preferIPv6, true),
+ networkProxy));
if(threadObserver)
{
diff --git a/cpp/src/Ice/EndpointI.h b/cpp/src/Ice/EndpointI.h
index ea0b577c5ca..3e14f4d30b9 100644
--- a/cpp/src/Ice/EndpointI.h
+++ b/cpp/src/Ice/EndpointI.h
@@ -143,6 +143,8 @@ public:
//
virtual bool equivalent(const EndpointIPtr&) const = 0;
+ virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&, const NetworkProxyPtr&) const;
+
//
// Compare endpoints for sorting purposes.
//
@@ -155,7 +157,6 @@ protected:
virtual ::Ice::Int internal_getHash() const;
- virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&) const;
friend class EndpointHostResolver;
EndpointI(const std::string&);
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index f42b4d10a69..b682ce39cff 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -309,6 +309,12 @@ IceInternal::Instance::preferIPv6() const
return _preferIPv6;
}
+NetworkProxyPtr
+IceInternal::Instance::networkProxy() const
+{
+ return _networkProxy;
+}
+
ThreadPoolPtr
IceInternal::Instance::clientThreadPool()
{
@@ -1045,8 +1051,17 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
_proxyFactory = new ProxyFactory(this);
+ string proxyHost = _initData.properties->getProperty("Ice.SOCKSProxyHost");
+ int defaultIPv6 = 1; // IPv6 enabled by default.
+ if(!proxyHost.empty())
+ {
+ int proxyPort = _initData.properties->getPropertyAsIntWithDefault("Ice.SOCKSProxyPort", 1080);
+ _networkProxy = new SOCKSNetworkProxy(proxyHost, proxyPort);
+ defaultIPv6 = 0; // IPv6 is not supported with SOCKS
+ }
+
bool ipv4 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0;
- bool ipv6 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv6", 1) > 0;
+ bool ipv6 = _initData.properties->getPropertyAsIntWithDefault("Ice.IPv6", defaultIPv6) > 0;
if(!ipv4 && !ipv6)
{
throw InitializationException(__FILE__, __LINE__, "Both IPV4 and IPv6 support cannot be disabled.");
@@ -1064,6 +1079,12 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
_protocolSupport = EnableIPv6;
}
_preferIPv6 = _initData.properties->getPropertyAsInt("Ice.PreferIPv6Address") > 0;
+
+ if(ipv6 && SOCKSNetworkProxyPtr::dynamicCast(_networkProxy))
+ {
+ throw InitializationException(__FILE__, __LINE__, "IPv6 is not supported with SOCKS4 proxies");
+ }
+
_endpointFactoryManager = new EndpointFactoryManager(this);
#ifndef ICE_OS_WINRT
EndpointFactoryPtr tcpEndpointFactory = new TcpEndpointFactory(this);
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index 0437f4d10ab..d092e49dc3d 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -34,6 +34,7 @@
#include <Ice/RetryQueueF.h>
#include <Ice/DynamicLibraryF.h>
#include <Ice/PluginF.h>
+#include <Ice/NetworkF.h>
#include <Ice/Initialize.h>
#include <Ice/ImplicitContextI.h>
#include <Ice/FacetMap.h>
@@ -73,6 +74,7 @@ public:
ObjectAdapterFactoryPtr objectAdapterFactory() const;
ProtocolSupport protocolSupport() const;
bool preferIPv6() const;
+ NetworkProxyPtr networkProxy() const;
ThreadPoolPtr clientThreadPool();
ThreadPoolPtr serverThreadPool(bool create = true);
EndpointHostResolverPtr endpointHostResolver();
@@ -136,6 +138,7 @@ private:
ObjectAdapterFactoryPtr _objectAdapterFactory;
ProtocolSupport _protocolSupport;
bool _preferIPv6;
+ NetworkProxyPtr _networkProxy;
ThreadPoolPtr _clientThreadPool;
ThreadPoolPtr _serverThreadPool;
EndpointHostResolverPtr _endpointHostResolver;
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp
index e60712ab51a..b1a921220e2 100644
--- a/cpp/src/Ice/Network.cpp
+++ b/cpp/src/Ice/Network.cpp
@@ -35,6 +35,7 @@
#include <Ice/LocalException.h>
#include <Ice/Properties.h> // For setTcpBufSize
#include <Ice/LoggerUtil.h> // For setTcpBufSize
+#include <Ice/Buffer.h>
#include <IceUtil/Random.h>
#if defined(ICE_OS_WINRT)
@@ -544,6 +545,114 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s)
}
#endif
+IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(SOCKSNetworkProxy* p) { return p; }
+
+IceInternal::SOCKSNetworkProxy::SOCKSNetworkProxy(const string& host, int port) :
+ _host(host), _port(port), _haveAddress(false)
+{
+ memset(&_address, 0, sizeof(_address));
+}
+
+IceInternal::SOCKSNetworkProxy::SOCKSNetworkProxy(const Address& addr) :
+ _port(0), _address(addr), _haveAddress(true)
+{
+}
+
+void
+IceInternal::SOCKSNetworkProxy::beginWriteConnectRequest(const Address& addr, Buffer& buf)
+{
+ if(addr.saStorage.ss_family != AF_INET)
+ {
+ throw FeatureNotSupportedException(__FILE__, __LINE__, "SOCKS4 only supports IPv4 addresses");
+ }
+
+ //
+ // SOCKS connect request
+ //
+ buf.b.resize(9);
+ buf.i = buf.b.begin();
+ Byte* dest = &buf.b[0];
+ *dest++ = 0x04; // SOCKS version 4.
+ *dest++ = 0x01; // Command, establish a TCP/IP stream connection
+
+ const Byte* src;
+
+ //
+ // Port (already in big-endian order)
+ //
+ src = reinterpret_cast<const Byte*>(&addr.saIn.sin_port);
+ *dest++ = *src++;
+ *dest++ = *src;
+
+ //
+ // IPv4 address (already in big-endian order)
+ //
+ src = reinterpret_cast<const Ice::Byte*>(&addr.saIn.sin_addr.s_addr);
+ *dest++ = *src++;
+ *dest++ = *src++;
+ *dest++ = *src++;
+ *dest++ = *src;
+
+ *dest = 0x00; // User ID.
+}
+
+void
+IceInternal::SOCKSNetworkProxy::endWriteConnectRequest(Buffer& buf)
+{
+ buf.b.reset();
+}
+
+void
+IceInternal::SOCKSNetworkProxy::beginReadConnectRequestResponse(Buffer& buf)
+{
+ //
+ // Read the SOCKS4 response whose size is 8 bytes.
+ //
+ buf.b.resize(8);
+ buf.i = buf.b.begin();
+}
+
+void
+IceInternal::SOCKSNetworkProxy::endReadConnectRequestResponse(Buffer& buf)
+{
+ buf.i = buf.b.begin();
+
+ if(buf.b.end() - buf.i < 2)
+ {
+ throw UnmarshalOutOfBoundsException(__FILE__, __LINE__);
+ }
+
+ const Byte* src = &(*buf.i);
+ const Byte b1 = *src++;
+ const Byte b2 = *src++;
+ if(b1 != 0x00 || b2 != 0x5a)
+ {
+ throw ConnectFailedException(__FILE__, __LINE__);
+ }
+ buf.b.reset();
+}
+
+NetworkProxyPtr
+IceInternal::SOCKSNetworkProxy::resolveHost() const
+{
+ assert(!_host.empty());
+ return new SOCKSNetworkProxy(getAddresses(_host, _port, EnableIPv4, Random, false, true)[0]);
+}
+
+Address
+IceInternal::SOCKSNetworkProxy::getAddress() const
+{
+ assert(_haveAddress); // Host must be resolved.
+ return _address;
+}
+
+string
+IceInternal::SOCKSNetworkProxy::getName() const
+{
+ return "SOCKS";
+}
+
bool
IceInternal::noMoreFds(int error)
{
@@ -1020,6 +1129,64 @@ IceInternal::fdToRemoteAddress(SOCKET fd, Address& addr)
#endif
}
+std::string
+IceInternal::fdToString(SOCKET fd, const NetworkProxyPtr& proxy, const Address& target,
+#if defined(_WIN32)
+ bool connected)
+#else
+ bool /*connected*/)
+#endif
+{
+ if(fd == INVALID_SOCKET)
+ {
+ return "<closed>";
+ }
+
+ ostringstream s;
+
+#if defined(_WIN32)
+ if(!connected)
+ {
+ //
+ // The local address is only accessible with connected sockets on Windows.
+ //
+ s << "local address = <not available>";
+ }
+ else
+ {
+ Address localAddr;
+ fdToLocalAddress(fd, localAddr);
+ s << "local address = " << addrToString(localAddr);
+ }
+#else
+ Address localAddr;
+ fdToLocalAddress(fd, localAddr);
+ s << "local address = " << addrToString(localAddr);
+#endif
+
+ Address remoteAddr;
+ bool peerConnected = fdToRemoteAddress(fd, remoteAddr);
+
+ if(proxy)
+ {
+ if(!peerConnected)
+ {
+ remoteAddr = proxy->getAddress();
+ }
+ s << "\n" + proxy->getName() + " proxy address = " << addrToString(remoteAddr);
+ s << "\nremote address = " << addrToString(target);
+ }
+ else
+ {
+ if(!peerConnected)
+ {
+ remoteAddr = target;
+ }
+ s << "\nremote address = " << addrToString(remoteAddr);
+ }
+
+ return s.str();
+}
std::string
IceInternal::fdToString(SOCKET fd)
@@ -1036,7 +1203,7 @@ IceInternal::fdToString(SOCKET fd)
bool peerConnected = fdToRemoteAddress(fd, remoteAddr);
return addressesToString(localAddr, remoteAddr, peerConnected);
-};
+}
void
IceInternal::fdToAddressAndPort(SOCKET fd, string& localAddress, int& localPort, string& remoteAddress, int& remotePort)
diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h
index 8d7cac3a665..55036bcc986 100644
--- a/cpp/src/Ice/Network.h
+++ b/cpp/src/Ice/Network.h
@@ -16,6 +16,7 @@
#include <Ice/Config.h>
+#include <Ice/NetworkF.h>
#include <Ice/PropertiesF.h> // For setTcpBufSize
#include <Ice/LoggerF.h> // For setTcpBufSize
#include <Ice/Protocol.h>
@@ -187,6 +188,71 @@ protected:
};
typedef IceUtil::Handle<NativeInfo> NativeInfoPtr;
+class ICE_API Buffer;
+
+class ICE_API NetworkProxy : virtual public IceUtil::Shared
+{
+public:
+
+ //
+ // Write the connection request on the connection established
+ // with the network proxy server. This is called right after
+ // the connection establishment succeeds.
+ //
+ virtual void beginWriteConnectRequest(const Address&, Buffer&) = 0;
+ virtual void endWriteConnectRequest(Buffer&) = 0;
+
+ //
+ // Once the connection request has been sent, this is called
+ // to prepare and read the response from the proxy server.
+ //
+ virtual void beginReadConnectRequestResponse(Buffer&) = 0;
+ virtual void endReadConnectRequestResponse(Buffer&) = 0;
+
+ //
+ // If the proxy host needs to be resolved, this should return
+ // a new NetworkProxy containing the IP address of the proxy.
+ // This is called from the endpoint host resolver thread, so
+ // it's safe if this this method blocks.
+ //
+ virtual NetworkProxyPtr resolveHost() const = 0;
+
+ //
+ // Returns the IP address of the network proxy. This method
+ // must not block. It's only called on a network proxy object
+ // returned by resolveHost().
+ //
+ virtual Address getAddress() const = 0;
+
+ //
+ // Returns the name of the proxy, used for tracing purposes.
+ //
+ virtual std::string getName() const = 0;
+};
+
+class ICE_API SOCKSNetworkProxy : virtual public NetworkProxy
+{
+public:
+
+ SOCKSNetworkProxy(const std::string&, int);
+ SOCKSNetworkProxy(const Address&);
+
+ virtual void beginWriteConnectRequest(const Address&, Buffer&);
+ virtual void endWriteConnectRequest(Buffer&);
+ virtual void beginReadConnectRequestResponse(Buffer&);
+ virtual void endReadConnectRequestResponse(Buffer&);
+ virtual NetworkProxyPtr resolveHost() const;
+ virtual Address getAddress() const;
+ virtual std::string getName() const;
+
+private:
+
+ std::string _host;
+ int _port;
+ Address _address;
+ bool _haveAddress;
+};
+
ICE_API bool noMoreFds(int);
ICE_API std::string errorToStringDNS(int);
ICE_API std::vector<Address> getAddresses(const std::string&, int, ProtocolSupport, Ice::EndpointSelectionType, bool,
@@ -203,6 +269,7 @@ ICE_API void closeSocket(SOCKET);
ICE_API std::string addrToString(const Address&);
ICE_API void fdToLocalAddress(SOCKET, Address&);
ICE_API bool fdToRemoteAddress(SOCKET, Address&);
+ICE_API std::string fdToString(SOCKET, const NetworkProxyPtr&, const Address&, bool);
ICE_API std::string fdToString(SOCKET);
ICE_API void fdToAddressAndPort(SOCKET, std::string&, int&, std::string&, int&);
ICE_API void addrToAddressAndPort(const Address&, std::string&, int&);
diff --git a/cpp/src/Ice/NetworkF.h b/cpp/src/Ice/NetworkF.h
new file mode 100644
index 00000000000..4b8cb2d4921
--- /dev/null
+++ b/cpp/src/Ice/NetworkF.h
@@ -0,0 +1,30 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_NETWORK_F_H
+#define ICE_NETWORK_F_H
+
+#include <IceUtil/Shared.h>
+
+#include <Ice/Handle.h>
+
+namespace IceInternal
+{
+
+class NetworkProxy;
+ICE_API IceUtil::Shared* upCast(NetworkProxy*);
+typedef Handle<NetworkProxy> NetworkProxyPtr;
+
+class SOCKSNetworkProxy;
+ICE_API IceUtil::Shared* upCast(SOCKSNetworkProxy*);
+typedef Handle<SOCKSNetworkProxy> SOCKSNetworkProxyPtr;
+
+}
+
+#endif
diff --git a/cpp/src/Ice/ProtocolPluginFacade.cpp b/cpp/src/Ice/ProtocolPluginFacade.cpp
index 483a9aa0df2..e7ca3f5c2f9 100644
--- a/cpp/src/Ice/ProtocolPluginFacade.cpp
+++ b/cpp/src/Ice/ProtocolPluginFacade.cpp
@@ -74,6 +74,12 @@ IceInternal::ProtocolPluginFacade::preferIPv6() const
return _instance->preferIPv6();
}
+NetworkProxyPtr
+IceInternal::ProtocolPluginFacade::getNetworkProxy() const
+{
+ return _instance->networkProxy();
+}
+
void
IceInternal::ProtocolPluginFacade::addEndpointFactory(const EndpointFactoryPtr& factory) const
{
diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp
index ec295b6a6bc..eeb74e10cb5 100644
--- a/cpp/src/Ice/TcpAcceptor.cpp
+++ b/cpp/src/Ice/TcpAcceptor.cpp
@@ -165,7 +165,7 @@ IceInternal::TcpAcceptor::accept()
Trace out(_logger, _traceLevels->networkCat);
out << "accepted tcp connection\n" << fdToString(fd);
}
- return new TcpTransceiver(_instance, fd, true);
+ return new TcpTransceiver(_instance, fd);
}
string
diff --git a/cpp/src/Ice/TcpConnector.cpp b/cpp/src/Ice/TcpConnector.cpp
index 435db7f2b7a..b269035a1b7 100644
--- a/cpp/src/Ice/TcpConnector.cpp
+++ b/cpp/src/Ice/TcpConnector.cpp
@@ -31,8 +31,8 @@ IceInternal::TcpConnector::connect()
try
{
- TransceiverPtr transceiver = new TcpTransceiver(_instance, createSocket(false, _addr), false);
- dynamic_cast<TcpTransceiver*>(transceiver.get())->connect(_addr);
+ TransceiverPtr transceiver = new TcpTransceiver(_instance, createSocket(false, _addr), _proxy, _addr);
+ dynamic_cast<TcpTransceiver*>(transceiver.get())->connect();
return transceiver;
}
catch(const Ice::LocalException& ex)
@@ -55,7 +55,7 @@ IceInternal::TcpConnector::type() const
string
IceInternal::TcpConnector::toString() const
{
- return addrToString(_addr);
+ return addrToString(!_proxy ? _addr : _proxy->getAddress());
}
bool
@@ -120,12 +120,13 @@ IceInternal::TcpConnector::operator<(const Connector& r) const
return compareAddress(_addr, p->_addr) < 0;
}
-IceInternal::TcpConnector::TcpConnector(const InstancePtr& instance, const Address& addr,
+IceInternal::TcpConnector::TcpConnector(const InstancePtr& instance, const Address& addr, const NetworkProxyPtr& proxy,
Ice::Int timeout, const string& connectionId) :
_instance(instance),
_traceLevels(instance->traceLevels()),
_logger(instance->initializationData().logger),
_addr(addr),
+ _proxy(proxy),
_timeout(timeout),
_connectionId(connectionId)
{
diff --git a/cpp/src/Ice/TcpConnector.h b/cpp/src/Ice/TcpConnector.h
index 9c504a0ebca..e25b9f2e3a8 100644
--- a/cpp/src/Ice/TcpConnector.h
+++ b/cpp/src/Ice/TcpConnector.h
@@ -36,7 +36,7 @@ public:
private:
- TcpConnector(const InstancePtr&, const Address&, Ice::Int, const std::string&);
+ TcpConnector(const InstancePtr&, const Address&, const NetworkProxyPtr&, Ice::Int, const std::string&);
virtual ~TcpConnector();
friend class TcpEndpointI;
@@ -44,6 +44,7 @@ private:
const TraceLevelsPtr _traceLevels;
const ::Ice::LoggerPtr _logger;
const Address _addr;
+ const NetworkProxyPtr _proxy;
const Ice::Int _timeout;
const std::string _connectionId;
};
diff --git a/cpp/src/Ice/TcpEndpointI.cpp b/cpp/src/Ice/TcpEndpointI.cpp
index 27324314a60..1cf09f15e3e 100644
--- a/cpp/src/Ice/TcpEndpointI.cpp
+++ b/cpp/src/Ice/TcpEndpointI.cpp
@@ -413,6 +413,17 @@ IceInternal::TcpEndpointI::equivalent(const EndpointIPtr& endpoint) const
return tcpEndpointI->_host == _host && tcpEndpointI->_port == _port;
}
+vector<ConnectorPtr>
+IceInternal::TcpEndpointI::connectors(const vector<Address>& addresses, const NetworkProxyPtr& proxy) const
+{
+ vector<ConnectorPtr> connectors;
+ for(unsigned int i = 0; i < addresses.size(); ++i)
+ {
+ connectors.push_back(new TcpConnector(_instance, addresses[i], proxy, _timeout, _connectionId));
+ }
+ return connectors;
+}
+
bool
IceInternal::TcpEndpointI::operator==(const LocalObject& r) const
{
@@ -535,17 +546,6 @@ IceInternal::TcpEndpointI::hashInit() const
return h;
}
-vector<ConnectorPtr>
-IceInternal::TcpEndpointI::connectors(const vector<Address>& addresses) const
-{
- vector<ConnectorPtr> connectors;
- for(unsigned int i = 0; i < addresses.size(); ++i)
- {
- connectors.push_back(new TcpConnector(_instance, addresses[i], _timeout, _connectionId));
- }
- return connectors;
-}
-
IceInternal::TcpEndpointFactory::TcpEndpointFactory(const InstancePtr& instance)
: _instance(instance)
{
diff --git a/cpp/src/Ice/TcpEndpointI.h b/cpp/src/Ice/TcpEndpointI.h
index 294bf79a00e..5d3cb602ed0 100644
--- a/cpp/src/Ice/TcpEndpointI.h
+++ b/cpp/src/Ice/TcpEndpointI.h
@@ -44,6 +44,7 @@ public:
virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const;
virtual std::vector<EndpointIPtr> expand() const;
virtual bool equivalent(const EndpointIPtr&) const;
+ virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&, const NetworkProxyPtr&) const;
virtual bool operator==(const Ice::LocalObject&) const;
virtual bool operator<(const Ice::LocalObject&) const;
@@ -55,7 +56,6 @@ public:
private:
virtual ::Ice::Int hashInit() const;
- virtual std::vector<ConnectorPtr> connectors(const std::vector<IceInternal::Address>&) const;
//
// All members are const, because endpoints are immutable.
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index ee49f1f7e74..c577367c5fb 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -46,59 +46,126 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status)
#endif
SocketOperation
-IceInternal::TcpTransceiver::initialize()
+IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
{
- if(_state == StateNeedConnect)
- {
- _state = StateConnectPending;
- return SocketOperationConnect;
- }
- else if(_state <= StateConnectPending)
+ try
{
- try
+ if(_state == StateNeedConnect)
{
-#if defined(ICE_USE_IOCP)
+ _state = StateConnectPending;
+ return SocketOperationConnect;
+ }
+ else if(_state <= StateConnectPending)
+ {
+#ifdef ICE_USE_IOCP
doFinishConnectAsync(_fd, _write);
#else
doFinishConnect(_fd);
#endif
- _state = StateConnected;
- _desc = fdToString(_fd);
- }
- catch(const Ice::LocalException& ex)
- {
- if(_traceLevels->network >= 2)
+
+ _desc = fdToString(_fd, _proxy, _addr, true);
+
+ if(_proxy)
{
- Trace out(_logger, _traceLevels->networkCat);
- out << "failed to establish tcp connection\n";
-#if !defined(_WIN32)
//
- // The local address is only accessible with connected sockets on Windows.
+ // Prepare the read & write buffers in advance.
+ //
+ _proxy->beginWriteConnectRequest(_addr, writeBuffer);
+ _proxy->beginReadConnectRequestResponse(readBuffer);
+
+#ifdef ICE_USE_IOCP
+ //
+ // Return SocketOperationWrite to indicate we need to start a write.
//
- Address localAddr;
- fdToLocalAddress(_fd, localAddr);
- out << "local address: " << addrToString(localAddr) << "\n";
+ _state = StateProxyConnectRequest; // Send proxy connect request
+ return IceInternal::SocketOperationWrite;
#else
- out << "local address: <not available>\n";
+ //
+ // Write the proxy connection message.
+ //
+ if(write(writeBuffer))
+ {
+ //
+ // Write completed without blocking.
+ //
+ _proxy->endWriteConnectRequest(writeBuffer);
+
+ //
+ // Try to read the response.
+ //
+ if(read(readBuffer))
+ {
+ //
+ // Read completed without blocking - fall through.
+ //
+ _proxy->endReadConnectRequestResponse(readBuffer);
+ }
+ else
+ {
+ //
+ // Return SocketOperationRead to indicate we need to complete the read.
+ //
+ _state = StateProxyConnectRequestPending; // Wait for proxy response
+ return SocketOperationRead;
+ }
+ }
+ else
+ {
+ //
+ // Return SocketOperationWrite to indicate we need to complete the write.
+ //
+ _state = StateProxyConnectRequest; // Send proxy connect request
+ return SocketOperationWrite;
+ }
#endif
- out << "remote address: " << addrToString(_connectAddr) << "\n" << ex;
}
- throw;
- }
- if(_traceLevels->network >= 1)
+ _state = StateConnected;
+ }
+ else if(_state == StateProxyConnectRequest)
+ {
+ //
+ // Write completed.
+ //
+ _proxy->endWriteConnectRequest(writeBuffer);
+ _state = StateProxyConnectRequestPending; // Wait for proxy response
+ return SocketOperationRead;
+ }
+ else if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // Read completed.
+ //
+ _proxy->endReadConnectRequestResponse(readBuffer);
+ _state = StateConnected;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_traceLevels->network >= 2)
{
Trace out(_logger, _traceLevels->networkCat);
- out << "tcp connection established\n" << _desc;
+ out << "failed to establish tcp connection\n" << fdToString(_fd, _proxy, _addr, false) << "\n" << ex;
}
+ throw;
}
+
assert(_state == StateConnected);
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "tcp connection established\n" << _desc;
+ }
return SocketOperationNone;
}
void
IceInternal::TcpTransceiver::close()
{
+ //
+ // If the transceiver is not connected, its description is simply "not connected",
+ // which isn't very helpful.
+ //
if(_state == StateConnected && _traceLevels->network >= 1)
{
Trace out(_logger, _traceLevels->networkCat);
@@ -121,17 +188,19 @@ IceInternal::TcpTransceiver::close()
bool
IceInternal::TcpTransceiver::write(Buffer& buf)
{
- // Its impossible for the packetSize to be more than an Int.
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
int packetSize = static_cast<int>(buf.b.end() - buf.i);
-# ifdef ICE_USE_IOCP
+#ifdef ICE_USE_IOCP
//
// Limit packet size to avoid performance problems on WIN32
//
if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
+ {
packetSize = _maxSendPacketSize;
}
-# endif
+#endif
while(buf.i != buf.b.end())
{
assert(_fd != INVALID_SOCKET);
@@ -161,7 +230,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf)
{
return false;
}
-
+
if(connectionLost())
{
ConnectionLostException ex(__FILE__, __LINE__);
@@ -194,13 +263,16 @@ IceInternal::TcpTransceiver::write(Buffer& buf)
packetSize = static_cast<int>(buf.b.end() - buf.i);
}
}
+
return true;
}
bool
IceInternal::TcpTransceiver::read(Buffer& buf)
{
- // Its impossible for the packetSize to be more than an Int.
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
int packetSize = static_cast<int>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
@@ -220,7 +292,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf)
{
continue;
}
-
+
if(noBuffers() && packetSize > 1024)
{
packetSize /= 2;
@@ -231,7 +303,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf)
{
return false;
}
-
+
if(connectionLost())
{
ConnectionLostException ex(__FILE__, __LINE__);
@@ -264,13 +336,14 @@ IceInternal::TcpTransceiver::read(Buffer& buf)
return true;
}
-#if defined(ICE_USE_IOCP)
+#ifdef ICE_USE_IOCP
bool
IceInternal::TcpTransceiver::startWrite(Buffer& buf)
{
- if(_state < StateConnected)
+ if(_state == StateConnectPending)
{
- doConnectAsync(_fd, _connectAddr, _write);
+ Address addr = _proxy ? _proxy->getAddress() : _addr;
+ doConnectAsync(_fd, addr, _write);
return false;
}
@@ -279,7 +352,7 @@ IceInternal::TcpTransceiver::startWrite(Buffer& buf)
int packetSize = static_cast<int>(buf.b.end() - buf.i);
if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
+ {
packetSize = _maxSendPacketSize;
}
assert(packetSize > 0);
@@ -310,7 +383,7 @@ IceInternal::TcpTransceiver::startWrite(Buffer& buf)
void
IceInternal::TcpTransceiver::finishWrite(Buffer& buf)
{
- if(_state < StateConnected)
+ if(_state < StateConnected && _state != StateProxyConnectRequest)
{
return;
}
@@ -336,18 +409,19 @@ IceInternal::TcpTransceiver::finishWrite(Buffer& buf)
{
int packetSize = static_cast<int>(buf.b.end() - buf.i);
if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
+ {
packetSize = _maxSendPacketSize;
}
Trace out(_logger, _traceLevels->networkCat);
out << "sent " << _write.count << " of " << packetSize << " bytes via tcp\n" << toString();
}
-
+
if(_stats)
{
_stats->bytesSent(type(), _write.count);
}
+
buf.i += _write.count;
}
@@ -409,7 +483,7 @@ IceInternal::TcpTransceiver::finishRead(Buffer& buf)
ex.error = 0;
throw ex;
}
-
+
if(_traceLevels->network >= 3)
{
int packetSize = static_cast<int>(buf.b.end() - buf.i);
@@ -442,7 +516,7 @@ IceInternal::TcpTransceiver::toString() const
return _desc;
}
-Ice::ConnectionInfoPtr
+Ice::ConnectionInfoPtr
IceInternal::TcpTransceiver::getInfo() const
{
Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo();
@@ -459,15 +533,53 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize
}
}
-IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, bool connected) :
+IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, const NetworkProxyPtr& proxy,
+ const Address& addr) :
+ NativeInfo(fd),
+ _proxy(proxy),
+ _addr(addr),
+ _traceLevels(instance->traceLevels()),
+ _logger(instance->initializationData().logger),
+ _stats(instance->initializationData().stats),
+ _state(StateNeedConnect)
+#ifdef ICE_USE_IOCP
+ , _read(SocketOperationRead),
+ _write(SocketOperationWrite)
+#endif
+{
+ setBlock(_fd, false);
+
+ setTcpBufSize(_fd, instance->initializationData().properties, _logger);
+
+#ifdef ICE_USE_IOCP
+ //
+ // On Windows, limiting the buffer size is important to prevent
+ // poor throughput performances when transfering large amount of
+ // data. See Microsoft KB article KB823764.
+ //
+ _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2;
+ if(_maxSendPacketSize < 512)
+ {
+ _maxSendPacketSize = 0;
+ }
+
+ _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd);
+ if(_maxReceivePacketSize < 512)
+ {
+ _maxReceivePacketSize = 0;
+ }
+#endif
+}
+
+IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd) :
NativeInfo(fd),
_traceLevels(instance->traceLevels()),
_logger(instance->initializationData().logger),
_stats(instance->initializationData().stats),
- _state(connected ? StateConnected : StateNeedConnect),
- _desc(connected ? fdToString(_fd) : string())
+ _state(StateConnected),
+ _desc(fdToString(_fd))
#ifdef ICE_USE_IOCP
- , _read(SocketOperationRead),
+ , _read(SocketOperationRead),
_write(SocketOperationWrite)
#endif
{
@@ -501,15 +613,15 @@ IceInternal::TcpTransceiver::~TcpTransceiver()
}
void
-IceInternal::TcpTransceiver::connect(const Address& addr)
+IceInternal::TcpTransceiver::connect()
{
#if !defined(ICE_USE_IOCP)
try
{
- if(doConnect(_fd, addr))
+ if(doConnect(_fd, _addr))
{
_state = StateConnected;
- _desc = fdToString(_fd);
+ _desc = fdToString(_fd, _proxy, _addr);
if(_traceLevels->network >= 1)
{
Trace out(_logger, _traceLevels->networkCat);
@@ -518,7 +630,7 @@ IceInternal::TcpTransceiver::connect(const Address& addr)
}
else
{
- _desc = fdToString(_fd);
+ _desc = fdToString(_fd, _proxy, _addr);
}
}
catch(...)
@@ -527,5 +639,4 @@ IceInternal::TcpTransceiver::connect(const Address& addr)
throw;
}
#endif
- _connectAddr = addr;
}
diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h
index 3005da1a0e7..a2afe4bb514 100644
--- a/cpp/src/Ice/TcpTransceiver.h
+++ b/cpp/src/Ice/TcpTransceiver.h
@@ -29,6 +29,8 @@ class TcpTransceiver : public Transceiver, public NativeInfo
{
StateNeedConnect,
StateConnectPending,
+ StateProxyConnectRequest,
+ StateProxyConnectRequestPending,
StateConnected
};
@@ -39,7 +41,7 @@ public:
virtual AsyncInfo* getAsyncInfo(SocketOperation);
#endif
- virtual SocketOperation initialize();
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual void close();
virtual bool write(Buffer&);
virtual bool read(Buffer&);
@@ -56,21 +58,23 @@ public:
private:
- TcpTransceiver(const InstancePtr&, SOCKET, bool);
+ TcpTransceiver(const InstancePtr&, SOCKET, const NetworkProxyPtr&, const Address&);
+ TcpTransceiver(const InstancePtr&, SOCKET);
virtual ~TcpTransceiver();
- void connect(const Address&);
+ void connect();
friend class TcpConnector;
friend class TcpAcceptor;
+ const NetworkProxyPtr _proxy;
+ const Address _addr;
const TraceLevelsPtr _traceLevels;
const Ice::LoggerPtr _logger;
const Ice::StatsPtr _stats;
State _state;
std::string _desc;
- Address _connectAddr;
#ifdef ICE_USE_IOCP
AsyncInfo _read;
diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h
index e08159e1d32..c2c00cf151d 100644
--- a/cpp/src/Ice/Transceiver.h
+++ b/cpp/src/Ice/Transceiver.h
@@ -25,7 +25,7 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared
public:
virtual NativeInfoPtr getNativeInfo() = 0;
- virtual SocketOperation initialize() = 0;
+ virtual SocketOperation initialize(Buffer&, Buffer&) = 0;
virtual void close() = 0;
virtual bool write(Buffer&) = 0;
virtual bool read(Buffer&) = 0;
diff --git a/cpp/src/Ice/UdpEndpointI.cpp b/cpp/src/Ice/UdpEndpointI.cpp
index 56828588a2d..f36d54ece5c 100644
--- a/cpp/src/Ice/UdpEndpointI.cpp
+++ b/cpp/src/Ice/UdpEndpointI.cpp
@@ -491,6 +491,17 @@ IceInternal::UdpEndpointI::equivalent(const EndpointIPtr& endpoint) const
return udpEndpointI->_host == _host && udpEndpointI->_port == _port;
}
+vector<ConnectorPtr>
+IceInternal::UdpEndpointI::connectors(const vector<Address>& addresses, const NetworkProxyPtr&) const
+{
+ vector<ConnectorPtr> connectors;
+ for(unsigned int i = 0; i < addresses.size(); ++i)
+ {
+ connectors.push_back(new UdpConnector(_instance, addresses[i], _mcastInterface, _mcastTtl, _connectionId));
+ }
+ return connectors;
+}
+
bool
IceInternal::UdpEndpointI::operator==(const LocalObject& r) const
{
@@ -643,17 +654,6 @@ IceInternal::UdpEndpointI::hashInit() const
return h;
}
-vector<ConnectorPtr>
-IceInternal::UdpEndpointI::connectors(const vector<Address>& addresses) const
-{
- vector<ConnectorPtr> connectors;
- for(unsigned int i = 0; i < addresses.size(); ++i)
- {
- connectors.push_back(new UdpConnector(_instance, addresses[i], _mcastInterface, _mcastTtl, _connectionId));
- }
- return connectors;
-}
-
IceInternal::UdpEndpointFactory::UdpEndpointFactory(const InstancePtr& instance)
: _instance(instance)
diff --git a/cpp/src/Ice/UdpEndpointI.h b/cpp/src/Ice/UdpEndpointI.h
index 86f2f514212..81e78fec3aa 100644
--- a/cpp/src/Ice/UdpEndpointI.h
+++ b/cpp/src/Ice/UdpEndpointI.h
@@ -45,6 +45,7 @@ public:
virtual AcceptorPtr acceptor(EndpointIPtr&, const std::string&) const;
virtual std::vector<EndpointIPtr> expand() const;
virtual bool equivalent(const EndpointIPtr&) const;
+ virtual std::vector<ConnectorPtr> connectors(const std::vector<Address>&, const NetworkProxyPtr&) const;
virtual bool operator==(const Ice::LocalObject&) const;
virtual bool operator<(const Ice::LocalObject&) const;
@@ -56,7 +57,6 @@ public:
private:
virtual ::Ice::Int hashInit() const;
- virtual std::vector<ConnectorPtr> connectors(const std::vector<IceInternal::Address>&) const;
//
// All members are const, because endpoints are immutable.
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index 5b127660735..ead3dde9371 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler
#endif
SocketOperation
-IceInternal::UdpTransceiver::initialize()
+IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/)
{
if(_state == StateNeedConnect)
{
diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h
index 59cba440870..ad6bf128619 100644
--- a/cpp/src/Ice/UdpTransceiver.h
+++ b/cpp/src/Ice/UdpTransceiver.h
@@ -47,7 +47,7 @@ public:
virtual void setCompletedHandler(SocketOperationCompletedHandler^);
#endif
- virtual SocketOperation initialize();
+ virtual SocketOperation initialize(Buffer&, Buffer&);
virtual void close();
virtual bool write(Buffer&);
virtual bool read(Buffer&);
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index fefad619293..dc406a700dc 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -83,7 +83,7 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session)
{
_observers->replicaAdded(session->getInternalRegistry());
}
- catch(const Ice::ConnectionRefusedException&)
+ catch(const Ice::ConnectFailedException&)
{
// Expected if the replica is being shutdown.
}
@@ -122,7 +122,7 @@ ReplicaCache::remove(const string& name, bool shutdown)
{
_observers->replicaRemoved(entry->getProxy());
}
- catch(const Ice::ConnectionRefusedException&)
+ catch(const Ice::ConnectFailedException&)
{
// Expected if the replica is being shutdown.
}
@@ -171,7 +171,7 @@ ReplicaCache::subscribe(const ReplicaObserverPrx& observer)
Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway());
ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas);
}
- catch(const Ice::ConnectionRefusedException&)
+ catch(const Ice::ConnectFailedException&)
{
// The replica is being shutdown.
}
@@ -193,7 +193,7 @@ ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer)
{
_topic->unsubscribe(observer);
}
- catch(const Ice::ConnectionRefusedException&)
+ catch(const Ice::ConnectFailedException&)
{
// The replica is being shutdown.
}
diff --git a/cpp/src/IceSSL/ConnectorI.cpp b/cpp/src/IceSSL/ConnectorI.cpp
index f327795ef04..fe75e4f12ea 100644
--- a/cpp/src/IceSSL/ConnectorI.cpp
+++ b/cpp/src/IceSSL/ConnectorI.cpp
@@ -41,7 +41,7 @@ IceSSL::ConnectorI::connect()
try
{
- return new TransceiverI(_instance, IceInternal::createSocket(false, _addr), _host, _addr);
+ return new TransceiverI(_instance, IceInternal::createSocket(false, _addr), _proxy, _host, _addr);
}
catch(const Ice::LocalException& ex)
{
@@ -63,7 +63,7 @@ IceSSL::ConnectorI::type() const
string
IceSSL::ConnectorI::toString() const
{
- return IceInternal::addrToString(_addr);
+ return IceInternal::addrToString(!_proxy ? _addr : _proxy->getAddress());
}
bool
@@ -130,11 +130,13 @@ IceSSL::ConnectorI::operator<(const IceInternal::Connector& r) const
}
IceSSL::ConnectorI::ConnectorI(const InstancePtr& instance, const string& host, const IceInternal::Address& addr,
- Ice::Int timeout, const string& connectionId) :
+ const IceInternal::NetworkProxyPtr& proxy, Ice::Int timeout,
+ const string& connectionId) :
_instance(instance),
_logger(instance->communicator()->getLogger()),
_host(host),
_addr(addr),
+ _proxy(proxy),
_timeout(timeout),
_connectionId(connectionId)
{
diff --git a/cpp/src/IceSSL/ConnectorI.h b/cpp/src/IceSSL/ConnectorI.h
index fd7d3fffcaf..10048b42015 100644
--- a/cpp/src/IceSSL/ConnectorI.h
+++ b/cpp/src/IceSSL/ConnectorI.h
@@ -37,14 +37,16 @@ public:
private:
- ConnectorI(const InstancePtr&, const std::string&, const IceInternal::Address&, Ice::Int, const std::string&);
+ ConnectorI(const InstancePtr&, const std::string&, const IceInternal::Address&,
+ const IceInternal::NetworkProxyPtr&, Ice::Int, const std::string&);
virtual ~ConnectorI();
friend class EndpointI;
const InstancePtr _instance;
const Ice::LoggerPtr _logger;
const std::string _host;
- IceInternal::Address _addr;
+ const IceInternal::Address _addr;
+ const IceInternal::NetworkProxyPtr _proxy;
const Ice::Int _timeout;
const std::string _connectionId;
};
diff --git a/cpp/src/IceSSL/EndpointI.cpp b/cpp/src/IceSSL/EndpointI.cpp
index ce55f30a306..1c71eb560ab 100644
--- a/cpp/src/IceSSL/EndpointI.cpp
+++ b/cpp/src/IceSSL/EndpointI.cpp
@@ -411,6 +411,18 @@ IceSSL::EndpointI::equivalent(const IceInternal::EndpointIPtr& endpoint) const
return sslEndpointI->_host == _host && sslEndpointI->_port == _port;
}
+vector<IceInternal::ConnectorPtr>
+IceSSL::EndpointI::connectors(const vector<IceInternal::Address>& addresses,
+ const IceInternal::NetworkProxyPtr& proxy) const
+{
+ vector<IceInternal::ConnectorPtr> connectors;
+ for(unsigned int i = 0; i < addresses.size(); ++i)
+ {
+ connectors.push_back(new ConnectorI(_instance, _host, addresses[i], proxy, _timeout, _connectionId));
+ }
+ return connectors;
+}
+
bool
IceSSL::EndpointI::operator==(const Ice::LocalObject& r) const
{
@@ -533,17 +545,6 @@ IceSSL::EndpointI::hashInit() const
return h;
}
-vector<IceInternal::ConnectorPtr>
-IceSSL::EndpointI::connectors(const vector<IceInternal::Address>& addresses) const
-{
- vector<IceInternal::ConnectorPtr> connectors;
- for(unsigned int i = 0; i < addresses.size(); ++i)
- {
- connectors.push_back(new ConnectorI(_instance, _host, addresses[i], _timeout, _connectionId));
- }
- return connectors;
-}
-
IceSSL::EndpointFactoryI::EndpointFactoryI(const InstancePtr& instance)
: _instance(instance)
{
diff --git a/cpp/src/IceSSL/EndpointI.h b/cpp/src/IceSSL/EndpointI.h
index a54ac39f1e1..d5ffa52eccd 100644
--- a/cpp/src/IceSSL/EndpointI.h
+++ b/cpp/src/IceSSL/EndpointI.h
@@ -45,6 +45,8 @@ public:
virtual IceInternal::AcceptorPtr acceptor(IceInternal::EndpointIPtr&, const std::string&) const;
virtual std::vector<IceInternal::EndpointIPtr> expand() const;
virtual bool equivalent(const IceInternal::EndpointIPtr&) const;
+ virtual std::vector<IceInternal::ConnectorPtr> connectors(const std::vector<IceInternal::Address>&,
+ const IceInternal::NetworkProxyPtr&) const;
virtual bool operator==(const Ice::LocalObject&) const;
virtual bool operator<(const Ice::LocalObject&) const;
@@ -56,7 +58,6 @@ public:
private:
virtual ::Ice::Int hashInit() const;
- virtual std::vector<IceInternal::ConnectorPtr> connectors(const std::vector<IceInternal::Address>&) const;
//
// All members are const, because endpoints are immutable.
diff --git a/cpp/src/IceSSL/Instance.cpp b/cpp/src/IceSSL/Instance.cpp
index bd56fd7eb99..eb2ae99f78f 100644
--- a/cpp/src/IceSSL/Instance.cpp
+++ b/cpp/src/IceSSL/Instance.cpp
@@ -812,6 +812,12 @@ IceSSL::Instance::preferIPv6() const
return _facade->preferIPv6();
}
+IceInternal::NetworkProxyPtr
+IceSSL::Instance::networkProxy() const
+{
+ return _facade->getNetworkProxy();
+}
+
string
IceSSL::Instance::defaultHost() const
{
diff --git a/cpp/src/IceSSL/Instance.h b/cpp/src/IceSSL/Instance.h
index eacad08d549..ff63c33b951 100644
--- a/cpp/src/IceSSL/Instance.h
+++ b/cpp/src/IceSSL/Instance.h
@@ -40,6 +40,7 @@ public:
IceInternal::EndpointHostResolverPtr endpointHostResolver() const;
IceInternal::ProtocolSupport protocolSupport() const;
bool preferIPv6() const;
+ IceInternal::NetworkProxyPtr networkProxy() const;
std::string defaultHost() const;
Ice::EncodingVersion defaultEncoding() const;
int networkTraceLevel() const;
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp
index fdb6fe229ae..ec862faf22b 100644
--- a/cpp/src/IceSSL/TransceiverI.cpp
+++ b/cpp/src/IceSSL/TransceiverI.cpp
@@ -32,7 +32,7 @@ IceSSL::TransceiverI::getNativeInfo()
return this;
}
-#if defined(ICE_USE_IOCP)
+#ifdef ICE_USE_IOCP
IceInternal::AsyncInfo*
IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status)
{
@@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status)
#endif
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize()
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
{
try
{
@@ -61,14 +61,89 @@ IceSSL::TransceiverI::initialize()
}
else if(_state <= StateConnectPending)
{
-#ifndef ICE_USE_IOCP
+#ifdef ICE_USE_IOCP
+ IceInternal::doFinishConnectAsync(_fd, _write);
+#else
IceInternal::doFinishConnect(_fd);
+#endif
+
+ _desc = IceInternal::fdToString(_fd, _proxy, _addr, true);
+
+ if(_proxy)
+ {
+ //
+ // Prepare the read & write buffers in advance.
+ //
+ _proxy->beginWriteConnectRequest(_addr, writeBuffer);
+ _proxy->beginReadConnectRequestResponse(readBuffer);
+
+#ifdef ICE_USE_IOCP
+ //
+ // Return SocketOperationWrite to indicate we need to start a write.
+ //
+ _state = StateProxyConnectRequest; // Send proxy connect request
+ return IceInternal::SocketOperationWrite;
#else
- IceInternal::doFinishConnectAsync(_fd, _write);
+ //
+ // Write the proxy connection message using TCP.
+ //
+ if(writeRaw(writeBuffer))
+ {
+ //
+ // Write completed without blocking.
+ //
+ _proxy->endWriteConnectRequest(writeBuffer);
+
+ //
+ // Try to read the response using TCP.
+ //
+ if(readRaw(readBuffer))
+ {
+ //
+ // Read completed without blocking - fall through.
+ //
+ _proxy->endReadConnectRequestResponse(readBuffer);
+ }
+ else
+ {
+ //
+ // Return SocketOperationRead to indicate we need to complete the read.
+ //
+ _state = StateProxyConnectRequestPending; // Wait for proxy response
+ return IceInternal::SocketOperationRead;
+ }
+ }
+ else
+ {
+ //
+ // Return SocketOperationWrite to indicate we need to complete the write.
+ //
+ _state = StateProxyConnectRequest; // Send proxy connect request
+ return IceInternal::SocketOperationWrite;
+ }
#endif
+ }
+
_state = StateConnected;
- _desc = IceInternal::fdToString(_fd);
}
+ else if(_state == StateProxyConnectRequest)
+ {
+ //
+ // Write completed.
+ //
+ _proxy->endWriteConnectRequest(writeBuffer);
+ _state = StateProxyConnectRequestPending; // Wait for proxy response
+ return IceInternal::SocketOperationRead;
+ }
+ else if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // Read completed.
+ //
+ _proxy->endReadConnectRequestResponse(readBuffer);
+ _state = StateConnected;
+ }
+
assert(_state == StateConnected);
if(!_ssl)
@@ -96,16 +171,18 @@ IceSSL::TransceiverI::initialize()
_sentBytes = 0;
#endif
-#ifndef ICE_USE_IOCP
- // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type.
- BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0);
-#else
+#ifdef ICE_USE_IOCP
BIO* bio;
if(!BIO_new_bio_pair(&bio, _maxSendPacketSize, &_iocpBio, _maxReceivePacketSize))
{
bio = 0;
_iocpBio = 0;
}
+#else
+ //
+ // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type.
+ //
+ BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0);
#endif
if(!bio)
{
@@ -255,17 +332,7 @@ IceSSL::TransceiverI::initialize()
}
else
{
-#ifndef _WIN32
- //
- // The local address is only accessible with connected sockets on Windows.
- //
- IceInternal::Address localAddr;
- IceInternal::fdToLocalAddress(_fd, localAddr);
- out << "local address: " << IceInternal::addrToString(localAddr) << "\n";
-#else
- out << "local address: <not available>\n";
-#endif
- out << "remote address: " << IceInternal::addrToString(_connectAddr) << "\n" << ex;
+ out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex;
}
}
throw;
@@ -341,6 +408,14 @@ IceSSL::TransceiverI::close()
bool
IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
{
+ if(_state == StateProxyConnectRequest)
+ {
+ //
+ // We need to write the proxy message, but we have to use TCP and not SSL.
+ //
+ return writeRaw(buf);
+ }
+
#ifdef ICE_USE_IOCP
if(_writeI != _writeBuffer.end())
{
@@ -351,15 +426,15 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
#endif
- // Its impossible for the packetSize to be more than an Int.
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
int packetSize = static_cast<int>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
ERR_clear_error(); // Clear any spurious errors.
assert(_fd != INVALID_SOCKET);
-#ifndef ICE_USE_IOCP
- int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize);
-#else
+#ifdef ICE_USE_IOCP
int ret;
if(_sentBytes)
{
@@ -378,6 +453,8 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
}
}
+#else
+ int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize);
#endif
if(ret <= 0)
{
@@ -482,6 +559,14 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
bool
IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
{
+ if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // We need to read the proxy reply, but we have to use TCP and not SSL.
+ //
+ return readRaw(buf);
+ }
+
#ifdef ICE_USE_IOCP
if(_readI != _readBuffer.end())
{
@@ -492,7 +577,9 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
}
#endif
- // It's impossible for the packetSize to be more than an Int.
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
int packetSize = static_cast<int>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
@@ -644,51 +731,40 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
}
#ifdef ICE_USE_IOCP
+
bool
-IceSSL::TransceiverI::startWrite(IceInternal::Buffer& /*buf*/)
+IceSSL::TransceiverI::startWrite(IceInternal::Buffer& buf)
{
- if(_state < StateConnected)
+ if(_state == StateConnectPending)
{
- IceInternal::doConnectAsync(_fd, _connectAddr, _write);
+ IceInternal::Address addr = _proxy ? _proxy->getAddress() : _addr;
+ IceInternal::doConnectAsync(_fd, addr, _write);
return false;
}
+ else if(_state == StateProxyConnectRequest)
+ {
+ //
+ // We need to write the proxy message, but we have to use TCP and not SSL.
+ //
+ assert(!buf.b.empty() && buf.i != buf.b.end());
+
+ const int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ const int actualSize = writeAsync(reinterpret_cast<char*>(&*buf.i), packetSize);
+ return packetSize == actualSize;
+ }
assert(!_writeBuffer.empty() && _writeI != _writeBuffer.end());
- int packetSize = static_cast<int>(_writeBuffer.end() - _writeI);
- if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
- packetSize = _maxSendPacketSize;
- }
+ const int packetSize = static_cast<int>(_writeBuffer.end() - _writeI);
+ const int actualSize = writeAsync(reinterpret_cast<char*>(&*_writeI), packetSize);
- _write.buf.len = packetSize;
- _write.buf.buf = reinterpret_cast<char*>(&*_writeI);
- int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
- if(err == SOCKET_ERROR)
- {
- if(!IceInternal::wouldBlock())
- {
- if(IceInternal::connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- }
- }
- return packetSize == static_cast<int>(_writeBuffer.end() - _writeI);
+ return packetSize == actualSize;
}
void
-IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/)
+IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& buf)
{
- if(_state < StateConnected)
+ if(_state < StateConnected && _state != StateProxyConnectRequest)
{
return;
}
@@ -710,12 +786,30 @@ IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/)
}
}
- _writeI += _write.count;
+ if(_state == StateProxyConnectRequest)
+ {
+ buf.i += _write.count;
+ }
+ else
+ {
+ _writeI += _write.count;
+ }
}
void
IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf)
{
+ if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // We need to read the proxy reply, but we have to use TCP and not SSL.
+ //
+ assert(!buf.b.empty() && buf.i != buf.b.end());
+ const int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ readAsync(reinterpret_cast<char*>(&*buf.i), packetSize);
+ return;
+ }
+
if(_readI == _readBuffer.end())
{
assert(!buf.b.empty() && buf.i != buf.b.end());
@@ -723,7 +817,7 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf)
ERR_clear_error(); // Clear any spurious errors.
#ifndef NDEBUG
- int ret =
+ int ret =
#endif
SSL_read(_ssl, reinterpret_cast<void*>(&*buf.i), static_cast<int>(buf.b.end() - buf.i));
assert(ret <= 0 && SSL_get_error(_ssl, ret) == SSL_ERROR_WANT_READ);
@@ -735,37 +829,12 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf)
assert(!_readBuffer.empty() && _readI != _readBuffer.end());
- int packetSize = static_cast<int>(_readBuffer.end() - _readI);
- if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
- {
- packetSize = _maxReceivePacketSize;
- }
-
- _read.buf.len = packetSize;
- _read.buf.buf = reinterpret_cast<char*>(&*_readI);
- int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
- if(err == SOCKET_ERROR)
- {
- if(!IceInternal::wouldBlock())
- {
- if(IceInternal::connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- }
- }
+ const int packetSize = static_cast<int>(_readBuffer.end() - _readI);
+ readAsync(reinterpret_cast<char*>(&*_readI), packetSize);
}
void
-IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/)
+IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf)
{
if(static_cast<int>(_read.count) == SOCKET_ERROR)
{
@@ -783,22 +852,36 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/)
throw ex;
}
}
+ else if(_read.count == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
- _readI += _read.count;
-
- if(_iocpBio && _readI == _readBuffer.end())
+ if(_state == StateProxyConnectRequestPending)
{
- assert(_readI == _readBuffer.end());
- int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size()));
- if(n < 0) // Expected if the transceiver was closed.
+ buf.i += _read.count;
+ }
+ else
+ {
+ _readI += _read.count;
+
+ if(_iocpBio && _readI == _readBuffer.end())
{
- SecurityException ex(__FILE__, __LINE__);
- ex.reason = "SSL bio write failed";
- throw ex;
+ assert(_readI == _readBuffer.end());
+ int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size()));
+ if(n < 0) // Expected if the transceiver was closed.
+ {
+ SecurityException ex(__FILE__, __LINE__);
+ ex.reason = "SSL bio write failed";
+ throw ex;
+ }
+ assert(n == static_cast<int>(_readBuffer.size()));
}
- assert(n == static_cast<int>(_readBuffer.size()));
}
}
+
#endif
string
@@ -828,15 +911,17 @@ IceSSL::TransceiverI::checkSendSize(const IceInternal::Buffer& buf, size_t messa
}
}
-IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& host,
- const IceInternal::Address& addr) :
+IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const IceInternal::NetworkProxyPtr& proxy,
+ const string& host, const IceInternal::Address& addr) :
IceInternal::NativeInfo(fd),
_instance(instance),
_logger(instance->communicator()->getLogger()),
_stats(instance->communicator()->getStats()),
- _ssl(0),
+ _proxy(proxy),
_host(host),
+ _addr(addr),
_incoming(false),
+ _ssl(0),
_state(StateNeedConnect)
#ifdef ICE_USE_IOCP
, _iocpBio(0),
@@ -863,7 +948,6 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const
_desc = IceInternal::fdToString(_fd);
}
#endif
- _connectAddr = addr;
}
IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& adapterName) :
@@ -871,9 +955,9 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const
_instance(instance),
_logger(instance->communicator()->getLogger()),
_stats(instance->communicator()->getStats()),
- _ssl(0),
- _incoming(true),
_adapterName(adapterName),
+ _incoming(true),
+ _ssl(0),
_state(StateConnected),
_desc(IceInternal::fdToString(fd))
#ifdef ICE_USE_IOCP
@@ -919,7 +1003,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const
{
X509_free(cert);
}
-
+
if(chain != 0)
{
for(int i = 0; i < sk_X509_num(chain); ++i)
@@ -942,6 +1026,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const
}
#ifdef ICE_USE_IOCP
+
bool
IceSSL::TransceiverI::receive()
{
@@ -1005,7 +1090,7 @@ IceSSL::TransceiverI::receive()
assert(_readI == _readBuffer.end());
#ifndef NDEBUG
- int n =
+ int n =
#endif
BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size()));
@@ -1021,7 +1106,7 @@ IceSSL::TransceiverI::send()
assert(BIO_ctrl_pending(_iocpBio));
_writeBuffer.resize(BIO_ctrl_pending(_iocpBio));
#ifndef NDEBUG
- int n =
+ int n =
#endif
BIO_read(_iocpBio, &_writeBuffer[0], static_cast<int>(_writeBuffer.size()));
assert(n == static_cast<int>(_writeBuffer.size()));
@@ -1085,4 +1170,226 @@ IceSSL::TransceiverI::send()
return true;
}
+int
+IceSSL::TransceiverI::writeAsync(char* buf, int packetSize)
+{
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
+ {
+ packetSize = _maxSendPacketSize;
+ }
+
+ _write.buf.len = packetSize;
+ _write.buf.buf = buf;
+
+ int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+
+ if(err == SOCKET_ERROR)
+ {
+ if(!IceInternal::wouldBlock())
+ {
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+ }
+
+ return packetSize;
+}
+
+int
+IceSSL::TransceiverI::readAsync(char* buf, int packetSize)
+{
+ if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
+ {
+ packetSize = _maxReceivePacketSize;
+ }
+
+ _read.buf.len = packetSize;
+ _read.buf.buf = buf;
+
+ int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+
+ if(err == SOCKET_ERROR)
+ {
+ if(!IceInternal::wouldBlock())
+ {
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+ }
+
+ return packetSize;
+}
+
+#endif
+
+bool
+IceSSL::TransceiverI::writeRaw(IceInternal::Buffer& buf)
+{
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+#ifdef ICE_USE_IOCP
+ //
+ // Limit packet size to avoid performance problems on WIN32
+ //
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
+ {
+ packetSize = _maxSendPacketSize;
+ }
#endif
+ while(buf.i != buf.b.end())
+ {
+ assert(_fd != INVALID_SOCKET);
+
+ ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
+ if(ret == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(ret == SOCKET_ERROR)
+ {
+ if(IceInternal::interrupted())
+ {
+ continue;
+ }
+
+ if(IceInternal::noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ continue;
+ }
+
+ if(IceInternal::wouldBlock())
+ {
+ return false;
+ }
+
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_instance->networkTraceLevel() >= 3)
+ {
+ Trace out(_logger, _instance->networkTraceCategory());
+ out << "sent " << ret << " of " << packetSize << " bytes via tcp\n" << toString();
+ }
+
+ if(_stats)
+ {
+ _stats->bytesSent("tcp", static_cast<Int>(ret));
+ }
+
+ buf.i += ret;
+
+ if(packetSize > buf.b.end() - buf.i)
+ {
+ packetSize = static_cast<int>(buf.b.end() - buf.i);
+ }
+ }
+
+ return true;
+}
+
+bool
+IceSSL::TransceiverI::readRaw(IceInternal::Buffer& buf)
+{
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ while(buf.i != buf.b.end())
+ {
+ assert(_fd != INVALID_SOCKET);
+ ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
+
+ if(ret == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(ret == SOCKET_ERROR)
+ {
+ if(IceInternal::interrupted())
+ {
+ continue;
+ }
+
+ if(IceInternal::noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ continue;
+ }
+
+ if(IceInternal::wouldBlock())
+ {
+ return false;
+ }
+
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_instance->networkTraceLevel() >= 3)
+ {
+ Trace out(_logger, _instance->networkTraceCategory());
+ out << "received " << ret << " of " << packetSize << " bytes via tcp\n" << toString();
+ }
+
+ if(_stats)
+ {
+ _stats->bytesReceived("tcp", static_cast<Int>(ret));
+ }
+
+ buf.i += ret;
+
+ packetSize = static_cast<int>(buf.b.end() - buf.i);
+ }
+
+ return true;
+}
diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h
index c96265f58f4..23d6da3975e 100644
--- a/cpp/src/IceSSL/TransceiverI.h
+++ b/cpp/src/IceSSL/TransceiverI.h
@@ -33,6 +33,8 @@ class TransceiverI : public IceInternal::Transceiver, public IceInternal::Native
{
StateNeedConnect,
StateConnectPending,
+ StateProxyConnectRequest,
+ StateProxyConnectRequestPending,
StateConnected,
StateHandshakeComplete
};
@@ -44,7 +46,7 @@ public:
virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation);
#endif
- virtual IceInternal::SocketOperation initialize();
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
virtual void close();
virtual bool write(IceInternal::Buffer&);
virtual bool read(IceInternal::Buffer&);
@@ -61,17 +63,23 @@ public:
private:
- TransceiverI(const InstancePtr&, SOCKET, const std::string&, const IceInternal::Address&);
+ TransceiverI(const InstancePtr&, SOCKET, const IceInternal::NetworkProxyPtr&, const std::string&,
+ const IceInternal::Address&);
TransceiverI(const InstancePtr&, SOCKET, const std::string&);
virtual ~TransceiverI();
virtual NativeConnectionInfoPtr getNativeConnectionInfo() const;
#ifdef ICE_USE_IOCP
- bool send();
bool receive();
+ bool send();
+ int writeAsync(char*, int);
+ int readAsync(char*, int);
#endif
+ bool writeRaw(IceInternal::Buffer&);
+ bool readRaw(IceInternal::Buffer&);
+
friend class ConnectorI;
friend class AcceptorI;
@@ -79,16 +87,17 @@ private:
const Ice::LoggerPtr _logger;
const Ice::StatsPtr _stats;
- SSL* _ssl;
-
+ const IceInternal::NetworkProxyPtr _proxy;
const std::string _host;
+ const IceInternal::Address _addr;
- const bool _incoming;
const std::string _adapterName;
+ const bool _incoming;
+
+ SSL* _ssl;
State _state;
std::string _desc;
- IceInternal::Address _connectAddr;
#ifdef ICE_USE_IOCP
int _maxSendPacketSize;
int _maxReceivePacketSize;