diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-10-27 12:00:32 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-10-27 12:00:32 +0100 |
commit | 1fdb973182e589b0d20e987360bd694ae783b0a2 (patch) | |
tree | a94cbbcc0d078bcf7aa4427951799e09feb80826 /cpp/src/Ice/UdpTransceiver.cpp | |
parent | add support for number protocol in Python (diff) | |
download | ice-1fdb973182e589b0d20e987360bd694ae783b0a2.tar.bz2 ice-1fdb973182e589b0d20e987360bd694ae783b0a2.tar.xz ice-1fdb973182e589b0d20e987360bd694ae783b0a2.zip |
Cleaned up UDP transceivers, fixes for bug 4223 and 4320
Diffstat (limited to 'cpp/src/Ice/UdpTransceiver.cpp')
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 431 |
1 files changed, 278 insertions, 153 deletions
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index a2cdded6f8a..d185c44cf9e 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -58,34 +58,45 @@ IceInternal::UdpTransceiver::getAsyncInfo(SocketOperation status) SocketOperation IceInternal::UdpTransceiver::initialize() { - if(!_incoming) + if(_state == StateNeedConnect) { - if(_connect) + _state = StateConnectPending; + return SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { + try { - // - // If we're not connected yet, return SocketOperationConnect. The transceiver will be - // connected once initialize is called again. - // - _connect = false; - return SocketOperationConnect; + doFinishConnect(_fd); + _state = StateConnected; } - else + catch(const Ice::LocalException& ex) { - if(_traceLevels->network >= 1) + if(_traceLevels->network >= 2) { Trace out(_logger, _traceLevels->networkCat); - out << "starting to send udp packets\n" << toString(); + out << "failed to connect udp socket\n" << toString() << "\n" << ex; } - return SocketOperationNone; + throw; + } + } + + if(_state == StateConnected) + { + if(_traceLevels->network >= 1) + { + Trace out(_logger, _traceLevels->networkCat); + out << "starting to send udp packets\n" << toString(); } } + assert(_state >= StateConnected); return SocketOperationNone; } void IceInternal::UdpTransceiver::close() { - if(!_connect && _traceLevels->network >= 1) + if(_state >= StateConnected && _traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); out << "closing udp connection\n" << toString(); @@ -100,28 +111,49 @@ bool IceInternal::UdpTransceiver::write(Buffer& buf) { assert(buf.i == buf.b.begin()); - // - // The maximum packetSize is either the maximum allowable UDP - // packet size, or the UDP send buffer size (which ever is - // smaller). - // - const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead); - if(packetSize < static_cast<int>(buf.b.size())) + assert(_fd != INVALID_SOCKET && _state >= StateConnected); + + // The caller is supposed to check the send size before by calling checkSendSize + assert(min(_maxPacketSize, _sndSize - _udpOverhead) >= static_cast<int>(buf.b.size())); + +repeat: + + ssize_t ret; + if(_state == StateConnected) { - // - // We don't log a warning here because the client gets an exception anyway. - // - throw DatagramLimitException(__FILE__, __LINE__); +#ifdef _WIN32 + ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0); +#else + ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0); +#endif } + else + { + socklen_t len = static_cast<socklen_t>(sizeof(_peerAddr)); + if(_peerAddr.ss_family == AF_INET) + { + len = sizeof(sockaddr_in); + } + else if(_peerAddr.ss_family == AF_INET6) + { + len = sizeof(sockaddr_in6); + } + else + { + // No peer has sent a datagram yet. + SocketException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } -repeat: - assert(!_connect); - assert(_fd != INVALID_SOCKET); #ifdef _WIN32 - ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0); + ret = ::sendto(_fd, reinterpret_cast<const char*>(&buf.b[0]), static_cast<int>(buf.b.size()), 0, + reinterpret_cast<struct sockaddr*>(&_peerAddr), len); #else - ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0); + ret = ::sendto(_fd, reinterpret_cast<const char*>(&buf.b[0]), buf.b.size(), 0, + reinterpret_cast<struct sockaddr*>(&_peerAddr), len); #endif + } if(ret == SOCKET_ERROR) { @@ -160,94 +192,92 @@ bool IceInternal::UdpTransceiver::read(Buffer& buf) { assert(buf.i == buf.b.begin()); + assert(_fd != INVALID_SOCKET); - // - // The maximum packetSize is either the maximum allowable UDP - // packet size, or the UDP send buffer size (which ever is - // smaller). - // const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead); - if(packetSize < static_cast<int>(buf.b.size())) - { - // - // We log a warning here because this is the server side -- without the - // the warning, there would only be silence. - // - if(_warn) - { - Warning out(_logger); - out << "DatagramLimitException: maximum size of " << packetSize << " exceeded"; - } - throw DatagramLimitException(__FILE__, __LINE__); - } buf.b.resize(packetSize); buf.i = buf.b.begin(); repeat: ssize_t ret; - if(_connect) + if(_state == StateConnected) { - // - // If we must connect, then we connect to the first peer that - // sends us a packet. - // - struct sockaddr_storage peerAddr; + ret = ::recv(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0); + } + else + { + assert(_incoming); + + sockaddr_storage peerAddr; memset(&peerAddr, 0, sizeof(struct sockaddr_storage)); socklen_t len = static_cast<socklen_t>(sizeof(peerAddr)); - assert(_fd != INVALID_SOCKET); - ret = recvfrom(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, - 0, reinterpret_cast<struct sockaddr*>(&peerAddr), &len); + + ret = recvfrom(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0, + reinterpret_cast<struct sockaddr*>(&peerAddr), &len); + if(ret != SOCKET_ERROR) { -#ifndef NDEBUG - bool connected = doConnect(_fd, peerAddr); - assert(connected); -#else - doConnect(_fd, peerAddr); -#endif - _connect = false; // We are connected now. - - if(_traceLevels->network >= 1) - { - Trace out(_logger, _traceLevels->networkCat); - out << "connected udp socket\n" << toString(); - } + _peerAddr = peerAddr; } } - else - { - assert(_fd != INVALID_SOCKET); - ret = ::recv(_fd, reinterpret_cast<char*>(&buf.b[0]), packetSize, 0); - } if(ret == SOCKET_ERROR) { - if(interrupted()) - { - goto repeat; - } - - if(wouldBlock()) + if(recvTruncated()) { - return false; + // The message was truncated and the whole buffer is filled. We ignore + // this error here, it will be detected at the connection level when + // the Ice message size is checked against the buffer size. + ret = buf.b.size(); } - - if(recvTruncated()) + else { - DatagramLimitException ex(__FILE__, __LINE__); - if(_warn) + if(interrupted()) { - Warning out(_logger); - out << "DatagramLimitException: maximum size of " << packetSize << " exceeded"; + goto repeat; + } + + if(wouldBlock()) + { + return false; + } + + if(connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } - throw ex; - } + } + + if(_state == StateNeedConnect) + { + // + // If we must connect, we connect to the first peer that sends us a packet. + // + assert(_incoming); // Client connections should always be connected at this point. - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; +#ifndef NDEBUG + bool connected = doConnect(_fd, _peerAddr); + assert(connected); +#else + doConnect(_fd, _peerAddr); +#endif + _state = StateConnected; + + if(_traceLevels->network >= 1) + { + Trace out(_logger, _traceLevels->networkCat); + out << "connected udp socket\n" << toString(); + } } if(_traceLevels->network >= 3) @@ -272,26 +302,40 @@ IceInternal::UdpTransceiver::startWrite(Buffer& buf) { assert(buf.i == buf.b.begin()); - // - // The maximum packetSize is either the maximum allowable UDP - // packet size, or the UDP send buffer size (which ever is - // smaller). - // - const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead); - if(packetSize < static_cast<int>(buf.b.size())) - { - // - // We don't log a warning here because the client gets an exception anyway. - // - throw DatagramLimitException(__FILE__, __LINE__); - } + // The caller is supposed to check the send size before by calling checkSendSize + assert(min(_maxPacketSize, _sndSize - _udpOverhead) >= static_cast<int>(buf.b.size())); - assert(!_connect); assert(_fd != INVALID_SOCKET); _write.buf.len = static_cast<int>(buf.b.size()); _write.buf.buf = reinterpret_cast<char*>(&*buf.i); - int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); + int err; + if(_state == StateConnected) + { + err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); + } + else + { + socklen_t len = static_cast<socklen_t>(sizeof(_peerAddr)); + if(_peerAddr.ss_family == AF_INET) + { + len = sizeof(sockaddr_in); + } + else if(_peerAddr.ss_family == AF_INET6) + { + len = sizeof(sockaddr_in6); + } + else + { + // No peer has sent a datagram yet. + SocketException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + err = WSASendTo(_fd, &_write.buf, 1, &_write.count, 0, reinterpret_cast<struct sockaddr*>(&_peerAddr), + len, &_write, NULL); + } + if(err == SOCKET_ERROR) { if(!wouldBlock()) @@ -350,25 +394,7 @@ IceInternal::UdpTransceiver::finishWrite(Buffer& buf) void IceInternal::UdpTransceiver::startRead(Buffer& buf) { - // - // The maximum packetSize is either the maximum allowable UDP - // packet size, or the UDP send buffer size (which ever is - // smaller). - // const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead); - if(packetSize < static_cast<int>(buf.b.size())) - { - // - // We log a warning here because this is the server side -- without the - // the warning, there would only be silence. - // - if(_warn) - { - Warning out(_logger); - out << "DatagramLimitException: maximum size of " << packetSize << " exceeded"; - } - throw DatagramLimitException(__FILE__, __LINE__); - } buf.b.resize(packetSize); buf.i = buf.b.begin(); @@ -376,10 +402,27 @@ IceInternal::UdpTransceiver::startRead(Buffer& buf) _read.buf.len = packetSize; _read.buf.buf = reinterpret_cast<char*>(&*buf.i); - int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); + int err; + if(_state == StateConnected) + { + err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); + } + else + { + memset(&_readAddr, 0, sizeof(struct sockaddr_storage)); + _readAddrLen = static_cast<socklen_t>(sizeof(_readAddr)); + + err = WSARecvFrom(_fd, &_read.buf, 1, &_read.count, &_read.flags, + reinterpret_cast<struct sockaddr*>(&_readAddr), &_readAddrLen, &_read, NULL); + } + if(err == SOCKET_ERROR) { - if(!wouldBlock()) + if(recvTruncated()) + { + // Nothing to do. + } + else if(!wouldBlock()) { if(connectionLost()) { @@ -403,19 +446,35 @@ IceInternal::UdpTransceiver::finishRead(Buffer& buf) if(_read.count == SOCKET_ERROR) { WSASetLastError(_read.error); - if(connectionLost()) + + if(recvTruncated()) { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + // The message was truncated and the whole buffer is filled. We ignore + // this error here, it will be detected at the connection level when + // the Ice message size is checked against the buffer size. + _read.count = buf.b.size(); } else { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + if(connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } } } + + if(_state == StateNotConnected) + { + _peerAddr = _readAddr; + } if(_traceLevels->network >= 3) { @@ -442,14 +501,32 @@ IceInternal::UdpTransceiver::type() const string IceInternal::UdpTransceiver::toString() const { - if(_mcastAddr.ss_family != AF_UNSPEC && _fd != INVALID_SOCKET) + if(_fd == INVALID_SOCKET) { - return fdToString(_fd) + "\nmulticast address = " + addrToString(_mcastAddr); + return "<closed>"; + } + + ostringstream s; + if(_state == StateNotConnected) + { + struct sockaddr_storage localAddr; + fdToLocalAddress(_fd, localAddr); + s << "local address = " << addrToString(localAddr); + if(_peerAddr.ss_family != AF_UNSPEC) + { + s << "\nremote address = " << addrToString(_peerAddr); + } } else { - return fdToString(_fd); + s << fdToString(_fd); + } + + if(_mcastAddr.ss_family != AF_UNSPEC) + { + s << "\nmulticast address = " + addrToString(_mcastAddr); } + return s.str(); } Ice::ConnectionInfoPtr @@ -457,8 +534,33 @@ IceInternal::UdpTransceiver::getInfo() const { assert(_fd != INVALID_SOCKET); Ice::UDPConnectionInfoPtr info = new Ice::UDPConnectionInfo(); - fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort); - addrToAddressAndPort(_mcastAddr, info->mcastAddress, info->mcastPort); + if(_state == StateNotConnected) + { + struct sockaddr_storage localAddr; + fdToLocalAddress(_fd, localAddr); + addrToAddressAndPort(localAddr, info->localAddress, info->localPort); + if(_peerAddr.ss_family != AF_UNSPEC) + { + addrToAddressAndPort(_peerAddr, info->remoteAddress, info->remotePort); + } + else + { + info->remotePort = 0; + } + } + else + { + fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort); + } + + if(_mcastAddr.ss_family != AF_UNSPEC) + { + addrToAddressAndPort(_mcastAddr, info->mcastAddress, info->mcastPort); + } + else + { + info->mcastPort = 0; + } return info; } @@ -469,6 +571,11 @@ IceInternal::UdpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize { Ex::throwMemoryLimitException(__FILE__, __LINE__, buf.b.size(), messageSizeMax); } + + // + // The maximum packetSize is either the maximum allowable UDP packet size, or + // the UDP send buffer size (which ever is smaller). + // const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead); if(packetSize < static_cast<int>(buf.b.size())) { @@ -489,24 +596,40 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s _stats(instance->initializationData().stats), _incoming(false), _addr(addr), - _connect(true), - _warn(instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0) + _state(StateNeedConnect) #ifdef ICE_USE_IOCP , _read(SocketOperationRead), _write(SocketOperationWrite) #endif { - // AF_UNSPEC means not multicast. - _mcastAddr.ss_family = AF_UNSPEC; + _mcastAddr.ss_family = AF_UNSPEC; // AF_UNSPEC means not multicast. + _peerAddr.ss_family = AF_UNSPEC; // Not initialized yet. _fd = createSocket(true, _addr.ss_family); setBufSize(instance); setBlock(_fd, false); + // + // In general, connecting a datagram socket should be non-blocking as this just setups + // the default destination address for the socket. However, on some OS, connect sometime + // returns EWOULDBLOCK. If that's the case, we keep the state as StateNeedConnect. This + // will make sure the transceiver is notified when the socket is ready for sending (see + // the initialize() implementation). + // if(doConnect(_fd, _addr)) { - _connect = false; // We're connected now + _state = StateConnected; } + +#ifdef ICE_USE_IOCP + // + // On Windows when using IOCP, we must make sure that the socket is connected without + // blocking as there's no way to do a non-blocking datagram socket conection (ConnectEx + // only supports connection oriented sockets). According to Microsoft documentation of + // the connect() call, this should always be the case. + // + assert(_state == StateConnected); +#endif if(isMulticast(_addr)) { @@ -528,8 +651,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s _stats(instance->initializationData().stats), _incoming(true), _addr(getAddressForServer(host, port, instance->protocolSupport())), - _connect(connect), - _warn(instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0) + _state(connect ? StateNeedConnect : StateNotConnected) #ifdef ICE_USE_IOCP , _read(SocketOperationRead), _write(SocketOperationWrite) @@ -544,6 +666,8 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s out << "attempting to bind to udp socket " << addrToString(_addr); } + _peerAddr.ss_family = AF_UNSPEC; // Not assigned yet. + if(isMulticast(_addr)) { setReuseAddress(_fd, true); @@ -552,11 +676,13 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s #ifdef _WIN32 // // Windows does not allow binding to the mcast address itself - // so we bind to INADDR_ANY (0.0.0.0) instead. + // so we bind to INADDR_ANY (0.0.0.0) instead. As a result, + // bi-directional connection won't work because the source + // address won't be the multicast address and the client will + // therefore reject the datagram. // - const_cast<struct sockaddr_storage&>(_addr) = - getAddressForServer("", getPort(_mcastAddr), - _mcastAddr.ss_family == AF_INET ? EnableIPv4 : EnableIPv6); + const_cast<struct sockaddr_storage&>(_addr) = + getAddressForServer("", port, _mcastAddr.ss_family == AF_INET ? EnableIPv4 : EnableIPv6); #endif const_cast<struct sockaddr_storage&>(_addr) = doBind(_fd, _addr); @@ -585,8 +711,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s #endif const_cast<struct sockaddr_storage&>(_addr) = doBind(_fd, _addr); - // AF_UNSPEC means not multicast. - _mcastAddr.ss_family = AF_UNSPEC; + _mcastAddr.ss_family = AF_UNSPEC; // AF_UNSPEC means not multicast. } if(_traceLevels->network >= 1) @@ -645,7 +770,7 @@ IceInternal::UdpTransceiver::setBufSize(const InstancePtr& instance) // Get property for buffer size and check for sanity. // Int sizeRequested = instance->initializationData().properties->getPropertyAsIntWithDefault(prop, dfltSize); - if(sizeRequested < _udpOverhead) + if(sizeRequested < (_udpOverhead + headerSize)) { Warning out(_logger); out << "Invalid " << prop << " value of " << sizeRequested << " adjusted to " << dfltSize; |