From f265bcb8a0fbc8f946883ba419fa83c5dc40f36a Mon Sep 17 00:00:00 2001 From: Dwayne Boone Date: Fri, 2 Jan 2009 15:25:31 -0330 Subject: Bug 3621 - updated copyright to 2009 --- cpp/src/Ice/ThreadPool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'cpp/src/Ice/ThreadPool.cpp') diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 7f0f1b72e27..4b245401c3d 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -1,6 +1,6 @@ // ********************************************************************** // -// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. -- cgit v1.2.3 From 3994b371119ba308c6c4f27475c1b16a1fc518eb Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Thu, 22 Jan 2009 15:19:02 +0100 Subject: Fixed bug 3484 & 3604 --- cpp/src/Ice/ThreadPool.cpp | 19 ++++++++++++++++--- cpp/src/Ice/UdpTransceiver.cpp | 1 - cpp/test/Ice/udp/AllTests.cpp | 32 ++++++++++++++++++++++++++++++++ cpp/test/Ice/udp/Client.cpp | 8 +++++++- cpp/test/Ice/udp/Server.cpp | 8 +++++++- cpp/test/Ice/udp/Test.ice | 3 +++ cpp/test/Ice/udp/TestI.cpp | 15 ++++++++++++++- cpp/test/Ice/udp/TestI.h | 1 + cs/src/Ice/ConnectionI.cs | 9 ++++++--- cs/test/Ice/udp/AllTests.cs | 32 ++++++++++++++++++++++++++++++++ cs/test/Ice/udp/Client.cs | 8 +++++++- cs/test/Ice/udp/Server.cs | 8 +++++++- cs/test/Ice/udp/Test.ice | 3 +++ cs/test/Ice/udp/TestIntfI.cs | 13 ++++++++++++- java/src/IceInternal/ThreadPool.java | 16 ++++++++++++++-- java/test/Ice/udp/AllTests.java | 32 ++++++++++++++++++++++++++++++++ java/test/Ice/udp/Client.java | 9 ++++++++- java/test/Ice/udp/Server.java | 8 +++++++- java/test/Ice/udp/Test.ice | 3 +++ java/test/Ice/udp/TestIntfI.java | 12 ++++++++++++ 20 files changed, 223 insertions(+), 17 deletions(-) (limited to 'cpp/src/Ice/ThreadPool.cpp') diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 4b245401c3d..ab41198104c 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -480,6 +480,8 @@ IceInternal::ThreadPool::run() } catch(const DatagramLimitException&) // Expected. { + handler->_stream.resize(0); + handler->_stream.i = stream.b.begin(); continue; } catch(const SocketException& ex) @@ -496,6 +498,8 @@ IceInternal::ThreadPool::run() Warning out(_instance->initializationData().logger); out << "datagram connection exception:\n" << ex << '\n' << handler->toString(); } + handler->_stream.resize(0); + handler->_stream.i = stream.b.begin(); } else { @@ -632,7 +636,17 @@ bool IceInternal::ThreadPool::read(const EventHandlerPtr& handler) { BasicStream& stream = handler->_stream; - + + if(stream.i - stream.b.begin() >= headerSize) + { + if(!handler->read(stream)) + { + return false; + } + assert(stream.i == stream.b.end()); + return true; + } + if(stream.b.size() == 0) { stream.b.resize(headerSize); @@ -656,6 +670,7 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) // throw IllegalMessageSizeException(__FILE__, __LINE__); } + stream.i = stream.b.begin(); const Byte* m; stream.readBlob(m, static_cast(sizeof(magic))); @@ -721,8 +736,6 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler) { Warning out(_instance->initializationData().logger); out << "DatagramLimitException: maximum size of " << pos << " exceeded"; - stream.resize(0); - stream.i = stream.b.begin(); } throw DatagramLimitException(__FILE__, __LINE__); } diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index 92852685aea..731de9ce9e8 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -65,7 +65,6 @@ IceInternal::UdpTransceiver::write(Buffer& buf) // // We don't log a warning here because the client gets an exception anyway. // - cerr << packetSize << " " << _maxPacketSize << " " << _sndSize << endl; throw DatagramLimitException(__FILE__, __LINE__); } diff --git a/cpp/test/Ice/udp/AllTests.cpp b/cpp/test/Ice/udp/AllTests.cpp index 41a21e287d7..7b05acad659 100644 --- a/cpp/test/Ice/udp/AllTests.cpp +++ b/cpp/test/Ice/udp/AllTests.cpp @@ -79,6 +79,38 @@ allTests(const CommunicatorPtr& communicator) obj->ping(reply); bool ret = replyI->waitReply(3, IceUtil::Time::seconds(2)); test(ret == true); + + Test::ByteSeq seq; + try + { + seq.resize(1024); + while(true) + { + seq.resize(seq.size() * 2 + 10); + replyI->reset(); + obj->sendByteSeq(seq, reply); + replyI->waitReply(1, IceUtil::Time::seconds(10)); + } + } + catch(const DatagramLimitException&) + { + test(seq.size() > 16384); + } + + communicator->getProperties()->setProperty("Ice.UDP.SndSize", "64000"); + seq.resize(50000); + try + { + replyI->reset(); + obj->sendByteSeq(seq, reply); + test(!replyI->waitReply(1, IceUtil::Time::milliSeconds(500))); + } + catch(const Ice::LocalException& ex) + { + cerr << ex << endl; + test(false); + } + cout << "ok" << endl; cout << "testing udp multicast... " << flush; diff --git a/cpp/test/Ice/udp/Client.cpp b/cpp/test/Ice/udp/Client.cpp index 3e476d383f8..b6a24ad19dc 100644 --- a/cpp/test/Ice/udp/Client.cpp +++ b/cpp/test/Ice/udp/Client.cpp @@ -31,7 +31,13 @@ main(int argc, char* argv[]) try { - communicator = Ice::initialize(argc, argv); + Ice::InitializationData initData; + initData.properties = Ice::createProperties(argc, argv); + + initData.properties->setProperty("Ice.Warn.Connections", "0"); + initData.properties->setProperty("Ice.UDP.SndSize", "16384"); + + communicator = Ice::initialize(argc, argv, initData); status = run(argc, argv, communicator); } catch(const Ice::Exception& ex) diff --git a/cpp/test/Ice/udp/Server.cpp b/cpp/test/Ice/udp/Server.cpp index 3793b860171..bcaa010beea 100644 --- a/cpp/test/Ice/udp/Server.cpp +++ b/cpp/test/Ice/udp/Server.cpp @@ -50,7 +50,13 @@ main(int argc, char* argv[]) try { - communicator = Ice::initialize(argc, argv); + Ice::InitializationData initData; + initData.properties = Ice::createProperties(argc, argv); + + initData.properties->setProperty("Ice.Warn.Connections", "0"); + initData.properties->setProperty("Ice.UDP.RcvSize", "16384"); + + communicator = Ice::initialize(argc, argv, initData); status = run(argc, argv, communicator); } catch(const Ice::Exception& ex) diff --git a/cpp/test/Ice/udp/Test.ice b/cpp/test/Ice/udp/Test.ice index 46adf09ca03..0d4af4d441a 100644 --- a/cpp/test/Ice/udp/Test.ice +++ b/cpp/test/Ice/udp/Test.ice @@ -18,9 +18,12 @@ interface PingReply void reply(); }; +sequence ByteSeq; + interface TestIntf { void ping(PingReply* reply); + void sendByteSeq(ByteSeq seq, PingReply* reply); void shutdown(); }; diff --git a/cpp/test/Ice/udp/TestI.cpp b/cpp/test/Ice/udp/TestI.cpp index 8cd9bf8e1ea..55657e25ada 100644 --- a/cpp/test/Ice/udp/TestI.cpp +++ b/cpp/test/Ice/udp/TestI.cpp @@ -21,7 +21,20 @@ TestIntfI::ping(const Test::PingReplyPrx& reply, const Current& current) { reply->reply(); } - catch(const Ice::Exception ex) + catch(const Ice::Exception&) + { + assert(false); + } +} + +void +TestIntfI::sendByteSeq(const Test::ByteSeq&, const Test::PingReplyPrx& reply, const Current& current) +{ + try + { + reply->reply(); + } + catch(const Ice::Exception&) { assert(false); } diff --git a/cpp/test/Ice/udp/TestI.h b/cpp/test/Ice/udp/TestI.h index 5781faa3e81..ce404a0f264 100644 --- a/cpp/test/Ice/udp/TestI.h +++ b/cpp/test/Ice/udp/TestI.h @@ -17,6 +17,7 @@ class TestIntfI : public Test::TestIntf public: virtual void ping(const Test::PingReplyPrx&, const Ice::Current&); + virtual void sendByteSeq(const Test::ByteSeq&, const Test::PingReplyPrx&, const Ice::Current&); virtual void shutdown(const Ice::Current&); }; diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index 70b5967b96b..021313db534 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -1988,6 +1988,7 @@ namespace Ice } Debug.Assert(_transceiver != null); + bool parseHeader = _stream.isEmpty() || _stream.pos() < IceInternal.Protocol.headerSize; // // Complete an asynchronous read operation if necessary. This may raise a SocketException @@ -2042,7 +2043,7 @@ namespace Ice // When we've read enough to fill out the header, we need to validate it. The stream // will be enlarged if necessary to contain the entire message. // - if(pos == IceInternal.Protocol.headerSize) + if(parseHeader && pos >= IceInternal.Protocol.headerSize) { validateHeader(_stream); @@ -2058,8 +2059,6 @@ namespace Ice _logger.warning("DatagramLimitException: maximum size of " + _stream.pos() + " exceeded"); } - _stream.pos(0); - _stream.resize(0, true); throw new Ice.DatagramLimitException(); } } @@ -2141,6 +2140,8 @@ namespace Ice // // Expected. Restart the read. // + _stream.pos(0); + _stream.resize(IceInternal.Protocol.headerSize, true); // Make room for the next header. readAsync(null); } catch(IceInternal.ReadAbortedException) @@ -2167,6 +2168,8 @@ namespace Ice // // Restart the read. // + _stream.pos(0); + _stream.resize(IceInternal.Protocol.headerSize, true); // Make room for the next header. readAsync(null); } else diff --git a/cs/test/Ice/udp/AllTests.cs b/cs/test/Ice/udp/AllTests.cs index ed19a348211..671e6d7c126 100644 --- a/cs/test/Ice/udp/AllTests.cs +++ b/cs/test/Ice/udp/AllTests.cs @@ -84,6 +84,38 @@ public class AllTests obj.ping(reply); bool ret = replyI.waitReply(3, 2000); test(ret == true); + + byte[] seq = null; + try + { + seq = new byte[1024]; + while(true) + { + seq = new byte[seq.Length * 2 + 10]; + replyI.reset(); + obj.sendByteSeq(seq, reply); + replyI.waitReply(1, 10000); + } + } + catch(Ice.DatagramLimitException) + { + test(seq.Length > 16384); + } + + communicator.getProperties().setProperty("Ice.UDP.SndSize", "64000"); + seq = new byte[50000]; + try + { + replyI.reset(); + obj.sendByteSeq(seq, reply); + test(!replyI.waitReply(1, 500)); + } + catch(Ice.LocalException ex) + { + Console.Out.WriteLine(ex); + test(false); + } + Console.Out.WriteLine("ok"); Console.Out.Write("testing udp multicast... "); diff --git a/cs/test/Ice/udp/Client.cs b/cs/test/Ice/udp/Client.cs index 1d9e485f0c9..68c990c8c05 100644 --- a/cs/test/Ice/udp/Client.cs +++ b/cs/test/Ice/udp/Client.cs @@ -36,7 +36,13 @@ public class Client try { - communicator = Ice.Util.initialize(ref args); + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = Ice.Util.createProperties(ref args); + + initData.properties.setProperty("Ice.Warn.Connections", "0"); + initData.properties.setProperty("Ice.UDP.SndSize", "16384"); + + communicator = Ice.Util.initialize(ref args, initData); status = run(args, communicator); } catch(System.Exception ex) diff --git a/cs/test/Ice/udp/Server.cs b/cs/test/Ice/udp/Server.cs index 82f15c5bb46..1c7c8526358 100644 --- a/cs/test/Ice/udp/Server.cs +++ b/cs/test/Ice/udp/Server.cs @@ -57,7 +57,13 @@ public class Server try { - communicator = Ice.Util.initialize(ref args); + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = Ice.Util.createProperties(ref args); + + initData.properties.setProperty("Ice.Warn.Connections", "0"); + initData.properties.setProperty("Ice.UDP.RcvSize", "16384"); + + communicator = Ice.Util.initialize(ref args, initData); status = run(args, communicator); } catch(System.Exception ex) diff --git a/cs/test/Ice/udp/Test.ice b/cs/test/Ice/udp/Test.ice index 46adf09ca03..0d4af4d441a 100644 --- a/cs/test/Ice/udp/Test.ice +++ b/cs/test/Ice/udp/Test.ice @@ -18,9 +18,12 @@ interface PingReply void reply(); }; +sequence ByteSeq; + interface TestIntf { void ping(PingReply* reply); + void sendByteSeq(ByteSeq seq, PingReply* reply); void shutdown(); }; diff --git a/cs/test/Ice/udp/TestIntfI.cs b/cs/test/Ice/udp/TestIntfI.cs index e7b5aaa6226..d00ce1abe30 100644 --- a/cs/test/Ice/udp/TestIntfI.cs +++ b/cs/test/Ice/udp/TestIntfI.cs @@ -13,7 +13,18 @@ public sealed class TestIntfI : Test.TestIntfDisp_ { public override void ping(Test.PingReplyPrx reply, Ice.Current current) { - System.Console.WriteLine("ping"); + try + { + reply.reply(); + } + catch(Ice.LocalException) + { + Debug.Assert(false); + } + } + + public override void sendByteSeq(byte[] seq, Test.PingReplyPrx reply, Ice.Current current) + { try { reply.reply(); diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 78316f1f910..ea80b046a77 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -524,6 +524,8 @@ public final class ThreadPool } catch(Ice.DatagramLimitException ex) // Expected. { + handler._stream.pos(0); + handler._stream.resize(0, true); continue; } catch(Ice.SocketException ex) @@ -548,6 +550,8 @@ public final class ThreadPool _instance.initializationData().logger.warning( "datagram connection exception:\n" + ex + "\n" + handler.toString()); } + handler._stream.pos(0); + handler._stream.resize(0, true); } else { @@ -718,6 +722,16 @@ public final class ThreadPool { BasicStream stream = handler._stream; + if(stream.pos() >= Protocol.headerSize) + { + if(!handler.read(stream)) + { + return false; + } + assert(stream.pos() == stream.size()); + return true; + } + if(stream.size() == 0) { stream.resize(Protocol.headerSize, true); @@ -805,8 +819,6 @@ public final class ThreadPool _instance.initializationData().logger.warning("DatagramLimitException: maximum size of " + stream.pos() + " exceeded"); } - stream.pos(0); - stream.resize(0, true); throw new Ice.DatagramLimitException(); } else diff --git a/java/test/Ice/udp/AllTests.java b/java/test/Ice/udp/AllTests.java index b15d57f901c..5e23f10d62e 100644 --- a/java/test/Ice/udp/AllTests.java +++ b/java/test/Ice/udp/AllTests.java @@ -82,6 +82,38 @@ public class AllTests obj.ping(reply); boolean ret = replyI.waitReply(3, 2000); test(ret == true); + + byte[] seq = null; + try + { + seq = new byte[1024]; + while(true) + { + seq = new byte[seq.length * 2 + 10]; + replyI.reset(); + obj.sendByteSeq(seq, reply); + replyI.waitReply(1, 10000); + } + } + catch(Ice.DatagramLimitException ex) + { + test(seq.length > 16384); + } + + communicator.getProperties().setProperty("Ice.UDP.SndSize", "64000"); + seq = new byte[50000]; + try + { + replyI.reset(); + obj.sendByteSeq(seq, reply); + test(!replyI.waitReply(1, 500)); + } + catch(Ice.LocalException ex) + { + System.err.println(ex); + test(false); + } + System.out.println("ok"); System.out.print("testing udp multicast... "); diff --git a/java/test/Ice/udp/Client.java b/java/test/Ice/udp/Client.java index 3315d78da6b..321f89d3ecf 100644 --- a/java/test/Ice/udp/Client.java +++ b/java/test/Ice/udp/Client.java @@ -27,7 +27,14 @@ public class Client try { - communicator = Ice.Util.initialize(args); + Ice.StringSeqHolder argHolder = new Ice.StringSeqHolder(args); + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = Ice.Util.createProperties(argHolder); + + initData.properties.setProperty("Ice.Warn.Connections", "0"); + initData.properties.setProperty("Ice.UDP.SndSize", "16384"); + + communicator = Ice.Util.initialize(argHolder, initData); status = run(args, communicator); } catch(Exception ex) diff --git a/java/test/Ice/udp/Server.java b/java/test/Ice/udp/Server.java index 4ea6e29a4b8..bedd63218bb 100644 --- a/java/test/Ice/udp/Server.java +++ b/java/test/Ice/udp/Server.java @@ -48,7 +48,13 @@ public class Server try { Ice.StringSeqHolder argHolder = new Ice.StringSeqHolder(args); - communicator = Ice.Util.initialize(argHolder); + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = Ice.Util.createProperties(argHolder); + + initData.properties.setProperty("Ice.Warn.Connections", "0"); + initData.properties.setProperty("Ice.UDP.RcvSize", "16384"); + + communicator = Ice.Util.initialize(argHolder, initData); status = run(argHolder.value, communicator); } catch(Exception ex) diff --git a/java/test/Ice/udp/Test.ice b/java/test/Ice/udp/Test.ice index 46adf09ca03..0d4af4d441a 100644 --- a/java/test/Ice/udp/Test.ice +++ b/java/test/Ice/udp/Test.ice @@ -18,9 +18,12 @@ interface PingReply void reply(); }; +sequence ByteSeq; + interface TestIntf { void ping(PingReply* reply); + void sendByteSeq(ByteSeq seq, PingReply* reply); void shutdown(); }; diff --git a/java/test/Ice/udp/TestIntfI.java b/java/test/Ice/udp/TestIntfI.java index 8a3d3887767..585d00be077 100644 --- a/java/test/Ice/udp/TestIntfI.java +++ b/java/test/Ice/udp/TestIntfI.java @@ -21,6 +21,18 @@ public final class TestIntfI extends Test._TestIntfDisp } } + public void sendByteSeq(byte[] seq, Test.PingReplyPrx reply, Ice.Current current) + { + try + { + reply.reply(); + } + catch(Ice.LocalException ex) + { + assert(false); + } + } + public void shutdown(Ice.Current current) { current.adapter.getCommunicator().shutdown(); -- cgit v1.2.3