// ********************************************************************** // // Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. // // This copy of Ice-E is licensed to you under the terms described in the // ICEE_LICENSE file included in this distribution. // // ********************************************************************** #include #include #include #include #include #include #include #include using namespace std; using namespace Ice; using namespace IceInternal; void IceInternal::incRef(Transceiver* p) { p->__incRef(); } void IceInternal::decRef(Transceiver* p) { p->__decRef(); } void IceInternal::Transceiver::setTimeouts(int readTimeout, int writeTimeout) { _readTimeout = readTimeout; #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS setTimeout(_fd, true, _readTimeout); #endif _writeTimeout = writeTimeout; #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS setTimeout(_fd, false, _writeTimeout); #endif } SOCKET IceInternal::Transceiver::fd() { assert(_fd != INVALID_SOCKET); return _fd; } void IceInternal::Transceiver::close() { if(_traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); out << "closing tcp connection\n" << toString(); } #ifdef ICEE_USE_SELECT_FOR_TIMEOUTS #ifdef _WIN32 assert(_event != 0); WSACloseEvent(_event); WSACloseEvent(_readEvent); WSACloseEvent(_writeEvent); _event = 0; _readEvent = 0; _writeEvent = 0; #endif #endif assert(_fd != INVALID_SOCKET); try { closeSocket(_fd); _fd = INVALID_SOCKET; } catch(const SocketException&) { _fd = INVALID_SOCKET; throw; } } void IceInternal::Transceiver::shutdownWrite() { if(_traceLevels->network >= 2) { Trace out(_logger, _traceLevels->networkCat); out << "shutting down tcp connection for writing\n" << toString(); } assert(_fd != INVALID_SOCKET); shutdownSocketWrite(_fd); } void IceInternal::Transceiver::shutdownReadWrite() { if(_traceLevels->network >= 2) { Trace out(_logger, _traceLevels->networkCat); out << "shutting down tcp connection for reading and writing\n" << toString(); } assert(_fd != INVALID_SOCKET); shutdownSocketReadWrite(_fd); } void IceInternal::Transceiver::writeWithTimeout(Buffer& buf, int timeout) { Buffer::Container::difference_type packetSize = static_cast(buf.b.end() - buf.i); #ifdef _WIN32 // // Limit packet size to avoid performance problems on WIN32 // if(_isPeerLocal && packetSize > 64 * 1024) { packetSize = 64 * 1024; } #endif #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS if(timeout > 0 && timeout != _writeTimeout) { setTimeout(_fd, false, timeout); } try { #endif while(buf.i != buf.b.end()) { repeatSend: assert(_fd != INVALID_SOCKET); ssize_t ret = ::send(_fd, reinterpret_cast(&*buf.i), packetSize, 0); if(ret == 0) { ConnectionLostException ex(__FILE__, __LINE__); ex.error = 0; throw ex; } if(ret == SOCKET_ERROR) { if(interrupted()) { goto repeatSend; } if(noBuffers() && packetSize > 1024) { packetSize /= 2; goto repeatSend; } #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS if(timedout()) { throw TimeoutException(__FILE__, __LINE__); } #else if(wouldBlock()) { doSelect(false, timeout > 0 ? timeout : _writeTimeout); continue; } #endif if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } else { SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } } if(_traceLevels->network >= 3) { Trace out(_logger, _traceLevels->networkCat); out << Ice::printfToString("sent %d of %d", ret, packetSize) << " bytes via tcp\n" << toString(); } buf.i += ret; if(packetSize > buf.b.end() - buf.i) { packetSize = static_cast(buf.b.end() - buf.i); } } #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS } catch(const Ice::LocalException&) { if(timeout > 0 && timeout != _writeTimeout) { try { setTimeout(_fd, false, _writeTimeout); } catch(const Ice::LocalException&) { // IGNORE } } throw; } if(timeout > 0 && timeout != _writeTimeout) { try { setTimeout(_fd, false, _writeTimeout); } catch(const Ice::LocalException&) { // IGNORE } } #endif } void IceInternal::Transceiver::readWithTimeout(Buffer& buf, int timeout) { assert(timeout != 0); Buffer::Container::difference_type packetSize = static_cast(buf.b.end() - buf.i); #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS if(timeout > 0 && timeout != _readTimeout) { setTimeout(_fd, true, timeout); } try { #endif while(buf.i != buf.b.end()) { repeatRead: assert(_fd != INVALID_SOCKET); ssize_t ret = ::recv(_fd, reinterpret_cast(&*buf.i), packetSize, 0); if(ret == 0) { // // If the connection is lost when reading data, we shut // down the write end of the socket. This helps to unblock // threads that are stuck in send() or select() while // sending data. Note: I don't really understand why // send() or select() sometimes don't detect a connection // loss. Therefore this helper to make them detect it. // //assert(_fd != INVALID_SOCKET); //shutdownSocketReadWrite(_fd); ConnectionLostException ex(__FILE__, __LINE__); ex.error = 0; throw ex; } if(ret == SOCKET_ERROR) { if(interrupted()) { goto repeatRead; } if(noBuffers() && packetSize > 1024) { packetSize /= 2; goto repeatRead; } #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS if(timedout()) { throw TimeoutException(__FILE__, __LINE__); } #else if(wouldBlock()) { doSelect(true, timeout > 0 ? timeout : _readTimeout); continue; } #endif if(connectionLost()) { // // See the commment above about shutting down the // socket if the connection is lost while reading // data. // //assert(_fd != INVALID_SOCKET); //shutdownSocketReadWrite(_fd); ConnectionLostException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } else { SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } } if(_traceLevels->network >= 3) { Trace out(_logger, _traceLevels->networkCat); out << Ice::printfToString("received %d of %d", ret, packetSize) << " bytes via tcp\n" << toString(); } buf.i += ret; if(packetSize > buf.b.end() - buf.i) { packetSize = static_cast(buf.b.end() - buf.i); } } #ifndef ICEE_USE_SELECT_FOR_TIMEOUTS } catch(const Ice::LocalException&) { if(timeout > 0 && timeout != _readTimeout) { try { setTimeout(_fd, true, _readTimeout); } catch(const Ice::LocalException&) { // IGNORE } } throw; } if(timeout > 0 && timeout != _readTimeout) { try { setTimeout(_fd, true, _readTimeout); } catch(const Ice::LocalException&) { // IGNORE } } #endif } string IceInternal::Transceiver::type() const { return "tcp"; } string IceInternal::Transceiver::toString() const { return _desc; } IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd) : _traceLevels(instance->traceLevels()), _logger(instance->initializationData().logger), _fd(fd), _readTimeout(-1), _writeTimeout(-1), _desc(fdToString(fd)) #ifdef _WIN32 , _isPeerLocal(isPeerLocal(fd)) #endif { #ifdef ICEE_USE_SELECT_FOR_TIMEOUTS #ifdef _WIN32 _event = WSACreateEvent(); _readEvent = WSACreateEvent(); _writeEvent = WSACreateEvent(); if(_event == 0 || _readEvent == 0 || _writeEvent == 0) { int error = WSAGetLastError(); if(_event != 0) { WSACloseEvent(_event); } if(_readEvent != 0) { WSACloseEvent(_readEvent); } if(_writeEvent != 0) { WSACloseEvent(_writeEvent); } closeSocket(_fd); SocketException ex(__FILE__, __LINE__); ex.error = error; throw ex; } // // Select the READ, WRITE and CLOSE for trigging. // if(WSAEventSelect(_fd, _event, FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) { int error = WSAGetLastError(); WSACloseEvent(_event); WSACloseEvent(_readEvent); WSACloseEvent(_writeEvent); closeSocket(_fd); SocketException ex(__FILE__, __LINE__); ex.error = error; throw ex; } #else FD_ZERO(&_wFdSet); FD_ZERO(&_rFdSet); #endif #endif } IceInternal::Transceiver::~Transceiver() { assert(_fd == INVALID_SOCKET); #ifdef ICEE_USE_SELECT_FOR_TIMEOUTS #ifdef _WIN32 assert(_event == 0); assert(_readEvent == 0); assert(_writeEvent == 0); #endif #endif } #ifdef ICEE_USE_SELECT_FOR_TIMEOUTS void IceInternal::Transceiver::doSelect(bool read, int timeout) { while(true) { #ifdef _WIN32 // // This code is basically the same as the code in // ::send above. Check that for detailed comments. // WSAEVENT events[2]; events[0] = _event; events[1] = read ? _readEvent : _writeEvent; long tout = (timeout >= 0) ? timeout : WSA_INFINITE; DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE); if(rc == WSA_WAIT_FAILED) { SocketException ex(__FILE__, __LINE__); ex.error = WSAGetLastError(); throw ex; } if(rc == WSA_WAIT_TIMEOUT) { assert(timeout >= 0); throw TimeoutException(__FILE__, __LINE__); } if(rc == WSA_WAIT_EVENT_0) { WSANETWORKEVENTS nevents; if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) { SocketException ex(__FILE__, __LINE__); ex.error = WSAGetLastError(); throw ex; } // // If we're selecting for reading and have consumed a WRITE // event, set the _writeEvent event. Otherwise, if we're // selecting for writing have consumed a READ event, set the // _readEvent event. // if(read && nevents.lNetworkEvents & FD_WRITE) { WSASetEvent(_writeEvent); } else if(!read && nevents.lNetworkEvents & FD_READ) { WSASetEvent(_readEvent); } // // This checks for an error on the fd (this would // be same as recv itself returning an error). In // the event of an error we set the error code, // and repeat the error handling. // if(read && nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0) { WSASetLastError(nevents.iErrorCode[FD_READ_BIT]); } else if(!read && nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0) { WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]); } else if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) { WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); } else { return; // No errors: we're done. } if(interrupted()) { continue; } if(connectionLost()) { // // See the commment above about shutting down the // socket if the connection is lost while reading // data. // //assert(_fd != INVALID_SOCKET); //shutdownSocketReadWrite(_fd); ConnectionLostException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } else { SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } } else { // // Otherwise the _readEvent or _writeEvent is set, reset it. // if(read) { WSAResetEvent(_readEvent); } else { WSAResetEvent(_writeEvent); } return; } #else int rs; assert(_fd != INVALID_SOCKET); if(read) { FD_SET(_fd, &_rFdSet); } else { FD_SET(_fd, &_wFdSet); } if(timeout >= 0) { struct timeval tv; tv.tv_sec = timeout / 1000; tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, &tv); } else { rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, 0); } if(rs == SOCKET_ERROR) { if(interrupted()) { continue; } SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } if(rs == 0) { throw TimeoutException(__FILE__, __LINE__); } return; #endif } } #endif