summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cs/src/Ice/ConnectionI.cs390
-rw-r--r--cs/src/Ice/ThreadPool.cs6
2 files changed, 201 insertions, 195 deletions
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs
index 34aeb7d2a03..a7c71f8059d 100644
--- a/cs/src/Ice/ConnectionI.cs
+++ b/cs/src/Ice/ConnectionI.cs
@@ -1117,266 +1117,270 @@ namespace Ice
int dispatchCount = 0;
IceInternal.ThreadPoolMessage msg = new IceInternal.ThreadPoolMessage(this);
- lock(this)
+ try
{
- if(!msg.startIOScope(ref current))
- {
- return;
- }
-
- if(_state >= StateClosed)
- {
- return;
- }
-
- int readyOp = current.operation;
- try
+ lock(this)
{
- unscheduleTimeout(current.operation);
-
- int writeOp = IceInternal.SocketOperation.None;
- int readOp = IceInternal.SocketOperation.None;
- if((readyOp & IceInternal.SocketOperation.Write) != 0)
+ if(!msg.startIOScope(ref current))
{
- if(_observer != null)
- {
- observerStartWrite(_writeStream.getBuffer());
- }
- writeOp = write(_writeStream.getBuffer());
- if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0)
- {
- observerFinishWrite(_writeStream.getBuffer());
- }
+ return;
}
- while((readyOp & IceInternal.SocketOperation.Read) != 0)
+ if(_state >= StateClosed)
{
- IceInternal.Buffer buf = _readStream.getBuffer();
+ return;
+ }
- if(_observer != null && !_readHeader)
- {
- observerStartRead(buf);
- }
+ int readyOp = current.operation;
+ try
+ {
+ unscheduleTimeout(current.operation);
- readOp = read(buf);
- if((readOp & IceInternal.SocketOperation.Read) != 0)
- {
- break;
- }
- if(_observer != null && !_readHeader)
+ int writeOp = IceInternal.SocketOperation.None;
+ int readOp = IceInternal.SocketOperation.None;
+ if((readyOp & IceInternal.SocketOperation.Write) != 0)
{
- Debug.Assert(!buf.b.hasRemaining());
- observerFinishRead(buf);
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.getBuffer());
+ }
+ writeOp = write(_writeStream.getBuffer());
+ if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0)
+ {
+ observerFinishWrite(_writeStream.getBuffer());
+ }
}
- if(_readHeader) // Read header if necessary.
+ while((readyOp & IceInternal.SocketOperation.Read) != 0)
{
- _readHeader = false;
+ IceInternal.Buffer buf = _readStream.getBuffer();
- if(_observer != null)
+ if(_observer != null && !_readHeader)
{
- _observer.receivedBytes(IceInternal.Protocol.headerSize);
+ observerStartRead(buf);
}
- int pos = _readStream.pos();
- if(pos < IceInternal.Protocol.headerSize)
+ readOp = read(buf);
+ if((readOp & IceInternal.SocketOperation.Read) != 0)
{
- //
- // This situation is possible for small UDP packets.
- //
- throw new Ice.IllegalMessageSizeException();
+ break;
+ }
+ if(_observer != null && !_readHeader)
+ {
+ Debug.Assert(!buf.b.hasRemaining());
+ observerFinishRead(buf);
}
- _readStream.pos(0);
- byte[] m = new byte[4];
- m[0] = _readStream.readByte();
- m[1] = _readStream.readByte();
- m[2] = _readStream.readByte();
- m[3] = _readStream.readByte();
- if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
- m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ if(_readHeader) // Read header if necessary.
{
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
+ _readHeader = false;
+
+ if(_observer != null)
+ {
+ _observer.receivedBytes(IceInternal.Protocol.headerSize);
+ }
+
+ int pos = _readStream.pos();
+ if(pos < IceInternal.Protocol.headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw new Ice.IllegalMessageSizeException();
+ }
+
+ _readStream.pos(0);
+ byte[] m = new byte[4];
+ m[0] = _readStream.readByte();
+ m[1] = _readStream.readByte();
+ m[2] = _readStream.readByte();
+ m[3] = _readStream.readByte();
+ if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
+ m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ {
+ Ice.BadMagicException ex = new Ice.BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
+
+ ProtocolVersion pv = new ProtocolVersion();
+ pv.read__(_readStream);
+ IceInternal.Protocol.checkSupportedProtocol(pv);
+ EncodingVersion ev = new EncodingVersion();
+ ev.read__(_readStream);
+ IceInternal.Protocol.checkSupportedProtocolEncoding(ev);
+
+ _readStream.readByte(); // messageType
+ _readStream.readByte(); // compress
+ int size = _readStream.readInt();
+ if(size < IceInternal.Protocol.headerSize)
+ {
+ throw new Ice.IllegalMessageSizeException();
+ }
+ if(size > _instance.messageSizeMax())
+ {
+ IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax());
+ }
+ if(size > _readStream.size())
+ {
+ _readStream.resize(size, true);
+ }
+ _readStream.pos(pos);
}
- ProtocolVersion pv = new ProtocolVersion();
- pv.read__(_readStream);
- IceInternal.Protocol.checkSupportedProtocol(pv);
- EncodingVersion ev = new EncodingVersion();
- ev.read__(_readStream);
- IceInternal.Protocol.checkSupportedProtocolEncoding(ev);
-
- _readStream.readByte(); // messageType
- _readStream.readByte(); // compress
- int size = _readStream.readInt();
- if(size < IceInternal.Protocol.headerSize)
+ if(buf.b.hasRemaining())
{
- throw new Ice.IllegalMessageSizeException();
+ if(_endpoint.datagram())
+ {
+ throw new Ice.DatagramLimitException(); // The message was truncated.
+ }
+ continue;
}
- if(size > _instance.messageSizeMax())
+ break;
+ }
+
+ int newOp = readOp | writeOp;
+ readyOp &= ~newOp;
+ Debug.Assert(readyOp != 0 || newOp != 0);
+
+ if(_state <= StateNotValidated)
+ {
+ if(newOp != 0)
{
- IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax());
+ //
+ // Wait for all the transceiver conditions to be
+ // satisfied before continuing.
+ //
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ return;
}
- if(size > _readStream.size())
+
+ if(_state == StateNotInitialized && !initialize(current.operation))
{
- _readStream.resize(size, true);
+ return;
}
- _readStream.pos(pos);
- }
- if(buf.b.hasRemaining())
- {
- if(_endpoint.datagram())
+ if(_state <= StateNotValidated && !validate(current.operation))
{
- throw new Ice.DatagramLimitException(); // The message was truncated.
+ return;
}
- continue;
- }
- break;
- }
- int newOp = readOp | writeOp;
- readyOp &= ~newOp;
- Debug.Assert(readyOp != 0 || newOp != 0);
+ _threadPool.unregister(this, current.operation);
- if(_state <= StateNotValidated)
- {
- if(newOp != 0)
- {
//
- // Wait for all the transceiver conditions to be
- // satisfied before continuing.
+ // We start out in holding state.
//
- scheduleTimeout(newOp);
- _threadPool.update(this, current.operation, newOp);
- return;
- }
-
- if(_state == StateNotInitialized && !initialize(current.operation))
- {
- return;
+ setState(StateHolding);
+ if(_startCallback != null)
+ {
+ startCB = _startCallback;
+ _startCallback = null;
+ if(startCB != null)
+ {
+ ++dispatchCount;
+ }
+ }
}
-
- if(_state <= StateNotValidated && !validate(current.operation))
+ else
{
- return;
- }
+ Debug.Assert(_state <= StateClosingPending);
- _threadPool.unregister(this, current.operation);
+ //
+ // We parse messages first, if we receive a close
+ // connection message we won't send more messages.
+ //
+ if((readyOp & IceInternal.SocketOperation.Read) != 0)
+ {
+ newOp |= parseMessage(ref info);
+ dispatchCount += info.messageDispatchCount;
+ }
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- if(_startCallback != null)
- {
- startCB = _startCallback;
- _startCallback = null;
- if(startCB != null)
+ if((readyOp & IceInternal.SocketOperation.Write) != 0)
{
- ++dispatchCount;
+ newOp |= sendNextMessage(out sentCBs);
+ if(sentCBs != null)
+ {
+ ++dispatchCount;
+ }
}
- }
- }
- else
- {
- Debug.Assert(_state <= StateClosingPending);
- //
- // We parse messages first, if we receive a close
- // connection message we won't send more messages.
- //
- if((readyOp & IceInternal.SocketOperation.Read) != 0)
- {
- newOp |= parseMessage(ref info);
- dispatchCount += info.messageDispatchCount;
- }
+ if(_state < StateClosed)
+ {
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ }
- if((readyOp & IceInternal.SocketOperation.Write) != 0)
- {
- newOp |= sendNextMessage(out sentCBs);
- if(sentCBs != null)
+ if(readyOp == 0)
{
- ++dispatchCount;
+ Debug.Assert(dispatchCount == 0);
+ return;
}
}
- if(_state < StateClosed)
+ if(_acmLastActivity > 0)
{
- scheduleTimeout(newOp);
- _threadPool.update(this, current.operation, newOp);
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
- if(readyOp == 0)
+ if(dispatchCount == 0)
{
- Debug.Assert(dispatchCount == 0);
- return;
+ return; // Nothing to dispatch we're done!
}
- }
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
+ _dispatchCount += dispatchCount;
- if(dispatchCount == 0)
- {
- return; // Nothing to dispatch we're done!
+ msg.completed(ref current);
}
-
- _dispatchCount += dispatchCount;
-
- msg.completed(ref current);
- }
- catch(DatagramLimitException) // Expected.
- {
- if(_warnUdp)
+ catch(DatagramLimitException) // Expected.
{
- _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded");
- }
- _readStream.resize(IceInternal.Protocol.headerSize, true);
- _readStream.pos(0);
- _readHeader = true;
- return;
- }
- catch(SocketException ex)
- {
- setState(StateClosed, ex);
- return;
- }
- catch(LocalException ex)
- {
- if(_endpoint.datagram())
- {
- if(_warn)
+ if(_warnUdp)
{
- String s = "datagram connection exception:\n" + ex + '\n' + _desc;
- _logger.warning(s);
+ _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded");
}
_readStream.resize(IceInternal.Protocol.headerSize, true);
_readStream.pos(0);
_readHeader = true;
+ return;
}
- else
+ catch(SocketException ex)
{
setState(StateClosed, ex);
+ return;
+ }
+ catch(LocalException ex)
+ {
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ String s = "datagram connection exception:\n" + ex + '\n' + _desc;
+ _logger.warning(s);
+ }
+ _readStream.resize(IceInternal.Protocol.headerSize, true);
+ _readStream.pos(0);
+ _readHeader = true;
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ return;
}
- return;
- }
- finally
- {
- msg.finishIOScope(ref current);
- }
- IceInternal.ThreadPoolCurrent c = current;
- _threadPool.dispatch(() =>
- {
- dispatch(startCB, sentCBs, info);
- msg.destroy(ref c);
- }, this);
+ IceInternal.ThreadPoolCurrent c = current;
+ _threadPool.dispatch(() =>
+ {
+ dispatch(startCB, sentCBs, info);
+ msg.destroy(ref c);
+ }, this);
+ }
}
+ finally
+ {
+ msg.finishIOScope(ref current);
+ }
+
}
private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs
index 5b28c54a2b6..57af5f232f0 100644
--- a/cs/src/Ice/ThreadPool.cs
+++ b/cs/src/Ice/ThreadPool.cs
@@ -38,8 +38,10 @@ namespace IceInternal
{
if(_finishWithIO)
{
- // This must be called with the handler locked.
- current.finishMessage(true);
+ lock(_mutex)
+ {
+ current.finishMessage(true);
+ }
}
}