diff options
Diffstat (limited to 'cpp/src/Ice/TcpAcceptor.cpp')
-rwxr-xr-x[-rw-r--r--] | cpp/src/Ice/TcpAcceptor.cpp | 172 |
1 files changed, 146 insertions, 26 deletions
diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp index 8ca874cc218..e20ad639d91 100644..100755 --- a/cpp/src/Ice/TcpAcceptor.cpp +++ b/cpp/src/Ice/TcpAcceptor.cpp @@ -17,8 +17,21 @@ #include <Ice/StreamSocket.h> #include <IceUtil/StringUtil.h> -#ifdef ICE_USE_IOCP +#if defined(ICE_USE_IOCP) # include <Mswsock.h> +#elif defined(ICE_OS_WINRT) +using namespace Platform; +using namespace Windows::Foundation; +using namespace Windows::Storage::Streams; +using namespace Windows::Networking; +using namespace Windows::Networking::Sockets; +#endif + +// +// Use the system default for the listen() backlog or 511 if not defined. +// +#ifndef SOMAXCONN +# define SOMAXCONN 511 #endif using namespace std; @@ -33,22 +46,28 @@ IceInternal::TcpAcceptor::getNativeInfo() return this; } -#ifdef ICE_USE_IOCP -AsyncInfo* -# ifndef NDEBUG -IceInternal::TcpAcceptor::getAsyncInfo(SocketOperation op) -# else -IceInternal::TcpAcceptor::getAsyncInfo(SocketOperation) -# endif -{ - assert(op == SocketOperationRead); - return &_info; -} -#endif - void IceInternal::TcpAcceptor::close() { +#if defined(ICE_OS_WINRT) + IceUtil::Mutex::Lock lock(_mutex); + if(_acceptPending) + { + assert(_accepted.empty()); + completed(SocketOperationRead); + _acceptPending = false; + } + else if(!_accepted.empty()) + { + for(deque<Windows::Networking::Sockets::StreamSocket^>::const_iterator p = _accepted.begin(); + p != _accepted.end(); ++p) + { + closeSocket(*p); + } + _accepted.clear(); + } +#endif + if(_fd != INVALID_SOCKET) { closeSocketNoThrow(_fd); @@ -62,7 +81,9 @@ IceInternal::TcpAcceptor::listen() try { const_cast<Address&>(_addr) = doBind(_fd, _addr); +#if !defined(ICE_OS_WINRT) doListen(_fd, _backlog); +#endif } catch(...) { @@ -73,7 +94,14 @@ IceInternal::TcpAcceptor::listen() return _endpoint; } -#ifdef ICE_USE_IOCP +#if defined(ICE_USE_IOCP) + +AsyncInfo* +IceInternal::TcpAcceptor::getAsyncInfo(SocketOperation) +{ + return &_info; +} + void IceInternal::TcpAcceptor::startAccept() { @@ -120,12 +148,9 @@ IceInternal::TcpAcceptor::finishAccept() } } -#endif - TransceiverPtr IceInternal::TcpAcceptor::accept() { -#ifdef ICE_USE_IOCP if(_acceptFd == INVALID_SOCKET) { SocketException ex(__FILE__, __LINE__); @@ -144,13 +169,82 @@ IceInternal::TcpAcceptor::accept() SOCKET fd = _acceptFd; _acceptFd = INVALID_SOCKET; + return new TcpTransceiver(_instance, new StreamSocket(_instance, fd)); +} + +#elif defined(ICE_OS_WINRT) + +AsyncInfo* +IceInternal::TcpAcceptor::getAsyncInfo(SocketOperation) +{ + return 0; // Not used +} + +void +IceInternal::TcpAcceptor::startAccept() +{ + assert(_fd != INVALID_SOCKET); + + // + // If there are already sockets waiting to be accepted, we just + // notify the selector that the acceptor is ready for acceting the + // new socket. Otherwise, we set the _acceptPending flag, when a + // new socket connection event is received, the message handler + // will notify the selector. + // + IceUtil::Mutex::Lock lock(_mutex); + assert(!_acceptPending); + if(!_accepted.empty()) + { + completed(SocketOperationRead); + } + else + { + _acceptPending = true; + } +} + +void +IceInternal::TcpAcceptor::finishAccept() +{ + // + // Nothing to do, we just check there's at least one accepted + // socket or the acceptor was closed. + // + IceUtil::Mutex::Lock lock(_mutex); + assert(!_acceptPending && (!_accepted.empty() || _fd == INVALID_SOCKET)); +} + +TransceiverPtr +IceInternal::TcpAcceptor::accept() +{ + if(_fd == INVALID_SOCKET) // Acceptor closed. + { + assert(_accepted.empty()); + throw SocketException(__FILE__, __LINE__); + } + + Windows::Networking::Sockets::StreamSocket^ fd; + { + IceUtil::Mutex::Lock lock(_mutex); + assert(!_accepted.empty()); + fd = _accepted.front(); + _accepted.pop_front(); + } + + return new TcpTransceiver(_instance, new StreamSocket(_instance, fd)); +} + #else - SOCKET fd = doAccept(_fd); -#endif - return new TcpTransceiver(_instance, new StreamSocket(_instance, fd)); +TransceiverPtr +IceInternal::TcpAcceptor::accept() +{ + return new TcpTransceiver(_instance, new StreamSocket(_instance, doAccept(_fd))); } +#endif + string IceInternal::TcpAcceptor::protocol() const { @@ -191,23 +285,49 @@ IceInternal::TcpAcceptor::TcpAcceptor(const TcpEndpointIPtr& endpoint, _instance(instance), _addr(getAddressForServer(host, port, _instance->protocolSupport(), instance->preferIPv6())) #ifdef ICE_USE_IOCP - , _acceptFd(INVALID_SOCKET), - _info(SocketOperationRead) + , _acceptFd(INVALID_SOCKET), _info(SocketOperationRead) #endif { -#ifdef SOMAXCONN _backlog = instance->properties()->getPropertyAsIntWithDefault("Ice.TCP.Backlog", SOMAXCONN); + +#if defined(ICE_OS_WINRT) + _fd = ref new StreamSocketListener(); + safe_cast<StreamSocketListener^>(_fd)->ConnectionReceived += + ref new TypedEventHandler<StreamSocketListener^, StreamSocketListenerConnectionReceivedEventArgs^>( + [=](StreamSocketListener^, StreamSocketListenerConnectionReceivedEventArgs^ args) + { + IceUtil::Mutex::Lock lock(_mutex); + if(_fd == INVALID_SOCKET) // Acceptor was closed. + { + closeSocket(args->Socket); + return; + } + _accepted.push_back(args->Socket); + + // + // If the acceptor is waiting for a socket to be accepted, notify + // the selector that the acceptor is ready for "read". This will + // in turn caused finishAccept() and accept() to be called by the + // thread pool. If the acceptor isn't ready to accept the socket, + // it is just queued, when startAccept is called it will be dequed. + // + if(_acceptPending) + { + completed(SocketOperationRead); + _acceptPending = false; + } + }); #else - _backlog = instance->properties()->getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); + _fd = createServerSocket(false, _addr, instance->protocolSupport()); #endif - _fd = createServerSocket(false, _addr, instance->protocolSupport()); #ifdef ICE_USE_IOCP _acceptBuf.resize((sizeof(sockaddr_storage) + 16) * 2); #endif setBlock(_fd, false); setTcpBufSize(_fd, _instance); + #ifndef _WIN32 // // Enable SO_REUSEADDR on Unix platforms to allow re-using the |