diff options
author | Mark Spruiell <mes@zeroc.com> | 2014-06-13 10:06:29 -0700 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2014-06-13 10:06:29 -0700 |
commit | 69c46db2fb9f03a4b730f8da1bd6e03d4ba68895 (patch) | |
tree | a755e1e123ac50a31729e51ebb375a92e86f1303 | |
parent | SSL Cipher test fix for OpenSSL (diff) | |
download | ice-69c46db2fb9f03a4b730f8da1bd6e03d4ba68895.tar.bz2 ice-69c46db2fb9f03a4b730f8da1bd6e03d4ba68895.tar.xz ice-69c46db2fb9f03a4b730f8da1bd6e03d4ba68895.zip |
initial commit of C# transport changes
49 files changed, 2479 insertions, 2406 deletions
diff --git a/cpp/src/Ice/IPEndpointI.cpp b/cpp/src/Ice/IPEndpointI.cpp index 7bf4a057c63..a94f05ce429 100644 --- a/cpp/src/Ice/IPEndpointI.cpp +++ b/cpp/src/Ice/IPEndpointI.cpp @@ -185,6 +185,7 @@ IceInternal::IPEndpointI::hash() const _hashValue = 5381; hashAdd(_hashValue, type()); hashInit(_hashValue); + _hashInitialized = true; } return _hashValue; } diff --git a/cs/src/Ice/Acceptor.cs b/cs/src/Ice/Acceptor.cs index 82552b9d81b..897a1ca7f19 100644 --- a/cs/src/Ice/Acceptor.cs +++ b/cs/src/Ice/Acceptor.cs @@ -20,6 +20,7 @@ namespace IceInternal bool startAccept(AsyncCallback callback, object state); void finishAccept(); Transceiver accept(); + string protocol(); string ToString(); } diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index 363656c0c18..065b4f6da32 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -227,7 +227,7 @@ namespace Ice public bool isFinished() { // - // We can use TryEnter here, because as long as there are still + // We can use TryLock here, because as long as there are still // threads operating in this connection object, connection // destruction is considered as not yet finished. // @@ -244,12 +244,12 @@ namespace Ice } Debug.Assert(_state == StateFinished); + return true; } finally { _m.Unlock(); } - return true; } public void throwException() @@ -301,7 +301,7 @@ namespace Ice _m.Wait(); } - Debug.Assert(_state == StateFinished && _dispatchCount == 0); + Debug.Assert(_state == StateFinished); // // Clear the OA. See bug 1673 for the details of why this is necessary. @@ -551,7 +551,7 @@ namespace Ice bool sent; try { - OutgoingMessage msg = new OutgoingMessage(og, og.ostr__, compress, requestId); + OutgoingMessage msg = new OutgoingMessage(og, os, compress, requestId); sent = sendMessage(msg); sentCallback = msg.sentCallback; } @@ -873,6 +873,9 @@ namespace Ice _batchStream.swap(@out.ostr()); + // + // Send the batch stream. + // bool sent = false; try { @@ -971,6 +974,10 @@ namespace Ice _m.Lock(); try { + if(_state > StateClosing) + { + return; + } _callback = callback; } finally @@ -1234,7 +1241,6 @@ namespace Ice { return; } - Debug.Assert(_state < StateClosing); _adapter = adapter; @@ -1305,7 +1311,7 @@ namespace Ice { if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(_writeStream.getBuffer()); } bool completed; @@ -1320,8 +1326,9 @@ namespace Ice { if(_observer != null && !_readHeader) { - observerStartRead(_readStream.pos()); + observerStartRead(_readStream.getBuffer()); } + completedSynchronously = _transceiver.startRead(_readStream.getBuffer(), cb, this); } } @@ -1342,7 +1349,7 @@ namespace Ice _transceiver.finishWrite(_writeStream.getBuffer()); if(_observer != null) { - observerFinishWrite(_writeStream.pos()); + observerFinishWrite(_writeStream.getBuffer()); } } else if((operation & IceInternal.SocketOperation.Read) != 0) @@ -1350,7 +1357,7 @@ namespace Ice _transceiver.finishRead(_readStream.getBuffer()); if(_observer != null && !_readHeader) { - observerFinishRead(_readStream.pos()); + observerFinishRead(_readStream.getBuffer()); } } } @@ -1381,36 +1388,48 @@ namespace Ice return; } + int readyOp = current.operation; try { unscheduleTimeout(current.operation); - if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty()) + + int writeOp = IceInternal.SocketOperation.None; + int readOp = IceInternal.SocketOperation.None; + if((readyOp & IceInternal.SocketOperation.Write) != 0) { if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(_writeStream.getBuffer()); } - if(_writeStream.getBuffer().b.hasRemaining() && !_transceiver.write(_writeStream.getBuffer())) + writeOp = _transceiver.write(_writeStream.getBuffer()); + if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0) { - Debug.Assert(!_writeStream.isEmpty()); - scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); - return; + observerFinishWrite(_writeStream.getBuffer()); } - if(_observer != null) - { - observerFinishWrite(_writeStream.pos()); - } - Debug.Assert(!_writeStream.getBuffer().b.hasRemaining()); } - if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty()) + + while((readyOp & IceInternal.SocketOperation.Read) != 0) { + IceInternal.Buffer buf = _readStream.getBuffer(); + + if(_observer != null && !_readHeader) + { + observerStartRead(buf); + } + + readOp = _transceiver.read(buf, ref _hasMoreData); + if((readOp & IceInternal.SocketOperation.Read) != 0) + { + break; + } + if(_observer != null && !_readHeader) + { + Debug.Assert(!buf.b.hasRemaining()); + observerFinishRead(buf); + } + if(_readHeader) // Read header if necessary. { - if(_readStream.getBuffer().b.hasRemaining() && !_transceiver.read(_readStream.getBuffer())) - { - return; - } - Debug.Assert(!_readStream.getBuffer().b.hasRemaining()); _readHeader = false; if(_observer != null) @@ -1466,35 +1485,34 @@ namespace Ice _readStream.pos(pos); } - if(_readStream.getBuffer().b.hasRemaining()) + if(buf.b.hasRemaining()) { if(_endpoint.datagram()) { throw new Ice.DatagramLimitException(); // The message was truncated. } - else - { - if(_observer != null) - { - observerStartRead(_readStream.pos()); - } - if(!_transceiver.read(_readStream.getBuffer())) - { - Debug.Assert(!_readStream.isEmpty()); - scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout()); - return; - } - if(_observer != null) - { - observerFinishRead(_readStream.pos()); - } - Debug.Assert(!_readStream.getBuffer().b.hasRemaining()); - } + continue; } + break; } + int newOp = readOp | writeOp; + readyOp &= ~newOp; + Debug.Assert(readyOp != 0 || newOp != 0); + if(_state <= StateNotValidated) { + if(newOp != 0) + { + // + // Wait for all the transceiver conditions to be + // satisfied before continuing. + // + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + return; + } + if(_state == StateNotInitialized && !initialize(current.operation)) { return; @@ -1515,44 +1533,52 @@ namespace Ice { startCB = _startCallback; _startCallback = null; - ++_dispatchCount; + if(startCB != null) + { + ++_dispatchCount; + } } } else { - Debug.Assert(_state <= StateClosing); + Debug.Assert(_state <= StateClosingPending); // // We parse messages first, if we receive a close // connection message we won't send more messages. // - if((current.operation & IceInternal.SocketOperation.Read) != 0) + if((readyOp & IceInternal.SocketOperation.Read) != 0) { - parseMessage(ref info); + newOp |= parseMessage(ref info); } - if((current.operation & IceInternal.SocketOperation.Write) != 0) + if((readyOp & IceInternal.SocketOperation.Write) != 0) { - sentCBs = sendNextMessage(); - if(sentCBs != null) + sentCBs = new Queue<OutgoingMessage>(); + newOp |= sendNextMessage(sentCBs); + if(sentCBs.Count > 0) { ++_dispatchCount; } + else + { + sentCBs = null; + } } - } - if(_acmLastActivity > 0) - { - _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); - } + if(_state < StateClosed) + { + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + } - if(startCB == null && sentCBs == null && info.invokeNum == 0 && info.outAsync == null && - info.heartbeatCallback == null) - { - return; // Nothing to dispatch. - } + if(readyOp == 0) + { + return; + } - msg.completed(ref current); + msg.completed(ref current); + } } catch(DatagramLimitException) // Expected. { @@ -1594,6 +1620,11 @@ namespace Ice msg.finishIOScope(ref current); } + if(_acmLastActivity > 0) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + } + // // Unlike C++/Java, this method is called from an IO thread of the .NET thread // pool or from the communicator async IO thread. While it's fine to handle the @@ -1692,13 +1723,13 @@ namespace Ice ++count; } + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // if(info.invokeNum > 0) { - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); @@ -1725,7 +1756,7 @@ namespace Ice // or AMI callback was dispatched when the connection // was already in the closing state. // - if(_state == StateClosing && !_shutdownInitiated) + if(_state == StateClosing) { try { @@ -1768,7 +1799,7 @@ namespace Ice // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0) + if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _callback == null) { finish(); return; @@ -1783,7 +1814,11 @@ namespace Ice _threadPool.execute( delegate() { - if(_dispatcher != null) + if(_dispatcher == null) + { + finish(); + } + else { try { @@ -1798,10 +1833,6 @@ namespace Ice } } } - else - { - finish(); - } }); } @@ -1823,21 +1854,19 @@ namespace Ice // OutgoingMessage message = _sendStreams.First.Value; _writeStream.swap(message.stream); - + // // The current message might be sent but not yet removed from _sendStreams. If // the response has been received in the meantime, we remove the message from // _sendStreams to not call finished on a message which is already done. // - if(message.requestId > 0 && - (message.@out != null && !_requests.ContainsKey(message.requestId) || - message.outAsync != null && !_asyncRequests.ContainsKey(message.requestId))) + if(message.replyOutAsync != null) { - if(message.sent(this)) + if(message.sent() && message.sentCallback != null) { - Debug.Assert(message.outAsync != null); message.outAsync.invokeSent__(message.sentCallback); } + message.replyOutAsync.finished__(); _sendStreams.RemoveFirst(); } } @@ -1922,7 +1951,7 @@ namespace Ice { setState(StateClosed, new TimeoutException()); } - else if(_state == StateClosing) + else if(_state < StateClosed) { setState(StateClosed, new CloseTimeoutException()); } @@ -1992,9 +2021,8 @@ namespace Ice if(invokeNum > 0) { - Debug.Assert(_dispatchCount > 0); + Debug.Assert(_dispatchCount >= invokeNum); _dispatchCount -= invokeNum; - Debug.Assert(_dispatchCount >= 0); if(_dispatchCount == 0) { if(_state == StateFinished) @@ -2023,23 +2051,21 @@ namespace Ice _communicator = communicator; _instance = instance; _monitor = monitor; - InitializationData initData = instance.initializationData(); _transceiver = transceiver; _desc = transceiver.ToString(); - _type = transceiver.type(); + _type = transceiver.protocol(); _connector = connector; _endpoint = endpoint; _adapter = adapter; + InitializationData initData = instance.initializationData(); _dispatcher = initData.dispatcher; // Cached for better performance. _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); _writeTimeout = new TimeoutCallback(this); _writeTimeoutScheduled = false; - _writeStreamPos = -1; _readTimeout = new TimeoutCallback(this); _readTimeoutScheduled = false; - _readStreamPos = -1; _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0; _warnUdp = initData.properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; _cacheBuffers = initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 1) == 1; @@ -2060,7 +2086,9 @@ namespace Ice _batchMarker = 0; _readStream = new IceInternal.BasicStream(instance, Util.currentProtocolEncoding); _readHeader = false; + _readStreamPos = -1; _writeStream = new IceInternal.BasicStream(instance, Util.currentProtocolEncoding); + _writeStreamPos = -1; _dispatchCount = 0; _state = StateNotInitialized; @@ -2107,8 +2135,9 @@ namespace Ice private const int StateActive = 2; private const int StateHolding = 3; private const int StateClosing = 4; - private const int StateClosed = 5; - private const int StateFinished = 6; + private const int StateClosingPending = 5; + private const int StateClosed = 6; + private const int StateFinished = 7; private void setState(int state, LocalException ex) { @@ -2125,6 +2154,11 @@ namespace Ice if(_exception == null) { + // + // If we are in closed state, an exception must be set. + // + Debug.Assert(_state != StateClosed); + _exception = ex; // @@ -2140,7 +2174,7 @@ namespace Ice _exception is ConnectionTimeoutException || _exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException || - (_exception is ConnectionLostException && _state == StateClosing))) + (_exception is ConnectionLostException && _state >= StateClosing))) { warning("connection exception", _exception); } @@ -2231,21 +2265,15 @@ namespace Ice } case StateClosing: + case StateClosingPending: { // - // Can't change back from closed. + // Can't change back from closing pending. // - if(_state >= StateClosed) + if(_state >= StateClosingPending) { return; } - if(_state == StateHolding) - { - // - // We need to continue to read in closing state. - // - _threadPool.register(this, IceInternal.SocketOperation.Read); - } break; } @@ -2255,6 +2283,7 @@ namespace Ice { return; } + _threadPool.finish(this); _transceiver.close(); break; @@ -2322,7 +2351,7 @@ namespace Ice _exception is ConnectionTimeoutException || _exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException || - (_exception is ConnectionLostException && _state == StateClosing))) + (_exception is ConnectionLostException && _state >= StateClosing))) { _observer.failed(_exception.ice_name()); } @@ -2349,15 +2378,17 @@ namespace Ice { Debug.Assert(_state == StateClosing); Debug.Assert(_dispatchCount == 0); - Debug.Assert(!_shutdownInitiated); + if(_shutdownInitiated) + { + return; + } _shutdownInitiated = true; if(!_endpoint.datagram()) { // - // Before we shut down, we send a close connection - // message. + // Before we shut down, we send a close connection message. // IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, Util.currentProtocolEncoding); os.writeBlob(IceInternal.Protocol.magic); @@ -2369,28 +2400,22 @@ namespace Ice if(sendMessage(new OutgoingMessage(os, false, false))) { + setState(StateClosingPending); + // - // Schedule the close timeout to wait for the peer to close the connection. If - // the message was queued for sending, sendNextMessage will schedule the timeout - // once all messages were sent. + // Notify the the transceiver of the graceful connection closure. // - scheduleTimeout(IceInternal.SocketOperation.Write, closeTimeout()); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + scheduleTimeout(op); + _threadPool.register(this, op); + } } - - // - // The CloseConnection message should be sufficient. Closing the write - // end of the socket is probably an artifact of how things were done - // in IIOP. In fact, shutting down the write end of the socket causes - // problems on Windows by preventing the peer from using the socket. - // For example, the peer is no longer able to continue writing a large - // message after the socket is shutdown. - // - //_transceiver.shutdownWrite(); } } - private void - heartbeat() + private void heartbeat() { Debug.Assert(_state == StateActive); @@ -2418,10 +2443,10 @@ namespace Ice private bool initialize(int operation) { - int s = _transceiver.initialize(); + int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData); if(s != IceInternal.SocketOperation.None) { - scheduleTimeout(s, connectTimeout()); + scheduleTimeout(s); _threadPool.update(this, operation, s); return false; } @@ -2431,6 +2456,7 @@ namespace Ice // _desc = _transceiver.ToString(); setState(StateNotValidated); + return true; } @@ -2454,17 +2480,23 @@ namespace Ice if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(_writeStream.getBuffer()); } - if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) + + if(_writeStream.pos() != _writeStream.size()) { - scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout()); - _threadPool.update(this, operation, IceInternal.SocketOperation.Write); - return false; + int op = _transceiver.write(_writeStream.getBuffer()); + if(op != 0) + { + scheduleTimeout(op); + _threadPool.update(this, operation, op); + return false; + } } + if(_observer != null) { - observerFinishWrite(_writeStream.pos()); + observerFinishWrite(_writeStream.getBuffer()); } } else // The client side has the passive role for connection validation. @@ -2477,17 +2509,23 @@ namespace Ice if(_observer != null) { - observerStartRead(_readStream.pos()); + observerStartRead(_readStream.getBuffer()); } - if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer())) + + if(_readStream.pos() != _readStream.size()) { - scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout()); - _threadPool.update(this, operation, IceInternal.SocketOperation.Read); - return false; + int op = _transceiver.read(_readStream.getBuffer(), ref _hasMoreData); + if(op != 0) + { + scheduleTimeout(op); + _threadPool.update(this, operation, op); + return false; + } } + if(_observer != null) { - observerFinishRead(_readStream.pos()); + observerFinishRead(_readStream.getBuffer()); } Debug.Assert(_readStream.pos() == IceInternal.Protocol.headerSize); @@ -2504,6 +2542,7 @@ namespace Ice ProtocolVersion pv = new ProtocolVersion(); pv.read__(_readStream); IceInternal.Protocol.checkSupportedProtocol(pv); + EncodingVersion ev = new EncodingVersion(); ev.read__(_readStream); IceInternal.Protocol.checkSupportedProtocolEncoding(ev); @@ -2535,12 +2574,21 @@ namespace Ice return true; } - private Queue<OutgoingMessage> sendNextMessage() + private int sendNextMessage(Queue<OutgoingMessage> callbacks) { - Debug.Assert(_sendStreams.Count > 0); - Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); + if(_sendStreams.Count == 0) + { + return IceInternal.SocketOperation.None; + } + else if(_state == StateClosingPending && _writeStream.pos() == 0) + { + // Message wasn't sent, empty the _writeStream, we're not going to send more data. + OutgoingMessage message = _sendStreams.First.Value; + _writeStream.swap(message.stream); + return IceInternal.SocketOperation.None; + } - Queue<OutgoingMessage> callbacks = null; + Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); try { while(true) @@ -2550,14 +2598,8 @@ namespace Ice // OutgoingMessage message = _sendStreams.First.Value; _writeStream.swap(message.stream); - Debug.Assert(_writeStream.isEmpty()); - if(message.sent(this) || message.replyOutAsync != null) + if(message.sent()) { - Debug.Assert(message.outAsync != null); - if(callbacks == null) - { - callbacks = new Queue<OutgoingMessage>(); - } callbacks.Enqueue(message); } _sendStreams.RemoveFirst(); @@ -2571,17 +2613,17 @@ namespace Ice } // - // If we are in the closed state, don't continue sending. + // If we are in the closed state or if the close is + // pending, don't continue sending. // - // The connection can be in the closed state if parseMessage - // (called before sendNextMessage by message()) closes the - // connection. - // - if(_state >= StateClosed) + // This can occur if parseMessage (called before + // sendNextMessage by message()) closes the connection. + // + if(_state >= StateClosingPending) { - return callbacks; + return IceInternal.SocketOperation.None; } - + // // Otherwise, prepare the next message stream for writing. // @@ -2608,45 +2650,49 @@ namespace Ice // if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(_writeStream.getBuffer()); } - if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) + if(_writeStream.pos() != _writeStream.size()) { - Debug.Assert(!_writeStream.isEmpty()); - scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); - return callbacks; + int op = _transceiver.write(_writeStream.getBuffer()); + if(op != 0) + { + return op; + } } if(_observer != null) { - observerFinishWrite(_writeStream.pos()); + observerFinishWrite(_writeStream.getBuffer()); } } } catch(Ice.LocalException ex) { setState(StateClosed, ex); - return callbacks; + return IceInternal.SocketOperation.None; } - Debug.Assert(_writeStream.isEmpty()); - _threadPool.unregister(this, IceInternal.SocketOperation.Write); - // // If all the messages were sent and we are in the closing state, we schedule // the close timeout to wait for the peer to close the connection. // if(_state == StateClosing) { - scheduleTimeout(IceInternal.SocketOperation.Write, closeTimeout()); + setState(StateClosingPending); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + return op; + } } - return callbacks; + return IceInternal.SocketOperation.None; } - private bool sendMessage(OutgoingMessage message) { Debug.Assert(_state < StateClosed); + if(_sendStreams.Count > 0) { message.adopt(); @@ -2677,29 +2723,36 @@ namespace Ice IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } + // + // Send the message without blocking. + // if(_observer != null) { - observerStartWrite(message.stream.pos()); + observerStartWrite(message.stream.getBuffer()); } - if(_transceiver.write(message.stream.getBuffer())) + int op = _transceiver.write(message.stream.getBuffer()); + if(op == 0) { if(_observer != null) { - observerFinishWrite(message.stream.pos()); + observerFinishWrite(message.stream.getBuffer()); } - message.sent(this); + + message.sent(); + if(_acmLastActivity > 0) { _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } return true; } + message.adopt(); _writeStream.swap(message.stream); _sendStreams.AddLast(message); - scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); - _threadPool.register(this, IceInternal.SocketOperation.Write); + scheduleTimeout(op); + _threadPool.register(this, op); return false; } @@ -2764,7 +2817,7 @@ namespace Ice public ConnectionCallback heartbeatCallback; } - private void parseMessage(ref MessageInfo info) + private int parseMessage(ref MessageInfo info) { Debug.Assert(_state > StateNotValidated && _state < StateClosed); @@ -2774,6 +2827,8 @@ namespace Ice _readStream.pos(0); _readHeader = true; + Debug.Assert(info.stream.pos() == info.stream.size()); + // // Connection is validated on first message. This is only used by // setState() to check wether or not we can print a connection @@ -2787,7 +2842,6 @@ namespace Ice // // The magic and version fields have already been checked. // - Debug.Assert(info.stream.pos() == info.stream.size()); info.stream.pos(8); byte messageType = info.stream.readByte(); info.compress = info.stream.readByte(); @@ -2820,14 +2874,24 @@ namespace Ice } else { - setState(StateClosed, new CloseConnectionException()); + setState(StateClosingPending, new CloseConnectionException()); + + // + // Notify the the transceiver of the graceful connection closure. + // + int op = _transceiver.closing(false, _exception); + if(op != 0) + { + return op; + } + setState(StateClosed); } break; } case IceInternal.Protocol.requestMsg: { - if(_state == StateClosing) + if(_state >= StateClosing) { IceInternal.TraceUtil.trace("received request during closing\n" + "(ignored by server, client will retry)", info.stream, _logger, @@ -2847,7 +2911,7 @@ namespace Ice case IceInternal.Protocol.requestBatchMsg: { - if(_state == StateClosing) + if(_state >= StateClosing) { IceInternal.TraceUtil.trace("received batch request during closing\n" + "(ignored by server, client will retry)", info.stream, _logger, @@ -2878,6 +2942,7 @@ namespace Ice { _requests.Remove(info.requestId); og.finished(info.stream); + _m.NotifyAll(); // Notify threads blocked in close(false) } else if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync)) { @@ -2900,8 +2965,8 @@ namespace Ice { ++_dispatchCount; } + _m.NotifyAll(); // Notify threads blocked in close(false) } - _m.NotifyAll(); // Notify threads blocked in close(false) break; } @@ -2938,6 +3003,8 @@ namespace Ice setState(StateClosed, ex); } } + + return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read; } private void invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, @@ -2957,6 +3024,8 @@ namespace Ice // Prepare the invocation. // bool response = !_endpoint.datagram() && requestId != 0; + Debug.Assert(!response || invokeNum == 1); + inc = getIncoming(adapter, response, compress, requestId); // @@ -2985,8 +3054,42 @@ namespace Ice } } - private void scheduleTimeout(int status, int timeout) + private void scheduleTimeout(int status) { + int timeout; + if(_state < StateActive) + { + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideConnectTimeout) + { + timeout = defaultsAndOverrides.overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint.timeout(); + } + } + else if(_state < StateClosingPending) + { + if(_readHeader) // No timeout for reading the header. + { + status &= ~IceInternal.SocketOperation.Read; + } + timeout = _endpoint.timeout(); + } + else + { + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCloseTimeout) + { + timeout = defaultsAndOverrides.overrideCloseTimeoutValue; + } + else + { + timeout = _endpoint.timeout(); + } + } + if(timeout < 0) { return; @@ -2994,11 +3097,19 @@ namespace Ice if((status & IceInternal.SocketOperation.Read) != 0) { + if(_readTimeoutScheduled) + { + _timer.cancel(_readTimeout); + } _timer.schedule(_readTimeout, timeout); _readTimeoutScheduled = true; } if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0) { + if(_writeTimeoutScheduled) + { + _timer.cancel(_writeTimeout); + } _timer.schedule(_writeTimeout, timeout); _writeTimeoutScheduled = true; } @@ -3012,39 +3123,13 @@ namespace Ice _readTimeoutScheduled = false; } if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0 && - _writeTimeoutScheduled) + _writeTimeoutScheduled) { _timer.cancel(_writeTimeout); _writeTimeoutScheduled = false; } } - private int connectTimeout() - { - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - return defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - return _endpoint.timeout(); - } - } - - private int closeTimeout() - { - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCloseTimeout) - { - return defaultsAndOverrides.overrideCloseTimeoutValue; - } - else - { - return _endpoint.timeout(); - } - } - private ConnectionInfo initConnectionInfo() { if(_info != null) @@ -3085,43 +3170,47 @@ namespace Ice _logger.warning(msg + ":\n" + ex + "\n" + _transceiver.ToString()); } - private void observerStartRead(int pos) + private void observerStartRead(IceInternal.Buffer buf) { if(_readStreamPos >= 0) { - _observer.receivedBytes(pos - _readStreamPos); + Debug.Assert(!buf.empty()); + _observer.receivedBytes(buf.b.position() - _readStreamPos); } - _readStreamPos = pos; + _readStreamPos = buf.empty() ? -1 : buf.b.position(); } - private void observerFinishRead(int pos) + private void observerFinishRead(IceInternal.Buffer buf) { if(_readStreamPos == -1) { return; } - Debug.Assert(pos >= _readStreamPos); - _observer.receivedBytes(pos - _readStreamPos); + Debug.Assert(buf.b.position() >= _readStreamPos); + _observer.receivedBytes(buf.b.position() - _readStreamPos); _readStreamPos = -1; } - private void observerStartWrite(int pos) + private void observerStartWrite(IceInternal.Buffer buf) { if(_writeStreamPos >= 0) { - _observer.sentBytes(pos - _writeStreamPos); + Debug.Assert(!buf.empty()); + _observer.sentBytes(buf.b.position() - _writeStreamPos); } - _writeStreamPos = pos; + _writeStreamPos = buf.empty() ? -1 : buf.b.position(); } - private void observerFinishWrite(int pos) + private void observerFinishWrite(IceInternal.Buffer buf) { if(_writeStreamPos == -1) { return; } - Debug.Assert(pos >= _writeStreamPos); - _observer.sentBytes(pos - _writeStreamPos); + if(buf.b.position() > _writeStreamPos) + { + _observer.sentBytes(buf.b.position() - _writeStreamPos); + } _writeStreamPos = -1; } @@ -3251,9 +3340,9 @@ namespace Ice internal void timedOut() { - Debug.Assert(this.@out != null || this.outAsync != null); - this.@out = null; - this.outAsync = null; + Debug.Assert((@out != null || outAsync != null) && !isSent); + @out = null; + outAsync = null; } internal void adopt() @@ -3268,7 +3357,7 @@ namespace Ice } } - internal bool sent(ConnectionI connection) + internal bool sent() { isSent = true; // The message is sent. @@ -3280,7 +3369,7 @@ namespace Ice else if(outAsync != null) { sentCallback = outAsync.sent__(); - return sentCallback != null; + return sentCallback != null || replyOutAsync != null; } else { @@ -3395,6 +3484,7 @@ namespace Ice ConnectionState.ConnectionStateActive, // StateActive ConnectionState.ConnectionStateHolding, // StateHolding ConnectionState.ConnectionStateClosing, // StateClosing + ConnectionState.ConnectionStateClosing, // StateClosingPending ConnectionState.ConnectionStateClosed, // StateClosed ConnectionState.ConnectionStateClosed, // StateFinished }; diff --git a/cs/src/Ice/EndpointFactory.cs b/cs/src/Ice/EndpointFactory.cs index 4de8bd7f76c..7f2712b850e 100644 --- a/cs/src/Ice/EndpointFactory.cs +++ b/cs/src/Ice/EndpointFactory.cs @@ -9,14 +9,17 @@ namespace IceInternal { + using System.Collections.Generic; public interface EndpointFactory { short type(); string protocol(); - EndpointI create(string str, bool oaEndpoint); + EndpointI create(List<string> args, bool oaEndpoint); EndpointI read(BasicStream s); void destroy(); + + EndpointFactory clone(ProtocolInstance instance); } } diff --git a/cs/src/Ice/EndpointFactoryManager.cs b/cs/src/Ice/EndpointFactoryManager.cs index 789515e91a7..d9f1c79bbeb 100644 --- a/cs/src/Ice/EndpointFactoryManager.cs +++ b/cs/src/Ice/EndpointFactoryManager.cs @@ -21,7 +21,7 @@ namespace IceInternal instance_ = instance; _factories = new List<EndpointFactory>(); } - + public void add(EndpointFactory factory) { lock(this) @@ -37,7 +37,7 @@ namespace IceInternal _factories.Add(factory); } } - + public EndpointFactory get(short type) { lock(this) @@ -53,104 +53,138 @@ namespace IceInternal return null; } } - + public EndpointI create(string str, bool oaEndpoint) { + string[] arr = IceUtilInternal.StringUtil.splitString(str, " \t\r\n"); + if(arr == null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "mismatched quote"; + throw e; + } + + if(arr.Length == 0) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "value has no non-whitespace characters"; + throw e; + } + + List<string> v = new List<string>(arr); + string protocol = v[0]; + v.RemoveAt(0); + + if(protocol.Equals("default")) + { + protocol = instance_.defaultsAndOverrides().defaultProtocol; + } + + EndpointFactory factory = null; + lock(this) { - string s = str.Trim(); - if(s.Length == 0) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "value has no non-whitespace characters"; - throw e; - } - - Regex p = new Regex("([ \t\n\r]+)|$"); - Match m = p.Match(s); - Debug.Assert(m.Success); - - string protocol = s.Substring(0, m.Index); - - if(protocol.Equals("default")) - { - protocol = instance_.defaultsAndOverrides().defaultProtocol; - } - for(int i = 0; i < _factories.Count; i++) { - EndpointFactory f = (EndpointFactory)_factories[i]; + EndpointFactory f = _factories[i]; if(f.protocol().Equals(protocol)) { - return f.create(s.Substring(m.Index + m.Length), oaEndpoint); - - // Code below left in place for debugging. - - /* - EndpointI e = f.create(s.Substring(m.Index + m.Length), oaEndpoint); - BasicStream bs = new BasicStream(instance_, true); - e.streamWrite(bs); - Buffer buf = bs.getBuffer(); - buf.b.position(0); - short type = bs.readShort(); - EndpointI ue = new IceInternal.OpaqueEndpointI(type, bs); - System.Console.Error.WriteLine("Normal: " + e); - System.Console.Error.WriteLine("Opaque: " + ue); - return e; - */ + factory = f; } } + } - // - // If the stringified endpoint is opaque, create an unknown endpoint, - // then see whether the type matches one of the known endpoints. - // - if(protocol.Equals("opaque")) + if(factory != null) + { + EndpointI e = factory.create(v, oaEndpoint); + if(v.Count > 0) { - EndpointI ue = new OpaqueEndpointI(s.Substring(m.Index + m.Length)); - for(int i = 0; i < _factories.Count; i++) - { - EndpointFactory f = (EndpointFactory)_factories[i]; - if(f.type() == ue.type()) - { - // - // Make a temporary stream, write the opaque endpoint data into the stream, - // and ask the factory to read the endpoint data from that stream to create - // the actual endpoint. - // - BasicStream bs = new BasicStream(instance_, Ice.Util.currentProtocolEncoding, true); - ue.streamWrite(bs); - Buffer buf = bs.getBuffer(); - buf.b.position(0); - bs.readShort(); // type - return f.read(bs); - } - } - return ue; // Endpoint is opaque, but we don't have a factory for its type. + Ice.EndpointParseException ex = new Ice.EndpointParseException(); + ex.str = "unrecognized argument `" + v[0] + "' in endpoint `" + str + "'"; + throw ex; } - return null; + return e; + + // Code below left in place for debugging. + + /* + EndpointI e = f.create(s.Substring(m.Index + m.Length), oaEndpoint); + BasicStream bs = new BasicStream(instance_, true); + e.streamWrite(bs); + Buffer buf = bs.getBuffer(); + buf.b.position(0); + short type = bs.readShort(); + EndpointI ue = new IceInternal.OpaqueEndpointI(type, bs); + System.Console.Error.WriteLine("Normal: " + e); + System.Console.Error.WriteLine("Opaque: " + ue); + return e; + */ + } + + // + // If the stringified endpoint is opaque, create an unknown endpoint, + // then see whether the type matches one of the known endpoints. + // + if(protocol.Equals("opaque")) + { + EndpointI ue = new OpaqueEndpointI(v); + if(v.Count > 0) + { + Ice.EndpointParseException ex = new Ice.EndpointParseException(); + ex.str = "unrecognized argument `" + v[0] + "' in endpoint `" + str + "'"; + throw ex; + } + factory = get(ue.type()); + if(factory != null) + { + // + // Make a temporary stream, write the opaque endpoint data into the stream, + // and ask the factory to read the endpoint data from that stream to create + // the actual endpoint. + // + BasicStream bs = new BasicStream(instance_, Ice.Util.currentProtocolEncoding, true); + bs.writeShort(ue.type()); + ue.streamWrite(bs); + Buffer buf = bs.getBuffer(); + buf.b.position(0); + bs.readShort(); // type + bs.startReadEncaps(); + EndpointI e = factory.read(bs); + bs.endReadEncaps(); + return e; + } + return ue; // Endpoint is opaque, but we don't have a factory for its type. } + + return null; } - + public EndpointI read(BasicStream s) { lock(this) { short type = s.readShort(); - - for(int i = 0; i < _factories.Count; i++) + + EndpointFactory factory = get(type); + EndpointI e = null; + + s.startReadEncaps(); + + if(factory != null) { - EndpointFactory f = (EndpointFactory)_factories[i]; - if(f.type() == type) - { - return f.read(s); - } + e = factory.read(s); } - - return new OpaqueEndpointI(type, s); + else + { + e = new OpaqueEndpointI(type, s); + } + + s.endReadEncaps(); + + return e; } } - + internal void destroy() { for(int i = 0; i < _factories.Count; i++) @@ -160,7 +194,7 @@ namespace IceInternal } _factories.Clear(); } - + private readonly Instance instance_; private readonly List<EndpointFactory> _factories; } diff --git a/cs/src/Ice/EndpointHostResolver.cs b/cs/src/Ice/EndpointHostResolver.cs index e2750091303..5b2a14d1d5d 100644 --- a/cs/src/Ice/EndpointHostResolver.cs +++ b/cs/src/Ice/EndpointHostResolver.cs @@ -38,7 +38,7 @@ namespace IceInternal } - public List<Connector> resolve(string host, int port, Ice.EndpointSelectionType selType, EndpointI endpoint) + public List<Connector> resolve(string host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint) { // // Try to get the addresses without DNS lookup. If this doesn't @@ -95,7 +95,7 @@ namespace IceInternal return connectors; } - public void resolve(string host, int port, Ice.EndpointSelectionType selType, EndpointI endpoint, + public void resolve(string host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint, EndpointI_connectors callback) { // @@ -296,7 +296,7 @@ namespace IceInternal internal string host; internal int port; internal Ice.EndpointSelectionType selType; - internal EndpointI endpoint; + internal IPEndpointI endpoint; internal EndpointI_connectors callback; internal Ice.Instrumentation.Observer observer; } diff --git a/cs/src/Ice/EndpointI.cs b/cs/src/Ice/EndpointI.cs index 762438d48fd..a83bda53756 100644 --- a/cs/src/Ice/EndpointI.cs +++ b/cs/src/Ice/EndpointI.cs @@ -22,22 +22,24 @@ namespace IceInternal } public abstract class EndpointI : Ice.Endpoint, System.IComparable<EndpointI> - { - public EndpointI(string connectionId) - { - connectionId_ = connectionId; - } - - public EndpointI() + { + public override string ToString() { + return ice_toString_(); } - public override string ToString() + public virtual string ice_toString_() { - return ice_toString_(); + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + return protocol() + options(); } - public abstract string ice_toString_(); public abstract Ice.EndpointInfo getInfo(); public override bool Equals(object obj) @@ -49,20 +51,9 @@ namespace IceInternal return CompareTo((EndpointI)obj) == 0; } - public override int GetHashCode() - { - int h = 5381; - IceInternal.HashUtil.hashAdd(ref h, connectionId_); - return h; - } - - public virtual int CompareTo(EndpointI p) + public override int GetHashCode() // Avoids a compiler warning. { - if(!connectionId_.Equals(p.connectionId_)) - { - return string.Compare(connectionId_, p.connectionId_, StringComparison.Ordinal); - } - + Debug.Assert(false); return 0; } @@ -95,6 +86,11 @@ namespace IceInternal public abstract EndpointI timeout(int t); // + // Return the connection ID. + // + public abstract string connectionId(); + + // // Return a new endpoint with a different connection id. // public abstract EndpointI connectionId(string connectionId); @@ -116,21 +112,13 @@ namespace IceInternal // Return true if the endpoint is datagram-based. // public abstract bool datagram(); - + // // Return true if the endpoint is secure. // public abstract bool secure(); // - // Return the connection ID. - // - public string connectionId() - { - return connectionId_; - } - - // // Return a server side transceiver for this endpoint, or null if a // transceiver can only be created by an acceptor. In case a // transceiver is created, this operation also returns a new @@ -161,21 +149,68 @@ namespace IceInternal // was specified on client side. // public abstract List<EndpointI> expand(); - + // // Check whether the endpoint is equivalent to another one. // public abstract bool equivalent(EndpointI endpoint); - public virtual List<Connector> connectors(List<EndPoint> addresses, NetworkProxy proxy) + public abstract int CompareTo(EndpointI obj); + + public abstract string options(); + + public virtual void initWithOptions(List<string> args) { - Debug.Assert(false); - return null; + List<string> unknown = new List<string>(); + + string str = "`" + protocol() + " "; + foreach(string p in args) + { + if(IceUtilInternal.StringUtil.findFirstOf(p, " \t\n\r") != -1) + { + str += " \"" + p + "\""; + } + else + { + str += " " + p; + } + } + str += "'"; + + for(int n = 0; n < args.Count; ++n) + { + string option = args[n]; + if(option.Length < 2 || option[0] != '-') + { + unknown.Add(option); + continue; + } + + string argument = null; + if(n + 1 < args.Count && args[n + 1][0] != '-') + { + argument = args[++n]; + } + + if(!checkOption(option, argument, str)) + { + unknown.Add(option); + if(argument != null) + { + unknown.Add(argument); + } + } + } + + args.Clear(); + args.AddRange(unknown); } - protected Ice.ProtocolVersion protocol_; - protected Ice.EncodingVersion encoding_; - protected string connectionId_ = ""; + protected virtual bool checkOption(string option, string argument, string endpoint) + { + // Must be overridden to check for options. + return false; + } } } diff --git a/cs/src/Ice/EventHandler.cs b/cs/src/Ice/EventHandler.cs index eeb2264f4bd..5ef8f6bffdd 100644 --- a/cs/src/Ice/EventHandler.cs +++ b/cs/src/Ice/EventHandler.cs @@ -36,6 +36,7 @@ public abstract class EventHandler internal int _started = 0; internal bool _finish = false; + internal bool _hasMoreData = false; internal int _registered = 0; } diff --git a/cs/src/Ice/IPEndpointI.cs b/cs/src/Ice/IPEndpointI.cs new file mode 100644 index 00000000000..d0857da9068 --- /dev/null +++ b/cs/src/Ice/IPEndpointI.cs @@ -0,0 +1,366 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +namespace IceInternal +{ + + using System.Collections.Generic; + using System.Diagnostics; + using System.Globalization; + using System.Net; + using System; + + public abstract class IPEndpointI : EndpointI + { + public IPEndpointI(ProtocolInstance instance, string host, int port, string connectionId) + { + instance_ = instance; + host_ = host; + port_ = port; + connectionId_ = connectionId; + _hashInitialized = false; + } + + public IPEndpointI(ProtocolInstance instance) + { + instance_ = instance; + host_ = null; + port_ = 0; + connectionId_ = ""; + _hashInitialized = false; + } + + public IPEndpointI(ProtocolInstance instance, BasicStream s) + { + instance_ = instance; + host_ = s.readString(); + port_ = s.readInt(); + connectionId_ = ""; + _hashInitialized = false; + } + + private sealed class InfoI : Ice.IPEndpointInfo + { + public InfoI(IPEndpointI e) + { + _endpoint = e; + } + + override public short type() + { + return _endpoint.type(); + } + + override public bool datagram() + { + return _endpoint.datagram();; + } + + override public bool secure() + { + return _endpoint.secure(); + } + + private IPEndpointI _endpoint; + } + + public override Ice.EndpointInfo getInfo() + { + InfoI info = new InfoI(this); + fillEndpointInfo(info); + return info; + } + + public override void streamWrite(BasicStream s) + { + s.startWriteEncaps(); + streamWriteImpl(s); + s.endWriteEncaps(); + } + + public override short type() + { + return instance_.type(); + } + + public override string protocol() + { + return instance_.protocol(); + } + + public override string connectionId() + { + return connectionId_; + } + + public override EndpointI connectionId(string connectionId) + { + if(connectionId.Equals(connectionId_)) + { + return this; + } + else + { + return createEndpoint(host_, port_, connectionId); + } + } + + public override List<Connector> connectors(Ice.EndpointSelectionType selType) + { +#if SILVERLIGHT + return connectors(Network.getAddresses(host_, + port_, + instance_.protocolSupport(), + selType, + instance_.preferIPv6(), + false), + instance_.networkProxy()); +#else + return instance_.resolve(host_, port_, selType, this); +#endif + } + + public override void connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) + { +#if SILVERLIGHT + callback.connectors(connectors(selType)); +#else + instance_.resolve(host_, port_, selType, this, callback); +#endif + } + + public override List<EndpointI> expand() + { + List<EndpointI> endps = new List<EndpointI>(); + List<string> hosts = Network.getHostsForEndpointExpand(host_, instance_.protocolSupport(), false); + if(hosts == null || hosts.Count == 0) + { + endps.Add(this); + } + else + { + foreach(string h in hosts) + { + endps.Add(createEndpoint(h, port_, connectionId_)); + } + } + return endps; + } + + public override bool equivalent(EndpointI endpoint) + { + if(!(endpoint is IPEndpointI)) + { + return false; + } + IPEndpointI ipEndpointI = (IPEndpointI)endpoint; + return ipEndpointI.type() == type() && ipEndpointI.host_.Equals(host_) && ipEndpointI.port_ == port_; + } + + public virtual List<Connector> connectors(List<EndPoint> addresses, NetworkProxy proxy) + { + List<Connector> connectors = new List<Connector>(); + foreach(EndPoint p in addresses) + { + connectors.Add(createConnector(p, proxy)); + } + return connectors; + } + + public override string options() + { + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + string s = ""; + + if(host_ != null && host_.Length > 0) + { + s += " -h "; + bool addQuote = host_.IndexOf(':') != -1; + if(addQuote) + { + s += "\""; + } + s += host_; + if(addQuote) + { + s += "\""; + } + } + + s += " -p " + port_; + + return s; + } + + public override int GetHashCode() + { + if(!_hashInitialized) + { + _hashValue = 5381; + HashUtil.hashAdd(ref _hashValue, type()); + hashInit(ref _hashValue); + _hashInitialized = true; + } + return _hashValue; + } + + public override int CompareTo(EndpointI obj) + { + if(!(obj is IPEndpointI)) + { + return type() < obj.type() ? -1 : 1; + } + + IPEndpointI p = (IPEndpointI)obj; + if(this == p) + { + return 0; + } + + int v = string.Compare(host_, p.host_, StringComparison.Ordinal); + if(v != 0) + { + return v; + } + + if(port_ < p.port_) + { + return -1; + } + else if(p.port_ < port_) + { + return 1; + } + + return string.Compare(connectionId_, p.connectionId_, StringComparison.Ordinal); + } + + public string host() + { + return host_; + } + + public int port() + { + return port_; + } + + protected virtual void streamWriteImpl(BasicStream s) + { + s.writeString(host_); + s.writeInt(port_); + } + + protected virtual void hashInit(ref int h) + { + HashUtil.hashAdd(ref h, host_); + HashUtil.hashAdd(ref h, port_); + HashUtil.hashAdd(ref h, connectionId_); + } + + protected virtual void fillEndpointInfo(Ice.IPEndpointInfo info) + { + info.host = host_; + info.port = port_; + } + + public void initWithOptions(List<string> args, bool oaEndpoint) + { + base.initWithOptions(args); + + if(host_ == null || host_.Length == 0) + { + host_ = instance_.defaultHost(); + } + else if(host_.Equals("*")) + { + if(oaEndpoint) + { + host_ = ""; + } + else + { + throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `" + ToString() + "'"); + } + } + + if(host_ == null) + { + host_ = ""; + } + } + + protected override bool checkOption(string option, string argument, string endpoint) + { + switch(option[1]) + { + case 'h': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -h option in endpoint " + + endpoint); + } + host_ = argument; + return true; + } + + case 'p': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -p option in endpoint " + + endpoint); + } + + try + { + port_ = System.Int32.Parse(argument, CultureInfo.InvariantCulture); + } + catch(System.FormatException ex) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(ex); + e.str = "invalid port value `" + argument + "' in endpoint " + endpoint; + throw e; + } + + if(port_ < 0 || port_ > 65535) + { + throw new Ice.EndpointParseException("port value `" + argument + + "' out of range in endpoint " + endpoint); + } + + return true; + } + + default: + { + return false; + } + } + } + + protected abstract Connector createConnector(EndPoint addr, NetworkProxy proxy); + protected abstract IPEndpointI createEndpoint(string host, int port, string connectionId); + + protected ProtocolInstance instance_; + protected string host_; + protected int port_; + protected string connectionId_; + private bool _hashInitialized; + private int _hashValue; + } + +} diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs index 222941ae09d..abcc4cab4e3 100644 --- a/cs/src/Ice/Instance.cs +++ b/cs/src/Ice/Instance.cs @@ -829,9 +829,13 @@ namespace IceInternal } _endpointFactoryManager = new EndpointFactoryManager(this); - EndpointFactory tcpEndpointFactory = new TcpEndpointFactory(this); + ProtocolInstance tcpProtocolInstance = + new ProtocolInstance(this, Ice.TCPEndpointType.value, "tcp"); + EndpointFactory tcpEndpointFactory = new TcpEndpointFactory(tcpProtocolInstance); _endpointFactoryManager.add(tcpEndpointFactory); - EndpointFactory udpEndpointFactory = new UdpEndpointFactory(this); + ProtocolInstance udpProtocolInstance = + new ProtocolInstance(this, Ice.UDPEndpointType.value, "udp"); + EndpointFactory udpEndpointFactory = new UdpEndpointFactory(udpProtocolInstance); _endpointFactoryManager.add(udpEndpointFactory); #if !SILVERLIGHT diff --git a/cs/src/Ice/Makefile b/cs/src/Ice/Makefile index b6b312eed25..a943404d4bd 100644 --- a/cs/src/Ice/Makefile +++ b/cs/src/Ice/Makefile @@ -51,6 +51,7 @@ SRCS = Acceptor.cs \ Incoming.cs \ Instance.cs \ InstrumentationI.cs \ + IPEndpointI.cs \ LocatorInfo.cs \ LoggerI.cs \ LoggerPlugin.cs \ @@ -77,6 +78,7 @@ SRCS = Acceptor.cs \ Property.cs \ PropertyNames.cs \ Protocol.cs \ + ProtocolInstance.cs \ ProtocolPluginFacade.cs \ Proxy.cs \ ProxyFactory.cs \ diff --git a/cs/src/Ice/Makefile.mak b/cs/src/Ice/Makefile.mak index e4685bd1b0f..9108c1ed643 100644 --- a/cs/src/Ice/Makefile.mak +++ b/cs/src/Ice/Makefile.mak @@ -52,6 +52,7 @@ SRCS = Acceptor.cs \ Incoming.cs \ Instance.cs \ InstrumentationI.cs \ + IPEndpointI.cs \ LocatorInfo.cs \ LoggerI.cs \ LoggerPlugin.cs \ @@ -78,6 +79,7 @@ SRCS = Acceptor.cs \ Property.cs \ PropertyNames.cs \ Protocol.cs \ + ProtocolInstance.cs \ ProtocolPluginFacade.cs \ Proxy.cs \ ProxyFactory.cs \ diff --git a/cs/src/Ice/OpaqueEndpointI.cs b/cs/src/Ice/OpaqueEndpointI.cs index f1666e75bd9..0dd57e0cac8 100644 --- a/cs/src/Ice/OpaqueEndpointI.cs +++ b/cs/src/Ice/OpaqueEndpointI.cs @@ -16,162 +16,47 @@ namespace IceInternal sealed class OpaqueEndpointI : EndpointI { - public OpaqueEndpointI(string str) : base("") + public OpaqueEndpointI(List<string> args) { + _type = -1; _rawEncoding = Ice.Util.Encoding_1_0; + _rawBytes = new byte[0]; - int topt = 0; - int vopt = 0; + initWithOptions(args); - char[] separators = { ' ', '\t', '\n', '\r' }; - string[] arr = str.Split(separators); - - int i = 0; - while(i < arr.Length) + if(_type < 0) { - if(arr[i].Length == 0) - { - i++; - continue; - } - - string option = arr[i++]; - if(option.Length != 2 || option[0] != '-') - { - throw new Ice.EndpointParseException("expected an endpoint option but found `" + option + - "' in endpoint `opaque " + str + "'"); - } - - string argument = null; - if(i < arr.Length && arr[i][0] != '-') - { - argument = arr[i++]; - } - - switch(option[1]) - { - case 't': - { - if(argument == null) - { - throw new Ice.EndpointParseException( - "no argument provided for -t option in endpoint `opaque " + str + "'"); - } - - int t; - try - { - t = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException) - { - throw new Ice.EndpointParseException("invalid timeout value `" + argument + - "' in endpoint `opaque " + str + "'"); - } - - if(t < 0 || t > 65535) - { - throw new Ice.EndpointParseException("timeout value `" + argument + - "' out of range in endpoint `opaque " + str + "'"); - } - - _type = (short)t; - ++topt; - if(topt > 1) - { - throw new Ice.EndpointParseException("multiple -t options in endpoint `opaque " + str + - "'"); - } - break; - } - - case 'e': - { - if(argument == null) - { - throw new Ice.EndpointParseException( - "no argument provided for -e option in endpoint `opaque " + str + "'"); - } - - try - { - _rawEncoding = Ice.Util.stringToEncodingVersion(argument); - } - catch(Ice.VersionParseException e) - { - throw new Ice.EndpointParseException("invalid encoding version `" + argument + - "' in endpoint `opaque " + str + "':\n" + e.str); - } - break; - } - - case 'v': - { - if(argument == null) - { - throw new Ice.EndpointParseException( - "no argument provided for -v option in endpoint `opaque " + str + "'"); - } - for(int j = 0; j < argument.Length; ++j) - { - if(!IceUtilInternal.Base64.isBase64(argument[j])) - { - throw new Ice.EndpointParseException( - "invalid base64 character `" + argument[j] + "' (ordinal " + - ((int)argument[j]) + ") in endpoint `opaque " + str + "'"); - } - } - _rawBytes = IceUtilInternal.Base64.decode(argument); - ++vopt; - if(vopt > 1) - { - throw new Ice.EndpointParseException("multiple -v options in endpoint `opaque " + str + - "'"); - } - break; - } - - default: - { - throw new Ice.EndpointParseException("invalid option `" + option + "' in endpoint `opaque " + - str + "'"); - } - } - } - - if(topt != 1) - { - throw new Ice.EndpointParseException("no -t option in endpoint `opaque " + str + "'"); + throw new Ice.EndpointParseException("no -t option in endpoint " + ToString()); } - if(vopt != 1) + if(_rawBytes.Length == 0) { - throw new Ice.EndpointParseException("no -v option in endpoint `opaque " + str + "'"); + throw new Ice.EndpointParseException("no -v option in endpoint " + ToString()); } + calcHashValue(); } public OpaqueEndpointI(short type, BasicStream s) { _type = type; - _rawEncoding = s.startReadEncaps(); + _rawEncoding = s.getReadEncoding(); int sz = s.getReadEncapsSize(); _rawBytes = new byte[sz]; s.readBlob(_rawBytes); - s.endReadEncaps(); + calcHashValue(); } - + // // Marshal the endpoint // public override void streamWrite(BasicStream s) { - s.writeShort(_type); s.startWriteEncaps(_rawEncoding, Ice.FormatType.DefaultFormat); s.writeBlob(_rawBytes); s.endWriteEncaps(); } - + // // Convert the endpoint to its string form // @@ -183,9 +68,9 @@ namespace IceInternal private sealed class InfoI : Ice.OpaqueEndpointInfo { - public InfoI(short type, Ice.EncodingVersion rawEncoding, byte[] rawBytes) : + public InfoI(short type, Ice.EncodingVersion rawEncoding, byte[] rawBytes) : base(-1, false, rawEncoding, rawBytes) - { + { _type = type; } @@ -193,19 +78,19 @@ namespace IceInternal { return _type; } - + override public bool datagram() { return false; } - + override public bool secure() { return false; } private readonly short _type; - }; + } // // Return the endpoint information. @@ -222,7 +107,7 @@ namespace IceInternal { return _type; } - + // // Return the protocol name; // @@ -239,7 +124,7 @@ namespace IceInternal { return -1; } - + // // Return a new endpoint with a different timeout value, provided // that timeouts are supported by the endpoint. Otherwise the same @@ -250,6 +135,11 @@ namespace IceInternal return this; } + public override string connectionId() + { + return ""; + } + // // Return a new endpoint with a different connection id. // @@ -257,7 +147,7 @@ namespace IceInternal { return this; } - + // // Return true if the endpoints support bzip2 compress, or false // otherwise. @@ -266,7 +156,7 @@ namespace IceInternal { return false; } - + // // Return a new endpoint with a different compression value, // provided that compression is supported by the @@ -276,7 +166,7 @@ namespace IceInternal { return this; } - + // // Return true if the endpoint is datagram-based. // @@ -284,7 +174,7 @@ namespace IceInternal { return false; } - + // // Return true if the endpoint is secure. // @@ -294,6 +184,14 @@ namespace IceInternal } // + // Get the encoded endpoint. + // + public byte[] rawBytes() + { + return _rawBytes; + } + + // // Return a server side transceiver for this endpoint, or null if a // transceiver can only be created by an acceptor. In case a // transceiver is created, this operation also returns a new @@ -338,14 +236,13 @@ namespace IceInternal // host if listening on INADDR_ANY on server side or if no host // was specified on client side. // - public override List<EndpointI> - expand() + public override List<EndpointI> expand() { List<EndpointI> endps = new List<EndpointI>(); endps.Add(this); return endps; } - + // // Check whether the endpoint is equivalent to another one. // @@ -358,7 +255,22 @@ namespace IceInternal { return _hashCode; } - + + public override string options() + { + string s = ""; + if(_type > -1) + { + s += " -t " + _type; + } + s += " -e " + Ice.Util.encodingVersionToString(_rawEncoding); + if(_rawBytes.Length > 0) + { + s += " -v " + IceUtilInternal.Base64.encode(_rawBytes); + } + return s; + } + // // Compare endpoints for sorting purposes // @@ -383,7 +295,7 @@ namespace IceInternal { return 1; } - + if(_rawEncoding.major < p._rawEncoding.major) { return -1; @@ -421,10 +333,96 @@ namespace IceInternal return 1; } } - + return 0; } - + + protected override bool checkOption(string option, string argument, string endpoint) + { + switch(option[1]) + { + case 't': + { + if(_type > -1) + { + throw new Ice.EndpointParseException("multiple -t options in endpoint " + endpoint); + } + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -t option in endpoint " + endpoint); + } + + int t; + try + { + t = System.Int32.Parse(argument, CultureInfo.InvariantCulture); + } + catch(System.FormatException) + { + throw new Ice.EndpointParseException("invalid type value `" + argument + "' in endpoint " + + endpoint); + } + + if(t < 0 || t > 65535) + { + throw new Ice.EndpointParseException("type value `" + argument + "' out of range in endpoint " + + endpoint); + } + + _type = (short)t; + return true; + } + + case 'v': + { + if(_rawBytes.Length > 0) + { + throw new Ice.EndpointParseException("multiple -v options in endpoint " + endpoint); + } + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -v option in endpoint " + endpoint); + } + + for(int j = 0; j < argument.Length; ++j) + { + if(!IceUtilInternal.Base64.isBase64(argument[j])) + { + throw new Ice.EndpointParseException("invalid base64 character `" + argument[j] + + "' (ordinal " + ((int)argument[j]) + + ") in endpoint " + endpoint); + } + } + _rawBytes = IceUtilInternal.Base64.decode(argument); + return true; + } + + case 'e': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -e option in endpoint " + endpoint); + } + + try + { + _rawEncoding = Ice.Util.stringToEncodingVersion(argument); + } + catch(Ice.VersionParseException e) + { + throw new Ice.EndpointParseException("invalid encoding version `" + argument + + "' in endpoint " + endpoint + ":\n" + e.str); + } + return true; + } + + default: + { + return false; + } + } + } + private void calcHashValue() { int h = 5381; @@ -433,7 +431,7 @@ namespace IceInternal IceInternal.HashUtil.hashAdd(ref h, _rawBytes); _hashCode = h; } - + private short _type; private Ice.EncodingVersion _rawEncoding; private byte[] _rawBytes; diff --git a/cs/src/Ice/ProtocolInstance.cs b/cs/src/Ice/ProtocolInstance.cs new file mode 100644 index 00000000000..cc66138ee54 --- /dev/null +++ b/cs/src/Ice/ProtocolInstance.cs @@ -0,0 +1,119 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +namespace IceInternal +{ + + using System.Collections.Generic; + + public class ProtocolInstance + { + public ProtocolInstance(Ice.Communicator communicator, short type, string protocol) + { + instance_ = Util.getInstance(communicator); + traceLevel_ = instance_.traceLevels().network; + traceCategory_ = instance_.traceLevels().networkCat; + logger_ = instance_.initializationData().logger; + properties_ = instance_.initializationData().properties; + type_ = type; + protocol_ = protocol; + } + + public ProtocolInstance(Instance instance, short type, string protocol) + { + instance_ = instance; + traceLevel_ = instance_.traceLevels().network; + traceCategory_ = instance_.traceLevels().networkCat; + logger_ = instance_.initializationData().logger; + properties_ = instance_.initializationData().properties; + type_ = type; + protocol_ = protocol; + } + + public int traceLevel() + { + return traceLevel_; + } + + public string traceCategory() + { + return traceCategory_; + } + + public Ice.Logger logger() + { + return logger_; + } + + public string protocol() + { + return protocol_; + } + + public short type() + { + return type_; + } + + public Ice.Properties properties() + { + return properties_; + } + + public bool preferIPv6() + { + return instance_.preferIPv6(); + } + + public int protocolSupport() + { + return instance_.protocolSupport(); + } + + public string defaultHost() + { + return instance_.defaultsAndOverrides().defaultHost; + } + + public Ice.EncodingVersion defaultEncoding() + { + return instance_.defaultsAndOverrides().defaultEncoding; + } + + public NetworkProxy networkProxy() + { + return instance_.networkProxy(); + } + + public int messageSizeMax() + { + return instance_.messageSizeMax(); + } + + public List<Connector> resolve(string host, int port, Ice.EndpointSelectionType type, IPEndpointI endpt) + { + return instance_.endpointHostResolver().resolve(host, port, type, endpt); + } + + public void resolve(string host, int port, Ice.EndpointSelectionType type, IPEndpointI endpt, + EndpointI_connectors callback) + { + instance_.endpointHostResolver().resolve(host, port, type, endpt, callback); + } + + protected Instance instance_; + protected int traceLevel_; + protected string traceCategory_; + protected Ice.Logger logger_; + protected Ice.Properties properties_; + protected string protocol_; + protected short type_; + } + +} diff --git a/cs/src/Ice/ProtocolPluginFacade.cs b/cs/src/Ice/ProtocolPluginFacade.cs index 6a990b26fcd..9736239d66d 100644 --- a/cs/src/Ice/ProtocolPluginFacade.cs +++ b/cs/src/Ice/ProtocolPluginFacade.cs @@ -17,43 +17,6 @@ namespace IceInternal // Ice.Communicator getCommunicator(); -#if !SILVERLIGHT - // - // Get the endpoint host resolver. - // - IceInternal.EndpointHostResolver getEndpointHostResolver(); -#endif - // - // Get the protocol support. - // - int getProtocolSupport(); - - // - // Get the protocol support. - // - bool getPreferIPv6(); - - // - // Get the network proxy. - // - NetworkProxy getNetworkProxy(); - - // - // Get the default encoding to be used in endpoints. - // - Ice.EncodingVersion getDefaultEncoding(); - - // - // Get the default hostname to be used in endpoints. - // - string getDefaultHost(); - - // - // Get the network trace level and category name. - // - int getNetworkTraceLevel(); - string getNetworkTraceCategory(); - // // Register an EndpointFactory. // @@ -87,69 +50,6 @@ namespace IceInternal return _communicator; } -#if !SILVERLIGHT - // - // Get the endpoint host resolver. - // - public IceInternal.EndpointHostResolver getEndpointHostResolver() - { - return _instance.endpointHostResolver(); - } -#endif - - // - // Get the protocol support. - // - public int getProtocolSupport() - { - return _instance.protocolSupport(); - } - - // - // Get the protocol support. - // - public bool getPreferIPv6() - { - return _instance.preferIPv6(); - } - - // - // Get the network proxy. - // - public NetworkProxy getNetworkProxy() - { - return _instance.networkProxy(); - } - - // - // Get the default hostname to be used in endpoints. - // - public Ice.EncodingVersion getDefaultEncoding() - { - return _instance.defaultsAndOverrides().defaultEncoding; - } - - // - // Get the default hostname to be used in endpoints. - // - public string getDefaultHost() - { - return _instance.defaultsAndOverrides().defaultHost; - } - - // - // Get the network trace level and category name. - // - public int getNetworkTraceLevel() - { - return _instance.traceLevels().network; - } - - public string getNetworkTraceCategory() - { - return _instance.traceLevels().networkCat; - } - // // Register an EndpointFactory. // diff --git a/cs/src/Ice/Reference.cs b/cs/src/Ice/Reference.cs index 085c55ccabb..0f427cec53a 100644 --- a/cs/src/Ice/Reference.cs +++ b/cs/src/Ice/Reference.cs @@ -1017,9 +1017,10 @@ namespace IceInternal if(_endpoints.Length > 0) { Debug.Assert(_adapterId.Length == 0); - for(int i = 0; i < _endpoints.Length; i++) + foreach(EndpointI endpoint in _endpoints) { - _endpoints[i].streamWrite(s); + s.writeShort(endpoint.type()); + endpoint.streamWrite(s); } } else diff --git a/cs/src/Ice/TcpAcceptor.cs b/cs/src/Ice/TcpAcceptor.cs index 633969bea86..833a2257d29 100644 --- a/cs/src/Ice/TcpAcceptor.cs +++ b/cs/src/Ice/TcpAcceptor.cs @@ -23,14 +23,13 @@ namespace IceInternal { public virtual void close() { - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - string s = "stopping to accept tcp connections at " + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "stopping to accept " + protocol() + " connections at " + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } Debug.Assert(_acceptFd == null); - _fd.Close(); _fd = null; } @@ -39,19 +38,19 @@ namespace IceInternal { Network.doListen(_fd, _backlog); - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - StringBuilder s = new StringBuilder("listening for tcp connections at "); + StringBuilder s = new StringBuilder("listening for " + protocol() + " connections at "); s.Append(ToString()); List<string> interfaces = - Network.getHostsForEndpointExpand(_addr.Address.ToString(), instance_.protocolSupport(), true); + Network.getHostsForEndpointExpand(_addr.Address.ToString(), _instance.protocolSupport(), true); if(interfaces.Count != 0) { s.Append("\nlocal interfaces: "); s.Append(String.Join(", ", interfaces.ToArray())); } - _logger.trace(_traceLevels.networkCat, s.ToString()); + _instance.logger().trace(_instance.traceCategory(), s.ToString()); } } @@ -99,19 +98,24 @@ namespace IceInternal Network.setBlock(_acceptFd, false); # if !COMPACT - Network.setTcpBufSize(_acceptFd, instance_.initializationData().properties, _logger); + Network.setTcpBufSize(_acceptFd, _instance.properties(), _instance.logger()); # endif - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - string s = "accepted tcp connection\n" + Network.fdToString(_acceptFd); - _logger.trace(_traceLevels.networkCat, s); + string s = "accepted " + protocol() + " connection\n" + Network.fdToString(_acceptFd); + _instance.logger().trace(_instance.traceCategory(), s); } Socket acceptFd = _acceptFd; _acceptFd = null; _acceptError = null; - return new TcpTransceiver(instance_, acceptFd, null, null, true); + return new TcpTransceiver(_instance, acceptFd, null, null, true); + } + + public string protocol() + { + return _instance.protocol(); } public override string ToString() @@ -124,43 +128,42 @@ namespace IceInternal return _addr.Port; } - internal TcpAcceptor(Instance instance, string host, int port) + internal TcpAcceptor(ProtocolInstance instance, string host, int port) { - instance_ = instance; - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; - _backlog = instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); + _instance = instance; + _backlog = instance.properties().getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); try { - int protocol = instance_.protocolSupport(); - _addr = (IPEndPoint)Network.getAddressForServer(host, port, protocol, instance_.preferIPv6()); + int protocol = _instance.protocolSupport(); + _addr = (IPEndPoint)Network.getAddressForServer(host, port, protocol, _instance.preferIPv6()); _fd = Network.createServerSocket(false, _addr.AddressFamily, protocol); Network.setBlock(_fd, false); # if !COMPACT - Network.setTcpBufSize(_fd, instance_.initializationData().properties, _logger); + Network.setTcpBufSize(_fd, _instance.properties(), _instance.logger()); # endif if(AssemblyUtil.platform_ != AssemblyUtil.Platform.Windows) { // - // Enable SO_REUSEADDR on Unix platforms to allow - // re-using the socket even if it's in the TIME_WAIT - // state. On Windows, this doesn't appear to be - // necessary and enabling SO_REUSEADDR would actually - // not be a good thing since it allows a second - // process to bind to an address even it's already - // bound by another process. + // Enable SO_REUSEADDR on Unix platforms to allow re-using the + // socket even if it's in the TIME_WAIT state. On Windows, + // this doesn't appear to be necessary and enabling + // SO_REUSEADDR would actually not be a good thing since it + // allows a second process to bind to an address even it's + // already bound by another process. // - // TODO: using SO_EXCLUSIVEADDRUSE on Windows would - // probably be better but it's only supported by recent - // Windows versions (XP SP2, Windows Server 2003). + // TODO: using SO_EXCLUSIVEADDRUSE on Windows would probably + // be better but it's only supported by recent Windows + // versions (XP SP2, Windows Server 2003). // Network.setReuseAddress(_fd, true); } - if(_traceLevels.network >= 2) + + if(_instance.traceLevel() >= 2) { - string s = "attempting to bind to tcp socket " + Network.addrToString(_addr); - _logger.trace(_traceLevels.networkCat, s); + string s = "attempting to bind to " + _instance.protocol() + " socket " + + Network.addrToString(_addr); + _instance.logger().trace(_instance.traceCategory(), s); } _addr = Network.doBind(_fd, _addr); } @@ -171,9 +174,7 @@ namespace IceInternal } } - private Instance instance_; - private TraceLevels _traceLevels; - private Ice.Logger _logger; + private ProtocolInstance _instance; private Socket _fd; private Socket _acceptFd; private System.Exception _acceptError; diff --git a/cs/src/Ice/TcpConnector.cs b/cs/src/Ice/TcpConnector.cs index bf1e956e1d0..9fdcecff1ab 100644 --- a/cs/src/Ice/TcpConnector.cs +++ b/cs/src/Ice/TcpConnector.cs @@ -16,27 +16,25 @@ namespace IceInternal sealed class TcpConnector : Connector { - internal const short TYPE = 1; - public Transceiver connect() { - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { - string s = "trying to establish tcp connection to " + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "trying to establish " + _instance.protocol() + " connection to " + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } try { #if SILVERLIGHT Socket fd = Network.createSocket(false, _addr.AddressFamily == AddressFamily.InterNetworkV6 ? - AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork); + AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork); #else Socket fd = Network.createSocket(false, _addr.AddressFamily); Network.setBlock(fd, false); #endif #if !COMPACT - Network.setTcpBufSize(fd, _instance.initializationData().properties, _logger); + Network.setTcpBufSize(fd, _instance.properties(), _instance.logger()); #endif // @@ -46,10 +44,11 @@ namespace IceInternal } catch(Ice.LocalException ex) { - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { - string s = "failed to establish tcp connection to " + ToString() + "\n" + ex; - _logger.trace(_traceLevels.networkCat, s); + string s = "failed to establish " + _instance.protocol() + " connection to " + ToString() + "\n" + + ex; + _instance.logger().trace(_instance.traceCategory(), s); } throw; } @@ -57,17 +56,16 @@ namespace IceInternal public short type() { - return Ice.UDPEndpointType.value; + return _instance.type(); } // // Only for use by TcpEndpoint // - internal TcpConnector(Instance instance, EndPoint addr, NetworkProxy proxy, int timeout, string connectionId) + internal TcpConnector(ProtocolInstance instance, EndPoint addr, NetworkProxy proxy, int timeout, + string connectionId) { _instance = instance; - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; _addr = addr; _proxy = proxy; _timeout = timeout; @@ -115,9 +113,7 @@ namespace IceInternal return _hashCode; } - private Instance _instance; - private TraceLevels _traceLevels; - private Ice.Logger _logger; + private ProtocolInstance _instance; private EndPoint _addr; private NetworkProxy _proxy; private int _timeout; diff --git a/cs/src/Ice/TcpEndpointI.cs b/cs/src/Ice/TcpEndpointI.cs index c09f2756f89..cb1f556d981 100644 --- a/cs/src/Ice/TcpEndpointI.cs +++ b/cs/src/Ice/TcpEndpointI.cs @@ -15,296 +15,66 @@ namespace IceInternal using System; using System.Globalization; - sealed class TcpEndpointI : EndpointI + sealed class TcpEndpointI : IPEndpointI { - public TcpEndpointI(Instance instance, string ho, int po, int ti, string conId, bool co) : base(conId) + public TcpEndpointI(ProtocolInstance instance, string ho, int po, int ti, string conId, bool co) : + base(instance, ho, po, conId) { - _instance = instance; - _host = ho; - _port = po; _timeout = ti; _compress = co; - calcHashValue(); } - public TcpEndpointI(Instance instance, string str, bool oaEndpoint) : base("") + public TcpEndpointI(ProtocolInstance instance) : + base(instance) { - _instance = instance; - _host = null; - _port = 0; _timeout = -1; _compress = false; - - char[] separators = { ' ', '\t', '\n', '\r' }; - string[] arr = str.Split(separators); - - int i = 0; - while(i < arr.Length) - { - if(arr[i].Length == 0) - { - i++; - continue; - } - - string option = arr[i++]; - if(option.Length != 2 || option[0] != '-') - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "expected an endpoint option but found `" + option + "' in endpoint `tcp " + str + "'"; - throw e; - } - - string argument = null; - if(i < arr.Length && arr[i].Length > 0 && arr[i][0] != '-') - { - argument = arr[i++]; - if(argument[0] == '\"' && argument[argument.Length - 1] == '\"') - { - argument = argument.Substring(1, argument.Length - 2); - } - } - - switch(option[1]) - { - case 'h': - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -h option in endpoint `tcp " + str + "'"; - throw e; - } - - _host = argument; - break; - } - - case 'p': - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -p option in endpoint `tcp " + str + "'"; - throw e; - } - - try - { - _port = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(ex); - e.str = "invalid port value `" + argument + "' in endpoint `tcp " + str + "'"; - throw e; - } - - if(_port < 0 || _port > 65535) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "port value `" + argument + "' out of range in endpoint `tcp " + str + "'"; - throw e; - } - - break; - } - - case 't': - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -t option in endpoint `tcp " + str + "'"; - throw e; - } - - try - { - _timeout = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(ex); - e.str = "invalid timeout value `" + argument + "' in endpoint `tcp " + str + "'"; - throw e; - } - - break; - } - - case 'z': - { - if(argument != null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "unexpected argument `" + argument + "' provided for -z option in `tcp " + str + - "'"; - throw e; - } - - _compress = true; - break; - } - - default: - { - throw new Ice.EndpointParseException("unknown option `" + option + "' in endpoint `tcp " + str + "'"); - } - } - } - - if(_host == null) - { - _host = _instance.defaultsAndOverrides().defaultHost; - } - else if(_host.Equals("*")) - { - if(oaEndpoint) - { - _host = null; - } - else - { - throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `tcp " + str + "'"); - } - } - - if(_host == null) - { - _host = ""; - } - - calcHashValue(); } - public TcpEndpointI(BasicStream s) + public TcpEndpointI(ProtocolInstance instance, BasicStream s) : + base(instance, s) { - _instance = s.instance(); - s.startReadEncaps(); - _host = s.readString(); - _port = s.readInt(); _timeout = s.readInt(); _compress = s.readBool(); - s.endReadEncaps(); - calcHashValue(); - } - - // - // Marshal the endpoint - // - public override void streamWrite(BasicStream s) - { - s.writeShort(Ice.TCPEndpointType.value); - s.startWriteEncaps(); - s.writeString(_host); - s.writeInt(_port); - s.writeInt(_timeout); - s.writeBool(_compress); - s.endWriteEncaps(); } - // - // Convert the endpoint to its string form - // - public override string ice_toString_() + private sealed class InfoI : Ice.TCPEndpointInfo { - // - // WARNING: Certain features, such as proxy validation in Glacier2, - // depend on the format of proxy strings. Changes to toString() and - // methods called to generate parts of the reference string could break - // these features. Please review for all features that depend on the - // format of proxyToString() before changing this and related code. - // - string s = "tcp"; - - if(_host != null && _host.Length != 0) + public InfoI(IPEndpointI e) { - s += " -h "; - bool addQuote = _host.IndexOf(':') != -1; - if(addQuote) - { - s += "\""; - } - s += _host; - if(addQuote) - { - s += "\""; - } + _endpoint = e; } - s += " -p " + _port; - if(_timeout != -1) - { - s += " -t " + _timeout; - } - if(_compress) + public override short type() { - s += " -z"; + return _endpoint.type(); } - return s; - } - private sealed class InfoI : Ice.TCPEndpointInfo - { - public InfoI(int to, bool comp, string host, int port) : base(to, comp, host, port) + public override bool datagram() { + return _endpoint.datagram(); } - override public short type() - { - return Ice.TCPEndpointType.value; - } - - override public bool datagram() - { - return false; - } - - override public bool secure() + public override bool secure() { - return false; + return _endpoint.secure(); } - }; - // - // Return the endpoint information. - // - public override Ice.EndpointInfo getInfo() - { - return new InfoI(_timeout, _compress, _host, _port); - } - - // - // Return the endpoint type - // - public override short type() - { - return Ice.TCPEndpointType.value; + private IPEndpointI _endpoint; } - // - // Return the protocol name; - // - public override string protocol() + public override Ice.EndpointInfo getInfo() { - return "tcp"; + InfoI info = new InfoI(this); + fillEndpointInfo(info); + return info; } - // - // Return the timeout for the endpoint in milliseconds. 0 means - // non-blocking, -1 means no timeout. - // public override int timeout() { return _timeout; } - // - // Return a new endpoint with a different timeout value, provided - // that timeouts are supported by the endpoint. Otherwise the same - // endpoint is returned. - // public override EndpointI timeout(int timeout) { if(timeout == _timeout) @@ -313,39 +83,15 @@ namespace IceInternal } else { - return new TcpEndpointI(_instance, _host, _port, timeout, connectionId_, _compress); + return new TcpEndpointI(instance_, host_, port_, timeout, connectionId_, _compress); } } - // - // Return a new endpoint with a different connection id. - // - public override EndpointI connectionId(string connectionId) - { - if(connectionId == connectionId_) - { - return this; - } - else - { - return new TcpEndpointI(_instance, _host, _port, _timeout, connectionId, _compress); - } - } - - // - // Return true if the endpoints support bzip2 compress, or false - // otherwise. - // public override bool compress() { return _compress; } - // - // Return a new endpoint with a different compression value, - // provided that compression is supported by the - // endpoint. Otherwise the same endpoint is returned. - // public override EndpointI compress(bool compress) { if(compress == _compress) @@ -354,141 +100,61 @@ namespace IceInternal } else { - return new TcpEndpointI(_instance, _host, _port, _timeout, connectionId_, compress); + return new TcpEndpointI(instance_, host_, port_, _timeout, connectionId_, compress); } } - // - // Return true if the endpoint is datagram-based. - // public override bool datagram() { return false; } - // - // Return true if the endpoint is secure. - // public override bool secure() { return false; } - // - // Return a server side transceiver for this endpoint, or null if a - // transceiver can only be created by an acceptor. In case a - // transceiver is created, this operation also returns a new - // "effective" endpoint, which might differ from this endpoint, - // for example, if a dynamic port number is assigned. - // public override Transceiver transceiver(ref EndpointI endpoint) { endpoint = this; return null; } - // - // Return connectors for this endpoint, or empty list if no connector - // is available. - // - public override List<Connector> connectors(Ice.EndpointSelectionType selType) - { -#if SILVERLIGHT - return connectors(Network.getAddresses(_host, - _port, - _instance.protocolSupport(), - selType, - _instance.preferIPv6(), - false), - _instance.networkProxy()); -#else - return _instance.endpointHostResolver().resolve(_host, _port, selType, this); -#endif - } - - - public override void connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) - { -#if SILVERLIGHT - callback.connectors(connectors(selType)); -#else - _instance.endpointHostResolver().resolve(_host, _port, selType, this, callback); -#endif - } - - - // - // Return an acceptor for this endpoint, or null if no acceptors - // is available. In case an acceptor is created, this operation - // also returns a new "effective" endpoint, which might differ - // from this endpoint, for example, if a dynamic port number is - // assigned. - // public override Acceptor acceptor(ref EndpointI endpoint, string adapterName) { #if SILVERLIGHT throw new Ice.FeatureNotSupportedException("server endpoint not supported for `" + ToString() + "'"); #else - TcpAcceptor p = new TcpAcceptor(_instance, _host, _port); - endpoint = new TcpEndpointI(_instance, _host, p.effectivePort(), _timeout, connectionId_, _compress); + TcpAcceptor p = new TcpAcceptor(instance_, host_, port_); + endpoint = createEndpoint(host_, p.effectivePort(), connectionId_); return p; #endif } - // - // Expand endpoint out in to separate endpoints for each local - // host if listening on INADDR_ANY. - // - public override List<EndpointI> expand() + public override string options() { - List<EndpointI> endps = new List<EndpointI>(); - List<string> hosts = Network.getHostsForEndpointExpand(_host, _instance.protocolSupport(), false); - if(hosts == null || hosts.Count == 0) - { - endps.Add(this); - } - else - { - foreach(string h in hosts) - { - endps.Add(new TcpEndpointI(_instance, h, _port, _timeout, connectionId_, _compress)); - } - } - return endps; - } + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + string s = base.options(); - // - // Check whether the endpoint is equivalent to another one. - // - public override bool equivalent(EndpointI endpoint) - { - if(!(endpoint is TcpEndpointI)) + if(_timeout != -1) { - return false; + s += " -t " + _timeout; } - TcpEndpointI tcpEndpointI = (TcpEndpointI)endpoint; - return tcpEndpointI._host.Equals(_host) && tcpEndpointI._port == _port; - } - - public override List<Connector> connectors(List<EndPoint> addresses, NetworkProxy networkProxy) - { - List<Connector> connectors = new List<Connector>(); - foreach(EndPoint addr in addresses) + if(_compress) { - connectors.Add(new TcpConnector(_instance, addr, networkProxy, _timeout, connectionId_)); + s += " -z"; } - return connectors; - } - public override int GetHashCode() - { - return _hashCode; + return s; } - // - // Compare endpoints for sorting purposes - // public override int CompareTo(EndpointI obj) { if(!(obj is TcpEndpointI)) @@ -501,23 +167,6 @@ namespace IceInternal { return 0; } - else - { - int r = base.CompareTo(p); - if(r != 0) - { - return r; - } - } - - if(_port < p._port) - { - return -1; - } - else if(p._port < _port) - { - return 1; - } if(_timeout < p._timeout) { @@ -537,54 +186,126 @@ namespace IceInternal return 1; } - return string.Compare(_host, p._host, StringComparison.Ordinal); + return base.CompareTo(p); + } + + protected override void streamWriteImpl(BasicStream s) + { + base.streamWriteImpl(s); + s.writeInt(_timeout); + s.writeBool(_compress); } - private void calcHashValue() + protected override void hashInit(ref int h) { - int h = 5381; - IceInternal.HashUtil.hashAdd(ref h, Ice.TCPEndpointType.value); - IceInternal.HashUtil.hashAdd(ref h, _host); - IceInternal.HashUtil.hashAdd(ref h, _port); + base.hashInit(ref h); IceInternal.HashUtil.hashAdd(ref h, _timeout); - IceInternal.HashUtil.hashAdd(ref h, connectionId_); IceInternal.HashUtil.hashAdd(ref h, _compress); - _hashCode = h; } - private Instance _instance; - private string _host; - private int _port; + protected override void fillEndpointInfo(Ice.IPEndpointInfo info) + { + base.fillEndpointInfo(info); + if(info is Ice.TCPEndpointInfo) + { + Ice.TCPEndpointInfo tcpInfo = (Ice.TCPEndpointInfo)info; + tcpInfo.timeout = _timeout; + tcpInfo.compress = _compress; + } + } + + protected override bool checkOption(string option, string argument, string endpoint) + { + if(base.checkOption(option, argument, endpoint)) + { + return true; + } + + switch(option[1]) + { + case 't': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -t option in endpoint " + + endpoint); + } + + try + { + _timeout = System.Int32.Parse(argument, CultureInfo.InvariantCulture); + } + catch(System.FormatException ex) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(ex); + e.str = "invalid timeout value `" + argument + "' in endpoint " + endpoint; + throw e; + } + + return true; + } + + case 'z': + { + if(argument != null) + { + throw new Ice.EndpointParseException("unexpected argument `" + argument + + "' provided for -z option in " + endpoint); + } + + _compress = true; + + return true; + } + + default: + { + return false; + } + } + } + + protected override Connector createConnector(EndPoint addr, NetworkProxy proxy) + { + return new TcpConnector(instance_, addr, proxy, _timeout, connectionId_); + } + + protected override IPEndpointI createEndpoint(string host, int port, string connectionId) + { + return new TcpEndpointI(instance_, host, port, _timeout, connectionId, _compress); + } + private int _timeout; private bool _compress; - private int _hashCode; } sealed class TcpEndpointFactory : EndpointFactory { - internal TcpEndpointFactory(Instance instance) + internal TcpEndpointFactory(ProtocolInstance instance) { _instance = instance; } public short type() { - return Ice.TCPEndpointType.value; + return _instance.type(); } public string protocol() { - return "tcp"; + return _instance.protocol(); } - public EndpointI create(string str, bool oaEndpoint) + public EndpointI create(List<string> args, bool oaEndpoint) { - return new TcpEndpointI(_instance, str, oaEndpoint); + IPEndpointI endpt = new TcpEndpointI(_instance); + endpt.initWithOptions(args, oaEndpoint); + return endpt; } public EndpointI read(BasicStream s) { - return new TcpEndpointI(s); + return new TcpEndpointI(_instance, s); } public void destroy() @@ -592,7 +313,12 @@ namespace IceInternal _instance = null; } - private Instance _instance; + public EndpointFactory clone(ProtocolInstance instance) + { + return new TcpEndpointFactory(instance); + } + + private ProtocolInstance _instance; } } diff --git a/cs/src/Ice/TcpTransceiver.cs b/cs/src/Ice/TcpTransceiver.cs index 27a214c4ccd..1e9ead6b36d 100644 --- a/cs/src/Ice/TcpTransceiver.cs +++ b/cs/src/Ice/TcpTransceiver.cs @@ -25,7 +25,7 @@ namespace IceInternal sealed class TcpTransceiver : Transceiver { - public int initialize() + public int initialize(Buffer readBuffer, Buffer writeBuffer, ref bool hasMoreData) { try { @@ -54,10 +54,11 @@ namespace IceInternal _writeResult = null; #endif _desc = Network.fdToString(_fd, _proxy, _addr); + if(_proxy != null) { _state = StateProxyConnectRequest; // Send proxy connect request - return SocketOperation.Write; + return SocketOperation.Write; } _state = StateConnected; @@ -74,37 +75,44 @@ namespace IceInternal } catch(Ice.LocalException ex) { - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { System.Text.StringBuilder s = new System.Text.StringBuilder(); - s.Append("failed to establish tcp connection\n"); + s.Append("failed to establish " + protocol() + " connection\n"); s.Append(Network.fdToString(_fd, _proxy, _addr)); s.Append("\n"); s.Append(ex.ToString()); - _logger.trace(_traceLevels.networkCat, s.ToString()); + _instance.logger().trace(_instance.traceCategory(), s.ToString()); } throw; } Debug.Assert(_state == StateConnected); - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - string s = "tcp connection established\n" + _desc; - _logger.trace(_traceLevels.networkCat, s); + string s = protocol() + " connection established\n" + _desc; + _instance.logger().trace(_instance.traceCategory(), s); } return SocketOperation.None; } + public int closing(bool initiator, Ice.LocalException ex) + { + // If we are initiating the connection closure, wait for the peer + // to close the TCP/IP connection. Otherwise, close immediately. + return initiator ? SocketOperation.Read : SocketOperation.None; + } + public void close() { // // If the transceiver is not connected, its description is simply "not connected", // which isn't very helpful. // - if(_state == StateConnected && _traceLevels.network >= 1) + if(_state == StateConnected && _instance.traceLevel() >= 1) { - string s = "closing tcp connection\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "closing " + protocol() + " connection\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } Debug.Assert(_fd != null); @@ -122,17 +130,22 @@ namespace IceInternal } } - public bool write(Buffer buf) + public int write(Buffer buf) { #if COMPACT || SILVERLIGHT // // Silverlight and the Compact .NET Frameworks don't support the use of synchronous socket - // operations on a non-blocking socket. Returning false here forces the caller to schedule - // an asynchronous operation. + // operations on a non-blocking socket. Returning SocketOperation.Write here forces the caller + // to schedule an asynchronous operation. // - return false; + return SocketOperation.Write; #else int packetSize = buf.b.remaining(); + if(packetSize == 0) + { + return SocketOperation.None; + } + if(AssemblyUtil.platform_ == AssemblyUtil.Platform.Windows) { // @@ -161,17 +174,17 @@ namespace IceInternal { if(Network.wouldBlock(e)) { - return false; + return SocketOperation.Write; } throw; } Debug.Assert(ret > 0); - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - string s = "sent " + ret + " of " + packetSize + " bytes via tcp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "sent " + ret + " of " + packetSize + " bytes via " + protocol() + "\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } buf.b.position(buf.b.position() + ret); if(packetSize > buf.b.remaining()) @@ -190,21 +203,25 @@ namespace IceInternal } } - return true; // No more data to send. + return SocketOperation.None; // No more data to send. #endif } - public bool read(Buffer buf) + public int read(Buffer buf, ref bool hasMoreData) { #if COMPACT || SILVERLIGHT // // Silverlight and the Compact .NET Framework don't support the use of synchronous socket // operations on a non-blocking socket. // - return false; + return SocketOperation.Read; #else int remaining = buf.b.remaining(); int position = buf.b.position(); + if(remaining == 0) + { + return SocketOperation.None; + } while(buf.b.hasRemaining()) { @@ -229,19 +246,20 @@ namespace IceInternal { if(Network.wouldBlock(e)) { - return false; + return SocketOperation.Read; } throw; } Debug.Assert(ret > 0); - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - string s = "received " + ret + " of " + remaining + " bytes via tcp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "received " + ret + " of " + remaining + " bytes via " + protocol() + "\n" + + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } - + remaining -= ret; buf.b.position(position += ret); } @@ -271,7 +289,7 @@ namespace IceInternal } } - return true; + return SocketOperation.None; #endif } @@ -354,15 +372,15 @@ namespace IceInternal Debug.Assert(ret > 0); - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { int packetSize = buf.b.remaining(); if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) { packetSize = _maxReceivePacketSize; } - string s = "received " + ret + " of " + packetSize + " bytes via tcp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "received " + ret + " of " + packetSize + " bytes via " + protocol() + "\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } buf.b.position(buf.b.position() + ret); @@ -442,7 +460,7 @@ namespace IceInternal _writeEventArgs.SetBuffer(buf.b.rawBytes(), buf.b.position(), packetSize); bool completedSynchronously = !_fd.SendAsync(_writeEventArgs); #else - _writeResult = _fd.BeginSend(buf.b.rawBytes(), buf.b.position(), packetSize, SocketFlags.None, + _writeResult = _fd.BeginSend(buf.b.rawBytes(), buf.b.position(), packetSize, SocketFlags.None, writeCompleted, state); bool completedSynchronously = _writeResult.CompletedSynchronously; #endif @@ -489,8 +507,8 @@ namespace IceInternal if(_state < StateConnected && _state != StateProxyConnectRequest) { return; - } - + } + try { #if ICE_SOCKET_ASYNC_API @@ -509,15 +527,15 @@ namespace IceInternal } Debug.Assert(ret > 0); - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { int packetSize = buf.b.remaining(); if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) { packetSize = _maxSendPacketSize; } - string s = "sent " + ret + " of " + packetSize + " bytes via tcp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "sent " + ret + " of " + packetSize + " bytes via " + protocol() + "\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } buf.b.position(buf.b.position() + ret); @@ -542,13 +560,12 @@ namespace IceInternal } } - public string type() + public string protocol() { - return "tcp"; + return _instance.protocol(); } - public Ice.ConnectionInfo - getInfo() + public Ice.ConnectionInfo getInfo() { Ice.TCPConnectionInfo info = new Ice.TCPConnectionInfo(); if(_fd != null) @@ -579,17 +596,16 @@ namespace IceInternal // // Only for use by TcpConnector, TcpAcceptor // - internal TcpTransceiver(Instance instance, Socket fd, EndPoint addr, NetworkProxy proxy, bool connected) + internal TcpTransceiver(ProtocolInstance instance, Socket fd, EndPoint addr, NetworkProxy proxy, bool connected) { + _instance = instance; _fd = fd; _addr = addr; _proxy = proxy; - - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; + _state = connected ? StateConnected : StateNeedConnect; _desc = connected ? Network.fdToString(_fd, _proxy, _addr) : "<not connected>"; - + #if ICE_SOCKET_ASYNC_API _readEventArgs = new SocketAsyncEventArgs(); _readEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); @@ -597,7 +613,7 @@ namespace IceInternal _writeEventArgs = new SocketAsyncEventArgs(); _writeEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); #if SILVERLIGHT - String policy = instance.initializationData().properties.getProperty("Ice.ClientAccessPolicyProtocol"); + String policy = instance.properties().getProperty("Ice.ClientAccessPolicyProtocol"); if(policy.Equals("Http")) { _readEventArgs.SocketClientAccessPolicyProtocol = SocketClientAccessPolicyProtocol.Http; @@ -605,7 +621,7 @@ namespace IceInternal } else if(!String.IsNullOrEmpty(policy)) { - _logger.warning("Ignoring invalid Ice.ClientAccessPolicyProtocol value `" + policy + "'"); + _instance.logger().warning("Ignoring invalid Ice.ClientAccessPolicyProtocol value `" + policy + "'"); } #endif #endif @@ -657,11 +673,10 @@ namespace IceInternal } #endif + private ProtocolInstance _instance; private Socket _fd; private EndPoint _addr; private NetworkProxy _proxy; - private TraceLevels _traceLevels; - private Ice.Logger _logger; private string _desc; private int _state; private int _maxSendPacketSize; @@ -681,7 +696,7 @@ namespace IceInternal private const int StateNeedConnect = 0; private const int StateConnectPending = 1; private const int StateProxyConnectRequest = 2; - private const int StateProxyConnectRequestPending = 3; + private const int StateProxyConnectRequestPending = 3; private const int StateConnected = 4; } } diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs index 22852069b3e..84d1861e3cb 100644 --- a/cs/src/Ice/ThreadPool.cs +++ b/cs/src/Ice/ThreadPool.cs @@ -33,21 +33,21 @@ namespace IceInternal _finishWithIO = current.startMessage(); return _finishWithIO; } - + public void finishIOScope(ref ThreadPoolCurrent current) { if(_finishWithIO) { - // This must be called with the handler locked. + // This must be called with the handler locked. current.finishMessage(true); } } - + public void completed(ref ThreadPoolCurrent current) { // // Call finishMessage once IO is completed only if serialization is not enabled. - // Otherwise, finishMessage will be called when the event handler is done with + // Otherwise, finishMessage will be called when the event handler is done with // the message (it will be called from destroy below). // Debug.Assert(_finishWithIO); @@ -57,14 +57,14 @@ namespace IceInternal _finish = true; } } - + public void destroy(ref ThreadPoolCurrent current) { if(_finish) { // // A ThreadPoolMessage instance must be created outside the synchronization - // of the event handler. We need to lock the event handler here to call + // of the event handler. We need to lock the event handler here to call // finishMessage. // _mutex.Lock(); @@ -79,7 +79,7 @@ namespace IceInternal } } } - + private IceUtilInternal.Monitor _mutex; private bool _finish; private bool _finishWithIO; @@ -121,17 +121,17 @@ namespace IceInternal { public ThreadPool(Instance instance, string prefix, int timeout) { + Ice.Properties properties = instance.initializationData().properties; + _instance = instance; _dispatcher = instance.initializationData().dispatcher; _destroyed = false; _prefix = prefix; _threadIndex = 0; _inUse = 0; - _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0; + _serialize = properties.getPropertyAsInt(_prefix + ".Serialize") > 0; _serverIdleTime = timeout; - Ice.Properties properties = _instance.initializationData().properties; - string programName = properties.getProperty("Ice.ProgramName"); if(programName.Length > 0) { @@ -209,16 +209,16 @@ namespace IceInternal _priority = IceInternal.Util.stringToThreadPriority(properties.getProperty("Ice.ThreadPriority")); } #endif - + if(_instance.traceLevels().threadPool >= 1) { - string s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " + + string s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " + _sizeWarn; _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); } _workItems = new Queue<ThreadPoolWorkItem>(); - + try { _threads = new List<WorkerThread>(); @@ -251,15 +251,14 @@ namespace IceInternal } } - public void updateObservers() + public void destroy() { _m.Lock(); try { - foreach(WorkerThread t in _threads) - { - t.updateObserver(); - } + Debug.Assert(!_destroyed); + _destroyed = true; + _m.NotifyAll(); } finally { @@ -267,14 +266,15 @@ namespace IceInternal } } - public void destroy() + public void updateObservers() { _m.Lock(); try { - Debug.Assert(!_destroyed); - _destroyed = true; - _m.NotifyAll(); + foreach(WorkerThread t in _threads) + { + t.updateObserver(); + } } finally { @@ -298,8 +298,21 @@ namespace IceInternal try { Debug.Assert(!_destroyed); - handler._registered = handler._registered & ~remove; - handler._registered = handler._registered | add; + + // Don't remove what needs to be added + remove &= ~add; + + // Don't remove/add if already un-registered or registered + remove &= handler._registered; + add &= ~handler._registered; + if(remove == add) + { + return; + } + + handler._registered &= ~remove; + handler._registered |= add; + if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0) { handler._pending |= SocketOperation.Read; @@ -314,7 +327,7 @@ namespace IceInternal executeNonBlocking(delegate() { messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write)); - }); + }); } } finally @@ -334,12 +347,16 @@ namespace IceInternal try { Debug.Assert(!_destroyed); + + // + // If there are no pending asynchronous operations, we can call finish on the handler now. + // if(handler._pending == 0) { handler._registered = SocketOperation.None; executeNonBlocking(delegate() { - ThreadPoolCurrent current = + ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None); handler.finished(ref current); }); @@ -382,8 +399,7 @@ namespace IceInternal } } - public void - execute(ThreadPoolWorkItem workItem) + public void execute(ThreadPoolWorkItem workItem) { _m.Lock(); try @@ -397,10 +413,10 @@ namespace IceInternal // // If this is a dynamic thread pool which can still grow and if all threads are - // currently busy dispatching or about to dispatch, we spawn a new thread to + // currently busy dispatching or about to dispatch, we spawn a new thread to // execute this new work item right away. // - if(_threads.Count < _sizeMax && + if(_threads.Count < _sizeMax && (_inUse + _workItems.Count) > _threads.Count && !_destroyed) { @@ -409,7 +425,7 @@ namespace IceInternal string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1); _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); } - + try { WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); @@ -440,8 +456,7 @@ namespace IceInternal } } - public void - executeNonBlocking(ThreadPoolWorkItem workItem) + public void executeNonBlocking(ThreadPoolWorkItem workItem) { _m.Lock(); try @@ -519,9 +534,9 @@ namespace IceInternal { // // If not the last thread or if server idle time isn't configured, - // we can exit. Unlike C++/Java, there's no need to have a thread + // we can exit. Unlike C++/Java, there's no need to have a thread // always spawned in the thread pool because all the IO is done - // by the .NET thread pool threads. Instead, we'll just spawn a + // by the .NET thread pool threads. Instead, we'll just spawn a // new thread when needed (i.e.: when a new work item is queued). // if(_instance.traceLevels().threadPool >= 1) @@ -530,7 +545,7 @@ namespace IceInternal _instance.initializationData().logger.trace( _instance.traceLevels().threadPoolCat, s); } - + _threads.Remove(thread); _instance.asyncIOThread().queue(delegate() { @@ -573,13 +588,13 @@ namespace IceInternal ++_inUse; thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); - + if(_sizeMax > 1 && _inUse == _sizeWarn) { string s = "thread pool `" + _prefix + "' is running low on threads\n" + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; _instance.initializationData().logger.warning(s); - } + } } finally { @@ -597,11 +612,11 @@ namespace IceInternal } } } - + public bool startMessage(ref ThreadPoolCurrent current) { Debug.Assert((current._handler._pending & current.operation) != 0); - + if((current._handler._started & current.operation) != 0) { Debug.Assert((current._handler._ready & current.operation) == 0); @@ -617,7 +632,7 @@ namespace IceInternal return false; } } - else if((current._handler._ready & current.operation) == 0 && + else if((current._handler._ready & current.operation) == 0 && (current._handler._registered & current.operation) != 0) { Debug.Assert((current._handler._started & current.operation) == 0); @@ -638,7 +653,7 @@ namespace IceInternal return false; } } - + if((current._handler._registered & current.operation) != 0) { Debug.Assert((current._handler._ready & current.operation) != 0); @@ -700,7 +715,7 @@ namespace IceInternal { messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read)); } - + public void asyncWriteCallback(object state) { messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write)); @@ -721,7 +736,7 @@ namespace IceInternal { string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString(); _instance.initializationData().logger.error(s); - } + } } private AsyncCallback getCallback(int operation) @@ -772,8 +787,7 @@ namespace IceInternal } } - public void - setState(Ice.Instrumentation.ThreadState s) + public void setState(Ice.Instrumentation.ThreadState s) { // Must be called with the thread pool mutex locked if(_observer != null) @@ -842,7 +856,7 @@ namespace IceInternal string s = "exception in `" + _threadPool._prefix + "' thread " + _thread.Name + ":\n" + ex; _threadPool._instance.initializationData().logger.error(s); } - + if(_observer != null) { _observer.detach(); diff --git a/cs/src/Ice/Transceiver.cs b/cs/src/Ice/Transceiver.cs index 3e30af51676..c1d887f154d 100644 --- a/cs/src/Ice/Transceiver.cs +++ b/cs/src/Ice/Transceiver.cs @@ -14,40 +14,23 @@ namespace IceInternal public interface Transceiver { - // - // Initialize the transceiver using asynchronous I/O. This method never blocks. Returns true - // if initialization is complete, or false if an I/O request is pending. In the latter case, - // the callback must invoke initialize again and repeat this process until it returns true. - // - int initialize(); - + int initialize(Buffer readBuffer, Buffer writeBuffer, ref bool hasMoreData); + int closing(bool initiator, Ice.LocalException ex); void close(); - // - // Write data. - // - // Returns true if all the data was written, false otherwise. - // - bool write(Buffer buf); - - // - // Read data. - // - // Returns true if all the requested data was read, false otherwise. - // - bool read(Buffer buf); + int write(Buffer buf); + int read(Buffer buf, ref bool hasMoreData); // // Read data asynchronously. // - // The I/O request may complete synchronously, in which case endRead - // will be invoked in the same thread as beginRead. The return value - // from beginRead must be passed to endRead, along with the same buffer - // object. The caller must check the buffer after endRead completes to - // determine whether all of the requested data has been read. + // The I/O request may complete synchronously, in which case finishRead + // will be invoked in the same thread as startRead. The caller must check + // the buffer after finishRead completes to determine whether all of the + // requested data has been read. // // The read request is canceled upon the termination of the thread that - // calls beginRead, or when the socket is closed. In this case endRead + // calls startRead, or when the socket is closed. In this case finishRead // raises ReadAbortedException. // bool startRead(Buffer buf, AsyncCallback callback, object state); @@ -56,14 +39,14 @@ namespace IceInternal // // Write data asynchronously. // - // The I/O request may complete synchronously, in which case endWrite - // will be invoked in the same thread as beginWrite. The request - // will be canceled upon the termination of the thread that calls beginWrite. + // The I/O request may complete synchronously, in which case finishWrite + // will be invoked in the same thread as startWrite. The request + // will be canceled upon the termination of the thread that calls startWrite. // bool startWrite(Buffer buf, AsyncCallback callback, object state, out bool completed); void finishWrite(Buffer buf); - string type(); + string protocol(); Ice.ConnectionInfo getInfo(); void checkSendSize(Buffer buf, int messageSizeMax); } diff --git a/cs/src/Ice/UdpConnector.cs b/cs/src/Ice/UdpConnector.cs index b09cb166b89..e6fddc7044d 100644 --- a/cs/src/Ice/UdpConnector.cs +++ b/cs/src/Ice/UdpConnector.cs @@ -18,20 +18,21 @@ namespace IceInternal { public Transceiver connect() { - return new UdpTransceiver(instance_, _addr, _mcastInterface, _mcastTtl); + return new UdpTransceiver(_instance, _addr, _mcastInterface, _mcastTtl); } public short type() { - return Ice.UDPEndpointType.value; + return _instance.type(); } // - // Only for use by TcpEndpoint + // Only for use by UdpEndpointI // - internal UdpConnector(Instance instance, EndPoint addr, string mcastInterface, int mcastTtl, string connectionId) + internal UdpConnector(ProtocolInstance instance, EndPoint addr, string mcastInterface, int mcastTtl, + string connectionId) { - instance_ = instance; + _instance = instance; _addr = addr; _mcastInterface = mcastInterface; _mcastTtl = mcastTtl; @@ -85,7 +86,7 @@ namespace IceInternal return _hashCode; } - private Instance instance_; + private ProtocolInstance _instance; private EndPoint _addr; private string _mcastInterface; private int _mcastTtl; diff --git a/cs/src/Ice/UdpEndpointI.cs b/cs/src/Ice/UdpEndpointI.cs index e7a7db586db..921eb60f04f 100644 --- a/cs/src/Ice/UdpEndpointI.cs +++ b/cs/src/Ice/UdpEndpointI.cs @@ -17,249 +17,28 @@ namespace IceInternal using System; using System.Globalization; - sealed class UdpEndpointI : EndpointI + sealed class UdpEndpointI : IPEndpointI { - public UdpEndpointI(Instance instance, string ho, int po, string mif, int mttl, bool conn, string conId, - bool co) : base(conId) + public UdpEndpointI(ProtocolInstance instance, string ho, int po, string mif, int mttl, bool conn, string conId, + bool co) : + base(instance, ho, po, conId) { - instance_ = instance; - _host = ho; - _port = po; _mcastInterface = mif; _mcastTtl = mttl; _connect = conn; - connectionId_ = conId; _compress = co; - calcHashValue(); } - - public UdpEndpointI(Instance instance, string str, bool oaEndpoint) : base("") + + public UdpEndpointI(ProtocolInstance instance) : + base(instance) { - instance_ = instance; - _host = null; - _port = 0; _connect = false; _compress = false; - - string delim = " \t\n\r"; - - int beg; - int end = 0; - - while(true) - { - beg = IceUtilInternal.StringUtil.findFirstNotOf(str, delim, end); - if(beg == -1) - { - break; - } - - end = IceUtilInternal.StringUtil.findFirstOf(str, delim, beg); - if(end == -1) - { - end = str.Length; - } - - string option = str.Substring(beg, end - beg); - if(option[0] != '-') - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "expected an endpoint option but found `" + option + "' in endpoint `udp " + str + "'"; - throw e; - } - - string argument = null; - int argumentBeg = IceUtilInternal.StringUtil.findFirstNotOf(str, delim, end); - if(argumentBeg != -1 && str[argumentBeg] != '-') - { - beg = argumentBeg; - if(str[beg] == '\"') - { - end = IceUtilInternal.StringUtil.findFirstOf(str, "\"", beg + 1); - if(end == -1) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "mismatched quotes around `" + argument + "' in endpoint `udp " + str + "'"; - throw e; - } - else - { - ++end; - } - } - else - { - end = IceUtilInternal.StringUtil.findFirstOf(str, delim, beg); - if(end == -1) - { - end = str.Length; - } - } - argument = str.Substring(beg, end - beg); - if(argument[0] == '\"' && argument[argument.Length - 1] == '\"') - { - argument = argument.Substring(1, argument.Length - 2); - } - } - - if(option.Equals("-h")) - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -h option in endpoint `udp " + str + "'"; - throw e; - } - - _host = argument; - } - else if(option.Equals("-p")) - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -p option in endpoint `udp " + str + "'"; - throw e; - } - - try - { - _port = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(ex); - e.str = "invalid port value `" + argument + "' in endpoint `udp " + str + "'"; - throw e; - } - - if(_port < 0 || _port > 65535) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "port value `" + argument + "' out of range in endpoint `udp " + str + "'"; - throw e; - } - } - else if(option.Equals("-c")) - { - if(argument != null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "unexpected argument `" + argument + "' provided for -c option in `udp " + str + "'"; - throw e; - } - - _connect = true; - } - else if(option.Equals("-z")) - { - if(argument != null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "unexpected argument `" + argument + "' provided for -z option in `udp " + str + "'"; - throw e; - } - - _compress = true; - } - else if(option.Equals("-v") || option.Equals("-e")) - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for " + option + " option in endpoint " + "`udp " + str + "'"; - throw e; - } - - try - { - Ice.EncodingVersion v = Ice.Util.stringToEncodingVersion(argument); - if(v.major != 1 || v.minor != 0) - { - instance_.initializationData().logger.warning("deprecated udp endpoint option: " + option); - } - } - catch(Ice.VersionParseException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "invalid version `" + argument + "' in endpoint `udp " + str + "':\n" + ex.str; - throw e; - } - } - else if(option.Equals("--interface")) - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for --interface option in endpoint `udp " + str + "'"; - throw e; - } - - _mcastInterface = argument; - } - else if(option.Equals("--ttl")) - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for --ttl option in endpoint `udp " + str + "'"; - throw e; - } - - try - { - _mcastTtl = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(ex); - e.str = "invalid TTL value `" + argument + "' in endpoint `udp " + str + "'"; - throw e; - } - - if(_mcastTtl < 0) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "TTL value `" + argument + "' out of range in endpoint `udp " + str + "'"; - throw e; - } - } - else - { - throw new Ice.EndpointParseException("unknown option `" + option + "' in endpoint `udp " + str + "'"); - } - } - - if(_host == null) - { - _host = instance_.defaultsAndOverrides().defaultHost; - } - else if(_host.Equals("*")) - { - if(oaEndpoint) - { - _host = null; - } - else - { - throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `udp " + str + "'"); - } - } - - if(_host == null) - { - _host = ""; - } - - calcHashValue(); } - - public UdpEndpointI(BasicStream s) + + public UdpEndpointI(ProtocolInstance instance, BasicStream s) : + base(instance, s) { - instance_ = s.instance(); - s.startReadEncaps(); - _host = s.readString(); - _port = s.readInt(); if(s.getReadEncoding().Equals(Ice.Util.Encoding_1_0)) { s.readByte(); @@ -271,140 +50,62 @@ namespace IceInternal //_connect = s.readBool(); _connect = false; _compress = s.readBool(); - s.endReadEncaps(); - calcHashValue(); } - - // - // Marshal the endpoint - // - public override void streamWrite(BasicStream s) - { - s.writeShort(Ice.UDPEndpointType.value); - s.startWriteEncaps(); - s.writeString(_host); - s.writeInt(_port); - if(s.getWriteEncoding().Equals(Ice.Util.Encoding_1_0)) - { - Ice.Util.Protocol_1_0.write__(s); - Ice.Util.Encoding_1_0.write__(s); - } - // Not transmitted. - //s.writeBool(_connect); - s.writeBool(_compress); - s.endWriteEncaps(); - } - - // - // Convert the endpoint to its string form - // - public override string ice_toString_() - { - // - // WARNING: Certain features, such as proxy validation in Glacier2, - // depend on the format of proxy strings. Changes to toString() and - // methods called to generate parts of the reference string could break - // these features. Please review for all features that depend on the - // format of proxyToString() before changing this and related code. - // - string s = "udp"; - - if(_host != null && _host.Length != 0) - { - s += " -h "; - bool addQuote = _host.IndexOf(':') != -1; - if(addQuote) - { - s += "\""; - } - s += _host; - if(addQuote) - { - s += "\""; - } - } - - s += " -p " + _port; - - if(_mcastInterface.Length != 0) - { - s += " --interface " + _mcastInterface; - } - - if(_mcastTtl != -1) - { - s += " --ttl " + _mcastTtl; - } - - if(_connect) - { - s += " -c"; - } - if(_compress) - { - s += " -z"; - } - - return s; - } - private sealed class InfoI : Ice.UDPEndpointInfo { - public InfoI(bool comp, string host, int port, string mcastInterface, int mcastTtl) : - base(-1, comp, host, port, mcastInterface, mcastTtl) + public InfoI(UdpEndpointI e) { + _endpoint = e; } override public short type() { - return Ice.UDPEndpointType.value; + return _endpoint.type(); } - + override public bool datagram() { - return true; + return _endpoint.datagram(); } - + override public bool secure() { - return false; + return _endpoint.secure(); } - }; + + private UdpEndpointI _endpoint; + } // // Return the endpoint information. // public override Ice.EndpointInfo getInfo() { - return new InfoI(_compress, _host, _port, _mcastInterface, _mcastTtl); + InfoI info = new InfoI(this); + fillEndpointInfo(info); + return info; } // - // Return the endpoint type - // - public override short type() - { - return Ice.UDPEndpointType.value; - } - - // - // Return the protocol name; + // Return the timeout for the endpoint in milliseconds. 0 means + // non-blocking, -1 means no timeout. // - public override string protocol() + public override int timeout() { - return "udp"; + return -1; } // - // Return the timeout for the endpoint in milliseconds. 0 means - // non-blocking, -1 means no timeout. + // Return a new endpoint with a different timeout value, provided + // that timeouts are supported by the endpoint. Otherwise the same + // endpoint is returned. // - public override int timeout() + public override EndpointI timeout(int timeout) { - return -1; + return this; } - + // // Return true if the endpoints support bzip2 compress, or false // otherwise. @@ -413,7 +114,7 @@ namespace IceInternal { return _compress; } - + // // Return a new endpoint with a different compression value, // provided that compression is supported by the @@ -427,45 +128,19 @@ namespace IceInternal } else { - return new UdpEndpointI(instance_, _host, _port, _mcastInterface, _mcastTtl, _connect, connectionId_, + return new UdpEndpointI(instance_, host_, port_, _mcastInterface, _mcastTtl, _connect, connectionId_, compress); } } // - // Return a new endpoint with a different connection id. - // - public override EndpointI connectionId(string connectionId) - { - if(connectionId == connectionId_) - { - return this; - } - else - { - return new UdpEndpointI(instance_, _host, _port, _mcastInterface, _mcastTtl, _connect, connectionId, - _compress); - } - } - - // - // Return a new endpoint with a different timeout value, provided - // that timeouts are supported by the endpoint. Otherwise the same - // endpoint is returned. - // - public override EndpointI timeout(int timeout) - { - return this; - } - - // // Return true if the endpoint is datagram-based. // public override bool datagram() { return true; } - + // // Return true if the endpoint is secure. // @@ -483,34 +158,12 @@ namespace IceInternal // public override Transceiver transceiver(ref EndpointI endpoint) { - UdpTransceiver p = new UdpTransceiver(instance_, _host, _port, _mcastInterface, _connect); - endpoint = new UdpEndpointI(instance_, _host, p.effectivePort(), _mcastInterface, _mcastTtl, - _connect, connectionId_, _compress); + UdpTransceiver p = new UdpTransceiver(instance_, host_, port_, _mcastInterface, _connect); + endpoint = createEndpoint(host_, p.effectivePort(), connectionId_); return p; } // - // Return a connector for this endpoint, or empty list if no connector - // is available. - // - public override List<Connector> connectors(Ice.EndpointSelectionType selType) - { - return connectors(Network.getAddresses(_host, _port, instance_.protocolSupport(), selType, - instance_.preferIPv6(), true), - instance_.networkProxy()); - } - - - public override void connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) - { -#if SILVERLIGHT - callback.connectors(connectors(selType)); -#else - instance_.endpointHostResolver().resolve(_host, _port, selType, this, callback); -#endif - } - - // // Return an acceptor for this endpoint, or null if no acceptors // is available. In case an acceptor is created, this operation // also returns a new "effective" endpoint, which might differ @@ -523,59 +176,40 @@ namespace IceInternal return null; } - // - // Expand endpoint out in to separate endpoints for each local - // host if listening on INADDR_ANY. - // - public override List<EndpointI> - expand() + public override string options() { - List<EndpointI> endps = new List<EndpointI>(); - List<string> hosts = Network.getHostsForEndpointExpand(_host, instance_.protocolSupport(), false); - if(hosts == null || hosts.Count == 0) + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + string s = base.options(); + + if(_mcastInterface.Length != 0) { - endps.Add(this); + s += " --interface " + _mcastInterface; } - else + + if(_mcastTtl != -1) { - foreach(string h in hosts) - { - endps.Add(new UdpEndpointI(instance_, h, _port, _mcastInterface, _mcastTtl, _connect, - connectionId_, _compress)); - } + s += " --ttl " + _mcastTtl; } - return endps; - } - // - // Check whether the endpoint is equivalent to another one. - // - public override bool equivalent(EndpointI endpoint) - { - if(!(endpoint is UdpEndpointI)) + if(_connect) { - return false; + s += " -c"; } - UdpEndpointI udpEndpointI = (UdpEndpointI)endpoint; - return udpEndpointI._host.Equals(_host) && udpEndpointI._port == _port; - } - - public override List<Connector> connectors(List<EndPoint> addresses, NetworkProxy networkProxy) - { - List<Connector> connectors = new List<Connector>(); - foreach(EndPoint addr in addresses) + if(_compress) { - connectors.Add(new UdpConnector(instance_, addr, _mcastInterface, _mcastTtl, connectionId_)); + s += " -z"; } - return connectors; - } - public override int GetHashCode() - { - return _hashCode; + return s; } - + // // Compare endpoints for sorting purposes // @@ -591,24 +225,7 @@ namespace IceInternal { return 0; } - else - { - int r = base.CompareTo(p); - if(r != 0) - { - return r; - } - } - - if(_port < p._port) - { - return -1; - } - else if(p._port < _port) - { - return 1; - } - + if(!_connect && p._connect) { return -1; @@ -617,11 +234,6 @@ namespace IceInternal { return 1; } - - if(!connectionId_.Equals(p.connectionId_)) - { - return string.Compare(connectionId_, p.connectionId_, StringComparison.Ordinal); - } if(!_compress && p._compress) { @@ -631,13 +243,13 @@ namespace IceInternal { return 1; } - + int rc = string.Compare(_mcastInterface, p._mcastInterface, StringComparison.Ordinal); if(rc != 0) { return rc; } - + if(_mcastTtl < p._mcastTtl) { return -1; @@ -647,66 +259,203 @@ namespace IceInternal return 1; } - return string.Compare(_host, p._host, StringComparison.Ordinal); + return base.CompareTo(p); } - - private void calcHashValue() + + // + // Marshal the endpoint + // + protected override void streamWriteImpl(BasicStream s) { - int h = 5381; - IceInternal.HashUtil.hashAdd(ref h, Ice.UDPEndpointType.value); - IceInternal.HashUtil.hashAdd(ref h, _host); - IceInternal.HashUtil.hashAdd(ref h, _port); + base.streamWriteImpl(s); + if(s.getWriteEncoding().Equals(Ice.Util.Encoding_1_0)) + { + Ice.Util.Protocol_1_0.write__(s); + Ice.Util.Encoding_1_0.write__(s); + } + // Not transmitted. + //s.writeBool(_connect); + s.writeBool(_compress); + } + + protected override void hashInit(ref int h) + { + base.hashInit(ref h); IceInternal.HashUtil.hashAdd(ref h, _mcastInterface); IceInternal.HashUtil.hashAdd(ref h, _mcastTtl); IceInternal.HashUtil.hashAdd(ref h, _connect); - IceInternal.HashUtil.hashAdd(ref h, connectionId_); IceInternal.HashUtil.hashAdd(ref h, _compress); - _hashCode = h; } - - private Instance instance_; - private string _host; - private int _port; + + protected override void fillEndpointInfo(Ice.IPEndpointInfo info) + { + base.fillEndpointInfo(info); + if(info is Ice.UDPEndpointInfo) + { + Ice.UDPEndpointInfo udpInfo = (Ice.UDPEndpointInfo)info; + udpInfo.timeout = -1; + udpInfo.compress = _compress; + udpInfo.mcastInterface = _mcastInterface; + udpInfo.mcastTtl = _mcastTtl; + } + } + + protected override bool checkOption(string option, string argument, string endpoint) + { + if(base.checkOption(option, argument, endpoint)) + { + return true; + } + + if(option.Equals("-c")) + { + if(argument != null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "unexpected argument `" + argument + "' provided for -c option in " + endpoint; + throw e; + } + + _connect = true; + } + else if(option.Equals("-z")) + { + if(argument != null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "unexpected argument `" + argument + "' provided for -z option in " + endpoint; + throw e; + } + + _compress = true; + } + else if(option.Equals("-v") || option.Equals("-e")) + { + if(argument == null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "no argument provided for " + option + " option in endpoint " + endpoint; + throw e; + } + + try + { + Ice.EncodingVersion v = Ice.Util.stringToEncodingVersion(argument); + if(v.major != 1 || v.minor != 0) + { + instance_.logger().warning("deprecated udp endpoint option: " + option); + } + } + catch(Ice.VersionParseException ex) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "invalid version `" + argument + "' in endpoint " + endpoint + ":\n" + ex.str; + throw e; + } + } + else if(option.Equals("--interface")) + { + if(argument == null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "no argument provided for --interface option in endpoint " + endpoint; + throw e; + } + + _mcastInterface = argument; + } + else if(option.Equals("--ttl")) + { + if(argument == null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "no argument provided for --ttl option in endpoint " + endpoint; + throw e; + } + + try + { + _mcastTtl = System.Int32.Parse(argument, CultureInfo.InvariantCulture); + } + catch(System.FormatException ex) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(ex); + e.str = "invalid TTL value `" + argument + "' in endpoint " + endpoint; + throw e; + } + + if(_mcastTtl < 0) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "TTL value `" + argument + "' out of range in endpoint " + endpoint; + throw e; + } + } + else + { + return false; + } + + return true; + } + + protected override Connector createConnector(EndPoint addr, NetworkProxy proxy) + { + return new UdpConnector(instance_, addr, _mcastInterface, _mcastTtl, connectionId_); + } + + protected override IPEndpointI createEndpoint(string host, int port, string connectionId) + { + return new UdpEndpointI(instance_, host, port, _mcastInterface, _mcastTtl, _connect, connectionId, + _compress); + } + private string _mcastInterface = ""; private int _mcastTtl = -1; private bool _connect; private bool _compress; - private int _hashCode; } sealed class UdpEndpointFactory : EndpointFactory { - internal UdpEndpointFactory(Instance instance) + internal UdpEndpointFactory(ProtocolInstance instance) { - instance_ = instance; + _instance = instance; } - + public short type() { - return Ice.UDPEndpointType.value; + return _instance.type(); } - + public string protocol() { - return "udp"; + return _instance.protocol(); } - - public EndpointI create(string str, bool oaEndpoint) + + public EndpointI create(List<string> args, bool oaEndpoint) { - return new UdpEndpointI(instance_, str, oaEndpoint); + IPEndpointI endpt = new UdpEndpointI(_instance); + endpt.initWithOptions(args, oaEndpoint); + return endpt; } - + public EndpointI read(BasicStream s) { - return new UdpEndpointI(s); + return new UdpEndpointI(_instance, s); } - + public void destroy() { - instance_ = null; + _instance = null; } - - private Instance instance_; + + public EndpointFactory clone(ProtocolInstance instance) + { + return new UdpEndpointFactory(instance); + } + + private ProtocolInstance _instance; } } diff --git a/cs/src/Ice/UdpTransceiver.cs b/cs/src/Ice/UdpTransceiver.cs index 550b5c6f380..292ea96d4fc 100644 --- a/cs/src/Ice/UdpTransceiver.cs +++ b/cs/src/Ice/UdpTransceiver.cs @@ -27,7 +27,7 @@ namespace IceInternal sealed class UdpTransceiver : Transceiver { - public int initialize() + public int initialize(Buffer readBuffer, Buffer writeBuffer, ref bool hasMoreData) { if(_state == StateNeedConnect) { @@ -61,7 +61,7 @@ namespace IceInternal { // // On Windows, we delay the join for the mcast group after the connection - // establishment succeeds. This is necessary for older Windows versions + // establishment succeeds. This is necessary for older Windows versions // where joining the group fails if the socket isn't bound. See ICE-5113. // if(Network.isMulticast((IPEndPoint)_addr)) @@ -73,16 +73,16 @@ namespace IceInternal } } } -#endif +#endif _state = StateConnected; } if(_state == StateConnected) { - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - string s = "starting to send udp packets\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "starting to send " + protocol() + " packets\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } Debug.Assert(_state == StateConnected); } @@ -90,14 +90,22 @@ namespace IceInternal return SocketOperation.None; } + public int closing(bool initiator, Ice.LocalException ex) + { + // + // Nothing to do. + // + return SocketOperation.None; + } + public void close() { - if(_state >= StateConnected && _traceLevels.network >= 1) + if(_state >= StateConnected && _instance.traceLevel() >= 1) { - string s = "closing udp connection\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "closing " + protocol() + " connection\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } - + if(_fd != null) { try @@ -111,16 +119,21 @@ namespace IceInternal } } - public bool write(Buffer buf) + public int write(Buffer buf) { #if COMPACT || SILVERLIGHT // // Silverlight and the Compact .NET Framework don't support the use of synchronous socket - // operations on a non-blocking socket. Returning false here forces the + // operations on a non-blocking socket. Returning SocketOperation.Write here forces the // caller to schedule an asynchronous operation. // - return false; + return SocketOperation.Write; #else + if(!buf.b.hasRemaining()) + { + return SocketOperation.None; + } + Debug.Assert(buf.b.position() == 0); Debug.Assert(_fd != null && _state >= StateConnected); @@ -152,10 +165,10 @@ namespace IceInternal { continue; } - + if(Network.wouldBlock(ex)) { - return false; + return SocketOperation.Write; } if(Network.connectionLost(ex)) @@ -174,27 +187,33 @@ namespace IceInternal } Debug.Assert(ret > 0); - - if(_traceLevels.network >= 3) + + if(_instance.traceLevel() >= 3) { - string s = "sent " + ret + " bytes via udp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "sent " + ret + " bytes via " + protocol() + "\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } + Debug.Assert(ret == buf.b.limit()); - return true; + return SocketOperation.None; #endif } - public bool read(Buffer buf) + public int read(Buffer buf, ref bool hasMoreData) { #if COMPACT || SILVERLIGHT // // Silverlight and the Compact .NET Framework don't support the use of synchronous socket - // operations on a non-blocking socket. Returning false here forces the + // operations on a non-blocking socket. Returning SocketOperation.Read here forces the // caller to schedule an asynchronous operation. // - return false; + return SocketOperation.Read; #else + if(!buf.b.hasRemaining()) + { + return SocketOperation.None; + } + Debug.Assert(buf.b.position() == 0); Debug.Assert(_fd != null); @@ -236,7 +255,7 @@ namespace IceInternal { if(Network.recvTruncated(e)) { - // The message was truncated and the whole buffer is filled. We ignore + // The message was truncated and the whole buffer is filled. We ignore // this error here, it will be detected at the connection level when // the Ice message size is checked against the buffer size. ret = buf.size(); @@ -247,12 +266,12 @@ namespace IceInternal { continue; } - + if(Network.wouldBlock(e)) { - return false; + return SocketOperation.Read; } - + if(Network.connectionLost(e)) { throw new Ice.ConnectionLostException(); @@ -267,7 +286,7 @@ namespace IceInternal throw new Ice.SyscallException(e); } } - + if(ret == 0) { throw new Ice.ConnectionLostException(); @@ -284,23 +303,23 @@ namespace IceInternal Debug.Assert(connected); _state = StateConnected; // We're connected now - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - string s = "connected udp socket\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "connected " + protocol() + " socket\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } } - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - string s = "received " + ret + " bytes via udp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + string s = "received " + ret + " bytes via " + protocol() + "\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } buf.resize(ret, true); buf.b.position(ret); - return true; + return SocketOperation.None; #endif } @@ -313,7 +332,7 @@ namespace IceInternal buf.b.position(0); try - { + { if(_state == StateConnected) { _readCallback = callback; @@ -322,7 +341,7 @@ namespace IceInternal _readEventArgs.SetBuffer(buf.b.rawBytes(), buf.b.position(), packetSize); return !_fd.ReceiveAsync(_readEventArgs); #else - _readResult = _fd.BeginReceive(buf.b.rawBytes(), 0, buf.b.limit(), SocketFlags.None, + _readResult = _fd.BeginReceive(buf.b.rawBytes(), 0, buf.b.limit(), SocketFlags.None, readCompleted, state); return _readResult.CompletedSynchronously; #endif @@ -349,7 +368,7 @@ namespace IceInternal peerAddr = new IPEndPoint(IPAddress.IPv6Any, 0); } } - _readResult = _fd.BeginReceiveFrom(buf.b.rawBytes(), 0, buf.b.limit(), SocketFlags.None, + _readResult = _fd.BeginReceiveFrom(buf.b.rawBytes(), 0, buf.b.limit(), SocketFlags.None, ref peerAddr, readCompleted, state); return _readResult.CompletedSynchronously; #endif @@ -424,7 +443,7 @@ namespace IceInternal { if(Network.recvTruncated(ex)) { - // The message was truncated and the whole buffer is filled. We ignore + // The message was truncated and the whole buffer is filled. We ignore // this error here, it will be detected at the connection level when // the Ice message size is checked against the buffer size. ret = buf.size(); @@ -435,7 +454,7 @@ namespace IceInternal { throw new Ice.ConnectionLostException(ex); } - + if(Network.connectionRefused(ex)) { throw new Ice.ConnectionRefusedException(ex); @@ -470,17 +489,17 @@ namespace IceInternal Debug.Assert(connected); _state = StateConnected; // We're connected now - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { string s = "connected udp socket\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + _instance.logger().trace(_instance.traceCategory(), s); } } - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { string s = "received " + ret + " bytes via udp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + _instance.logger().trace(_instance.traceCategory(), s); } buf.resize(ret, true); @@ -521,7 +540,7 @@ namespace IceInternal _writeEventArgs.SetBuffer(buf.b.rawBytes(), 0, buf.b.limit()); completedSynchronously = !_fd.SendAsync(_writeEventArgs); #else - _writeResult = _fd.BeginSend(buf.b.rawBytes(), 0, buf.b.limit(), SocketFlags.None, + _writeResult = _fd.BeginSend(buf.b.rawBytes(), 0, buf.b.limit(), SocketFlags.None, writeCompleted, state); completedSynchronously = _writeResult.CompletedSynchronously; #endif @@ -633,25 +652,24 @@ namespace IceInternal { throw new Ice.ConnectionLostException(); } - + Debug.Assert(ret > 0); - - if(_traceLevels.network >= 3) + + if(_instance.traceLevel() >= 3) { string s = "sent " + ret + " bytes via udp\n" + ToString(); - _logger.trace(_traceLevels.networkCat, s); + _instance.logger().trace(_instance.traceCategory(), s); } Debug.Assert(ret == buf.b.limit()); buf.b.position(buf.b.position() + ret); } - public string type() + public string protocol() { - return "udp"; + return _instance.protocol(); } - public Ice.ConnectionInfo - getInfo() + public Ice.ConnectionInfo getInfo() { Ice.UDPConnectionInfo info = new Ice.UDPConnectionInfo(); if(_fd != null) @@ -695,7 +713,7 @@ namespace IceInternal } // - // The maximum packetSize is either the maximum allowable UDP packet size, or + // The maximum packetSize is either the maximum allowable UDP packet size, or // the UDP send buffer size (which ever is smaller). // int packetSize = System.Math.Min(_maxPacketSize, _sndSize - _udpOverhead); @@ -730,8 +748,8 @@ namespace IceInternal if(_mcastAddr != null) { s += "\nmulticast address = " + Network.addrToString(_mcastAddr); - } -#endif + } +#endif return s; } @@ -743,10 +761,9 @@ namespace IceInternal // // Only for use by UdpConnector. // - internal UdpTransceiver(Instance instance, EndPoint addr, string mcastInterface, int mcastTtl) + internal UdpTransceiver(ProtocolInstance instance, EndPoint addr, string mcastInterface, int mcastTtl) { - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; + _instance = instance; _addr = addr; #if ICE_SOCKET_ASYNC_API @@ -758,7 +775,7 @@ namespace IceInternal _writeEventArgs.RemoteEndPoint = _addr; _writeEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); #if SILVERLIGHT - String policy = instance.initializationData().properties.getProperty("Ice.ClientAccessPolicyProtocol"); + String policy = instance.properties().getProperty("Ice.ClientAccessPolicyProtocol"); if(policy.Equals("Http")) { _readEventArgs.SocketClientAccessPolicyProtocol = SocketClientAccessPolicyProtocol.Http; @@ -766,7 +783,7 @@ namespace IceInternal } else if(!String.IsNullOrEmpty(policy)) { - _logger.warning("Ignoring invalid Ice.ClientAccessPolicyProtocol value `" + policy + "'"); + _instance.logger().warning("Ignoring invalid Ice.ClientAccessPolicyProtocol value `" + policy + "'"); } #endif #endif @@ -779,14 +796,14 @@ namespace IceInternal try { _fd = Network.createSocket(true, _addr.AddressFamily); - setBufSize(instance); + setBufSize(instance.properties()); #if !SILVERLIGHT Network.setBlock(_fd, false); if(AssemblyUtil.osx_) { // // On Windows, we delay the join for the mcast group after the connection - // establishment succeeds. This is necessary for older Windows versions + // establishment succeeds. This is necessary for older Windows versions // where joining the group fails if the socket isn't bound. See ICE-5113. // if(Network.isMulticast((IPEndPoint)_addr)) @@ -810,13 +827,12 @@ namespace IceInternal // // Only for use by UdpEndpoint. // - internal UdpTransceiver(Instance instance, string host, int port, string mcastInterface, bool connect) + internal UdpTransceiver(ProtocolInstance instance, string host, int port, string mcastInterface, bool connect) { - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; + _instance = instance; _state = connect ? StateNeedConnect : StateNotConnected; _incoming = true; - + try { _addr = Network.getAddressForServer(host, port, instance.protocolSupport(), instance.preferIPv6()); @@ -825,23 +841,24 @@ namespace IceInternal _readEventArgs = new SocketAsyncEventArgs(); _readEventArgs.RemoteEndPoint = _addr; _readEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); - + _writeEventArgs = new SocketAsyncEventArgs(); _writeEventArgs.RemoteEndPoint = _addr; _writeEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(ioCompleted); #endif _fd = Network.createServerSocket(true, _addr.AddressFamily, instance.protocolSupport()); - setBufSize(instance); + setBufSize(instance.properties()); #if !SILVERLIGHT Network.setBlock(_fd, false); #endif - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { - string s = "attempting to bind to udp socket " + Network.addrToString(_addr); - _logger.trace(_traceLevels.networkCat, s); + string s = "attempting to bind to " + instance.protocol() + " socket " + + Network.addrToString(_addr); + _instance.logger().trace(_instance.traceCategory(), s); } - + #if !SILVERLIGHT if(Network.isMulticast((IPEndPoint)_addr)) { @@ -852,7 +869,7 @@ namespace IceInternal // // Windows does not allow binding to the mcast address itself // so we bind to INADDR_ANY (0.0.0.0) instead. As a result, - // bi-directional connection won't work because the source + // bi-directional connection won't work because the source // address won't the multicast address and the client will // therefore reject the datagram. // @@ -877,13 +894,12 @@ namespace IceInternal if(AssemblyUtil.platform_ != AssemblyUtil.Platform.Windows) { // - // Enable SO_REUSEADDR on Unix platforms to allow - // re-using the socket even if it's in the TIME_WAIT - // state. On Windows, this doesn't appear to be - // necessary and enabling SO_REUSEADDR would actually - // not be a good thing since it allows a second - // process to bind to an address even it's already - // bound by another process. + // Enable SO_REUSEADDR on Unix platforms to allow re-using + // the socket even if it's in the TIME_WAIT state. On + // Windows, this doesn't appear to be necessary and + // enabling SO_REUSEADDR would actually not be a good + // thing since it allows a second process to bind to an + // address even it's already bound by another process. // // TODO: using SO_EXCLUSIVEADDRUSE on Windows would // probably be better but it's only supported by recent @@ -894,18 +910,18 @@ namespace IceInternal _addr = Network.doBind(_fd, _addr); } #endif - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - StringBuilder s = new StringBuilder("starting to receive udp packets\n"); + StringBuilder s = new StringBuilder("starting to receive " + instance.protocol() + " packets\n"); s.Append(ToString()); List<string> interfaces = Network.getHostsForEndpointExpand( - Network.endpointAddressToString(_addr), instance.protocolSupport(), true); + Network.endpointAddressToString(_addr), instance.protocolSupport(), true); if(interfaces.Count != 0) { s.Append("\nlocal interfaces: "); s.Append(String.Join(", ", interfaces.ToArray())); } - _logger.trace(_traceLevels.networkCat, s.ToString()); + _instance.logger().trace(_instance.traceCategory(), s.ToString()); } } catch(Ice.LocalException) @@ -915,7 +931,7 @@ namespace IceInternal } } - private void setBufSize(Instance instance) + private void setBufSize(Ice.Properties properties) { Debug.Assert(_fd != null); @@ -942,11 +958,11 @@ namespace IceInternal // // Get property for buffer size and check for sanity. // - int sizeRequested = - instance.initializationData().properties.getPropertyAsIntWithDefault(prop, dfltSize); + int sizeRequested = properties.getPropertyAsIntWithDefault(prop, dfltSize); if(sizeRequested < (_udpOverhead + IceInternal.Protocol.headerSize)) { - _logger.warning("Invalid " + prop + " value of " + sizeRequested + " adjusted to " + dfltSize); + _instance.logger().warning("Invalid " + prop + " value of " + sizeRequested + " adjusted to " + + dfltSize); sizeRequested = dfltSize; } @@ -976,8 +992,8 @@ namespace IceInternal // if(sizeSet < sizeRequested) { - _logger.warning("UDP " + direction + " buffer size: requested size of " + sizeRequested + - " adjusted to " + sizeSet); + _instance.logger().warning("UDP " + direction + " buffer size: requested size of " + + sizeRequested + " adjusted to " + sizeSet); } } } @@ -1020,8 +1036,7 @@ namespace IceInternal } #endif - private TraceLevels _traceLevels; - private Ice.Logger _logger; + private ProtocolInstance _instance; private int _state; private bool _incoming; private int _rcvSize; diff --git a/cs/src/IceSSL/AcceptorI.cs b/cs/src/IceSSL/AcceptorI.cs index 273925131c1..2e89e913147 100644 --- a/cs/src/IceSSL/AcceptorI.cs +++ b/cs/src/IceSSL/AcceptorI.cs @@ -23,14 +23,14 @@ namespace IceSSL { public void close() { - if(_instance.networkTraceLevel() >= 1) + if(_instance.traceLevel() >= 1) { - string s = "stopping to accept ssl connections at " + ToString(); - _logger.trace(_instance.networkTraceCategory(), s); + string s = "stopping to accept " + protocol() + " connections at " + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } - + Debug.Assert(_acceptFd == null); - + try { _fd.Close(); @@ -46,20 +46,20 @@ namespace IceSSL { IceInternal.Network.doListen(_fd, _backlog); - if(_instance.networkTraceLevel() >= 1) + if(_instance.traceLevel() >= 1) { - StringBuilder s = new StringBuilder("listening for ssl connections at "); + StringBuilder s = new StringBuilder("listening for " + protocol() + " connections at "); s.Append(ToString()); - List<string> interfaces = - IceInternal.Network.getHostsForEndpointExpand(_addr.Address.ToString(), + List<string> interfaces = + IceInternal.Network.getHostsForEndpointExpand(_addr.Address.ToString(), _instance.protocolSupport(), true); if(interfaces.Count != 0) { s.Append("\nlocal interfaces: "); s.Append(String.Join(", ", interfaces.ToArray())); } - _logger.trace(_instance.networkTraceCategory(), s.ToString()); + _instance.logger().trace(_instance.traceCategory(), s.ToString()); } } @@ -117,12 +117,13 @@ namespace IceSSL } IceInternal.Network.setBlock(_acceptFd, true); // SSL requires a blocking socket. - IceInternal.Network.setTcpBufSize(_acceptFd, _instance.communicator().getProperties(), _logger); + IceInternal.Network.setTcpBufSize(_acceptFd, _instance.properties(), _instance.logger()); - if(_instance.networkTraceLevel() >= 1) + if(_instance.traceLevel() >= 1) { - string s = "attempting to accept ssl connection\n" + IceInternal.Network.fdToString(_acceptFd); - _logger.trace(_instance.networkTraceCategory(), s); + string s = "attempting to accept " + protocol() + " connection\n" + + IceInternal.Network.fdToString(_acceptFd); + _instance.logger().trace(_instance.traceCategory(), s); } Socket acceptFd = _acceptFd; @@ -131,6 +132,11 @@ namespace IceSSL return new TransceiverI(_instance, acceptFd, "", true, true, _adapterName, null, null); } + public string protocol() + { + return _instance.protocol(); + } + public override string ToString() { return IceInternal.Network.addrToString(_addr); @@ -145,8 +151,7 @@ namespace IceSSL { _instance = instance; _adapterName = adapterName; - _logger = instance.communicator().getLogger(); - _backlog = instance.communicator().getProperties().getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); + _backlog = instance.properties().getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); // // .NET requires that a certificate be supplied. @@ -162,10 +167,11 @@ namespace IceSSL try { int protocol = instance.protocolSupport(); - _addr = (IPEndPoint)IceInternal.Network.getAddressForServer(host, port, protocol, instance.preferIPv6()); + _addr = IceInternal.Network.getAddressForServer(host, port, protocol, instance.preferIPv6()) as + IPEndPoint; _fd = IceInternal.Network.createServerSocket(false, _addr.AddressFamily, protocol); IceInternal.Network.setBlock(_fd, false); - IceInternal.Network.setTcpBufSize(_fd, _instance.communicator().getProperties(), _logger); + IceInternal.Network.setTcpBufSize(_fd, _instance.properties(), _instance.logger()); if(IceInternal.AssemblyUtil.platform_ != IceInternal.AssemblyUtil.Platform.Windows) { // @@ -183,10 +189,11 @@ namespace IceSSL // IceInternal.Network.setReuseAddress(_fd, true); } - if(_instance.networkTraceLevel() >= 2) + if(_instance.traceLevel() >= 2) { - string s = "attempting to bind to ssl socket " + IceInternal.Network.addrToString(_addr); - _logger.trace(_instance.networkTraceCategory(), s); + string s = "attempting to bind to " + protocol() + " socket " + + IceInternal.Network.addrToString(_addr); + _instance.logger().trace(_instance.traceCategory(), s); } _addr = IceInternal.Network.doBind(_fd, _addr); } @@ -199,7 +206,6 @@ namespace IceSSL private Instance _instance; private string _adapterName; - private Ice.Logger _logger; private Socket _fd; private Socket _acceptFd; private System.Exception _acceptError; diff --git a/cs/src/IceSSL/ConnectorI.cs b/cs/src/IceSSL/ConnectorI.cs index 9aeeb0ae3e0..d903bf712c8 100644 --- a/cs/src/IceSSL/ConnectorI.cs +++ b/cs/src/IceSSL/ConnectorI.cs @@ -17,8 +17,6 @@ namespace IceSSL sealed class ConnectorI : IceInternal.Connector { - internal const short TYPE = 2; - public IceInternal.Transceiver connect() { // @@ -31,10 +29,10 @@ namespace IceSSL throw ex; } - if(_instance.networkTraceLevel() >= 2) + if(_instance.traceLevel() >= 2) { - string s = "trying to establish ssl connection to " + ToString(); - _logger.trace(_instance.networkTraceCategory(), s); + string s = "trying to establish " + _instance.protocol() + " connection to " + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } try @@ -49,7 +47,7 @@ namespace IceSSL // if(_addr.AddressFamily != AddressFamily.InterNetworkV6 || !IceInternal.AssemblyUtil.xp_) { - IceInternal.Network.setTcpBufSize(fd, _instance.communicator().getProperties(), _logger); + IceInternal.Network.setTcpBufSize(fd, _instance.properties(), _instance.logger()); } // @@ -59,10 +57,11 @@ namespace IceSSL } catch(Ice.LocalException ex) { - if(_instance.networkTraceLevel() >= 2) + if(_instance.traceLevel() >= 2) { - string s = "failed to establish ssl connection to " + ToString() + "\n" + ex; - _logger.trace(_instance.networkTraceCategory(), s); + string s = "failed to establish " + _instance.protocol() + " connection to " + ToString() + "\n" + + ex; + _instance.logger().trace(_instance.traceCategory(), s); } throw; } @@ -70,20 +69,20 @@ namespace IceSSL public short type() { - return TYPE; + return _instance.type(); } // // Only for use by EndpointI. // - internal ConnectorI(Instance instance, string host, EndPoint addr, IceInternal.NetworkProxy proxy, int timeout, + internal ConnectorI(Instance instance, string host, EndPoint addr, IceInternal.NetworkProxy proxy, int timeout, string conId) { _instance = instance; _host = host; - _logger = instance.communicator().getLogger(); + _instance.logger() = instance.communicator().getLogger(); _addr = (IPEndPoint)addr; - _proxy = proxy; + _proxy = proxy; _timeout = timeout; _connectionId = conId; @@ -130,7 +129,6 @@ namespace IceSSL } private Instance _instance; - private Ice.Logger _logger; private string _host; private IPEndPoint _addr; private IceInternal.NetworkProxy _proxy; diff --git a/cs/src/IceSSL/EndpointI.cs b/cs/src/IceSSL/EndpointI.cs index fe1bbbe321f..cb39879cacc 100644 --- a/cs/src/IceSSL/EndpointI.cs +++ b/cs/src/IceSSL/EndpointI.cs @@ -14,281 +14,66 @@ namespace IceSSL using System.Collections.Generic; using System.Net; using System.Globalization; - - sealed class EndpointI : IceInternal.EndpointI + + sealed class EndpointI : IceInternal.IPEndpointI { - internal EndpointI(Instance instance, string ho, int po, int ti, string conId, bool co) : base(conId) + internal EndpointI(Instance instance, string ho, int po, int ti, string conId, bool co) : + base(instance, ho, po, conId) { _instance = instance; - _host = ho; - _port = po; _timeout = ti; - connectionId_ = conId; _compress = co; - calcHashValue(); } - internal EndpointI(Instance instance, string str, bool oaEndpoint) : base("") + internal EndpointI(Instance instance) { + base(instance); _instance = instance; - _host = null; - _port = 0; _timeout = -1; _compress = false; - - char[] separators = { ' ', '\t', '\n', '\r' }; - string[] arr = str.Split(separators); - - int i = 0; - while(i < arr.Length) - { - if(arr[i].Length == 0) - { - i++; - continue; - } - - string option = arr[i++]; - if(option.Length != 2 || option[0] != '-') - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "expected an endpoint option but found `" + option + "' in endpoint `ssl " + str + "'"; - throw e; - } - - string argument = null; - if(i < arr.Length && arr[i][0] != '-') - { - argument = arr[i++]; - if(argument[0] == '\"' && argument[argument.Length - 1] == '\"') - { - argument = argument.Substring(1, argument.Length - 2); - } - } - - switch(option[1]) - { - case 'h': - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -h option in endpoint `ssl " + str + "'"; - throw e; - } - - _host = argument; - break; - } - - case 'p': - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -p option in endpoint `ssl " + str + "'"; - throw e; - } - - try - { - _port = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(ex); - e.str = "invalid port value `" + argument + "' in endpoint `ssl " + str + "'"; - throw e; - } - - if(_port < 0 || _port > 65535) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "port value `" + argument + "' out of range in endpoint `ssl " + str + "'"; - throw e; - } - - break; - } - - case 't': - { - if(argument == null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "no argument provided for -t option in endpoint `ssl " + str + "'"; - throw e; - } - - try - { - _timeout = System.Int32.Parse(argument, CultureInfo.InvariantCulture); - } - catch(System.FormatException ex) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(ex); - e.str = "invalid timeout value `" + argument + "' in endpoint `ssl " + str + "'"; - throw e; - } - - break; - } - - case 'z': - { - if(argument != null) - { - Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "unexpected argument `" + argument + "' provided for -z option in `ssl " + str + - "'"; - throw e; - } - - _compress = true; - break; - } - - default: - { - throw new Ice.EndpointParseException("unknown option `" + option + "' in endpoint `ssl " + str + "'"); - } - } - } - - if(_host == null) - { - _host = _instance.defaultHost(); - } - else if(_host.Equals("*")) - { - if(oaEndpoint) - { - _host = null; - } - else - { - throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `ssl " + str + "'"); - } - } - - if(_host == null) - { - _host = ""; - } - - calcHashValue(); } internal EndpointI(Instance instance, IceInternal.BasicStream s) { + base(instance, s); _instance = instance; - s.startReadEncaps(); - _host = s.readString(); - _port = s.readInt(); _timeout = s.readInt(); _compress = s.readBool(); - s.endReadEncaps(); - calcHashValue(); - } - - // - // Marshal the endpoint. - // - public override void streamWrite(IceInternal.BasicStream s) - { - s.writeShort(EndpointType.value); - s.startWriteEncaps(); - s.writeString(_host); - s.writeInt(_port); - s.writeInt(_timeout); - s.writeBool(_compress); - s.endWriteEncaps(); } - // - // Convert the endpoint to its string form. - // - public override string ice_toString_() - { - // - // WARNING: Certain features, such as proxy validation in Glacier2, - // depend on the format of proxy strings. Changes to toString() and - // methods called to generate parts of the reference string could break - // these features. Please review for all features that depend on the - // format of proxyToString() before changing this and related code. - // - string s = "ssl"; - - if(_host != null && _host.Length != 0) - { - s += " -h "; - bool addQuote = _host.IndexOf(':') != -1; - if(addQuote) - { - s += "\""; - } - s += _host; - if(addQuote) - { - s += "\""; - } - } - s += " -p " + _port; - if(_timeout != -1) - { - s += " -t " + _timeout; - } - if(_compress) - { - s += " -z"; - } - return s; - } - private sealed class InfoI : IceSSL.EndpointInfo { - public InfoI(int to, bool comp, string h, int p) : base(to, comp, h, p) + public InfoI(EndpointI e) { + _endpoint = e; } override public short type() { - return EndpointType.value; + return _endpoint.type(); } - + override public bool datagram() { - return false; + return _endpoint.datagram(); } - + override public bool secure() { - return true; + return _endpoint.secure(); } - }; - // - // Return the endpoint information. - // - public override Ice.EndpointInfo getInfo() - { - return new InfoI(_timeout, _compress, _host, _port); + private EndpointI _endpoint; } // - // Return the endpoint type. - // - public override short type() - { - return EndpointType.value; - } - - // - // Return the protocol name; + // Return the endpoint information. // - public override string protocol() + public override Ice.EndpointInfo getInfo() { - return "ssl"; + InfoI info = new InfoI(this); + fillEndpointInfo(info); + return info; } // @@ -313,22 +98,7 @@ namespace IceSSL } else { - return new EndpointI(_instance, _host, _port, timeout, connectionId_, _compress); - } - } - - // - // Return a new endpoint with a different connection id. - // - public override IceInternal.EndpointI connectionId(string connectionId) - { - if(connectionId == connectionId_) - { - return this; - } - else - { - return new EndpointI(_instance, _host, _port, _timeout, connectionId, _compress); + return new EndpointI(_instance, host_, port_, timeout, connectionId_, _compress); } } @@ -354,7 +124,7 @@ namespace IceSSL } else { - return new EndpointI(_instance, _host, _port, _timeout, connectionId_, compress); + return new EndpointI(_instance, host_, port_, _timeout, connectionId_, compress); } } @@ -388,21 +158,6 @@ namespace IceSSL } // - // Return connectors for this endpoint, or empty list if no connector - // is available. - // - public override List<IceInternal.Connector> connectors(Ice.EndpointSelectionType selType) - { - return _instance.endpointHostResolver().resolve(_host, _port, selType, this); - } - - public override void connectors_async(Ice.EndpointSelectionType selType, - IceInternal.EndpointI_connectors callback) - { - _instance.endpointHostResolver().resolve(_host, _port, selType, this, callback); - } - - // // Return an acceptor for this endpoint, or null if no acceptor // is available. In case an acceptor is created, this operation // also returns a new "effective" endpoint, which might differ @@ -411,61 +166,33 @@ namespace IceSSL // public override IceInternal.Acceptor acceptor(ref IceInternal.EndpointI endpoint, string adapterName) { - AcceptorI p = new AcceptorI(_instance, adapterName, _host, _port); - endpoint = new EndpointI(_instance, _host, p.effectivePort(), _timeout, connectionId_, _compress); + AcceptorI p = new AcceptorI(_instance, adapterName, host_, port_); + endpoint = new EndpointI(_instance, host_, p.effectivePort(), _timeout, connectionId_, _compress); return p; } - // - // Expand endpoint out in to separate endpoints for each local - // host if listening on INADDR_ANY. - // - public override List<IceInternal.EndpointI> expand() + public override string options() { - List<IceInternal.EndpointI> endps = new List<IceInternal.EndpointI>(); - List<string> hosts = - IceInternal.Network.getHostsForEndpointExpand(_host, _instance.protocolSupport(), false); - if(hosts == null || hosts.Count == 0) - { - endps.Add(this); - } - else - { - foreach(string h in hosts) - { - endps.Add(new EndpointI(_instance, h, _port, _timeout, connectionId_, _compress)); - } - } - return endps; - } + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + string s = base.options(); - // - // Check whether the endpoint is equivalent to another one. - // - public override bool equivalent(IceInternal.EndpointI endpoint) - { - if(!(endpoint is EndpointI)) + if(_timeout != -1) { - return false; + s += " -t " + _timeout; } - EndpointI sslEndpointI = (EndpointI)endpoint; - return sslEndpointI._host.Equals(_host) && sslEndpointI._port == _port; - } - - public override List<IceInternal.Connector> connectors(List<EndPoint> addresses, IceInternal.NetworkProxy proxy) - { - List<IceInternal.Connector> connectors = new List<IceInternal.Connector>(); - foreach(EndPoint addr in addresses) + if(_compress) { - connectors.Add(new ConnectorI(_instance, _host, addr, proxy, _timeout, connectionId_)); + s += " -z"; } - return connectors; - } - public override int GetHashCode() - { - return _hashCode; + return s; } // @@ -477,74 +204,124 @@ namespace IceSSL { return type() < obj.type() ? -1 : 1; } - + EndpointI p = (EndpointI)obj; if(this == p) { return 0; } - else - { - int r = base.CompareTo(p); - if(r != 0) - { - return r; - } - } - if(_port < p._port) + if(_timeout < p._timeout) { return -1; } - else if(p._port < _port) + else if(p._timeout < _timeout) { return 1; } - if(_timeout < p._timeout) + if(!_compress && p._compress) { return -1; } - else if(p._timeout < _timeout) + else if(!p._compress && _compress) { return 1; } - if(!connectionId_.Equals(p.connectionId_)) + return base.CompareTo(p); + } + + protected override void streamWriteImpl(IceInternal.BasicStream s) + { + base.streamWriteImpl(s); + s.writeInt(_timeout); + s.writeBool(_compress); + } + + protected override void hashInit(ref int h) + { + base.hashInit(ref h); + IceInternal.HashUtil.hashAdd(ref h, _timeout); + IceInternal.HashUtil.hashAdd(ref h, _compress); + } + + protected override void fillEndpointInfo(Ice.IPEndpointInfo info) + { + base.fillEndpointInfo(info); + if(info is IceSSL.EndpointInfo) { - return string.Compare(connectionId_, p.connectionId_, StringComparison.Ordinal); + IceSSL.EndpointInfo sslInfo = (IceSSL.EndpointInfo)info; + sslInfo.timeout = _timeout; + sslInfo.compress = _compress; } + } - if(!_compress && p._compress) + protected override bool checkOption(string option, string argument, string endpoint) + { + if(base.checkOption(option, argument, endpoint)) { - return -1; + return true; } - else if(!p._compress && _compress) + + switch(option[1]) { - return 1; + case 't': + { + if(argument == null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "no argument provided for -t option in endpoint " + endpoint; + throw e; + } + + try + { + _timeout = System.Int32.Parse(argument, CultureInfo.InvariantCulture); + } + catch(System.FormatException ex) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(ex); + e.str = "invalid timeout value `" + argument + "' in endpoint " + endpoint; + throw e; + } + + return true; + } + + case 'z': + { + if(argument != null) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "unexpected argument `" + argument + "' provided for -z option in " + endpoint; + throw e; + } + + _compress = true; + return true; } - return string.Compare(_host, p._host, StringComparison.Ordinal); + default: + { + return false; + } + } } - private void calcHashValue() + protected override IceInternal.Connector createConnector(EndPoint addr, IceInternal.NetworkProxy proxy) { - int h = 5381; - IceInternal.HashUtil.hashAdd(ref h, EndpointType.value); - IceInternal.HashUtil.hashAdd(ref h, _host); - IceInternal.HashUtil.hashAdd(ref h, _port); - IceInternal.HashUtil.hashAdd(ref h, _timeout); - IceInternal.HashUtil.hashAdd(ref h, connectionId_); - IceInternal.HashUtil.hashAdd(ref h, _compress); - _hashCode = h; + return new ConnectorI(_instance, host_, addr, proxy, _timeout, connectionId_); + } + + protected override IceInternal.IPEndpointI createEndpoint(string host, int port, string connectionId) + { + return new EndpointI(_instance, host, port, _timeout, connectionId, _compress); } private Instance _instance; - private string _host; - private int _port; private int _timeout; private bool _compress; - private int _hashCode; } internal sealed class EndpointFactoryI : IceInternal.EndpointFactory @@ -556,17 +333,19 @@ namespace IceSSL public short type() { - return EndpointType.value; + return _instance.type(); } public string protocol() { - return "ssl"; + return _instance.protocol(); } - public IceInternal.EndpointI create(string str, bool oaEndpoint) + public IceInternal.EndpointI create(List<string> args, bool oaEndpoint) { - return new EndpointI(_instance, str, oaEndpoint); + IceInternal.IPEndpointI endpt = new EndpointI(_instance); + endpt.initWithOptions(args, oaEndpoint); + return endpt; } public IceInternal.EndpointI read(IceInternal.BasicStream s) @@ -579,6 +358,11 @@ namespace IceSSL _instance = null; } + public IceInternal.EndpointFactory clone(IceInternal.ProtocolInstance instance) + { + return new EndpointFactoryI(new Instance(_instance.sharedInstance(), instance.type(), instance.protocol())); + } + private Instance _instance; } } diff --git a/cs/src/IceSSL/Instance.cs b/cs/src/IceSSL/Instance.cs index 3ca13045433..b9c30126f70 100644 --- a/cs/src/IceSSL/Instance.cs +++ b/cs/src/IceSSL/Instance.cs @@ -19,24 +19,18 @@ namespace IceSSL using System.Security.Cryptography.X509Certificates; using System.Text; using System.Globalization; - - internal class Instance + + internal class SharedInstance { - internal Instance(Ice.Communicator communicator) + internal SharedInstance(IceInternal.ProtocolPluginFacade facade) { - _logger = communicator.getLogger(); - _facade = IceInternal.Util.getProtocolPluginFacade(communicator); - _securityTraceLevel = communicator.getProperties().getPropertyAsIntWithDefault("IceSSL.Trace.Security", 0); + _communicator = facade.getCommunicator(); + _logger = _communicator.getLogger(); + _facade = facade; + _securityTraceLevel = _communicator.getProperties().getPropertyAsIntWithDefault("IceSSL.Trace.Security", 0); _securityTraceCategory = "Security"; _initialized = false; _trustManager = new TrustManager(communicator); - - // - // Register the endpoint factory. We have to do this now, rather than - // in initialize, because the communicator may need to interpret - // proxies before the plug-in is fully initialized. - // - _facade.addEndpointFactory(new EndpointFactoryI(this)); } internal void initialize() @@ -208,7 +202,7 @@ namespace IceSSL _certs = new X509Certificate2Collection(); string certFile = properties.getProperty(prefix + "CertFile"); string passwordStr = properties.getProperty(prefix + "Password"); - + if(certFile.Length > 0) { if(!checkPath(ref certFile)) @@ -318,46 +312,6 @@ namespace IceSSL return _facade.getCommunicator(); } - internal IceInternal.EndpointHostResolver endpointHostResolver() - { - return _facade.getEndpointHostResolver(); - } - - internal int protocolSupport() - { - return _facade.getProtocolSupport(); - } - - internal bool preferIPv6() - { - return _facade.getPreferIPv6(); - } - - internal IceInternal.NetworkProxy networkProxy() - { - return _facade.getNetworkProxy(); - } - - internal Ice.EncodingVersion defaultEncoding() - { - return _facade.getDefaultEncoding(); - } - - internal string defaultHost() - { - return _facade.getDefaultHost(); - } - - internal int networkTraceLevel() - { - return _facade.getNetworkTraceLevel(); - } - - internal string networkTraceCategory() - { - return _facade.getNetworkTraceCategory(); - } - internal int securityTraceLevel() { return _securityTraceLevel; @@ -405,7 +359,7 @@ namespace IceSSL s.Append("\ncipher algorithm = " + stream.CipherAlgorithm + "/" + stream.CipherStrength); s.Append("\nkey exchange algorithm = " + stream.KeyExchangeAlgorithm + "/" + stream.KeyExchangeStrength); s.Append("\nprotocol = " + stream.SslProtocol); - communicator().getLogger().trace(_securityTraceCategory, s.ToString()); + _logger.trace(_securityTraceCategory, s.ToString()); } internal void verifyPeer(NativeConnectionInfo info, System.Net.Sockets.Socket fd, string address) @@ -633,7 +587,7 @@ namespace IceSSL if(_verifier != null && !_verifier.verify(info)) { - string msg = (info.incoming ? "incoming" : "outgoing") + + string msg = (info.incoming ? "incoming" : "outgoing") + " connection rejected by certificate verifier\n" + IceInternal.Network.fdToString(fd); if(_securityTraceLevel >= 1) { @@ -649,7 +603,8 @@ namespace IceSSL // // Parse a string of the form "location.name" into two parts. // - internal void parseStore(string prop, string store, ref StoreLocation loc, ref StoreName name, ref string sname) + private static void parseStore(string prop, string store, ref StoreLocation loc, ref StoreName name, + ref string sname) { int pos = store.IndexOf('.'); if(pos == -1) @@ -823,7 +778,7 @@ namespace IceSSL // Split strings using a delimiter. Quotes are supported. // Returns null for an unmatched quote. // - private string[] splitString(string str, char delim) + private static string[] splitString(string str, char delim) { ArrayList l = new ArrayList(); char[] arr = new char[str.Length]; @@ -889,7 +844,7 @@ namespace IceSSL private SslProtocols parseProtocols(string property) { SslProtocols result = SslProtocols.Default; - string[] arr = communicator().getProperties().getPropertyAsList(property); + string[] arr = _communicator.getProperties().getPropertyAsList(property); if(arr.Length > 0) { result = 0; @@ -912,13 +867,13 @@ namespace IceSSL protocol = "Tls"; break; } - case "TLS1_1": + case "TLS1_1": case "TLSV1_1": { protocol = "Tls11"; break; } - case "TLS1_2": + case "TLS1_2": case "TLSV1_2": { protocol = "Tls12"; @@ -946,7 +901,7 @@ namespace IceSSL return result; } - private X509Certificate2Collection findCertificates(string prop, string storeSpec, string value) + private static X509Certificate2Collection findCertificates(string prop, string storeSpec, string value) { StoreLocation storeLoc = 0; StoreName storeName = 0; @@ -1159,6 +1114,7 @@ namespace IceSSL return (next + len <= data.Length); } + private Ice.Communicator _communicator; private Ice.Logger _logger; private IceInternal.ProtocolPluginFacade _facade; private int _securityTraceLevel; @@ -1174,4 +1130,60 @@ namespace IceSSL private PasswordCallback _passwordCallback; private TrustManager _trustManager; } + + internal class Instance : IceInternal.ProtocolInstance + { + internal Instance(SharedInstance sharedInstance, short type, string protocol) + { + base(sharedInstance.communicator(), type, protocol); + _sharedInstance = sharedInstance; + } + + internal SharedInstance sharedInstance() + { + return _sharedInstance; + } + + internal int securityTraceLevel() + { + return _sharedInstance.securityTraceLevel(); + } + + internal string securityTraceCategory() + { + return _sharedInstance.securityTraceCategory(); + } + + internal bool initialized() + { + return _sharedInstance.initialized(); + } + + internal X509Certificate2Collection certs() + { + return _sharedInstance.certs(); + } + + internal SslProtocols protocols() + { + return _sharedInstance.protocols(); + } + + internal int checkCRL() + { + return _sharedInstance.checkCRL(); + } + + internal void traceStream(System.Net.Security.SslStream stream, string connInfo) + { + _sharedInstance.traceStream(stream, connInfo); + } + + internal void verifyPeer(NativeConnectionInfo info, System.Net.Sockets.Socket fd, string address) + { + _sharedInstance.verifyPeer(info, fd, address); + } + + private SharedInstance _sharedInstance; + } } diff --git a/cs/src/IceSSL/PluginI.cs b/cs/src/IceSSL/PluginI.cs index d5f212f6b84..7a2fb6a21c7 100644 --- a/cs/src/IceSSL/PluginI.cs +++ b/cs/src/IceSSL/PluginI.cs @@ -26,8 +26,7 @@ namespace IceSSL /// <returns>The new plug-in. null can be returned to indicate /// that a general error occurred. Alternatively, create can throw /// PluginInitializationException to provide more detailed information.</returns> - public Ice.Plugin - create(Ice.Communicator communicator, string name, string[] args) + public Ice.Plugin create(Ice.Communicator communicator, string name, string[] args) { return new PluginI(communicator); } @@ -35,47 +34,55 @@ namespace IceSSL public sealed class PluginI : Plugin { - public - PluginI(Ice.Communicator communicator) + public PluginI(Ice.Communicator communicator) { - instance_ = new Instance(communicator); + IceInternal.ProtocolPluginFacade facade = IceInternal.Util.getProtocolPluginFacade(communicator); + + _sharedInstance = new SharedInstance(facade); + + // + // Register the endpoint factory. We have to do this now, rather than + // in initialize, because the communicator may need to interpret + // proxies before the plug-in is fully initialized. + // + facade.addEndpointFactory( + new EndpointFactoryI(new Instance(_sharedInstance, IceSSL.EndpointType.value, "ssl"))); } public override void initialize() { - instance_.initialize(); + _sharedInstance.initialize(); } - public override void - destroy() + public override void destroy() { } public override void setCertificates(X509Certificate2Collection certs) { - instance_.setCertificates(certs); + _sharedInstance.setCertificates(certs); } public override void setCertificateVerifier(CertificateVerifier verifier) { - instance_.setCertificateVerifier(verifier); + _sharedInstance.setCertificateVerifier(verifier); } public override CertificateVerifier getCertificateVerifier() { - return instance_.getCertificateVerifier(); + return _sharedInstance.getCertificateVerifier(); } public override void setPasswordCallback(PasswordCallback callback) { - instance_.setPasswordCallback(callback); + _sharedInstance.setPasswordCallback(callback); } public override PasswordCallback getPasswordCallback() { - return instance_.getPasswordCallback(); + return _sharedInstance.getPasswordCallback(); } - private Instance instance_; + private SharedInstance _sharedInstance; } } diff --git a/cs/src/IceSSL/TransceiverI.cs b/cs/src/IceSSL/TransceiverI.cs index cd29bfa8bb9..89942b4dccb 100644 --- a/cs/src/IceSSL/TransceiverI.cs +++ b/cs/src/IceSSL/TransceiverI.cs @@ -24,7 +24,7 @@ namespace IceSSL sealed class TransceiverI : IceInternal.Transceiver { - public int initialize() + public int initialize(IceInternal.Buffer readBuffer, IceInternal.Buffer writeBuffer, ref bool hasMoreData) { try { @@ -34,7 +34,7 @@ namespace IceSSL _state = StateConnectPending; return IceInternal.SocketOperation.Connect; } - else if(_state == StateConnectPending) + else if(_state <= StateConnectPending) { IceInternal.Network.doFinishConnectAsync(_fd, _writeResult); _writeResult = null; @@ -43,7 +43,7 @@ namespace IceSSL if(_proxy != null) { _state = StateProxyConnectRequest; // Send proxy connect request - return IceInternal.SocketOperation.Write; + return IceInternal.SocketOperation.Write; } _state = StateAuthenticatePending; @@ -74,25 +74,32 @@ namespace IceSSL } catch(Ice.LocalException e) { - if(_instance.networkTraceLevel() >= 2) + if(_instance.traceLevel() >= 2) { System.Text.StringBuilder s = new System.Text.StringBuilder(); - s.Append("failed to establish ssl connection\n"); + s.Append("failed to establish " + protocol() + " connection\n"); s.Append(IceInternal.Network.fdToString(_fd, _proxy, _addr)); s.Append("\n"); s.Append(e.ToString()); - _logger.trace(_instance.networkTraceCategory(), s.ToString()); + _instance.logger().trace(_instance.traceCategory(), s.ToString()); } throw; } } + public int closing(bool initiator, Ice.LocalException ex) + { + // If we are initiating the connection closure, wait for the peer + // to close the TCP/IP connection. Otherwise, close immediately. + return initiator ? IceInternal.SocketOperation.Read : IceInternal.SocketOperation.None; + } + public void close() { - if(_state == StateConnected && _instance.networkTraceLevel() >= 1) + if(_state == StateConnected && _instance.traceLevel() >= 1) { - string s = "closing ssl connection\n" + ToString(); - _logger.trace(_instance.networkTraceCategory(), s); + string s = "closing " + protocol() + " connection\n" + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } Debug.Assert(_fd != null); @@ -121,16 +128,22 @@ namespace IceSSL } } - public bool write(IceInternal.Buffer buf) + public int write(IceInternal.Buffer buf) { Debug.Assert(_fd != null); - return false; // Caller will use async write. + // + // Force caller to use async write. + // + return buf.b.hasRemaining() ? IceInternal.SocketOperation.Write : IceInternal.SocketOperation.None; } - public bool read(IceInternal.Buffer buf) + public int read(IceInternal.Buffer buf, ref bool hasMoreData) { Debug.Assert(_fd != null); - return false; // Caller will use async read. + // + // Force caller to use async read. + // + return buf.b.hasRemaining() ? IceInternal.SocketOperation.Read : IceInternal.SocketOperation.None; } public bool startRead(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state) @@ -210,7 +223,7 @@ namespace IceSSL Debug.Assert(ret > 0); - if(_instance.networkTraceLevel() >= 3) + if(_instance.traceLevel() >= 3) { int packetSize = buf.b.remaining(); if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) @@ -218,7 +231,7 @@ namespace IceSSL packetSize = _maxReceivePacketSize; } string s = "received " + ret + " of " + packetSize + " bytes via ssl\n" + ToString(); - _logger.trace(_instance.networkTraceCategory(), s); + _instance.logger().trace(_instance.traceCategory(), s); } buf.b.position(buf.b.position() + ret); @@ -254,11 +267,11 @@ namespace IceSSL } } - public bool startWrite(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state, + public bool startWrite(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state, out bool completed) { Debug.Assert(_fd != null); - + if(_state < StateConnected) { completed = false; @@ -300,12 +313,12 @@ namespace IceSSL _writeCallback = callback; if(_stream != null) { - _writeResult = _stream.BeginWrite(buf.b.rawBytes(), buf.b.position(), packetSize, writeCompleted, + _writeResult = _stream.BeginWrite(buf.b.rawBytes(), buf.b.position(), packetSize, writeCompleted, state); } else { - _writeResult = _fd.BeginSend(buf.b.rawBytes(), buf.b.position(), packetSize, SocketFlags.None, + _writeResult = _fd.BeginSend(buf.b.rawBytes(), buf.b.position(), packetSize, SocketFlags.None, writeCompleted, state); } completed = packetSize == buf.b.remaining(); @@ -346,9 +359,9 @@ namespace IceSSL buf.b.position(buf.size()); // Assume all the data was sent for at-most-once semantics. } _writeResult = null; - return; + return; } - + if(_state < StateConnected && _state != StateProxyConnectRequest) { return; @@ -374,10 +387,11 @@ namespace IceSSL packetSize = _maxSendPacketSize; } - if(_instance.networkTraceLevel() >= 3) + if(_instance.traceLevel() >= 3) { - string s = "sent " + packetSize + " of " + packetSize + " bytes via ssl\n" + ToString(); - _logger.trace(_instance.networkTraceCategory(), s); + string s = "sent " + packetSize + " of " + packetSize + " bytes via " + protocol() + "\n" + + ToString(); + _instance.logger().trace(_instance.traceCategory(), s); } buf.b.position(buf.b.position() + packetSize); @@ -413,9 +427,9 @@ namespace IceSSL } } - public string type() + public string protocol() { - return "ssl"; + return _instance.protocol(); } public Ice.ConnectionInfo getInfo() @@ -450,7 +464,6 @@ namespace IceSSL _addr = addr; _proxy = proxy; _stream = null; - _logger = instance.communicator().getLogger(); _desc = connected ? IceInternal.Network.fdToString(_fd, _proxy, _addr) : "<not connected>"; _state = connected ? StateNeedAuthenticate : StateNeedConnect; @@ -471,8 +484,7 @@ namespace IceSSL // // Determine whether a certificate is required from the peer. // - _verifyPeer = - _instance.communicator().getProperties().getPropertyAsIntWithDefault("IceSSL.VerifyPeer", 2); + _verifyPeer = _instance.properties().getPropertyAsIntWithDefault("IceSSL.VerifyPeer", 2); } else { @@ -480,7 +492,6 @@ namespace IceSSL } } - private NativeConnectionInfo getNativeConnectionInfo() { IceSSL.NativeConnectionInfo info = new IceSSL.NativeConnectionInfo(); @@ -557,7 +568,7 @@ namespace IceSSL } _writeResult = _stream.BeginAuthenticateAsServer(cert, _verifyPeer > 1, _instance.protocols(), - _instance.checkCRL() > 0, + _instance.checkCRL() > 0, delegate(IAsyncResult result) { if(!result.CompletedSynchronously) @@ -611,18 +622,18 @@ namespace IceSSL _instance.verifyPeer(getNativeConnectionInfo(), _fd, _host); - if(_instance.networkTraceLevel() >= 1) + if(_instance.traceLevel() >= 1) { string s; if(_adapterName == null) { - s = "ssl connection established\n" + _desc; + s = protocol() + " connection established\n" + _desc; } else { - s = "accepted ssl connection\n" + _desc; + s = "accepted " + protocol() + " connection\n" + _desc; } - _logger.trace(_instance.networkTraceCategory(), s); + _instance.logger().trace(_instance.traceCategory(), s); } if(_instance.securityTraceLevel() >= 1) @@ -693,7 +704,7 @@ namespace IceSSL { if(_instance.securityTraceLevel() >= 1) { - _logger.trace(_instance.securityTraceCategory(), + _instance.logger().trace(_instance.securityTraceCategory(), "SSL certificate validation failed - client certificate not provided"); } return false; @@ -784,19 +795,21 @@ namespace IceSSL { if(message.Length > 0) { - _logger.trace(_instance.securityTraceCategory(), "SSL certificate validation failed:" + - message); + _instance.logger().trace(_instance.securityTraceCategory(), + "SSL certificate validation failed:" + message); } else { - _logger.trace(_instance.securityTraceCategory(), "SSL certificate validation failed"); + _instance.logger().trace(_instance.securityTraceCategory(), + "SSL certificate validation failed"); } } return false; } else if(message.Length > 0 && _instance.securityTraceLevel() >= 1) { - _logger.trace(_instance.securityTraceCategory(), "SSL certificate validation status:" + message); + _instance.logger().trace(_instance.securityTraceCategory(), "SSL certificate validation status:" + + message); } return true; @@ -826,7 +839,6 @@ namespace IceSSL private IPEndPoint _addr; private IceInternal.NetworkProxy _proxy; private SslStream _stream; - private Ice.Logger _logger; private string _desc; private int _verifyPeer; private int _maxSendPacketSize; @@ -841,7 +853,7 @@ namespace IceSSL private const int StateNeedConnect = 0; private const int StateConnectPending = 1; private const int StateProxyConnectRequest = 2; - private const int StateProxyConnectRequestPending = 3; + private const int StateProxyConnectRequestPending = 3; private const int StateNeedAuthenticate = 4; private const int StateAuthenticatePending = 5; private const int StateConnected = 6; diff --git a/cs/test/Ice/background/Acceptor.cs b/cs/test/Ice/background/Acceptor.cs index cc5d224a404..e9432d2580a 100644 --- a/cs/test/Ice/background/Acceptor.cs +++ b/cs/test/Ice/background/Acceptor.cs @@ -37,6 +37,11 @@ internal class Acceptor : IceInternal.Acceptor return new Transceiver(_acceptor.accept()); } + public string protocol() + { + return _acceptor.protocol(); + } + public override string ToString() { return _acceptor.ToString(); diff --git a/cs/test/Ice/background/AllTests.cs b/cs/test/Ice/background/AllTests.cs index 50535cd0f34..b2c8515ed22 100644 --- a/cs/test/Ice/background/AllTests.cs +++ b/cs/test/Ice/background/AllTests.cs @@ -86,6 +86,10 @@ public class AllTests _response.called(); } + public void responseNoOp() + { + } + public void noResponse() { test(false); @@ -327,6 +331,40 @@ public class AllTests } Console.Out.WriteLine("ok"); + bool ws = communicator.getProperties().getProperty("Ice.Default.Protocol").Equals("test-ws"); + bool wss = communicator.getProperties().getProperty("Ice.Default.Protocol").Equals("test-wss"); + if(!ws && !wss) + { + Console.Write("testing buffered transport... "); + Console.Out.Flush(); + + configuration.buffered(true); + backgroundController.buffered(true); + background.begin_op(); + background.ice_getCachedConnection().close(true); + background.begin_op(); + + Ice.AsyncResult r; + OpAMICallback cb = new OpAMICallback(); + + for(int i = 0; i < 10000; ++i) + { + r = background.begin_op().whenCompleted(cb.responseNoOp, cb.noException); + if(i % 50 == 0) + { + backgroundController.holdAdapter(); + backgroundController.resumeAdapter(); + } + if(i % 100 == 0) + { + r.waitForCompleted(); + } + } + r.waitForCompleted(); + + Console.Out.WriteLine("ok"); + } + return background; } diff --git a/cs/test/Ice/background/BackgroundControllerI.cs b/cs/test/Ice/background/BackgroundControllerI.cs index 6f50af360ae..dd66e6aa932 100644 --- a/cs/test/Ice/background/BackgroundControllerI.cs +++ b/cs/test/Ice/background/BackgroundControllerI.cs @@ -92,6 +92,11 @@ internal class BackgroundControllerI : BackgroundControllerDisp_ _configuration.writeException(enable ? new Ice.SocketException() : null); } + public override void buffered(bool enable, Ice.Current current) + { + _configuration.buffered(enable); + } + internal BackgroundControllerI(Ice.ObjectAdapter adapter) { _adapter = adapter; diff --git a/cs/test/Ice/background/Configuration.cs b/cs/test/Ice/background/Configuration.cs index d76bcd527ac..2a9c51b3271 100644 --- a/cs/test/Ice/background/Configuration.cs +++ b/cs/test/Ice/background/Configuration.cs @@ -146,6 +146,16 @@ internal class Configuration } } + public void buffered(bool b) + { + _buffered = b; + } + + public bool buffered() + { + return _buffered; + } + static public Configuration getInstance() { return _instance; @@ -158,6 +168,7 @@ internal class Configuration private Ice.LocalException _readException; private int _writeReadyCount; private Ice.LocalException _writeException; + private bool _buffered; private static Configuration _instance = new Configuration(); } diff --git a/cs/test/Ice/background/EndpointFactory.cs b/cs/test/Ice/background/EndpointFactory.cs index 92e2e000f3b..f5b23b5c985 100644 --- a/cs/test/Ice/background/EndpointFactory.cs +++ b/cs/test/Ice/background/EndpointFactory.cs @@ -7,6 +7,9 @@ // // ********************************************************************** +using System.Collections.Generic; +using System.Diagnostics; + internal class EndpointFactory : IceInternal.EndpointFactory { internal EndpointFactory(IceInternal.EndpointFactory factory) @@ -24,20 +27,30 @@ internal class EndpointFactory : IceInternal.EndpointFactory return "test-" + _factory.protocol(); } - public IceInternal.EndpointI create(string str, bool server) + public IceInternal.EndpointI create(List<string> args, bool server) { - return new EndpointI(_factory.create(str, server)); + return new EndpointI(_factory.create(args, server)); } public IceInternal.EndpointI read(IceInternal.BasicStream s) { - s.readShort(); - return new EndpointI(_factory.read(s)); + short type = s.readShort(); + Debug.Assert(type == _factory.type()); + + s.startReadEncaps(); + IceInternal.EndpointI endpoint = new EndpointI(_factory.read(s)); + s.endReadEncaps(); + return endpoint; } public void destroy() { } + public IceInternal.EndpointFactory clone(IceInternal.ProtocolInstance instance) + { + return this; + } + private IceInternal.EndpointFactory _factory; } diff --git a/cs/test/Ice/background/EndpointI.cs b/cs/test/Ice/background/EndpointI.cs index 7c2dd1b8e93..9b091ebaf2e 100644 --- a/cs/test/Ice/background/EndpointI.cs +++ b/cs/test/Ice/background/EndpointI.cs @@ -20,18 +20,6 @@ internal class EndpointI : IceInternal.EndpointI _configuration = Configuration.getInstance(); } - // - // Marshal the endpoint - // - public override void streamWrite(IceInternal.BasicStream s) - { - s.writeShort(type()); - _endpoint.streamWrite(s); - } - - // - // Convert the endpoint to its string form - // public override string ice_toString_() { return "test-" + _endpoint.ToString(); @@ -42,36 +30,29 @@ internal class EndpointI : IceInternal.EndpointI return _endpoint.getInfo(); } - // - // Return the endpoint type - // + public override void streamWrite(IceInternal.BasicStream s) + { + s.startWriteEncaps(); + s.writeShort(_endpoint.type()); + _endpoint.streamWrite(s); + s.endWriteEncaps(); + } + public override short type() { return (short)(TYPE_BASE + _endpoint.type()); } - // - // Return the protocol name; - // public override string protocol() { return _endpoint.protocol(); } - // - // Return the timeout for the endpoint in milliseconds. 0 means - // non-blocking, -1 means no timeout. - // public override int timeout() { return _endpoint.timeout(); } - // - // Return a new endpoint with a different timeout value, provided - // that timeouts are supported by the endpoint. Otherwise the same - // endpoint is returned. - // public override IceInternal.EndpointI timeout(int timeout) { IceInternal.EndpointI endpoint = _endpoint.timeout(timeout); @@ -85,9 +66,11 @@ internal class EndpointI : IceInternal.EndpointI } } - // - // Return a new endpoint with a different connection id. - // + public override string connectionId() + { + return _endpoint.connectionId(); + } + public override IceInternal.EndpointI connectionId(string connectionId) { IceInternal.EndpointI endpoint = _endpoint.connectionId(connectionId); @@ -101,20 +84,11 @@ internal class EndpointI : IceInternal.EndpointI } } - // - // Return true if the endpoints support bzip2 compress, or false - // otherwise. - // public override bool compress() { return _endpoint.compress(); } - // - // Return a new endpoint with a different compression value, - // provided that compression is supported by the - // endpoint. Otherwise the same endpoint is returned. - // public override IceInternal.EndpointI compress(bool compress) { IceInternal.EndpointI endpoint = _endpoint.compress(compress); @@ -128,29 +102,16 @@ internal class EndpointI : IceInternal.EndpointI } } - // - // Return true if the endpoint is datagram-based. - // public override bool datagram() { return _endpoint.datagram(); } - // - // Return true if the endpoint is secure. - // public override bool secure() { return _endpoint.secure(); } - // - // Return a server side transceiver for this endpoint, or null if a - // transceiver can only be created by an acceptor. In case a - // transceiver is created, this operation also returns a new - // "effective" endpoint, which might differ from this endpoint, - // for example, if a dynamic port number is assigned. - // public override IceInternal.Transceiver transceiver(ref IceInternal.EndpointI endpoint) { IceInternal.Transceiver transceiver = _endpoint.transceiver(ref endpoint); @@ -173,10 +134,6 @@ internal class EndpointI : IceInternal.EndpointI } } - // - // Return connectors for this endpoint, or empty list if no connector - // is available. - // public override List<IceInternal.Connector> connectors(Ice.EndpointSelectionType selType) { _configuration.checkConnectorsException(); @@ -257,14 +214,16 @@ internal class EndpointI : IceInternal.EndpointI return testEndpoint._endpoint.equivalent(_endpoint); } + public override string options() + { + return _endpoint.options(); + } + public override int GetHashCode() { return _endpoint.GetHashCode(); } - // - // Compare endpoints for sorting purposes - // public override int CompareTo(IceInternal.EndpointI obj) { EndpointI p = null; diff --git a/cs/test/Ice/background/Test.ice b/cs/test/Ice/background/Test.ice index 2749525fc48..e05437fcf4c 100644 --- a/cs/test/Ice/background/Test.ice +++ b/cs/test/Ice/background/Test.ice @@ -10,6 +10,7 @@ #pragma once #include <Ice/BuiltinSequences.ice> +#include <Ice/Endpoint.ice> module Test { @@ -37,6 +38,9 @@ interface BackgroundController void writeReady(bool enable); void writeException(bool enable); + + void buffered(bool enable); }; }; + diff --git a/cs/test/Ice/background/Transceiver.cs b/cs/test/Ice/background/Transceiver.cs index dad661ef88e..7dcc31e729b 100644 --- a/cs/test/Ice/background/Transceiver.cs +++ b/cs/test/Ice/background/Transceiver.cs @@ -8,60 +8,96 @@ // ********************************************************************** using System; +using System.Diagnostics; using System.Net.Sockets; internal class Transceiver : IceInternal.Transceiver { - public int initialize() + public int initialize(IceInternal.Buffer readBuffer, IceInternal.Buffer writeBuffer, ref bool hasMoreData) { _configuration.checkInitializeException(); if(!_initialized) { - int s = _transceiver.initialize(); - if(s != IceInternal.SocketOperation.None) + int status = _transceiver.initialize(readBuffer, writeBuffer, ref hasMoreData); + if(status != IceInternal.SocketOperation.None) { - return s; + return status; } _initialized = true; } return IceInternal.SocketOperation.None; } + public int closing(bool initiator, Ice.LocalException ex) + { + return _transceiver.closing(initiator, ex); + } + public void close() { _transceiver.close(); } - public bool write(IceInternal.Buffer buf) + public int write(IceInternal.Buffer buf) { - if(!_initialized) + if(!_configuration.writeReady() && buf.b.hasRemaining()) { - throw new Ice.SocketException(); - } - - if(!_configuration.writeReady()) - { - return false; + return IceInternal.SocketOperation.Write; } _configuration.checkWriteException(); return _transceiver.write(buf); } - public bool read(IceInternal.Buffer buf) + public int read(IceInternal.Buffer buf, ref bool hasMoreData) { - if(!_initialized) + if(!_configuration.readReady() && buf.b.hasRemaining()) { - throw new Ice.SocketException(); + return IceInternal.SocketOperation.Read; } - if(!_configuration.readReady()) + _configuration.checkReadException(); + + if(_buffered) { - return false; + while(buf.b.hasRemaining()) + { + if(_readBufferPos == _readBuffer.b.position()) + { + _readBufferPos = 0; + _readBuffer.b.position(0); + _transceiver.read(_readBuffer, ref hasMoreData); + if(_readBufferPos == _readBuffer.b.position()) + { + hasMoreData = false; + return IceInternal.SocketOperation.Read; + } + } + + int pos = _readBuffer.b.position(); + Debug.Assert(pos > _readBufferPos); + int requested = buf.b.remaining(); + int available = pos - _readBufferPos; + Debug.Assert(available > 0); + if(available >= requested) + { + available = requested; + } + + byte[] arr = new byte[available]; + _readBuffer.b.position(_readBufferPos); + _readBuffer.b.get(arr); + buf.b.put(arr); + _readBufferPos += available; + _readBuffer.b.position(pos); + } + hasMoreData = _readBufferPos < _readBuffer.b.position(); + return IceInternal.SocketOperation.None; + } + else + { + return _transceiver.read(buf, ref hasMoreData); } - - _configuration.checkReadException(); - return _transceiver.read(buf); } public bool startRead(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state) @@ -70,13 +106,76 @@ internal class Transceiver : IceInternal.Transceiver { _configuration.checkReadException(); // Only raise if we're configured to read now. } - return _transceiver.startRead(buf, callback, state); + if(_buffered) + { + int pos = _readBuffer.b.position(); + int available = pos - _readBufferPos; + if(available > 0) + { + int requested = buf.b.remaining(); + if(available >= requested) + { + available = requested; + } + + byte[] arr = new byte[available]; + _readBuffer.b.position(_readBufferPos); + _readBuffer.b.get(arr); + buf.b.put(arr); + _readBufferPos += available; + _readBuffer.b.position(pos); + } + + if(_readBufferPos == _readBuffer.b.position() && buf.b.hasRemaining()) + { + _readBufferPos = 0; + _readBuffer.b.position(0); + return _transceiver.startRead(_readBuffer, callback, state); + } + else + { + Debug.Assert(!buf.b.hasRemaining()); + return true; // Completed synchronously + } + } + else + { + return _transceiver.startRead(buf, callback, state); + } } public void finishRead(IceInternal.Buffer buf) { _configuration.checkReadException(); - _transceiver.finishRead(buf); + if(_buffered) + { + if(buf.b.hasRemaining()) + { + _transceiver.finishRead(_readBuffer); + + int pos = _readBuffer.b.position(); + int requested = buf.b.remaining(); + int available = pos - _readBufferPos; + if(available > 0) + { + if(available >= requested) + { + available = requested; + } + + byte[] arr = new byte[available]; + _readBuffer.b.position(_readBufferPos); + _readBuffer.b.get(arr); + buf.b.put(arr); + _readBufferPos += available; + _readBuffer.b.position(pos); + } + } + } + else + { + _transceiver.finishRead(buf); + } } public bool startWrite(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state, out bool completed) @@ -91,9 +190,9 @@ internal class Transceiver : IceInternal.Transceiver _transceiver.finishWrite(buf); } - public string type() + public string protocol() { - return "test-" + _transceiver.type(); + return "test-" + _transceiver.protocol(); } public Ice.ConnectionInfo getInfo() @@ -118,9 +217,18 @@ internal class Transceiver : IceInternal.Transceiver { _transceiver = transceiver; _configuration = Configuration.getInstance(); + _initialized = false; + _readBuffer = new IceInternal.Buffer(100 * 1024); + _readBuffer.resize(1024 * 8, true); // 8KB buffer + _readBuffer.b.position(0); + _readBufferPos = 0; + _buffered = _configuration.buffered(); } private IceInternal.Transceiver _transceiver; private Configuration _configuration; - private bool _initialized = false; + private bool _initialized; + private IceInternal.Buffer _readBuffer; + private int _readBufferPos; + private bool _buffered; } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index fb2c0ce8a33..a39905470ac 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -2348,7 +2348,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return IceInternal.SocketOperation.None; } - else if(_state == StateClosingPending && _writeStream.pos() == _writeStream.size()) + else if(_state == StateClosingPending && _writeStream.pos() == 0) { // Message wasn't sent, empty the _writeStream, we're not going to send more data. OutgoingMessage message = _sendStreams.getFirst(); diff --git a/java/src/IceInternal/IPEndpointI.java b/java/src/IceInternal/IPEndpointI.java index 7c860106754..e887fb95770 100644 --- a/java/src/IceInternal/IPEndpointI.java +++ b/java/src/IceInternal/IPEndpointI.java @@ -151,6 +151,7 @@ public abstract class IPEndpointI extends EndpointI _hashValue = 5381; _hashValue = HashUtil.hashAdd(_hashValue, type()); _hashValue = hashInit(_hashValue); + _hashInitialized = true; } return _hashValue; } diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 53a1abbfc90..f9e5a50b57a 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -28,20 +28,6 @@ public final class OutgoingConnectionFactory list.add(value); } - /* - public void - removeElementWithValue(K key, V value) - { - java.util.List<V> list = this.get(key); - assert(list != null); - list.remove(value); - if(list.isEmpty()) - { - this.remove(key); - } - } - */ - public boolean removeElementWithValue(K key, V value) { @@ -54,7 +40,7 @@ public final class OutgoingConnectionFactory } return v; } - }; + } interface CreateConnectionCallback { diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java index d8f35bb625a..ef20c146dac 100644 --- a/java/src/IceInternal/TcpAcceptor.java +++ b/java/src/IceInternal/TcpAcceptor.java @@ -11,18 +11,16 @@ package IceInternal; class TcpAcceptor implements Acceptor { - public java.nio.channels.ServerSocketChannel - fd() + public java.nio.channels.ServerSocketChannel fd() { return _fd; } - public void - close() + public void close() { if(_instance.traceLevel() >= 1) { - String s = "stopping to accept tcp connections at " + toString(); + String s = "stopping to accept " + protocol() + " connections at " + toString(); _instance.logger().trace(_instance.traceCategory(), s); } @@ -31,17 +29,16 @@ class TcpAcceptor implements Acceptor _fd = null; } - public void - listen() + public void listen() { // Nothing to do. if(_instance.traceLevel() >= 1) { - StringBuffer s = new StringBuffer("listening for tcp connections at "); + StringBuffer s = new StringBuffer("listening for " + protocol() + " connections at "); s.append(toString()); - java.util.List<String> interfaces = + java.util.List<String> interfaces = Network.getHostsForEndpointExpand(_addr.getAddress().getHostAddress(), _instance.protocolSupport(), true); if(!interfaces.isEmpty()) @@ -53,8 +50,7 @@ class TcpAcceptor implements Acceptor } } - public Transceiver - accept() + public Transceiver accept() { java.nio.channels.SocketChannel fd = Network.doAccept(_fd); Network.setBlock(fd, false); @@ -62,27 +58,24 @@ class TcpAcceptor implements Acceptor if(_instance.traceLevel() >= 1) { - String s = "accepted tcp connection\n" + Network.fdToString(fd); + String s = "accepted " + protocol() + " connection\n" + Network.fdToString(fd); _instance.logger().trace(_instance.traceCategory(), s); } return new TcpTransceiver(_instance, fd); } - public String - protocol() + public String protocol() { return _instance.protocol(); } - public String - toString() + public String toString() { return Network.addrToString(_addr); } - int - effectivePort() + int effectivePort() { return _addr.getPort(); } @@ -100,24 +93,24 @@ class TcpAcceptor implements Acceptor if(!System.getProperty("os.name").startsWith("Windows")) { // - // Enable SO_REUSEADDR on Unix platforms to allow - // re-using the socket even if it's in the TIME_WAIT - // state. On Windows, this doesn't appear to be - // necessary and enabling SO_REUSEADDR would actually - // not be a good thing since it allows a second - // process to bind to an address even it's already - // bound by another process. + // Enable SO_REUSEADDR on Unix platforms to allow re-using the + // socket even if it's in the TIME_WAIT state. On Windows, + // this doesn't appear to be necessary and enabling + // SO_REUSEADDR would actually not be a good thing since it + // allows a second process to bind to an address even it's + // already bound by another process. // - // TODO: using SO_EXCLUSIVEADDRUSE on Windows would - // probably be better but it's only supported by recent - // Windows versions (XP SP2, Windows Server 2003). + // TODO: using SO_EXCLUSIVEADDRUSE on Windows would probably + // be better but it's only supported by recent Windows + // versions (XP SP2, Windows Server 2003). // Network.setReuseAddress(_fd, true); } + _addr = Network.getAddressForServer(host, port, instance.protocolSupport(), instance.preferIPv6()); if(instance.traceLevel() >= 2) { - String s = "attempting to bind to tcp socket " + toString(); + String s = "attempting to bind to " + protocol() + " socket " + toString(); instance.logger().trace(instance.traceCategory(), s); } _addr = Network.doBind(_fd, _addr, _backlog); @@ -129,8 +122,7 @@ class TcpAcceptor implements Acceptor } } - protected synchronized void - finalize() + protected synchronized void finalize() throws Throwable { try diff --git a/java/src/IceInternal/UdpConnector.java b/java/src/IceInternal/UdpConnector.java index 054014efc2c..94ddf2d14cd 100644 --- a/java/src/IceInternal/UdpConnector.java +++ b/java/src/IceInternal/UdpConnector.java @@ -38,7 +38,7 @@ final class UdpConnector implements Connector } // - // Only for use by TcpEndpoint + // Only for use by UdpEndpointI // UdpConnector(ProtocolInstance instance, java.net.InetSocketAddress addr, String mcastInterface, int mcastTtl, String connectionId) diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 15f1683cf4e..d8344adb130 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -66,6 +66,11 @@ final class UdpTransceiver implements Transceiver return SocketOperation.Write; } + if(!buf.b.hasRemaining()) + { + return SocketOperation.None; + } + assert(buf.b.position() == 0); assert(_fd != null && _state >= StateConnected); @@ -127,6 +132,11 @@ final class UdpTransceiver implements Transceiver @SuppressWarnings("deprecation") public int read(Buffer buf, Ice.BooleanHolder moreData) { + if(!buf.b.hasRemaining()) + { + return SocketOperation.None; + } + assert(buf.b.position() == 0); final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead); diff --git a/java/test/Ice/background/AllTests.java b/java/test/Ice/background/AllTests.java index 45bb92f6a05..e029f2a8056 100644 --- a/java/test/Ice/background/AllTests.java +++ b/java/test/Ice/background/AllTests.java @@ -152,6 +152,26 @@ public class AllTests private Callback _sent = new Callback(); } + private static class OpAMICallbackNoOp extends Callback_Background_op + { + @Override + public void response() + { + } + + @Override + public void exception(Ice.LocalException ex) + { + ex.printStackTrace(); + test(false); + } + + @Override + public void sent(boolean ss) + { + } + } + private static class NoResponse extends Callback_Background_opWithPayload { @Override @@ -371,6 +391,40 @@ public class AllTests } out.println("ok"); + final boolean ws = communicator.getProperties().getProperty("Ice.Default.Protocol").equals("test-ws"); + final boolean wss = communicator.getProperties().getProperty("Ice.Default.Protocol").equals("test-wss"); + if(!ws && !wss) + { + out.print("testing buffered transport... "); + out.flush(); + + configuration.buffered(true); + backgroundController.buffered(true); + background.begin_op(); + background.ice_getCachedConnection().close(true); + background.begin_op(); + + Ice.AsyncResult r = null; + OpAMICallbackNoOp cb = new OpAMICallbackNoOp(); + + for(int i = 0; i < 10000; ++i) + { + r = background.begin_op(cb); + if(i % 50 == 0) + { + backgroundController.holdAdapter(); + backgroundController.resumeAdapter(); + } + if(i % 100 == 0) + { + r.waitForCompleted(); + } + } + r.waitForCompleted(); + + out.println("ok"); + } + return background; } diff --git a/java/test/Ice/background/BackgroundControllerI.java b/java/test/Ice/background/BackgroundControllerI.java index 594a623d131..64cefee73ca 100644 --- a/java/test/Ice/background/BackgroundControllerI.java +++ b/java/test/Ice/background/BackgroundControllerI.java @@ -88,6 +88,12 @@ class BackgroundControllerI extends _BackgroundControllerDisp { _configuration.writeException(enable ? new Ice.SocketException() : null); } + + public void + buffered(boolean enable, Ice.Current current) + { + _configuration.buffered(enable); + } public BackgroundControllerI(Configuration configuration, Ice.ObjectAdapter adapter) diff --git a/java/test/Ice/background/Test.ice b/java/test/Ice/background/Test.ice index 95d72cb42b6..b612eb4d280 100644 --- a/java/test/Ice/background/Test.ice +++ b/java/test/Ice/background/Test.ice @@ -18,8 +18,8 @@ module Test interface Background { - ["ami"] void op(); - ["ami"] void opWithPayload(Ice::ByteSeq seq); + void op(); + void opWithPayload(Ice::ByteSeq seq); void shutdown(); }; @@ -28,7 +28,7 @@ interface BackgroundController { void pauseCall(string call); void resumeCall(string call); - + void holdAdapter(); void resumeAdapter(); @@ -40,6 +40,8 @@ interface BackgroundController void writeReady(bool enable); void writeException(bool enable); + + void buffered(bool enable); }; }; diff --git a/java/test/Ice/background/Transceiver.java b/java/test/Ice/background/Transceiver.java index 36a0b85a6d2..a16eb335cfe 100644 --- a/java/test/Ice/background/Transceiver.java +++ b/java/test/Ice/background/Transceiver.java @@ -105,9 +105,10 @@ final class Transceiver implements IceInternal.Transceiver return IceInternal.SocketOperation.Read; } } - assert(_readBuffer.b.position() > _readBufferPos); - int requested = buf.b.remaining(); - int available = _readBuffer.b.position() - _readBufferPos; + final int pos = _readBuffer.b.position(); + assert(pos > _readBufferPos); + final int requested = buf.b.remaining(); + int available = pos - _readBufferPos; assert(available > 0); if(available >= requested) { @@ -115,9 +116,11 @@ final class Transceiver implements IceInternal.Transceiver } byte[] arr = new byte[available]; + _readBuffer.b.position(_readBufferPos); _readBuffer.b.get(arr); buf.b.put(arr); _readBufferPos += available; + _readBuffer.b.position(pos); } moreData.value = _readBufferPos < _readBuffer.b.position(); return IceInternal.SocketOperation.None; |