diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2005-12-12 16:13:15 +0000 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2005-12-12 16:13:15 +0000 |
commit | f2a48984a6f77743180602b2da0d07dc5dfee3e3 (patch) | |
tree | da55120b35c3ea5a3244364f321e9aaeb457b7c7 /cppe/src | |
parent | Fixed IceGrid and IceBox bug where spaces in directory names were not (diff) | |
download | ice-f2a48984a6f77743180602b2da0d07dc5dfee3e3.tar.bz2 ice-f2a48984a6f77743180602b2da0d07dc5dfee3e3.tar.xz ice-f2a48984a6f77743180602b2da0d07dc5dfee3e3.zip |
Added blocking client support
Diffstat (limited to 'cppe/src')
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 591 | ||||
-rw-r--r-- | cppe/src/IceE/Instance.cpp | 17 | ||||
-rw-r--r-- | cppe/src/IceE/Instance.h | 8 |
3 files changed, 415 insertions, 201 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index a02a644a671..a4c7db3e694 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -119,6 +119,7 @@ Ice::Connection::close(bool force) } else { +#ifndef ICEE_PURE_BLOCKING_CLIENT // // If we do a graceful shutdown, then we wait until all // outstanding requests have been completed. Otherwise, the @@ -130,6 +131,7 @@ Ice::Connection::close(bool force) { wait(); } +#endif setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } @@ -151,7 +153,9 @@ Ice::Connection::isDestroyed() const bool Ice::Connection::isFinished() const { +#ifndef ICEE_PURE_BLOCKING_CLIENT IceUtil::ThreadPtr threadPerConnection; +#endif { // @@ -166,19 +170,29 @@ Ice::Connection::isFinished() const return false; } - if(_transceiver != 0 || _dispatchCount != 0 || - (_threadPerConnection && _threadPerConnection->getThreadControl().isAlive())) + if(_transceiver != 0 +#ifndef ICEE_PURE_BLOCKING_CLIENT + || _dispatchCount != 0 || (_threadPerConnection && _threadPerConnection->getThreadControl().isAlive()) +#endif + ) { return false; } assert(_state == StateClosed); +#ifndef ICEE_PURE_BLOCKING_CLIENT threadPerConnection = _threadPerConnection; _threadPerConnection = 0; +#endif } - threadPerConnection->getThreadControl().join(); +#ifndef ICEE_PURE_BLOCKING_CLIENT + if(threadPerConnection) + { + threadPerConnection->getThreadControl().join(); + } +#endif return true; } @@ -201,7 +215,9 @@ Ice::Connection::waitUntilHolding() const void Ice::Connection::waitUntilFinished() { +#ifndef ICEE_PURE_BLOCKING_CLIENT IceUtil::ThreadPtr threadPerConnection; +#endif { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -262,14 +278,18 @@ Ice::Connection::waitUntilFinished() assert(_state == StateClosed); +#ifndef ICEE_PURE_BLOCKING_CLIENT threadPerConnection = _threadPerConnection; _threadPerConnection = 0; +#endif } +#ifndef ICEE_PURE_BLOCKING_CLIENT if(threadPerConnection) { threadPerConnection->getThreadControl().join(); } +#endif } // @@ -323,9 +343,16 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) #endif // - // Add to the requests map. + // Add to the requests map if not blocking. // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); +#ifndef ICEE_PURE_BLOCKING_CLIENT +# ifdef ICEE_BLOCKING_CLIENT + if(!_blocking) +# endif + { + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } +#endif } } @@ -353,6 +380,58 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) os->i = os->b.begin(); traceRequest("sending request", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); + +#ifdef ICEE_BLOCKING_CLIENT + // + // If blocking client, we wait for the response from the server. + // + if(out +#ifndef ICEE_PURE_BLOCKING_CLIENT + && _blocking +#endif + ) + { + BasicStream stream(_instance.get()); + readStream(stream); + +#ifndef ICEE_PURE_CLIENT + Int invokeNum = 0; + ServantManagerPtr servantManager; + ObjectAdapterPtr adapter; +#endif + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_state != StateClosed) + { +#ifndef ICEE_PURE_CLIENT + parseMessage(stream, requestId, out, invokeNum, servantManager, adapter); +#else + parseMessage(stream, requestId, out); +#endif + } + + // + // parseMessage() can close the connection, so we must + // check for closed state again. + // + if(_state == StateClosed) + { + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + } + + _transceiver = 0; + out->finished(*_exception.get()); + } + } + } +#endif } catch(const LocalException& ex) { @@ -360,7 +439,12 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) setState(StateClosed, ex); assert(_exception.get()); - if(out) +#ifndef ICEE_PURE_BLOCKING_CLIENT + if(out +# ifdef ICEE_BLOCKING_CLIENT + && !_blocking +# endif + ) { // // If the request has already been removed from the @@ -393,6 +477,7 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) } } else +#endif { _exception->ice_throw(); } @@ -781,8 +866,10 @@ Ice::Connection::Connection(const InstancePtr& instance, #ifndef ICEE_PURE_CLIENT _replyHdr(headerSize, 0), #endif +#ifndef ICEE_PURE_BLOCKING_CLIENT _nextRequestId(1), _requestsHint(_requests.end()), +#endif #ifdef ICEE_HAS_BATCH _requestBatchHdr(headerSize + sizeof(Int), 0), _batchStream(_instance.get()), @@ -793,6 +880,16 @@ Ice::Connection::Connection(const InstancePtr& instance, _state(StateNotValidated), _stateTime(IceUtil::Time::now()) { +#ifndef ICEE_PURE_BLOCKING_CLIENT +# ifdef ICEE_BLOCKING_CLIENT + _blocking = _instance->blocking() +# ifndef ICEE_PURE_CLIENT + && !_adapter +# endif + ; +# endif +#endif + vector<Byte>& requestHdr = const_cast<vector<Byte>&>(_requestHdr); requestHdr[0] = magic[0]; requestHdr[1] = magic[1]; @@ -838,36 +935,51 @@ Ice::Connection::Connection(const InstancePtr& instance, } #endif - __setNoDelete(true); - try +#ifdef ICEE_BLOCKING_CLIENT +# ifndef ICEE_PURE_BLOCKING_CLIENT + if(_blocking) { - // - // If we are in thread per connection mode, create the thread - // for this connection. - // - _threadPerConnection = new ThreadPerConnection(this); - _threadPerConnection->start(_instance->threadPerConnectionStackSize()); +# endif + validate(); +# ifndef ICEE_PURE_BLOCKING_CLIENT } - catch(const Ice::Exception& ex) + else +# endif +#endif +#ifndef ICEE_PURE_BLOCKING_CLIENT { - { - Error out(_logger); - out << "cannot create thread for connection:\n" << ex.toString(); - } + __setNoDelete(true); + try + { + // + // If we are in thread per connection mode, create the thread + // for this connection. + // + _threadPerConnection = new ThreadPerConnection(this); + _threadPerConnection->start(_instance->threadPerConnectionStackSize()); + } + catch(const Ice::Exception& ex) + { + { + Error out(_logger); + out << "cannot create thread for connection:\n" << ex.toString(); + } - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } - __setNoDelete(false); - ex.ice_throw(); + __setNoDelete(false); + ex.ice_throw(); + } + __setNoDelete(false); } - __setNoDelete(false); +#endif } Ice::Connection::~Connection() @@ -876,8 +988,10 @@ Ice::Connection::~Connection() assert(_state == StateClosed); assert(!_transceiver); +#ifndef ICEE_PURE_BLOCKING_CLIENT assert(_dispatchCount == 0); assert(!_threadPerConnection); +#endif } void @@ -1181,6 +1295,25 @@ Ice::Connection::setState(State state) { setState(StateClosed, ex); } + +#ifdef ICEE_BLOCKING_CLIENT + if(_state != StateClosed +# ifndef ICEE_PURE_BLOCKING_CLIENT + && _blocking +# endif + ) + { + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + } + _transceiver = 0; + _state = StateClosed; + } +#endif } } @@ -1224,7 +1357,7 @@ Ice::Connection::initiateShutdown() const } void -Ice::Connection::parseMessage(BasicStream& stream, Int& requestId +Ice::Connection::parseMessage(BasicStream& stream, Int& requestId, Outgoing* out #ifndef ICEE_PURE_CLIENT ,Int& invokeNum, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter #endif @@ -1254,125 +1387,172 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId } stream.i = stream.b.begin() + headerSize; - switch(messageType) +#ifdef ICEE_BLOCKING_CLIENT +# ifndef ICEE_PURE_BLOCKING_CLIENT + if(_blocking) { - case closeConnectionMsg: +# endif + switch(messageType) { - traceHeader("received close connection", stream, _logger, _traceLevels); - setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); - break; + case closeConnectionMsg: + { + traceHeader("received close connection", stream, _logger, _traceLevels); + setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); + break; + } + + case replyMsg: + { + traceReply("received reply", stream, _logger, _traceLevels); + + Int reqId; + stream.read(reqId); + if(reqId != requestId) + { + throw UnknownRequestIdException(__FILE__, __LINE__); + } + out->finished(stream); + break; + } + + + default: + { + traceHeader("received unexpected message\n" + "(invalid, closing connection)", + stream, _logger, _traceLevels); + throw UnknownMessageException(__FILE__, __LINE__); + break; + } } +# ifndef ICEE_PURE_BLOCKING_CLIENT + } + else +# endif +#endif +#ifndef ICEE_PURE_BLOCKING_CLIENT + { + switch(messageType) + { + case closeConnectionMsg: + { + traceHeader("received close connection", stream, _logger, _traceLevels); + setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); + break; + } #ifndef ICEE_PURE_CLIENT - case requestMsg: - { - if(_state == StateClosing) - { - traceRequest("received request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceRequest("received request", stream, _logger, _traceLevels); - stream.read(requestId); - invokeNum = 1; - servantManager = _servantManager; - adapter = _adapter; - ++_dispatchCount; - } - break; - } + case requestMsg: + { + if(_state == StateClosing) + { + traceRequest("received request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); + } + else + { + traceRequest("received request", stream, _logger, _traceLevels); + stream.read(requestId); + invokeNum = 1; + servantManager = _servantManager; + adapter = _adapter; + ++_dispatchCount; + } + break; + } - case requestBatchMsg: - { - if(_state == StateClosing) - { - traceBatchRequest("received batch request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceBatchRequest("received batch request", stream, _logger, _traceLevels); - stream.read(invokeNum); - if(invokeNum < 0) + case requestBatchMsg: + { + if(_state == StateClosing) { - invokeNum = 0; - throw NegativeSizeException(__FILE__, __LINE__); + traceBatchRequest("received batch request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); } - servantManager = _servantManager; - adapter = _adapter; - _dispatchCount += invokeNum; - } - break; - } + else + { + traceBatchRequest("received batch request", stream, _logger, _traceLevels); + stream.read(invokeNum); + if(invokeNum < 0) + { + invokeNum = 0; + throw NegativeSizeException(__FILE__, __LINE__); + } + servantManager = _servantManager; + adapter = _adapter; + _dispatchCount += invokeNum; + } + break; + } #endif - case replyMsg: - { - traceReply("received reply", stream, _logger, _traceLevels); - - stream.read(requestId); + case replyMsg: + { + traceReply("received reply", stream, _logger, _traceLevels); - map<Int, Outgoing*>::iterator p = _requests.end(); + stream.read(requestId); - if(_requestsHint != _requests.end()) - { - if(_requestsHint->first == requestId) + map<Int, Outgoing*>::iterator p = _requests.end(); + + if(_requestsHint != _requests.end()) { - p = _requestsHint; + if(_requestsHint->first == requestId) + { + p = _requestsHint; + } } - } - - if(p == _requests.end()) - { - p = _requests.find(requestId); - } - if(p == _requests.end()) - { - throw UnknownRequestIdException(__FILE__, __LINE__); - } + if(p == _requests.end()) + { + p = _requests.find(requestId); + } - if(p != _requests.end()) - { - p->second->finished(stream); - - if(p == _requestsHint) + if(p == _requests.end()) { - _requests.erase(p++); - _requestsHint = p; + throw UnknownRequestIdException(__FILE__, __LINE__); } - else + + if(p != _requests.end()) { - _requests.erase(p); + p->second->finished(stream); + + if(p == _requestsHint) + { + _requests.erase(p++); + _requestsHint = p; + } + else + { + _requests.erase(p); + } } - } - break; - } + break; + } - case validateConnectionMsg: - { - traceHeader("received validate connection", stream, _logger, _traceLevels); - if(_warn) - { - Warning out(_logger); - out << "ignoring unexpected validate connection message:\n" << _desc; - } - break; - } + case validateConnectionMsg: + { + traceHeader("received validate connection", stream, _logger, _traceLevels); + if(_warn) + { + Warning out(_logger); + out << "ignoring unexpected validate connection message:\n" << _desc; + } + break; + } - default: - { - traceHeader("received unknown message\n" - "(invalid, closing connection)", - stream, _logger, _traceLevels); - throw UnknownMessageException(__FILE__, __LINE__); - break; + default: + { + traceHeader("received unknown message\n" + "(invalid, closing connection)", + stream, _logger, _traceLevels); + throw UnknownMessageException(__FILE__, __LINE__); + break; + } } } +#endif } catch(const LocalException& ex) { @@ -1468,6 +1648,86 @@ Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, #endif void +Ice::Connection::readStream(IceInternal::BasicStream& stream) +{ + try + { + stream.b.resize(headerSize); + stream.i = stream.b.begin(); + _transceiver->read(stream, -1); + + ptrdiff_t pos = stream.i - stream.b.begin(); + assert(pos >= headerSize); + stream.i = stream.b.begin(); + ByteSeq m(sizeof(magic), 0); + stream.readBlob(m, static_cast<Int>(sizeof(magic))); + if(!equal(m.begin(), m.end(), magic)) + { + BadMagicException ex(__FILE__, __LINE__); + ex.badMagic = m; + throw ex; + } + Byte pMajor; + Byte pMinor; + stream.read(pMajor); + stream.read(pMinor); + if(pMajor != protocolMajor) + { + UnsupportedProtocolException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(pMajor); + ex.badMinor = static_cast<unsigned char>(pMinor); + ex.major = static_cast<unsigned char>(protocolMajor); + ex.minor = static_cast<unsigned char>(protocolMinor); + throw ex; + } + Byte eMajor; + Byte eMinor; + stream.read(eMajor); + stream.read(eMinor); + if(eMajor != encodingMajor) + { + UnsupportedEncodingException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(eMajor); + ex.badMinor = static_cast<unsigned char>(eMinor); + ex.major = static_cast<unsigned char>(encodingMajor); + ex.minor = static_cast<unsigned char>(encodingMinor); + throw ex; + } + Byte messageType; + stream.read(messageType); + Byte compress; + stream.read(compress); + Int size; + stream.read(size); + if(size < headerSize) + { + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + if(size > static_cast<Int>(_instance->messageSizeMax())) + { + throw MemoryLimitException(__FILE__, __LINE__); + } + if(size > static_cast<Int>(stream.b.size())) + { + stream.b.resize(size); + } + stream.i = stream.b.begin() + pos; + + if(stream.i != stream.b.end()) + { + _transceiver->read(stream, -1); + assert(stream.i == stream.b.end()); + } + } + catch(const LocalException& ex) + { + exception(ex); + } +} + +#ifndef ICEE_PURE_BLOCKING_CLIENT + +void Ice::Connection::run() { // @@ -1520,80 +1780,7 @@ Ice::Connection::run() // BasicStream stream(_instance.get()); - - try - { - stream.b.resize(headerSize); - stream.i = stream.b.begin(); - _transceiver->read(stream, -1); - - ptrdiff_t pos = stream.i - stream.b.begin(); - assert(pos >= headerSize); - stream.i = stream.b.begin(); - ByteSeq m(sizeof(magic), 0); - stream.readBlob(m, static_cast<Int>(sizeof(magic))); - if(!equal(m.begin(), m.end(), magic)) - { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = m; - throw ex; - } - Byte pMajor; - Byte pMinor; - stream.read(pMajor); - stream.read(pMinor); - if(pMajor != protocolMajor) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - Byte eMajor; - Byte eMinor; - stream.read(eMajor); - stream.read(eMinor); - if(eMajor != encodingMajor) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } - Byte messageType; - stream.read(messageType); - Byte compress; - stream.read(compress); - Int size; - stream.read(size); - if(size < headerSize) - { - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(_instance->messageSizeMax())) - { - throw MemoryLimitException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(stream.b.size())) - { - stream.b.resize(size); - } - stream.i = stream.b.begin() + pos; - - if(stream.i != stream.b.end()) - { - _transceiver->read(stream, -1); - assert(stream.i == stream.b.end()); - } - } - catch(const LocalException& ex) - { - exception(ex); - } + readStream(stream); Int requestId = 0; #ifndef ICEE_PURE_CLIENT @@ -1618,9 +1805,9 @@ Ice::Connection::run() if(_state != StateClosed) { #ifndef ICEE_PURE_CLIENT - parseMessage(stream, requestId, invokeNum, servantManager, adapter); + parseMessage(stream, requestId, 0, invokeNum, servantManager, adapter); #else - parseMessage(stream, requestId); + parseMessage(stream, requestId, 0); #endif } @@ -1714,3 +1901,5 @@ Ice::Connection::ThreadPerConnection::run() _connection = 0; // Resolve cyclic dependency. } + +#endif // ICEE_PURE_BLOCKING_CLIENT diff --git a/cppe/src/IceE/Instance.cpp b/cppe/src/IceE/Instance.cpp index 612a28fd6c2..0f3f59a89a2 100644 --- a/cppe/src/IceE/Instance.cpp +++ b/cppe/src/IceE/Instance.cpp @@ -202,6 +202,14 @@ IceInternal::Instance::objectAdapterFactory() const } #endif +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) +bool +IceInternal::Instance::blocking() const +{ + return _blocking; +} +#endif + size_t IceInternal::Instance::threadPerConnectionStackSize() const { @@ -292,6 +300,9 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope _properties(properties), _messageSizeMax(0), _threadPerConnectionStackSize(0) +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) + , _blocking(false) +#endif { try { @@ -438,6 +449,11 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope } } +#ifndef ICEE_PURE_BLOCKING_CLIENT +# ifdef ICEE_BLOCKING_CLIENT + const_cast<bool&>(_blocking) = _properties->getPropertyAsInt("Ice.Blocking") > 0; +# endif + { Int stackSize = _properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); if(stackSize < 0) @@ -446,6 +462,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope } const_cast<size_t&>(_threadPerConnectionStackSize) = static_cast<size_t>(stackSize); } +#endif #ifdef ICEE_HAS_ROUTER _routerManager = new RouterManager; diff --git a/cppe/src/IceE/Instance.h b/cppe/src/IceE/Instance.h index 8565e1b313e..a9af7a54819 100644 --- a/cppe/src/IceE/Instance.h +++ b/cppe/src/IceE/Instance.h @@ -64,6 +64,10 @@ public: ObjectAdapterFactoryPtr objectAdapterFactory() const; #endif +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) + bool blocking() const; +#endif + private: Instance(const Ice::CommunicatorPtr&, const Ice::PropertiesPtr&); @@ -101,6 +105,10 @@ private: #ifndef ICEE_PURE_CLIENT ObjectAdapterFactoryPtr _objectAdapterFactory; #endif + +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) + const bool _blocking; +#endif }; } |