Move websocket primitives to common

This commit is contained in:
Leonid Stashevsky
2019-02-01 14:23:37 +03:00
parent 40da754769
commit 6cc32ea341
24 changed files with 749 additions and 187 deletions

View File

@@ -14,12 +14,12 @@ data class HttpProtocolVersion(val name: String, val major: Int, val minor: Int)
* HTTP/2.0 version.
*/
val HTTP_2_0 = HttpProtocolVersion("HTTP", 2, 0)
/**
* HTTP/1.1 version.
*/
val HTTP_1_1 = HttpProtocolVersion("HTTP", 1, 1)
/**
* HTTP/1.0 version.
*/
@@ -36,13 +36,29 @@ data class HttpProtocolVersion(val name: String, val major: Int, val minor: Int)
val QUIC = HttpProtocolVersion("QUIC", 1, 0)
/**
* Creates an instance of [HttpProtocolVersion] from the given parameters.
* Creates an instance of [HttpProtocolVersion] from the given parameters.
*/
fun fromValue(name: String, major: Int, minor: Int): HttpProtocolVersion = when {
name == "HTTP" && major == 1 && minor == 1 -> HTTP_1_1
name == "HTTP" && major == 2 && minor == 0 -> HTTP_2_0
else -> HttpProtocolVersion(name, major, minor)
}
/**
* Create an instance of [HttpProtocolVersion] from http string representation.
*/
fun parse(value: CharSequence): HttpProtocolVersion {
/**
* Format: protocol/major.minor
*/
val (protocol, major, minor) = value.split("/", ".").also {
check(it.size == 3) {
"Failed to parse HttpProtocolVersion. Expected format: protocol/major.minor, but actual: $value"
}
}
return fromValue(protocol, major.toInt(), minor.toInt())
}
}
override fun toString(): String = "$name/$major.$minor"

View File

@@ -1,6 +1,8 @@
package io.ktor.http.websocket
import io.ktor.util.*
import kotlinx.io.charsets.*
import kotlinx.io.core.*
private const val WEBSOCKET_SERVER_ACCEPT_TAIL = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
@@ -10,4 +12,4 @@ private const val WEBSOCKET_SERVER_ACCEPT_TAIL = "258EAFA5-E914-47DA-95CA-C5AB0D
*/
@KtorExperimentalAPI
fun websocketServerAccept(nonce: String): String =
encodeBase64(sha1("${nonce.trim()}$WEBSOCKET_SERVER_ACCEPT_TAIL".toByteArray(Charsets.ISO_8859_1)))
sha1("${nonce.trim()}$WEBSOCKET_SERVER_ACCEPT_TAIL".toByteArray(Charset.forName("ISO_8859_1"))).encodeBase64()

View File

@@ -0,0 +1,24 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
/**
* Create [DefaultWebSocketSession] from session.
*/
@UseExperimental(WebSocketInternalAPI::class)
expect fun DefaultWebSocketSession(
session: WebSocketSession,
pingInterval: Long,
timeoutMillis: Long
): DefaultWebSocketSession
/**
* Default websocket session with ping-pong and timeout processing and built-in [closeReason] population
*/
expect interface DefaultWebSocketSession : WebSocketSession {
/**
* A close reason for this session. It could be `null` if a session is terminated with no close reason
* (for example due to connection failure).
*/
val closeReason: Deferred<CloseReason?>
}

View File

@@ -0,0 +1,107 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
import kotlinx.io.charsets.*
import kotlinx.io.core.*
/**
* A frame received or ready to be sent. It is not reusable and not thread-safe
* @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used
* @property frameType enum value
* @property data - a frame content or fragment content
* @property disposableHandle could be invoked when the frame is processed
*/
expect sealed class Frame private constructor(
fin: Boolean,
frameType: FrameType,
data: ByteArray,
disposableHandle: DisposableHandle = NonDisposableHandle
) {
val fin: Boolean
val frameType: FrameType
val data: ByteArray
val disposableHandle: DisposableHandle
/**
* Represents an application level binary frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
class Binary(fin: Boolean, data: ByteArray) : Frame {
constructor(fin: Boolean, packet: ByteReadPacket)
}
/**
* Represents an application level text frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character
* so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first.
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
class Text(fin: Boolean, data: ByteArray) : Frame {
constructor(text: String)
constructor(fin: Boolean, packet: ByteReadPacket)
}
/**
* Represents a low-level level close frame. It could be sent to indicate web socket session end.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
class Close(data: ByteArray) : Frame {
constructor(reason: CloseReason)
constructor(packet: ByteReadPacket)
constructor()
}
/**
* Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]).
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
class Ping(data: ByteArray) : Frame {
constructor(packet: ByteReadPacket)
}
/**
* Represents a low-level pong frame. Should be sent in reply to a [Ping] frame.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
class Pong(
data: ByteArray, disposableHandle: DisposableHandle = NonDisposableHandle
) : Frame {
constructor(packet: ByteReadPacket)
}
/**
* Creates a frame copy
*/
fun copy(): Frame
companion object {
/**
* Create a particular [Frame] instance by frame type
*/
fun byType(fin: Boolean, frameType: FrameType, data: ByteArray): Frame
}
}
/**
* Read text content from text frame. Shouldn't be used for fragmented frames: such frames need to be reassembled first
*/
fun Frame.Text.readText(): String {
require(fin) { "Text could be only extracted from non-fragmented frame" }
return Charsets.UTF_8.newDecoder().decode(buildPacket { writeFully(data) })
}
/**
* Read binary content from a frame. For fragmented frames only returns this fragment.
*/
fun Frame.readBytes(): ByteArray {
return data.copyOf()
}
internal object NonDisposableHandle : DisposableHandle {
override fun dispose() {}
override fun toString(): String = "NonDisposableHandle"
}

View File

@@ -0,0 +1,45 @@
package io.ktor.http.cio.websocket
/**
* Frame types enum
* @property controlFrame if this is control frame type
* @property opcode - frame type id that is used to transport it
*/
enum class FrameType(val controlFrame: Boolean, val opcode: Int) {
/**
* Regular application level text frame
*/
TEXT(false, 1),
/**
* Regular application level binary frame
*/
BINARY(false, 2),
/**
* Low level close frame type
*/
CLOSE(true, 8),
/**
* Low level ping frame type
*/
PING(true, 9),
/**
* Low level pong frame type
*/
PONG(true, 0xa);
companion object {
private val maxOpcode = values().maxBy { it.opcode }!!.opcode
private val byOpcodeArray = Array(maxOpcode + 1) { op -> values().singleOrNull { it.opcode == op } }
/**
* Find [FrameType] instance by numeric [opcode]
* @return a [FrameType] instance or `null` of the [opcode] value is not valid
*/
operator fun get(opcode: Int): FrameType? = if (opcode in 0..maxOpcode) byOpcodeArray[opcode] else null
}
}

View File

@@ -0,0 +1,10 @@
@file:kotlin.jvm.JvmMultifileClass
@file:kotlin.jvm.JvmName("UtilsKt")
package io.ktor.http.cio.websocket
@Suppress("NOTHING_TO_INLINE")
internal inline infix fun Byte.xor(other: Byte) = toInt().xor(other.toInt()).toByte()
@Suppress("NOTHING_TO_INLINE")
internal inline fun Boolean.flagAt(at: Int) = if (this) 1 shl at else 0

View File

@@ -3,13 +3,11 @@ package io.ktor.http.cio.websocket
import io.ktor.util.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.nio.*
import kotlin.coroutines.*
/**
* Represents a web socket session between two peers
*/
interface WebSocketSession : CoroutineScope {
expect interface WebSocketSession : CoroutineScope {
/**
* Incoming frames channel
*/
@@ -22,25 +20,12 @@ interface WebSocketSession : CoroutineScope {
val outgoing: SendChannel<Frame>
/**
* Enable or disable masking output messages by a random xor mask.
* Please note that changing this flag on the fly could be applied to the messages already sent (enqueued earlier)
* as the sending pipeline works asynchronously
* Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already
* closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently
* ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is
* raw websocket session.
*/
var masking: Boolean
/**
* Specifies frame size limit. Connection will be closed if violated
*/
var maxFrameSize: Long
/**
* Dispatcher to handle io operations
*/
@Deprecated(
"Use coroutineContext instead", ReplaceWith("coroutineContext"),
level = DeprecationLevel.ERROR
)
val dispatcher: CoroutineContext get() = coroutineContext
suspend fun send(frame: Frame)
/**
* Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called
@@ -50,16 +35,6 @@ interface WebSocketSession : CoroutineScope {
*/
suspend fun flush()
/**
* Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already
* closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently
* ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is
* raw websocket session.
*/
suspend fun send(frame: Frame) {
outgoing.send(frame)
}
/**
* Initiate connection termination immediately. Termination may complete asynchronously.
*/
@@ -69,9 +44,23 @@ interface WebSocketSession : CoroutineScope {
* Close session with the specified [cause] or with no reason if `null`
*/
@KtorExperimentalAPI
suspend fun close(cause: Throwable? = null)
suspend fun close(cause: Throwable?)
}
/**
* Enqueues a text frame for sending with the specified [content].
*
* May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
*/
suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))
/**
* Enqueues a final binary frame for sending with the specified [content].
*
* May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
*/
suspend fun WebSocketSession.send(content: ByteArray): Unit = send(Frame.Binary(true, content))
/**
* Send a close frame with the specified [reason]. May suspend if outgoing channel is full or
* may throw an exception if it is already closed. The specified [reason] could be ignored if there was already
@@ -84,17 +73,3 @@ suspend fun WebSocketSession.close(reason: CloseReason) {
} catch (ignore: ClosedSendChannelException) {
}
}
/**
* Enqueues a text frame for sending with the specified [content].
*
* May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
*/
suspend fun WebSocketSession.send(content: String) = send(Frame.Text(content))
/**
* Enqueues a final binary frame for sending with the specified [content].
*
* May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
*/
suspend fun WebSocketSession.send(content: ByteArray) = send(Frame.Binary(true, ByteBuffer.wrap(content)))

View File

@@ -0,0 +1,23 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
/**
* Default websocket session with ping-pong and timeout processing and built-in [closeReason] population
*/
actual interface DefaultWebSocketSession : WebSocketSession {
/**
* A close reason for this session. It could be `null` if a session is terminated with no close reason
* (for example due to connection failure).
*/
actual val closeReason: Deferred<CloseReason?>
}
/**
* Create [DefaultWebSocketSession] from session.
*/
actual fun DefaultWebSocketSession(
session: WebSocketSession,
pingInterval: Long,
timeoutMillis: Long
): DefaultWebSocketSession = error("There is no CIO js websocket implementation. Consider using platform default.")

View File

@@ -0,0 +1,102 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
import kotlinx.io.core.*
import kotlinx.io.core.ByteOrder
/**
* A frame received or ready to be sent. It is not reusable and not thread-safe
* @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used
* @property frameType enum value
* @property data - a frame content or fragment content
* @property disposableHandle could be invoked when the frame is processed
*/
actual sealed class Frame private actual constructor(
actual val fin: Boolean,
actual val frameType: FrameType,
actual val data: ByteArray,
actual val disposableHandle: DisposableHandle
) {
/**
* Represents an application level binary frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
actual class Binary actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.BINARY, data) {
actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
}
/**
* Represents an application level text frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character
* so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first.
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
actual class Text actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.TEXT, data) {
actual constructor(text: String) : this(true, text.toByteArray())
actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
}
/**
* Represents a low-level level close frame. It could be sent to indicate web socket session end.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
actual class Close actual constructor(data: ByteArray) : Frame(true, FrameType.CLOSE, data) {
actual constructor(reason: CloseReason) : this(buildPacket {
byteOrder = ByteOrder.BIG_ENDIAN
writeShort(reason.code)
writeStringUtf8(reason.message)
})
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
actual constructor() : this(Empty)
}
/**
* Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]).
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
actual class Ping actual constructor(data: ByteArray) : Frame(true, FrameType.PING, data) {
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
}
/**
* Represents a low-level pong frame. Should be sent in reply to a [Ping] frame.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
actual class Pong actual constructor(
data: ByteArray,
disposableHandle: DisposableHandle
) : Frame(true, FrameType.PONG, data, disposableHandle) {
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
}
override fun toString() = "Frame $frameType (fin=$fin, buffer len = ${data.size})"
/**
* Creates a frame copy
*/
actual fun copy(): Frame = byType(fin, frameType, data.copyOf())
actual companion object {
private val Empty: ByteArray = ByteArray(0)
/**
* Create a particular [Frame] instance by frame type
*/
actual fun byType(
fin: Boolean,
frameType: FrameType,
data: ByteArray
): Frame = when (frameType) {
FrameType.BINARY -> Binary(fin, data)
FrameType.TEXT -> Text(fin, data)
FrameType.CLOSE -> Close(data)
FrameType.PING -> Ping(data)
FrameType.PONG -> Pong(data)
}
}
}

View File

@@ -0,0 +1,48 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
/**
* Represents a web socket session between two peers
*/
actual interface WebSocketSession : CoroutineScope {
/**
* Incoming frames channel
*/
actual val incoming: ReceiveChannel<Frame>
/**
* Outgoing frames channel. It could have limited capacity so sending too much frames may lead to suspension at
* corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason.
*/
actual val outgoing: SendChannel<Frame>
/**
* Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already
* closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently
* ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is
* raw websocket session.
*/
@Suppress("ACTUAL_WITHOUT_EXPECT")
suspend fun send(frame: Frame) {
outgoing.send(frame)
}
/**
* Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called
* at any time even after close. May return immediately if the connection is already terminated.
* However it may also fail with an exception (or cancellation) at any point due to session failure.
* Please note that [flush] doesn't guarantee that frames were actually delivered.
*/
actual suspend fun flush()
/**
* Initiate connection termination immediately. Termination may complete asynchronously.
*/
actual fun terminate()
/**
* Close session with the specified [cause] or with no reason if `null`
*/
actual suspend fun close(cause: Throwable?)
}

View File

@@ -2,24 +2,34 @@ package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
/**
* Create [DefaultWebSocketSession] from session.
*/
@UseExperimental(WebSocketInternalAPI::class)
actual fun DefaultWebSocketSession(
session: WebSocketSession,
pingInterval: Long,
timeoutMillis: Long
): DefaultWebSocketSession = DefaultWebSocketSessionImpl(
session, pingInterval, timeoutMillis
)
/**
* Default websocket session with ping-pong and timeout processing and built-in [closeReason] population
*/
interface DefaultWebSocketSession : WebSocketSession {
actual interface DefaultWebSocketSession : WebSocketSession {
/**
* Ping interval or `-1L` to disable pinger. Please note that pongs will be handled despite of this setting.
*/
var pingIntervalMillis: Long
/**
* A timeout to wait for pong reply to ping otherwise the session will be terminated immediately.
* It doesn't have any effect if [pingIntervalMillis] is `-1` (pinger is disabled).
*/
var timeoutMillis: Long
/**
* A close reason for this session. It could be `null` if a session is terminated with no close reason
* (for example due to connection failure).
*/
val closeReason: Deferred<CloseReason?>
actual val closeReason: Deferred<CloseReason?>
}

View File

@@ -2,12 +2,12 @@ package io.ktor.http.cio.websocket
import io.ktor.util.*
import io.ktor.util.cio.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.io.core.*
import kotlinx.io.pool.*
import java.nio.*
import java.util.concurrent.atomic.*
private val IncomingProcessorCoroutineName = CoroutineName("ws-incoming-processor")
private val OutgoingProcessorCoroutineName = CoroutineName("ws-outgoing-processor")
@@ -23,11 +23,11 @@ class DefaultWebSocketSessionImpl(
private val pool: ObjectPool<ByteBuffer> = KtorDefaultPool
) : DefaultWebSocketSession, WebSocketSession by raw {
private val pinger = AtomicReference<SendChannel<Frame.Pong>?>(null)
private val pinger = atomic<SendChannel<Frame.Pong>?>(null)
private val closeReasonRef = CompletableDeferred<CloseReason>()
private val filtered = Channel<Frame>(8)
private val outgoingToBeProcessed = Channel<Frame>(8)
private val closed: AtomicBoolean = AtomicBoolean(false)
private val closed: AtomicBoolean = atomic(false)
override val incoming: ReceiveChannel<Frame> get() = filtered
override val outgoing: SendChannel<Frame> get() = outgoingToBeProcessed
@@ -79,7 +79,7 @@ class DefaultWebSocketSessionImpl(
sendCloseSequence(frame.readReason())
return@launch
}
is Frame.Pong -> pinger.get()?.send(frame)
is Frame.Pong -> pinger.value?.send(frame)
is Frame.Ping -> ponger.send(frame)
else -> {
if (!frame.fin) {
@@ -140,7 +140,7 @@ class DefaultWebSocketSessionImpl(
val reasonToSend = reason ?: CloseReason(CloseReason.Codes.NORMAL, "")
try {
runOrCancelPinger()
send(Frame.Close(reasonToSend))
raw.outgoing.send(Frame.Close(reasonToSend))
} finally {
closeReasonRef.complete(reasonToSend)
}
@@ -148,8 +148,9 @@ class DefaultWebSocketSessionImpl(
private fun runOrCancelPinger() {
val interval = pingIntervalMillis
val newPinger: SendChannel<Frame.Pong>? = when {
closed.get() -> null
closed.value -> null
interval >= 0L -> pinger(raw.outgoing, interval, timeoutMillis, pool)
else -> null
}
@@ -161,13 +162,13 @@ class DefaultWebSocketSessionImpl(
newPinger?.offer(EmptyPong) // it is safe here to send dummy pong because pinger will ignore it
if (closed.get() && newPinger != null) {
if (closed.value && newPinger != null) {
runOrCancelPinger()
}
}
companion object {
private val EmptyPong = Frame.Pong(ByteBuffer.allocate(0))
private val EmptyPong = Frame.Pong(ByteArray(0))
}
}

View File

@@ -6,59 +6,23 @@ import kotlinx.io.core.*
import kotlinx.io.core.ByteOrder
import java.nio.*
/**
* Frame types enum
* @property controlFrame if this is control frame type
* @property opcode - frame type id that is used to transport it
*/
enum class FrameType (val controlFrame: Boolean, val opcode: Int) {
/**
* Regular application level text frame
*/
TEXT(false, 1),
/**
* Regular application level binary frame
*/
BINARY(false, 2),
/**
* Low level close frame type
*/
CLOSE(true, 8),
/**
* Low level ping frame type
*/
PING(true, 9),
/**
* Low level pong frame type
*/
PONG(true, 0xa);
companion object {
private val maxOpcode = values().maxBy { it.opcode }!!.opcode
private val byOpcodeArray = Array(maxOpcode + 1) { op -> values().singleOrNull { it.opcode == op } }
/**
* Find [FrameType] instance by numeric [opcode]
* @return a [FrameType] instance or `null` of the [opcode] value is not valid
*/
operator fun get(opcode: Int): FrameType? = if (opcode in 0..maxOpcode) byOpcodeArray[opcode] else null
}
}
/**
* A frame received or ready to be sent. It is not reusable and not thread-safe
* @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used
* @property frameType enum value
* @property buffer - a frame content or fragment content
* @property data - a frame content or fragment content
* @property disposableHandle could be invoked when the frame is processed
*/
sealed class Frame(val fin: Boolean, val frameType: FrameType, val buffer: ByteBuffer, val disposableHandle: DisposableHandle = NonDisposableHandle) {
private val initialSize = buffer.remaining()
actual sealed class Frame private actual constructor(
actual val fin: Boolean,
actual val frameType: FrameType,
actual val data: ByteArray,
actual val disposableHandle: DisposableHandle
) {
/**
* Frame content
*/
val buffer: ByteBuffer = ByteBuffer.wrap(data)
/**
* Represents an application level binary frame.
@@ -66,8 +30,10 @@ sealed class Frame(val fin: Boolean, val frameType: FrameType, val buffer: ByteB
* (separated into several text frames so they have [fin] = false except the last one).
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
class Binary(fin: Boolean, buffer: ByteBuffer) : Frame(fin, FrameType.BINARY, buffer) {
constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readByteBuffer())
actual class Binary actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.BINARY, data) {
constructor(fin: Boolean, buffer: ByteBuffer) : this(fin, buffer.moveToByteArray())
actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
}
/**
@@ -78,98 +44,100 @@ sealed class Frame(val fin: Boolean, val frameType: FrameType, val buffer: ByteB
* so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first.
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
class Text(fin: Boolean, buffer: ByteBuffer) : Frame(fin, FrameType.TEXT, buffer) {
constructor(text: String) : this(true, ByteBuffer.wrap(text.toByteArray(Charsets.UTF_8)))
constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readByteBuffer())
actual class Text actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.TEXT, data) {
actual constructor(text: String) : this(true, text.toByteArray())
actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
constructor(fin: Boolean, buffer: ByteBuffer) : this(fin, buffer.moveToByteArray())
}
/**
* Represents a low-level level close frame. It could be sent to indicate web socket session end.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
class Close(buffer: ByteBuffer) : Frame(true, FrameType.CLOSE, buffer) {
constructor(reason: CloseReason) : this(buildPacket {
actual class Close actual constructor(data: ByteArray) : Frame(true, FrameType.CLOSE, data) {
actual constructor(reason: CloseReason) : this(buildPacket {
byteOrder = ByteOrder.BIG_ENDIAN
writeShort(reason.code)
writeStringUtf8(reason.message)
})
constructor(packet: ByteReadPacket) : this(packet.readByteBuffer())
constructor() : this(Empty)
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
actual constructor() : this(Empty)
constructor(buffer: ByteBuffer) : this(buffer.moveToByteArray())
}
/**
* Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]).
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
class Ping(buffer: ByteBuffer) : Frame(true, FrameType.PING, buffer) {
constructor(packet: ByteReadPacket) : this(packet.readByteBuffer())
actual class Ping actual constructor(data: ByteArray) : Frame(true, FrameType.PING, data) {
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
constructor(buffer: ByteBuffer) : this(buffer.moveToByteArray())
}
/**
* Represents a low-level pong frame. Should be sent in reply to a [Ping] frame.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
class Pong(buffer: ByteBuffer, disposableHandle: DisposableHandle) : Frame(true, FrameType.PONG, buffer, disposableHandle) {
constructor(buffer: ByteBuffer) : this(buffer, NonDisposableHandle)
constructor(packet: ByteReadPacket) : this(packet.readByteBuffer())
actual class Pong actual constructor(
data: ByteArray,
disposableHandle: DisposableHandle
) : Frame(true, FrameType.PONG, data, disposableHandle) {
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
constructor(
buffer: ByteBuffer,
disposableHandle: DisposableHandle = NonDisposableHandle
) : this(buffer.moveToByteArray(), disposableHandle)
constructor(buffer: ByteBuffer) : this(buffer.moveToByteArray(), NonDisposableHandle)
}
override fun toString() = "Frame $frameType (fin=$fin, buffer len = $initialSize)"
override fun toString() = "Frame $frameType (fin=$fin, buffer len = ${data.size})"
/**
* Creates a frame copy
*/
fun copy() = byType(fin, frameType, ByteBuffer.allocate(buffer.remaining()).apply { buffer.slice().moveTo(this); clear() })
actual fun copy(): Frame = byType(fin, frameType, data.copyOf())
companion object {
private val Empty = ByteBuffer.allocate(0)
actual companion object {
private val Empty: ByteArray = ByteArray(0)
/**
* Create a particular [Frame] instance by frame type
*/
fun byType(fin: Boolean, frameType: FrameType, buffer: ByteBuffer): Frame = when (frameType) {
FrameType.BINARY -> Binary(fin, buffer)
FrameType.TEXT -> Text(fin, buffer)
FrameType.CLOSE -> Close(buffer)
FrameType.PING -> Ping(buffer)
FrameType.PONG -> Pong(buffer)
actual fun byType(
fin: Boolean,
frameType: FrameType,
data: ByteArray
): Frame = when (frameType) {
FrameType.BINARY -> Binary(fin, data)
FrameType.TEXT -> Text(fin, data)
FrameType.CLOSE -> Close(data)
FrameType.PING -> Ping(data)
FrameType.PONG -> Pong(data)
}
/**
* Create a particular [Frame] instance by frame type
*/
fun byType(fin: Boolean, frameType: FrameType, buffer: ByteBuffer): Frame =
byType(fin, frameType, buffer.moveToByteArray())
}
}
/**
* Read text content from text frame. Shouldn't be used for fragmented frames: such frames need to be reassembled first
*/
fun Frame.Text.readText(): String {
require(fin) { "Text could be only extracted from non-fragmented frame" }
return Charsets.UTF_8.decode(buffer.duplicate()).toString()
}
/**
* Read binary content from a frame. For fragmented frames only returns this fragment.
*/
fun Frame.readBytes(): ByteArray {
return buffer.duplicate().moveToByteArray()
}
/**
* Read close reason from close frame or null if no close reason provided
*/
fun Frame.Close.readReason(): CloseReason? {
if (buffer.remaining() < 2) {
if (data.size < 2) {
return null
}
buffer.mark()
val code = buffer.getShort()
val message = buffer.decodeString(Charsets.UTF_8)
buffer.reset()
return CloseReason(code, message)
}
private object NonDisposableHandle : DisposableHandle {
override fun dispose() {}
override fun toString(): String = "NonDisposableHandle"
}

View File

@@ -47,7 +47,7 @@ class RawWebSocket(
}
override suspend fun close(cause: Throwable?) {
terminate()
cause?.let { socketJob.completeExceptionally(it) } ?: terminate()
}
}

View File

@@ -1,19 +0,0 @@
package io.ktor.http.cio.websocket
import java.nio.*
@Suppress("NOTHING_TO_INLINE")
private inline infix fun Byte.xor(other: Byte) = toInt().xor(other.toInt()).toByte()
internal fun ByteBuffer.xor(other: ByteBuffer) {
val bb = slice()
val mask = other.slice()
val maskSize = mask.remaining()
for (i in 0 .. bb.remaining() - 1) {
bb.put(i, bb.get(i) xor mask[i % maskSize])
}
}
@Suppress("NOTHING_TO_INLINE")
internal inline fun Boolean.flagAt(at: Int) = if (this) 1 shl at else 0

View File

@@ -0,0 +1,17 @@
@file:kotlin.jvm.JvmMultifileClass
@file:kotlin.jvm.JvmName("UtilsKt")
package io.ktor.http.cio.websocket
import java.nio.*
internal fun ByteBuffer.xor(other: ByteBuffer) {
val bb = slice()
val mask = other.slice()
val maskSize = mask.remaining()
for (i in 0 until bb.remaining()) {
bb.put(i, bb.get(i) xor mask[i % maskSize])
}
}

View File

@@ -5,7 +5,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.io.*
import kotlinx.io.pool.*
import java.nio.ByteBuffer
import java.nio.*
import java.nio.channels.*
import java.util.concurrent.CancellationException
import kotlin.coroutines.*
@@ -18,10 +18,10 @@ import kotlin.coroutines.*
*/
@WebSocketInternalAPI
class WebSocketReader(
private val byteChannel: ByteReadChannel,
override val coroutineContext: CoroutineContext,
var maxFrameSize: Long,
pool: ObjectPool<ByteBuffer> = KtorDefaultPool
private val byteChannel: ByteReadChannel,
override val coroutineContext: CoroutineContext,
var maxFrameSize: Long,
pool: ObjectPool<ByteBuffer> = KtorDefaultPool
) : CoroutineScope {
@Suppress("UNUSED_PARAMETER")
@@ -30,9 +30,10 @@ class WebSocketReader(
replaceWith = ReplaceWith("WebSocketReader(byteChannel, coroutineContext, maxFrameSize, pool)"),
level = DeprecationLevel.ERROR
)
constructor(byteChannel: ByteReadChannel, maxFrameSize: Long,
parent: Job?, coroutineContext: CoroutineContext, pool: ObjectPool<ByteBuffer> = KtorDefaultPool)
: this(byteChannel, coroutineContext, maxFrameSize, pool)
constructor(
byteChannel: ByteReadChannel, maxFrameSize: Long,
parent: Job?, coroutineContext: CoroutineContext, pool: ObjectPool<ByteBuffer> = KtorDefaultPool
) : this(byteChannel, coroutineContext, maxFrameSize, pool)
private var state = State.HEADER
private val frameParser = FrameParser()

View File

@@ -0,0 +1,60 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
/**
* Represents a web socket session between two peers
*/
actual interface WebSocketSession : CoroutineScope {
/**
* Enable or disable masking output messages by a random xor mask.
* Please note that changing this flag on the fly could be applied to the messages already sent (enqueued earlier)
* as the sending pipeline works asynchronously
*/
var masking: Boolean
/**
* Specifies frame size limit. Connection will be closed if violated
*/
var maxFrameSize: Long
/**
* Incoming frames channel
*/
actual val incoming: ReceiveChannel<Frame>
/**
* Outgoing frames channel. It could have limited capacity so sending too much frames may lead to suspension at
* corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason.
*/
actual val outgoing: SendChannel<Frame>
/**
* Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already
* closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently
* ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is
* raw websocket session.
*/
@Suppress("ACTUAL_WITHOUT_EXPECT")
suspend fun send(frame: Frame) {
outgoing.send(frame)
}
/**
* Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called
* at any time even after close. May return immediately if the connection is already terminated.
* However it may also fail with an exception (or cancellation) at any point due to session failure.
* Please note that [flush] doesn't guarantee that frames were actually delivered.
*/
actual suspend fun flush()
/**
* Initiate connection termination immediately. Termination may complete asynchronously.
*/
actual fun terminate()
/**
* Close session with the specified [cause] or with no reason if `null`
*/
@Suppress("ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS")
actual suspend fun close(cause: Throwable? = null)
}

View File

@@ -0,0 +1,23 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
/**
* Default websocket session with ping-pong and timeout processing and built-in [closeReason] population
*/
actual interface DefaultWebSocketSession : WebSocketSession {
/**
* A close reason for this session. It could be `null` if a session is terminated with no close reason
* (for example due to connection failure).
*/
actual val closeReason: Deferred<CloseReason?>
}
/**
* Create [DefaultWebSocketSession] from session.
*/
actual fun DefaultWebSocketSession(
session: WebSocketSession,
pingInterval: Long,
timeoutMillis: Long
): DefaultWebSocketSession = error("There is no CIO native websocket implementation. Consider using platform default.")

View File

@@ -0,0 +1,103 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
import kotlinx.io.core.*
import kotlinx.io.core.ByteOrder
/**
* A frame received or ready to be sent. It is not reusable and not thread-safe
* @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used
* @property frameType enum value
* @property data - a frame content or fragment content
* @property disposableHandle could be invoked when the frame is processed
*/
actual sealed class Frame private actual constructor(
actual val fin: Boolean,
actual val frameType: FrameType,
actual val data: ByteArray,
actual val disposableHandle: DisposableHandle
) {
/**
* Represents an application level binary frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
actual class Binary actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.BINARY, data) {
actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
}
/**
* Represents an application level text frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character
* so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first.
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
actual class Text actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.TEXT, data) {
actual constructor(text: String) : this(true, text.toByteArray())
actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
}
/**
* Represents a low-level level close frame. It could be sent to indicate web socket session end.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
actual class Close actual constructor(data: ByteArray) : Frame(true, FrameType.CLOSE, data) {
actual constructor(reason: CloseReason) : this(buildPacket {
byteOrder = ByteOrder.BIG_ENDIAN
writeShort(reason.code)
writeStringUtf8(reason.message)
})
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
actual constructor() : this(Empty)
}
/**
* Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]).
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
actual class Ping actual constructor(data: ByteArray) : Frame(true, FrameType.PING, data) {
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
}
/**
* Represents a low-level pong frame. Should be sent in reply to a [Ping] frame.
* Usually there is no need to send/handle it unless you have a RAW web socket session.
*/
actual class Pong actual constructor(
data: ByteArray,
disposableHandle: DisposableHandle
) : Frame(true, FrameType.PONG, data, disposableHandle) {
actual constructor(packet: ByteReadPacket) : this(packet.readBytes())
}
override fun toString() = "Frame $frameType (fin=$fin, buffer len = ${data.size})"
/**
* Creates a frame copy
*/
actual fun copy(): Frame = byType(fin, frameType, data.copyOf())
actual companion object {
private val Empty: ByteArray = ByteArray(0)
/**
* Create a particular [Frame] instance by frame type
*/
actual fun byType(
fin: Boolean,
frameType: FrameType,
data: ByteArray
): Frame = when (frameType) {
FrameType.BINARY -> Binary(fin, data)
FrameType.TEXT -> Text(fin, data)
FrameType.CLOSE -> Close(data)
FrameType.PING -> Ping(data)
FrameType.PONG -> Pong(data)
}
}
}

View File

@@ -0,0 +1,49 @@
package io.ktor.http.cio.websocket
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
/**
* Represents a web socket session between two peers
*/
actual interface WebSocketSession : CoroutineScope {
/**
* Incoming frames channel
*/
actual val incoming: ReceiveChannel<Frame>
/**
* Outgoing frames channel. It could have limited capacity so sending too much frames may lead to suspension at
* corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason.
*/
actual val outgoing: SendChannel<Frame>
/**
* Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already
* closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently
* ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is
* raw websocket session.
*/
@Suppress("ACTUAL_WITHOUT_EXPECT")
suspend fun send(frame: Frame) {
outgoing.send(frame)
}
/**
* Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called
* at any time even after close. May return immediately if the connection is already terminated.
* However it may also fail with an exception (or cancellation) at any point due to session failure.
* Please note that [flush] doesn't guarantee that frames were actually delivered.
*/
actual suspend fun flush()
/**
* Initiate connection termination immediately. Termination may complete asynchronously.
*/
actual fun terminate()
/**
* Close session with the specified [cause] or with no reason if `null`
*/
actual suspend fun close(cause: Throwable?)
}

View File

@@ -24,10 +24,7 @@ private fun getDigest(text: String, algorithm: String, salt: String): ByteArray
@KtorExperimentalAPI
fun sha1(bytes: ByteArray): ByteArray = MessageDigest.getInstance("SHA1").digest(bytes)!!
@InternalAPI
actual fun Digest(name: String): Digest = object : Digest {
private val delegate = MessageDigest.getInstance(name)
private inline class DigestImpl(val delegate: MessageDigest) : Digest {
override fun plusAssign(bytes: ByteArray) {
delegate.update(bytes)
}