diff options
-rw-r--r-- | cpp/src/Ice/WSTransceiver.cpp | 73 | ||||
-rw-r--r-- | cs/src/Ice/WSTransceiver.cs | 168 | ||||
-rw-r--r-- | java/src/Ice/src/main/java/IceInternal/WSTransceiver.java | 100 |
3 files changed, 172 insertions, 169 deletions
diff --git a/cpp/src/Ice/WSTransceiver.cpp b/cpp/src/Ice/WSTransceiver.cpp index 4bb0fda7ee7..8e79aa4127d 100644 --- a/cpp/src/Ice/WSTransceiver.cpp +++ b/cpp/src/Ice/WSTransceiver.cpp @@ -507,9 +507,8 @@ IceInternal::WSTransceiver::write(Buffer& buf) { return s; } - } - - if(_incoming && !buf.b.empty() && _writeState == WriteStatePayload) + } + else if(_incoming && !buf.b.empty() && _writeState == WriteStatePayload) { SocketOperation s = _delegate->write(buf); if(s) @@ -1442,18 +1441,6 @@ IceInternal::WSTransceiver::preWrite(Buffer& buf) assert(buf.i = buf.b.begin()); prepareWriteHeader(OP_DATA, buf.b.size()); - // - // For server connections, we use the _writeBuffer only to - // write the header, the message is sent directly from the - // message buffer. For client connections, we use the write - // buffer for both the header and message buffer since we need - // to mask the message data. - // - if(_incoming) - { - _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); - _writeBuffer.i = _writeBuffer.b.begin(); - } _writeState = WriteStatePayload; } else if(_state == StatePingPending) @@ -1516,33 +1503,43 @@ IceInternal::WSTransceiver::preWrite(Buffer& buf) // // For an outgoing connection, each message must be masked with a random // 32-bit value, so we copy the entire message into the internal buffer - // for writing. + // for writing. For incoming connections, we just copy the start of the + // message in the internal buffer after the hedaer. If the message is + // larger, the reminder is sent directly from the message buffer to avoid + // copying. // - // For an incoming connection, we use the internal buffer to hold the - // frame header, and then write the caller's buffer to avoid copying. - // - if(!_incoming) + + if(!_incoming && (_writePayloadLength == 0 || _writeBuffer.i == _writeBuffer.b.end())) { - if((_writePayloadLength == 0 || _writeBuffer.i == _writeBuffer.b.end())) + if(_writeBuffer.i == _writeBuffer.b.end()) { - if(_writeBuffer.i == _writeBuffer.b.end()) - { - _writeBuffer.i = _writeBuffer.b.begin(); - } - - size_t n = buf.i - buf.b.begin(); - for(; n < buf.b.size() && _writeBuffer.i < _writeBuffer.b.end(); ++_writeBuffer.i, ++n) - { - *_writeBuffer.i = buf.b[n] ^ _writeMask[n % 4]; - } - _writePayloadLength = n; - - if(_writeBuffer.i < _writeBuffer.b.end()) - { - _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); - } _writeBuffer.i = _writeBuffer.b.begin(); } + + size_t n = buf.i - buf.b.begin(); + for(; n < buf.b.size() && _writeBuffer.i < _writeBuffer.b.end(); ++_writeBuffer.i, ++n) + { + *_writeBuffer.i = buf.b[n] ^ _writeMask[n % 4]; + } + _writePayloadLength = n; + if(_writeBuffer.i < _writeBuffer.b.end()) + { + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + } + _writeBuffer.i = _writeBuffer.b.begin(); + } + else if(_writePayloadLength == 0) + { + size_t n = min(_writeBuffer.b.end() - _writeBuffer.i, buf.b.end() - buf.i); + memcpy(_writeBuffer.i, buf.i, n); + _writeBuffer.i += n; + buf.i += n; + _writePayloadLength = n; + if(_writeBuffer.i < _writeBuffer.b.end()) + { + _writeBuffer.b.resize(_writeBuffer.i - _writeBuffer.b.begin()); + } + _writeBuffer.i = _writeBuffer.b.begin(); } return true; } @@ -1612,7 +1609,7 @@ IceInternal::WSTransceiver::postWrite(Buffer& buf) } } - if(!_incoming && _writePayloadLength > 0) + if((!_incoming || buf.i == buf.b.begin()) && _writePayloadLength > 0) { if(_writeBuffer.i == _writeBuffer.b.end()) { diff --git a/cs/src/Ice/WSTransceiver.cs b/cs/src/Ice/WSTransceiver.cs index 2ba308fa783..f7bef2ae13c 100644 --- a/cs/src/Ice/WSTransceiver.cs +++ b/cs/src/Ice/WSTransceiver.cs @@ -16,14 +16,14 @@ namespace IceInternal using System.Security.Cryptography; using System.Text; - sealed class WSTransceiver : IceInternal.Transceiver + sealed class WSTransceiver : Transceiver { public Socket fd() { return _delegate.fd(); } - public int initialize(IceInternal.Buffer readBuffer, IceInternal.Buffer writeBuffer, ref bool hasMoreData) + public int initialize(Buffer readBuffer, Buffer writeBuffer, ref bool hasMoreData) { // // Delegate logs exceptions that occur during initialize(), so there's no need to trap them here. @@ -109,7 +109,7 @@ namespace IceInternal if(_readBuffer.b.hasRemaining()) { int s = _delegate.read(_readBuffer, ref hasMoreData); - if(s == IceInternal.SocketOperation.Write || _readBuffer.b.position() == 0) + if(s == SocketOperation.Write || _readBuffer.b.position() == 0) { return s; } @@ -129,7 +129,7 @@ namespace IceInternal { if(_readBuffer.b.hasRemaining()) { - return IceInternal.SocketOperation.Read; + return SocketOperation.Read; } // @@ -238,7 +238,7 @@ namespace IceInternal } } - return IceInternal.SocketOperation.None; + return SocketOperation.None; } public int closing(bool initiator, Ice.LocalException reason) @@ -263,11 +263,11 @@ namespace IceInternal // Debug.Assert(!initiator); _closingInitiator = false; - return IceInternal.SocketOperation.Write; + return SocketOperation.Write; } else if(s >= StateClosingRequestPending) { - return IceInternal.SocketOperation.None; + return SocketOperation.None; } _closingInitiator = initiator; @@ -292,12 +292,12 @@ namespace IceInternal if(_state == StateOpened) { _state = StateClosingRequestPending; - return initiator ? IceInternal.SocketOperation.Read : IceInternal.SocketOperation.Write; + return initiator ? SocketOperation.Read : SocketOperation.Write; } else { _nextState = StateClosingRequestPending; - return IceInternal.SocketOperation.None; + return SocketOperation.None; } } @@ -318,11 +318,11 @@ namespace IceInternal _delegate.destroy(); } - public int write(IceInternal.Buffer buf) + public int write(Buffer buf) { if(_writePending) { - return IceInternal.SocketOperation.Write; + return SocketOperation.Write; } if(_state < StateOpened) @@ -337,7 +337,7 @@ namespace IceInternal } } - int s = IceInternal.SocketOperation.None; + int s = SocketOperation.None; do { if(preWrite(buf)) @@ -351,13 +351,11 @@ namespace IceInternal s = _delegate.write(buf); } - if(s == IceInternal.SocketOperation.None && _writeBuffer.b.hasRemaining()) + if(s == SocketOperation.None && _writeBuffer.b.hasRemaining()) { s = _delegate.write(_writeBuffer); } - - if(s == IceInternal.SocketOperation.None && _incoming && !buf.empty() && - _writeState == WriteStatePayload) + else if(s == SocketOperation.None && _incoming && !buf.empty() && _writeState == WriteStatePayload) { s = _delegate.write(buf); } @@ -365,22 +363,22 @@ namespace IceInternal } while(postWrite(buf, s)); - if(s != IceInternal.SocketOperation.None) + if(s != SocketOperation.None) { return s; } if(_state == StateClosingResponsePending && !_closingInitiator) { - return IceInternal.SocketOperation.Read; + return SocketOperation.Read; } - return IceInternal.SocketOperation.None; + return SocketOperation.None; } - public int read(IceInternal.Buffer buf, ref bool hasMoreData) + public int read(Buffer buf, ref bool hasMoreData) { if(_readPending) { - return IceInternal.SocketOperation.Read; + return SocketOperation.Read; } if(_state < StateOpened) @@ -391,13 +389,13 @@ namespace IceInternal } else { - if(_delegate.read(_readBuffer, ref hasMoreData) == IceInternal.SocketOperation.Write) + if(_delegate.read(_readBuffer, ref hasMoreData) == SocketOperation.Write) { - return IceInternal.SocketOperation.Write; + return SocketOperation.Write; } else { - return IceInternal.SocketOperation.None; + return SocketOperation.None; } } } @@ -408,7 +406,7 @@ namespace IceInternal return SocketOperation.None; } - int s = IceInternal.SocketOperation.None; + int s = SocketOperation.None; do { if(preRead(buf)) @@ -438,7 +436,7 @@ namespace IceInternal s = _delegate.read(_readBuffer, ref hasMoreData); } - if(s == IceInternal.SocketOperation.Write) + if(s == SocketOperation.Write) { postRead(buf); return s; @@ -465,13 +463,13 @@ namespace IceInternal _writeState == WriteStateHeader) { // We have things to write, ask to be notified when writes are ready. - s |= IceInternal.SocketOperation.Write; + s |= SocketOperation.Write; } return s; } - public bool startRead(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state) + public bool startRead(Buffer buf, AsyncCallback callback, object state) { _readPending = true; if(_state < StateOpened) @@ -522,7 +520,7 @@ namespace IceInternal } } - public void finishRead(IceInternal.Buffer buf) + public void finishRead(Buffer buf) { Debug.Assert(_readPending); _readPending = false; @@ -560,7 +558,7 @@ namespace IceInternal postRead(buf); } - public bool startWrite(IceInternal.Buffer buf, IceInternal.AsyncCallback callback, object state, + public bool startWrite(Buffer buf, AsyncCallback callback, object state, out bool completed) { _writePending = true; @@ -595,7 +593,7 @@ namespace IceInternal } } - public void finishWrite(IceInternal.Buffer buf) + public void finishWrite(Buffer buf) { _writePending = false; if(_state < StateOpened) @@ -621,7 +619,7 @@ namespace IceInternal _delegate.finishWrite(buf); } - postWrite(buf, IceInternal.SocketOperation.None); + postWrite(buf, SocketOperation.None); } public string protocol() @@ -640,7 +638,7 @@ namespace IceInternal return info; } - public void checkSendSize(IceInternal.Buffer buf) + public void checkSendSize(Buffer buf) { _delegate.checkSendSize(buf); } @@ -656,7 +654,7 @@ namespace IceInternal } internal - WSTransceiver(ProtocolInstance instance, IceInternal.Transceiver del, string host, int port, string resource) + WSTransceiver(ProtocolInstance instance, Transceiver del, string host, int port, string resource) { init(instance, del); _host = host; @@ -671,7 +669,7 @@ namespace IceInternal // size. // Socket fd = del.fd(); - _writeBufferSize = Math.Max(IceInternal.Network.getSendBufferSize(fd), 1024); + _writeBufferSize = Math.Max(Network.getSendBufferSize(fd), 1024); // // Write and read buffer size must be large enough to hold the frame header! @@ -680,7 +678,7 @@ namespace IceInternal Debug.Assert(_readBufferSize > 256); } - internal WSTransceiver(ProtocolInstance instance, IceInternal.Transceiver del) + internal WSTransceiver(ProtocolInstance instance, Transceiver del) { init(instance, del); _host = ""; @@ -695,21 +693,21 @@ namespace IceInternal Debug.Assert(_readBufferSize > 256); } - private void init(ProtocolInstance instance, IceInternal.Transceiver del) + private void init(ProtocolInstance instance, Transceiver del) { _instance = instance; _delegate = del; _state = StateInitializeDelegate; _parser = new HttpParser(); _readState = ReadStateOpcode; - _readBuffer = new IceInternal.Buffer(IceInternal.ByteBuffer.ByteOrder.BIG_ENDIAN); // Network byte order + _readBuffer = new Buffer(ByteBuffer.ByteOrder.BIG_ENDIAN); // Network byte order _readBufferSize = 1024; _readLastFrame = true; _readOpCode = 0; _readHeaderLength = 0; _readPayloadLength = 0; _writeState = WriteStateHeader; - _writeBuffer = new IceInternal.Buffer(IceInternal.ByteBuffer.ByteOrder.BIG_ENDIAN); // Network byte order + _writeBuffer = new Buffer(ByteBuffer.ByteOrder.BIG_ENDIAN); // Network byte order _writeBufferSize = 1024; _readPending = false; _finishRead = false; @@ -721,7 +719,7 @@ namespace IceInternal _rand = new Random(); } - private void handleRequest(IceInternal.Buffer responseBuffer) + private void handleRequest(Buffer responseBuffer) { // // HTTP/1.1 @@ -960,7 +958,7 @@ namespace IceInternal } } - private bool preRead(IceInternal.Buffer buf) + private bool preRead(Buffer buf) { while(true) { @@ -1245,7 +1243,7 @@ namespace IceInternal } } - private bool postRead(IceInternal.Buffer buf) + private bool postRead(Buffer buf) { if(_readState != ReadStatePayload) { @@ -1283,7 +1281,7 @@ namespace IceInternal return buf.b.hasRemaining(); } - private bool preWrite(IceInternal.Buffer buf) + private bool preWrite(Buffer buf) { if(_writeState == WriteStateHeader) { @@ -1297,17 +1295,6 @@ namespace IceInternal Debug.Assert(buf.b.position() == 0); prepareWriteHeader((byte)OP_DATA, buf.size()); - // - // For server connections, we use the _writeBuffer only to - // write the header, the message is sent directly from the - // message buffer. For client connections, we use the write - // buffer for both the header and message buffer since we need - // to mask the message data. - // - if(_incoming) - { - _writeBuffer.b.flip(); - } _writeState = WriteStatePayload; } else if(_state == StatePingPending) @@ -1368,35 +1355,46 @@ namespace IceInternal // // For an outgoing connection, each message must be masked with a random // 32-bit value, so we copy the entire message into the internal buffer - // for writing. - // - // For an incoming connection, we use the internal buffer to hold the - // frame header, and then write the caller's buffer to avoid copying. + // for writing. For incoming connections, we just copy the start of the + // message in the internal buffer after the hedaer. If the message is + // larger, the reminder is sent directly from the message buffer to avoid + // copying. // - if(!_incoming) + if(!_incoming && (_writePayloadLength == 0 || !_writeBuffer.b.hasRemaining())) { - if(_writePayloadLength == 0 || !_writeBuffer.b.hasRemaining()) + if(!_writeBuffer.b.hasRemaining()) { - if(!_writeBuffer.b.hasRemaining()) - { - _writeBuffer.b.position(0); - } + _writeBuffer.b.position(0); + } + + int n = buf.b.position(); + int sz = buf.size(); + int pos = _writeBuffer.b.position(); + int count = Math.Min(sz - n, _writeBuffer.b.remaining()); + byte[] src = buf.b.rawBytes(); + byte[] dest = _writeBuffer.b.rawBytes(); + for(int i = 0; i < count; ++i, ++n, ++pos) + { + dest[pos] = (byte)(src[n] ^ _writeMask[n % 4]); + } + _writeBuffer.b.position(pos); + _writePayloadLength = n; - int n = buf.b.position(); - int sz = buf.size(); + _writeBuffer.b.flip(); + } + else if(_writePayloadLength == 0) + { + Debug.Assert(_incoming); + if(_writeBuffer.b.hasRemaining()) + { + Debug.Assert(buf.b.position() == 0); + int n = Math.Min(_writeBuffer.b.remaining(), buf.b.remaining()); int pos = _writeBuffer.b.position(); - int count = Math.Min(sz - n, _writeBuffer.b.remaining()); - byte[] src = buf.b.rawBytes(); - byte[] dest = _writeBuffer.b.rawBytes(); - for(int i = 0; i < count; ++i, ++n, ++pos) - { - dest[pos] = (byte)(src[n] ^ _writeMask[n % 4]); - } - _writeBuffer.b.position(pos); + System.Buffer.BlockCopy(buf.b.rawBytes(), 0, _writeBuffer.b.rawBytes(), pos, n); + _writeBuffer.b.position(pos + n); _writePayloadLength = n; - - _writeBuffer.b.flip(); - } + } + _writeBuffer.b.flip(); } return true; } @@ -1411,7 +1409,7 @@ namespace IceInternal } } - private bool postWrite(IceInternal.Buffer buf, int status) + private bool postWrite(Buffer buf, int status) { if(_state > StateOpened && _writeState == WriteStateControlFrame) { @@ -1464,11 +1462,11 @@ namespace IceInternal } else { - return status == IceInternal.SocketOperation.None; + return status == SocketOperation.None; } } - if(!_incoming && _writePayloadLength > 0) + if((!_incoming || buf.b.position() == 0) && _writePayloadLength > 0) { if(!_writeBuffer.b.hasRemaining()) { @@ -1476,7 +1474,7 @@ namespace IceInternal } } - if(status == IceInternal.SocketOperation.Write && !buf.b.hasRemaining() && !_writeBuffer.b.hasRemaining()) + if(status == SocketOperation.Write && !buf.b.hasRemaining() && !_writeBuffer.b.hasRemaining()) { // // Our buffers are empty but the delegate needs another call to write(). @@ -1497,7 +1495,7 @@ namespace IceInternal } else if(_state == StateOpened) { - return status == IceInternal.SocketOperation.None; + return status == SocketOperation.None; } return false; @@ -1589,7 +1587,7 @@ namespace IceInternal } private ProtocolInstance _instance; - private IceInternal.Transceiver _delegate; + private Transceiver _delegate; private string _host; private int _port; private string _resource; @@ -1618,7 +1616,7 @@ namespace IceInternal private const int ReadStatePayload = 3; private int _readState; - private IceInternal.Buffer _readBuffer; + private Buffer _readBuffer; private int _readBufferPos; private int _readBufferSize; @@ -1636,7 +1634,7 @@ namespace IceInternal private const int WriteStateFlush = 3; private int _writeState; - private IceInternal.Buffer _writeBuffer; + private Buffer _writeBuffer; private int _writeBufferSize; private byte[] _writeMask; private int _writePayloadLength; diff --git a/java/src/Ice/src/main/java/IceInternal/WSTransceiver.java b/java/src/Ice/src/main/java/IceInternal/WSTransceiver.java index bc65891c058..facca10cd4b 100644 --- a/java/src/Ice/src/main/java/IceInternal/WSTransceiver.java +++ b/java/src/Ice/src/main/java/IceInternal/WSTransceiver.java @@ -345,9 +345,7 @@ final class WSTransceiver implements Transceiver { s = _delegate.write(_writeBuffer); } - - if(s == SocketOperation.None && _incoming && !buf.empty() && - _writeState == WriteStatePayload) + else if(s == SocketOperation.None && _incoming && !buf.empty() && _writeState == WriteStatePayload) { s = _delegate.write(buf); } @@ -1174,17 +1172,6 @@ final class WSTransceiver implements Transceiver assert(buf.b.position() == 0); prepareWriteHeader((byte)OP_DATA, buf.size()); - // - // For server connections, we use the _writeBuffer only to - // write the header, the message is sent directly from the - // message buffer. For client connections, we use the write - // buffer for both the header and message buffer since we need - // to mask the message data. - // - if(_incoming) - { - _writeBuffer.b.flip(); - } _writeState = WriteStatePayload; } else if(_state == StatePingPending) @@ -1245,48 +1232,69 @@ final class WSTransceiver implements Transceiver // // For an outgoing connection, each message must be masked with a random // 32-bit value, so we copy the entire message into the internal buffer - // for writing. + // for writing. For incoming connections, we just copy the start of the + // message in the internal buffer after the hedaer. If the message is + // larger, the reminder is sent directly from the message buffer to avoid + // copying. // - // For an incoming connection, we use the internal buffer to hold the - // frame header, and then write the caller's buffer to avoid copying. - // - if(!_incoming) + + if(!_incoming && (_writePayloadLength == 0 || !_writeBuffer.b.hasRemaining())) { - if(_writePayloadLength == 0 || !_writeBuffer.b.hasRemaining()) + if(!_writeBuffer.b.hasRemaining()) { - if(!_writeBuffer.b.hasRemaining()) + _writeBuffer.b.position(0); + } + + int n = buf.b.position(); + final int sz = buf.size(); + if(buf.b.hasArray() && _writeBuffer.b.hasArray()) + { + int pos = _writeBuffer.b.position(); + final int count = Math.min(sz - n, _writeBuffer.b.remaining()); + final byte[] src = buf.b.array(); + final int srcOff = buf.b.arrayOffset(); + final byte[] dest = _writeBuffer.b.array(); + final int destOff = _writeBuffer.b.arrayOffset(); + for(int i = 0; i < count; ++i, ++n, ++pos) { - _writeBuffer.b.position(0); + dest[destOff + pos] = (byte)(src[srcOff + n] ^ _writeMask[n % 4]); } - - int n = buf.b.position(); - final int sz = buf.size(); - if(buf.b.hasArray() && _writeBuffer.b.hasArray()) + _writeBuffer.b.position(pos); + } + else + { + for(; n < sz && _writeBuffer.b.hasRemaining(); ++n) { - int pos = _writeBuffer.b.position(); - final int count = Math.min(sz - n, _writeBuffer.b.remaining()); - final byte[] src = buf.b.array(); - final int srcOff = buf.b.arrayOffset(); - final byte[] dest = _writeBuffer.b.array(); - final int destOff = _writeBuffer.b.arrayOffset(); - for(int i = 0; i < count; ++i, ++n, ++pos) - { - dest[destOff + pos] = (byte)(src[srcOff + n] ^ _writeMask[n % 4]); - } - _writeBuffer.b.position(pos); + final byte b = (byte)(buf.b.get(n) ^ _writeMask[n % 4]); + _writeBuffer.b.put(b); + } + } + _writePayloadLength = n; + _writeBuffer.b.flip(); + } + else if(_writePayloadLength == 0) + { + assert(_incoming); + if(_writeBuffer.b.hasRemaining()) + { + assert(buf.b.position() == 0); + int n = _writeBuffer.b.remaining(); + if(buf.b.remaining() > n) + { + int limit = buf.b.limit(); + buf.b.limit(n); + _writeBuffer.b.put(buf.b); + buf.b.limit(limit); + _writePayloadLength = n; } else { - for(; n < sz && _writeBuffer.b.hasRemaining(); ++n) - { - final byte b = (byte)(buf.b.get(n) ^ _writeMask[n % 4]); - _writeBuffer.b.put(b); - } + _writePayloadLength = buf.b.remaining(); + _writeBuffer.b.put(buf.b); } - _writePayloadLength = n; - - _writeBuffer.b.flip(); + buf.b.position(0); } + _writeBuffer.b.flip(); } return true; } @@ -1358,7 +1366,7 @@ final class WSTransceiver implements Transceiver } } - if(!_incoming && _writePayloadLength > 0) + if((!_incoming || buf.b.position() == 0) && _writePayloadLength > 0) { if(!_writeBuffer.b.hasRemaining()) { |