summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp37
-rw-r--r--cpp/src/Ice/ConnectionI.h1
-rw-r--r--cpp/src/Ice/Network.cpp3
-rw-r--r--cpp/src/Ice/ReferenceFactory.cpp5
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp431
-rw-r--r--cpp/src/Ice/UdpTransceiver.h16
6 files changed, 324 insertions, 169 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 310f4575395..50d922be521 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -423,6 +423,12 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ //
+ // Ensure the message isn't bigger than what we can send with the
+ // transport.
+ //
+ _transceiver->checkSendSize(*os, _instance->messageSizeMax());
+
Int requestId;
if(response)
{
@@ -494,6 +500,12 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ //
+ // Ensure the message isn't bigger than what we can send with the
+ // transport.
+ //
+ _transceiver->checkSendSize(*os, _instance->messageSizeMax());
+
Int requestId;
if(response)
{
@@ -1099,13 +1111,14 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
if(current.operation & SocketOperationRead && !_readStream.b.empty())
{
- if(static_cast<Int>(_readStream.b.size()) == headerSize) // Read header.
+ if(_readHeader) // Read header if necessary.
{
if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
{
return;
}
assert(_readStream.i == _readStream.b.end());
+ _readHeader = false;
ptrdiff_t pos = _readStream.i - _readStream.b.begin();
if(pos < headerSize)
@@ -1173,18 +1186,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
_readStream.i = _readStream.b.begin() + pos;
}
-
+
if(_readStream.i != _readStream.b.end())
{
if(_endpoint->datagram())
{
- if(_warnUdp)
- {
- Warning out(_instance->initializationData().logger);
- out << "DatagramLimitException: maximum size of " << _readStream.i - _readStream.b.begin()
- << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
+ throw DatagramLimitException(__FILE__, __LINE__); // The message was truncated.
}
else
{
@@ -1236,8 +1243,14 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
catch(const DatagramLimitException&) // Expected.
{
+ if(_warnUdp)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "maximum datagram size of " << _readStream.i - _readStream.b.begin() << " exceeded";
+ }
_readStream.resize(headerSize);
_readStream.i = _readStream.b.begin();
+ _readHeader = true;
return;
}
catch(const SocketException& ex)
@@ -1249,13 +1262,14 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
if(_endpoint->datagram())
{
- if(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0)
+ if(_warn)
{
Warning out(_instance->initializationData().logger);
out << "datagram connection exception:\n" << ex << '\n' << _desc;
}
_readStream.resize(headerSize);
_readStream.i = _readStream.b.begin();
+ _readHeader = true;
}
else
{
@@ -1517,6 +1531,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
_batchRequestCompress(false),
_batchMarker(0),
_readStream(_instance.get()),
+ _readHeader(false),
_writeStream(_instance.get()),
_dispatchCount(0),
_state(StateNotInitialized)
@@ -1973,6 +1988,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
_readStream.resize(headerSize);
_readStream.i = _readStream.b.begin();
+ _readHeader = true;
return true;
}
@@ -2370,6 +2386,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
_readStream.swap(stream);
_readStream.resize(headerSize);
_readStream.i = _readStream.b.begin();
+ _readHeader = true;
assert(stream.i == stream.b.end());
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 688a6f47ccd..0ba830b18be 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -301,6 +301,7 @@ private:
std::deque<OutgoingMessage> _sendStreams;
IceInternal::BasicStream _readStream;
+ bool _readHeader;
IceInternal::BasicStream _writeStream;
int _dispatchCount;
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp
index 80eb685b413..089e275f49d 100644
--- a/cpp/src/Ice/Network.cpp
+++ b/cpp/src/Ice/Network.cpp
@@ -623,7 +623,8 @@ bool
IceInternal::recvTruncated()
{
#ifdef _WIN32
- return WSAGetLastError() == WSAEMSGSIZE;
+ int err = WSAGetLastError();
+ return err == WSAEMSGSIZE || err == ERROR_MORE_DATA;
#else
// We don't get an error under Linux if a datagram is truncated.
return false;
diff --git a/cpp/src/Ice/ReferenceFactory.cpp b/cpp/src/Ice/ReferenceFactory.cpp
index 0cbad088dcc..6dc659b9f63 100644
--- a/cpp/src/Ice/ReferenceFactory.cpp
+++ b/cpp/src/Ice/ReferenceFactory.cpp
@@ -13,6 +13,7 @@
#include <Ice/LocalException.h>
#include <Ice/Instance.h>
#include <Ice/EndpointI.h>
+#include <Ice/ConnectionI.h>
#include <Ice/EndpointFactoryManager.h>
#include <Ice/RouterInfo.h>
#include <Ice/Router.h>
@@ -86,8 +87,8 @@ IceInternal::ReferenceFactory::create(const Identity& ident, const Ice::Connecti
_communicator,
ident,
"", // Facet
- Reference::ModeTwoway,
- false,
+ connection->endpoint()->datagram() ? Reference::ModeDatagram : Reference::ModeTwoway,
+ connection->endpoint()->secure(),
connection);
}
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index a2cdded6f8a..d185c44cf9e 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -58,34 +58,45 @@ IceInternal::UdpTransceiver::getAsyncInfo(SocketOperation status)
SocketOperation
IceInternal::UdpTransceiver::initialize()
{
- if(!_incoming)
+ if(_state == StateNeedConnect)
{
- if(_connect)
+ _state = StateConnectPending;
+ return SocketOperationConnect;
+ }
+ else if(_state <= StateConnectPending)
+ {
+ try
{
- //
- // If we're not connected yet, return SocketOperationConnect. The transceiver will be
- // connected once initialize is called again.
- //
- _connect = false;
- return SocketOperationConnect;
+ doFinishConnect(_fd);
+ _state = StateConnected;
}
- else
+ catch(const Ice::LocalException& ex)
{
- if(_traceLevels->network >= 1)
+ if(_traceLevels->network >= 2)
{
Trace out(_logger, _traceLevels->networkCat);
- out << "starting to send udp packets\n" << toString();
+ out << "failed to connect udp socket\n" << toString() << "\n" << ex;
}
- return SocketOperationNone;
+ throw;
+ }
+ }
+
+ if(_state == StateConnected)
+ {
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "starting to send udp packets\n" << toString();
}
}
+ assert(_state >= StateConnected);
return SocketOperationNone;
}
void
IceInternal::UdpTransceiver::close()
{
- if(!_connect && _traceLevels->network >= 1)
+ if(_state >= StateConnected && _traceLevels->network >= 1)
{
Trace out(_logger, _traceLevels->networkCat);
out << "closing udp connection\n" << toString();
@@ -100,28 +111,49 @@ bool
IceInternal::UdpTransceiver::write(Buffer& buf)
{
assert(buf.i == buf.b.begin());
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
- const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
+ assert(_fd != INVALID_SOCKET && _state >= StateConnected);
+
+ // The caller is supposed to check the send size before by calling checkSendSize
+ assert(min(_maxPacketSize, _sndSize - _udpOverhead) >= static_cast<int>(buf.b.size()));
+
+repeat:
+
+ ssize_t ret;
+ if(_state == StateConnected)
{
- //
- // We don't log a warning here because the client gets an exception anyway.
- //
- throw DatagramLimitException(__FILE__, __LINE__);
+#ifdef _WIN32
+ ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0);
+#else
+ ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0);
+#endif
}
+ else
+ {
+ socklen_t len = static_cast<socklen_t>(sizeof(_peerAddr));
+ if(_peerAddr.ss_family == AF_INET)
+ {
+ len = sizeof(sockaddr_in);
+ }
+ else if(_peerAddr.ss_family == AF_INET6)
+ {
+ len = sizeof(sockaddr_in6);
+ }
+ else
+ {
+ // No peer has sent a datagram yet.
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
-repeat:
- assert(!_connect);
- assert(_fd != INVALID_SOCKET);
#ifdef _WIN32
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0);
+ ret = ::sendto(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0,
+ reinterpret_cast<struct sockaddr*>(&_peerAddr), len);
#else
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0);
+ ret = ::sendto(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0,
+ reinterpret_cast<struct sockaddr*>(&_peerAddr), len);
#endif
+ }
if(ret == SOCKET_ERROR)
{
@@ -160,94 +192,92 @@ bool
IceInternal::UdpTransceiver::read(Buffer& buf)
{
assert(buf.i == buf.b.begin());
+ assert(_fd != INVALID_SOCKET);
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
- {
- //
- // We log a warning here because this is the server side -- without the
- // the warning, there would only be silence.
- //
- if(_warn)
- {
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
buf.b.resize(packetSize);
buf.i = buf.b.begin();
repeat:
ssize_t ret;
- if(_connect)
+ if(_state == StateConnected)
{
- //
- // If we must connect, then we connect to the first peer that
- // sends us a packet.
- //
- struct sockaddr_storage peerAddr;
+ ret = ::recv(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0);
+ }
+ else
+ {
+ assert(_incoming);
+
+ sockaddr_storage peerAddr;
memset(&peerAddr, 0, sizeof(struct sockaddr_storage));
socklen_t len = static_cast<socklen_t>(sizeof(peerAddr));
- assert(_fd != INVALID_SOCKET);
- ret = recvfrom(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize,
- 0, reinterpret_cast<struct sockaddr*>(&peerAddr), &len);
+
+ ret = recvfrom(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0,
+ reinterpret_cast<struct sockaddr*>(&peerAddr), &len);
+
if(ret != SOCKET_ERROR)
{
-#ifndef NDEBUG
- bool connected = doConnect(_fd, peerAddr);
- assert(connected);
-#else
- doConnect(_fd, peerAddr);
-#endif
- _connect = false; // We are connected now.
-
- if(_traceLevels->network >= 1)
- {
- Trace out(_logger, _traceLevels->networkCat);
- out << "connected udp socket\n" << toString();
- }
+ _peerAddr = peerAddr;
}
}
- else
- {
- assert(_fd != INVALID_SOCKET);
- ret = ::recv(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0);
- }
if(ret == SOCKET_ERROR)
{
- if(interrupted())
- {
- goto repeat;
- }
-
- if(wouldBlock())
+ if(recvTruncated())
{
- return false;
+ // The message was truncated and the whole buffer is filled. We ignore
+ // this error here, it will be detected at the connection level when
+ // the Ice message size is checked against the buffer size.
+ ret = buf.b.size();
}
-
- if(recvTruncated())
+ else
{
- DatagramLimitException ex(__FILE__, __LINE__);
- if(_warn)
+ if(interrupted())
{
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
+ goto repeat;
+ }
+
+ if(wouldBlock())
+ {
+ return false;
+ }
+
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
- throw ex;
-
}
+ }
+
+ if(_state == StateNeedConnect)
+ {
+ //
+ // If we must connect, we connect to the first peer that sends us a packet.
+ //
+ assert(_incoming); // Client connections should always be connected at this point.
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+#ifndef NDEBUG
+ bool connected = doConnect(_fd, _peerAddr);
+ assert(connected);
+#else
+ doConnect(_fd, _peerAddr);
+#endif
+ _state = StateConnected;
+
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "connected udp socket\n" << toString();
+ }
}
if(_traceLevels->network >= 3)
@@ -272,26 +302,40 @@ IceInternal::UdpTransceiver::startWrite(Buffer& buf)
{
assert(buf.i == buf.b.begin());
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
- const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
- {
- //
- // We don't log a warning here because the client gets an exception anyway.
- //
- throw DatagramLimitException(__FILE__, __LINE__);
- }
+ // The caller is supposed to check the send size before by calling checkSendSize
+ assert(min(_maxPacketSize, _sndSize - _udpOverhead) >= static_cast<int>(buf.b.size()));
- assert(!_connect);
assert(_fd != INVALID_SOCKET);
_write.buf.len = static_cast<int>(buf.b.size());
_write.buf.buf = reinterpret_cast<char*>(&*buf.i);
- int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+ int err;
+ if(_state == StateConnected)
+ {
+ err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+ }
+ else
+ {
+ socklen_t len = static_cast<socklen_t>(sizeof(_peerAddr));
+ if(_peerAddr.ss_family == AF_INET)
+ {
+ len = sizeof(sockaddr_in);
+ }
+ else if(_peerAddr.ss_family == AF_INET6)
+ {
+ len = sizeof(sockaddr_in6);
+ }
+ else
+ {
+ // No peer has sent a datagram yet.
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+ err = WSASendTo(_fd, &_write.buf, 1, &_write.count, 0, reinterpret_cast<struct sockaddr*>(&_peerAddr),
+ len, &_write, NULL);
+ }
+
if(err == SOCKET_ERROR)
{
if(!wouldBlock())
@@ -350,25 +394,7 @@ IceInternal::UdpTransceiver::finishWrite(Buffer& buf)
void
IceInternal::UdpTransceiver::startRead(Buffer& buf)
{
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
- {
- //
- // We log a warning here because this is the server side -- without the
- // the warning, there would only be silence.
- //
- if(_warn)
- {
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
buf.b.resize(packetSize);
buf.i = buf.b.begin();
@@ -376,10 +402,27 @@ IceInternal::UdpTransceiver::startRead(Buffer& buf)
_read.buf.len = packetSize;
_read.buf.buf = reinterpret_cast<char*>(&*buf.i);
- int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+ int err;
+ if(_state == StateConnected)
+ {
+ err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+ }
+ else
+ {
+ memset(&_readAddr, 0, sizeof(struct sockaddr_storage));
+ _readAddrLen = static_cast<socklen_t>(sizeof(_readAddr));
+
+ err = WSARecvFrom(_fd, &_read.buf, 1, &_read.count, &_read.flags,
+ reinterpret_cast<struct sockaddr*>(&_readAddr), &_readAddrLen, &_read, NULL);
+ }
+
if(err == SOCKET_ERROR)
{
- if(!wouldBlock())
+ if(recvTruncated())
+ {
+ // Nothing to do.
+ }
+ else if(!wouldBlock())
{
if(connectionLost())
{
@@ -403,19 +446,35 @@ IceInternal::UdpTransceiver::finishRead(Buffer& buf)
if(_read.count == SOCKET_ERROR)
{
WSASetLastError(_read.error);
- if(connectionLost())
+
+ if(recvTruncated())
{
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ // The message was truncated and the whole buffer is filled. We ignore
+ // this error here, it will be detected at the connection level when
+ // the Ice message size is checked against the buffer size.
+ _read.count = buf.b.size();
}
else
{
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
}
}
+
+ if(_state == StateNotConnected)
+ {
+ _peerAddr = _readAddr;
+ }
if(_traceLevels->network >= 3)
{
@@ -442,14 +501,32 @@ IceInternal::UdpTransceiver::type() const
string
IceInternal::UdpTransceiver::toString() const
{
- if(_mcastAddr.ss_family != AF_UNSPEC && _fd != INVALID_SOCKET)
+ if(_fd == INVALID_SOCKET)
{
- return fdToString(_fd) + "\nmulticast address = " + addrToString(_mcastAddr);
+ return "<closed>";
+ }
+
+ ostringstream s;
+ if(_state == StateNotConnected)
+ {
+ struct sockaddr_storage localAddr;
+ fdToLocalAddress(_fd, localAddr);
+ s << "local address = " << addrToString(localAddr);
+ if(_peerAddr.ss_family != AF_UNSPEC)
+ {
+ s << "\nremote address = " << addrToString(_peerAddr);
+ }
}
else
{
- return fdToString(_fd);
+ s << fdToString(_fd);
+ }
+
+ if(_mcastAddr.ss_family != AF_UNSPEC)
+ {
+ s << "\nmulticast address = " + addrToString(_mcastAddr);
}
+ return s.str();
}
Ice::ConnectionInfoPtr
@@ -457,8 +534,33 @@ IceInternal::UdpTransceiver::getInfo() const
{
assert(_fd != INVALID_SOCKET);
Ice::UDPConnectionInfoPtr info = new Ice::UDPConnectionInfo();
- fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
- addrToAddressAndPort(_mcastAddr, info->mcastAddress, info->mcastPort);
+ if(_state == StateNotConnected)
+ {
+ struct sockaddr_storage localAddr;
+ fdToLocalAddress(_fd, localAddr);
+ addrToAddressAndPort(localAddr, info->localAddress, info->localPort);
+ if(_peerAddr.ss_family != AF_UNSPEC)
+ {
+ addrToAddressAndPort(_peerAddr, info->remoteAddress, info->remotePort);
+ }
+ else
+ {
+ info->remotePort = 0;
+ }
+ }
+ else
+ {
+ fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
+ }
+
+ if(_mcastAddr.ss_family != AF_UNSPEC)
+ {
+ addrToAddressAndPort(_mcastAddr, info->mcastAddress, info->mcastPort);
+ }
+ else
+ {
+ info->mcastPort = 0;
+ }
return info;
}
@@ -469,6 +571,11 @@ IceInternal::UdpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize
{
Ex::throwMemoryLimitException(__FILE__, __LINE__, buf.b.size(), messageSizeMax);
}
+
+ //
+ // The maximum packetSize is either the maximum allowable UDP packet size, or
+ // the UDP send buffer size (which ever is smaller).
+ //
const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
if(packetSize < static_cast<int>(buf.b.size()))
{
@@ -489,24 +596,40 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
_stats(instance->initializationData().stats),
_incoming(false),
_addr(addr),
- _connect(true),
- _warn(instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
+ _state(StateNeedConnect)
#ifdef ICE_USE_IOCP
, _read(SocketOperationRead),
_write(SocketOperationWrite)
#endif
{
- // AF_UNSPEC means not multicast.
- _mcastAddr.ss_family = AF_UNSPEC;
+ _mcastAddr.ss_family = AF_UNSPEC; // AF_UNSPEC means not multicast.
+ _peerAddr.ss_family = AF_UNSPEC; // Not initialized yet.
_fd = createSocket(true, _addr.ss_family);
setBufSize(instance);
setBlock(_fd, false);
+ //
+ // In general, connecting a datagram socket should be non-blocking as this just setups
+ // the default destination address for the socket. However, on some OS, connect sometime
+ // returns EWOULDBLOCK. If that's the case, we keep the state as StateNeedConnect. This
+ // will make sure the transceiver is notified when the socket is ready for sending (see
+ // the initialize() implementation).
+ //
if(doConnect(_fd, _addr))
{
- _connect = false; // We're connected now
+ _state = StateConnected;
}
+
+#ifdef ICE_USE_IOCP
+ //
+ // On Windows when using IOCP, we must make sure that the socket is connected without
+ // blocking as there's no way to do a non-blocking datagram socket conection (ConnectEx
+ // only supports connection oriented sockets). According to Microsoft documentation of
+ // the connect() call, this should always be the case.
+ //
+ assert(_state == StateConnected);
+#endif
if(isMulticast(_addr))
{
@@ -528,8 +651,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
_stats(instance->initializationData().stats),
_incoming(true),
_addr(getAddressForServer(host, port, instance->protocolSupport())),
- _connect(connect),
- _warn(instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
+ _state(connect ? StateNeedConnect : StateNotConnected)
#ifdef ICE_USE_IOCP
, _read(SocketOperationRead),
_write(SocketOperationWrite)
@@ -544,6 +666,8 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
out << "attempting to bind to udp socket " << addrToString(_addr);
}
+ _peerAddr.ss_family = AF_UNSPEC; // Not assigned yet.
+
if(isMulticast(_addr))
{
setReuseAddress(_fd, true);
@@ -552,11 +676,13 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
#ifdef _WIN32
//
// Windows does not allow binding to the mcast address itself
- // so we bind to INADDR_ANY (0.0.0.0) instead.
+ // so we bind to INADDR_ANY (0.0.0.0) instead. As a result,
+ // bi-directional connection won't work because the source
+ // address won't be the multicast address and the client will
+ // therefore reject the datagram.
//
- const_cast<struct sockaddr_storage&>(_addr) =
- getAddressForServer("", getPort(_mcastAddr),
- _mcastAddr.ss_family == AF_INET ? EnableIPv4 : EnableIPv6);
+ const_cast<struct sockaddr_storage&>(_addr) =
+ getAddressForServer("", port, _mcastAddr.ss_family == AF_INET ? EnableIPv4 : EnableIPv6);
#endif
const_cast<struct sockaddr_storage&>(_addr) = doBind(_fd, _addr);
@@ -585,8 +711,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
#endif
const_cast<struct sockaddr_storage&>(_addr) = doBind(_fd, _addr);
- // AF_UNSPEC means not multicast.
- _mcastAddr.ss_family = AF_UNSPEC;
+ _mcastAddr.ss_family = AF_UNSPEC; // AF_UNSPEC means not multicast.
}
if(_traceLevels->network >= 1)
@@ -645,7 +770,7 @@ IceInternal::UdpTransceiver::setBufSize(const InstancePtr& instance)
// Get property for buffer size and check for sanity.
//
Int sizeRequested = instance->initializationData().properties->getPropertyAsIntWithDefault(prop, dfltSize);
- if(sizeRequested < _udpOverhead)
+ if(sizeRequested < (_udpOverhead + headerSize))
{
Warning out(_logger);
out << "Invalid " << prop << " value of " << sizeRequested << " adjusted to " << dfltSize;
diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h
index f7c918184a8..0883c13bb39 100644
--- a/cpp/src/Ice/UdpTransceiver.h
+++ b/cpp/src/Ice/UdpTransceiver.h
@@ -35,6 +35,14 @@ class SUdpTransceiver;
class UdpTransceiver : public Transceiver, public NativeInfo
{
+ enum State
+ {
+ StateNeedConnect,
+ StateConnectPending,
+ StateConnected,
+ StateNotConnected
+ };
+
public:
virtual NativeInfoPtr getNativeInfo();
@@ -76,17 +84,19 @@ private:
const bool _incoming;
const struct sockaddr_storage _addr;
struct sockaddr_storage _mcastAddr;
-
- bool _connect;
+ struct sockaddr_storage _peerAddr;
+
+ State _state;
int _rcvSize;
int _sndSize;
- const bool _warn;
static const int _udpOverhead;
static const int _maxPacketSize;
#ifdef ICE_USE_IOCP
AsyncInfo _read;
AsyncInfo _write;
+ struct sockaddr_storage _readAddr;
+ socklen_t _readAddrLen;
#endif
};