diff --git a/uvloop/handles/pipe.pxd b/uvloop/handles/pipe.pxd index 56fc2658..94c35bbb 100644 --- a/uvloop/handles/pipe.pxd +++ b/uvloop/handles/pipe.pxd @@ -25,6 +25,8 @@ cdef class ReadUnixTransport(UVStream): cdef ReadUnixTransport new(Loop loop, object protocol, Server server, object waiter) + cpdef write(self, data) + cdef class WriteUnixTransport(UVStream): diff --git a/uvloop/handles/pipe.pyx b/uvloop/handles/pipe.pyx index 4b95ed6e..305eab40 100644 --- a/uvloop/handles/pipe.pyx +++ b/uvloop/handles/pipe.pyx @@ -156,7 +156,7 @@ cdef class ReadUnixTransport(UVStream): def get_write_buffer_size(self): raise NotImplementedError - def write(self, data): + cpdef write(self, data): raise NotImplementedError def writelines(self, list_of_data): diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 3da10f00..26df3153 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -38,8 +38,8 @@ cdef class SSLProtocol: object _extra - object _write_backlog - size_t _write_buffer_size + list _write_backlog + Py_ssize_t _write_buffer_size object _waiter Loop _loop @@ -67,13 +67,13 @@ cdef class SSLProtocol: bint _ssl_writing_paused bint _app_reading_paused - size_t _incoming_high_water - size_t _incoming_low_water + Py_ssize_t _incoming_high_water + Py_ssize_t _incoming_low_water bint _ssl_reading_paused bint _app_writing_paused - size_t _outgoing_high_water - size_t _outgoing_low_water + Py_ssize_t _outgoing_high_water + Py_ssize_t _outgoing_low_water object _app_protocol bint _app_protocol_is_buffer @@ -108,7 +108,9 @@ cdef class SSLProtocol: # Outgoing flow - cdef _write_appdata(self, list_of_data, object context) + cdef inline bint _is_protocol_ready(self) except? -1 + cdef inline _check_and_enqueue_appdata(self, data) + cdef inline _flush_write_backlog(self, object context) cdef _do_write(self) cdef _process_outgoing(self) @@ -122,7 +124,7 @@ cdef class SSLProtocol: # Flow control for writes from APP socket cdef _control_app_writing(self, object context=*) - cdef size_t _get_write_buffer_size(self) + cdef Py_ssize_t _get_write_buffer_size(self) cdef _set_write_buffer_limits(self, high=*, low=*) # Flow control for reads to APP socket @@ -134,5 +136,5 @@ cdef class SSLProtocol: cdef _control_ssl_reading(self) cdef _set_read_buffer_limits(self, high=*, low=*) - cdef size_t _get_read_buffer_size(self) + cdef Py_ssize_t _get_read_buffer_size(self) cdef _fatal_error(self, exc, message=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 42bb7644..a55bc7d2 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -147,12 +147,11 @@ cdef class _SSLProtocolTransport: This does not block; it buffers the data and arranges for it to be sent out asynchronously. """ - if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError(f"data: expecting a bytes-like instance, " - f"got {type(data).__name__}") - if not data: + if not self._ssl_protocol._is_protocol_ready(): return - self._ssl_protocol._write_appdata((data,), self.context.copy()) + + self._ssl_protocol._check_and_enqueue_appdata(data) + self._ssl_protocol._flush_write_backlog(self.context.copy()) def writelines(self, list_of_data): """Write a list (or any iterable) of data bytes to the transport. @@ -160,7 +159,22 @@ cdef class _SSLProtocolTransport: The default implementation concatenates the arguments and calls write() on the result. """ - self._ssl_protocol._write_appdata(list_of_data, self.context.copy()) + if not self._ssl_protocol._is_protocol_ready(): + return + + cdef Py_ssize_t backlog_len_before = len(self._ssl_protocol._write_backlog) + cdef size_t backlog_size_before = self._ssl_protocol._write_buffer_size + + try: + for data in list_of_data: + self._ssl_protocol._check_and_enqueue_appdata(data) + except: + # Remove already enqueued items on exception + del self._ssl_protocol._write_backlog[backlog_len_before:] + self._ssl_protocol._write_buffer_size = backlog_size_before + raise + + self._ssl_protocol._flush_write_backlog(self.context.copy()) def write_eof(self): """Close the write end after flushing buffered data. @@ -246,7 +260,7 @@ cdef class SSLProtocol: self._extra = dict(sslcontext=sslcontext) # App data write buffering - self._write_backlog = col_deque() + self._write_backlog = [] self._write_buffer_size = 0 self._waiter = waiter @@ -652,19 +666,28 @@ cdef class SSLProtocol: # Outgoing flow - cdef _write_appdata(self, list_of_data, object context): + cdef bint _is_protocol_ready(self) except? -1: if self._state in (FLUSHING, SHUTDOWN, UNWRAPPED): if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES: aio_logger.warning('SSL connection is closed') self._conn_lost += 1 + return False + else: + return True + + cdef _check_and_enqueue_appdata(self, data): + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError(f"data: expecting a bytes-like instance, " + f"got {type(data).__name__}") + if not data: return - for data in list_of_data: - self._write_backlog.append(data) - self._write_buffer_size += len(data) + self._write_backlog.append(data) + self._write_buffer_size += len(data) + cdef _flush_write_backlog(self, object context): try: - if self._state == WRAPPED: + if self._state == WRAPPED and self._write_buffer_size > 0: self._do_write() self._process_outgoing() self._control_app_writing(context) @@ -674,22 +697,27 @@ cdef class SSLProtocol: cdef _do_write(self): """Do SSL write, consumes write backlog and fills outgoing BIO.""" - cdef size_t data_len, count + cdef Py_ssize_t data_len, bytes_written + cdef Py_ssize_t idx = 0 + try: - while self._write_backlog: - data = self._write_backlog[0] - count = self._sslobj_write(data) + while idx < len(self._write_backlog): + data = self._write_backlog[idx] data_len = len(data) - if count < data_len: - if not PyMemoryView_Check(data): - data = PyMemoryView_FromObject(data) - self._write_backlog[0] = data[count:] - self._write_buffer_size -= count - else: - del self._write_backlog[0] + bytes_written = self._sslobj_write(data) + if bytes_written == data_len: self._write_buffer_size -= data_len + idx += 1 + continue + + if not PyMemoryView_Check(data): + data = PyMemoryView_FromObject(data) + self._write_backlog[0] = data[bytes_written:] + self._write_buffer_size -= bytes_written except ssl_SSLAgainErrors as exc: pass + finally: + del self._write_backlog[:idx] cdef _process_outgoing(self): """Send bytes from the outgoing BIO.""" @@ -821,7 +849,7 @@ cdef class SSLProtocol: # Flow control for writes from APP socket cdef _control_app_writing(self, object context=None): - cdef size_t size = self._get_write_buffer_size() + cdef Py_ssize_t size = self._get_write_buffer_size() if size >= self._outgoing_high_water and not self._app_writing_paused: self._app_writing_paused = True try: @@ -861,8 +889,8 @@ cdef class SSLProtocol: 'protocol': self, }) - cdef size_t _get_write_buffer_size(self): - return self._outgoing.pending + self._write_buffer_size + cdef Py_ssize_t _get_write_buffer_size(self): + return self._write_buffer_size + self._outgoing.pending cdef _set_write_buffer_limits(self, high=None, low=None): high, low = add_flowcontrol_defaults( @@ -889,7 +917,7 @@ cdef class SSLProtocol: # Flow control for reads from SSL socket cdef _control_ssl_reading(self): - cdef size_t size = self._get_read_buffer_size() + cdef Py_ssize_t size = self._get_read_buffer_size() if size >= self._incoming_high_water and not self._ssl_reading_paused: self._ssl_reading_paused = True self._transport.pause_reading() @@ -903,8 +931,8 @@ cdef class SSLProtocol: self._incoming_high_water = high self._incoming_low_water = low - cdef size_t _get_read_buffer_size(self): - return self._incoming.pending + cdef Py_ssize_t _get_read_buffer_size(self): + return self._incoming.pending # Flow control for writes to SSL socket