Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions uvloop/handles/pipe.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ cdef class ReadUnixTransport(UVStream):
cdef ReadUnixTransport new(Loop loop, object protocol, Server server,
object waiter)

cpdef write(self, data)


cdef class WriteUnixTransport(UVStream):

Expand Down
2 changes: 1 addition & 1 deletion uvloop/handles/pipe.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ cdef class ReadUnixTransport(UVStream):
def get_write_buffer_size(self):
raise NotImplementedError

def write(self, data):
cpdef write(self, data):
raise NotImplementedError

def writelines(self, list_of_data):
Expand Down
20 changes: 11 additions & 9 deletions uvloop/sslproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ cdef class SSLProtocol:

object _extra

object _write_backlog
size_t _write_buffer_size
list _write_backlog
Py_ssize_t _write_buffer_size

object _waiter
Loop _loop
Expand Down Expand Up @@ -67,13 +67,13 @@ cdef class SSLProtocol:
bint _ssl_writing_paused
bint _app_reading_paused

size_t _incoming_high_water
size_t _incoming_low_water
Py_ssize_t _incoming_high_water
Py_ssize_t _incoming_low_water
bint _ssl_reading_paused

bint _app_writing_paused
size_t _outgoing_high_water
size_t _outgoing_low_water
Py_ssize_t _outgoing_high_water
Py_ssize_t _outgoing_low_water

object _app_protocol
bint _app_protocol_is_buffer
Expand Down Expand Up @@ -108,7 +108,9 @@ cdef class SSLProtocol:

# Outgoing flow

cdef _write_appdata(self, list_of_data, object context)
cdef inline bint _is_protocol_ready(self) except? -1
cdef inline _check_and_enqueue_appdata(self, data)
cdef inline _flush_write_backlog(self, object context)
cdef _do_write(self)
cdef _process_outgoing(self)

Expand All @@ -122,7 +124,7 @@ cdef class SSLProtocol:
# Flow control for writes from APP socket

cdef _control_app_writing(self, object context=*)
cdef size_t _get_write_buffer_size(self)
cdef Py_ssize_t _get_write_buffer_size(self)
cdef _set_write_buffer_limits(self, high=*, low=*)

# Flow control for reads to APP socket
Expand All @@ -134,5 +136,5 @@ cdef class SSLProtocol:

cdef _control_ssl_reading(self)
cdef _set_read_buffer_limits(self, high=*, low=*)
cdef size_t _get_read_buffer_size(self)
cdef Py_ssize_t _get_read_buffer_size(self)
cdef _fatal_error(self, exc, message=*)
86 changes: 57 additions & 29 deletions uvloop/sslproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,34 @@ cdef class _SSLProtocolTransport:
This does not block; it buffers the data and arranges for it
to be sent out asynchronously.
"""
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError(f"data: expecting a bytes-like instance, "
f"got {type(data).__name__}")
if not data:
if not self._ssl_protocol._is_protocol_ready():
return
self._ssl_protocol._write_appdata((data,), self.context.copy())

self._ssl_protocol._check_and_enqueue_appdata(data)
self._ssl_protocol._flush_write_backlog(self.context.copy())

def writelines(self, list_of_data):
"""Write a list (or any iterable) of data bytes to the transport.

The default implementation concatenates the arguments and
calls write() on the result.
"""
self._ssl_protocol._write_appdata(list_of_data, self.context.copy())
if not self._ssl_protocol._is_protocol_ready():
return

cdef Py_ssize_t backlog_len_before = len(self._ssl_protocol._write_backlog)
cdef size_t backlog_size_before = self._ssl_protocol._write_buffer_size

try:
for data in list_of_data:
self._ssl_protocol._check_and_enqueue_appdata(data)
except:
# Remove already enqueued items on exception
del self._ssl_protocol._write_backlog[backlog_len_before:]
self._ssl_protocol._write_buffer_size = backlog_size_before
raise

self._ssl_protocol._flush_write_backlog(self.context.copy())

def write_eof(self):
"""Close the write end after flushing buffered data.
Expand Down Expand Up @@ -246,7 +260,7 @@ cdef class SSLProtocol:
self._extra = dict(sslcontext=sslcontext)

# App data write buffering
self._write_backlog = col_deque()
self._write_backlog = []
self._write_buffer_size = 0

self._waiter = waiter
Expand Down Expand Up @@ -652,19 +666,28 @@ cdef class SSLProtocol:

# Outgoing flow

cdef _write_appdata(self, list_of_data, object context):
cdef bint _is_protocol_ready(self) except? -1:
if self._state in (FLUSHING, SHUTDOWN, UNWRAPPED):
if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
aio_logger.warning('SSL connection is closed')
self._conn_lost += 1
return False
else:
return True

cdef _check_and_enqueue_appdata(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError(f"data: expecting a bytes-like instance, "
f"got {type(data).__name__}")
if not data:
return

for data in list_of_data:
self._write_backlog.append(data)
self._write_buffer_size += len(data)
self._write_backlog.append(data)
self._write_buffer_size += len(data)

cdef _flush_write_backlog(self, object context):
try:
if self._state == WRAPPED:
if self._state == WRAPPED and self._write_buffer_size > 0:
self._do_write()
self._process_outgoing()
self._control_app_writing(context)
Expand All @@ -674,22 +697,27 @@ cdef class SSLProtocol:

cdef _do_write(self):
"""Do SSL write, consumes write backlog and fills outgoing BIO."""
cdef size_t data_len, count
cdef Py_ssize_t data_len, bytes_written
cdef Py_ssize_t idx = 0

try:
while self._write_backlog:
data = self._write_backlog[0]
count = self._sslobj_write(data)
while idx < len(self._write_backlog):
data = self._write_backlog[idx]
data_len = len(data)
if count < data_len:
if not PyMemoryView_Check(data):
data = PyMemoryView_FromObject(data)
self._write_backlog[0] = data[count:]
self._write_buffer_size -= count
else:
del self._write_backlog[0]
bytes_written = self._sslobj_write(data)
if bytes_written == data_len:
self._write_buffer_size -= data_len
idx += 1
continue

if not PyMemoryView_Check(data):
data = PyMemoryView_FromObject(data)
self._write_backlog[0] = data[bytes_written:]
self._write_buffer_size -= bytes_written
except ssl_SSLAgainErrors as exc:
pass
finally:
del self._write_backlog[:idx]

cdef _process_outgoing(self):
"""Send bytes from the outgoing BIO."""
Expand Down Expand Up @@ -821,7 +849,7 @@ cdef class SSLProtocol:
# Flow control for writes from APP socket

cdef _control_app_writing(self, object context=None):
cdef size_t size = self._get_write_buffer_size()
cdef Py_ssize_t size = self._get_write_buffer_size()
if size >= self._outgoing_high_water and not self._app_writing_paused:
self._app_writing_paused = True
try:
Expand Down Expand Up @@ -861,8 +889,8 @@ cdef class SSLProtocol:
'protocol': self,
})

cdef size_t _get_write_buffer_size(self):
return self._outgoing.pending + self._write_buffer_size
cdef Py_ssize_t _get_write_buffer_size(self):
return self._write_buffer_size + <Py_ssize_t>self._outgoing.pending

cdef _set_write_buffer_limits(self, high=None, low=None):
high, low = add_flowcontrol_defaults(
Expand All @@ -889,7 +917,7 @@ cdef class SSLProtocol:
# Flow control for reads from SSL socket

cdef _control_ssl_reading(self):
cdef size_t size = self._get_read_buffer_size()
cdef Py_ssize_t size = self._get_read_buffer_size()
if size >= self._incoming_high_water and not self._ssl_reading_paused:
self._ssl_reading_paused = True
self._transport.pause_reading()
Expand All @@ -903,8 +931,8 @@ cdef class SSLProtocol:
self._incoming_high_water = high
self._incoming_low_water = low

cdef size_t _get_read_buffer_size(self):
return self._incoming.pending
cdef Py_ssize_t _get_read_buffer_size(self):
return <Py_ssize_t>self._incoming.pending

# Flow control for writes to SSL socket

Expand Down