Fix sequential channel to check for cancellation

This commit is contained in:
Sergey Mashkov
2020-01-24 20:00:20 +03:00
committed by Leonid Stashevsky
parent 558f6fb948
commit b65dbef484

View File

@@ -82,7 +82,20 @@ abstract class ByteChannelSequentialBase(
}
private fun ensureNotClosed() {
if (closed) throw ClosedWriteChannelException("Channel is already closed")
if (closed) {
throw closedCause ?: ClosedWriteChannelException("Channel is already closed")
}
}
private fun ensureNotFailed() {
closedCause?.let { throw it }
}
private fun ensureNotFailed(closeable: BytePacketBuilder) {
closedCause?.let { cause ->
closeable.release()
throw cause
}
}
override suspend fun writeByte(b: Byte) {
@@ -329,7 +342,7 @@ abstract class ByteChannelSequentialBase(
}
override suspend fun readRemaining(limit: Long, headerSizeHint: Int): ByteReadPacket {
closedCause?.let { throw it }
ensureNotFailed()
val builder = BytePacketBuilder(headerSizeHint)
@@ -338,6 +351,7 @@ abstract class ByteChannelSequentialBase(
return if (remaining == 0L || (readable.isEmpty && closed)) {
afterRead()
ensureNotFailed(builder)
builder.build()
} else {
readRemainingSuspend(builder, remaining)
@@ -349,12 +363,14 @@ abstract class ByteChannelSequentialBase(
val partLimit = minOf(limit - builder.size, readable.remaining)
builder.writePacket(readable, partLimit)
afterRead()
ensureNotFailed(builder)
if (readable.remaining == 0L && writable.size == 0 && closed) break
awaitSuspend(1)
}
ensureNotFailed(builder)
return builder.build()
}
@@ -629,6 +645,7 @@ abstract class ByteChannelSequentialBase(
} else {
flush()
}
atLeastNBytesAvailableForRead.signal()
atLeastNBytesAvailableForWrite.signal()
notFull.signal()
@@ -671,7 +688,7 @@ abstract class ByteChannelSequentialBase(
protected fun afterWrite() {
if (closed) {
writable.release()
throw closedCause ?: ClosedWriteChannelException("Channel is already closed")
ensureNotClosed()
}
if (autoFlush || availableForWrite == 0) {
flush()
@@ -683,6 +700,7 @@ abstract class ByteChannelSequentialBase(
notFull.await {
flush()
}
ensureNotClosed()
}
final override suspend fun peekTo(