diff options
-rw-r--r-- | cs/src/Ice/ConnectionI.cs | 390 | ||||
-rw-r--r-- | cs/src/Ice/ThreadPool.cs | 6 |
2 files changed, 201 insertions, 195 deletions
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index 34aeb7d2a03..a7c71f8059d 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -1117,266 +1117,270 @@ namespace Ice int dispatchCount = 0; IceInternal.ThreadPoolMessage msg = new IceInternal.ThreadPoolMessage(this); - lock(this) + try { - if(!msg.startIOScope(ref current)) - { - return; - } - - if(_state >= StateClosed) - { - return; - } - - int readyOp = current.operation; - try + lock(this) { - unscheduleTimeout(current.operation); - - int writeOp = IceInternal.SocketOperation.None; - int readOp = IceInternal.SocketOperation.None; - if((readyOp & IceInternal.SocketOperation.Write) != 0) + if(!msg.startIOScope(ref current)) { - if(_observer != null) - { - observerStartWrite(_writeStream.getBuffer()); - } - writeOp = write(_writeStream.getBuffer()); - if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0) - { - observerFinishWrite(_writeStream.getBuffer()); - } + return; } - while((readyOp & IceInternal.SocketOperation.Read) != 0) + if(_state >= StateClosed) { - IceInternal.Buffer buf = _readStream.getBuffer(); + return; + } - if(_observer != null && !_readHeader) - { - observerStartRead(buf); - } + int readyOp = current.operation; + try + { + unscheduleTimeout(current.operation); - readOp = read(buf); - if((readOp & IceInternal.SocketOperation.Read) != 0) - { - break; - } - if(_observer != null && !_readHeader) + int writeOp = IceInternal.SocketOperation.None; + int readOp = IceInternal.SocketOperation.None; + if((readyOp & IceInternal.SocketOperation.Write) != 0) { - Debug.Assert(!buf.b.hasRemaining()); - observerFinishRead(buf); + if(_observer != null) + { + observerStartWrite(_writeStream.getBuffer()); + } + writeOp = write(_writeStream.getBuffer()); + if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0) + { + observerFinishWrite(_writeStream.getBuffer()); + } } - if(_readHeader) // Read header if necessary. + while((readyOp & IceInternal.SocketOperation.Read) != 0) { - _readHeader = false; + IceInternal.Buffer buf = _readStream.getBuffer(); - if(_observer != null) + if(_observer != null && !_readHeader) { - _observer.receivedBytes(IceInternal.Protocol.headerSize); + observerStartRead(buf); } - int pos = _readStream.pos(); - if(pos < IceInternal.Protocol.headerSize) + readOp = read(buf); + if((readOp & IceInternal.SocketOperation.Read) != 0) { - // - // This situation is possible for small UDP packets. - // - throw new Ice.IllegalMessageSizeException(); + break; + } + if(_observer != null && !_readHeader) + { + Debug.Assert(!buf.b.hasRemaining()); + observerFinishRead(buf); } - _readStream.pos(0); - byte[] m = new byte[4]; - m[0] = _readStream.readByte(); - m[1] = _readStream.readByte(); - m[2] = _readStream.readByte(); - m[3] = _readStream.readByte(); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || - m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + if(_readHeader) // Read header if necessary. { - Ice.BadMagicException ex = new Ice.BadMagicException(); - ex.badMagic = m; - throw ex; + _readHeader = false; + + if(_observer != null) + { + _observer.receivedBytes(IceInternal.Protocol.headerSize); + } + + int pos = _readStream.pos(); + if(pos < IceInternal.Protocol.headerSize) + { + // + // This situation is possible for small UDP packets. + // + throw new Ice.IllegalMessageSizeException(); + } + + _readStream.pos(0); + byte[] m = new byte[4]; + m[0] = _readStream.readByte(); + m[1] = _readStream.readByte(); + m[2] = _readStream.readByte(); + m[3] = _readStream.readByte(); + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || + m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + { + Ice.BadMagicException ex = new Ice.BadMagicException(); + ex.badMagic = m; + throw ex; + } + + ProtocolVersion pv = new ProtocolVersion(); + pv.read__(_readStream); + IceInternal.Protocol.checkSupportedProtocol(pv); + EncodingVersion ev = new EncodingVersion(); + ev.read__(_readStream); + IceInternal.Protocol.checkSupportedProtocolEncoding(ev); + + _readStream.readByte(); // messageType + _readStream.readByte(); // compress + int size = _readStream.readInt(); + if(size < IceInternal.Protocol.headerSize) + { + throw new Ice.IllegalMessageSizeException(); + } + if(size > _instance.messageSizeMax()) + { + IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax()); + } + if(size > _readStream.size()) + { + _readStream.resize(size, true); + } + _readStream.pos(pos); } - ProtocolVersion pv = new ProtocolVersion(); - pv.read__(_readStream); - IceInternal.Protocol.checkSupportedProtocol(pv); - EncodingVersion ev = new EncodingVersion(); - ev.read__(_readStream); - IceInternal.Protocol.checkSupportedProtocolEncoding(ev); - - _readStream.readByte(); // messageType - _readStream.readByte(); // compress - int size = _readStream.readInt(); - if(size < IceInternal.Protocol.headerSize) + if(buf.b.hasRemaining()) { - throw new Ice.IllegalMessageSizeException(); + if(_endpoint.datagram()) + { + throw new Ice.DatagramLimitException(); // The message was truncated. + } + continue; } - if(size > _instance.messageSizeMax()) + break; + } + + int newOp = readOp | writeOp; + readyOp &= ~newOp; + Debug.Assert(readyOp != 0 || newOp != 0); + + if(_state <= StateNotValidated) + { + if(newOp != 0) { - IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax()); + // + // Wait for all the transceiver conditions to be + // satisfied before continuing. + // + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + return; } - if(size > _readStream.size()) + + if(_state == StateNotInitialized && !initialize(current.operation)) { - _readStream.resize(size, true); + return; } - _readStream.pos(pos); - } - if(buf.b.hasRemaining()) - { - if(_endpoint.datagram()) + if(_state <= StateNotValidated && !validate(current.operation)) { - throw new Ice.DatagramLimitException(); // The message was truncated. + return; } - continue; - } - break; - } - int newOp = readOp | writeOp; - readyOp &= ~newOp; - Debug.Assert(readyOp != 0 || newOp != 0); + _threadPool.unregister(this, current.operation); - if(_state <= StateNotValidated) - { - if(newOp != 0) - { // - // Wait for all the transceiver conditions to be - // satisfied before continuing. + // We start out in holding state. // - scheduleTimeout(newOp); - _threadPool.update(this, current.operation, newOp); - return; - } - - if(_state == StateNotInitialized && !initialize(current.operation)) - { - return; + setState(StateHolding); + if(_startCallback != null) + { + startCB = _startCallback; + _startCallback = null; + if(startCB != null) + { + ++dispatchCount; + } + } } - - if(_state <= StateNotValidated && !validate(current.operation)) + else { - return; - } + Debug.Assert(_state <= StateClosingPending); - _threadPool.unregister(this, current.operation); + // + // We parse messages first, if we receive a close + // connection message we won't send more messages. + // + if((readyOp & IceInternal.SocketOperation.Read) != 0) + { + newOp |= parseMessage(ref info); + dispatchCount += info.messageDispatchCount; + } - // - // We start out in holding state. - // - setState(StateHolding); - if(_startCallback != null) - { - startCB = _startCallback; - _startCallback = null; - if(startCB != null) + if((readyOp & IceInternal.SocketOperation.Write) != 0) { - ++dispatchCount; + newOp |= sendNextMessage(out sentCBs); + if(sentCBs != null) + { + ++dispatchCount; + } } - } - } - else - { - Debug.Assert(_state <= StateClosingPending); - // - // We parse messages first, if we receive a close - // connection message we won't send more messages. - // - if((readyOp & IceInternal.SocketOperation.Read) != 0) - { - newOp |= parseMessage(ref info); - dispatchCount += info.messageDispatchCount; - } + if(_state < StateClosed) + { + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + } - if((readyOp & IceInternal.SocketOperation.Write) != 0) - { - newOp |= sendNextMessage(out sentCBs); - if(sentCBs != null) + if(readyOp == 0) { - ++dispatchCount; + Debug.Assert(dispatchCount == 0); + return; } } - if(_state < StateClosed) + if(_acmLastActivity > 0) { - scheduleTimeout(newOp); - _threadPool.update(this, current.operation, newOp); + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } - if(readyOp == 0) + if(dispatchCount == 0) { - Debug.Assert(dispatchCount == 0); - return; + return; // Nothing to dispatch we're done! } - } - if(_acmLastActivity > 0) - { - _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); - } + _dispatchCount += dispatchCount; - if(dispatchCount == 0) - { - return; // Nothing to dispatch we're done! + msg.completed(ref current); } - - _dispatchCount += dispatchCount; - - msg.completed(ref current); - } - catch(DatagramLimitException) // Expected. - { - if(_warnUdp) + catch(DatagramLimitException) // Expected. { - _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded"); - } - _readStream.resize(IceInternal.Protocol.headerSize, true); - _readStream.pos(0); - _readHeader = true; - return; - } - catch(SocketException ex) - { - setState(StateClosed, ex); - return; - } - catch(LocalException ex) - { - if(_endpoint.datagram()) - { - if(_warn) + if(_warnUdp) { - String s = "datagram connection exception:\n" + ex + '\n' + _desc; - _logger.warning(s); + _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded"); } _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); _readHeader = true; + return; } - else + catch(SocketException ex) { setState(StateClosed, ex); + return; + } + catch(LocalException ex) + { + if(_endpoint.datagram()) + { + if(_warn) + { + String s = "datagram connection exception:\n" + ex + '\n' + _desc; + _logger.warning(s); + } + _readStream.resize(IceInternal.Protocol.headerSize, true); + _readStream.pos(0); + _readHeader = true; + } + else + { + setState(StateClosed, ex); + } + return; } - return; - } - finally - { - msg.finishIOScope(ref current); - } - IceInternal.ThreadPoolCurrent c = current; - _threadPool.dispatch(() => - { - dispatch(startCB, sentCBs, info); - msg.destroy(ref c); - }, this); + IceInternal.ThreadPoolCurrent c = current; + _threadPool.dispatch(() => + { + dispatch(startCB, sentCBs, info); + msg.destroy(ref c); + }, this); + } } + finally + { + msg.finishIOScope(ref current); + } + } private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info) diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs index 5b28c54a2b6..57af5f232f0 100644 --- a/cs/src/Ice/ThreadPool.cs +++ b/cs/src/Ice/ThreadPool.cs @@ -38,8 +38,10 @@ namespace IceInternal { if(_finishWithIO) { - // This must be called with the handler locked. - current.finishMessage(true); + lock(_mutex) + { + current.finishMessage(true); + } } } |