summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/demo/IceStorm/replicated/Subscriber.cpp14
-rw-r--r--cpp/src/Ice/ConnectionI.cpp34
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp9
-rw-r--r--cpp/src/Ice/TcpTransceiver.h1
-rw-r--r--cpp/src/Ice/Transceiver.h1
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp40
-rw-r--r--cpp/src/Ice/UdpTransceiver.h1
-rw-r--r--cpp/src/IceSSL/TransceiverI.cpp9
-rw-r--r--cpp/src/IceSSL/TransceiverI.h1
-rw-r--r--cpp/test/IceStorm/single/Publisher.cpp10
-rw-r--r--cpp/test/IceStorm/single/Subscriber.cpp51
-rwxr-xr-xcpp/test/IceStorm/single/run.py2
12 files changed, 127 insertions, 46 deletions
diff --git a/cpp/demo/IceStorm/replicated/Subscriber.cpp b/cpp/demo/IceStorm/replicated/Subscriber.cpp
index 1adafbcd804..8755650cf9f 100644
--- a/cpp/demo/IceStorm/replicated/Subscriber.cpp
+++ b/cpp/demo/IceStorm/replicated/Subscriber.cpp
@@ -59,18 +59,14 @@ Subscriber::run(int argc, char* argv[])
}
//
- // Set the requested quality of service "reliability" =
- // "batch". This tells IceStorm to send events to the subscriber
- // in batches at regular intervals.
+ // Create the servant to receive the events.
//
- IceStorm::QoS qos;
- qos["reliability"] = "batch";
+ Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber");
//
- // Create the servant to receive the events.
+ // We want to use oneway batch messages.
//
- Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Clock.Subscriber");
- Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI);
+ Ice::ObjectPrx clock = adapter->addWithUUID(new ClockI)->ice_batchOneway();
IceStorm::TopicPrx topic;
Ice::ObjectProxySeq::const_iterator p;
@@ -103,7 +99,7 @@ Subscriber::run(int argc, char* argv[])
for(p = topics.begin(); p != topics.end(); ++p)
{
topic = IceStorm::TopicPrx::uncheckedCast(*p);
- topic->subscribe(qos, clock);
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), clock);
}
adapter->activate();
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 2a393793093..d832f87a886 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -883,25 +883,39 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
//
_batchStream.swap(*os);
- if(_batchAutoFlush && _batchStream.b.size() > _instance->messageSizeMax())
+ if(_batchAutoFlush)
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ if(!_transceiver)
+ {
+ assert(_exception.get());
+ _exception->ice_throw(); // The exception is immutable at this point.
+ }
+
//
// Throw memory limit exception if the first message added causes us to
// go over limit. Otherwise put aside the marshalled message that caused
// limit to be exceeded and rollback stream to the marker.
//
- if(_batchRequestNum == 0)
+ try
{
- resetBatch(true);
- throw MemoryLimitException(__FILE__, __LINE__);
+ _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax());
+ }
+ catch(const Ice::Exception&)
+ {
+ if(_batchRequestNum == 0)
+ {
+ resetBatch(true);
+ throw;
+ }
+ vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
+ _batchStream.b.resize(_batchMarker);
+ autoflush = true;
}
-
- vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest);
- _batchStream.b.resize(_batchMarker);
- autoflush = true;
}
- else
- {
+
+ if(!autoflush)
+ {
//
// Increment the number of requests in the batch.
//
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index fbada343127..4eb5f54fa02 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -343,6 +343,15 @@ IceInternal::TcpTransceiver::initialize(int)
{
}
+void
+IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSizeMax)
+{
+ if(buf.b.size() > messageSizeMax)
+ {
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+}
+
IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd) :
_traceLevels(instance->traceLevels()),
_logger(instance->initializationData().logger),
diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h
index e2d9d301695..a6a7abbdb29 100644
--- a/cpp/src/Ice/TcpTransceiver.h
+++ b/cpp/src/Ice/TcpTransceiver.h
@@ -35,6 +35,7 @@ public:
virtual std::string type() const;
virtual std::string toString() const;
virtual void initialize(int);
+ virtual void checkSendSize(const Buffer&, size_t);
private:
diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h
index 0a95324b7de..1e3d637c9f0 100644
--- a/cpp/src/Ice/Transceiver.h
+++ b/cpp/src/Ice/Transceiver.h
@@ -38,6 +38,7 @@ public:
virtual std::string type() const = 0;
virtual std::string toString() const = 0;
virtual void initialize(int) = 0;
+ virtual void checkSendSize(const Buffer&, size_t) = 0;
};
}
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index 62b95f8286f..f7e26883e38 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -109,13 +109,18 @@ void
IceInternal::UdpTransceiver::write(Buffer& buf, int)
{
assert(buf.i == buf.b.begin());
+ //
+ // The maximum packetSize is either the maximum allowable UDP
+ // packet size, or the UDP send buffer size (which ever is
+ // smaller).
+ //
const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
if(packetSize < static_cast<int>(buf.b.size()))
{
//
// We don't log a warning here because the client gets an exception anyway.
//
- throw Ice::DatagramLimitException(__FILE__, __LINE__);
+ throw DatagramLimitException(__FILE__, __LINE__);
}
repeat:
@@ -190,6 +195,11 @@ IceInternal::UdpTransceiver::read(Buffer& buf, int)
{
assert(buf.i == buf.b.begin());
+ //
+ // The maximum packetSize is either the maximum allowable UDP
+ // packet size, or the UDP send buffer size (which ever is
+ // smaller).
+ //
const int packetSize = min(_maxPacketSize, _rcvSize - _udpOverhead);
if(packetSize < static_cast<int>(buf.b.size()))
{
@@ -202,7 +212,7 @@ IceInternal::UdpTransceiver::read(Buffer& buf, int)
Warning out(_logger);
out << "DatagramLimitException: maximum size of " << packetSize << " exceeded";
}
- throw Ice::DatagramLimitException(__FILE__, __LINE__);
+ throw DatagramLimitException(__FILE__, __LINE__);
}
buf.b.resize(packetSize);
buf.i = buf.b.begin();
@@ -337,6 +347,20 @@ IceInternal::UdpTransceiver::initialize(int)
{
}
+void
+IceInternal::UdpTransceiver::checkSendSize(const Buffer& buf, size_t messageSizeMax)
+{
+ if(buf.b.size() > messageSizeMax)
+ {
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+ const int packetSize = min(_maxPacketSize, _sndSize - _udpOverhead);
+ if(packetSize < static_cast<int>(buf.b.size()))
+ {
+ throw DatagramLimitException(__FILE__, __LINE__);
+ }
+}
+
bool
IceInternal::UdpTransceiver::equivalent(const string& host, int port) const
{
@@ -475,18 +499,6 @@ IceInternal::UdpTransceiver::setBufSize(const InstancePtr& instance)
sizeRequested = dfltSize;
}
- //
- // Ice.MessageSizeMax overrides UDP buffer sizes if Ice.MessageSizeMax + _udpOverhead is less.
- //
- size_t messageSizeMax = instance->messageSizeMax();
- if(static_cast<size_t>(sizeRequested) > messageSizeMax + _udpOverhead)
- {
- Warning out(_logger);
- out << "UDP " << direction << " buffer size: requested size of " << sizeRequested << " adjusted to ";
- sizeRequested = min(static_cast<int>(messageSizeMax), _maxPacketSize) + _udpOverhead;
- out << sizeRequested << " (Ice.MessageSizeMax takes precedence)";
- }
-
if(sizeRequested != dfltSize)
{
//
diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h
index 12dc68f32ad..c39bd3f0a19 100644
--- a/cpp/src/Ice/UdpTransceiver.h
+++ b/cpp/src/Ice/UdpTransceiver.h
@@ -41,6 +41,7 @@ public:
virtual std::string type() const;
virtual std::string toString() const;
virtual void initialize(int);
+ virtual void checkSendSize(const Buffer&, size_t);
bool equivalent(const std::string&, int) const;
int effectivePort() const;
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp
index 8ee4211da2b..eaf89fafef6 100644
--- a/cpp/src/IceSSL/TransceiverI.cpp
+++ b/cpp/src/IceSSL/TransceiverI.cpp
@@ -518,6 +518,15 @@ IceSSL::TransceiverI::initialize(int timeout)
}
}
+void
+IceSSL::TransceiverI::checkSendSize(const IceInternal::Buffer& buf, size_t messageSizeMax)
+{
+ if(buf.b.size() > messageSizeMax)
+ {
+ throw MemoryLimitException(__FILE__, __LINE__);
+ }
+}
+
ConnectionInfo
IceSSL::TransceiverI::getConnectionInfo() const
{
diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h
index 7de64ddc05c..46daa6d9570 100644
--- a/cpp/src/IceSSL/TransceiverI.h
+++ b/cpp/src/IceSSL/TransceiverI.h
@@ -38,6 +38,7 @@ public:
virtual std::string type() const;
virtual std::string toString() const;
virtual void initialize(int);
+ virtual void checkSendSize(const IceInternal::Buffer&, size_t);
ConnectionInfo getConnectionInfo() const;
diff --git a/cpp/test/IceStorm/single/Publisher.cpp b/cpp/test/IceStorm/single/Publisher.cpp
index 0d7f606900a..0dab4de5ffd 100644
--- a/cpp/test/IceStorm/single/Publisher.cpp
+++ b/cpp/test/IceStorm/single/Publisher.cpp
@@ -28,8 +28,8 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
return EXIT_FAILURE;
}
- ObjectPrx base = communicator->stringToProxy(managerProxy);
- IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(base);
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
+ communicator->stringToProxy(managerProxy));
if(!manager)
{
cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl;
@@ -59,12 +59,6 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
single->event(i);
}
- //
- // Before we exit, we ping the proxy as twoway, to make sure that
- // all oneways are delivered.
- //
- SinglePrx::uncheckedCast(single->ice_twoway())->ice_ping();
-
return EXIT_SUCCESS;
}
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp
index c0c8e1b4f93..b1857119707 100644
--- a/cpp/test/IceStorm/single/Subscriber.cpp
+++ b/cpp/test/IceStorm/single/Subscriber.cpp
@@ -34,7 +34,8 @@ public:
event(int i, const Current& current)
{
- if((_name == "default" || _name == "oneway" || _name == "batch") && current.requestId != 0)
+ if((_name == "default" || _name == "oneway" || _name == "batch" || _name == "datagram" ||
+ _name == "batch datagram") && current.requestId != 0)
{
cerr << endl << "expected oneway request";
test(false);
@@ -48,6 +49,11 @@ public:
cerr << endl << "received unordered event for `" << _name << "': " << i << " " << _last;
test(false);
}
+ if((_name == "datagram" || _name == "batch datagram") && current.con->type() != "udp")
+ {
+ cerr << endl << "expected datagram to be received over udp";
+ test(false);
+ }
Lock sync(*this);
++_last;
if(++_count == 1000)
@@ -61,11 +67,28 @@ public:
{
Lock sync(*this);
cout << "testing " << _name << " reliability... " << flush;
+ bool datagram = _name == "datagram" || _name == "batch datagram";
+ IceUtil::Time timeout = (datagram) ? IceUtil::Time::seconds(5) : IceUtil::Time::seconds(20);
while(_count < 1000)
{
- if(!timedWait(IceUtil::Time::seconds(20)))
+ if(!timedWait(timeout))
{
- test(false);
+ if(datagram && _count > 0)
+ {
+ if(_count < 100)
+ {
+ cout << "[" << _count << "/1000: This may be an error!!]";
+ }
+ else
+ {
+ cout << "[" << _count << "/1000] ";
+ }
+ break;
+ }
+ else
+ {
+ test(false);
+ }
}
}
cout << "ok" << endl;
@@ -101,7 +124,7 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
return EXIT_FAILURE;
}
- ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default");
+ ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default:udp");
TopicPrx topic;
try
@@ -149,6 +172,18 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
qos["reliability"] = "twoway ordered";
topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
}
+ {
+ subscribers.push_back(new SingleI(communicator, "datagram"));
+ IceStorm::QoS qos;
+ qos["reliability"] = "oneway";
+ topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_datagram());
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "batch datagram"));
+ IceStorm::QoS qos;
+ qos["reliability"] = "batch";
+ topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_datagram());
+ }
//
// Next we use the new API call with the new proxy semantics.
//
@@ -174,6 +209,14 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
qos["reliability"] = "ordered";
topic->subscribeAndGetPublisher(qos, adapter->addWithUUID(subscribers.back()));
}
+ {
+ subscribers.push_back(new SingleI(communicator, "datagram"));
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_datagram());
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "batch datagram"));
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_batchDatagram());
+ }
adapter->activate();
diff --git a/cpp/test/IceStorm/single/run.py b/cpp/test/IceStorm/single/run.py
index 3c2f43442c3..4772685001c 100755
--- a/cpp/test/IceStorm/single/run.py
+++ b/cpp/test/IceStorm/single/run.py
@@ -32,7 +32,7 @@ iceBoxEndpoints = ' --Ice.OA.IceBox.ServiceManager.Endpoints="default -p 12010"
iceStormService = " --IceBox.Service.IceStorm=IceStormService," + TestUtil.getIceSoVersion() + ":createIceStorm" + \
' --Ice.OA.IceStorm.TopicManager.Endpoints="default -p 12011"' + \
- ' --Ice.OA.IceStorm.Publish.Endpoints="default"' + \
+ ' --Ice.OA.IceStorm.Publish.Endpoints="default:udp"' + \
" --IceBox.PrintServicesReady=IceStorm" + \
" --IceBox.InheritContainerProperties=1"
iceStormReference = ' --IceStorm.TopicManager.Proxy="IceStorm/TopicManager:default -p 12011"'