summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/UdpTransceiver.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-10-27 12:00:32 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-10-27 12:00:32 +0100
commit1fdb973182e589b0d20e987360bd694ae783b0a2 (patch)
treea94cbbcc0d078bcf7aa4427951799e09feb80826 /cpp/src/Ice/UdpTransceiver.cpp
parentadd support for number protocol in Python (diff)
downloadice-1fdb973182e589b0d20e987360bd694ae783b0a2.tar.bz2
ice-1fdb973182e589b0d20e987360bd694ae783b0a2.tar.xz
ice-1fdb973182e589b0d20e987360bd694ae783b0a2.zip
Cleaned up UDP transceivers, fixes for bug 4223 and 4320
Diffstat (limited to 'cpp/src/Ice/UdpTransceiver.cpp')
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp431
1 files changed, 278 insertions, 153 deletions
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index a2cdded6f8a..d185c44cf9e 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -58,34 +58,45 @@ IceInternal::UdpTransceiver::getAsyncInfo(SocketOperation status)
SocketOperation
IceInternal::UdpTransceiver::initialize()
{
- if(!_incoming)
+ if(_state == StateNeedConnect)
{
- if(_connect)
+ _state = StateConnectPending;
+ return SocketOperationConnect;
+ }
+ else if(_state <= StateConnectPending)
+ {
+ try
{
- //
- // If we're not connected yet, return SocketOperationConnect. The transceiver will be
- // connected once initialize is called again.
- //
- _connect = false;
- return SocketOperationConnect;
+ doFinishConnect(_fd);
+ _state = StateConnected;
}
- else
+ catch(const Ice::LocalException& ex)
{
- if(_traceLevels->network >= 1)
+ if(_traceLevels->network >= 2)
{
Trace out(_logger, _traceLevels->networkCat);
- out << "starting to send udp packets\n" << toString();
+ out << "failed to connect udp socket\n" << toString() << "\n" << ex;
}
- return SocketOperationNone;
+ throw;
+ }
+ }
+
+ if(_state == StateConnected)
+ {
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "starting to send udp packets\n" << toString();
}
}
+ assert(_state >= StateConnected);
return SocketOperationNone;
}
void
IceInternal::UdpTransceiver::close()
{
- if(!_connect && _traceLevels->network >= 1)
+ if(_state >= StateConnected && _traceLevels->network >= 1)
{
Trace out(_logger, _traceLevels->networkCat);
out << "closing udp connection\n" << toString();
@@ -100,28 +111,49 @@ bool
IceInternal::UdpTransceiver::write(Buffer& buf)
{
assert(buf.i == buf.b.begin());
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
- const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
+ assert(_fd != INVALID_SOCKET && _state >= StateConnected);
+
+ // The caller is supposed to check the send size before by calling checkSendSize
+ assert(min(_maxPacketSize, _sndSize - _udpOverhead) >= static_cast<int>(buf.b.size()));
+
+repeat:
+
+ ssize_t ret;
+ if(_state == StateConnected)
{
- //
- // We don't log a warning here because the client gets an exception anyway.
- //
- throw DatagramLimitException(__FILE__, __LINE__);
+#ifdef _WIN32
+ ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0);
+#else
+ ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0);
+#endif
}
+ else
+ {
+ socklen_t len = static_cast<socklen_t>(sizeof(_peerAddr));
+ if(_peerAddr.ss_family == AF_INET)
+ {
+ len = sizeof(sockaddr_in);
+ }
+ else if(_peerAddr.ss_family == AF_INET6)
+ {
+ len = sizeof(sockaddr_in6);
+ }
+ else
+ {
+ // No peer has sent a datagram yet.
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
-repeat:
- assert(!_connect);
- assert(_fd != INVALID_SOCKET);
#ifdef _WIN32
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0);
+ ret = ::sendto(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0,
+ reinterpret_cast<struct sockaddr*>(&_peerAddr), len);
#else
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0);
+ ret = ::sendto(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0,
+ reinterpret_cast<struct sockaddr*>(&_peerAddr), len);
#endif
+ }
if(ret == SOCKET_ERROR)
{
@@ -160,94 +192,92 @@ bool
IceInternal::UdpTransceiver::read(Buffer& buf)
{
assert(buf.i == buf.b.begin());
+ assert(_fd != INVALID_SOCKET);
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
- {
- //
- // We log a warning here because this is the server side -- without the
- // the warning, there would only be silence.
- //
- if(_warn)
- {
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
buf.b.resize(packetSize);
buf.i = buf.b.begin();
repeat:
ssize_t ret;
- if(_connect)
+ if(_state == StateConnected)
{
- //
- // If we must connect, then we connect to the first peer that
- // sends us a packet.
- //
- struct sockaddr_storage peerAddr;
+ ret = ::recv(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0);
+ }
+ else
+ {
+ assert(_incoming);
+
+ sockaddr_storage peerAddr;
memset(&peerAddr, 0, sizeof(struct sockaddr_storage));
socklen_t len = static_cast<socklen_t>(sizeof(peerAddr));
- assert(_fd != INVALID_SOCKET);
- ret = recvfrom(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize,
- 0, reinterpret_cast<struct sockaddr*>(&peerAddr), &len);
+
+ ret = recvfrom(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0,
+ reinterpret_cast<struct sockaddr*>(&peerAddr), &len);
+
if(ret != SOCKET_ERROR)
{
-#ifndef NDEBUG
- bool connected = doConnect(_fd, peerAddr);
- assert(connected);
-#else
- doConnect(_fd, peerAddr);
-#endif
- _connect = false; // We are connected now.
-
- if(_traceLevels->network >= 1)
- {
- Trace out(_logger, _traceLevels->networkCat);
- out << "connected udp socket\n" << toString();
- }
+ _peerAddr = peerAddr;
}
}
- else
- {
- assert(_fd != INVALID_SOCKET);
- ret = ::recv(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0);
- }
if(ret == SOCKET_ERROR)
{
- if(interrupted())
- {
- goto repeat;
- }
-
- if(wouldBlock())
+ if(recvTruncated())
{
- return false;
+ // The message was truncated and the whole buffer is filled. We ignore
+ // this error here, it will be detected at the connection level when
+ // the Ice message size is checked against the buffer size.
+ ret = buf.b.size();
}
-
- if(recvTruncated())
+ else
{
- DatagramLimitException ex(__FILE__, __LINE__);
- if(_warn)
+ if(interrupted())
{
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
+ goto repeat;
+ }
+
+ if(wouldBlock())
+ {
+ return false;
+ }
+
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
- throw ex;
-
}
+ }
+
+ if(_state == StateNeedConnect)
+ {
+ //
+ // If we must connect, we connect to the first peer that sends us a packet.
+ //
+ assert(_incoming); // Client connections should always be connected at this point.
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+#ifndef NDEBUG
+ bool connected = doConnect(_fd, _peerAddr);
+ assert(connected);
+#else
+ doConnect(_fd, _peerAddr);
+#endif
+ _state = StateConnected;
+
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "connected udp socket\n" << toString();
+ }
}
if(_traceLevels->network >= 3)
@@ -272,26 +302,40 @@ IceInternal::UdpTransceiver::startWrite(Buffer& buf)
{
assert(buf.i == buf.b.begin());
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
- const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
- {
- //
- // We don't log a warning here because the client gets an exception anyway.
- //
- throw DatagramLimitException(__FILE__, __LINE__);
- }
+ // The caller is supposed to check the send size before by calling checkSendSize
+ assert(min(_maxPacketSize, _sndSize - _udpOverhead) >= static_cast<int>(buf.b.size()));
- assert(!_connect);
assert(_fd != INVALID_SOCKET);
_write.buf.len = static_cast<int>(buf.b.size());
_write.buf.buf = reinterpret_cast<char*>(&*buf.i);
- int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+ int err;
+ if(_state == StateConnected)
+ {
+ err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+ }
+ else
+ {
+ socklen_t len = static_cast<socklen_t>(sizeof(_peerAddr));
+ if(_peerAddr.ss_family == AF_INET)
+ {
+ len = sizeof(sockaddr_in);
+ }
+ else if(_peerAddr.ss_family == AF_INET6)
+ {
+ len = sizeof(sockaddr_in6);
+ }
+ else
+ {
+ // No peer has sent a datagram yet.
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+ err = WSASendTo(_fd, &_write.buf, 1, &_write.count, 0, reinterpret_cast<struct sockaddr*>(&_peerAddr),
+ len, &_write, NULL);
+ }
+
if(err == SOCKET_ERROR)
{
if(!wouldBlock())
@@ -350,25 +394,7 @@ IceInternal::UdpTransceiver::finishWrite(Buffer& buf)
void
IceInternal::UdpTransceiver::startRead(Buffer& buf)
{
- //
- // The maximum packetSize is either the maximum allowable UDP
- // packet size, or the UDP send buffer size (which ever is
- // smaller).
- //
const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead);
- if(packetSize < static_cast<int>(buf.b.size()))
- {
- //
- // We log a warning here because this is the server side -- without the
- // the warning, there would only be silence.
- //
- if(_warn)
- {
- Warning out(_logger);
- out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
buf.b.resize(packetSize);
buf.i = buf.b.begin();
@@ -376,10 +402,27 @@ IceInternal::UdpTransceiver::startRead(Buffer& buf)
_read.buf.len = packetSize;
_read.buf.buf = reinterpret_cast<char*>(&*buf.i);
- int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+ int err;
+ if(_state == StateConnected)
+ {
+ err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+ }
+ else
+ {
+ memset(&_readAddr, 0, sizeof(struct sockaddr_storage));
+ _readAddrLen = static_cast<socklen_t>(sizeof(_readAddr));
+
+ err = WSARecvFrom(_fd, &_read.buf, 1, &_read.count, &_read.flags,
+ reinterpret_cast<struct sockaddr*>(&_readAddr), &_readAddrLen, &_read, NULL);
+ }
+
if(err == SOCKET_ERROR)
{
- if(!wouldBlock())
+ if(recvTruncated())
+ {
+ // Nothing to do.
+ }
+ else if(!wouldBlock())
{
if(connectionLost())
{
@@ -403,19 +446,35 @@ IceInternal::UdpTransceiver::finishRead(Buffer& buf)
if(_read.count == SOCKET_ERROR)
{
WSASetLastError(_read.error);
- if(connectionLost())
+
+ if(recvTruncated())
{
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ // The message was truncated and the whole buffer is filled. We ignore
+ // this error here, it will be detected at the connection level when
+ // the Ice message size is checked against the buffer size.
+ _read.count = buf.b.size();
}
else
{
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
}
}
+
+ if(_state == StateNotConnected)
+ {
+ _peerAddr = _readAddr;
+ }
if(_traceLevels->network >= 3)
{
@@ -442,14 +501,32 @@ IceInternal::UdpTransceiver::type() const
string
IceInternal::UdpTransceiver::toString() const
{
- if(_mcastAddr.ss_family != AF_UNSPEC && _fd != INVALID_SOCKET)
+ if(_fd == INVALID_SOCKET)
{
- return fdToString(_fd) + "\nmulticast address = " + addrToString(_mcastAddr);
+ return "<closed>";
+ }
+
+ ostringstream s;
+ if(_state == StateNotConnected)
+ {
+ struct sockaddr_storage localAddr;
+ fdToLocalAddress(_fd, localAddr);
+ s << "local address = " << addrToString(localAddr);
+ if(_peerAddr.ss_family != AF_UNSPEC)
+ {
+ s << "\nremote address = " << addrToString(_peerAddr);
+ }
}
else
{
- return fdToString(_fd);
+ s << fdToString(_fd);
+ }
+
+ if(_mcastAddr.ss_family != AF_UNSPEC)
+ {
+ s << "\nmulticast address = " + addrToString(_mcastAddr);
}
+ return s.str();
}
Ice::ConnectionInfoPtr
@@ -457,8 +534,33 @@ IceInternal::UdpTransceiver::getInfo() const
{
assert(_fd != INVALID_SOCKET);
Ice::UDPConnectionInfoPtr info = new Ice::UDPConnectionInfo();
- fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
- addrToAddressAndPort(_mcastAddr, info->mcastAddress, info->mcastPort);
+ if(_state == StateNotConnected)
+ {
+ struct sockaddr_storage localAddr;
+ fdToLocalAddress(_fd, localAddr);
+ addrToAddressAndPort(localAddr, info->localAddress, info->localPort);
+ if(_peerAddr.ss_family != AF_UNSPEC)
+ {
+ addrToAddressAndPort(_peerAddr, info->remoteAddress, info->remotePort);
+ }
+ else
+ {
+ info->remotePort = 0;
+ }
+ }
+ else
+ {
+ fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
+ }
+
+ if(_mcastAddr.ss_family != AF_UNSPEC)
+ {
+ addrToAddressAndPort(_mcastAddr, info->mcastAddress, info->mcastPort);
+ }
+ else
+ {
+ info->mcastPort = 0;
+ }
return info;
}
@@ -469,6 +571,11 @@ IceInternal::UdpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize
{
Ex::throwMemoryLimitException(__FILE__, __LINE__, buf.b.size(), messageSizeMax);
}
+
+ //
+ // The maximum packetSize is either the maximum allowable UDP packet size, or
+ // the UDP send buffer size (which ever is smaller).
+ //
const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
if(packetSize < static_cast<int>(buf.b.size()))
{
@@ -489,24 +596,40 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
_stats(instance->initializationData().stats),
_incoming(false),
_addr(addr),
- _connect(true),
- _warn(instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
+ _state(StateNeedConnect)
#ifdef ICE_USE_IOCP
, _read(SocketOperationRead),
_write(SocketOperationWrite)
#endif
{
- // AF_UNSPEC means not multicast.
- _mcastAddr.ss_family = AF_UNSPEC;
+ _mcastAddr.ss_family = AF_UNSPEC; // AF_UNSPEC means not multicast.
+ _peerAddr.ss_family = AF_UNSPEC; // Not initialized yet.
_fd = createSocket(true, _addr.ss_family);
setBufSize(instance);
setBlock(_fd, false);
+ //
+ // In general, connecting a datagram socket should be non-blocking as this just setups
+ // the default destination address for the socket. However, on some OS, connect sometime
+ // returns EWOULDBLOCK. If that's the case, we keep the state as StateNeedConnect. This
+ // will make sure the transceiver is notified when the socket is ready for sending (see
+ // the initialize() implementation).
+ //
if(doConnect(_fd, _addr))
{
- _connect = false; // We're connected now
+ _state = StateConnected;
}
+
+#ifdef ICE_USE_IOCP
+ //
+ // On Windows when using IOCP, we must make sure that the socket is connected without
+ // blocking as there's no way to do a non-blocking datagram socket conection (ConnectEx
+ // only supports connection oriented sockets). According to Microsoft documentation of
+ // the connect() call, this should always be the case.
+ //
+ assert(_state == StateConnected);
+#endif
if(isMulticast(_addr))
{
@@ -528,8 +651,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
_stats(instance->initializationData().stats),
_incoming(true),
_addr(getAddressForServer(host, port, instance->protocolSupport())),
- _connect(connect),
- _warn(instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
+ _state(connect ? StateNeedConnect : StateNotConnected)
#ifdef ICE_USE_IOCP
, _read(SocketOperationRead),
_write(SocketOperationWrite)
@@ -544,6 +666,8 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
out << "attempting to bind to udp socket " << addrToString(_addr);
}
+ _peerAddr.ss_family = AF_UNSPEC; // Not assigned yet.
+
if(isMulticast(_addr))
{
setReuseAddress(_fd, true);
@@ -552,11 +676,13 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
#ifdef _WIN32
//
// Windows does not allow binding to the mcast address itself
- // so we bind to INADDR_ANY (0.0.0.0) instead.
+ // so we bind to INADDR_ANY (0.0.0.0) instead. As a result,
+ // bi-directional connection won't work because the source
+ // address won't be the multicast address and the client will
+ // therefore reject the datagram.
//
- const_cast<struct sockaddr_storage&>(_addr) =
- getAddressForServer("", getPort(_mcastAddr),
- _mcastAddr.ss_family == AF_INET ? EnableIPv4 : EnableIPv6);
+ const_cast<struct sockaddr_storage&>(_addr) =
+ getAddressForServer("", port, _mcastAddr.ss_family == AF_INET ? EnableIPv4 : EnableIPv6);
#endif
const_cast<struct sockaddr_storage&>(_addr) = doBind(_fd, _addr);
@@ -585,8 +711,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s
#endif
const_cast<struct sockaddr_storage&>(_addr) = doBind(_fd, _addr);
- // AF_UNSPEC means not multicast.
- _mcastAddr.ss_family = AF_UNSPEC;
+ _mcastAddr.ss_family = AF_UNSPEC; // AF_UNSPEC means not multicast.
}
if(_traceLevels->network >= 1)
@@ -645,7 +770,7 @@ IceInternal::UdpTransceiver::setBufSize(const InstancePtr& instance)
// Get property for buffer size and check for sanity.
//
Int sizeRequested = instance->initializationData().properties->getPropertyAsIntWithDefault(prop, dfltSize);
- if(sizeRequested < _udpOverhead)
+ if(sizeRequested < (_udpOverhead + headerSize))
{
Warning out(_logger);
out << "Invalid " << prop << " value of " << sizeRequested << " adjusted to " << dfltSize;