diff options
author | Matthew Newhook <matthew@zeroc.com> | 2007-01-31 05:20:18 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2007-01-31 05:20:18 +0000 |
commit | aee8b7cff90d42cd1151ec9911e490b0de553dd5 (patch) | |
tree | 75b49c8837bb98683d9451c5c9edc4427e22924c /cpp | |
parent | updated README. (diff) | |
download | ice-aee8b7cff90d42cd1151ec9911e490b0de553dd5.tar.bz2 ice-aee8b7cff90d42cd1151ec9911e490b0de553dd5.tar.xz ice-aee8b7cff90d42cd1151ec9911e490b0de553dd5.zip |
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=1690,
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=1711
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/demo/IceStorm/replicated/Subscriber.cpp | 14 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 34 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 9 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Transceiver.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 40 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.h | 1 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.cpp | 9 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.h | 1 | ||||
-rw-r--r-- | cpp/test/IceStorm/single/Publisher.cpp | 10 | ||||
-rw-r--r-- | cpp/test/IceStorm/single/Subscriber.cpp | 51 | ||||
-rwxr-xr-x | cpp/test/IceStorm/single/run.py | 2 |
12 files changed, 127 insertions, 46 deletions
diff --git a/cpp/demo/IceStorm/replicated/Subscriber.cpp b/cpp/demo/IceStorm/replicated/Subscriber.cpp index 1adafbcd804..8755650cf9f 100644 --- a/cpp/demo/IceStorm/replicated/Subscriber.cpp +++ b/cpp/demo/IceStorm/replicated/Subscriber.cpp @@ -59,18 +59,14 @@ Subscriber::run(int argc, char* argv[]) } // - // Set the requested quality of service "reliability" = - // "batch". This tells IceStorm to send events to the subscriber - // in batches at regular intervals. + // Create the servant to receive the events. // - IceStorm::QoS qos; - qos["reliability"] = "batch"; + Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber"); // - // Create the servant to receive the events. + // We want to use oneway batch messages. // - Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber"); - Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI); + Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI)->ice_batchOneway(); IceStorm::TopicPrx topic; Ice::ObjectProxySeq::const_iterator p; @@ -103,7 +99,7 @@ Subscriber::run(int argc, char* argv[]) for(p = topics.begin(); p != topics.end(); ++p) { topic = IceStorm::TopicPrx::uncheckedCast(*p); - topic->subscribe(qos, clock); + topic->subscribeAndGetPublisher(IceStorm::QoS(), clock); } adapter->activate(); diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 2a393793093..d832f87a886 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -883,25 +883,39 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // _batchStream.swap(*os); - if(_batchAutoFlush && _batchStream.b.size() > _instance->messageSizeMax()) + if(_batchAutoFlush) { + IceUtil::Mutex::Lock sendSync(_sendMutex); + if(!_transceiver) + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } + // // Throw memory limit exception if the first message added causes us to // go over limit. Otherwise put aside the marshalled message that caused // limit to be exceeded and rollback stream to the marker. // - if(_batchRequestNum == 0) + try { - resetBatch(true); - throw MemoryLimitException(__FILE__, __LINE__); + _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax()); + } + catch(const Ice::Exception&) + { + if(_batchRequestNum == 0) + { + resetBatch(true); + throw; + } + vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest); + _batchStream.b.resize(_batchMarker); + autoflush = true; } - - vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest); - _batchStream.b.resize(_batchMarker); - autoflush = true; } - else - { + + if(!autoflush) + { // // Increment the number of requests in the batch. // diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index fbada343127..4eb5f54fa02 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -343,6 +343,15 @@ IceInternal::TcpTransceiver::initialize(int) { } +void +IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSizeMax) +{ + if(buf.b.size() > messageSizeMax) + { + throw MemoryLimitException(__FILE__, __LINE__); + } +} + IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd) : _traceLevels(instance->traceLevels()), _logger(instance->initializationData().logger), diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index e2d9d301695..a6a7abbdb29 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -35,6 +35,7 @@ public: virtual std::string type() const; virtual std::string toString() const; virtual void initialize(int); + virtual void checkSendSize(const Buffer&, size_t); private: diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index 0a95324b7de..1e3d637c9f0 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -38,6 +38,7 @@ public: virtual std::string type() const = 0; virtual std::string toString() const = 0; virtual void initialize(int) = 0; + virtual void checkSendSize(const Buffer&, size_t) = 0; }; } diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index 62b95f8286f..f7e26883e38 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -109,13 +109,18 @@ void IceInternal::UdpTransceiver::write(Buffer& buf, int) { assert(buf.i == buf.b.begin()); + // + // The maximum packetSize is either the maximum allowable UDP + // packet size, or the UDP send buffer size (which ever is + // smaller). + // const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead); if(packetSize < static_cast<int>(buf.b.size())) { // // We don't log a warning here because the client gets an exception anyway. // - throw Ice::DatagramLimitException(__FILE__, __LINE__); + throw DatagramLimitException(__FILE__, __LINE__); } repeat: @@ -190,6 +195,11 @@ IceInternal::UdpTransceiver::read(Buffer& buf, int) { assert(buf.i == buf.b.begin()); + // + // The maximum packetSize is either the maximum allowable UDP + // packet size, or the UDP send buffer size (which ever is + // smaller). + // const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead); if(packetSize < static_cast<int>(buf.b.size())) { @@ -202,7 +212,7 @@ IceInternal::UdpTransceiver::read(Buffer& buf, int) Warning out(_logger); out << "DatagramLimitException: maximum size of " << packetSize << " exceeded"; } - throw Ice::DatagramLimitException(__FILE__, __LINE__); + throw DatagramLimitException(__FILE__, __LINE__); } buf.b.resize(packetSize); buf.i = buf.b.begin(); @@ -337,6 +347,20 @@ IceInternal::UdpTransceiver::initialize(int) { } +void +IceInternal::UdpTransceiver::checkSendSize(const Buffer& buf, size_t messageSizeMax) +{ + if(buf.b.size() > messageSizeMax) + { + throw MemoryLimitException(__FILE__, __LINE__); + } + const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead); + if(packetSize < static_cast<int>(buf.b.size())) + { + throw DatagramLimitException(__FILE__, __LINE__); + } +} + bool IceInternal::UdpTransceiver::equivalent(const string& host, int port) const { @@ -475,18 +499,6 @@ IceInternal::UdpTransceiver::setBufSize(const InstancePtr& instance) sizeRequested = dfltSize; } - // - // Ice.MessageSizeMax overrides UDP buffer sizes if Ice.MessageSizeMax + _udpOverhead is less. - // - size_t messageSizeMax = instance->messageSizeMax(); - if(static_cast<size_t>(sizeRequested) > messageSizeMax + _udpOverhead) - { - Warning out(_logger); - out << "UDP " << direction << " buffer size: requested size of " << sizeRequested << " adjusted to "; - sizeRequested = min(static_cast<int>(messageSizeMax), _maxPacketSize) + _udpOverhead; - out << sizeRequested << " (Ice.MessageSizeMax takes precedence)"; - } - if(sizeRequested != dfltSize) { // diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index 12dc68f32ad..c39bd3f0a19 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -41,6 +41,7 @@ public: virtual std::string type() const; virtual std::string toString() const; virtual void initialize(int); + virtual void checkSendSize(const Buffer&, size_t); bool equivalent(const std::string&, int) const; int effectivePort() const; diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index 8ee4211da2b..eaf89fafef6 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -518,6 +518,15 @@ IceSSL::TransceiverI::initialize(int timeout) } } +void +IceSSL::TransceiverI::checkSendSize(const IceInternal::Buffer& buf, size_t messageSizeMax) +{ + if(buf.b.size() > messageSizeMax) + { + throw MemoryLimitException(__FILE__, __LINE__); + } +} + ConnectionInfo IceSSL::TransceiverI::getConnectionInfo() const { diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h index 7de64ddc05c..46daa6d9570 100644 --- a/cpp/src/IceSSL/TransceiverI.h +++ b/cpp/src/IceSSL/TransceiverI.h @@ -38,6 +38,7 @@ public: virtual std::string type() const; virtual std::string toString() const; virtual void initialize(int); + virtual void checkSendSize(const IceInternal::Buffer&, size_t); ConnectionInfo getConnectionInfo() const; diff --git a/cpp/test/IceStorm/single/Publisher.cpp b/cpp/test/IceStorm/single/Publisher.cpp index 0d7f606900a..0dab4de5ffd 100644 --- a/cpp/test/IceStorm/single/Publisher.cpp +++ b/cpp/test/IceStorm/single/Publisher.cpp @@ -28,8 +28,8 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) return EXIT_FAILURE; } - ObjectPrx base = communicator->stringToProxy(managerProxy); - IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base); + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( + communicator->stringToProxy(managerProxy)); if(!manager) { cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl; @@ -59,12 +59,6 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) single->event(i); } - // - // Before we exit, we ping the proxy as twoway, to make sure that - // all oneways are delivered. - // - SinglePrx::uncheckedCast(single->ice_twoway())->ice_ping(); - return EXIT_SUCCESS; } diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp index c0c8e1b4f93..b1857119707 100644 --- a/cpp/test/IceStorm/single/Subscriber.cpp +++ b/cpp/test/IceStorm/single/Subscriber.cpp @@ -34,7 +34,8 @@ public: event(int i, const Current& current) { - if((_name == "default" || _name == "oneway" || _name == "batch") && current.requestId != 0) + if((_name == "default" || _name == "oneway" || _name == "batch" || _name == "datagram" || + _name == "batch datagram") && current.requestId != 0) { cerr << endl << "expected oneway request"; test(false); @@ -48,6 +49,11 @@ public: cerr << endl << "received unordered event for `" << _name << "': " << i << " " << _last; test(false); } + if((_name == "datagram" || _name == "batch datagram") && current.con->type() != "udp") + { + cerr << endl << "expected datagram to be received over udp"; + test(false); + } Lock sync(*this); ++_last; if(++_count == 1000) @@ -61,11 +67,28 @@ public: { Lock sync(*this); cout << "testing " << _name << " reliability... " << flush; + bool datagram = _name == "datagram" || _name == "batch datagram"; + IceUtil::Time timeout = (datagram) ? IceUtil::Time::seconds(5) : IceUtil::Time::seconds(20); while(_count < 1000) { - if(!timedWait(IceUtil::Time::seconds(20))) + if(!timedWait(timeout)) { - test(false); + if(datagram && _count > 0) + { + if(_count < 100) + { + cout << "[" << _count << "/1000: This may be an error!!]"; + } + else + { + cout << "[" << _count << "/1000] "; + } + break; + } + else + { + test(false); + } } } cout << "ok" << endl; @@ -101,7 +124,7 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) return EXIT_FAILURE; } - ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default"); + ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default:udp"); TopicPrx topic; try @@ -149,6 +172,18 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) qos["reliability"] = "twoway ordered"; topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); } + { + subscribers.push_back(new SingleI(communicator, "datagram")); + IceStorm::QoS qos; + qos["reliability"] = "oneway"; + topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_datagram()); + } + { + subscribers.push_back(new SingleI(communicator, "batch datagram")); + IceStorm::QoS qos; + qos["reliability"] = "batch"; + topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_datagram()); + } // // Next we use the new API call with the new proxy semantics. // @@ -174,6 +209,14 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) qos["reliability"] = "ordered"; topic->subscribeAndGetPublisher(qos, adapter->addWithUUID(subscribers.back())); } + { + subscribers.push_back(new SingleI(communicator, "datagram")); + topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_datagram()); + } + { + subscribers.push_back(new SingleI(communicator, "batch datagram")); + topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_batchDatagram()); + } adapter->activate(); diff --git a/cpp/test/IceStorm/single/run.py b/cpp/test/IceStorm/single/run.py index 3c2f43442c3..4772685001c 100755 --- a/cpp/test/IceStorm/single/run.py +++ b/cpp/test/IceStorm/single/run.py @@ -32,7 +32,7 @@ iceBoxEndpoints = ' --Ice.OA.IceBox.ServiceManager.Endpoints="default -p 12010" iceStormService = " --IceBox.Service.IceStorm=IceStormService," + TestUtil.getIceSoVersion() + ":createIceStorm" + \ ' --Ice.OA.IceStorm.TopicManager.Endpoints="default -p 12011"' + \ - ' --Ice.OA.IceStorm.Publish.Endpoints="default"' + \ + ' --Ice.OA.IceStorm.Publish.Endpoints="default:udp"' + \ " --IceBox.PrintServicesReady=IceStorm" + \ " --IceBox.InheritContainerProperties=1" iceStormReference = ' --IceStorm.TopicManager.Proxy="IceStorm/TopicManager:default -p 12011"' |