diff --git a/ktor-http/common/src/io/ktor/http/HttpProtocolVersion.kt b/ktor-http/common/src/io/ktor/http/HttpProtocolVersion.kt index 44c0c41b8..3be1c4744 100644 --- a/ktor-http/common/src/io/ktor/http/HttpProtocolVersion.kt +++ b/ktor-http/common/src/io/ktor/http/HttpProtocolVersion.kt @@ -14,12 +14,12 @@ data class HttpProtocolVersion(val name: String, val major: Int, val minor: Int) * HTTP/2.0 version. */ val HTTP_2_0 = HttpProtocolVersion("HTTP", 2, 0) - + /** * HTTP/1.1 version. */ val HTTP_1_1 = HttpProtocolVersion("HTTP", 1, 1) - + /** * HTTP/1.0 version. */ @@ -36,13 +36,29 @@ data class HttpProtocolVersion(val name: String, val major: Int, val minor: Int) val QUIC = HttpProtocolVersion("QUIC", 1, 0) /** - * Creates an instance of [HttpProtocolVersion] from the given parameters. + * Creates an instance of [HttpProtocolVersion] from the given parameters. */ fun fromValue(name: String, major: Int, minor: Int): HttpProtocolVersion = when { name == "HTTP" && major == 1 && minor == 1 -> HTTP_1_1 name == "HTTP" && major == 2 && minor == 0 -> HTTP_2_0 else -> HttpProtocolVersion(name, major, minor) } + + /** + * Create an instance of [HttpProtocolVersion] from http string representation. + */ + fun parse(value: CharSequence): HttpProtocolVersion { + /** + * Format: protocol/major.minor + */ + val (protocol, major, minor) = value.split("/", ".").also { + check(it.size == 3) { + "Failed to parse HttpProtocolVersion. Expected format: protocol/major.minor, but actual: $value" + } + } + + return fromValue(protocol, major.toInt(), minor.toInt()) + } } override fun toString(): String = "$name/$major.$minor" diff --git a/ktor-http/jvm/src/io/ktor/http/websocket/Utils.kt b/ktor-http/common/src/io/ktor/http/websocket/Utils.kt similarity index 61% rename from ktor-http/jvm/src/io/ktor/http/websocket/Utils.kt rename to ktor-http/common/src/io/ktor/http/websocket/Utils.kt index f16756c25..ebe22ba2b 100644 --- a/ktor-http/jvm/src/io/ktor/http/websocket/Utils.kt +++ b/ktor-http/common/src/io/ktor/http/websocket/Utils.kt @@ -1,6 +1,8 @@ package io.ktor.http.websocket import io.ktor.util.* +import kotlinx.io.charsets.* +import kotlinx.io.core.* private const val WEBSOCKET_SERVER_ACCEPT_TAIL = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" @@ -10,4 +12,4 @@ private const val WEBSOCKET_SERVER_ACCEPT_TAIL = "258EAFA5-E914-47DA-95CA-C5AB0D */ @KtorExperimentalAPI fun websocketServerAccept(nonce: String): String = - encodeBase64(sha1("${nonce.trim()}$WEBSOCKET_SERVER_ACCEPT_TAIL".toByteArray(Charsets.ISO_8859_1))) + sha1("${nonce.trim()}$WEBSOCKET_SERVER_ACCEPT_TAIL".toByteArray(Charset.forName("ISO_8859_1"))).encodeBase64() diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/CloseReason.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/CloseReason.kt similarity index 100% rename from ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/CloseReason.kt rename to ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/CloseReason.kt diff --git a/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt new file mode 100644 index 000000000..f05685c6b --- /dev/null +++ b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt @@ -0,0 +1,24 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* + +/** + * Create [DefaultWebSocketSession] from session. + */ +@UseExperimental(WebSocketInternalAPI::class) +expect fun DefaultWebSocketSession( + session: WebSocketSession, + pingInterval: Long, + timeoutMillis: Long +): DefaultWebSocketSession + +/** + * Default websocket session with ping-pong and timeout processing and built-in [closeReason] population + */ +expect interface DefaultWebSocketSession : WebSocketSession { + /** + * A close reason for this session. It could be `null` if a session is terminated with no close reason + * (for example due to connection failure). + */ + val closeReason: Deferred +} diff --git a/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/FrameCommon.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/FrameCommon.kt new file mode 100644 index 000000000..f66b9b9a0 --- /dev/null +++ b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/FrameCommon.kt @@ -0,0 +1,107 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* +import kotlinx.io.charsets.* +import kotlinx.io.core.* + +/** + * A frame received or ready to be sent. It is not reusable and not thread-safe + * @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used + * @property frameType enum value + * @property data - a frame content or fragment content + * @property disposableHandle could be invoked when the frame is processed + */ +expect sealed class Frame private constructor( + fin: Boolean, + frameType: FrameType, + data: ByteArray, + disposableHandle: DisposableHandle = NonDisposableHandle +) { + val fin: Boolean + val frameType: FrameType + val data: ByteArray + val disposableHandle: DisposableHandle + + /** + * Represents an application level binary frame. + * In a RAW web socket session a big text frame could be fragmented + * (separated into several text frames so they have [fin] = false except the last one). + * Note that usually there is no need to handle fragments unless you have a RAW web socket session. + */ + class Binary(fin: Boolean, data: ByteArray) : Frame { + constructor(fin: Boolean, packet: ByteReadPacket) + } + + /** + * Represents an application level text frame. + * In a RAW web socket session a big text frame could be fragmented + * (separated into several text frames so they have [fin] = false except the last one). + * Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character + * so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first. + * Note that usually there is no need to handle fragments unless you have a RAW web socket session. + */ + class Text(fin: Boolean, data: ByteArray) : Frame { + constructor(text: String) + constructor(fin: Boolean, packet: ByteReadPacket) + } + + /** + * Represents a low-level level close frame. It could be sent to indicate web socket session end. + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + class Close(data: ByteArray) : Frame { + constructor(reason: CloseReason) + constructor(packet: ByteReadPacket) + constructor() + } + + /** + * Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]). + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + class Ping(data: ByteArray) : Frame { + constructor(packet: ByteReadPacket) + } + + /** + * Represents a low-level pong frame. Should be sent in reply to a [Ping] frame. + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + class Pong( + data: ByteArray, disposableHandle: DisposableHandle = NonDisposableHandle + ) : Frame { + constructor(packet: ByteReadPacket) + } + + /** + * Creates a frame copy + */ + fun copy(): Frame + + companion object { + /** + * Create a particular [Frame] instance by frame type + */ + fun byType(fin: Boolean, frameType: FrameType, data: ByteArray): Frame + } +} + +/** + * Read text content from text frame. Shouldn't be used for fragmented frames: such frames need to be reassembled first + */ +fun Frame.Text.readText(): String { + require(fin) { "Text could be only extracted from non-fragmented frame" } + return Charsets.UTF_8.newDecoder().decode(buildPacket { writeFully(data) }) +} + +/** + * Read binary content from a frame. For fragmented frames only returns this fragment. + */ +fun Frame.readBytes(): ByteArray { + return data.copyOf() +} + +internal object NonDisposableHandle : DisposableHandle { + override fun dispose() {} + override fun toString(): String = "NonDisposableHandle" +} diff --git a/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/FrameType.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/FrameType.kt new file mode 100644 index 000000000..136b1fdf1 --- /dev/null +++ b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/FrameType.kt @@ -0,0 +1,45 @@ +package io.ktor.http.cio.websocket + +/** + * Frame types enum + * @property controlFrame if this is control frame type + * @property opcode - frame type id that is used to transport it + */ +enum class FrameType(val controlFrame: Boolean, val opcode: Int) { + /** + * Regular application level text frame + */ + TEXT(false, 1), + + /** + * Regular application level binary frame + */ + BINARY(false, 2), + + /** + * Low level close frame type + */ + CLOSE(true, 8), + + /** + * Low level ping frame type + */ + PING(true, 9), + + /** + * Low level pong frame type + */ + PONG(true, 0xa); + + companion object { + private val maxOpcode = values().maxBy { it.opcode }!!.opcode + + private val byOpcodeArray = Array(maxOpcode + 1) { op -> values().singleOrNull { it.opcode == op } } + + /** + * Find [FrameType] instance by numeric [opcode] + * @return a [FrameType] instance or `null` of the [opcode] value is not valid + */ + operator fun get(opcode: Int): FrameType? = if (opcode in 0..maxOpcode) byOpcodeArray[opcode] else null + } +} diff --git a/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/Utils.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/Utils.kt new file mode 100644 index 000000000..c8dc71b61 --- /dev/null +++ b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/Utils.kt @@ -0,0 +1,10 @@ +@file:kotlin.jvm.JvmMultifileClass +@file:kotlin.jvm.JvmName("UtilsKt") + +package io.ktor.http.cio.websocket + +@Suppress("NOTHING_TO_INLINE") +internal inline infix fun Byte.xor(other: Byte) = toInt().xor(other.toInt()).toByte() + +@Suppress("NOTHING_TO_INLINE") +internal inline fun Boolean.flagAt(at: Int) = if (this) 1 shl at else 0 diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketInternalAPI.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/WebSocketInternalAPI.kt similarity index 100% rename from ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketInternalAPI.kt rename to ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/WebSocketInternalAPI.kt diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketSession.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/WebSocketSession.kt similarity index 70% rename from ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketSession.kt rename to ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/WebSocketSession.kt index 8c4d7c8f0..79b50c61a 100644 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketSession.kt +++ b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/websocket/WebSocketSession.kt @@ -3,13 +3,11 @@ package io.ktor.http.cio.websocket import io.ktor.util.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* -import java.nio.* -import kotlin.coroutines.* /** * Represents a web socket session between two peers */ -interface WebSocketSession : CoroutineScope { +expect interface WebSocketSession : CoroutineScope { /** * Incoming frames channel */ @@ -22,25 +20,12 @@ interface WebSocketSession : CoroutineScope { val outgoing: SendChannel /** - * Enable or disable masking output messages by a random xor mask. - * Please note that changing this flag on the fly could be applied to the messages already sent (enqueued earlier) - * as the sending pipeline works asynchronously + * Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already + * closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently + * ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is + * raw websocket session. */ - var masking: Boolean - - /** - * Specifies frame size limit. Connection will be closed if violated - */ - var maxFrameSize: Long - - /** - * Dispatcher to handle io operations - */ - @Deprecated( - "Use coroutineContext instead", ReplaceWith("coroutineContext"), - level = DeprecationLevel.ERROR - ) - val dispatcher: CoroutineContext get() = coroutineContext + suspend fun send(frame: Frame) /** * Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called @@ -50,16 +35,6 @@ interface WebSocketSession : CoroutineScope { */ suspend fun flush() - /** - * Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already - * closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently - * ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is - * raw websocket session. - */ - suspend fun send(frame: Frame) { - outgoing.send(frame) - } - /** * Initiate connection termination immediately. Termination may complete asynchronously. */ @@ -69,9 +44,23 @@ interface WebSocketSession : CoroutineScope { * Close session with the specified [cause] or with no reason if `null` */ @KtorExperimentalAPI - suspend fun close(cause: Throwable? = null) + suspend fun close(cause: Throwable?) } +/** + * Enqueues a text frame for sending with the specified [content]. + * + * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed. + */ +suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content)) + +/** + * Enqueues a final binary frame for sending with the specified [content]. + * + * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed. + */ +suspend fun WebSocketSession.send(content: ByteArray): Unit = send(Frame.Binary(true, content)) + /** * Send a close frame with the specified [reason]. May suspend if outgoing channel is full or * may throw an exception if it is already closed. The specified [reason] could be ignored if there was already @@ -84,17 +73,3 @@ suspend fun WebSocketSession.close(reason: CloseReason) { } catch (ignore: ClosedSendChannelException) { } } - -/** - * Enqueues a text frame for sending with the specified [content]. - * - * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed. - */ -suspend fun WebSocketSession.send(content: String) = send(Frame.Text(content)) - -/** - * Enqueues a final binary frame for sending with the specified [content]. - * - * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed. - */ -suspend fun WebSocketSession.send(content: ByteArray) = send(Frame.Binary(true, ByteBuffer.wrap(content))) diff --git a/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionJs.kt b/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionJs.kt new file mode 100644 index 000000000..1e10a5c09 --- /dev/null +++ b/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionJs.kt @@ -0,0 +1,23 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* + +/** + * Default websocket session with ping-pong and timeout processing and built-in [closeReason] population + */ +actual interface DefaultWebSocketSession : WebSocketSession { + /** + * A close reason for this session. It could be `null` if a session is terminated with no close reason + * (for example due to connection failure). + */ + actual val closeReason: Deferred +} + +/** + * Create [DefaultWebSocketSession] from session. + */ +actual fun DefaultWebSocketSession( + session: WebSocketSession, + pingInterval: Long, + timeoutMillis: Long +): DefaultWebSocketSession = error("There is no CIO js websocket implementation. Consider using platform default.") diff --git a/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/FrameJs.kt b/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/FrameJs.kt new file mode 100644 index 000000000..616aef23d --- /dev/null +++ b/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/FrameJs.kt @@ -0,0 +1,102 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* +import kotlinx.io.core.* +import kotlinx.io.core.ByteOrder + +/** + * A frame received or ready to be sent. It is not reusable and not thread-safe + * @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used + * @property frameType enum value + * @property data - a frame content or fragment content + * @property disposableHandle could be invoked when the frame is processed + */ +actual sealed class Frame private actual constructor( + actual val fin: Boolean, + actual val frameType: FrameType, + actual val data: ByteArray, + actual val disposableHandle: DisposableHandle +) { + /** + * Represents an application level binary frame. + * In a RAW web socket session a big text frame could be fragmented + * (separated into several text frames so they have [fin] = false except the last one). + * Note that usually there is no need to handle fragments unless you have a RAW web socket session. + */ + actual class Binary actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.BINARY, data) { + actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes()) + } + + /** + * Represents an application level text frame. + * In a RAW web socket session a big text frame could be fragmented + * (separated into several text frames so they have [fin] = false except the last one). + * Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character + * so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first. + * Note that usually there is no need to handle fragments unless you have a RAW web socket session. + */ + actual class Text actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.TEXT, data) { + actual constructor(text: String) : this(true, text.toByteArray()) + actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes()) + } + + /** + * Represents a low-level level close frame. It could be sent to indicate web socket session end. + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + actual class Close actual constructor(data: ByteArray) : Frame(true, FrameType.CLOSE, data) { + actual constructor(reason: CloseReason) : this(buildPacket { + byteOrder = ByteOrder.BIG_ENDIAN + writeShort(reason.code) + writeStringUtf8(reason.message) + }) + + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + actual constructor() : this(Empty) + } + + /** + * Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]). + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + actual class Ping actual constructor(data: ByteArray) : Frame(true, FrameType.PING, data) { + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + } + + /** + * Represents a low-level pong frame. Should be sent in reply to a [Ping] frame. + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + actual class Pong actual constructor( + data: ByteArray, + disposableHandle: DisposableHandle + ) : Frame(true, FrameType.PONG, data, disposableHandle) { + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + } + + override fun toString() = "Frame $frameType (fin=$fin, buffer len = ${data.size})" + + /** + * Creates a frame copy + */ + actual fun copy(): Frame = byType(fin, frameType, data.copyOf()) + + actual companion object { + private val Empty: ByteArray = ByteArray(0) + + /** + * Create a particular [Frame] instance by frame type + */ + actual fun byType( + fin: Boolean, + frameType: FrameType, + data: ByteArray + ): Frame = when (frameType) { + FrameType.BINARY -> Binary(fin, data) + FrameType.TEXT -> Text(fin, data) + FrameType.CLOSE -> Close(data) + FrameType.PING -> Ping(data) + FrameType.PONG -> Pong(data) + } + } +} diff --git a/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/WebSocketSession.kt b/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/WebSocketSession.kt new file mode 100644 index 000000000..bb96896fd --- /dev/null +++ b/ktor-http/ktor-http-cio/js/src/io/ktor/http/cio/websocket/WebSocketSession.kt @@ -0,0 +1,48 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* + +/** + * Represents a web socket session between two peers + */ +actual interface WebSocketSession : CoroutineScope { + /** + * Incoming frames channel + */ + actual val incoming: ReceiveChannel + /** + * Outgoing frames channel. It could have limited capacity so sending too much frames may lead to suspension at + * corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason. + */ + actual val outgoing: SendChannel + + /** + * Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already + * closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently + * ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is + * raw websocket session. + */ + @Suppress("ACTUAL_WITHOUT_EXPECT") + suspend fun send(frame: Frame) { + outgoing.send(frame) + } + + /** + * Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called + * at any time even after close. May return immediately if the connection is already terminated. + * However it may also fail with an exception (or cancellation) at any point due to session failure. + * Please note that [flush] doesn't guarantee that frames were actually delivered. + */ + actual suspend fun flush() + + /** + * Initiate connection termination immediately. Termination may complete asynchronously. + */ + actual fun terminate() + + /** + * Close session with the specified [cause] or with no reason if `null` + */ + actual suspend fun close(cause: Throwable?) +} diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt index aef1a4fbe..bc7eff8a1 100644 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSession.kt @@ -2,24 +2,34 @@ package io.ktor.http.cio.websocket import kotlinx.coroutines.* +/** + * Create [DefaultWebSocketSession] from session. + */ +@UseExperimental(WebSocketInternalAPI::class) +actual fun DefaultWebSocketSession( + session: WebSocketSession, + pingInterval: Long, + timeoutMillis: Long +): DefaultWebSocketSession = DefaultWebSocketSessionImpl( + session, pingInterval, timeoutMillis +) + /** * Default websocket session with ping-pong and timeout processing and built-in [closeReason] population */ -interface DefaultWebSocketSession : WebSocketSession { +actual interface DefaultWebSocketSession : WebSocketSession { /** * Ping interval or `-1L` to disable pinger. Please note that pongs will be handled despite of this setting. */ var pingIntervalMillis: Long - /** * A timeout to wait for pong reply to ping otherwise the session will be terminated immediately. * It doesn't have any effect if [pingIntervalMillis] is `-1` (pinger is disabled). */ var timeoutMillis: Long - /** * A close reason for this session. It could be `null` if a session is terminated with no close reason * (for example due to connection failure). */ - val closeReason: Deferred + actual val closeReason: Deferred } diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionImpl.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionImpl.kt index e5b35d36a..eb610a6d2 100644 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionImpl.kt +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionImpl.kt @@ -2,12 +2,12 @@ package io.ktor.http.cio.websocket import io.ktor.util.* import io.ktor.util.cio.* +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.io.core.* import kotlinx.io.pool.* import java.nio.* -import java.util.concurrent.atomic.* private val IncomingProcessorCoroutineName = CoroutineName("ws-incoming-processor") private val OutgoingProcessorCoroutineName = CoroutineName("ws-outgoing-processor") @@ -23,11 +23,11 @@ class DefaultWebSocketSessionImpl( private val pool: ObjectPool = KtorDefaultPool ) : DefaultWebSocketSession, WebSocketSession by raw { - private val pinger = AtomicReference?>(null) + private val pinger = atomic?>(null) private val closeReasonRef = CompletableDeferred() private val filtered = Channel(8) private val outgoingToBeProcessed = Channel(8) - private val closed: AtomicBoolean = AtomicBoolean(false) + private val closed: AtomicBoolean = atomic(false) override val incoming: ReceiveChannel get() = filtered override val outgoing: SendChannel get() = outgoingToBeProcessed @@ -79,7 +79,7 @@ class DefaultWebSocketSessionImpl( sendCloseSequence(frame.readReason()) return@launch } - is Frame.Pong -> pinger.get()?.send(frame) + is Frame.Pong -> pinger.value?.send(frame) is Frame.Ping -> ponger.send(frame) else -> { if (!frame.fin) { @@ -140,7 +140,7 @@ class DefaultWebSocketSessionImpl( val reasonToSend = reason ?: CloseReason(CloseReason.Codes.NORMAL, "") try { runOrCancelPinger() - send(Frame.Close(reasonToSend)) + raw.outgoing.send(Frame.Close(reasonToSend)) } finally { closeReasonRef.complete(reasonToSend) } @@ -148,8 +148,9 @@ class DefaultWebSocketSessionImpl( private fun runOrCancelPinger() { val interval = pingIntervalMillis + val newPinger: SendChannel? = when { - closed.get() -> null + closed.value -> null interval >= 0L -> pinger(raw.outgoing, interval, timeoutMillis, pool) else -> null } @@ -161,13 +162,13 @@ class DefaultWebSocketSessionImpl( newPinger?.offer(EmptyPong) // it is safe here to send dummy pong because pinger will ignore it - if (closed.get() && newPinger != null) { + if (closed.value && newPinger != null) { runOrCancelPinger() } } companion object { - private val EmptyPong = Frame.Pong(ByteBuffer.allocate(0)) + private val EmptyPong = Frame.Pong(ByteArray(0)) } } diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Frame.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Frame.kt index 301b9b77a..9be39f8d5 100644 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Frame.kt +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Frame.kt @@ -6,59 +6,23 @@ import kotlinx.io.core.* import kotlinx.io.core.ByteOrder import java.nio.* -/** - * Frame types enum - * @property controlFrame if this is control frame type - * @property opcode - frame type id that is used to transport it - */ -enum class FrameType (val controlFrame: Boolean, val opcode: Int) { - /** - * Regular application level text frame - */ - TEXT(false, 1), - - /** - * Regular application level binary frame - */ - BINARY(false, 2), - - /** - * Low level close frame type - */ - CLOSE(true, 8), - - /** - * Low level ping frame type - */ - PING(true, 9), - - /** - * Low level pong frame type - */ - PONG(true, 0xa); - - companion object { - private val maxOpcode = values().maxBy { it.opcode }!!.opcode - - private val byOpcodeArray = Array(maxOpcode + 1) { op -> values().singleOrNull { it.opcode == op } } - - /** - * Find [FrameType] instance by numeric [opcode] - * @return a [FrameType] instance or `null` of the [opcode] value is not valid - */ - operator fun get(opcode: Int): FrameType? = if (opcode in 0..maxOpcode) byOpcodeArray[opcode] else null - } -} - /** * A frame received or ready to be sent. It is not reusable and not thread-safe * @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used * @property frameType enum value - * @property buffer - a frame content or fragment content + * @property data - a frame content or fragment content * @property disposableHandle could be invoked when the frame is processed */ -sealed class Frame(val fin: Boolean, val frameType: FrameType, val buffer: ByteBuffer, val disposableHandle: DisposableHandle = NonDisposableHandle) { - private val initialSize = buffer.remaining() +actual sealed class Frame private actual constructor( + actual val fin: Boolean, + actual val frameType: FrameType, + actual val data: ByteArray, + actual val disposableHandle: DisposableHandle +) { + /** + * Frame content + */ + val buffer: ByteBuffer = ByteBuffer.wrap(data) /** * Represents an application level binary frame. @@ -66,8 +30,10 @@ sealed class Frame(val fin: Boolean, val frameType: FrameType, val buffer: ByteB * (separated into several text frames so they have [fin] = false except the last one). * Note that usually there is no need to handle fragments unless you have a RAW web socket session. */ - class Binary(fin: Boolean, buffer: ByteBuffer) : Frame(fin, FrameType.BINARY, buffer) { - constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readByteBuffer()) + actual class Binary actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.BINARY, data) { + constructor(fin: Boolean, buffer: ByteBuffer) : this(fin, buffer.moveToByteArray()) + + actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes()) } /** @@ -78,98 +44,100 @@ sealed class Frame(val fin: Boolean, val frameType: FrameType, val buffer: ByteB * so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first. * Note that usually there is no need to handle fragments unless you have a RAW web socket session. */ - class Text(fin: Boolean, buffer: ByteBuffer) : Frame(fin, FrameType.TEXT, buffer) { - constructor(text: String) : this(true, ByteBuffer.wrap(text.toByteArray(Charsets.UTF_8))) - constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readByteBuffer()) + actual class Text actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.TEXT, data) { + actual constructor(text: String) : this(true, text.toByteArray()) + actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes()) + constructor(fin: Boolean, buffer: ByteBuffer) : this(fin, buffer.moveToByteArray()) } /** * Represents a low-level level close frame. It could be sent to indicate web socket session end. * Usually there is no need to send/handle it unless you have a RAW web socket session. */ - class Close(buffer: ByteBuffer) : Frame(true, FrameType.CLOSE, buffer) { - constructor(reason: CloseReason) : this(buildPacket { + actual class Close actual constructor(data: ByteArray) : Frame(true, FrameType.CLOSE, data) { + actual constructor(reason: CloseReason) : this(buildPacket { byteOrder = ByteOrder.BIG_ENDIAN writeShort(reason.code) writeStringUtf8(reason.message) }) - constructor(packet: ByteReadPacket) : this(packet.readByteBuffer()) - constructor() : this(Empty) + + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + actual constructor() : this(Empty) + + constructor(buffer: ByteBuffer) : this(buffer.moveToByteArray()) } /** * Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]). * Usually there is no need to send/handle it unless you have a RAW web socket session. */ - class Ping(buffer: ByteBuffer) : Frame(true, FrameType.PING, buffer) { - constructor(packet: ByteReadPacket) : this(packet.readByteBuffer()) + actual class Ping actual constructor(data: ByteArray) : Frame(true, FrameType.PING, data) { + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + constructor(buffer: ByteBuffer) : this(buffer.moveToByteArray()) } /** * Represents a low-level pong frame. Should be sent in reply to a [Ping] frame. * Usually there is no need to send/handle it unless you have a RAW web socket session. */ - class Pong(buffer: ByteBuffer, disposableHandle: DisposableHandle) : Frame(true, FrameType.PONG, buffer, disposableHandle) { - constructor(buffer: ByteBuffer) : this(buffer, NonDisposableHandle) - constructor(packet: ByteReadPacket) : this(packet.readByteBuffer()) + actual class Pong actual constructor( + data: ByteArray, + disposableHandle: DisposableHandle + ) : Frame(true, FrameType.PONG, data, disposableHandle) { + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + constructor( + buffer: ByteBuffer, + disposableHandle: DisposableHandle = NonDisposableHandle + ) : this(buffer.moveToByteArray(), disposableHandle) + + constructor(buffer: ByteBuffer) : this(buffer.moveToByteArray(), NonDisposableHandle) } - override fun toString() = "Frame $frameType (fin=$fin, buffer len = $initialSize)" + override fun toString() = "Frame $frameType (fin=$fin, buffer len = ${data.size})" /** * Creates a frame copy */ - fun copy() = byType(fin, frameType, ByteBuffer.allocate(buffer.remaining()).apply { buffer.slice().moveTo(this); clear() }) + actual fun copy(): Frame = byType(fin, frameType, data.copyOf()) - companion object { - private val Empty = ByteBuffer.allocate(0) + actual companion object { + private val Empty: ByteArray = ByteArray(0) /** * Create a particular [Frame] instance by frame type */ - fun byType(fin: Boolean, frameType: FrameType, buffer: ByteBuffer): Frame = when (frameType) { - FrameType.BINARY -> Binary(fin, buffer) - FrameType.TEXT -> Text(fin, buffer) - FrameType.CLOSE -> Close(buffer) - FrameType.PING -> Ping(buffer) - FrameType.PONG -> Pong(buffer) + actual fun byType( + fin: Boolean, + frameType: FrameType, + data: ByteArray + ): Frame = when (frameType) { + FrameType.BINARY -> Binary(fin, data) + FrameType.TEXT -> Text(fin, data) + FrameType.CLOSE -> Close(data) + FrameType.PING -> Ping(data) + FrameType.PONG -> Pong(data) } + + /** + * Create a particular [Frame] instance by frame type + */ + fun byType(fin: Boolean, frameType: FrameType, buffer: ByteBuffer): Frame = + byType(fin, frameType, buffer.moveToByteArray()) } } -/** - * Read text content from text frame. Shouldn't be used for fragmented frames: such frames need to be reassembled first - */ -fun Frame.Text.readText(): String { - require(fin) { "Text could be only extracted from non-fragmented frame" } - return Charsets.UTF_8.decode(buffer.duplicate()).toString() -} - -/** - * Read binary content from a frame. For fragmented frames only returns this fragment. - */ -fun Frame.readBytes(): ByteArray { - return buffer.duplicate().moveToByteArray() -} - /** * Read close reason from close frame or null if no close reason provided */ fun Frame.Close.readReason(): CloseReason? { - if (buffer.remaining() < 2) { + if (data.size < 2) { return null } buffer.mark() val code = buffer.getShort() val message = buffer.decodeString(Charsets.UTF_8) - buffer.reset() return CloseReason(code, message) } - -private object NonDisposableHandle : DisposableHandle { - override fun dispose() {} - override fun toString(): String = "NonDisposableHandle" -} diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/RawWebSocket.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/RawWebSocket.kt index 50d99b626..e6abfa050 100644 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/RawWebSocket.kt +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/RawWebSocket.kt @@ -47,7 +47,7 @@ class RawWebSocket( } override suspend fun close(cause: Throwable?) { - terminate() + cause?.let { socketJob.completeExceptionally(it) } ?: terminate() } } diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Utils.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Utils.kt deleted file mode 100644 index 6285fd8ef..000000000 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/Utils.kt +++ /dev/null @@ -1,19 +0,0 @@ -package io.ktor.http.cio.websocket - -import java.nio.* - -@Suppress("NOTHING_TO_INLINE") -private inline infix fun Byte.xor(other: Byte) = toInt().xor(other.toInt()).toByte() - -internal fun ByteBuffer.xor(other: ByteBuffer) { - val bb = slice() - val mask = other.slice() - val maskSize = mask.remaining() - - for (i in 0 .. bb.remaining() - 1) { - bb.put(i, bb.get(i) xor mask[i % maskSize]) - } -} - -@Suppress("NOTHING_TO_INLINE") -internal inline fun Boolean.flagAt(at: Int) = if (this) 1 shl at else 0 diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/UtilsJvm.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/UtilsJvm.kt new file mode 100644 index 000000000..55a1db40b --- /dev/null +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/UtilsJvm.kt @@ -0,0 +1,17 @@ +@file:kotlin.jvm.JvmMultifileClass +@file:kotlin.jvm.JvmName("UtilsKt") + +package io.ktor.http.cio.websocket + +import java.nio.* + +internal fun ByteBuffer.xor(other: ByteBuffer) { + val bb = slice() + val mask = other.slice() + val maskSize = mask.remaining() + + for (i in 0 until bb.remaining()) { + bb.put(i, bb.get(i) xor mask[i % maskSize]) + } +} + diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketReader.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketReader.kt index 381bb7885..023fe914c 100644 --- a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketReader.kt +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketReader.kt @@ -5,7 +5,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.io.* import kotlinx.io.pool.* -import java.nio.ByteBuffer +import java.nio.* import java.nio.channels.* import java.util.concurrent.CancellationException import kotlin.coroutines.* @@ -18,10 +18,10 @@ import kotlin.coroutines.* */ @WebSocketInternalAPI class WebSocketReader( - private val byteChannel: ByteReadChannel, - override val coroutineContext: CoroutineContext, - var maxFrameSize: Long, - pool: ObjectPool = KtorDefaultPool + private val byteChannel: ByteReadChannel, + override val coroutineContext: CoroutineContext, + var maxFrameSize: Long, + pool: ObjectPool = KtorDefaultPool ) : CoroutineScope { @Suppress("UNUSED_PARAMETER") @@ -30,9 +30,10 @@ class WebSocketReader( replaceWith = ReplaceWith("WebSocketReader(byteChannel, coroutineContext, maxFrameSize, pool)"), level = DeprecationLevel.ERROR ) - constructor(byteChannel: ByteReadChannel, maxFrameSize: Long, - parent: Job?, coroutineContext: CoroutineContext, pool: ObjectPool = KtorDefaultPool) - : this(byteChannel, coroutineContext, maxFrameSize, pool) + constructor( + byteChannel: ByteReadChannel, maxFrameSize: Long, + parent: Job?, coroutineContext: CoroutineContext, pool: ObjectPool = KtorDefaultPool + ) : this(byteChannel, coroutineContext, maxFrameSize, pool) private var state = State.HEADER private val frameParser = FrameParser() diff --git a/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketSessionJvm.kt b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketSessionJvm.kt new file mode 100644 index 000000000..473ebc53d --- /dev/null +++ b/ktor-http/ktor-http-cio/jvm/src/io/ktor/http/cio/websocket/WebSocketSessionJvm.kt @@ -0,0 +1,60 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* + +/** + * Represents a web socket session between two peers + */ +actual interface WebSocketSession : CoroutineScope { + /** + * Enable or disable masking output messages by a random xor mask. + * Please note that changing this flag on the fly could be applied to the messages already sent (enqueued earlier) + * as the sending pipeline works asynchronously + */ + var masking: Boolean + + /** + * Specifies frame size limit. Connection will be closed if violated + */ + var maxFrameSize: Long + /** + * Incoming frames channel + */ + actual val incoming: ReceiveChannel + /** + * Outgoing frames channel. It could have limited capacity so sending too much frames may lead to suspension at + * corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason. + */ + actual val outgoing: SendChannel + + /** + * Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already + * closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently + * ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is + * raw websocket session. + */ + @Suppress("ACTUAL_WITHOUT_EXPECT") + suspend fun send(frame: Frame) { + outgoing.send(frame) + } + + /** + * Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called + * at any time even after close. May return immediately if the connection is already terminated. + * However it may also fail with an exception (or cancellation) at any point due to session failure. + * Please note that [flush] doesn't guarantee that frames were actually delivered. + */ + actual suspend fun flush() + + /** + * Initiate connection termination immediately. Termination may complete asynchronously. + */ + actual fun terminate() + + /** + * Close session with the specified [cause] or with no reason if `null` + */ + @Suppress("ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS") + actual suspend fun close(cause: Throwable? = null) +} diff --git a/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionNative.kt b/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionNative.kt new file mode 100644 index 000000000..b338fcae6 --- /dev/null +++ b/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/DefaultWebSocketSessionNative.kt @@ -0,0 +1,23 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* + +/** + * Default websocket session with ping-pong and timeout processing and built-in [closeReason] population + */ +actual interface DefaultWebSocketSession : WebSocketSession { + /** + * A close reason for this session. It could be `null` if a session is terminated with no close reason + * (for example due to connection failure). + */ + actual val closeReason: Deferred +} + +/** + * Create [DefaultWebSocketSession] from session. + */ +actual fun DefaultWebSocketSession( + session: WebSocketSession, + pingInterval: Long, + timeoutMillis: Long +): DefaultWebSocketSession = error("There is no CIO native websocket implementation. Consider using platform default.") diff --git a/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/FrameNative.kt b/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/FrameNative.kt new file mode 100644 index 000000000..c0ca0629f --- /dev/null +++ b/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/FrameNative.kt @@ -0,0 +1,103 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* +import kotlinx.io.core.* +import kotlinx.io.core.ByteOrder + +/** + * A frame received or ready to be sent. It is not reusable and not thread-safe + * @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used + * @property frameType enum value + * @property data - a frame content or fragment content + * @property disposableHandle could be invoked when the frame is processed + */ +actual sealed class Frame private actual constructor( + actual val fin: Boolean, + actual val frameType: FrameType, + actual val data: ByteArray, + actual val disposableHandle: DisposableHandle +) { + /** + * Represents an application level binary frame. + * In a RAW web socket session a big text frame could be fragmented + * (separated into several text frames so they have [fin] = false except the last one). + * Note that usually there is no need to handle fragments unless you have a RAW web socket session. + */ + actual class Binary actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.BINARY, data) { + actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes()) + } + + /** + * Represents an application level text frame. + * In a RAW web socket session a big text frame could be fragmented + * (separated into several text frames so they have [fin] = false except the last one). + * Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character + * so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first. + * Note that usually there is no need to handle fragments unless you have a RAW web socket session. + */ + actual class Text actual constructor(fin: Boolean, data: ByteArray) : Frame(fin, FrameType.TEXT, data) { + actual constructor(text: String) : this(true, text.toByteArray()) + actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes()) + } + + /** + * Represents a low-level level close frame. It could be sent to indicate web socket session end. + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + actual class Close actual constructor(data: ByteArray) : Frame(true, FrameType.CLOSE, data) { + actual constructor(reason: CloseReason) : this(buildPacket { + byteOrder = ByteOrder.BIG_ENDIAN + writeShort(reason.code) + writeStringUtf8(reason.message) + }) + + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + actual constructor() : this(Empty) + } + + /** + * Represents a low-level ping frame. Could be sent to test connection (peer should reply with [Pong]). + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + actual class Ping actual constructor(data: ByteArray) : Frame(true, FrameType.PING, data) { + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + } + + /** + * Represents a low-level pong frame. Should be sent in reply to a [Ping] frame. + * Usually there is no need to send/handle it unless you have a RAW web socket session. + */ + actual class Pong actual constructor( + data: ByteArray, + disposableHandle: DisposableHandle + ) : Frame(true, FrameType.PONG, data, disposableHandle) { + actual constructor(packet: ByteReadPacket) : this(packet.readBytes()) + } + + override fun toString() = "Frame $frameType (fin=$fin, buffer len = ${data.size})" + + /** + * Creates a frame copy + */ + actual fun copy(): Frame = byType(fin, frameType, data.copyOf()) + + actual companion object { + private val Empty: ByteArray = ByteArray(0) + + /** + * Create a particular [Frame] instance by frame type + */ + actual fun byType( + fin: Boolean, + frameType: FrameType, + data: ByteArray + ): Frame = when (frameType) { + FrameType.BINARY -> Binary(fin, data) + FrameType.TEXT -> Text(fin, data) + FrameType.CLOSE -> Close(data) + FrameType.PING -> Ping(data) + FrameType.PONG -> Pong(data) + } + + } +} diff --git a/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/WebSocketSession.kt b/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/WebSocketSession.kt new file mode 100644 index 000000000..886846235 --- /dev/null +++ b/ktor-http/ktor-http-cio/posix/src/io/ktor/http/cio/websocket/WebSocketSession.kt @@ -0,0 +1,49 @@ +package io.ktor.http.cio.websocket + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* + +/** + * Represents a web socket session between two peers + */ +actual interface WebSocketSession : CoroutineScope { + /** + * Incoming frames channel + */ + actual val incoming: ReceiveChannel + + /** + * Outgoing frames channel. It could have limited capacity so sending too much frames may lead to suspension at + * corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason. + */ + actual val outgoing: SendChannel + + /** + * Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already + * closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently + * ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is + * raw websocket session. + */ + @Suppress("ACTUAL_WITHOUT_EXPECT") + suspend fun send(frame: Frame) { + outgoing.send(frame) + } + + /** + * Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called + * at any time even after close. May return immediately if the connection is already terminated. + * However it may also fail with an exception (or cancellation) at any point due to session failure. + * Please note that [flush] doesn't guarantee that frames were actually delivered. + */ + actual suspend fun flush() + + /** + * Initiate connection termination immediately. Termination may complete asynchronously. + */ + actual fun terminate() + + /** + * Close session with the specified [cause] or with no reason if `null` + */ + actual suspend fun close(cause: Throwable?) +} diff --git a/ktor-utils/jvm/src/io/ktor/util/CryptoJvm.kt b/ktor-utils/jvm/src/io/ktor/util/CryptoJvm.kt index 7046c3d99..08f086c93 100644 --- a/ktor-utils/jvm/src/io/ktor/util/CryptoJvm.kt +++ b/ktor-utils/jvm/src/io/ktor/util/CryptoJvm.kt @@ -24,10 +24,7 @@ private fun getDigest(text: String, algorithm: String, salt: String): ByteArray @KtorExperimentalAPI fun sha1(bytes: ByteArray): ByteArray = MessageDigest.getInstance("SHA1").digest(bytes)!! -@InternalAPI -actual fun Digest(name: String): Digest = object : Digest { - private val delegate = MessageDigest.getInstance(name) - +private inline class DigestImpl(val delegate: MessageDigest) : Digest { override fun plusAssign(bytes: ByteArray) { delegate.update(bytes) }