summaryrefslogtreecommitdiff
path: root/cppe
diff options
context:
space:
mode:
Diffstat (limited to 'cppe')
-rwxr-xr-xcppe/src/IceE/Connection.cpp180
-rw-r--r--cppe/src/IceE/Transceiver.h19
-rw-r--r--cppe/src/TcpTransport/Transceiver.cpp6
3 files changed, 100 insertions, 105 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index e32fdbe84b8..3bc849851d0 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -350,7 +350,7 @@ Ice::Connection::sendRequest(BasicStream* os)
//
os->i = os->b.begin();
traceRequest("sending request", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
+ _transceiver->write(*os);
}
#ifdef ICEE_BLOCKING_CLIENT
@@ -388,7 +388,14 @@ Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing*
if(out)
{
- readStream(*is);
+ try
+ {
+ readStream(*is);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception(ex);
+ }
}
}
@@ -677,7 +684,7 @@ Ice::Connection::flushBatchRequests()
//
_batchStream.i = _batchStream.b.begin();
traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver->write(_batchStream, _endpoint->timeout());
+ _transceiver->write(_batchStream);
}
catch(const LocalException& ex)
{
@@ -736,7 +743,7 @@ Ice::Connection::sendResponse(BasicStream* os)
//
os->i = os->b.begin();
traceReply("sending reply", *os, _logger, _traceLevels);
- _transceiver->write(*os, _endpoint->timeout());
+ _transceiver->write(*os);
}
catch(const LocalException& ex)
{
@@ -1100,7 +1107,7 @@ Ice::Connection::validate()
traceHeader("sending validate connection", os, _logger, _traceLevels);
try
{
- _transceiver->write(os, timeout);
+ _transceiver->writeWithTimeout(os, timeout);
}
catch(const TimeoutException&)
{
@@ -1115,7 +1122,7 @@ Ice::Connection::validate()
is.i = is.b.begin();
try
{
- _transceiver->read(is, timeout);
+ _transceiver->readWithTimeout(is, timeout);
}
catch(const TimeoutException&)
{
@@ -1404,7 +1411,7 @@ Ice::Connection::initiateShutdown() const
//
os.i = os.b.begin();
traceHeader("sending close connection", os, _logger, _traceLevels);
- _transceiver->write(os, _endpoint->timeout());
+ _transceiver->write(os);
//
// The CloseConnection message should be sufficient. Closing the
@@ -1710,99 +1717,74 @@ Ice::Connection::invokeAll(BasicStream& stream, Int invokeNum, Int requestId,
void
Ice::Connection::readStream(IceInternal::BasicStream& stream)
{
- try
- {
- stream.b.resize(headerSize);
- stream.i = stream.b.begin();
- _transceiver->read(stream,
-#ifdef ICEE_PURE_BLOCKING_CLIENT
- _endpoint->timeout()
-#else
-# ifdef ICEE_BLOCKING_CLIENT
- _blocking ? _endpoint->timeout() :
-# endif
- -1
-#endif
- );
-
- ptrdiff_t pos = stream.i - stream.b.begin();
- assert(pos >= headerSize);
- stream.i = stream.b.begin();
- Ice::Byte m[4];
- stream.read(m[0]);
- stream.read(m[1]);
- stream.read(m[2]);
- stream.read(m[3]);
- if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
- {
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(m));
- throw ex;
- }
- Byte pMajor;
- Byte pMinor;
- stream.read(pMajor);
- stream.read(pMinor);
- if(pMajor != protocolMajor)
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- Byte eMajor;
- Byte eMinor;
- stream.read(eMajor);
- stream.read(eMinor);
- if(eMajor != encodingMajor)
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
- Byte messageType;
- stream.read(messageType);
- Byte compress;
- stream.read(compress);
- Int size;
- stream.read(size);
- if(size < headerSize)
- {
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(_instance->messageSizeMax()))
- {
- throw MemoryLimitException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(stream.b.size()))
- {
- stream.b.resize(size);
- }
- stream.i = stream.b.begin() + pos;
+ stream.b.resize(headerSize);
+ stream.i = stream.b.begin();
+ _transceiver->read(stream);
- if(stream.i != stream.b.end())
- {
- _transceiver->read(stream,
-#ifdef ICEE_PURE_BLOCKING_CLIENT
- _endpoint->timeout()
-#else
-# ifdef ICEE_BLOCKING_CLIENT
- _blocking ? _endpoint->timeout() :
-# endif
- -1
-#endif
- );
- assert(stream.i == stream.b.end());
- }
+ ptrdiff_t pos = stream.i - stream.b.begin();
+ assert(pos >= headerSize);
+ stream.i = stream.b.begin();
+ Ice::Byte m[4];
+ stream.read(m[0]);
+ stream.read(m[1]);
+ stream.read(m[2]);
+ stream.read(m[3]);
+ if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
+ {
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(m));
+ throw ex;
}
- catch(const LocalException& ex)
+ Byte pMajor;
+ Byte pMinor;
+ stream.read(pMajor);
+ stream.read(pMinor);
+ if(pMajor != protocolMajor)
+ {
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(pMajor);
+ ex.badMinor = static_cast<unsigned char>(pMinor);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
+ }
+ Byte eMajor;
+ Byte eMinor;
+ stream.read(eMajor);
+ stream.read(eMinor);
+ if(eMajor != encodingMajor)
{
- exception(ex);
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(eMajor);
+ ex.badMinor = static_cast<unsigned char>(eMinor);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
+ }
+ Byte messageType;
+ stream.read(messageType);
+ Byte compress;
+ stream.read(compress);
+ Int size;
+ stream.read(size);
+ if(size < headerSize)
+ {
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
+ }
+ if(size > static_cast<Int>(_instance->messageSizeMax()))
+ {
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+ if(size > static_cast<Int>(stream.b.size()))
+ {
+ stream.b.resize(size);
+ }
+ stream.i = stream.b.begin() + pos;
+
+ if(stream.i != stream.b.end())
+ {
+ _transceiver->read(stream);
+ assert(stream.i == stream.b.end());
}
}
@@ -1869,6 +1851,10 @@ Ice::Connection::run()
{
continue;
}
+ catch(const Ice::LocalException& ex)
+ {
+ exception(ex);
+ }
Int requestId = 0;
#ifndef ICEE_PURE_CLIENT
diff --git a/cppe/src/IceE/Transceiver.h b/cppe/src/IceE/Transceiver.h
index fad156219a8..0abcdb271b5 100644
--- a/cppe/src/IceE/Transceiver.h
+++ b/cppe/src/IceE/Transceiver.h
@@ -38,8 +38,19 @@ public:
void close();
void shutdownWrite();
void shutdownReadWrite();
- void write(Buffer&, int);
- void read(Buffer&, int);
+ void writeWithTimeout(Buffer&, int);
+ void readWithTimeout(Buffer&, int);
+
+ void write(Buffer& buf)
+ {
+ writeWithTimeout(buf, _timeout);
+ }
+
+ void read(Buffer& buf)
+ {
+ readWithTimeout(buf, _timeout);
+ }
+
std::string type() const;
std::string toString() const;
@@ -58,6 +69,8 @@ private:
const Ice::LoggerPtr _logger;
SOCKET _fd;
+ const int _timeout;
+
#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS
#ifdef _WIN32
WSAEVENT _event;
@@ -67,8 +80,6 @@ private:
fd_set _wFdSet;
fd_set _rFdSet;
#endif
-#else
- const int _timeout;
#endif
const std::string _desc;
diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp
index f3bb4715748..37d29563021 100644
--- a/cppe/src/TcpTransport/Transceiver.cpp
+++ b/cppe/src/TcpTransport/Transceiver.cpp
@@ -91,7 +91,7 @@ IceInternal::Transceiver::shutdownReadWrite()
}
void
-IceInternal::Transceiver::write(Buffer& buf, int timeout)
+IceInternal::Transceiver::writeWithTimeout(Buffer& buf, int timeout)
{
Buffer::Container::difference_type packetSize =
static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
@@ -202,7 +202,7 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
}
void
-IceInternal::Transceiver::read(Buffer& buf, int timeout)
+IceInternal::Transceiver::readWithTimeout(Buffer& buf, int timeout)
{
assert(timeout != 0);
@@ -338,9 +338,7 @@ IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd, in
_traceLevels(instance->traceLevels()),
_logger(instance->logger()),
_fd(fd),
-#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
_timeout(timeout),
-#endif
_desc(fdToString(fd))
#ifdef _WIN32
, _isPeerLocal(isPeerLocal(fd))